diff --git a/Cargo.lock b/Cargo.lock index 4e09fdfb..60ff16b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -747,9 +747,9 @@ checksum = "fb4a24b1aaf0fd0ce8b45161144d6f42cd91677fd5940fd431183eb023b3a2b8" [[package]] name = "counter" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63b05d7acd00b53d97b7369c4062027ff55711db0c509f5b28d6d964a2f1ff87" +checksum = "48388d8711a360319610960332b6a6f9fc2b5a63bba9fd10f1b7aa50677d956f" dependencies = [ "num-traits", ] @@ -942,7 +942,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" dependencies = [ "cfg-if 1.0.0", - "hashbrown 0.12.2", + "hashbrown 0.12.3", "lock_api", "parking_lot_core 0.9.3", ] @@ -1829,9 +1829,9 @@ checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" [[package]] name = "hashbrown" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ "ahash", ] @@ -1851,7 +1851,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" dependencies = [ - "hashbrown 0.12.2", + "hashbrown 0.12.3", ] [[package]] @@ -2255,7 +2255,7 @@ name = "linkedhashmap" version = "0.2.0" dependencies = [ "criterion", - "hashbrown 0.12.2", + "hashbrown 0.12.3", "hashlink", "linked-hash-map", "slab", @@ -4333,7 +4333,7 @@ dependencies = [ "fdlimit", "flume", "futures", - "hashbrown 0.12.2", + "hashbrown 0.12.3", "linkedhashmap", "notify", "num", diff --git a/linkedhashmap/Cargo.toml b/linkedhashmap/Cargo.toml index 5afff8b4..b520c978 100644 --- a/linkedhashmap/Cargo.toml +++ b/linkedhashmap/Cargo.toml @@ -9,7 +9,7 @@ inline-more = [ "hashbrown" ] [dependencies] slab = "0.4.6" -hashbrown = { version = "0.12.2", optional = true } +hashbrown = { version = "0.12.3", optional = true } [dev-dependencies] criterion = "0.3.6" diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 7c2883c9..8d87d989 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -15,14 +15,14 @@ arc-swap = "1.5.0" argh = "0.1.8" axum = { version = "0.5.13", features = ["serde_json", "tokio-tungstenite", "ws"] } axum-client-ip = "0.2.0" -counter = "0.5.5" +counter = "0.5.6" dashmap = "5.3.4" derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } fdlimit = "0.2.1" flume = "0.10.13" futures = { version = "0.3.21", features = ["thread-pool"] } -hashbrown = "0.12.2" +hashbrown = "0.12.3" linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } notify = "4.0.17" num = "0.4.0" diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 8a66282e..b126ae8f 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -1,8 +1,7 @@ use axum::extract::ws::Message; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; -use ethers::prelude::{Address, BlockNumber, Bytes, Transaction}; -use ethers::prelude::{Block, TxHash, H256}; +use ethers::prelude::{Address, Block, BlockNumber, Bytes, Transaction, TxHash, H256, U64}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; use futures::stream::FuturesUnordered; @@ -23,7 +22,7 @@ use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; -use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; +use tracing::{info, info_span, instrument, trace, warn, Instrument}; use crate::bb8_helpers; use crate::config::AppConfig; @@ -79,26 +78,14 @@ pub async fn flatten_handles( Ok(()) } -fn value_to_block_num(x: &serde_json::Value) -> anyhow::Result { - let block_num = if x.is_string() { - BlockNumber::from_str(x.as_str().unwrap()).unwrap() - } else if x.is_number() { - BlockNumber::from(x.as_u64().unwrap()) - } else { - return Err(anyhow::anyhow!("unexpected BlockNumber")); - }; - - Ok(block_num) -} - -fn block_num_to_u64(block_num: BlockNumber, latest_block: u64) -> (bool, u64) { +fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) { match block_num { - BlockNumber::Earliest => (false, 0), + BlockNumber::Earliest => (false, U64::zero()), BlockNumber::Latest => { // change "latest" to a number (true, latest_block) } - BlockNumber::Number(x) => (false, x.as_u64()), + BlockNumber::Number(x) => (false, x), // TODO: think more about how to handle Pending BlockNumber::Pending => (false, latest_block), } @@ -107,8 +94,8 @@ fn block_num_to_u64(block_num: BlockNumber, latest_block: u64) -> (bool, u64) { fn get_or_set_block_number( params: &mut serde_json::Value, block_param_id: usize, - latest_block: u64, -) -> anyhow::Result { + latest_block: U64, +) -> anyhow::Result { match params.as_array_mut() { None => Err(anyhow::anyhow!("params not an array")), Some(params) => match params.get_mut(block_param_id) { @@ -118,19 +105,19 @@ fn get_or_set_block_number( } // add the latest block number to the end of the params - params.push(latest_block.into()); + params.push(serde_json::to_value(latest_block)?); Ok(latest_block) } Some(x) => { // convert the json value to a BlockNumber - let block_num = value_to_block_num(x)?; + let block_num: BlockNumber = serde_json::from_value(x.clone())?; let (modified, block_num) = block_num_to_u64(block_num, latest_block); // if we changed "latest" to a number, update the params to match if modified { - *x = format!("0x{:x}", block_num).into(); + *x = serde_json::to_value(block_num)?; } Ok(block_num) @@ -139,23 +126,13 @@ fn get_or_set_block_number( } } -// TODO: change this to return the height needed instead -fn is_archive_needed( +// TODO: change this to return the height needed (OR hash if recent) +fn get_min_block_needed( method: &str, params: Option<&mut serde_json::Value>, - latest_block: u64, -) -> bool { - if params.is_none() { - return false; - } - - // if the query's block is recent, return false - // geth: 128 blocks (or only 64 if you’re using fast/snap sync) - // TODO: apparently erigon keeps data for 90k blocks. so true/false isn't going to be enough. need per-server archive limits - // TODO: maybe give 1 block of headroom? - let last_full_block = latest_block.saturating_sub(64); - - let params = params.unwrap(); + latest_block: U64, +) -> Option { + let params = params?; // TODO: double check these. i think some of the getBlock stuff will never need archive let block_param_id = match method { @@ -163,14 +140,14 @@ fn is_archive_needed( "eth_estimateGas" => 1, "eth_getBalance" => 1, "eth_getBlockByHash" => { - return false; + return None; } "eth_getBlockByNumber" => { - return false; + return None; } "eth_getBlockTransactionCountByHash" => { // TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes - return false; + return None; } "eth_getBlockTransactionCountByNumber" => 0, "eth_getCode" => 1, @@ -178,82 +155,81 @@ fn is_archive_needed( let obj = params[0].as_object_mut().unwrap(); if let Some(x) = obj.get_mut("fromBlock") { - if let Ok(block_num) = value_to_block_num(x) { - let (modified, block_num) = block_num_to_u64(block_num, latest_block); + let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?; - if modified { - *x = format!("0x{:x}", block_num).into(); - } + let (modified, block_num) = block_num_to_u64(block_num, latest_block); - if block_num < last_full_block { - return true; - } + if modified { + *x = serde_json::to_value(block_num).unwrap(); } + + return Some(block_num); } if let Some(x) = obj.get_mut("toBlock") { - if let Ok(block_num) = value_to_block_num(x) { - let (modified, block_num) = block_num_to_u64(block_num, latest_block); + let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?; - if modified { - *x = format!("0x{:x}", block_num).into(); - } + let (modified, block_num) = block_num_to_u64(block_num, latest_block); - if block_num < last_full_block { - return true; - } + if modified { + *x = serde_json::to_value(block_num).unwrap(); } + + return Some(block_num); } if let Some(x) = obj.get("blockHash") { // TODO: check a linkedhashmap of recent hashes // TODO: error if fromBlock or toBlock were set + unimplemented!("handle blockHash {}", x); } - return false; + return None; } "eth_getStorageAt" => 2, "eth_getTransactionByHash" => { // TODO: not sure how best to look these up // try full nodes first. retry will use archive - return false; + return None; } "eth_getTransactionByBlockHashAndIndex" => { // TODO: check a linkedhashmap of recent hashes // try full nodes first. retry will use archive - return false; + return None; } "eth_getTransactionByBlockNumberAndIndex" => 0, "eth_getTransactionCount" => 1, "eth_getTransactionReceipt" => { // TODO: not sure how best to look these up // try full nodes first. retry will use archive - return false; + return None; } "eth_getUncleByBlockHashAndIndex" => { // TODO: check a linkedhashmap of recent hashes // try full nodes first. retry will use archive - return false; + return None; } "eth_getUncleByBlockNumberAndIndex" => 0, "eth_getUncleCountByBlockHash" => { // TODO: check a linkedhashmap of recent hashes // try full nodes first. retry will use archive - return false; + return None; } "eth_getUncleCountByBlockNumber" => 0, _ => { // some other command that doesn't take block numbers as an argument - return false; + return None; } }; - if let Ok(block) = get_or_set_block_number(params, block_param_id, latest_block) { - block < last_full_block - } else { - // TODO: seems unlikely that we will get here. probably should log this error - // if this is incorrect, it should retry on an archive server - false + match get_or_set_block_number(params, block_param_id, latest_block) { + Ok(block) => Some(block), + Err(err) => { + // TODO: seems unlikely that we will get here + // if this is incorrect, it should retry on an archive server + warn!(?err, "could not get block from params"); + None + } } } @@ -704,41 +680,48 @@ impl Web3ProxyApp { Ok(collected) } - fn get_cached_response( + async fn get_cached_response( &self, + // TODO: accept a block hash here also + min_block_needed: Option, request: &JsonRpcRequest, - ) -> ( + ) -> anyhow::Result<( CacheKey, Result, - ) { + )> { // TODO: inspect the request to pick the right cache // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py - // TODO: Some requests should skip caching on an older block hash. take number as an argument. if its older than 64, use it as the key - let head_block_hash = self.balanced_rpcs.get_head_block_hash(); + let request_block_hash = if let Some(min_block_needed) = min_block_needed { + let block_result = self.balanced_rpcs.get_block_hash(min_block_needed).await; + + block_result? + } else { + self.balanced_rpcs.get_head_block_hash() + }; // TODO: better key? benchmark this let key = ( - head_block_hash, + request_block_hash, request.method.clone(), request.params.clone().map(|x| x.to_string()), ); if let Some(response) = self.response_cache.read().get(&key) { // TODO: emit a stat - trace!("{:?} cache hit!", request); + trace!(?request.method, "cache hit!"); // TODO: can we make references work? maybe put them in an Arc? - return (key, Ok(response.to_owned())); + return Ok((key, Ok(response.to_owned()))); } else { // TODO: emit a stat - trace!("{:?} cache miss!", request); + trace!(?request.method, "cache miss!"); } // TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?) let cache = &self.response_cache; - (key, Err(cache)) + Ok((key, Err(cache))) } // #[instrument(skip_all)] @@ -755,7 +738,7 @@ impl Web3ProxyApp { // // TODO: add more to this span such as let span = info_span!("rpc_request"); // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) - match &request.method[..] { + match request.method.as_ref() { // lots of commands are blocked "admin_addPeer" | "admin_datadir" @@ -894,7 +877,7 @@ impl Web3ProxyApp { if check_firewall_raw(&raw_tx).await? { self.private_rpcs - .try_send_all_upstream_servers(request, false) + .try_send_all_upstream_servers(request, None) .instrument(span) .await } else { @@ -944,20 +927,23 @@ impl Web3ProxyApp { // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different - let archive_needed = - is_archive_needed(method, request.params.as_mut(), head_block_number); + let min_block_needed = + get_min_block_needed(method, request.params.as_mut(), head_block_number.into()); - trace!(?archive_needed, ?method); + trace!(?min_block_needed, ?method); - let (cache_key, response_cache) = match self.get_cached_response(&request) { - (cache_key, Ok(response)) => { + let (cache_key, cache_result) = + self.get_cached_response(min_block_needed, &request).await?; + + let response_cache = match cache_result { + Ok(response) => { let _ = self.incoming_requests.remove(&cache_key); // TODO: if the response is cached, should it count less against the account's costs? return Ok(response); } - (cache_key, Err(response_cache)) => (cache_key, response_cache), + Err(response_cache) => response_cache, }; // check if this request is already in flight @@ -1004,13 +990,13 @@ impl Web3ProxyApp { "eth_getTransactionByHash" | "eth_getTransactionReceipt" => { // TODO: try_send_all serially with retries instead of parallel self.private_rpcs - .try_send_all_upstream_servers(request, archive_needed) + .try_send_all_upstream_servers(request, min_block_needed) .await? } _ => { // TODO: retries? self.balanced_rpcs - .try_send_best_upstream_server(request, archive_needed) + .try_send_best_upstream_server(request, min_block_needed) .await? } }; diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 80f274f1..e318319a 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -1,7 +1,7 @@ ///! Rate-limited communication with a web3 provider use anyhow::Context; use derive_more::From; -use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash}; +use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; use redis_cell_client::RedisCellClient; @@ -9,7 +9,7 @@ use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicBool, AtomicU32}; +use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock; @@ -81,7 +81,8 @@ pub struct Web3Connection { hard_limit: Option, /// used for load balancing to the least loaded server soft_limit: u32, - archive: AtomicBool, + block_data_limit: AtomicU64, + head_block: parking_lot::RwLock<(H256, U64)>, } impl Serialize for Web3Connection { @@ -107,10 +108,18 @@ impl Serialize for Web3Connection { } impl fmt::Debug for Web3Connection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Web3Connection") - .field("url", &self.url) - .field("archive", &self.is_archive()) - .finish_non_exhaustive() + let mut f = f.debug_struct("Web3Connection"); + + f.field("url", &self.url); + + let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed); + if block_data_limit == u64::MAX { + f.field("limit", &"archive"); + } else { + f.field("limit", &block_data_limit); + } + + f.finish_non_exhaustive() } } @@ -123,6 +132,7 @@ impl fmt::Display for Web3Connection { impl Web3Connection { /// Connect to a web3 rpc // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] + // TODO: have this take a builder (which will have senders attached) #[allow(clippy::too_many_arguments)] pub async fn spawn( chain_id: usize, @@ -157,13 +167,14 @@ impl Web3Connection { provider: RwLock::new(Some(Arc::new(provider))), hard_limit, soft_limit, - archive: Default::default(), + block_data_limit: Default::default(), + head_block: parking_lot::RwLock::new((H256::zero(), 0isize.into())), }; let new_connection = Arc::new(new_connection); // check the server's chain_id here - // TODO: move this outside the `new` function and into a `start` function or something + // TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: this will wait forever. do we want that? let found_chain_id: Result = new_connection @@ -192,27 +203,8 @@ impl Web3Connection { } } - // we could take "archive" as a parameter, but we would want a safety check on it regardless - // just query something very old and if we get an error, we don't have archive data - let archive_result: Result = new_connection - .wait_for_request_handle() - .await - .request( - "eth_getCode", - ("0xdead00000000000000000000000000000000beef", "0x1"), - ) - .await; - - trace!(?archive_result, "{}", new_connection); - - if archive_result.is_ok() { - new_connection - .archive - .store(true, atomic::Ordering::Relaxed); - } - - info!(?new_connection, "success"); - + // subscribe to new blocks and new transactions + // TODO: make transaction subscription optional (just pass None for tx_id_sender) let handle = { let new_connection = new_connection.clone(); tokio::spawn(async move { @@ -222,11 +214,74 @@ impl Web3Connection { }) }; + // TODO: make sure the server isn't still syncing + + // TODO: don't sleep. wait for new heads subscription instead + // TODO: i think instead of atomics, we could maybe use a watch channel + sleep(Duration::from_millis(100)).await; + + // we could take "archive" as a parameter, but we would want a safety check on it regardless + // just query something very old and if we get an error, we don't have archive data + for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { + let mut head_block_num = new_connection.head_block.read().1; + + // TODO: wait until head block is set outside the loop? + while head_block_num == U64::zero() { + info!(?new_connection, "no head block"); + + // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? + sleep(Duration::from_secs(1)).await; + + head_block_num = new_connection.head_block.read().1; + } + + let maybe_archive_block = head_block_num + .saturating_sub(block_data_limit.into()) + .max(U64::one()); + + let archive_result: Result = new_connection + .wait_for_request_handle() + .await + .request( + "eth_getCode", + ( + "0xdead00000000000000000000000000000000beef", + maybe_archive_block, + ), + ) + .await; + + trace!(?archive_result, "{}", new_connection); + + if archive_result.is_ok() { + new_connection + .block_data_limit + .store(block_data_limit, atomic::Ordering::Release); + + break; + } + } + + info!(?new_connection, "success"); + Ok((new_connection, handle)) } - pub fn is_archive(&self) -> bool { - self.archive.load(atomic::Ordering::Relaxed) + /// TODO: this might be too simple. different nodes can prune differently + pub fn get_block_data_limit(&self) -> U64 { + self.block_data_limit.load(atomic::Ordering::Acquire).into() + } + + pub fn has_block_data(&self, needed_block_num: U64) -> bool { + let block_data_limit: U64 = self.get_block_data_limit(); + + let newest_block_num = self.head_block.read().1; + + let oldest_block_num = newest_block_num + .saturating_sub(block_data_limit) + .max(U64::one()); + + needed_block_num >= oldest_block_num && needed_block_num <= newest_block_num } #[instrument(skip_all)] @@ -237,7 +292,10 @@ impl Web3Connection { // websocket doesn't need the http client let http_client = None; + info!(?self, "reconnecting"); + // since this lock is held open over an await, we use tokio's locking + // TODO: timeout on this lock. if its slow, something is wrong let mut provider = self.provider.write().await; *provider = None; @@ -281,7 +339,15 @@ impl Web3Connection { ) -> anyhow::Result<()> { match block { Ok(block) => { - // TODO: i'm pretty sure we don't need send_async, but double check + { + let hash = block.hash.unwrap(); + let num = block.number.unwrap(); + + let mut head_block = self.head_block.write(); + + *head_block = (hash, num); + } + block_sender .send_async((block, self.clone())) .await @@ -376,19 +442,9 @@ impl Web3Connection { let mut http_interval_receiver = http_interval_receiver.unwrap(); - let mut last_hash = Default::default(); + let mut last_hash = H256::zero(); loop { - // wait for the interval - // TODO: if error or rate limit, increase interval? - while let Err(err) = http_interval_receiver.recv().await { - // TODO: if recverror is not Lagged, exit? - // querying the block was delayed. this can happen if tokio was busy. - warn!(?err, ?self, "http interval lagging!"); - } - - trace!(?self, "ok http interval"); - match self.try_request_handle().await { Ok(active_request_handle) => { // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" @@ -413,6 +469,22 @@ impl Web3Connection { warn!(?err, "Rate limited on latest block from {}", self); } } + + // wait for the interval + // TODO: if error or rate limit, increase interval? + while let Err(err) = http_interval_receiver.recv().await { + match err { + broadcast::error::RecvError::Closed => { + return Err(err.into()); + } + broadcast::error::RecvError::Lagged(lagged) => { + // querying the block was delayed. this can happen if tokio is very busy. + warn!(?err, ?self, "http interval lagging by {}!", lagged); + } + } + } + + trace!(?self, "ok http interval"); } } Web3Provider::Ws(provider) => { diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index da6288a9..7f1de3dc 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -4,13 +4,14 @@ use arc_swap::ArcSwap; use counter::Counter; use dashmap::DashMap; use derive_more::From; -use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256}; +use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; +use serde_json::json; use serde_json::value::RawValue; use std::cmp; use std::collections::{BTreeMap, BTreeSet}; @@ -357,6 +358,26 @@ impl Web3Connections { self.synced_connections.load().get_head_block_num() } + pub async fn get_block_hash(&self, num: U64) -> anyhow::Result { + // TODO: this definitely needs caching + warn!("this needs to be much more efficient"); + + // TODO: helper for this + let request = + json!({ "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); + let request: JsonRpcRequest = serde_json::from_value(request)?; + + // TODO: if error, retry + let response = self + .try_send_best_upstream_server(request, Some(num)) + .await?; + + let block = response.result.unwrap().to_string(); + let block: Block = serde_json::from_str(&block)?; + + Ok(block.hash.unwrap()) + } + pub fn get_head_block_hash(&self) -> H256 { *self.synced_connections.load().get_head_block_hash() } @@ -608,29 +629,31 @@ impl Web3Connections { pub async fn next_upstream_server( &self, skip: &[Arc], - archive_needed: bool, + min_block_needed: Option, ) -> Result> { let mut earliest_retry_after = None; // filter the synced rpcs - let mut synced_rpcs: Vec> = if archive_needed { - // TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block - // TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then - self.inner - .iter() - .filter(|x| x.is_archive()) - .filter(|x| !skip.contains(x)) - .cloned() - .collect() - } else { - self.synced_connections - .load() - .inner - .iter() - .filter(|x| !skip.contains(x)) - .cloned() - .collect() - }; + // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest" + let mut synced_rpcs: Vec> = + if let Some(min_block_needed) = min_block_needed { + // TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block + // TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then + self.inner + .iter() + .filter(|x| x.has_block_data(min_block_needed)) + .filter(|x| !skip.contains(x)) + .cloned() + .collect() + } else { + self.synced_connections + .load() + .inner + .iter() + .filter(|x| !skip.contains(x)) + .cloned() + .collect() + }; if synced_rpcs.is_empty() { return Err(None); @@ -639,13 +662,15 @@ impl Web3Connections { let sort_cache: HashMap<_, _> = synced_rpcs .iter() .map(|rpc| { + // TODO: get active requests and the soft limit out of redis? let active_requests = rpc.active_requests(); let soft_limit = rpc.soft_limit(); - let is_archive = rpc.is_archive(); + let block_data_limit = rpc.get_block_data_limit(); let utilization = active_requests as f32 / soft_limit as f32; - (rpc.clone(), (is_archive, utilization, soft_limit)) + // TODO: double check this sorts how we want + (rpc.clone(), (block_data_limit, utilization, soft_limit)) }) .collect(); @@ -681,15 +706,17 @@ impl Web3Connections { /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions pub async fn get_upstream_servers( &self, - archive_needed: bool, + min_block_needed: Option, ) -> Result, Option> { let mut earliest_retry_after = None; // TODO: with capacity? let mut selected_rpcs = vec![]; for connection in self.inner.iter() { - if archive_needed && !connection.is_archive() { - continue; + if let Some(min_block_needed) = min_block_needed { + if !connection.has_block_data(min_block_needed) { + continue; + } } // check rate limits and increment our connection counter @@ -714,7 +741,7 @@ impl Web3Connections { pub async fn try_send_best_upstream_server( &self, request: JsonRpcRequest, - archive_needed: bool, + min_block_needed: Option, ) -> anyhow::Result { let mut skip_rpcs = vec![]; @@ -723,7 +750,10 @@ impl Web3Connections { if skip_rpcs.len() == self.inner.len() { break; } - match self.next_upstream_server(&skip_rpcs, archive_needed).await { + match self + .next_upstream_server(&skip_rpcs, min_block_needed) + .await + { Ok(active_request_handle) => { // save the rpc in case we get an error and want to retry on another server skip_rpcs.push(active_request_handle.clone_connection()); @@ -807,10 +837,10 @@ impl Web3Connections { pub async fn try_send_all_upstream_servers( &self, request: JsonRpcRequest, - archive_needed: bool, + min_block_needed: Option, ) -> anyhow::Result { loop { - match self.get_upstream_servers(archive_needed).await { + match self.get_upstream_servers(min_block_needed).await { Ok(active_request_handles) => { // TODO: benchmark this compared to waiting on unbounded futures // TODO: do something with this handle? @@ -864,7 +894,7 @@ mod tests { fn test_false_before_true() { let mut x = [true, false, true]; - x.sort(); + x.sort_unstable(); assert_eq!(x, [false, true, true]) }