From 76c8f1ef96cb7e1c67a50cea19fbd4343ac81cb5 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 11 Oct 2022 21:31:34 +0000
Subject: [PATCH] i think it works

---
 Cargo.lock                                   |  2 +-
 entities/src/rpc_accounting.rs               |  1 +
 migration/src/m20221007_213828_accounting.rs |  6 ++
 web3_proxy/Cargo.toml                        |  2 +-
 web3_proxy/src/app.rs                        | 14 ++-
 web3_proxy/src/frontend/authorization.rs     |  1 +
 web3_proxy/src/rpcs/connections.rs           | 48 +++++++++-
 web3_proxy/src/rpcs/request.rs               | 96 ++++++++++----------
 web3_proxy/src/stats.rs                      | 29 ++++--
 9 files changed, 138 insertions(+), 61 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 871a5bb9..f28f1aeb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5493,7 +5493,7 @@ dependencies = [
 
 [[package]]
 name = "web3_proxy"
-version = "0.2.0"
+version = "0.4.0"
 dependencies = [
  "anyhow",
  "arc-swap",
diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs
index 1fddc92b..7afeee7e 100644
--- a/entities/src/rpc_accounting.rs
+++ b/entities/src/rpc_accounting.rs
@@ -17,6 +17,7 @@ pub struct Model {
     pub frontend_requests: u32,
     pub backend_requests: u32,
     pub backend_retries: u32,
+    pub no_servers: u32,
     pub cache_misses: u32,
     pub cache_hits: u32,
     pub sum_request_bytes: u64,
diff --git a/migration/src/m20221007_213828_accounting.rs b/migration/src/m20221007_213828_accounting.rs
index 379e78e8..f8b44d44 100644
--- a/migration/src/m20221007_213828_accounting.rs
+++ b/migration/src/m20221007_213828_accounting.rs
@@ -54,6 +54,11 @@ impl MigrationTrait for Migration {
                         .big_unsigned()
                         .not_null(),
                 )
+                .col(
+                    ColumnDef::new(RpcAccounting::NoServers)
+                        .big_unsigned()
+                        .not_null(),
+                )
                 .col(
                     ColumnDef::new(RpcAccounting::CacheMisses)
                         .big_unsigned()
@@ -207,6 +212,7 @@ enum RpcAccounting {
     FrontendRequests,
     BackendRequests,
     BackendRetries,
+    NoServers,
     CacheMisses,
     CacheHits,
     SumRequestBytes,
diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml
index 8888986e..577e779e 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "web3_proxy"
-version = "0.2.0"
+version = "0.4.0"
 edition = "2021"
 default-run = "web3_proxy"
 
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index 62fc156d..e27828f2 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -963,6 +963,7 @@ impl Web3ProxyApp {
             }
             _ => {
                 // TODO: this needs the correct error code in the response
+                // TODO: emit stat?
                 return Err(anyhow::anyhow!("invalid request"));
             }
         }
@@ -1030,6 +1031,7 @@ impl Web3ProxyApp {
                         // discard their id by replacing it with an empty
                         response.id = Default::default();
 
+                        // TODO: only cache the inner response (or error)
                         Ok::<_, anyhow::Error>(response)
                     })
                     .await
@@ -1043,9 +1045,9 @@ impl Web3ProxyApp {
                 // since this data came likely out of a cache, the id is not going to match
                 // replace the id with our request's id.
-                // TODO: cache without the id
                 response.id = request_id;
 
+                // DRY this up by just returning the partial result (or error) here
                 if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
                     self.stat_sender.as_ref(),
                     Arc::try_unwrap(authorized_request),
                 ) {
@@ -1066,6 +1068,16 @@ impl Web3ProxyApp {
 
                 let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
 
+                if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
+                    self.stat_sender.as_ref(),
+                    Arc::try_unwrap(authorized_request),
+                ) {
+                    let response_stat =
+                        ProxyResponseStat::new(request.method, authorized_key, request_metadata, &response);
+
+                    stat_sender.send_async(response_stat.into()).await?;
+                }
+
                 Ok(response)
             }
         }
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index e5f78612..2093beee 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -58,6 +58,7 @@ pub struct RequestMetadata {
     pub request_bytes: u64,
     /// if this is 0, there was a cache_hit
     pub backend_requests: AtomicU32,
+    pub no_servers: AtomicU32,
     pub error_response: AtomicBool,
     pub response_bytes: AtomicU64,
     pub response_millis: AtomicU64,
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 769b726a..07a55534 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -366,6 +366,7 @@ impl Web3Connections {
     pub async fn next_upstream_server(
         &self,
        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        request_metadata: Option<&Arc<RequestMetadata>>,
        skip: &[Arc<Web3Connection>],
        min_block_needed: Option<&U64>,
    ) -> anyhow::Result<OpenRequestResult> {
@@ -444,11 +445,20 @@ impl Web3Connections {
 
         match earliest_retry_at {
             None => {
+                if let Some(request_metadata) = request_metadata {
+                    request_metadata.no_servers.fetch_add(1, Ordering::Release);
+                }
+
                 // TODO: error works, but maybe we should just wait a second?
                 Err(anyhow::anyhow!("no servers synced"))
             }
             Some(earliest_retry_at) => {
                 warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
+
+                if let Some(request_metadata) = request_metadata {
+                    request_metadata.no_servers.fetch_add(1, Ordering::Release);
+                }
+
                 Ok(OpenRequestResult::RetryAt(earliest_retry_at))
             }
         }
@@ -513,7 +523,12 @@ impl Web3Connections {
                 break;
             }
             match self
-                .next_upstream_server(authorized_request, &skip_rpcs, min_block_needed)
+                .next_upstream_server(
+                    authorized_request,
+                    request_metadata,
+                    &skip_rpcs,
+                    min_block_needed,
+                )
                 .await?
             {
                 OpenRequestResult::Handle(active_request_handle) => {
@@ -593,6 +608,11 @@ impl Web3Connections {
                     // TODO: if a server catches up sync while we are waiting, we could stop waiting
                     warn!(?retry_at, "All rate limits exceeded. Sleeping");
 
+                    // TODO: have a separate column for rate limited?
+                    if let Some(request_metadata) = request_metadata {
+                        request_metadata.no_servers.fetch_add(1, Ordering::Release);
+                    }
+
                     sleep_until(retry_at).await;
 
                     continue;
@@ -600,6 +620,10 @@ impl Web3Connections {
                 OpenRequestResult::RetryNever => {
                     warn!(?self, "No server handles!");
 
+                    if let Some(request_metadata) = request_metadata {
+                        request_metadata.no_servers.fetch_add(1, Ordering::Release);
+                    }
+
                     // TODO: subscribe to something on synced connections. maybe it should just be a watch channel
                     sleep(Duration::from_millis(200)).await;
 
@@ -608,6 +632,13 @@ impl Web3Connections {
             }
         }
 
+        // TODO: do we need this here, or do we do it somewhere else?
+        if let Some(request_metadata) = request_metadata {
+            request_metadata
+                .error_response
+                .store(true, Ordering::Release);
+        }
+
         Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len()))
     }
 
@@ -629,6 +660,13 @@ impl Web3Connections {
             // TODO: benchmark this compared to waiting on unbounded futures
             // TODO: do something with this handle?
             // TODO: this is not working right. simplify
+
+            if let Some(request_metadata) = request_metadata {
+                request_metadata
+                    .backend_requests
+                    .fetch_add(active_request_handles.len() as u32, Ordering::Release);
+            }
+
             let quorum_response = self
                 .try_send_parallel_requests(
                     active_request_handles,
@@ -649,6 +687,10 @@ impl Web3Connections {
                 Err(None) => {
                     warn!(?self, "No servers in sync! Retrying");
 
+                    if let Some(request_metadata) = &request_metadata {
+                        request_metadata.no_servers.fetch_add(1, Ordering::Release);
+                    }
+
                     // TODO: i don't think this will ever happen
                     // TODO: return a 502? if it does?
                     // return Err(anyhow::anyhow!("no available rpcs!"));
@@ -664,6 +706,10 @@ impl Web3Connections {
                 // TODO: if a server catches up sync while we are waiting, we could stop waiting
                 warn!("All rate limits exceeded. Sleeping");
 
+                if let Some(request_metadata) = &request_metadata {
+                    request_metadata.no_servers.fetch_add(1, Ordering::Release);
+                }
+
                 sleep_until(retry_at).await;
 
                 continue;
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 0fd900ca..9a922ecc 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -247,11 +247,45 @@ impl OpenRequestHandle {
             error_handler
         };
 
-        // TODO: check for "execution reverted" here
+        // check for "execution reverted" here
+        let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
+            // Http and Ws errors are very similar, but different types
+            let msg = match provider {
+                Web3Provider::Http(_) => {
+                    if let Some(HttpClientError::JsonRpcError(err)) =
+                        err.downcast_ref::<HttpClientError>()
+                    {
+                        Some(&err.message)
+                    } else {
+                        None
+                    }
+                }
+                Web3Provider::Ws(_) => {
+                    if let Some(WsClientError::JsonRpcError(err)) =
+                        err.downcast_ref::<WsClientError>()
+                    {
+                        Some(&err.message)
+                    } else {
+                        None
+                    }
+                }
+            };
+
+            if let Some(msg) = msg {
+                msg.starts_with("execution reverted")
+            } else {
+                false
+            }
+        } else {
+            false
+        };
 
         match error_handler {
             RequestErrorHandler::DebugLevel => {
-                debug!(?err, %method, rpc=%self.conn, "bad response!");
+                // TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
+                if !is_revert {
+                    debug!(?err, %method, rpc=%self.conn, "bad response!");
+                }
             }
             RequestErrorHandler::ErrorLevel => {
                 error!(?err, %method, rpc=%self.conn, "bad response!");
@@ -260,55 +294,21 @@ impl OpenRequestHandle {
                 warn!(?err, %method, rpc=%self.conn, "bad response!");
             }
             RequestErrorHandler::SaveReverts => {
-                // TODO: logging every one is going to flood the database
-                // TODO: have a percent chance to do this. or maybe a "logged reverts per second"
-                if let ProviderError::JsonRpcClientError(err) = err {
-                    // Http and Ws errors are very similar, but different types
-                    let msg = match provider {
-                        Web3Provider::Http(_) => {
-                            if let Some(HttpClientError::JsonRpcError(err)) =
-                                err.downcast_ref::<HttpClientError>()
-                            {
-                                Some(&err.message)
-                            } else {
-                                None
-                            }
-                        }
-                        Web3Provider::Ws(_) => {
-                            if let Some(WsClientError::JsonRpcError(err)) =
-                                err.downcast_ref::<WsClientError>()
-                            {
-                                Some(&err.message)
-                            } else {
-                                None
-                            }
-                        }
-                    };
+                // TODO: do not unwrap! (doesn't matter much since we check method as a string above)
+                let method: Method = Method::try_from_value(&method.to_string()).unwrap();
 
-                    if let Some(msg) = msg {
-                        if msg.starts_with("execution reverted") {
-                            // TODO: do not unwrap! (doesn't matter much since we check method as a string above)
-                            let method: Method =
-                                Method::try_from_value(&method.to_string()).unwrap();
+                // TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
+                let params: EthCallParams = serde_json::from_value(json!(params))
+                    .context("parsing params to EthCallParams")
+                    .unwrap();
 
-                            // TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
-                            let params: EthCallParams = serde_json::from_value(json!(params))
-                                .context("parsing params to EthCallParams")
-                                .unwrap();
+                // spawn saving to the database so we don't slow down the request
+                let f = self
+                    .authorized_request
+                    .clone()
+                    .save_revert(method, params.0 .0);
 
-                            // spawn saving to the database so we don't slow down the request
-                            let f = self
-                                .authorized_request
-                                .clone()
-                                .save_revert(method, params.0 .0);
-
-                            tokio::spawn(async move { f.await });
-                        } else {
-                            // TODO: log any of the errors?
-                            debug!(?err, %method, rpc=%self.conn, "bad response!");
-                        }
-                    }
-                }
+                tokio::spawn(f);
             }
         }
     } else {
diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs
index d2611efa..d7d6bdcb 100644
--- a/web3_proxy/src/stats.rs
+++ b/web3_proxy/src/stats.rs
@@ -64,6 +64,7 @@ pub struct ProxyResponseAggregate {
     frontend_requests: AtomicU32,
     backend_requests: AtomicU32,
     backend_retries: AtomicU32,
+    no_servers: AtomicU32,
     cache_misses: AtomicU32,
     cache_hits: AtomicU32,
     sum_request_bytes: AtomicU64,
@@ -122,12 +123,11 @@ impl ProxyResponseStat {
         let period_timestamp =
             (metadata.datetime.timestamp() as u64) / period_seconds * period_seconds;
         let request_bytes = metadata.request_bytes;
-        let response_millis = metadata
-            .datetime
-            .signed_duration_since(Utc::now())
-            .num_seconds() as u64;
         let error_response = metadata.error_response.load(Ordering::Acquire);
 
+        let response_millis =
+            (Utc::now().timestamp_millis() - metadata.datetime.timestamp_millis()) as u64;
+
         Self {
             user_key_id: authorized_key.user_key_id,
             method,
@@ -226,6 +226,7 @@ impl StatEmitter {
                 let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
                 let backend_requests = v.backend_requests.load(Ordering::Acquire);
                 let backend_retries = v.backend_retries.load(Ordering::Acquire);
+                let no_servers = v.no_servers.load(Ordering::Acquire);
                 let cache_misses = v.cache_misses.load(Ordering::Acquire);
                 let cache_hits = v.cache_hits.load(Ordering::Acquire);
                 let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
@@ -274,6 +275,7 @@ impl StatEmitter {
                     frontend_requests: sea_orm::Set(frontend_requests),
                     backend_requests: sea_orm::Set(backend_requests),
                     backend_retries: sea_orm::Set(backend_retries),
+                    no_servers: sea_orm::Set(no_servers),
                     cache_misses: sea_orm::Set(cache_misses),
                     cache_hits: sea_orm::Set(cache_hits),
 
@@ -347,6 +349,7 @@ impl StatEmitter {
                     frontend_requests: 0.into(),
                     backend_requests: 0.into(),
                     backend_retries: 0.into(),
+                    no_servers: 0.into(),
                     cache_misses: 0.into(),
                     cache_hits: 0.into(),
                     sum_request_bytes: 0.into(),
@@ -364,10 +367,18 @@ impl StatEmitter {
             .frontend_requests
             .fetch_add(1, Ordering::Acquire);
 
-        // a stat might have multiple backend requests
-        user_aggregate
-            .backend_requests
-            .fetch_add(stat.backend_requests, Ordering::Acquire);
+        if stat.backend_requests == 0 {
+            // no backend request. cache hit!
+            user_aggregate.cache_hits.fetch_add(1, Ordering::Acquire);
+        } else {
+            // backend requests! cache miss!
+            user_aggregate.cache_misses.fetch_add(1, Ordering::Acquire);
+
+            // a stat might have multiple backend requests
+            user_aggregate
+                .backend_requests
+                .fetch_add(stat.backend_requests, Ordering::Acquire);
+        }
 
         user_aggregate
             .sum_request_bytes
@@ -386,8 +397,8 @@ impl StatEmitter {
 
                 // TODO: use `record_correct`?
                 histograms.request_bytes.record(stat.request_bytes)?;
-                histograms.response_bytes.record(stat.response_bytes)?;
                 histograms.response_millis.record(stat.response_millis)?;
+                histograms.response_bytes.record(stat.response_bytes)?;
             }
         }
     }
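
Notes (not part of the commit):

The revert check introduced in request.rs only inspects the prefix of the
JSON-RPC error message. A minimal standalone sketch of that logic, assuming a
plain string message rather than the ethers-rs HttpClientError/WsClientError
types:

    fn is_revert(msg: Option<&str>) -> bool {
        // mirrors the patch: a response counts as a revert when the
        // JSON-RPC error message starts with "execution reverted"
        msg.map_or(false, |m| m.starts_with("execution reverted"))
    }

    fn main() {
        assert!(is_revert(Some("execution reverted: SafeMath: subtraction overflow")));
        assert!(!is_revert(Some("header not found")));
        assert!(!is_revert(None));
    }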
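
The new no_servers counter is written with Ordering::Release on the request
path and read with Ordering::Acquire by the stat emitter. A self-contained
sketch of that pairing, using a hypothetical stand-in for RequestMetadata:

    use std::sync::atomic::{AtomicU32, Ordering};

    // stand-in for the RequestMetadata field added by this patch
    struct RequestMetadata {
        no_servers: AtomicU32,
    }

    fn main() {
        let metadata = RequestMetadata {
            no_servers: AtomicU32::new(0),
        };

        // every failed attempt to pick an upstream server bumps the counter
        metadata.no_servers.fetch_add(1, Ordering::Release);

        // the emitter later loads it with the matching Acquire ordering
        assert_eq!(metadata.no_servers.load(Ordering::Acquire), 1);
    }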