diff --git a/TODO.md b/TODO.md index 7b46c0e8..10974a82 100644 --- a/TODO.md +++ b/TODO.md @@ -330,6 +330,8 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] block all admin_ rpc commands - [x] remove the "metered" crate now that we save aggregate queries? - [x] add archive depth to app config +- [x] use from_block and to_block so that eth_getLogs is routed correctly +- [x] improve eth_sendRawTransaction server selection - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly @@ -343,6 +345,8 @@ These are not yet ordered. There might be duplicates. We might not actually need - erigon only streams the JSON over HTTP. that code isn't enabled for websockets. so this should save memory on the erigon servers - i think this also means we don't need to worry about changing the id that the user gives us. - have the healthcheck get the block over http. if it errors, or doesn't match what the websocket says, something is wrong (likely a deadlock in the websocket code) +- [ ] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly + - change the send_best function to only include servers that are at least close to fully synced - [ ] have private transactions be enabled by a url setting rather than a setting on the key - [ ] cli for adding rpc keys to an existing user - [ ] rate limiting/throttling on query_user_stats diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index a0805fbe..4a35eb71 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1158,6 +1158,7 @@ impl Web3ProxyApp { request, Some(&request_metadata), None, + None, ) .await?; @@ -1231,6 +1232,7 @@ impl Web3ProxyApp { &request, Some(request_metadata.clone()), head_block_num.as_ref(), + None, Level::Trace, num, true, @@ -1527,6 +1529,7 @@ impl Web3ProxyApp { if let Some(cache_key) = cache_key { let from_block_num = cache_key.from_block.as_ref().map(|x| x.number()); + let to_block_num = cache_key.to_block.as_ref().map(|x| x.number()); self.response_cache .try_get_with(cache_key, async move { @@ -1539,6 +1542,7 @@ impl Web3ProxyApp { request, Some(&request_metadata), from_block_num.as_ref(), + to_block_num.as_ref(), ) .await?; @@ -1547,7 +1551,7 @@ impl Web3ProxyApp { // TODO: only cache the inner response // TODO: how are we going to stream this? - // TODO: check response size. if its very large, return it in a custom Error type that bypasses caching + // TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us? Ok::<_, anyhow::Error>(response) }) .await @@ -1567,6 +1571,7 @@ impl Web3ProxyApp { request, Some(&request_metadata), None, + None, ) .await? } diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 33ef7f54..ef256b84 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -215,8 +215,8 @@ pub async fn block_needed( }; return Ok(BlockNeeded::CacheRange { - from_block_num: from_block_num, - to_block_num: to_block_num, + from_block_num, + to_block_num, cache_errors: true, }); } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ce79d76a..679516a0 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -167,7 +167,13 @@ impl Web3Rpcs { // TODO: request_metadata? maybe we should put it in the authorization? // TODO: think more about this wait_for_sync let response = self - .try_send_best_consensus_head_connection(authorization, request, None, None) + .try_send_best_consensus_head_connection( + authorization, + request, + None, + None, + None, + ) .await?; let block = response.result.context("failed fetching block")?; @@ -258,7 +264,7 @@ impl Web3Rpcs { // TODO: request_metadata or authorization? // we don't actually set min_block_needed here because all nodes have all blocks let response = self - .try_send_best_consensus_head_connection(authorization, request, None, None) + .try_send_best_consensus_head_connection(authorization, request, None, None, None) .await?; if let Some(err) = response.error { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a2b555b5..32cfc8a0 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -411,7 +411,9 @@ impl Web3Rpcs { authorization: &Arc, request_metadata: Option<&Arc>, skip: &[Arc], + // TODO: if we are checking for the consensus head, i don' think we need min_block_needed/max_block_needed min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, ) -> anyhow::Result { if let Ok(without_backups) = self ._best_consensus_head_connection( @@ -420,6 +422,7 @@ impl Web3Rpcs { request_metadata, skip, min_block_needed, + max_block_needed, ) .await { @@ -435,6 +438,7 @@ impl Web3Rpcs { request_metadata, skip, min_block_needed, + max_block_needed, ) .await } @@ -447,6 +451,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, skip: &[Arc], min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, ) -> anyhow::Result { let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option, u64), Vec>> = { let synced_connections = self.watch_consensus_connections_sender.borrow().clone(); @@ -471,6 +476,13 @@ impl Web3Rpcs { .filter(|x| if allow_backups { true } else { !x.backup }) .filter(|x| !skip.contains(x)) .filter(|x| x.has_block_data(min_block_needed)) + .filter(|x| { + if let Some(max_block_needed) = max_block_needed { + x.has_block_data(max_block_needed) + } else { + true + } + }) .cloned() { let x_head_block = x.head_block.read().clone(); @@ -637,28 +649,42 @@ impl Web3Rpcs { pub async fn all_connections( &self, authorization: &Arc, - block_needed: Option<&U64>, + min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, max_count: Option, always_include_backups: bool, ) -> Result, Option> { if !always_include_backups { if let Ok(without_backups) = self - ._all_connections(false, authorization, block_needed, max_count) + ._all_connections( + false, + authorization, + min_block_needed, + max_block_needed, + max_count, + ) .await { return Ok(without_backups); } } - self._all_connections(true, authorization, block_needed, max_count) - .await + self._all_connections( + true, + authorization, + min_block_needed, + max_block_needed, + max_count, + ) + .await } async fn _all_connections( &self, allow_backups: bool, authorization: &Arc, - block_needed: Option<&U64>, + min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, max_count: Option, ) -> Result, Option> { let mut earliest_retry_at = None; @@ -680,11 +706,12 @@ impl Web3Rpcs { .clone(); // synced connections are all on the same block. sort them by tier with higher soft limits first - synced_conns.sort_by_cached_key(sort_rpcs_by_sync_status); + synced_conns.sort_by_cached_key(rpc_sync_status_sort_key); // if there aren't enough synced connections, include more connections + // TODO: only do this sorting if the synced_conns isn't enough let mut all_conns: Vec<_> = self.conns.values().cloned().collect(); - all_conns.sort_by_cached_key(sort_rpcs_by_sync_status); + all_conns.sort_by_cached_key(rpc_sync_status_sort_key); for connection in itertools::chain(synced_conns, all_conns) { if max_count == 0 { @@ -701,7 +728,13 @@ impl Web3Rpcs { continue; } - if let Some(block_needed) = block_needed { + if let Some(block_needed) = min_block_needed { + if !connection.has_block_data(block_needed) { + continue; + } + } + + if let Some(block_needed) = max_block_needed { if !connection.has_block_data(block_needed) { continue; } @@ -709,7 +742,7 @@ impl Web3Rpcs { // check rate limits and increment our connection counter match connection - .try_request_handle(authorization, block_needed.is_none()) + .try_request_handle(authorization, min_block_needed.is_none()) .await { Ok(OpenRequestResult::RetryAt(retry_at)) => { @@ -748,6 +781,7 @@ impl Web3Rpcs { request: JsonRpcRequest, request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, ) -> anyhow::Result { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; @@ -768,6 +802,7 @@ impl Web3Rpcs { request_metadata, &skip_rpcs, min_block_needed, + max_block_needed, ) .await? { @@ -1007,7 +1042,8 @@ impl Web3Rpcs { authorization: &Arc, request: &JsonRpcRequest, request_metadata: Option>, - block_needed: Option<&U64>, + min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, error_level: Level, max_count: Option, always_include_backups: bool, @@ -1016,7 +1052,8 @@ impl Web3Rpcs { match self .all_connections( authorization, - block_needed, + min_block_needed, + max_block_needed, max_count, always_include_backups, ) @@ -1099,6 +1136,7 @@ impl Web3Rpcs { request: JsonRpcRequest, request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, + max_block_needed: Option<&U64>, ) -> anyhow::Result { match proxy_mode { ProxyMode::Best => { @@ -1107,6 +1145,7 @@ impl Web3Rpcs { request, request_metadata, min_block_needed, + max_block_needed, ) .await } @@ -1154,7 +1193,7 @@ impl Serialize for Web3Rpcs { /// sort by block number (descending) and tier (ascending) /// TODO: should this be moved into a `impl Web3Rpc`? /// TODO: take AsRef or something like that? We don't need an Arc here -fn sort_rpcs_by_sync_status(x: &Arc) -> (u64, u64, u32) { +fn rpc_sync_status_sort_key(x: &Arc) -> (u64, u64, u32) { let reversed_head_block = u64::MAX - x.head_block .read() @@ -1251,7 +1290,7 @@ mod tests { .map(Arc::new) .collect(); - rpcs.sort_by_cached_key(sort_rpcs_by_sync_status); + rpcs.sort_by_cached_key(rpc_sync_status_sort_key); let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect(); @@ -1395,7 +1434,7 @@ mod tests { // all_backend_connections gives all non-backup servers regardless of sync status assert_eq!( conns - .all_connections(&authorization, None, None, false) + .all_connections(&authorization, None, None, None, false) .await .unwrap() .len(), @@ -1404,7 +1443,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block let x = conns - .best_consensus_head_connection(&authorization, None, &[], None) + .best_consensus_head_connection(&authorization, None, &[], None, None) .await .unwrap(); @@ -1459,21 +1498,21 @@ mod tests { assert!(matches!( conns - .best_consensus_head_connection(&authorization, None, &[], None) + .best_consensus_head_connection(&authorization, None, &[], None, None) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( conns - .best_consensus_head_connection(&authorization, None, &[], Some(&0.into())) + .best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( conns - .best_consensus_head_connection(&authorization, None, &[], Some(&1.into())) + .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); @@ -1481,7 +1520,7 @@ mod tests { // future block should not get a handle assert!(matches!( conns - .best_consensus_head_connection(&authorization, None, &[], Some(&2.into())) + .best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None) .await, Ok(OpenRequestResult::NotReady(true)) )); @@ -1605,8 +1644,15 @@ mod tests { assert_eq!(conns.num_synced_rpcs(), 2); // best_synced_backend_connection requires servers to be synced with the head block + // TODO: test with and without passing the head_block.number? let best_head_server = conns - .best_consensus_head_connection(&authorization, None, &[], Some(&head_block.number())) + .best_consensus_head_connection( + &authorization, + None, + &[], + Some(&head_block.number()), + None, + ) .await; assert!(matches!( @@ -1615,7 +1661,7 @@ mod tests { )); let best_archive_server = conns - .best_consensus_head_connection(&authorization, None, &[], Some(&1.into())) + .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None) .await; match best_archive_server {