From f704d1c4aaf4e0c39ae75a7b2c81432fb31353f5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 6 Oct 2023 10:04:43 -0700 Subject: [PATCH] more refs will cut down on cloning an Arc multiple times that isn't really necessary --- web3_proxy/src/app/mod.rs | 116 +++++++++++++++---------------- web3_proxy/src/jsonrpc.rs | 1 + web3_proxy/src/rpcs/consensus.rs | 19 ++++- web3_proxy/src/rpcs/many.rs | 17 ++--- 4 files changed, 80 insertions(+), 73 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 485739c8..bebc78f8 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1097,7 +1097,7 @@ impl Web3ProxyApp { self: &Arc, web3_request: &Arc, ) -> Web3ProxyResult> { - if self.protected_rpcs.is_empty() { + let rpcs = if self.protected_rpcs.is_empty() { let num_public_rpcs = match web3_request.proxy_mode() { // TODO: how many balanced rpcs should we send to? configurable? percentage of total? ProxyMode::Best | ProxyMode::Debug => Some(4), @@ -1110,6 +1110,8 @@ impl Web3ProxyApp { ProxyMode::Versus => None, }; + self.balanced_rpcs.try_rpcs_for_request(web3_request).await + // // no private rpcs to send to. send to a few public rpcs // // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. // self.balanced_rpcs @@ -1120,18 +1122,11 @@ impl Web3ProxyApp { // num_public_rpcs, // ) // .await - todo!(); } else { - // self.protected_rpcs - // .try_send_all_synced_connections( - // web3_request, - // Some(Duration::from_secs(10)), - // Some(Level::TRACE.into()), - // Some(3), - // ) - // .await - todo!(); - } + self.protected_rpcs.try_rpcs_for_request(web3_request).await + }; + + todo!(); } /// proxy request with up to 3 tries. @@ -1157,52 +1152,51 @@ impl Web3ProxyApp { tries += 1; - let (code, response) = - match self._proxy_request_with_caching(web3_request.clone()).await { - Ok(response_data) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); + let (code, response) = match self._proxy_request_with_caching(&web3_request).await { + Ok(response_data) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); - (StatusCode::OK, response_data) + (StatusCode::OK, response_data) + } + Err(err @ Web3ProxyError::NullJsonRpcResult) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); + + err.as_json_response_parts(web3_request.id()) + } + Err(Web3ProxyError::JsonRpcResponse(response_data)) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(response_data.is_error(), Ordering::Relaxed); + + let response = jsonrpc::ParsedResponse::from_response_data( + response_data, + web3_request.id(), + ); + (StatusCode::OK, response.into()) + } + Err(err) => { + if tries <= max_tries { + // TODO: log the error before retrying + continue; } - Err(err @ Web3ProxyError::NullJsonRpcResult) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); - err.as_json_response_parts(web3_request.id()) - } - Err(Web3ProxyError::JsonRpcResponse(response_data)) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(response_data.is_error(), Ordering::Relaxed); + // max tries exceeded. return the error - let response = jsonrpc::ParsedResponse::from_response_data( - response_data, - web3_request.id(), - ); - (StatusCode::OK, response.into()) - } - Err(err) => { - if tries <= max_tries { - // TODO: log the error before retrying - continue; - } + web3_request.error_response.store(true, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); - // max tries exceeded. return the error - - web3_request.error_response.store(true, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); - - err.as_json_response_parts(web3_request.id()) - } - }; + err.as_json_response_parts(web3_request.id()) + } + }; web3_request.add_response(&response); @@ -1219,7 +1213,7 @@ impl Web3ProxyApp { /// TODO: how can we make this generic? async fn _proxy_request_with_caching( self: &Arc, - web3_request: Arc, + web3_request: &Arc, ) -> Web3ProxyResult { // TODO: serve net_version without querying the backend // TODO: don't force RawValue @@ -1338,7 +1332,7 @@ impl Web3ProxyApp { let mut gas_estimate = self .balanced_rpcs .try_proxy_connection::( - web3_request.clone(), + web3_request, ) .await? .parsed() @@ -1374,7 +1368,7 @@ impl Web3ProxyApp { let mut result = self .balanced_rpcs .try_proxy_connection::>( - web3_request.clone(), + web3_request, ) .await; @@ -1433,7 +1427,7 @@ impl Web3ProxyApp { let response = self .try_send_protected( - &web3_request, + web3_request, ).await; let mut response = response.try_into()?; @@ -1637,7 +1631,7 @@ impl Web3ProxyApp { web3_request.ttl(), self.balanced_rpcs .try_proxy_connection::>( - web3_request, + &web3_request, ) ).await?? } else { @@ -1658,7 +1652,7 @@ impl Web3ProxyApp { web3_request.ttl(), self.balanced_rpcs .try_proxy_connection::>( - web3_request.clone(), + &web3_request, ) ).await?? } @@ -1678,7 +1672,7 @@ impl Web3ProxyApp { // TODO: dynamic timeout based on whats left on web3_request let response_data = timeout(duration, app.balanced_rpcs .try_proxy_connection::>( - web3_request.clone(), + &web3_request, )).await; match response_data { @@ -1754,7 +1748,7 @@ impl Web3ProxyApp { web3_request.ttl(), self.balanced_rpcs .try_proxy_connection::>( - web3_request.clone(), + &web3_request, ) ).await??; diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index d957037a..54117bee 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -49,6 +49,7 @@ impl ParsedResponse> { match data { JsonRpcResponseEnum::NullResult => { let x: Box = Default::default(); + // TODO: how can we make this generic if this always wants to be a Box?. Do we even want to keep NullResult? Self::from_result(Arc::from(x), id) } JsonRpcResponseEnum::RpcError { error_data, .. } => Self::from_error(error_data, id), diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index e8d21487..4dc068d8 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -95,6 +95,12 @@ pub enum ShouldWaitForBlock { NeverReady, } +#[derive(Clone, Debug, Serialize)] +enum SortMethod { + Shuffle, + Sort, +} + /// A collection of Web3Rpcs that are on the same block. /// Serialize is so we can print it on our /status endpoint /// TODO: remove head_block/head_rpcs/tier and replace with one RankedRpcMap @@ -111,6 +117,8 @@ pub struct RankedRpcs { // TODO: make serializing work. the key needs to be a string. I think we need `serialize_with` #[serde(skip_serializing)] rpc_data: HashMap, ConsensusRpcData>, + + sort_mode: SortMethod, } pub struct RpcsForRequest { @@ -128,14 +136,18 @@ impl RankedRpcs { let num_synced = rpcs.len(); + // TODO: do we need real data in here? if we are calling from_rpcs, we probably don't even track their block let rpc_data = Default::default(); + let sort_mode = SortMethod::Shuffle; + let ranked_rpcs = RankedRpcs { backups_needed, head_block, inner: rpcs, num_synced, rpc_data, + sort_mode, }; Some(ranked_rpcs) @@ -205,10 +217,13 @@ impl RankedRpcs { // consensus found! trace!(?ranked_rpcs); + let sort_mode = SortMethod::Sort; + let consensus = RankedRpcs { backups_needed, head_block: best_block, rpc_data, + sort_mode, inner: ranked_rpcs, num_synced, }; @@ -219,7 +234,7 @@ impl RankedRpcs { None } - pub fn for_request(&self, web3_request: Arc) -> Option { + pub fn for_request(&self, web3_request: &Arc) -> Option { if self.num_active_rpcs() == 0 { return None; } @@ -251,7 +266,7 @@ impl RankedRpcs { Some(RpcsForRequest { inner, outer, - request: web3_request, + request: web3_request.clone(), }) } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index f39e9878..db8115c7 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1,6 +1,6 @@ //! Load balanced communication with a group of web3 rpc providers use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock}; -use super::consensus::{RankedRpcs, RpcsForRequest, ShouldWaitForBlock}; +use super::consensus::{RankedRpcs, RpcsForRequest}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; @@ -10,7 +10,6 @@ use crate::frontend::authorization::Web3Request; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::frontend::status::MokaCacheSerializer; use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData, ParsedResponse}; -use counter::Counter; use derive_more::From; use ethers::prelude::{TxHash, U64}; use futures::future::try_join_all; @@ -23,13 +22,11 @@ use parking_lot::RwLock; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::borrow::Cow; -use std::cmp::min_by_key; use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::{mpsc, watch}; -use tokio::task::yield_now; -use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; +use tokio::time::{Duration, Instant}; use tokio::{pin, select}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -400,7 +397,7 @@ impl Web3Rpcs { /// this prefers synced servers, but it will return servers even if they aren't fully in sync. /// this does not gaurentee you won't be rate limited. we don't increment our counters until you try to send. so you might have to wait to be able to send /// TODO: should this wait for ranked rpcs? maybe only a fraction of web3_request's time? - pub async fn try_rpcs_for_request(&self, web3_request: Arc) -> TryRpcsForRequest { + pub async fn try_rpcs_for_request(&self, web3_request: &Arc) -> TryRpcsForRequest { // TODO: by_name might include things that are on a forked let ranked_rpcs: Arc = if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() { @@ -435,7 +432,7 @@ impl Web3Rpcs { let web3_request = Web3Request::new_internal(method.into(), params, head_block, max_wait).await; - let response = self.request_with_metadata(web3_request).await?; + let response = self.request_with_metadata(&web3_request).await?; let parsed = response.parsed().await?; @@ -452,7 +449,7 @@ impl Web3Rpcs { /// TODO: take an arg for max_tries. take an arg for quorum(size) or serial pub async fn request_with_metadata( &self, - web3_request: Arc, + web3_request: &Arc, ) -> Web3ProxyResult> { let mut method_not_available_response = None; @@ -467,7 +464,7 @@ impl Web3Rpcs { let mut last_provider_error = None; // TODO: limit number of tries - match self.try_rpcs_for_request(web3_request.clone()).await { + match self.try_rpcs_for_request(web3_request).await { TryRpcsForRequest::None => return Err(Web3ProxyError::NoServersSynced), TryRpcsForRequest::RetryAt(retry_at) => { if retry_at > web3_request.expire_instant { @@ -839,7 +836,7 @@ impl Web3Rpcs { #[allow(clippy::too_many_arguments)] pub async fn try_proxy_connection( &self, - web3_request: Arc, + web3_request: &Arc, ) -> Web3ProxyResult> { let proxy_mode = web3_request.proxy_mode();