From cffc60e7f66b8107d431b135d2c173f30a258bb5 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 24 Jan 2023 22:45:20 -0800
Subject: [PATCH] improve responses when blocks are not available

---
 TODO.md                                   |  2 +-
 web3_proxy/src/app/mod.rs                 |  8 +--
 web3_proxy/src/bin/web3_proxy_cli/main.rs |  1 +
 web3_proxy/src/rpcs/blockchain.rs         | 11 ++-
 web3_proxy/src/rpcs/connection.rs         | 75 ++++++++++++--------
 web3_proxy/src/rpcs/connections.rs        | 86 ++++++++++++++++++-----
 web3_proxy/src/rpcs/request.rs            |  3 +-
 7 files changed, 131 insertions(+), 55 deletions(-)

diff --git a/TODO.md b/TODO.md
index e1b8711c..1ad82e88 100644
--- a/TODO.md
+++ b/TODO.md
@@ -347,7 +347,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
 - [ ] `stat delay` script
     - query database for newest stat
 - [ ] period_datetime should always be :00. right now it depends on start time
-- [ ] two servers running will confuse rpc_accounting!
+- [ ] we have our hard rate limiter set up with a period of 60. but most providers have period of 1
+- [ ] two servers running will confuse rpc_accounting!
   - it won't happen with users often because they should be sticky to one proxy, but unauthenticated users will definitely hit this
   - one option: we need the insert to be an upsert, but how do we merge historgrams?
 - [ ] don't use systemtime. use chrono
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 7f92955e..f41c1210 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -1326,8 +1326,8 @@ impl Web3ProxyApp {
             }
             "eth_subscribe" => {
                 return Ok((
-                    JsonRpcForwardedResponse::from_string(
-                        format!("notifications not supported. eth_subscribe is only available over a websocket"),
+                    JsonRpcForwardedResponse::from_str(
+                        "notifications not supported. eth_subscribe is only available over a websocket",
                         Some(-32601),
                         Some(request_id),
                     ),
@@ -1336,8 +1336,8 @@ impl Web3ProxyApp {
             }
             "eth_unsubscribe" => {
                 return Ok((
-                    JsonRpcForwardedResponse::from_string(
-                        format!("notifications not supported. eth_unsubscribe is only available over a websocket"),
+                    JsonRpcForwardedResponse::from_str(
+                        "notifications not supported. eth_unsubscribe is only available over a websocket",
                         Some(-32601),
                         Some(request_id),
                     ),
diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs
index 113336b4..da95dc13 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/main.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs
@@ -123,6 +123,7 @@ fn main() -> anyhow::Result<()> {
                 "web3_proxy=trace",
                 "web3_proxy_cli=trace",
                 "web3_proxy::rpcs::blockchain=info",
+                "web3_proxy::rpcs::request=debug",
             ]
         }
         _ => {
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index bcda8579..da1c2188 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -149,7 +149,7 @@ impl Web3Connections {
         // TODO: if error, retry?
         let block: ArcBlock = match rpc {
             Some(rpc) => rpc
-                .wait_for_request_handle(authorization, Duration::from_secs(30), false)
+                .wait_for_request_handle(authorization, Some(Duration::from_secs(30)), false)
                 .await?
                 .request::<_, Option<_>>(
                     "eth_getBlockByHash",
@@ -253,11 +253,16 @@ impl Web3Connections {
         // TODO: if error, retry?
         // TODO: request_metadata or authorization?
+        // we don't actually set min_block_needed here because all nodes have all blocks
         let response = self
-            .try_send_best_consensus_head_connection(authorization, request, None, Some(num))
+            .try_send_best_consensus_head_connection(authorization, request, None, None)
             .await?;
 
-        let raw_block = response.result.context("no block result")?;
+        if let Some(err) = response.error {
+            debug!("could not find canonical block {}: {:?}", num, err);
+        }
+
+        let raw_block = response.result.context("no canonical block result")?;
 
         let block: ArcBlock = serde_json::from_str(raw_block.get())?;
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index e56a4448..99fc3cd1 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -213,7 +213,7 @@ impl Web3Connection {
         // TODO: start at 0 or 1?
         for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
             let handle = self
-                .wait_for_request_handle(authorization, Duration::from_secs(30), true)
+                .wait_for_request_handle(authorization, None, true)
                 .await?;
 
             let head_block_num_future = handle.request::<Option<()>, U256>(
@@ -239,7 +239,7 @@ impl Web3Connection {
         // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
         // TODO: what should the request be?
         let handle = self
-            .wait_for_request_handle(authorization, Duration::from_secs(30), true)
+            .wait_for_request_handle(authorization, None, true)
             .await?;
 
         let archive_result: Result<Bytes, _> = handle
@@ -436,7 +436,7 @@ impl Web3Connection {
         // TODO: what should the timeout be? should there be a request timeout?
         // trace!("waiting on chain id for {}", self);
         let found_chain_id: Result<U64, _> = self
-            .wait_for_request_handle(&authorization, Duration::from_secs(30), true)
+            .wait_for_request_handle(&authorization, None, true)
             .await?
             .request(
                 "eth_chainId",
@@ -720,7 +720,7 @@ impl Web3Connection {
         loop {
             // TODO: what should the max_wait be?
             match self
-                .wait_for_request_handle(&authorization, Duration::from_secs(30), false)
+                .wait_for_request_handle(&authorization, None, false)
                 .await
             {
                 Ok(active_request_handle) => {
@@ -806,7 +806,7 @@ impl Web3Connection {
             Web3Provider::Ws(provider) => {
                 // todo: move subscribe_blocks onto the request handle?
                 let active_request_handle = self
-                    .wait_for_request_handle(&authorization, Duration::from_secs(30), false)
+                    .wait_for_request_handle(&authorization, None, false)
                     .await;
                 let mut stream = provider.subscribe_blocks().await?;
                 drop(active_request_handle);
                 // all it does is print "new block" for the same block as current block
                 // TODO: how does this get wrapped in an arc? does ethers handle that?
                 let block: Result<Option<ArcBlock>, _> = self
-                    .wait_for_request_handle(&authorization, Duration::from_secs(30), false)
+                    .wait_for_request_handle(&authorization, None, false)
                     .await?
                     .request(
                         "eth_getBlockByNumber",
@@ -917,8 +917,8 @@ impl Web3Connection {
             Web3Provider::Ws(provider) => {
                 // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
                 let active_request_handle = self
-                    .wait_for_request_handle(&authorization, Duration::from_secs(30), false)
-                    .await;
+                    .wait_for_request_handle(&authorization, None, false)
+                    .await?;
 
                 let mut stream = provider.subscribe_pending_txs().await?;
@@ -955,10 +955,10 @@ impl Web3Connection {
     pub async fn wait_for_request_handle(
         self: &Arc<Self>,
         authorization: &Arc<Authorization>,
-        max_wait: Duration,
+        max_wait: Option<Duration>,
         allow_not_ready: bool,
     ) -> anyhow::Result<OpenRequestHandle> {
-        let max_wait = Instant::now() + max_wait;
+        let max_wait = max_wait.map(|x| Instant::now() + x);
 
         loop {
             match self
@@ -968,24 +968,34 @@ impl Web3Connection {
             Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
             Ok(OpenRequestResult::RetryAt(retry_at)) => {
                 // TODO: emit a stat?
-                trace!("{} waiting for request handle until {:?}", self, retry_at);
+                let wait = retry_at.duration_since(Instant::now());
 
-                if retry_at > max_wait {
-                    // break now since we will wait past our maximum wait time
-                    // TODO: don't use anyhow. use specific error type
-                    return Err(anyhow::anyhow!("timeout waiting for request handle"));
+                trace!(
+                    "waiting {} millis for request handle on {}",
+                    wait.as_millis(),
+                    self
+                );
+
+                if let Some(max_wait) = max_wait {
+                    if retry_at > max_wait {
+                        // break now since we will wait past our maximum wait time
+                        // TODO: don't use anyhow. use specific error type
+                        return Err(anyhow::anyhow!("timeout waiting for request handle"));
+                    }
                 }
 
                 sleep_until(retry_at).await;
             }
-            Ok(OpenRequestResult::NotReady) => {
+            Ok(OpenRequestResult::NotReady(_)) => {
                 // TODO: when can this happen? log? emit a stat?
                 trace!("{} has no handle ready", self);
 
-                let now = Instant::now();
+                if let Some(max_wait) = max_wait {
+                    let now = Instant::now();
 
-                if now > max_wait {
-                    return Err(anyhow::anyhow!("unable to retry for request handle"));
+                    if now > max_wait {
+                        return Err(anyhow::anyhow!("unable to retry for request handle"));
+                    }
                 }
 
                 // TODO: sleep how long? maybe just error?
@@ -1013,7 +1023,8 @@ impl Web3Connection {
             .await
             .is_none()
         {
-            return Ok(OpenRequestResult::NotReady);
+            trace!("{} is not ready", self);
+            return Ok(OpenRequestResult::NotReady(self.backup));
         }
 
         if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
@@ -1029,25 +1040,33 @@ impl Web3Connection {
         // check rate limits
         if let Some(ratelimiter) = self.hard_limit.as_ref() {
             // TODO: how should we know if we should set expire or not?
-            match ratelimiter.throttle().await? {
+            match ratelimiter
+                .throttle()
+                .await
+                .context(format!("attempting to throttle {}", self))?
+            {
                 RedisRateLimitResult::Allowed(_) => {
                     // trace!("rate limit succeeded")
                 }
                 RedisRateLimitResult::RetryAt(retry_at, _) => {
-                    // rate limit failed
-                    // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
-                    // TODO: use tracing better
-                    // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
-                    warn!("Exhausted rate limit on {}. Retry at {:?}", self, retry_at);
+                    // rate limit gave us a wait time
+                    if !self.backup {
+                        let when = retry_at.duration_since(Instant::now());
+                        warn!(
+                            "Exhausted rate limit on {}. Retry in {}ms",
+                            self,
+                            when.as_millis()
+                        );
+                    }
 
                     if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
-                        hard_limit_until.send(retry_at.clone())?;
+                        hard_limit_until.send_replace(retry_at.clone());
                     }
 
                     return Ok(OpenRequestResult::RetryAt(retry_at));
                 }
                 RedisRateLimitResult::RetryNever => {
-                    return Ok(OpenRequestResult::NotReady);
+                    return Ok(OpenRequestResult::NotReady(self.backup));
                 }
             }
         };
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 5b97a49a..9ecf3fd9 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -428,7 +428,10 @@ impl Web3Connections {
             )
             .await
         {
-            return Ok(without_backups);
+            // TODO: this might use backups too eagerly. but even when we allow backups, we still prioritize our own
+            if matches!(without_backups, OpenRequestResult::Handle(_)) {
+                return Ok(without_backups);
+            }
         }
 
         self._best_consensus_head_connection(
@@ -460,7 +463,7 @@ impl Web3Connections {
             head_block.number()
         } else {
             // TODO: optionally wait for a head block >= min_block_needed
-            return Ok(OpenRequestResult::NotReady);
+            return Ok(OpenRequestResult::NotReady(allow_backups));
         };
 
         let min_block_needed = min_block_needed.unwrap_or(&head_block_num);
@@ -504,7 +507,7 @@ impl Web3Connections {
             }
             cmp::Ordering::Greater => {
                 // TODO? if the blocks is close and wait_for_sync and allow_backups, wait for change on a watch_consensus_connections_receiver().subscribe()
-                return Ok(OpenRequestResult::NotReady);
+                return Ok(OpenRequestResult::NotReady(allow_backups));
            }
        }
 
@@ -595,7 +598,7 @@ impl Web3Connections {
                 Ok(OpenRequestResult::RetryAt(retry_at)) => {
                     earliest_retry_at = earliest_retry_at.min(Some(retry_at));
                 }
-                Ok(OpenRequestResult::NotReady) => {
+                Ok(OpenRequestResult::NotReady(_)) => {
                     // TODO: log a warning? emit a stat?
                 }
                 Err(err) => {
@@ -625,7 +628,7 @@ impl Web3Connections {
 
                 // TODO: should we log here?
 
-                Ok(OpenRequestResult::NotReady)
+                Ok(OpenRequestResult::NotReady(allow_backups))
             }
             Some(earliest_retry_at) => {
                 warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
@@ -719,7 +722,7 @@ impl Web3Connections {
                     max_count -= 1;
                     selected_rpcs.push(handle)
                 }
-                Ok(OpenRequestResult::NotReady) => {
+                Ok(OpenRequestResult::NotReady(_)) => {
                     warn!("no request handle for {}", connection)
                 }
                 Err(err) => {
@@ -911,17 +914,51 @@ impl Web3Connections {
                     }
                 }
             }
-            OpenRequestResult::NotReady => {
+            OpenRequestResult::NotReady(backups_included) => {
                 if let Some(request_metadata) = request_metadata {
                     request_metadata.no_servers.fetch_add(1, Ordering::Release);
                 }
 
-                trace!("No servers ready. Waiting up to 1 second for change in synced servers");
+                // todo!(
+                //     "check if we are requesting an old block and no archive servers are synced"
+                // );
+
+                if let Some(min_block_needed) = min_block_needed {
+                    let mut theres_a_chance = false;
+
+                    for potential_conn in self.conns.values() {
+                        if skip_rpcs.contains(potential_conn) {
+                            continue;
+                        }
+
+                        // TODO: should we instead check if has_block_data but with the current head block?
+                        if potential_conn.has_block_data(min_block_needed) {
+                            trace!("chance for {} on {}", min_block_needed, potential_conn);
+                            theres_a_chance = true;
+                            break;
+                        }
+
+                        skip_rpcs.push(potential_conn.clone());
+                    }
+
+                    if !theres_a_chance {
+                        debug!("no chance of finding data in block #{}", min_block_needed);
+                        break;
+                    }
+                }
+
+                if backups_included {
+                    // if NotReady and we tried backups, there's no chance
+                    warn!("No servers ready even after checking backups");
+                    break;
+                }
+
+                debug!("No servers ready. Waiting up to 1 second for change in synced servers");
                 // TODO: exponential backoff?
                 tokio::select! {
                     _ = sleep(Duration::from_secs(1)) => {
-                        skip_rpcs.pop();
+                        // do NOT pop the last rpc off skip here
                     }
                     _ = watch_consensus_connections.changed() => {
                         watch_consensus_connections.borrow_and_update();
                     }
                 }
@@ -944,17 +981,30 @@ impl Web3Connections {
             }
         }
 
         let num_conns = self.conns.len();
+        let num_skipped = skip_rpcs.len();
 
-        if skip_rpcs.is_empty() {
+        if num_skipped == 0 {
             error!("No servers synced ({} known)", num_conns);
 
-            Err(anyhow::anyhow!("No servers synced ({} known)", num_conns))
+            return Ok(JsonRpcForwardedResponse::from_str(
+                "No servers synced",
+                Some(-32000),
+                Some(request.id),
+            ));
         } else {
-            Err(anyhow::anyhow!(
-                "{}/{} servers erred",
-                skip_rpcs.len(),
-                num_conns
-            ))
+            // TODO: warn? debug? trace?
+            warn!(
+                "Requested data was not available on {}/{} servers",
+                num_skipped, num_conns
+            );
+
+            // TODO: what error code?
+            // cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
+            return Ok(JsonRpcForwardedResponse::from_str(
+                "Requested data is not available",
+                Some(-32043),
+                Some(request.id),
+            ));
         }
     }
@@ -1287,7 +1337,7 @@ mod tests {
 
         dbg!(&x);
 
-        assert!(matches!(x, OpenRequestResult::NotReady));
+        assert!(matches!(x, OpenRequestResult::NotReady(true)));
 
         // add lagged blocks to the conns. both servers should be allowed
         lagged_block.block = conns.save_block(lagged_block.block, true).await.unwrap();
@@ -1360,7 +1410,7 @@ mod tests {
         assert!(matches!(
             conns
                 .best_consensus_head_connection(&authorization, None, &[], Some(&2.into()))
                 .await,
-            Ok(OpenRequestResult::NotReady)
+            Ok(OpenRequestResult::NotReady(true))
        ));
    }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 2c440d26..87e79ba8 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -27,7 +27,8 @@ pub enum OpenRequestResult {
     /// Unable to start a request. Retry at the given time.
     RetryAt(Instant),
     /// Unable to start a request because the server is not synced
-    NotReady,
+    /// contains "true" if backup servers were attempted
+    NotReady(bool),
 }
 
 /// Make RPC requests through this handle and drop it when you are done.
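
Note on the widened enum: after this patch, OpenRequestResult::NotReady carries
whether backup servers were already attempted, which is what lets the retry loop
in connections.rs above break out instead of sleeping forever. Below is a minimal
sketch (not part of the patch) of how a caller can consume the new variant. The
`next_handle` function is hypothetical, and the OpenRequestHandle stub exists only
so the sketch compiles on its own; the real types live in
web3_proxy/src/rpcs/request.rs.

    use tokio::time::{sleep_until, Instant};

    // stub so the sketch is self-contained; the real type is in rpcs/request.rs
    pub struct OpenRequestHandle;

    pub enum OpenRequestResult {
        /// Able to start a request now.
        Handle(OpenRequestHandle),
        /// Unable to start a request. Retry at the given time.
        RetryAt(Instant),
        /// Unable to start a request because the server is not synced.
        /// Contains "true" if backup servers were attempted.
        NotReady(bool),
    }

    /// Hypothetical caller: Some(handle) when a request can start, None when a
    /// retry may succeed, and an error when waiting cannot help.
    async fn next_handle(
        result: OpenRequestResult,
    ) -> anyhow::Result<Option<OpenRequestHandle>> {
        match result {
            OpenRequestResult::Handle(handle) => Ok(Some(handle)),
            OpenRequestResult::RetryAt(retry_at) => {
                // the rate limiter told us exactly when capacity frees up
                sleep_until(retry_at).await;
                Ok(None)
            }
            OpenRequestResult::NotReady(true) => {
                // backups were already tried; waiting will not help
                Err(anyhow::anyhow!("no servers ready, even after checking backups"))
            }
            OpenRequestResult::NotReady(false) => {
                // backups were skipped; the caller may retry with allow_backups = true
                Ok(None)
            }
        }
    }

This mirrors the patch's own behavior: NotReady(true) turns into a hard stop (the
"No servers ready even after checking backups" branch), while NotReady(false)
leaves room for a second pass that allows backups.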