survive an rpc being offline

i still want retries on some of these
This commit is contained in:
Bryan Stitt 2022-07-19 04:21:32 +00:00
parent 75098d83b6
commit 5fb3298cd0
4 changed files with 75 additions and 62 deletions

@ -346,8 +346,8 @@ impl Web3ProxyApp {
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
app_config.shared.chain_id,
balanced_rpcs,
http_client.as_ref(),
redis_client_pool.as_ref(),
http_client.clone(),
redis_client_pool.clone(),
Some(head_block_sender),
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
@ -364,8 +364,8 @@ impl Web3ProxyApp {
let (private_rpcs, private_handle) = Web3Connections::spawn(
app_config.shared.chain_id,
private_rpcs,
http_client.as_ref(),
redis_client_pool.as_ref(),
http_client.clone(),
redis_client_pool.clone(),
// subscribing to new heads here won't work well
None,
// TODO: subscribe to pending transactions on the private rpcs?

@ -34,8 +34,8 @@ pub struct AppConfig {
/// shared configuration between Web3Connections
#[derive(Debug, Deserialize)]
pub struct RpcSharedConfig {
/// TODO: what type for chain_id? TODO: this isn't at the right level. this is inside a "Config"
pub chain_id: usize,
// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
pub chain_id: u64,
pub rate_limit_redis: Option<String>,
// TODO: serde default for development?
// TODO: allow no limit?
@ -54,9 +54,9 @@ impl Web3ConnectionConfig {
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
pub async fn spawn(
self,
redis_client_pool: Option<&redis_cell_client::RedisClientPool>,
chain_id: usize,
http_client: Option<&reqwest::Client>,
redis_client_pool: Option<redis_cell_client::RedisClientPool>,
chain_id: u64,
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Web3Connection>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,

@ -27,16 +27,11 @@ pub enum Web3Provider {
impl Web3Provider {
#[instrument]
async fn from_str(
url_str: &str,
http_client: Option<&reqwest::Client>,
) -> anyhow::Result<Self> {
async fn from_str(url_str: &str, http_client: Option<reqwest::Client>) -> anyhow::Result<Self> {
let provider = if url_str.starts_with("http") {
let url: url::Url = url_str.parse()?;
let http_client = http_client
.ok_or_else(|| anyhow::anyhow!("no http_client"))?
.clone();
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
let provider = ethers::providers::Http::new_with_client(url, http_client);
@ -114,9 +109,9 @@ impl fmt::Debug for Web3Connection {
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
f.field("limit", &"archive");
f.field("data", &"archive");
} else {
f.field("limit", &block_data_limit);
f.field("data", &block_data_limit);
}
f.finish_non_exhaustive()
@ -132,15 +127,15 @@ impl fmt::Display for Web3Connection {
impl Web3Connection {
/// Connect to a web3 rpc
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
// TODO: have this take a builder (which will have senders attached)
// TODO: have this take a builder (which will have channels attached)
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
chain_id: usize,
chain_id: u64,
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<&reqwest::Client>,
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
hard_limit: Option<(u32, &redis_cell_client::RedisClientPool)>,
hard_limit: Option<(u32, redis_cell_client::RedisClientPool)>,
// TODO: think more about this type
soft_limit: u32,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>,
@ -149,9 +144,10 @@ impl Web3Connection {
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
// TODO: allow different max_burst and count_per_period and period
// TODO: if this url rate limits by IP instead of api key, we want to include our public ip in this key
let period = 1;
RedisCellClient::new(
redis_conection.clone(),
redis_conection,
format!("{},{}", chain_id, url_str),
hard_rate_limit,
hard_rate_limit,
@ -177,7 +173,7 @@ impl Web3Connection {
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: this will wait forever. do we want that?
let found_chain_id: Result<String, _> = new_connection
let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle()
.await
.request("eth_chainId", Option::None::<()>)
@ -186,19 +182,18 @@ impl Web3Connection {
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
let found_chain_id =
usize::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap();
if chain_id != found_chain_id {
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Expected {}. Found {}",
chain_id,
found_chain_id
));
)
.context(format!("failed spawning {}", new_connection)));
}
}
Err(e) => {
let e = anyhow::Error::from(e).context(format!("{}", new_connection));
let e =
anyhow::Error::from(e).context(format!("failed spawning {}", new_connection));
return Err(e);
}
}
@ -225,7 +220,7 @@ impl Web3Connection {
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
let mut head_block_num = new_connection.head_block.read().1;
// TODO: wait until head block is set outside the loop?
// TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
while head_block_num == U64::zero() {
info!(?new_connection, "no head block");
@ -235,8 +230,9 @@ impl Web3Connection {
head_block_num = new_connection.head_block.read().1;
}
// TODO: subtract 1 from block_data_limit for safety?
let maybe_archive_block = head_block_num
.saturating_sub(block_data_limit.into())
.saturating_sub((block_data_limit).into())
.max(U64::one());
let archive_result: Result<Bytes, _> = new_connection

@ -5,7 +5,7 @@ use counter::Counter;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64};
use futures::future::try_join_all;
use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
@ -94,20 +94,14 @@ impl fmt::Debug for Web3Connections {
impl Web3Connections {
// #[instrument(name = "spawn_Web3Connections", skip_all)]
pub async fn spawn(
chain_id: usize,
chain_id: u64,
server_configs: Vec<Web3ConnectionConfig>,
http_client: Option<&reqwest::Client>,
redis_client_pool: Option<&redis_cell_client::RedisClientPool>,
http_client: Option<reqwest::Client>,
redis_client_pool: Option<redis_cell_client::RedisClientPool>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_sender: Option<broadcast::Sender<TxState>>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let num_connections = server_configs.len();
// TODO: try_join_all
let mut handles = vec![];
// TODO: only create these if head_block_sender and pending_tx_sender are set
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded();
@ -116,7 +110,7 @@ impl Web3Connections {
drop(receiver);
// TODO: what interval? follow a websocket instead?
// TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
let mut interval = interval(Duration::from_secs(13));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
@ -146,26 +140,49 @@ impl Web3Connections {
None
};
// turn configs into connections
let mut connections = Vec::with_capacity(num_connections);
for server_config in server_configs.into_iter() {
match server_config
.spawn(
redis_client_pool,
chain_id,
http_client,
http_interval_sender.clone(),
Some(block_sender.clone()),
Some(pending_tx_id_sender.clone()),
)
.await
{
Ok((connection, connection_handle)) => {
handles.push(flatten_handle(connection_handle));
connections.push(connection)
// turn configs into connections (in parallel)
let spawn_handles: Vec<_> = server_configs
.into_iter()
.map(|server_config| {
let http_client = http_client.clone();
let redis_client_pool = redis_client_pool.clone();
let http_interval_sender = http_interval_sender.clone();
let block_sender = Some(block_sender.clone());
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
tokio::spawn(async move {
server_config
.spawn(
redis_client_pool,
chain_id,
http_client,
http_interval_sender,
block_sender,
pending_tx_id_sender,
)
.await
})
})
.collect();
let mut connections = vec![];
let mut handles = vec![];
// TODO: futures unordered?
for x in join_all(spawn_handles).await {
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
match x {
Ok(Ok((connection, handle))) => {
connections.push(connection);
handles.push(handle);
}
Ok(Err(err)) => {
// TODO: some of these are probably retry-able
error!(?err);
}
Err(err) => {
return Err(err.into());
}
// TODO: include the server url in this
Err(e) => warn!("Unable to connect to a server! {:?}", e),
}
}