From fe53ffdb408e4e3e4aef563e4dfa867a6f54fbc4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 29 Jun 2022 19:15:05 +0000 Subject: [PATCH] shared interval for http --- TODO.md | 1 + web3-proxy/src/config.rs | 6 ++++-- web3-proxy/src/connection.rs | 26 +++++++++++++++-------- web3-proxy/src/connections.rs | 39 ++++++++++++++++++++++++++++++++--- 4 files changed, 58 insertions(+), 14 deletions(-) diff --git a/TODO.md b/TODO.md index 0c986394..72286666 100644 --- a/TODO.md +++ b/TODO.md @@ -61,6 +61,7 @@ - [ ] emit stats for successes, retries, failures, with the types of requests, account, chain, rpc - [ ] automated soft limit - [ ] if we send a transaction to private rpcs and then people query it on public rpcs things, some interfaces might think the transaction is dropped (i saw this happen in a brownie script of mine). how should we handle this? +- [ ] don't "unwrap" anywhere. give proper errors ## V2 diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 4f22bd9f..913f2a98 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -5,6 +5,7 @@ use serde::Deserialize; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; +use tokio::sync::broadcast; use crate::app::AnyhowJoinHandle; use crate::connection::Web3Connection; @@ -83,9 +84,9 @@ impl Web3ConnectionConfig { rate_limiter: Option<&redis_cell_client::MultiplexedConnection>, chain_id: usize, http_client: Option<&reqwest::Client>, + http_interval_sender: Option>>, block_sender: Option, Arc)>>, tx_id_sender: Option)>>, - reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_rate_limit = self.hard_limit.map(|x| (x, rate_limiter.unwrap())); @@ -93,11 +94,12 @@ impl Web3ConnectionConfig { chain_id, self.url, http_client, + http_interval_sender, hard_rate_limit, self.soft_limit, block_sender, tx_id_sender, - reconnect, + true, ) .await } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 1aa56b25..f7b54159 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -11,6 +11,7 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; +use tokio::sync::broadcast; use tokio::sync::RwLock; use tokio::task; use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; @@ -129,6 +130,7 @@ impl Web3Connection { url_str: String, // optional because this is only used for http providers. websocket providers don't use it http_client: Option<&reqwest::Client>, + http_interval_sender: Option>>, hard_limit: Option<(u32, &redis_cell_client::MultiplexedConnection)>, // TODO: think more about this type soft_limit: u32, @@ -150,7 +152,7 @@ impl Web3Connection { let provider = Web3Provider::from_str(&url_str, http_client).await?; - let connection = Web3Connection { + let connection = Self { url: url_str.clone(), active_requests: 0.into(), provider: RwLock::new(Some(Arc::new(provider))), @@ -172,6 +174,7 @@ impl Web3Connection { match found_chain_id { Ok(found_chain_id) => { + // TODO: there has to be a cleaner way to do this let found_chain_id = usize::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap(); @@ -195,7 +198,7 @@ impl Web3Connection { let connection = connection.clone(); tokio::spawn(async move { connection - .subscribe(block_sender, tx_id_sender, reconnect) + .subscribe(http_interval_sender, block_sender, tx_id_sender, reconnect) .await }) }; @@ -271,15 +274,20 @@ impl Web3Connection { async fn subscribe( self: Arc, + http_interval_sender: Option>>, block_sender: Option, Arc)>>, tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<()> { loop { + let http_interval_receiver = http_interval_sender.clone().map(|x| x.subscribe()); + let mut futures = vec![]; if let Some(block_sender) = &block_sender { - let f = self.clone().subscribe_new_heads(block_sender.clone()); + let f = self + .clone() + .subscribe_new_heads(http_interval_receiver, block_sender.clone()); futures.push(flatten_handle(tokio::spawn(f))); } @@ -331,6 +339,7 @@ impl Web3Connection { #[instrument(skip_all)] async fn subscribe_new_heads( self: Arc, + http_interval_receiver: Option>, block_sender: flume::Sender<(Block, Arc)>, ) -> anyhow::Result<()> { info!("watching {}", self); @@ -340,18 +349,16 @@ impl Web3Connection { match &*provider { Web3Provider::Http(_provider) => { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably some fraction of block time. set automatically? - // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now - // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - let mut interval = interval(Duration::from_secs(13)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // TODO: try watch_blocks and fall back to this? + + let mut http_interval_receiver = http_interval_receiver.unwrap(); let mut last_hash = Default::default(); loop { // wait for the interval // TODO: if error or rate limit, increase interval? - interval.tick().await; + http_interval_receiver.recv().await.unwrap(); match self.try_request_handle().await { Ok(active_request_handle) => { @@ -433,6 +440,7 @@ impl Web3Connection { // TODO: what should this interval be? probably automatically set to some fraction of block time // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though + // TODO: let mut interval = interval(Duration::from_secs(60)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 1ceb444b..d5af0509 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::{broadcast, watch}; use tokio::task; -use tokio::time::sleep; +use tokio::time::{interval, sleep, MissedTickBehavior}; use tracing::{debug, error, info, info_span, instrument, trace, warn}; use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; @@ -103,6 +103,39 @@ impl Web3Connections { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded(); + let http_interval_sender = if http_client.is_some() { + let (sender, receiver) = broadcast::channel(1); + + drop(receiver); + + // TODO: what interval? + let mut interval = interval(Duration::from_secs(13)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let sender = Arc::new(sender); + + let f = { + let sender = sender.clone(); + + async move { + loop { + // TODO: every time a head_block arrives (maybe with a small delay), or on the interval. + interval.tick().await; + + // errors are okay. they mean that all receivers have been dropped + let _ = sender.send(()); + } + } + }; + + // TODO: do something with this handle? + tokio::spawn(f); + + Some(sender) + } else { + None + }; + // turn configs into connections let mut connections = Vec::with_capacity(num_connections); for server_config in server_configs.into_iter() { @@ -111,9 +144,9 @@ impl Web3Connections { rate_limiter, chain_id, http_client, + http_interval_sender.clone(), Some(block_sender.clone()), Some(pending_tx_id_sender.clone()), - true, ) .await { @@ -316,7 +349,7 @@ impl Web3Connections { } pub fn has_synced_rpcs(&self) -> bool { - self.synced_connections.load().inner.len() > 0 + !self.synced_connections.load().inner.is_empty() } /// Send the same request to all the handles. Returning the most common success or most common error.