From 9f7f657926305877a5ca07bdc542d48cc37a6f3b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 3 Jun 2022 22:22:55 +0000 Subject: [PATCH] retry providers --- web3-proxy/src/connection.rs | 166 ++++++++++++++++++++--------------- 1 file changed, 93 insertions(+), 73 deletions(-) diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 5fb2b948..c5ebed5a 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -71,7 +71,7 @@ pub struct Web3Connection { /// keep track of currently open requests. We sort on this active_requests: AtomicU32, /// provider is in a RwLock so that we can replace it if re-connecting - provider: RwLock>, + provider: RwLock>>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits hard_limit: Option, /// used for load balancing to the least loaded server @@ -127,12 +127,14 @@ impl Web3Connection { // since this lock is held open over an await, we use tokio's locking let mut provider = self.provider.write().await; + *provider = None; + // tell the block subscriber that we are at 0 block_sender.send_async((Block::default(), rpc_id)).await?; let new_provider = Web3Provider::from_str(&self.url, http_client).await?; - *provider = Arc::new(new_provider); + *provider = Some(Arc::new(new_provider)); Ok(()) } @@ -165,7 +167,7 @@ impl Web3Connection { let connection = Web3Connection { url: url_str.clone(), active_requests: 0.into(), - provider: RwLock::new(Arc::new(provider)), + provider: RwLock::new(Some(Arc::new(provider))), hard_limit, soft_limit, }; @@ -214,6 +216,11 @@ impl Web3Connection { self.soft_limit } + #[inline] + pub async fn has_provider(&self) -> bool { + self.provider.read().await.is_some() + } + #[instrument(skip_all)] async fn send_block( self: &Arc, @@ -246,89 +253,89 @@ impl Web3Connection { loop { info!("Watching new_heads on {}", self); - // TODO: is a RwLock of Arc the right thing here? - let provider = self.provider.read().await.clone(); + // TODO: is a RwLock of an Option the right thing here? + if let Some(provider) = self.provider.read().await.clone() { + match &*provider { + Web3Provider::Http(provider) => { + // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: what should this interval be? probably some fraction of block time. set automatically? + // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now + // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though + let mut interval = interval(Duration::from_secs(2)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - match &*provider { - Web3Provider::Http(provider) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably some fraction of block time. set automatically? - // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now - // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - let mut interval = interval(Duration::from_secs(2)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + let mut last_hash = Default::default(); - let mut last_hash = Default::default(); + loop { + // wait for the interval + // TODO: if error or rate limit, increase interval? + interval.tick().await; - loop { - // wait for the interval - // TODO: if error or rate limit, increase interval? - interval.tick().await; + match self.try_request_handle().await { + Ok(active_request_handle) => { + // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" + let block: Result, _> = provider + .request("eth_getBlockByNumber", ("latest", false)) + .await; - match self.try_request_handle().await { - Ok(active_request_handle) => { - // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" - let block: Result, _> = provider - .request("eth_getBlockByNumber", ("latest", false)) - .await; + drop(active_request_handle); - drop(active_request_handle); + // don't send repeat blocks + if let Ok(block) = &block { + let new_hash = block.hash.unwrap(); - // don't send repeat blocks - if let Ok(block) = &block { - let new_hash = block.hash.unwrap(); + if new_hash == last_hash { + continue; + } - if new_hash == last_hash { - continue; + last_hash = new_hash; } - last_hash = new_hash; + self.send_block(block, &block_sender, rpc_id).await?; + } + Err(e) => { + warn!("Failed getting latest block from {}: {:?}", self, e); } - - self.send_block(block, &block_sender, rpc_id).await?; - } - Err(e) => { - warn!("Failed getting latest block from {}: {:?}", self, e); } } } - } - Web3Provider::Ws(provider) => { - // rate limits - let active_request_handle = self.wait_for_request_handle().await; + Web3Provider::Ws(provider) => { + // rate limits + let active_request_handle = self.wait_for_request_handle().await; - // TODO: automatically reconnect? - // TODO: it would be faster to get the block number, but subscriptions don't provide that - // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? - let mut stream = provider.subscribe_blocks().await?; + // TODO: automatically reconnect? + // TODO: it would be faster to get the block number, but subscriptions don't provide that + // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? + let mut stream = provider.subscribe_blocks().await?; - drop(active_request_handle); - let active_request_handle = self.wait_for_request_handle().await; + drop(active_request_handle); + let active_request_handle = self.wait_for_request_handle().await; - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // all it does is print "new block" for the same block as current block - // TODO: rate limit! - let block: Result, _> = active_request_handle - .request("eth_getBlockByNumber", ("latest", false)) - .await; + // query the block once since the subscription doesn't send the current block + // there is a very small race condition here where the stream could send us a new block right now + // all it does is print "new block" for the same block as current block + // TODO: rate limit! + let block: Result, _> = active_request_handle + .request("eth_getBlockByNumber", ("latest", false)) + .await; - self.send_block(block, &block_sender, rpc_id).await?; + self.send_block(block, &block_sender, rpc_id).await?; - // TODO: should the stream have a timeout on it here? - // TODO: although reconnects will make this less of an issue - loop { - match stream.next().await { - Some(new_block) => { - self.send_block(Ok(new_block), &block_sender, rpc_id) - .await?; + // TODO: should the stream have a timeout on it here? + // TODO: although reconnects will make this less of an issue + loop { + match stream.next().await { + Some(new_block) => { + self.send_block(Ok(new_block), &block_sender, rpc_id) + .await?; - // TODO: really not sure about this - task::yield_now().await; - } - None => { - warn!("subscription ended"); - break; + // TODO: really not sure about this + task::yield_now().await; + } + None => { + warn!("subscription ended"); + break; + } } } } @@ -336,12 +343,11 @@ impl Web3Connection { } if reconnect { - drop(provider); - // TODO: exponential backoff - warn!("new heads subscription exited. reconnecting in 10 seconds..."); - sleep(Duration::from_secs(10)).await; + warn!("new heads subscription exited. Attempting to reconnect in 1 second..."); + sleep(Duration::from_secs(1)).await; + // TODO: loop on reconnecting! do not return with a "?" here self.reconnect(&block_sender, rpc_id).await?; } else { break; @@ -370,6 +376,12 @@ impl Web3Connection { } pub async fn try_request_handle(self: &Arc) -> Result { + // check that we are connected + if !self.has_provider().await { + // TODO: how long? use the same amount as the exponential backoff on retry + return Err(Duration::from_secs(1)); + } + // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { match ratelimiter.throttle().await { @@ -424,9 +436,17 @@ impl ActiveRequestHandle { // TODO: including params in this is way too verbose trace!("Sending {} to {}", method, self.0); - let provider = self.0.provider.read().await.clone(); + let mut provider = None; - let response = match &*provider { + while provider.is_none() { + // TODO: if no provider, don't unwrap. wait until there is one. + match self.0.provider.read().await.as_ref() { + None => {} + Some(found_provider) => provider = Some(found_provider.clone()), + } + } + + let response = match &*provider.unwrap() { Web3Provider::Http(provider) => provider.request(method, params).await, Web3Provider::Ws(provider) => provider.request(method, params).await, };