diff --git a/Cargo.lock b/Cargo.lock index bb87843d..0169c6d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6642,6 +6642,7 @@ dependencies = [ "arc-swap", "argh", "async-recursion", + "async-stream", "async-stripe", "async-trait", "axum", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 572ee327..ce90b9ab 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -102,6 +102,7 @@ test-log = { version = "0.2.12", default-features = false, features = ["trace"] bytes = "1.5.0" futures-util = "0.3.28" async-recursion = "1.0.5" +async-stream = "0.3.5" # # TODO: bring this back # check-if-email-exists = "0.9.0" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 354eb37b..485739c8 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -42,14 +42,14 @@ use std::fmt; use std::net::IpAddr; use std::num::NonZeroU64; use std::str::FromStr; -use std::sync::atomic::{AtomicU16, Ordering}; -use std::sync::{atomic, Arc}; +use std::sync::atomic::{self, AtomicU16, Ordering}; +use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout, Instant}; -use tracing::{error, info, trace, warn, Level}; +use tracing::{error, info, trace, warn}; // TODO: make this customizable? // TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore @@ -1106,28 +1106,31 @@ impl Web3ProxyApp { // TODO: what if we do 2 per tier? we want to blast the third party rpcs // TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this ProxyMode::Fastest(x) => Some(x * 4), + ProxyMode::Quorum(x, ..) => Some(x), ProxyMode::Versus => None, }; - // no private rpcs to send to. send to a few public rpcs - // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. - self.balanced_rpcs - .try_send_all_synced_connections( - web3_request, - Some(Duration::from_secs(10)), - Some(Level::TRACE.into()), - num_public_rpcs, - ) - .await + // // no private rpcs to send to. send to a few public rpcs + // // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. + // self.balanced_rpcs + // .try_send_all_synced_connections( + // web3_request, + // Some(Duration::from_secs(10)), + // Some(Level::TRACE.into()), + // num_public_rpcs, + // ) + // .await + todo!(); } else { - self.protected_rpcs - .try_send_all_synced_connections( - web3_request, - Some(Duration::from_secs(10)), - Some(Level::TRACE.into()), - Some(3), - ) - .await + // self.protected_rpcs + // .try_send_all_synced_connections( + // web3_request, + // Some(Duration::from_secs(10)), + // Some(Level::TRACE.into()), + // Some(3), + // ) + // .await + todo!(); } } @@ -1154,51 +1157,52 @@ impl Web3ProxyApp { tries += 1; - let (code, response) = match self._proxy_request_with_caching(&web3_request).await { - Ok(response_data) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); + let (code, response) = + match self._proxy_request_with_caching(web3_request.clone()).await { + Ok(response_data) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); - (StatusCode::OK, response_data) - } - Err(err @ Web3ProxyError::NullJsonRpcResult) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); - - err.as_json_response_parts(web3_request.id()) - } - Err(Web3ProxyError::JsonRpcResponse(response_data)) => { - web3_request.error_response.store(false, Ordering::Relaxed); - web3_request - .user_error_response - .store(response_data.is_error(), Ordering::Relaxed); - - let response = jsonrpc::ParsedResponse::from_response_data( - response_data, - web3_request.id(), - ); - (StatusCode::OK, response.into()) - } - Err(err) => { - if tries <= max_tries { - // TODO: log the error before retrying - continue; + (StatusCode::OK, response_data) } + Err(err @ Web3ProxyError::NullJsonRpcResult) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); - // max tries exceeded. return the error + err.as_json_response_parts(web3_request.id()) + } + Err(Web3ProxyError::JsonRpcResponse(response_data)) => { + web3_request.error_response.store(false, Ordering::Relaxed); + web3_request + .user_error_response + .store(response_data.is_error(), Ordering::Relaxed); - web3_request.error_response.store(true, Ordering::Relaxed); - web3_request - .user_error_response - .store(false, Ordering::Relaxed); + let response = jsonrpc::ParsedResponse::from_response_data( + response_data, + web3_request.id(), + ); + (StatusCode::OK, response.into()) + } + Err(err) => { + if tries <= max_tries { + // TODO: log the error before retrying + continue; + } - err.as_json_response_parts(web3_request.id()) - } - }; + // max tries exceeded. return the error + + web3_request.error_response.store(true, Ordering::Relaxed); + web3_request + .user_error_response + .store(false, Ordering::Relaxed); + + err.as_json_response_parts(web3_request.id()) + } + }; web3_request.add_response(&response); @@ -1215,7 +1219,7 @@ impl Web3ProxyApp { /// TODO: how can we make this generic? async fn _proxy_request_with_caching( self: &Arc, - web3_request: &Arc, + web3_request: Arc, ) -> Web3ProxyResult { // TODO: serve net_version without querying the backend // TODO: don't force RawValue @@ -1334,7 +1338,7 @@ impl Web3ProxyApp { let mut gas_estimate = self .balanced_rpcs .try_proxy_connection::( - web3_request, + web3_request.clone(), ) .await? .parsed() @@ -1370,7 +1374,7 @@ impl Web3ProxyApp { let mut result = self .balanced_rpcs .try_proxy_connection::>( - web3_request, + web3_request.clone(), ) .await; @@ -1429,7 +1433,7 @@ impl Web3ProxyApp { let response = self .try_send_protected( - web3_request, + &web3_request, ).await; let mut response = response.try_into()?; @@ -1633,7 +1637,7 @@ impl Web3ProxyApp { web3_request.ttl(), self.balanced_rpcs .try_proxy_connection::>( - &web3_request, + web3_request, ) ).await?? } else { @@ -1654,7 +1658,7 @@ impl Web3ProxyApp { web3_request.ttl(), self.balanced_rpcs .try_proxy_connection::>( - &web3_request, + web3_request.clone(), ) ).await?? } @@ -1674,7 +1678,7 @@ impl Web3ProxyApp { // TODO: dynamic timeout based on whats left on web3_request let response_data = timeout(duration, app.balanced_rpcs .try_proxy_connection::>( - &web3_request, + web3_request.clone(), )).await; match response_data { @@ -1750,7 +1754,7 @@ impl Web3ProxyApp { web3_request.ttl(), self.balanced_rpcs .try_proxy_connection::>( - &web3_request, + web3_request.clone(), ) ).await??; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 368ed7cc..9e4d8209 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -40,11 +40,13 @@ use tracing::trace; /// How to select backend servers for a request #[derive(Copy, Clone, Debug, Default)] pub enum ProxyMode { - /// send to the "best" synced server + /// send to the "best" synced server. on error, try the next #[default] Best, /// send to all synced servers and return the fastest non-error response (reverts do not count as errors here) Fastest(usize), + /// send to k servers and return the best response common between at least n servers + Quorum(usize, usize), /// send to all servers for benchmarking. return the fastest non-error response Versus, /// send all requests and responses to kafka diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 55b288bb..d957037a 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -209,6 +209,7 @@ pub enum Payload { #[derive(Debug)] pub struct StreamResponse { + // TODO: phantom T on here? buffer: Bytes, response: reqwest::Response, web3_request: Arc, @@ -246,6 +247,7 @@ impl IntoResponse for StreamResponse { #[derive(Debug)] pub enum SingleResponse> { + /// TODO: save the size here so we don't have to serialize again Parsed(ParsedResponse), Stream(StreamResponse), } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index b83696e0..e8d21487 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -1,20 +1,25 @@ use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; +use super::request::OpenRequestHandle; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; +use crate::frontend::authorization::Web3Request; +use crate::rpcs::request::OpenRequestResult; +use async_stream::stream; use base64::engine::general_purpose; use derive_more::Constructor; use ethers::prelude::{H256, U64}; +use futures_util::Stream; use hashbrown::{HashMap, HashSet}; use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; use hdrhistogram::Histogram; use itertools::{Itertools, MinMaxResult}; use moka::future::Cache; use serde::Serialize; -use std::cmp::{Ordering, Reverse}; +use std::cmp::{min_by_key, Ordering, Reverse}; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::time::Instant; +use tokio::time::{sleep_until, Instant}; use tracing::{debug, enabled, info, trace, warn, Level}; #[derive(Clone, Debug, Serialize)] @@ -81,6 +86,7 @@ impl PartialOrd for RpcRanking { } } +/// TODO: i think we can get rid of this in favor of pub enum ShouldWaitForBlock { Ready, // BackupReady, @@ -107,7 +113,34 @@ pub struct RankedRpcs { rpc_data: HashMap, ConsensusRpcData>, } +pub struct RpcsForRequest { + inner: Vec>, + outer: Vec>, + request: Arc, +} + impl RankedRpcs { + pub fn from_rpcs(rpcs: Vec>, head_block: Web3ProxyBlock) -> Option { + // we don't need to sort the rpcs now. we will sort them when a request neds them + // TODO: the shame about this is that we lose just being able to compare 2 random servers + + let backups_needed = rpcs.iter().any(|x| x.backup); + + let num_synced = rpcs.len(); + + let rpc_data = Default::default(); + + let ranked_rpcs = RankedRpcs { + backups_needed, + head_block, + inner: rpcs, + num_synced, + rpc_data, + }; + + Some(ranked_rpcs) + } + pub fn from_votes( min_synced_rpcs: usize, min_sum_soft_limit: u32, @@ -186,6 +219,42 @@ impl RankedRpcs { None } + pub fn for_request(&self, web3_request: Arc) -> Option { + if self.num_active_rpcs() == 0 { + return None; + } + + // these are bigger than we need, but how much does that matter? + let mut inner = Vec::with_capacity(self.num_active_rpcs()); + let mut outer = Vec::with_capacity(self.num_active_rpcs()); + + // TODO: what if min is set to some future block? + let max_block_needed = web3_request + .max_block_needed() + .or_else(|| web3_request.head_block.as_ref().map(|x| x.number())); + let min_block_needed = web3_request.min_block_needed(); + + for rpc in &self.inner { + match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed) { + ShouldWaitForBlock::NeverReady => continue, + ShouldWaitForBlock::Ready => inner.push(rpc.clone()), + ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()), + } + } + + let mut rng = nanorand::tls_rng(); + + // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest + inner.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed)); + outer.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed)); + + Some(RpcsForRequest { + inner, + outer, + request: web3_request, + }) + } + pub fn all(&self) -> &[Arc] { &self.inner } @@ -200,27 +269,6 @@ impl RankedRpcs { self.inner.len() } - /// will tell you if waiting will eventually should wait for a block - /// TODO: error if backup will be needed to serve the request? - /// TODO: serve now if a backup server has the data? - /// TODO: also include method (or maybe an enum representing the different prune types) - pub fn should_wait_for_block( - &self, - min_block_num: Option, - max_block_num: Option, - skip_rpcs: &[Arc], - ) -> ShouldWaitForBlock { - for rpc in self.inner.iter() { - match self.rpc_will_work_eventually(rpc, min_block_num, max_block_num, skip_rpcs) { - ShouldWaitForBlock::NeverReady => continue, - x => return x, - } - } - - ShouldWaitForBlock::NeverReady - } - - /// TODO: change this to take a min and a max pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: U64) -> bool { self.rpc_data .get(rpc) @@ -228,21 +276,13 @@ impl RankedRpcs { .unwrap_or(false) } - // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on) - // TODO: move this onto Web3Rpc? - // TODO: this needs min and max block on it + // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on error mark them as not supporting it) pub fn rpc_will_work_eventually( &self, rpc: &Arc, min_block_num: Option, max_block_num: Option, - skip_rpcs: &[Arc], ) -> ShouldWaitForBlock { - if skip_rpcs.contains(rpc) { - // if rpc is skipped, it must have already been determined it is unable to serve the request - return ShouldWaitForBlock::NeverReady; - } - if let Some(min_block_num) = min_block_num { if !self.has_block_data(rpc, min_block_num) { trace!( @@ -289,16 +329,10 @@ impl RankedRpcs { // TODO: this should probably take the method, too pub fn rpc_will_work_now( &self, - skip: &[Arc], min_block_needed: Option, max_block_needed: Option, rpc: &Arc, ) -> bool { - if skip.contains(rpc) { - trace!("skipping {}", rpc); - return false; - } - if rpc.backup && !self.backups_needed { // skip backups unless backups are needed for ranked_rpcs to exist return false; @@ -326,13 +360,14 @@ impl RankedRpcs { } } - // TODO: this might be a big perf hit. benchmark - if let Some(x) = rpc.hard_limit_until.as_ref() { - if *x.borrow() > Instant::now() { - trace!("{} is rate limited. will not work now", rpc,); - return false; - } - } + // TODO: i think its better to do rate limits later anyways. think more about it though + // // TODO: this might be a big perf hit. benchmark + // if let Some(x) = rpc.hard_limit_until.as_ref() { + // if *x.borrow() > Instant::now() { + // trace!("{} is rate limited. will not work now", rpc,); + // return false; + // } + // } true } @@ -914,3 +949,65 @@ impl ConsensusFinder { .max() } } + +impl RpcsForRequest { + pub fn to_stream(self) -> impl Stream { + // TODO: get error_handler out of the web3_request, probably the authorization + // let error_handler = web3_request.authorization.error_handler; + let error_handler = None; + + stream! { + loop { + let mut earliest_retry_at = None; + + for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() { + trace!("{} vs {}", rpc_a, rpc_b); + // TODO: ties within X% to the server with the smallest block_data_limit? + // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose + let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available()), x.backup, x.weighted_peak_latency())); + trace!("winner: {}", faster_rpc); + + match faster_rpc + .try_request_handle(&self.request, error_handler) + .await + { + Ok(OpenRequestResult::Handle(handle)) => { + trace!("opened handle: {}", faster_rpc); + yield handle; + } + Ok(OpenRequestResult::RetryAt(retry_at)) => { + trace!( + "retry on {} @ {}", + faster_rpc, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); + + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + continue; + } + Ok(OpenRequestResult::NotReady) => { + // TODO: log a warning? emit a stat? + trace!("best_rpc not ready: {}", faster_rpc); + continue; + } + Err(err) => { + trace!("No request handle for {}. err={:?}", faster_rpc, err); + continue; + } + } + } + + // TODO: check self.outer + + if let Some(retry_at) = earliest_retry_at { + if self.request.expire_instant <= retry_at { + break; + } + sleep_until(retry_at).await; + } else { + break; + } + } + } + } +} diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 508c3b54..e1478eb1 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1,6 +1,6 @@ //! Load balanced communication with a group of web3 rpc providers use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock}; -use super::consensus::{RankedRpcs, ShouldWaitForBlock}; +use super::consensus::{RankedRpcs, RpcsForRequest, ShouldWaitForBlock}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; @@ -27,10 +27,10 @@ use std::cmp::min_by_key; use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; -use tokio::select; use tokio::sync::{mpsc, watch}; use tokio::task::yield_now; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; +use tokio::{pin, select}; use tracing::{debug, error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. @@ -69,6 +69,34 @@ pub struct Web3Rpcs { pub(super) pending_txid_firehose_sender: Option>, } +/// this is a RankedRpcs that should be ready to use +/// there is a small chance of waiting for rate limiting depending on how many backend servers you have +#[derive(From)] +pub enum TryRpcsForRequest { + Some(RpcsForRequest), + RetryAt(Instant), + // WaitForBlock(U64), + None, +} + +impl From> for TryRpcsForRequest { + fn from(value: Option) -> Self { + match value { + None => Self::None, + Some(x) => Self::RetryAt(x), + } + } +} + +impl From> for TryRpcsForRequest { + fn from(value: Option) -> Self { + match value { + None => Self::None, + Some(x) => Self::Some(x), + } + } +} + impl Web3Rpcs { /// Spawn durable connections to multiple Web3 providers. pub async fn spawn( @@ -368,370 +396,32 @@ impl Web3Rpcs { Ok(()) } - /// Send the same request to all the handles. Returning the most common success or most common error. - /// TODO: option to return the fastest (non-null, non-error) response and handles for all the others instead? - pub async fn try_send_parallel_requests( - &self, - active_request_handles: Vec, - max_wait: Option, - ) -> Result { - // TODO: if only 1 active_request_handles, do self.try_send_request? - - let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300)); - - // TODO: iter stream - let responses = active_request_handles - .into_iter() - .map(|active_request_handle| async move { - let result: Result, Web3ProxyError> = - timeout(max_wait, async { - match active_request_handle.request().await { - Ok(response) => match response.parsed().await { - Ok(parsed) => parsed.into_result(), - Err(err) => Err(Web3ProxyError::EthersProvider(err)), - }, - Err(err) => Err(Web3ProxyError::EthersProvider(err)), - } - }) - .await - .map_err(Web3ProxyError::from); - - result.flatten() - }) - .collect::>() - .collect::>>() - .await; - - // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq - let mut count_map: HashMap = HashMap::new(); - let mut counts: Counter = Counter::new(); - let mut any_ok_with_json_result = false; - for partial_response in responses { - if partial_response.is_ok() { - any_ok_with_json_result = true; - } - - // TODO: better key! - let s = format!("{:?}", partial_response); - - if count_map.get(&s).is_none() { - count_map.insert(s.clone(), partial_response); - } - - counts.update([s]); - } - - // return the most_common success if any. otherwise return the most_common error - for (most_common, _) in counts.most_common_ordered() { - let most_common = count_map - .remove(&most_common) - .expect("most_common key must exist"); - - match most_common { - Ok(x) => { - // return the most common success - return Ok(x); - } - Err(err) => { - if any_ok_with_json_result { - // the most common is an error, but there is an Ok in here somewhere. continue the loop to find it - continue; - } - - return Err(err); - } - } - } - - // TODO: what should we do if we get here? i don't think we will - unimplemented!("this shouldn't be possible") - } - - async fn _best_available_rpc( - &self, - web3_request: &Arc, - error_handler: Option, - potential_rpcs: &[Arc], - skip: &mut Vec>, - ) -> OpenRequestResult { - let mut earliest_retry_at = None; - - for (rpc_a, rpc_b) in potential_rpcs.iter().circular_tuple_windows() { - trace!("{} vs {}", rpc_a, rpc_b); - // TODO: ties within X% to the server with the smallest block_data_limit - // faster rpc. backups always lose. - let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (x.backup, x.weighted_peak_latency())); - trace!("winner: {}", faster_rpc); - - // add to the skip list in case this one fails - skip.push(Arc::clone(faster_rpc)); - - // just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits - // TODO: what error_handler? - match faster_rpc - .try_request_handle(web3_request, error_handler) - .await - { - Ok(OpenRequestResult::Handle(handle)) => { - trace!("opened handle: {}", faster_rpc); - return OpenRequestResult::Handle(handle); - } - Ok(OpenRequestResult::RetryAt(retry_at)) => { - trace!( - "retry on {} @ {}", - faster_rpc, - retry_at.duration_since(Instant::now()).as_secs_f32() - ); - - if earliest_retry_at.is_none() { - earliest_retry_at = Some(retry_at); - } else { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - } - } - Ok(OpenRequestResult::NotReady) => { - // TODO: log a warning? emit a stat? - trace!("best_rpc not ready: {}", faster_rpc); - } - Err(err) => { - trace!("No request handle for {}. err={:?}", faster_rpc, err) - } - } - } - - if let Some(retry_at) = earliest_retry_at { - OpenRequestResult::RetryAt(retry_at) - } else { - OpenRequestResult::NotReady - } - } - - /// TODO: skip_rpcs should probably be on the web3_request, too - #[instrument(level = "trace")] - pub async fn wait_for_best_rpc( - &self, - web3_request: &Arc, - skip_rpcs: &mut Vec>, - error_handler: Option, - ) -> Web3ProxyResult { - let mut earliest_retry_at: Option = None; - - if self.watch_head_block.is_none() { - // if this group of servers is not watching the head block, we don't know what is "best" based on block height - // TODO: do this without cloning here - let potential_rpcs = self.by_name.read().values().cloned().collect::>(); - - let x = self - ._best_available_rpc(web3_request, error_handler, &potential_rpcs, skip_rpcs) - .await; - - return Ok(x); - } - - let mut watch_ranked_rpcs = self.watch_ranked_rpcs.subscribe(); - - let mut potential_rpcs = Vec::new(); - - let min_block_needed = web3_request.min_block_needed(); - let max_block_needed = web3_request.max_block_needed(); - - // TODO: max loop count if no max_wait? - loop { - // TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start - let ranked_rpcs: Option> = - watch_ranked_rpcs.borrow_and_update().clone(); - - // first check everything that is synced - // even though we might be querying an old block that an unsynced server can handle, - // it is best to not send queries to a syncing server. that slows down sync and can bloat erigon's disk usage. - if let Some(ranked_rpcs) = ranked_rpcs { - potential_rpcs.extend( - ranked_rpcs - .all() - .iter() - .filter(|rpc| { - // TODO: instrument this? - ranked_rpcs.rpc_will_work_now( - skip_rpcs, - min_block_needed, - max_block_needed, - rpc, - ) - }) - .cloned(), - ); - - if potential_rpcs.len() >= self.min_synced_rpcs { - // we have enough potential rpcs. try to load balance - potential_rpcs - .sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(max_block_needed)); - - match self - ._best_available_rpc( - web3_request, - error_handler, - &potential_rpcs, - skip_rpcs, - ) - .await - { - OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)), - OpenRequestResult::NotReady => {} - OpenRequestResult::RetryAt(retry_at) => { - if earliest_retry_at.is_none() { - earliest_retry_at = Some(retry_at); - } else { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - } - } - } - } - - match ranked_rpcs.should_wait_for_block( - min_block_needed, - max_block_needed, - skip_rpcs, - ) { - ShouldWaitForBlock::NeverReady => break, - ShouldWaitForBlock::Ready => { - if web3_request.ttl_expired() { - break; - } - // TODO: i don't see how we can get here. something feels wrong if this is common. - // maybe from a race? maybe _best_available_rpc returned NotReady just as a node synced - yield_now().await; - } - ShouldWaitForBlock::Wait { .. } => select! { - _ = watch_ranked_rpcs.changed() => { - // no need to borrow_and_update because we do that at the top of the loop - // TODO: wait until watched_ranked_rpcs is on the right block? - trace!("watch ranked rpcs changed"); - }, - _ = sleep(web3_request.ttl()) => break, - }, - } - } else { - trace!("no potential rpcs"); - select! { - _ = watch_ranked_rpcs.changed() => { - // no need to borrow_and_update because we do that at the top of the loop - trace!("watch ranked rpcs changed"); - }, - _ = sleep(web3_request.ttl()) => break, - } - } - - // clear for the next loop - potential_rpcs.clear(); - } - - web3_request.no_servers.fetch_add(1, Ordering::Relaxed); - - if let Some(retry_at) = earliest_retry_at { - // TODO: log the server that retry_at came from - warn!( - ?skip_rpcs, - retry_in_s=?retry_at.duration_since(Instant::now()).as_secs_f32(), - "no servers in {} ready!", - self, - ); - - Ok(OpenRequestResult::RetryAt(retry_at)) - } else { - warn!(?skip_rpcs, "no servers in {} ready!", self); - - Ok(OpenRequestResult::NotReady) - } - } - /// get all rpc servers that are not rate limited /// this prefers synced servers, but it will return servers even if they aren't fully in sync. - /// This is useful for broadcasting signed transactions. - /// TODO: better type on this that can return an anyhow::Result - /// TODO: redo this to just return all the connections. better to do rate limits at the end - pub async fn all_connections( - &self, - web3_request: &Arc, - max_count: Option, - error_level: Option, - ) -> Result, Option> { - let mut earliest_retry_at = None; + /// this does not gaurentee you won't be rate limited. we don't increment our counters until you try to send. so you might have to wait to be able to send + /// TODO: should this wait for ranked rpcs? maybe only a fraction of web3_request's time? + pub async fn try_rpcs_for_request(&self, web3_request: Arc) -> TryRpcsForRequest { + // TODO: by_name might include things that are on a forked + let ranked_rpcs: Arc = + if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() { + ranked_rpcs + } else if let Some(head_block) = web3_request.head_block.clone() { + // if we are here, self isn't watching head blocks but some other Web3Rpcs is. Return all the rpcs + let rpcs = self.by_name.read().values().cloned().collect(); - // TODO: filter the rpcs with Ranked.will_work_now - let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect(); - - let mut max_count = if let Some(max_count) = max_count { - max_count - } else { - all_rpcs.len() - }; - - trace!("max_count: {}", max_count); - - if max_count == 0 { - // TODO: return a future that resolves when we know a head block? - return Err(None); - } - - let mut selected_rpcs = Vec::with_capacity(max_count); - - // TODO: this sorts them all even though we probably won't need all of them. think about this more - all_rpcs - .sort_by_cached_key(|x| x.sort_for_load_balancing_on(web3_request.max_block_needed())); - - trace!("unfiltered all_rpcs: {:#?}", all_rpcs); - - for rpc in all_rpcs { - trace!("trying {}", rpc); - - // TODO: use a helper function for these - if let Some(block_needed) = web3_request.min_block_needed() { - trace!("min_block_needed: {}", block_needed); - if !rpc.has_block_data(block_needed) { - trace!("{} is missing min_block_needed. skipping", rpc); - continue; + if let Some(x) = RankedRpcs::from_rpcs(rpcs, head_block) { + Arc::new(x) + } else { + // i doubt we will ever get here + // TODO: return a future that resolves once we do have something? + return TryRpcsForRequest::None; } - } + } else { + // TODO: return a future that resolves once we do have something? + return TryRpcsForRequest::None; + }; - if let Some(block_needed) = web3_request.max_block_needed() { - trace!("max_block_needed: {}", block_needed); - if !rpc.has_block_data(block_needed) { - trace!("{} is missing max_block_needed. skipping", rpc); - continue; - } - } - - // check rate limits and increment our connection counter - match rpc.try_request_handle(web3_request, error_level).await { - Ok(OpenRequestResult::RetryAt(retry_at)) => { - // this rpc is not available. skip it - trace!("{} is rate limited. skipping", rpc); - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - } - Ok(OpenRequestResult::Handle(handle)) => { - trace!("{} is available", rpc); - selected_rpcs.push(handle); - - max_count -= 1; - if max_count == 0 { - break; - } - } - Ok(OpenRequestResult::NotReady) => { - warn!("no request handle for {}", rpc) - } - Err(err) => { - warn!(?err, "error getting request handle for {}", rpc) - } - } - } - - if !selected_rpcs.is_empty() { - return Ok(selected_rpcs); - } - - // return the earliest retry_after (if no rpcs are synced, this will be None) - Err(earliest_retry_at) + ranked_rpcs.for_request(web3_request).into() } pub async fn internal_request( @@ -742,12 +432,13 @@ impl Web3Rpcs { ) -> Web3ProxyResult { let head_block = self.head_block(); - // TODO: i think we actually always want balanced_rpcs on this! let web3_request = Web3Request::new_internal(method.into(), params, head_block, max_wait).await; - let response = self.request_with_metadata(&web3_request).await?; + let response = self.request_with_metadata(web3_request).await?; + let parsed = response.parsed().await?; + match parsed.payload { jsonrpc::Payload::Success { result } => Ok(result), // TODO: confirm this error type is correct @@ -756,33 +447,42 @@ impl Web3Rpcs { } /// Make a request with stat tracking. + /// The first jsonrpc response will be returned. + /// TODO: move this to RankedRpcsForRequest along with a bunch of other similar functions + /// TODO: take an arg for max_tries. take an arg for quorum(size) or serial pub async fn request_with_metadata( &self, - web3_request: &Arc, + web3_request: Arc, ) -> Web3ProxyResult> { - let mut skip_rpcs = vec![]; let mut method_not_available_response = None; - let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); + let watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); // set error_handler to Save. this might be overridden depending on the web3_request.authorization + // TODO: rename this to make it clear it might be overriden let error_handler = Some(RequestErrorHandler::Save); + // TODO: collect the most common error + let mut last_jsonrpc_error = None; let mut last_provider_error = None; - // TODO: the loop here feels somewhat redundant with the loop in best_available_rpc - loop { - if web3_request.ttl_expired() { - break; + // TODO: limit number of tries + match self.try_rpcs_for_request(web3_request.clone()).await { + TryRpcsForRequest::None => return Err(Web3ProxyError::NoServersSynced), + TryRpcsForRequest::RetryAt(retry_at) => { + if retry_at > web3_request.expire_instant { + return Err(Web3ProxyError::RateLimited( + Default::default(), + Some(retry_at), + )); + } } + TryRpcsForRequest::Some(rpcs) => { + let stream = rpcs.to_stream(); - match self - .wait_for_best_rpc(web3_request, &mut skip_rpcs, error_handler) - .await? - { - OpenRequestResult::Handle(active_request_handle) => { - // save the rpc in case we get an error and want to retry on another server - // TODO: look at backend_requests instead + pin!(stream); + + while let Some(active_request_handle) = stream.next().await { let rpc = active_request_handle.clone_connection(); web3_request.backend_requests.lock().push(rpc.clone()); @@ -856,7 +556,7 @@ impl Web3Rpcs { error!( %error_msg, "unexpected result limit by {}", - skip_rpcs.last().unwrap(), + "???" ); continue; } @@ -864,7 +564,7 @@ impl Web3Rpcs { warn!( %error_msg, "rate limited by {}", - skip_rpcs.last().unwrap() + "???" ); continue; } @@ -914,62 +614,21 @@ impl Web3Rpcs { _ => {} } - // let rpc = skip_rpcs - // .last() - // .expect("there must have been a provider if we got an error"); - - // TODO: emit a stat. if a server is getting skipped a lot, something is not right - - // TODO: if we get a TrySendError, reconnect. wait why do we see a trysenderror on a dual provider? shouldn't it be using reqwest - - // TODO! WRONG! ONLY SET RETRY_AT IF THIS IS A SERVER/CONNECTION ERROR. JSONRPC "error" is FINE - // trace!( - // "Backend server error on {}! Retrying {:?} on another. err={:?}", - // rpc, - // request, - // error, - // ); - // if let Some(ref hard_limit_until) = rpc.hard_limit_until { - // let retry_at = Instant::now() + Duration::from_secs(1); - - // hard_limit_until.send_replace(retry_at); - // } - - return Err(error.into()); + last_jsonrpc_error = Some(error); } } } - OpenRequestResult::RetryAt(retry_at) => { - // TODO: move this to a helper function - // sleep (TODO: with a lock?) until our rate limits should be available - // TODO: if a server catches up sync while we are waiting, we could stop waiting - warn!( - "All rate limits exceeded. waiting for change in synced servers or {:?}s", - retry_at.duration_since(Instant::now()).as_secs_f32() - ); - - // TODO: have a separate column for rate limited? - web3_request.no_servers.fetch_add(1, Ordering::Relaxed); - - select! { - _ = sleep_until(retry_at) => { - trace!("slept!"); - skip_rpcs.pop(); - } - _ = watch_consensus_rpcs.changed() => { - // we don't actually care what they are now. we just care that they changed - watch_consensus_rpcs.borrow_and_update(); - - } - } - } - OpenRequestResult::NotReady => { - web3_request.error_response.store(true, Ordering::Relaxed); - break; - } } } + if let Some(err) = last_jsonrpc_error { + return Ok(err.into()); + } + + if let Some(err) = last_provider_error { + return Err(err.into()); + } + if let Some(err) = method_not_available_response { web3_request.error_response.store(false, Ordering::Relaxed); @@ -982,61 +641,57 @@ impl Web3Rpcs { return Err(err.into()); } - if let Some(err) = last_provider_error { - return Err(err.into()); - } + // let min_block_needed = web3_request.min_block_needed(); + // let max_block_needed = web3_request.max_block_needed(); - let min_block_needed = web3_request.min_block_needed(); - let max_block_needed = web3_request.max_block_needed(); + // let num_conns = self.len(); + // let num_skipped = skip_rpcs.len(); - let num_conns = self.len(); - let num_skipped = skip_rpcs.len(); + // let head_block_num = watch_consensus_rpcs + // .borrow_and_update() + // .as_ref() + // .map(|x| x.head_block.number()); - let head_block_num = watch_consensus_rpcs - .borrow_and_update() - .as_ref() - .map(|x| x.head_block.number()); + // // TODO: error? warn? debug? trace? + // if head_block_num.is_none() { + // error!( + // min=?min_block_needed, + // max=?max_block_needed, + // head=?head_block_num, + // known=num_conns, + // method=%web3_request.request.method(), + // params=?web3_request.request.params(), + // "No servers synced", + // ); + // } else if head_block_num > max_block_needed { + // // we have synced past the needed block + // // TODO: log ranked rpcs + // // TODO: only log params in development + // error!( + // min=?min_block_needed, + // max=?max_block_needed, + // head=?head_block_num, + // known=%num_conns, + // method=%web3_request.request.method(), + // params=?web3_request.request.params(), + // "No archive servers synced", + // ); + // } else { + // // TODO: only log params in development + // // TODO: log ranked rpcs + // error!( + // min=?min_block_needed, + // max=?max_block_needed, + // head=?head_block_num, + // skipped=%num_skipped, + // known=%num_conns, + // method=%web3_request.request.method(), + // params=?web3_request.request.params(), + // "Requested data is not available", + // ); + // } - // TODO: error? warn? debug? trace? - if head_block_num.is_none() { - error!( - min=?min_block_needed, - max=?max_block_needed, - head=?head_block_num, - known=num_conns, - method=%web3_request.request.method(), - params=?web3_request.request.params(), - "No servers synced", - ); - } else if head_block_num > max_block_needed { - // we have synced past the needed block - // TODO: log ranked rpcs - // TODO: only log params in development - error!( - min=?min_block_needed, - max=?max_block_needed, - head=?head_block_num, - known=%num_conns, - method=%web3_request.request.method(), - params=?web3_request.request.params(), - "No archive servers synced", - ); - } else { - // TODO: only log params in development - // TODO: log ranked rpcs - error!( - min=?min_block_needed, - max=?max_block_needed, - head=?head_block_num, - skipped=%num_skipped, - known=%num_conns, - method=%web3_request.request.method(), - params=?web3_request.request.params(), - "Requested data is not available", - ); - } - - // TODO: what error code? + // TODO: what error code? what data? // cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1} Err(JsonRpcErrorData { message: "Requested data is not available".into(), @@ -1046,149 +701,150 @@ impl Web3Rpcs { .into()) } - /// be sure there is a timeout on this or it might loop forever - #[allow(clippy::too_many_arguments)] - pub async fn try_send_all_synced_connections( - self: &Arc, - web3_request: &Arc, - max_wait: Option, - error_level: Option, - max_sends: Option, - ) -> Web3ProxyResult { - let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); + // /// be sure there is a timeout on this or it might loop forever + // #[allow(clippy::too_many_arguments)] + // pub async fn xxx_try_send_all_synced_connections( + // self: &Arc, + // web3_request: &Arc, + // max_wait: Option, + // error_level: Option, + // max_sends: Option, + // ) -> Web3ProxyResult { + // let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); - // todo!() we are inconsistent with max_wait and web3_request.expires_at - let start = Instant::now(); + // // todo!() we are inconsistent with max_wait and web3_request.expires_at + // let start = Instant::now(); - loop { - if let Some(max_wait) = max_wait { - if start.elapsed() > max_wait { - break; - } - } + // loop { + // if let Some(max_wait) = max_wait { + // if start.elapsed() > max_wait { + // break; + // } + // } - match self - .all_connections(web3_request, max_sends, error_level) - .await - { - Ok(active_request_handles) => { - let mut only_backups_used = true; + // match self + // .all_connections(web3_request, max_sends, error_level) + // .await + // { + // Ok(active_request_handles) => { + // let mut only_backups_used = true; - web3_request - .backend_requests - .lock() - .extend(active_request_handles.iter().map(|x| { - let rpc = x.clone_connection(); + // web3_request + // .backend_requests + // .lock() + // .extend(active_request_handles.iter().map(|x| { + // let rpc = x.clone_connection(); - if !rpc.backup { - // TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more - only_backups_used = false; - } + // if !rpc.backup { + // // TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more + // only_backups_used = false; + // } - rpc - })); + // rpc + // })); - warn!("move this to where we turn RequestMetadata into a Stat"); - web3_request - .response_from_backup_rpc - .store(only_backups_used, Ordering::Relaxed); + // warn!("move this to where we turn RequestMetadata into a Stat"); + // web3_request + // .response_from_backup_rpc + // .store(only_backups_used, Ordering::Relaxed); - let x = self - .try_send_parallel_requests(active_request_handles, max_wait) - .await?; + // let x = self + // .try_send_parallel_requests(active_request_handles, max_wait) + // .await?; - // TODO: count the number of successes and possibly retry if there weren't enough + // // TODO: count the number of successes and possibly retry if there weren't enough - return Ok(x); - } - Err(None) => { - warn!( - ?self, - min_block_needed=?web3_request.min_block_needed(), - max_block_needed=?web3_request.max_block_needed(), - "No servers in sync on! Retrying", - ); + // return Ok(x); + // } + // Err(None) => { + // warn!( + // ?self, + // min_block_needed=?web3_request.min_block_needed(), + // max_block_needed=?web3_request.max_block_needed(), + // "No servers in sync on! Retrying", + // ); - // TODO: if this times out, i think we drop this - web3_request.no_servers.fetch_add(1, Ordering::Relaxed); + // // TODO: if this times out, i think we drop this + // web3_request.no_servers.fetch_add(1, Ordering::Relaxed); - let max_sleep = if let Some(max_wait) = max_wait { - start + max_wait - } else { - break; - }; + // let max_sleep = if let Some(max_wait) = max_wait { + // start + max_wait + // } else { + // break; + // }; - select! { - _ = sleep_until(max_sleep) => { - // rpcs didn't change and we have waited too long. break to return an error - warn!(?self, "timeout waiting for try_send_all_synced_connections!"); - break; - }, - _ = watch_consensus_rpcs.changed() => { - // consensus rpcs changed! - watch_consensus_rpcs.borrow_and_update(); + // select! { + // _ = sleep_until(max_sleep) => { + // // rpcs didn't change and we have waited too long. break to return an error + // warn!(?self, "timeout waiting for try_send_all_synced_connections!"); + // break; + // }, + // _ = watch_consensus_rpcs.changed() => { + // // consensus rpcs changed! + // watch_consensus_rpcs.borrow_and_update(); - // continue to try again - continue; - } - } - } - Err(Some(retry_at)) => { - web3_request.no_servers.fetch_add(1, Ordering::Relaxed); + // // continue to try again + // continue; + // } + // } + // } + // Err(Some(retry_at)) => { + // web3_request.no_servers.fetch_add(1, Ordering::Relaxed); - if let Some(max_wait) = max_wait { - if start.elapsed() > max_wait { - warn!( - ?self, - "All rate limits exceeded. And sleeping would take too long" - ); - break; - } + // if let Some(max_wait) = max_wait { + // if start.elapsed() > max_wait { + // warn!( + // ?self, + // "All rate limits exceeded. And sleeping would take too long" + // ); + // break; + // } - warn!("All rate limits exceeded. Sleeping"); + // warn!("All rate limits exceeded. Sleeping"); - // TODO: only make one of these sleep_untils + // // TODO: only make one of these sleep_untils - let break_at = start + max_wait; + // let break_at = start + max_wait; - if break_at <= retry_at { - select! { - _ = sleep_until(break_at) => {break} - _ = watch_consensus_rpcs.changed() => { - watch_consensus_rpcs.borrow_and_update(); - } - } - } else { - select! { - _ = sleep_until(retry_at) => {} - _ = watch_consensus_rpcs.changed() => { - watch_consensus_rpcs.borrow_and_update(); - } - } - } + // if break_at <= retry_at { + // select! { + // _ = sleep_until(break_at) => {break} + // _ = watch_consensus_rpcs.changed() => { + // watch_consensus_rpcs.borrow_and_update(); + // } + // } + // } else { + // select! { + // _ = sleep_until(retry_at) => {} + // _ = watch_consensus_rpcs.changed() => { + // watch_consensus_rpcs.borrow_and_update(); + // } + // } + // } - continue; - } else { - warn!(?self, "all rate limits exceeded"); - break; - } - } - } - } + // continue; + // } else { + // warn!(?self, "all rate limits exceeded"); + // break; + // } + // } + // } + // } - Err(Web3ProxyError::NoServersSynced) - } + // Err(Web3ProxyError::NoServersSynced) + // } #[allow(clippy::too_many_arguments)] pub async fn try_proxy_connection( &self, - web3_request: &Arc, + web3_request: Arc, ) -> Web3ProxyResult> { let proxy_mode = web3_request.proxy_mode(); match proxy_mode { ProxyMode::Debug | ProxyMode::Best => self.request_with_metadata(web3_request).await, ProxyMode::Fastest(_x) => todo!("Fastest"), + ProxyMode::Quorum(_x, _y) => todo!("Fastest"), ProxyMode::Versus => todo!("Versus"), } } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 11552dfb..277b01b4 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -15,6 +15,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use migration::sea_orm::DatabaseConnection; +use nanorand::tls::TlsWyRand; use nanorand::Rng; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; @@ -31,6 +32,7 @@ use tracing::{debug, error, info, trace, warn, Level}; use url::Url; /// An active connection to a Web3 RPC server like geth or erigon. +/// TODO: smarter Default derive or move the channels around so they aren't part of this at all #[derive(Default)] pub struct Web3Rpc { pub name: String, @@ -229,7 +231,14 @@ impl Web3Rpc { Ok((new_connection, handle)) } + pub fn next_available(&self) -> Instant { + let hard_limit_until = *self.hard_limit_until.as_ref().unwrap().borrow(); + + hard_limit_until.max(Instant::now()) + } + /// sort by... + /// - rate limit (ascending) /// - backups last /// - tier (ascending) /// - block number (descending) @@ -237,7 +246,7 @@ impl Web3Rpc { /// TODO: should tier or block number take priority? /// TODO: should this return a struct that implements sorting traits? /// TODO: move this to consensus.rs - fn sort_on(&self, max_block: Option) -> (bool, Reverse, u32) { + fn sort_on(&self, max_block: Option) -> (Reverse, bool, Reverse, u32) { let mut head_block = self .head_block_sender .as_ref() @@ -252,14 +261,22 @@ impl Web3Rpc { let backup = self.backup; - (!backup, Reverse(head_block), tier) + let rate_limit_until = + (*self.hard_limit_until.as_ref().unwrap().borrow()).max(Instant::now()); + + ( + Reverse(rate_limit_until), + !backup, + Reverse(head_block), + tier, + ) } /// TODO: move this to consensus.rs pub fn sort_for_load_balancing_on( &self, max_block: Option, - ) -> ((bool, Reverse, u32), Duration) { + ) -> ((Reverse, bool, Reverse, u32), Duration) { let sort_on = self.sort_on(max_block); let weighted_peak_latency = self.weighted_peak_latency(); @@ -273,14 +290,14 @@ impl Web3Rpc { /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency /// TODO: move this to consensus.rs + /// TODO: this return type is too complex pub fn shuffle_for_load_balancing_on( &self, + rng: &mut TlsWyRand, max_block: Option, - ) -> ((bool, Reverse, u32), u8) { + ) -> ((Reverse, bool, Reverse, u32), u8) { let sort_on = self.sort_on(max_block); - let mut rng = nanorand::tls_rng(); - let r = rng.generate::(); (sort_on, r) diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index d6b84d8c..d26f0c97 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -173,6 +173,7 @@ impl OpenRequestHandle { /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented /// depending on how things are locked, you might need to pass the provider in /// we take self to ensure this function only runs once + /// TODO: abandon ProviderError pub async fn request( self, ) -> Result, ProviderError> { @@ -212,12 +213,15 @@ impl OpenRequestHandle { (&self.rpc.http_url, &self.rpc.http_client) { let params: serde_json::Value = serde_json::to_value(params)?; + + // TODO: why recreate this? we should be able to just use the one from the user let request = jsonrpc::JsonRpcRequest::new( self.web3_request.id().into(), method.to_string(), params, ) .expect("request creation cannot fail"); + match client.post(url.clone()).json(&request).send().await { // TODO: threshold from configs Ok(response) => {