From a10a2830234f44b2000c593ac984ade71180b217 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 10 Oct 2023 12:19:54 -0700
Subject: [PATCH] trust just removing uncles from the cache

---
 web3_proxy/src/errors.rs          |  21 +++++
 web3_proxy/src/rpcs/blockchain.rs | 138 +++++++-----------------------
 web3_proxy/src/rpcs/request.rs    |  22 +++--
 3 files changed, 70 insertions(+), 111 deletions(-)

diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs
index 66958dc5..700ca34b 100644
--- a/web3_proxy/src/errors.rs
+++ b/web3_proxy/src/errors.rs
@@ -52,6 +52,12 @@ pub enum Web3ProxyError {
     #[error(ignore)]
     Anyhow(anyhow::Error),
     Arc(Arc<Self>),
+    #[from(ignore)]
+    #[display(fmt = "{:?} to {:?}", min, max)]
+    ArchiveRequired {
+        min: Option<U64>,
+        max: Option<U64>,
+    },
     #[error(ignore)]
     #[from(ignore)]
     BadRequest(Cow<'static, str>),
@@ -235,6 +241,21 @@ impl Web3ProxyError {
                     },
                 )
             }
+            Self::ArchiveRequired { min, max } => {
+                // TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident
+                trace!(?min, ?max, "archive node required");
+                (
+                    StatusCode::OK,
+                    JsonRpcErrorData {
+                        message: "Archive data required".into(),
+                        code: StatusCode::OK.as_u16().into(),
+                        data: Some(json!({
+                            "min": min,
+                            "max": max,
+                        })),
+                    },
+                )
+            }
             Self::Anyhow(err) => {
                 error!(?err, "anyhow: {}", err);
                 (
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 5cf85efe..b2bd521c 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -12,8 +12,9 @@ use serde_json::json;
 use std::hash::Hash;
 use std::time::Duration;
 use std::{fmt::Display, sync::Arc};
+use tokio::select;
 use tokio::sync::mpsc;
-use tokio::time::timeout;
+use tokio::time::sleep;
 use tracing::{debug, error, warn};
 
 // TODO: type for Hydrated Blocks with their full transactions?
@@ -172,60 +173,9 @@ impl Web3Rpcs {
         // loop to make sure parent hashes match our caches
         // set the first ancestor to the blocks' parent hash. but keep going up the chain
         if let Some(parent_num) = block.number().checked_sub(1.into()) {
-            struct Ancestor {
-                num: U64,
-                hash: H256,
-            }
-            let mut ancestor = Ancestor {
-                num: parent_num,
-                hash: *block.parent_hash(),
-            };
-            // TODO: smarter max loop on this
-            for _ in 0..16 {
-                let ancestor_number_to_hash_entry = self
-                    .blocks_by_number
-                    .entry_by_ref(&ancestor.num)
-                    .or_insert(ancestor.hash)
-                    .await;
-
-                if *ancestor_number_to_hash_entry.value() == ancestor.hash {
-                    // the existing number entry matches. all good
-                    break;
-                }
-
-                // oh no! ancestor_number_to_hash_entry is different
-
-                // remove the uncled entry in blocks_by_hash
-                // we will look it up later if necessary
-                self.blocks_by_hash
-                    .invalidate(ancestor_number_to_hash_entry.value())
-                    .await;
-
-                // TODO: delete any cached entries for eth_getBlockByHash or eth_getBlockByNumber
-
-                // TODO: race on this drop and insert?
-                drop(ancestor_number_to_hash_entry);
-
-                // update the entry in blocks_by_number
-                self.blocks_by_number
-                    .insert(ancestor.num, ancestor.hash)
-                    .await;
-
-                // try to check the parent of this ancestor
-                if let Some(ancestor_block) = self.blocks_by_hash.get(&ancestor.hash).await {
-                    match ancestor_block.number().checked_sub(1.into()) {
-                        None => break,
-                        Some(ancestor_parent_num) => {
-                            ancestor = Ancestor {
-                                num: ancestor_parent_num,
-                                hash: *ancestor_block.parent_hash(),
-                            }
-                        }
-                    }
-                } else {
-                    break;
-                }
-            }
+            self.blocks_by_number
+                .insert(parent_num, *block.parent_hash())
+                .await;
         }
     }
 
@@ -399,67 +349,43 @@
             ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag));
 
         // TODO: what timeout on block receiver? we want to keep consensus_finder fresh so that server tiers are correct
-        let double_block_time = average_block_interval(self.chain_id).mul_f32(2.0);
-
-        let mut had_first_success = false;
+        let triple_block_time = average_block_interval(self.chain_id).mul_f32(3.0);
 
         loop {
-            match timeout(double_block_time, block_and_rpc_receiver.recv()).await {
-                Ok(Some((new_block, rpc))) => {
-                    let rpc_name = rpc.name.clone();
-                    let rpc_is_backup = rpc.backup;
+            select! {
+                x = block_and_rpc_receiver.recv() => {
+                    match x {
+                        Some((new_block, rpc)) => {
+                            let rpc_name = rpc.name.clone();
 
-                    // TODO: what timeout on this?
-                    match timeout(
-                        Duration::from_secs(1),
-                        consensus_finder.process_block_from_rpc(self, new_block, rpc),
-                    )
-                    .await
-                    {
-                        Ok(Ok(_)) => had_first_success = true,
-                        Ok(Err(err)) => {
-                            if had_first_success {
-                                error!(
-                                    "error while processing block from rpc {}: {:#?}",
-                                    rpc_name, err
-                                );
-                            } else {
-                                debug!(
-                                    "startup error while processing block from rpc {}: {:#?}",
-                                    rpc_name, err
-                                );
+                            // TODO: we used to have a timeout on this, but i think it was obscuring a bug
+                            match consensus_finder
+                                .process_block_from_rpc(self, new_block, rpc)
+                                .await
+                            {
+                                Ok(_) => {},
+                                Err(err) => {
+                                    error!(
+                                        "error while processing block from rpc {}: {:#?}",
+                                        rpc_name, err
+                                    );
+                                }
                             }
                         }
-                        Err(timeout) => {
-                            if rpc_is_backup {
-                                debug!(
-                                    ?timeout,
-                                    "timeout while processing block from {}", rpc_name
-                                );
-                            } else {
-                                warn!(?timeout, "timeout while processing block from {}", rpc_name);
-                            }
+                        None => {
+                            // TODO: panic is probably too much, but getting here is definitely not good
+                            return Err(anyhow::anyhow!("block_receiver on {} exited", self).into());
                         }
                     }
                 }
-                Ok(None) => {
-                    // TODO: panic is probably too much, but getting here is definitely not good
-                    return Err(anyhow::anyhow!("block_receiver on {} exited", self).into());
-                }
-                Err(_) => {
+                _ = sleep(triple_block_time) => {
                     // TODO: what timeout on this?
-                    match timeout(
-                        Duration::from_secs(2),
-                        consensus_finder.refresh(self, None, None),
-                    )
-                    .await
-                    {
-                        Ok(Ok(_)) => {}
-                        Ok(Err(err)) => {
-                            error!("error while refreshing consensus finder: {:#?}", err);
+                    match consensus_finder.refresh(self, None, None).await {
+                        Ok(_) => {
+                            warn!("had to refresh consensus finder. is the network going slow?");
                         }
-                        Err(timeout) => {
-                            error!("timeout while refreshing consensus finder: {:#?}", timeout);
+                        Err(err) => {
+                            error!("error while refreshing consensus finder: {:#?}", err);
                         }
                     }
                 }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 380f484f..76b8b2ea 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -398,11 +398,23 @@ impl OpenRequestHandle {
                             for prefix in archive_prefixes {
                                 if error.message.starts_with(prefix) {
                                     // TODO: what error?
-                                    response = Err(Web3ProxyError::NoBlockNumberOrHash);
+                                    response = Err(Web3ProxyError::ArchiveRequired {
+                                        min: self.web3_request.min_block_needed(),
+                                        max: self.web3_request.max_block_needed(),
+                                    });
                                     break;
                                 }
                             }
                         }
+
+                        ResponseType::Error
+                    }
+                    -32001 => {
+                        if error.message == "Exceeded the quota usage" {
+                            ResponseType::RateLimited
+                        } else {
+                            ResponseType::Error
+                        }
                     }
                     -32601 => {
                         let error_msg = error.message.as_ref();
@@ -422,11 +434,11 @@ impl OpenRequestHandle {
 
                             response = Err(Web3ProxyError::MethodNotFound(method.into()))
                         }
-                    }
-                    _ => {}
-                }
-
-                ResponseType::Error
+
+                        ResponseType::Error
+                    }
+                    _ => ResponseType::Error,
+                }
             }
         }
     },
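
Note: the blockchain.rs hunk above swaps a timeout() around block_and_rpc_receiver.recv() for a select! between the receiver and a sleep(), so a quiet channel now triggers consensus_finder.refresh() instead of an error path, while a closed channel still exits the task. Below is a minimal standalone sketch of that receive-or-refresh shape, not the proxy's actual code: watch_blocks, idle, and the plain u64 standing in for a block are made-up names, and it assumes tokio with the macros, rt, sync, and time features enabled.

use std::time::Duration;
use tokio::{select, sync::mpsc, time::sleep};

// Receive-or-refresh loop: wait for the next block, but if the channel stays
// quiet for a whole `idle` period, run a refresh pass instead of treating the
// silence as an error.
async fn watch_blocks(mut rx: mpsc::UnboundedReceiver<u64>, idle: Duration) {
    loop {
        select! {
            maybe_block = rx.recv() => {
                match maybe_block {
                    Some(block) => println!("processing block {block}"),
                    // every sender was dropped; stop watching instead of spinning
                    None => break,
                }
            }
            _ = sleep(idle) => {
                // nothing arrived for a full idle period; refresh instead of erroring out
                println!("no block for {idle:?}; refreshing consensus state");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    tx.send(1u64).unwrap();
    drop(tx); // close the channel so the sketch exits after draining it
    watch_blocks(rx, Duration::from_secs(1)).await;
}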
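
Note: with the errors.rs and request.rs hunks above, an upstream "archive node required" error is mapped to Web3ProxyError::ArchiveRequired carrying min_block_needed()/max_block_needed(), and it is returned with HTTP 200, the message "Archive data required", error code 200 (StatusCode::OK.as_u16()), and the needed block range under data. Below is a rough sketch of that error object only, not the proxy's serializer: plain u64 values stand in for the proxy's U64 block numbers (which may serialize differently on the wire), and the surrounding jsonrpc/id envelope is omitted.

use serde_json::{json, Value};

// Approximate shape of the JsonRpcErrorData built by the ArchiveRequired arm:
// a fixed message, code 200, and the needed block range under "data".
fn archive_required_error(min: Option<u64>, max: Option<u64>) -> Value {
    json!({
        "message": "Archive data required",
        "code": 200,
        "data": { "min": min, "max": max },
    })
}

fn main() {
    // e.g. a request that needed historical state at block 1_000_000 with no upper bound
    println!("{}", archive_required_error(Some(1_000_000), None));
}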