From 78edfee6b91e61f1322e140a85eedcd211f6e17d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 9 Oct 2023 22:21:39 -0700 Subject: [PATCH] timeout and server selection fixes add a short connect timeout separate from the overall request timeout. also fix a bug when only 1 server was in the rpc list causing a very tight loop that made tokio sad --- .env | 2 + Cargo.lock | 24 ++- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app/mod.rs | 18 +- web3_proxy/src/config.rs | 2 + web3_proxy/src/errors.rs | 22 ++- web3_proxy/src/frontend/authorization.rs | 43 +++-- web3_proxy/src/jsonrpc.rs | 12 +- web3_proxy/src/rpcs/consensus.rs | 159 ++++++++++-------- web3_proxy/src/rpcs/jsonrpc_client.rs | 4 - web3_proxy/src/rpcs/many.rs | 13 +- web3_proxy/src/rpcs/one.rs | 82 +++++---- web3_proxy/src/rpcs/request.rs | 2 +- .../src/sub_commands/migrate_stats_to_v2.rs | 3 +- web3_proxy_cli/tests/test_proxy.rs | 41 ++--- 15 files changed, 257 insertions(+), 172 deletions(-) delete mode 100644 web3_proxy/src/rpcs/jsonrpc_client.rs diff --git a/.env b/.env index e9f9714b..b7b7d4ba 100644 --- a/.env +++ b/.env @@ -1 +1,3 @@ DATABASE_URL=mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy +RUST_BACKTRACE=1 +RUST_LOG=web3_proxy=debug,info diff --git a/Cargo.lock b/Cargo.lock index 68f644a4..ffb07867 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4295,14 +4295,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.6" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" +checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.9", - "regex-syntax 0.7.5", + "regex-automata 0.4.1", + "regex-syntax 0.8.0", ] [[package]] @@ -4316,13 +4316,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.9" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" +checksum = "465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.0", ] [[package]] @@ -4337,6 +4337,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3cbb081b9784b07cceb8824c8583f86db4814d172ab043f3c23f7dc600bf83d" + [[package]] name = "rend" version = "0.4.1" @@ -4845,9 +4851,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" dependencies = [ "serde", ] diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 6776fcff..488ca0e2 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -79,7 +79,7 @@ ordered-float = {version = "4.1.0" } pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "sync"] } parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] } rdkafka = { version = "0.34.0", features = ["tracing"] } -regex = "1.9.6" +regex = "1.10.0" reqwest = { version = "0.11.22", default-features = false, features = ["json", "default-tls"] } rust_decimal = { version = "1.32.0" } sentry = { version = "0.31.7", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "native-tls", "serde_json", "tracing"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2cf17fb1..67f624fd 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -47,7 +47,7 @@ use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::task::JoinHandle; -use tokio::time::{sleep, timeout, Instant}; +use tokio::time::{sleep, timeout, timeout_at, Instant}; use tracing::{error, info, trace, warn}; // TODO: make this customizable? @@ -1621,8 +1621,8 @@ impl Web3ProxyApp { } else if self.jsonrpc_response_failed_cache_keys.contains_key(&cache_key) { // this is a cache_key that we know won't cache // NOTICE! We do **NOT** use get which means the key's hotness is not updated. we don't use time-to-idler here so thats fine. but be careful if that changes - timeout( - web3_request.ttl(), + timeout_at( + web3_request.expire_at(), self.balanced_rpcs .try_proxy_connection::>( web3_request, @@ -1642,8 +1642,8 @@ impl Web3ProxyApp { let mut x = match timeout(Duration::from_secs(1), s.acquire_owned()).await { Err(_) => { // TODO: should we try to cache this? whatever has the semaphore //should// handle that for us - timeout( - web3_request.ttl(), + timeout_at( + web3_request.expire_at(), self.balanced_rpcs .try_proxy_connection::>( web3_request, @@ -1661,10 +1661,8 @@ impl Web3ProxyApp { app .jsonrpc_response_cache .try_get_with::<_, Web3ProxyError>(cache_key, async { - let duration = web3_request.ttl().saturating_sub(Duration::from_secs(1)); - // TODO: dynamic timeout based on whats left on web3_request - let response_data = timeout(duration, app.balanced_rpcs + let response_data = timeout_at(web3_request.expire_at(), app.balanced_rpcs .try_proxy_connection::>( &web3_request, )).await; @@ -1738,8 +1736,8 @@ impl Web3ProxyApp { x } else { - let mut x = timeout( - web3_request.ttl(), + let mut x = timeout_at( + web3_request.expire_at(), self.balanced_rpcs .try_proxy_connection::>( web3_request, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 4ae81b02..3f5993e2 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -293,6 +293,8 @@ pub fn average_block_interval(chain_id: u64) -> Duration { 1101 => Duration::from_secs(7), // base 8453 => Duration::from_secs(2), + // development + 31337 => Duration::from_secs(10), // arbitrum 42161 => Duration::from_millis(500), // web3-proxy tests diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 15008438..66958dc5 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -66,6 +66,12 @@ pub enum Web3ProxyError { EthersHttpClient(ethers::providers::HttpClientError), EthersProvider(ethers::prelude::ProviderError), EthersWsClient(ethers::prelude::WsClientError), + #[display(fmt = "{} < {}", head, requested)] + #[from(ignore)] + FarFutureBlock { + head: U64, + requested: U64, + }, GasEstimateNotU256, HdrRecord(hdrhistogram::errors::RecordError), Headers(headers::Error), @@ -382,6 +388,20 @@ impl Web3ProxyError { ) } }, + Self::FarFutureBlock { head, requested } => { + trace!(?head, ?requested, "FarFutureBlock"); + ( + StatusCode::OK, + JsonRpcErrorData { + message: "requested block is too far in the future".into(), + code: (-32002).into(), + data: Some(json!({ + "head": head, + "requested": requested, + })), + }, + ) + } // Self::JsonRpcForwardedError(x) => (StatusCode::OK, x), Self::GasEstimateNotU256 => { trace!("GasEstimateNotU256"); @@ -656,7 +676,7 @@ impl Web3ProxyError { JsonRpcErrorData { message: "mdbx panic".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(msg.to_string())), + data: Some(json!({"rpc": rpc})), }, ) } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 49586d2f..d697c532 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -164,8 +164,6 @@ pub struct Web3Request { /// We use Instant and not timestamps to avoid problems with leap seconds and similar issues #[derivative(Default(value = "Instant::now()"))] pub start_instant: Instant, - #[derivative(Default(value = "Instant::now() + Duration::from_secs(295)"))] - pub expire_instant: Instant, /// if this is empty, there was a cache_hit /// otherwise, it is populated with any rpc servers that were used by this request pub backend_requests: BackendRequests, @@ -193,6 +191,12 @@ pub struct Web3Request { /// Cancel-safe channel for sending stats to the buffer pub stat_sender: Option>, + + /// How long to spend waiting for an rpc that can serve this request + pub connect_timeout: Duration, + /// How long to spend waiting for an rpc to respond to this request + /// TODO: this should start once the connection is established + pub expire_timeout: Duration, } impl Display for Web3Request { @@ -340,10 +344,6 @@ impl Web3Request { ) -> Web3ProxyResult> { let start_instant = Instant::now(); - // TODO: get this default from config, or from user settings - // 5 minutes with a buffer for other things being slow - let expire_instant = start_instant + max_wait.unwrap_or_else(|| Duration::from_secs(295)); - // let request: RequestOrMethod = request.into(); // we VERY INTENTIONALLY log to kafka BEFORE calculating the cache key @@ -363,6 +363,9 @@ impl Web3Request { _ => CacheMode::Never, }; + let connect_timeout = Duration::from_secs(3); + let expire_timeout = max_wait.unwrap_or_else(|| Duration::from_secs(295)); + let x = Self { archive_request: false.into(), authorization, @@ -370,7 +373,8 @@ impl Web3Request { cache_mode, chain_id, error_response: false.into(), - expire_instant, + connect_timeout, + expire_timeout, head_block: head_block.clone(), kafka_debug_logger, no_servers: 0.into(), @@ -488,6 +492,7 @@ impl Web3Request { self.inner.id() } + #[inline] pub fn max_block_needed(&self) -> Option { self.cache_mode.to_block().map(|x| *x.num()) } @@ -500,13 +505,27 @@ impl Web3Request { } } - pub fn ttl(&self) -> Duration { - self.expire_instant - .saturating_duration_since(Instant::now()) + #[inline] + pub fn connect_timeout_at(&self) -> Instant { + // TODO: get from config + self.start_instant + Duration::from_secs(3) } - pub fn ttl_expired(&self) -> bool { - self.expire_instant < Instant::now() + #[inline] + pub fn connect_timeout(&self) -> bool { + self.connect_timeout_at() <= Instant::now() + } + + #[inline] + pub fn expire_at(&self) -> Instant { + // TODO: get from config + // erigon's timeout is 5 minutes so we want it shorter than that + self.start_instant + Duration::from_secs(295) + } + + #[inline] + pub fn expired(&self) -> bool { + self.expire_at() <= Instant::now() } pub fn try_send_stat(mut self) -> Web3ProxyResult<()> { diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 824005ad..4d3e1c95 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -3,7 +3,6 @@ use axum::response::{IntoResponse, Response as AxumResponse}; use axum::Json; use bytes::{Bytes, BytesMut}; use derive_more::From; -use ethers::providers::ProviderError; use futures_util::stream::{self, StreamExt}; use futures_util::TryStreamExt; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; @@ -218,7 +217,7 @@ pub struct StreamResponse { impl StreamResponse { // TODO: error handing - pub async fn read(self) -> Result, ProviderError> + pub async fn read(self) -> Web3ProxyResult> where T: de::DeserializeOwned, { @@ -306,7 +305,7 @@ where } // TODO: error handling - pub async fn parsed(self) -> Result, ProviderError> { + pub async fn parsed(self) -> Web3ProxyResult> { match self { Self::Parsed(resp) => Ok(resp), Self::Stream(resp) => resp.read().await, @@ -362,7 +361,7 @@ pub enum Response> { } impl Response> { - pub async fn to_json_string(self) -> Result { + pub async fn to_json_string(self) -> Web3ProxyResult { let x = match self { Self::Single(resp) => { // TODO: handle streaming differently? @@ -663,6 +662,11 @@ impl JsonRpcErrorData { .expect("should always serialize") .len() } + + pub fn is_retryable(&self) -> bool { + // TODO: move stuff from request to here + todo!() + } } impl From<&'static str> for JsonRpcErrorData { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index c45be599..9ac03f33 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -378,9 +378,8 @@ impl RankedRpcs { } } - // TODO: better name for this // TODO: this should probably be on the rpcs as "can_serve_request" - // TODO: this should probably take the method, too + // TODO: and it should take the method into account, too pub fn rpc_will_work_now( &self, min_block_needed: Option, @@ -423,6 +422,7 @@ impl RankedRpcs { } // TODO: refs for all of these. borrow on a Sender is cheap enough +// TODO: move this to many.rs impl Web3Rpcs { pub fn head_block(&self) -> Option { self.watch_head_block @@ -1004,81 +1004,107 @@ fn best_rpc<'a>(rpc_a: &'a Arc, rpc_b: &'a Arc) -> &'a Arc impl Stream { - // TODO: get error_handler out of the web3_request, probably the authorization - // let error_handler = web3_request.authorization.error_handler; - let error_handler = None; - stream! { + trace!("entered stream"); + // TODO: get error_handler out of the web3_request? probably the authorization + // let error_handler = web3_request.authorization.error_handler; + let error_handler = None; + + let max_len = self.inner.len() + self.outer.len(); + + // TODO: do this without having 3 Vecs + let mut filtered = Vec::with_capacity(max_len); + let mut attempted = Vec::with_capacity(max_len); + let mut failed = Vec::with_capacity(max_len); + + // todo!("be sure to set server_error if we exit without any rpcs!"); loop { - if self.request.ttl_expired() { + if self.request.connect_timeout() { break; - } else { - // TODO: think about this more - yield_now().await; } let mut earliest_retry_at = None; let mut wait_for_sync = None; - // first check the inners - // TODO: DRY - for rpcs_iter in [self.inner.iter(), self.outer.iter()] { - for (rpc_a, rpc_b) in rpcs_iter.circular_tuple_windows() { - // TODO: ties within X% to the server with the smallest block_data_limit? - // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose - // TODO: should next_available be reversed? - // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers - // TODO: move ethis to a helper function just so we can test it - // TODO: should x.next_available should be Reverse<_>? - let best_rpc = best_rpc(rpc_a, rpc_b); + // first check the inners, then the outers + for rpcs in [&self.inner, &self.outer] { - match best_rpc - .try_request_handle(&self.request, error_handler) - .await - { - Ok(OpenRequestResult::Handle(handle)) => { - trace!("opened handle: {}", best_rpc); - yield handle; - } - Ok(OpenRequestResult::RetryAt(retry_at)) => { - trace!( - "retry on {} @ {}", - best_rpc, - retry_at.duration_since(Instant::now()).as_secs_f32() - ); + attempted.clear(); - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - continue; - } - Ok(OpenRequestResult::Lagged(x)) => { - trace!("{} is lagged. will not work now", best_rpc); - // this will probably always be the same block, right? - if wait_for_sync.is_none() { - wait_for_sync = Some(x); + while attempted.len() + failed.len() < rpcs.len() { + filtered.clear(); + + // TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues + filtered.extend(rpcs.iter().filter(|x| !(attempted.contains(x) || failed.contains(x)))); + + // tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself + if filtered.len() == 1 { + filtered.push(filtered[0]); + } + + for (rpc_a, rpc_b) in filtered.iter().tuple_windows() { + // TODO: ties within X% to the server with the smallest block_data_limit? + // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose + // TODO: should next_available be reversed? + // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers + // TODO: move ethis to a helper function just so we can test it + // TODO: should x.next_available should be Reverse<_>? + let best_rpc = best_rpc(rpc_a, rpc_b); + + attempted.push(best_rpc); + + match best_rpc + .try_request_handle(&self.request, error_handler) + .await + { + Ok(OpenRequestResult::Handle(handle)) => { + trace!("opened handle: {}", best_rpc); + yield handle; + } + Ok(OpenRequestResult::RetryAt(retry_at)) => { + trace!( + "retry on {} @ {}", + best_rpc, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); + + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + continue; + } + Ok(OpenRequestResult::Lagged(x)) => { + trace!("{} is lagged. will not work now", best_rpc); + // this will probably always be the same block, right? + if wait_for_sync.is_none() { + wait_for_sync = Some(x); + } + continue; + } + Ok(OpenRequestResult::Failed) => { + // TODO: log a warning? emit a stat? + trace!("best_rpc not ready: {}", best_rpc); + attempted.pop(); + failed.push(best_rpc); + continue; + } + Err(err) => { + trace!("No request handle for {}. err={:?}", best_rpc, err); + attempted.pop(); + failed.push(best_rpc); + continue; } - continue; - } - Ok(OpenRequestResult::NotReady) => { - // TODO: log a warning? emit a stat? - trace!("best_rpc not ready: {}", best_rpc); - continue; - } - Err(err) => { - trace!("No request handle for {}. err={:?}", best_rpc, err); - continue; } } + + debug_assert!(!attempted.is_empty()); } } // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been - // maybe someone requested something silly like a far future block? // clear earliest_retry_at if it is too far in the future to help us if let Some(retry_at) = earliest_retry_at { - if self.request.expire_instant <= retry_at { + if self.request.connect_timeout_at() <= retry_at { // no point in waiting. it wants us to wait too long earliest_retry_at = None; } @@ -1090,15 +1116,16 @@ impl RpcsForRequest { // we have nothing to wait for. uh oh! break; } - (_, Some(retry_at)) => { + (None, Some(retry_at)) => { // try again after rate limits are done - sleep_until(retry_at).await; + if retry_at > Instant::now() { + sleep_until(retry_at).await; + } else { + // TODO: why is this happening? why would we get rate limited to now? it should be like a second at minimum + yield_now().await; + } } (Some(wait_for_sync), None) => { - break; - - // TODO: think about this more - /* select! { x = wait_for_sync => { match x { @@ -1113,13 +1140,11 @@ impl RpcsForRequest { }, } } - _ = sleep_until(self.request.expire_instant) => { + _ = sleep_until(self.request.expire_at()) => { break; } } - */ } - /* (Some(wait_for_sync), Some(retry_at)) => { select! { x = wait_for_sync => { @@ -1136,14 +1161,16 @@ impl RpcsForRequest { } } _ = sleep_until(retry_at) => { + // if sleep didn't have to wait at all, something seems wrong. have a minimum wait? yield_now().await; continue; } } } - */ } } } + + // TODO: log that no servers were available. this might not be a server error. the user might have requested something in the far future (common when people mix up chains) } } diff --git a/web3_proxy/src/rpcs/jsonrpc_client.rs b/web3_proxy/src/rpcs/jsonrpc_client.rs deleted file mode 100644 index c7d42049..00000000 --- a/web3_proxy/src/rpcs/jsonrpc_client.rs +++ /dev/null @@ -1,4 +0,0 @@ -use super::{many::Web3Rpcs, one::Web3Rpc}; -use ethers::providers::{JsonRpcClient, ProviderError}; -use serde::{de::DeserializeOwned, Serialize}; -use std::fmt::Debug; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 0baf07ca..f4e63e30 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -408,17 +408,22 @@ impl Web3Rpcs { Err(x) => return Err(x), }; - if next_try > web3_request.expire_instant { - let next_try = Instant::now().duration_since(next_try).as_secs(); + if next_try > web3_request.connect_timeout_at() { + let retry_in = Instant::now().duration_since(next_try).as_secs(); // we don't use Web3ProxyError::RateLimited because that is for the user being rate limited return Err(Web3ProxyError::StatusCode( StatusCode::TOO_MANY_REQUESTS, "backend rpcs are all rate limited!".into(), - Some(json!({"retry_at": next_try})), + Some(json!({"retry_in": retry_in})), )); } + trace!(?next_try, "retry needed"); + + // todo!("this must be a bug in our tests. in prod if things are overloaded i could see it happening") + debug_assert!(Instant::now() < next_try); + select! { _ = sleep_until(next_try) => { // rpcs didn't change and we have waited too long. break to return an error @@ -587,7 +592,7 @@ impl Web3Rpcs { // cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1} Err(JsonRpcErrorData { message: "Requested data is not available".into(), - code: -32043, + code: -32001, data: Some(json!({ "request": web3_request })), diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index a12a7ef8..3099becd 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -30,7 +30,6 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use tokio::select; use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock}; -use tokio::task::yield_now; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{debug, error, info, trace, warn, Level}; use url::Url; @@ -353,6 +352,8 @@ impl Web3Rpc { // TODO: binary search between 90k and max? // TODO: start at 0 or 1? + let mut last = U256::MAX; + // TODO: these should all be U256, not u64 for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { let head_block_num = self .internal_request::<_, U256>( @@ -367,6 +368,13 @@ impl Web3Rpc { let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into()); + if last == maybe_archive_block { + // we already checked it. exit early + break; + } + + last = maybe_archive_block; + trace!( "checking maybe_archive_block on {}: {}", self, @@ -992,8 +1000,6 @@ impl Web3Rpc { web3_request: &Arc, error_handler: Option, ) -> Web3ProxyResult { - let mut head_block_sender = None; - loop { match self.try_request_handle(web3_request, error_handler).await { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), @@ -1011,7 +1017,7 @@ impl Web3Rpc { debug_assert!(wait > Duration::from_secs(0)); // TODO: have connect_timeout in addition to the full ttl - if retry_at > web3_request.expire_instant { + if retry_at > web3_request.expire_at() { // break now since we will wait past our maximum wait time return Err(Web3ProxyError::Timeout(Some( web3_request.start_instant.elapsed(), @@ -1022,35 +1028,34 @@ impl Web3Rpc { } Ok(OpenRequestResult::Lagged(now_synced_f)) => { select! { - _ = now_synced_f => { - // TODO: i'm guessing this is returning immediatly - yield_now().await; - } - _ = sleep_until(web3_request.expire_instant) => { + _ = now_synced_f => {} + _ = sleep_until(web3_request.expire_at()) => { break; } } } - Ok(OpenRequestResult::NotReady) => { + Ok(OpenRequestResult::Failed) => { // TODO: when can this happen? log? emit a stat? trace!("{} has no handle ready", self); - if head_block_sender.is_none() { - head_block_sender = self.head_block_sender.as_ref().map(|x| x.subscribe()); - } + // if head_block_sender.is_none() { + // head_block_sender = self.head_block_sender.as_ref().map(|x| x.subscribe()); + // } - if let Some(head_block_sender) = &mut head_block_sender { - select! { - _ = head_block_sender.changed() => { - head_block_sender.borrow_and_update(); - } - _ = sleep_until(web3_request.expire_instant) => { - break; - } - } - } else { - break; - } + // if let Some(head_block_sender) = &mut head_block_sender { + // select! { + // _ = head_block_sender.changed() => { + // head_block_sender.borrow_and_update(); + // } + // _ = sleep_until(web3_request.expire_at()) => { + // break; + // } + // } + // } else { + // break; + // } + + break; } Err(err) => return Err(err), } @@ -1140,7 +1145,8 @@ impl Web3Rpc { // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check if let Some(block_needed) = web3_request.min_block_needed() { if !self.has_block_data(block_needed) { - return Ok(OpenRequestResult::NotReady); + trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self); + return Ok(OpenRequestResult::Failed); } } @@ -1148,9 +1154,13 @@ impl Web3Rpc { // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check if let Some(block_needed) = web3_request.max_block_needed() { if !self.has_block_data(block_needed) { - let clone = self.clone(); - let expire_instant = web3_request.expire_instant; + trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self); + let clone = self.clone(); + let connect_timeout_at = web3_request.connect_timeout_at(); + + // create a future that resolves once this rpc can serve this request + // TODO: i don't love this future. think about it more let synced_f = async move { let mut head_block_receiver = clone.head_block_sender.as_ref().unwrap().subscribe(); @@ -1163,21 +1173,23 @@ impl Web3Rpc { _ = head_block_receiver.changed() => { if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) { if head_block_number >= block_needed { - // the block we needed has arrived! + trace!("the block we needed has arrived!"); break; } - // TODO: configurable lag per chain - if head_block_number < block_needed.saturating_sub(5.into()) { - // TODO: more detailed error about this being a far future block - return Err(Web3ProxyError::NoServersSynced); + // wait up to 2 blocks + // TODO: configurable wait per chain + if head_block_number + U64::from(2) < block_needed { + return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed }); } } else { // TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best // yield_now().await; + error!("no head block during try_request_handle on {}", clone); return Err(Web3ProxyError::NoServersSynced); } } - _ = sleep_until(expire_instant) => { + _ = sleep_until(connect_timeout_at) => { + error!("connection timeout on {}", clone); return Err(Web3ProxyError::NoServersSynced); } } @@ -1198,7 +1210,7 @@ impl Web3Rpc { } RedisRateLimitResult::RetryNever => { warn!("how did retry never on {} happen?", self); - return Ok(OpenRequestResult::NotReady); + return Ok(OpenRequestResult::Failed); } }; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 96af7c73..380f484f 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -31,7 +31,7 @@ pub enum OpenRequestResult { /// TODO: should this return an OpenRequestHandle? that might recurse Lagged(Pin>> + Send>>), /// Unable to start a request because no servers are synced or the necessary data has been pruned - NotReady, + Failed, } /// Make RPC requests through this handle and drop it when you are done. diff --git a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs index 9a1b06c1..5d366e23 100644 --- a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs @@ -1,6 +1,5 @@ use std::num::NonZeroU64; use std::sync::Arc; -use std::time::Duration; use tracing::{error, info}; use web3_proxy::app::BILLING_PERIOD_SECONDS; use web3_proxy::config::TopConfig; @@ -214,7 +213,7 @@ impl MigrateStatsToV2SubCommand { usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(), cache_mode: Default::default(), start_instant: Instant::now(), - expire_instant: Instant::now() + Duration::from_secs(1), + ..Default::default() }; web3_request.try_send_stat()?; diff --git a/web3_proxy_cli/tests/test_proxy.rs b/web3_proxy_cli/tests/test_proxy.rs index bd1adc2d..6acf0d9f 100644 --- a/web3_proxy_cli/tests/test_proxy.rs +++ b/web3_proxy_cli/tests/test_proxy.rs @@ -1,10 +1,7 @@ use serde_json::Value; use std::{str::FromStr, time::Duration}; -use tokio::{ - task::yield_now, - time::{sleep, Instant}, -}; -use tracing::info; +use tokio::{task::yield_now, time::sleep}; +use tracing::{info, warn}; use web3_proxy::prelude::ethers::{ prelude::{Block, Transaction, TxHash, H256, U256, U64}, providers::{Http, JsonRpcClient, Quorum, QuorumProvider, WeightedProvider}, @@ -39,6 +36,17 @@ async fn it_starts_and_stops() { let anvil_provider = &a.provider; let proxy_provider = &x.proxy_provider; + // check the /health page + let proxy_url = x.proxy_provider.url(); + let health_response = reqwest::get(format!("{}health", proxy_url)).await; + dbg!(&health_response); + assert_eq!(health_response.unwrap().status(), StatusCode::OK); + + // check the /status page + let status_response = reqwest::get(format!("{}status", proxy_url)).await; + dbg!(&status_response); + assert_eq!(status_response.unwrap().status(), StatusCode::OK); + let anvil_result = anvil_provider .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await @@ -52,17 +60,6 @@ async fn it_starts_and_stops() { assert_eq!(anvil_result, proxy_result); - // check the /health page - let proxy_url = x.proxy_provider.url(); - let health_response = reqwest::get(format!("{}health", proxy_url)).await; - dbg!(&health_response); - assert_eq!(health_response.unwrap().status(), StatusCode::OK); - - // check the /status page - let status_response = reqwest::get(format!("{}status", proxy_url)).await; - dbg!(&status_response); - assert_eq!(status_response.unwrap().status(), StatusCode::OK); - let first_block_num = anvil_result.number.unwrap(); // mine a block @@ -81,13 +78,9 @@ async fn it_starts_and_stops() { yield_now().await; - let mut proxy_result; - let start = Instant::now(); - loop { - if start.elapsed() > Duration::from_secs(1) { - panic!("took too long to sync!"); - } + let mut proxy_result = None; + for _ in 0..10 { proxy_result = proxy_provider .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await @@ -99,7 +92,9 @@ async fn it_starts_and_stops() { } } - sleep(Duration::from_millis(10)).await; + warn!(?proxy_result, ?second_block_num); + + sleep(Duration::from_millis(100)).await; } assert_eq!(anvil_result, proxy_result.unwrap());