trust just removing uncles from the cache

This commit is contained in:
Bryan Stitt 2023-10-10 12:19:54 -07:00
parent 4fe9ec38f0
commit a10a283023
3 changed files with 70 additions and 111 deletions

@ -52,6 +52,12 @@ pub enum Web3ProxyError {
#[error(ignore)]
Anyhow(anyhow::Error),
Arc(Arc<Self>),
#[from(ignore)]
#[display(fmt = "{:?} to {:?}", min, max)]
ArchiveRequired {
min: Option<U64>,
max: Option<U64>,
},
#[error(ignore)]
#[from(ignore)]
BadRequest(Cow<'static, str>),
@ -235,6 +241,21 @@ impl Web3ProxyError {
},
)
}
Self::ArchiveRequired { min, max } => {
// TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident
trace!(?min, ?max, "archive node required");
(
StatusCode::OK,
JsonRpcErrorData {
message: "Archive data required".into(),
code: StatusCode::OK.as_u16().into(),
data: Some(json!({
"min": min,
"max": max,
})),
},
)
}
Self::Anyhow(err) => {
error!(?err, "anyhow: {}", err);
(

@ -12,8 +12,9 @@ use serde_json::json;
use std::hash::Hash;
use std::time::Duration;
use std::{fmt::Display, sync::Arc};
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio::time::sleep;
use tracing::{debug, error, warn};
// TODO: type for Hydrated Blocks with their full transactions?
@ -172,60 +173,9 @@ impl Web3Rpcs {
// loop to make sure parent hashes match our caches
// set the first ancestor to the blocks' parent hash. but keep going up the chain
if let Some(parent_num) = block.number().checked_sub(1.into()) {
struct Ancestor {
num: U64,
hash: H256,
}
let mut ancestor = Ancestor {
num: parent_num,
hash: *block.parent_hash(),
};
// TODO: smarter max loop on this
for _ in 0..16 {
let ancestor_number_to_hash_entry = self
.blocks_by_number
.entry_by_ref(&ancestor.num)
.or_insert(ancestor.hash)
.await;
if *ancestor_number_to_hash_entry.value() == ancestor.hash {
// the existing number entry matches. all good
break;
}
// oh no! ancestor_number_to_hash_entry is different
// remove the uncled entry in blocks_by_hash
// we will look it up later if necessary
self.blocks_by_hash
.invalidate(ancestor_number_to_hash_entry.value())
.await;
// TODO: delete any cached entries for eth_getBlockByHash or eth_getBlockByNumber
// TODO: race on this drop and insert?
drop(ancestor_number_to_hash_entry);
// update the entry in blocks_by_number
self.blocks_by_number
.insert(ancestor.num, ancestor.hash)
.await;
// try to check the parent of this ancestor
if let Some(ancestor_block) = self.blocks_by_hash.get(&ancestor.hash).await {
match ancestor_block.number().checked_sub(1.into()) {
None => break,
Some(ancestor_parent_num) => {
ancestor = Ancestor {
num: ancestor_parent_num,
hash: *ancestor_block.parent_hash(),
}
}
}
} else {
break;
}
}
self.blocks_by_number
.insert(parent_num, *block.parent_hash())
.await;
}
}
@ -399,67 +349,43 @@ impl Web3Rpcs {
ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag));
// TODO: what timeout on block receiver? we want to keep consensus_finder fresh so that server tiers are correct
let double_block_time = average_block_interval(self.chain_id).mul_f32(2.0);
let mut had_first_success = false;
let triple_block_time = average_block_interval(self.chain_id).mul_f32(3.0);
loop {
match timeout(double_block_time, block_and_rpc_receiver.recv()).await {
Ok(Some((new_block, rpc))) => {
let rpc_name = rpc.name.clone();
let rpc_is_backup = rpc.backup;
select! {
x = block_and_rpc_receiver.recv() => {
match x {
Some((new_block, rpc)) => {
let rpc_name = rpc.name.clone();
// TODO: what timeout on this?
match timeout(
Duration::from_secs(1),
consensus_finder.process_block_from_rpc(self, new_block, rpc),
)
.await
{
Ok(Ok(_)) => had_first_success = true,
Ok(Err(err)) => {
if had_first_success {
error!(
"error while processing block from rpc {}: {:#?}",
rpc_name, err
);
} else {
debug!(
"startup error while processing block from rpc {}: {:#?}",
rpc_name, err
);
// TODO: we used to have a timeout on this, but i think it was obscuring a bug
match consensus_finder
.process_block_from_rpc(self, new_block, rpc)
.await
{
Ok(_) => {},
Err(err) => {
error!(
"error while processing block from rpc {}: {:#?}",
rpc_name, err
);
}
}
}
Err(timeout) => {
if rpc_is_backup {
debug!(
?timeout,
"timeout while processing block from {}", rpc_name
);
} else {
warn!(?timeout, "timeout while processing block from {}", rpc_name);
}
None => {
// TODO: panic is probably too much, but getting here is definitely not good
return Err(anyhow::anyhow!("block_receiver on {} exited", self).into());
}
}
}
Ok(None) => {
// TODO: panic is probably too much, but getting here is definitely not good
return Err(anyhow::anyhow!("block_receiver on {} exited", self).into());
}
Err(_) => {
_ = sleep(triple_block_time) => {
// TODO: what timeout on this?
match timeout(
Duration::from_secs(2),
consensus_finder.refresh(self, None, None),
)
.await
{
Ok(Ok(_)) => {}
Ok(Err(err)) => {
error!("error while refreshing consensus finder: {:#?}", err);
match consensus_finder.refresh(self, None, None).await {
Ok(_) => {
warn!("had to refresh consensus finder. is the network going slow?");
}
Err(timeout) => {
error!("timeout while refreshing consensus finder: {:#?}", timeout);
Err(err) => {
error!("error while refreshing consensus finder: {:#?}", err);
}
}
}

@ -398,11 +398,23 @@ impl OpenRequestHandle {
for prefix in archive_prefixes {
if error.message.starts_with(prefix) {
// TODO: what error?
response = Err(Web3ProxyError::NoBlockNumberOrHash);
response = Err(Web3ProxyError::ArchiveRequired {
min: self.web3_request.min_block_needed(),
max: self.web3_request.max_block_needed(),
});
break;
}
}
}
ResponseType::Error
}
-32001 => {
if error.message == "Exceeded the quota usage" {
ResponseType::RateLimited
} else {
ResponseType::Error
}
}
-32601 => {
let error_msg = error.message.as_ref();
@ -422,11 +434,11 @@ impl OpenRequestHandle {
response =
Err(Web3ProxyError::MethodNotFound(method.into()))
}
}
_ => {}
}
ResponseType::Error
ResponseType::Error
}
_ => ResponseType::Error,
}
}
}
},