From 290da41f1223c4a03e9c90616014cc1583aed9a5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 9 Jul 2022 04:25:59 +0000 Subject: [PATCH] auto detect archive nodes --- config/example.toml | 2 +- web3-proxy/src/connection.rs | 52 ++++++++++++++++++++++++++--------- web3-proxy/src/connections.rs | 25 ++++++++++++----- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/config/example.toml b/config/example.toml index 7e4b81d4..c8b0b8e5 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,7 +1,7 @@ [shared] chain_id = 1 # in prod, do `rate_limit_redis = "redis://redis:6379/"` -rate_limit_redis = "redis://dev-redis:6379/" +#rate_limit_redis = "redis://dev-redis:6379/" public_rate_limit_per_minute = 60_000 [balanced_rpcs] diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 69709f2e..2f9b26e4 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -1,7 +1,7 @@ ///! Rate-limited communication with a web3 provider use anyhow::Context; use derive_more::From; -use ethers::prelude::{Block, Middleware, ProviderError, TxHash}; +use ethers::prelude::{Block, Middleware, ProviderError, TxHash, U256}; use futures::future::try_join_all; use futures::StreamExt; use redis_cell_client::RedisCellClient; @@ -9,12 +9,12 @@ use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicU32}; +use std::sync::atomic::{self, AtomicBool, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock; use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; -use tracing::{error, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use crate::app::{flatten_handle, AnyhowJoinHandle}; @@ -81,7 +81,7 @@ pub struct Web3Connection { hard_limit: Option, /// used for load balancing to the least loaded server soft_limit: u32, - // TODO: track total number of requests? + archive: AtomicBool, } impl Serialize for Web3Connection { @@ -109,6 +109,7 @@ impl fmt::Debug for Web3Connection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Web3Connection") .field("url", &self.url) + .field("archive", &self.is_archive()) .finish_non_exhaustive() } } @@ -150,21 +151,22 @@ impl Web3Connection { let provider = Web3Provider::from_str(&url_str, http_client).await?; - let connection = Self { + let new_connection = Self { url: url_str.clone(), active_requests: 0.into(), provider: RwLock::new(Some(Arc::new(provider))), hard_limit, soft_limit, + archive: Default::default(), }; - let connection = Arc::new(connection); + let new_connection = Arc::new(new_connection); // check the server's chain_id here // TODO: move this outside the `new` function and into a `start` function or something // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: this will wait forever. do we want that? - let found_chain_id: Result = connection + let found_chain_id: Result = new_connection .wait_for_request_handle() .await .request("eth_chainId", Option::None::<()>) @@ -182,26 +184,50 @@ impl Web3Connection { chain_id, found_chain_id )); - } else { - info!(?connection, "success"); } } Err(e) => { - let e = anyhow::Error::from(e).context(format!("{}", connection)); + let e = anyhow::Error::from(e).context(format!("{}", new_connection)); return Err(e); } } + // we could take "archive" as a parameter, but we would want a safety check on it regardless + // so instead we just query it and use it + // TODO: this query is not correct. find one that fails on + let archive_result: Result = new_connection + .wait_for_request_handle() + .await + .request( + "eth_getBalance", + ("0xdead00000000000000000000000000000000beef", "0x1"), + ) + .await; + + trace!(?archive_result, "{}", new_connection); + + if archive_result.is_ok() { + new_connection + .archive + .store(true, atomic::Ordering::Relaxed); + } + + info!(?new_connection, "success"); + let handle = { - let connection = connection.clone(); + let new_connection = new_connection.clone(); tokio::spawn(async move { - connection + new_connection .subscribe(http_interval_sender, block_sender, tx_id_sender, reconnect) .await }) }; - Ok((connection, handle)) + Ok((new_connection, handle)) + } + + pub fn is_archive(&self) -> bool { + self.archive.load(atomic::Ordering::Relaxed) } #[instrument(skip_all)] diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index c810d25b..241de8b7 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -613,6 +613,7 @@ impl Web3Connections { .inner .iter() .filter(|x| !skip.contains(x)) + .filter(|x| if archive_needed { x.is_archive() } else { true }) .cloned() .collect(); @@ -677,6 +678,10 @@ impl Web3Connections { let mut selected_rpcs = vec![]; for connection in self.inner.iter() { + if archive_needed && !connection.is_archive() { + continue; + } + // check rate limits and increment our connection counter match connection.try_request_handle().await { Err(retry_after) => { @@ -725,17 +730,23 @@ impl Web3Connections { if let Some(error) = &response.error { trace!(?response, "rpc error"); - // some errors should be retried - if error.code == -32000 - && [ + // some errors should be retried on other nodes + if error.code == -32000 { + let error_msg = error.message.as_str(); + + // TODO: regex? + let retry_prefixes = [ "header not found", "header for hash not found", + "missing trie node", "node not started", "RPC timeout", - ] - .contains(&error.message.as_str()) - { - continue; + ]; + for retry_prefix in retry_prefixes { + if error_msg.starts_with(retry_prefix) { + continue; + } + } } } else { trace!(?response, "rpc success");