diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 81ca75c9..1a6c3dff 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -72,10 +72,10 @@ pub enum Web3ProxyError { EthersHttpClient(ethers::providers::HttpClientError), EthersProvider(ethers::prelude::ProviderError), EthersWsClient(ethers::prelude::WsClientError), - #[display(fmt = "{} < {}", head, requested)] + #[display(fmt = "{:?} < {}", head, requested)] #[from(ignore)] FarFutureBlock { - head: U64, + head: Option, requested: U64, }, GasEstimateNotU256, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 8c8caabb..b3fb11d1 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -932,7 +932,7 @@ impl Web3Rpc { Ok(()) } - /// Subscribe to new blocks. + /// Subscribe to new block headers. async fn subscribe_new_heads( self: &Arc, block_sender: mpsc::UnboundedSender, @@ -940,7 +940,6 @@ impl Web3Rpc { ) -> Web3ProxyResult<()> { trace!("subscribing to new heads on {}", self); - // TODO: different handler depending on backup or not let error_handler = if self.backup { Some(Level::DEBUG.into()) } else { @@ -955,9 +954,7 @@ impl Web3Rpc { // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now - // but all seeing the same block twice won't break anything - // TODO: how does this get wrapped in an arc? does ethers handle that? - // TODO: send this request to the ws_provider instead of the http_provider + // but sending the same block twice won't break anything let latest_block: Result, _> = self .internal_request( "eth_getBlockByNumber".into(), @@ -979,7 +976,6 @@ impl Web3Rpc { } else if self.http_client.is_some() { // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints // TODO: is 1/2 the block time okay? - // TODO: tokio-console shows this as 1ms, but it should definitely be more than that. block_interval definitely isn't 2ms let mut i = interval(self.block_interval / 2); i.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -1075,7 +1071,6 @@ impl Web3Rpc { match self.try_throttle().await? { RedisRateLimitResult::Allowed(y) => return Ok(y), RedisRateLimitResult::RetryAt(retry_at, _) => { - // TODO: check timeouts if wait_until < retry_at { break; } @@ -1086,7 +1081,7 @@ impl Web3Rpc { "retry_at is in the past {}s", retry_at.elapsed().as_secs_f32() ); - sleep(Duration::from_secs(1)).await; + sleep(Duration::from_millis(200)).await; } else { sleep_until(retry_at).await; } @@ -1162,7 +1157,6 @@ impl Web3Rpc { } // make sure this block has the oldest block that this request needs - // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check if let Some(block_needed) = web3_request.min_block_needed() { if !self.has_block_data(block_needed) { trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self); @@ -1171,51 +1165,72 @@ impl Web3Rpc { } // make sure this block has the newest block that this request needs - // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check if let Some(block_needed) = web3_request.max_block_needed() { if !self.has_block_data(block_needed) { trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self); - let clone = self.clone(); + let rpc = self.clone(); let connect_timeout_at = web3_request.connect_timeout_at(); + let mut head_block_receiver = + self.head_block_sender.as_ref().unwrap().subscribe(); + + // if head_block is far behind block_needed, return now + // TODO: future block limit from the config + if let Some(head_block) = head_block_receiver.borrow_and_update().as_ref() { + let head_block_number = head_block.number(); + + if head_block_number + U64::from(5) < block_needed { + return Err(Web3ProxyError::FarFutureBlock { + head: Some(head_block_number), + requested: block_needed, + }); + } + } else { + return Ok(OpenRequestResult::Failed); + } + // create a future that resolves once this rpc can serve this request // TODO: i don't love this future. think about it more let synced_f = async move { - let mut head_block_receiver = - clone.head_block_sender.as_ref().unwrap().subscribe(); - - // TODO: if head_block is far behind block_needed, retrurn now - head_block_receiver.borrow_and_update(); - loop { select! { _ = head_block_receiver.changed() => { - if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) { + let head_block = head_block_receiver.borrow_and_update(); + + if let Some(head_block_number) = head_block.as_ref().map(|x| x.number()) { if head_block_number >= block_needed { trace!("the block we needed has arrived!"); - break; - } - // wait up to 2 blocks - // TODO: configurable wait per chain - if head_block_number + U64::from(2) < block_needed { - return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed }); + return Ok(rpc); } } else { // TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best - // yield_now().await; - error!("no head block during try_request_handle on {}", clone); - return Err(Web3ProxyError::NoServersSynced); + error!("no head block during try_request_handle on {}", rpc); + break; } } _ = sleep_until(connect_timeout_at) => { - error!("connection timeout on {}", clone); - return Err(Web3ProxyError::NoServersSynced); + error!("connection timeout on {}", rpc); + break; } } } - Ok(clone) + if let Some(head_block_number) = head_block_receiver + .borrow_and_update() + .as_ref() + .map(|x| x.number()) + { + Err(Web3ProxyError::FarFutureBlock { + head: Some(head_block_number), + requested: block_needed, + }) + } else { + Err(Web3ProxyError::FarFutureBlock { + head: None, + requested: block_needed, + }) + } }; return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));