From b87540d99d7b1f83428c55259d2114518f4211a7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 16 Jul 2022 04:13:02 +0000 Subject: [PATCH] better archive split --- Cargo.lock | 9 +- TODO.md | 18 ++-- web3-proxy/Cargo.toml | 6 +- web3-proxy/src/app.rs | 196 +++++++++++++++++++++++++++++----- web3-proxy/src/connections.rs | 28 +++-- 5 files changed, 208 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dad90cd9..4e09fdfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,9 +180,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e" +checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648" dependencies = [ "async-trait", "axum-core", @@ -3835,10 +3835,11 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.19.2" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" +checksum = "57aec3cfa4c296db7255446efb4928a6be304b431a806216105542a67b6ca82e" dependencies = [ + "autocfg", "bytes", "libc", "memchr", diff --git a/TODO.md b/TODO.md index fcaf4337..430bc32f 100644 --- a/TODO.md +++ b/TODO.md @@ -36,11 +36,10 @@ - [x] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more (might already be fixed) - [x] incoming rate limiting (by ip) - [x] connection pool for redis -- [ ] automatically route to archive server when necessary +- [x] automatically route to archive server when necessary - originally, no processing was done to params; they were just serde_json::RawValue. 
this is probably fastest, but we need to look for "latest" and count elements, so we have to use serde_json::Value - - when getting the next server, filtering on "archive" isn't going to work well. - - [ ] we need a list of "Arc" just of archive servers. we can route to any of them even if they are behind by many blocks - - [ ] if the requested block is ahead of the best block, return without querying any backend servers + - when getting the next server, filtering on "archive" isn't going to work well. need to check inner instead +- [ ] if the requested block is ahead of the best block, return without querying any backend servers - [ ] handle log subscriptions - [ ] basic request method stats - [x] http servers should check block at the very start @@ -61,10 +60,12 @@ - [ ] improved logging with useful instrumentation - [ ] add a subscription that returns the head block number and hash but nothing else - [ ] if we don't cache errors, then in-flight request caching is going to bottleneck + - i think now that we retry header not found and similar, caching errors should be fine - [ ] improve caching - - [ ] if the eth_call (or similar) params include a block, we can cache for longer - - [ ] if the call is something simple like "symbol" or "decimals", cache that too - - [ ] when we receive a block, we should store it for later eth_getBlockByNumber and similar calls + - [ ] if the eth_call (or similar) params include a block, we can cache for that. if its archive-age, it should be fine to cache by number instead of hash + - [ ] add a "recent hashes" to synced connections with 64 parent blocks (maybe 128) + - [ ] if the call is something simple like "symbol" or "decimals", cache that too. 
though i think this could bite us +- [ ] when we receive a block, we should store it for later eth_getBlockByNumber and similar calls - [x] eth_blockNumber without a backend request - [ ] if a rpc fails to connect at start, retry later instead of skipping it forever - [ ] inspect any jsonrpc errors. if its something like "header not found" or "block with id $x not found" retry on another node (and add a negative score to that server) @@ -135,3 +136,6 @@ in another repo: event subscriber - [ ] flashbots protect fast mode or not? probably fast matches most user's needs, but no reverts is nice. - [ ] https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#authentication maybe have per-user keys. or pass their header on if its set - [ ] if no redis set, but public rate limits are set, exit with an error +- [ ] i saw "WebSocket connection closed unexpectedly" but no auto reconnect. need better logs on these +- [ ] if archive servers are added to the rotation while they are still syncing, they might get requests too soon. keep archive servers out of the configs until they are done syncing. full nodes should be fine to add to the configs even while syncing, though its a wasted connection +- [ ] when under load, i'm seeing "http interval lagging!". sometimes it happens when not loaded. 
diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 978e09cf..7c2883c9 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -13,7 +13,7 @@ deadlock_detection = ["parking_lot/deadlock_detection"] anyhow = { version = "1.0.58", features = ["backtrace"] } arc-swap = "1.5.0" argh = "0.1.8" -axum = { version = "0.5.12", features = ["serde_json", "tokio-tungstenite", "ws"] } +axum = { version = "0.5.13", features = ["serde_json", "tokio-tungstenite", "ws"] } axum-client-ip = "0.2.0" counter = "0.5.5" dashmap = "5.3.4" @@ -34,10 +34,10 @@ proctitle = "0.1.1" regex = "1.6.0" reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } rustc-hash = "1.1.0" -siwe = "0.3" +siwe = "0.3.0" serde = { version = "1.0.139", features = [] } serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] } -tokio = { version = "1.19.2", features = ["full", "tracing"] } +tokio = { version = "1.20.0", features = ["full", "tracing"] } toml = "0.5.9" tracing = "0.1.35" # TODO: tracing-subscriber has serde and serde_json features that we might want to use diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 62453201..5209d894 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -1,7 +1,7 @@ use axum::extract::ws::Message; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; -use ethers::prelude::{Address, Bytes, Transaction}; +use ethers::prelude::{Address, BlockNumber, Bytes, Transaction}; use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; @@ -79,34 +79,179 @@ pub async fn flatten_handles( Ok(()) } -fn is_archive_needed(method: &str, params: Option<&mut serde_json::Value>) -> bool { - match method { - "eth_call" => unimplemented!(), - "eth_getBalance" => unimplemented!(), - "eth_getCode" => unimplemented!(), - "eth_getLogs" => unimplemented!(), - "eth_getStorageAt" => 
unimplemented!(), - "eth_getTransactionByBlockHashAndIndex" => unimplemented!(), - "eth_getTransactionByBlockNumberAndIndex" => unimplemented!(), - "eth_getTransactionCount" => unimplemented!(), - "eth_getTransactionReceipt" => unimplemented!(), - "eth_getUncleByBlockHashAndIndex" => unimplemented!(), - "eth_getUncleByBlockNumberAndIndex" => unimplemented!(), - _ => { - return false; +fn value_to_block_num(x: &serde_json::Value) -> anyhow::Result { + let block_num = if x.is_string() { + BlockNumber::from_str(x.as_str().unwrap()).unwrap() + } else if x.is_number() { + BlockNumber::from(x.as_u64().unwrap()) + } else { + return Err(anyhow::anyhow!("unexpected BlockNumber")); + }; + + Ok(block_num) +} + +fn block_num_to_u64(block_num: BlockNumber, latest_block: u64) -> (bool, u64) { + match block_num { + BlockNumber::Earliest => (false, 0), + BlockNumber::Latest => { + // change "latest" to a number + (true, latest_block) } + BlockNumber::Number(x) => (false, x.as_u64()), + // TODO: think more about how to handle Pending + BlockNumber::Pending => (false, latest_block), + } +} + +fn get_or_set_block_number( + params: &mut serde_json::Value, + block_param_id: usize, + latest_block: u64, +) -> anyhow::Result { + match params.as_array_mut() { + None => Err(anyhow::anyhow!("params not an array")), + Some(params) => match params.get_mut(block_param_id) { + None => { + if params.len() != block_param_id - 1 { + return Err(anyhow::anyhow!("unexpected params length")); + } + + // add the latest block number to the end of the params + params.push(latest_block.into()); + + Ok(latest_block) + } + Some(x) => { + // convert the json value to a BlockNumber + let block_num = value_to_block_num(x)?; + + let (modified, block_num) = block_num_to_u64(block_num, latest_block); + + // if we changed "latest" to a number, update the params to match + if modified { + *x = block_num.into(); + } + + Ok(block_num) + } + }, + } +} + +fn is_archive_needed( + method: &str, + params: Option<&mut 
serde_json::Value>, + latest_block: u64, +) -> bool { + if params.is_none() { + return false; } - // TODO: find the given block number in params + // if the query's block is recent, return false + // geth: 128 blocks (or only 64 if you’re using fast/snap sync) + // TODO: apparently erigon keeps data for 90k blocks. so true/false isn't going to be enough. need per-server archive limits + // TODO: maybe give 1 block of headroom? + let last_full_block = latest_block.saturating_sub(64); - // TODO: if its "latest" (or not given), modify params to have the latest block. return false + let params = params.unwrap(); - // TODO: if its "pending", do something special? return false + // TODO: double check these. i think some of the getBlock stuff will never need archive + let block_param_id = match method { + "eth_call" => 1, + "eth_estimateGas" => 1, + "eth_getBalance" => 1, + "eth_getBlockByHash" => { + return false; + } + "eth_getBlockByNumber" => 0, + "eth_getBlockTransactionCountByHash" => { + // TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes + return false; + } + "eth_getBlockTransactionCountByNumber" => 0, + "eth_getCode" => 1, + "eth_getLogs" => { + let obj = params[0].as_object_mut().unwrap(); - // TODO: we probably need a list of recent hashes/heights. 
if specified block is recent, return false + if let Some(x) = obj.get_mut("fromBlock") { + if let Ok(block_num) = value_to_block_num(x) { + let (modified, block_num) = block_num_to_u64(block_num, latest_block); - // this command needs an archive server - true + if modified { + *x = block_num.into(); + } + + if block_num < last_full_block { + return true; + } + } + } + + if let Some(x) = obj.get_mut("toBlock") { + if let Ok(block_num) = value_to_block_num(x) { + let (modified, block_num) = block_num_to_u64(block_num, latest_block); + + if modified { + *x = block_num.into(); + } + + if block_num < last_full_block { + return true; + } + } + } + + if let Some(x) = obj.get("blockHash") { + // TODO: check a linkedhashmap of recent hashes + // TODO: error if fromBlock or toBlock were set + } + + return false; + } + "eth_getStorageAt" => 2, + "eth_getTransactionByHash" => { + // TODO: not sure how best to look these up + // try full nodes first. retry will use archive + return false; + } + "eth_getTransactionByBlockHashAndIndex" => { + // TODO: check a linkedhashmap of recent hashes + // try full nodes first. retry will use archive + return false; + } + "eth_getTransactionByBlockNumberAndIndex" => 0, + "eth_getTransactionCount" => 1, + "eth_getTransactionReceipt" => { + // TODO: not sure how best to look these up + // try full nodes first. retry will use archive + return false; + } + "eth_getUncleByBlockHashAndIndex" => { + // TODO: check a linkedhashmap of recent hashes + // try full nodes first. retry will use archive + return false; + } + "eth_getUncleByBlockNumberAndIndex" => 0, + "eth_getUncleCountByBlockHash" => { + // TODO: check a linkedhashmap of recent hashes + // try full nodes first. 
retry will use archive + return false; + } + "eth_getUncleCountByBlockNumber" => 0, + _ => { + // some other command that doesn't take block numbers as an argument + return false; + } + }; + + if let Ok(block) = get_or_set_block_number(params, block_param_id, latest_block) { + block < last_full_block + } else { + // TODO: seems unlikely that we will get here. probably should log this error + // if this is incorrect, it should retry on an archive server + false + } } // TODO: think more about TxState. d @@ -566,7 +711,7 @@ impl Web3ProxyApp { // TODO: inspect the request to pick the right cache // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py - // TODO: Some requests should skip caching on the head_block_hash + // TODO: Some requests should skip caching on an older block hash. take number as an argument. if its older than 64, use it as the key let head_block_hash = self.balanced_rpcs.get_head_block_hash(); // TODO: better key? benchmark this @@ -792,8 +937,11 @@ impl Web3ProxyApp { // everything else is relayed to a backend // this is not a private transaction + let head_block_number = self.balanced_rpcs.get_head_block_num(); + // we do this check before checking caches because it might modify the request params - let archive_needed = is_archive_needed(method, request.params.as_mut()); + let archive_needed = + is_archive_needed(method, request.params.as_mut(), head_block_number); let (cache_key, response_cache) = match self.get_cached_response(&request) { (cache_key, Ok(response)) => { diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 76282c16..bb179512 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -633,30 +633,25 @@ impl Web3Connections { return Err(None); } - let sort_cache: HashMap, (f32, u32)> = synced_rpcs + let sort_cache: HashMap<_, _> = synced_rpcs .iter() .map(|rpc| { let active_requests = rpc.active_requests(); let soft_limit = rpc.soft_limit(); + let 
is_archive = rpc.is_archive(); let utilization = active_requests as f32 / soft_limit as f32; - (rpc.clone(), (utilization, soft_limit)) + (rpc.clone(), (is_archive, utilization, soft_limit)) }) .collect(); synced_rpcs.sort_unstable_by(|a, b| { - let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap(); - let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap(); + let a_sorts = sort_cache.get(a).unwrap(); + let b_sorts = sort_cache.get(b).unwrap(); // TODO: i'm comparing floats. crap - match a_utilization - .partial_cmp(b_utilization) - .unwrap_or(cmp::Ordering::Equal) - { - cmp::Ordering::Equal => a_soft_limit.cmp(b_soft_limit), - x => x, - } + a_sorts.partial_cmp(b_sorts).unwrap_or(cmp::Ordering::Equal) }); // now that the rpcs are sorted, try to get an active request handle for one of them @@ -859,3 +854,14 @@ impl Web3Connections { } } } + +mod tests { + #[test] + fn test_false_before_true() { + let mut x = [true, false, true]; + + x.sort(); + + assert_eq!(x, [false, true, true]) + } +}