auto detect archive nodes

This commit is contained in:
Bryan Stitt 2022-07-09 04:25:59 +00:00
parent 8926b5811e
commit 290da41f12
3 changed files with 58 additions and 21 deletions

View File

@ -1,7 +1,7 @@
[shared] [shared]
chain_id = 1 chain_id = 1
# in prod, do `rate_limit_redis = "redis://redis:6379/"` # in prod, do `rate_limit_redis = "redis://redis:6379/"`
rate_limit_redis = "redis://dev-redis:6379/" #rate_limit_redis = "redis://dev-redis:6379/"
public_rate_limit_per_minute = 60_000 public_rate_limit_per_minute = 60_000
[balanced_rpcs] [balanced_rpcs]

View File

@ -1,7 +1,7 @@
///! Rate-limited communication with a web3 provider ///! Rate-limited communication with a web3 provider
use anyhow::Context; use anyhow::Context;
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, Middleware, ProviderError, TxHash}; use ethers::prelude::{Block, Middleware, ProviderError, TxHash, U256};
use futures::future::try_join_all; use futures::future::try_join_all;
use futures::StreamExt; use futures::StreamExt;
use redis_cell_client::RedisCellClient; use redis_cell_client::RedisCellClient;
@ -9,12 +9,12 @@ use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
use std::fmt; use std::fmt;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32}; use std::sync::atomic::{self, AtomicBool, AtomicU32};
use std::{cmp::Ordering, sync::Arc}; use std::{cmp::Ordering, sync::Arc};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tracing::{error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::app::{flatten_handle, AnyhowJoinHandle};
@ -81,7 +81,7 @@ pub struct Web3Connection {
hard_limit: Option<redis_cell_client::RedisCellClient>, hard_limit: Option<redis_cell_client::RedisCellClient>,
/// used for load balancing to the least loaded server /// used for load balancing to the least loaded server
soft_limit: u32, soft_limit: u32,
// TODO: track total number of requests? archive: AtomicBool,
} }
impl Serialize for Web3Connection { impl Serialize for Web3Connection {
@ -109,6 +109,7 @@ impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Web3Connection") f.debug_struct("Web3Connection")
.field("url", &self.url) .field("url", &self.url)
.field("archive", &self.is_archive())
.finish_non_exhaustive() .finish_non_exhaustive()
} }
} }
@ -150,21 +151,22 @@ impl Web3Connection {
let provider = Web3Provider::from_str(&url_str, http_client).await?; let provider = Web3Provider::from_str(&url_str, http_client).await?;
let connection = Self { let new_connection = Self {
url: url_str.clone(), url: url_str.clone(),
active_requests: 0.into(), active_requests: 0.into(),
provider: RwLock::new(Some(Arc::new(provider))), provider: RwLock::new(Some(Arc::new(provider))),
hard_limit, hard_limit,
soft_limit, soft_limit,
archive: Default::default(),
}; };
let connection = Arc::new(connection); let new_connection = Arc::new(new_connection);
// check the server's chain_id here // check the server's chain_id here
// TODO: move this outside the `new` function and into a `start` function or something // TODO: move this outside the `new` function and into a `start` function or something
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: this will wait forever. do we want that? // TODO: this will wait forever. do we want that?
let found_chain_id: Result<String, _> = connection let found_chain_id: Result<String, _> = new_connection
.wait_for_request_handle() .wait_for_request_handle()
.await .await
.request("eth_chainId", Option::None::<()>) .request("eth_chainId", Option::None::<()>)
@ -182,26 +184,50 @@ impl Web3Connection {
chain_id, chain_id,
found_chain_id found_chain_id
)); ));
} else {
info!(?connection, "success");
} }
} }
Err(e) => { Err(e) => {
let e = anyhow::Error::from(e).context(format!("{}", connection)); let e = anyhow::Error::from(e).context(format!("{}", new_connection));
return Err(e); return Err(e);
} }
} }
// we could take "archive" as a parameter, but we would want a safety check on it regardless
// so instead we just query it and use it
// TODO: this query is not correct. find one that fails on non-archive (pruned) nodes
let archive_result: Result<U256, _> = new_connection
.wait_for_request_handle()
.await
.request(
"eth_getBalance",
("0xdead00000000000000000000000000000000beef", "0x1"),
)
.await;
trace!(?archive_result, "{}", new_connection);
if archive_result.is_ok() {
new_connection
.archive
.store(true, atomic::Ordering::Relaxed);
}
info!(?new_connection, "success");
let handle = { let handle = {
let connection = connection.clone(); let new_connection = new_connection.clone();
tokio::spawn(async move { tokio::spawn(async move {
connection new_connection
.subscribe(http_interval_sender, block_sender, tx_id_sender, reconnect) .subscribe(http_interval_sender, block_sender, tx_id_sender, reconnect)
.await .await
}) })
}; };
Ok((connection, handle)) Ok((new_connection, handle))
}
pub fn is_archive(&self) -> bool {
self.archive.load(atomic::Ordering::Relaxed)
} }
#[instrument(skip_all)] #[instrument(skip_all)]

View File

@ -613,6 +613,7 @@ impl Web3Connections {
.inner .inner
.iter() .iter()
.filter(|x| !skip.contains(x)) .filter(|x| !skip.contains(x))
.filter(|x| if archive_needed { x.is_archive() } else { true })
.cloned() .cloned()
.collect(); .collect();
@ -677,6 +678,10 @@ impl Web3Connections {
let mut selected_rpcs = vec![]; let mut selected_rpcs = vec![];
for connection in self.inner.iter() { for connection in self.inner.iter() {
if archive_needed && !connection.is_archive() {
continue;
}
// check rate limits and increment our connection counter // check rate limits and increment our connection counter
match connection.try_request_handle().await { match connection.try_request_handle().await {
Err(retry_after) => { Err(retry_after) => {
@ -725,17 +730,23 @@ impl Web3Connections {
if let Some(error) = &response.error { if let Some(error) = &response.error {
trace!(?response, "rpc error"); trace!(?response, "rpc error");
// some errors should be retried // some errors should be retried on other nodes
if error.code == -32000 if error.code == -32000 {
&& [ let error_msg = error.message.as_str();
// TODO: regex?
let retry_prefixes = [
"header not found", "header not found",
"header for hash not found", "header for hash not found",
"missing trie node",
"node not started", "node not started",
"RPC timeout", "RPC timeout",
] ];
.contains(&error.message.as_str()) for retry_prefix in retry_prefixes {
{ if error_msg.starts_with(retry_prefix) {
continue; continue;
}
}
} }
} else { } else {
trace!(?response, "rpc success"); trace!(?response, "rpc success");