diff --git a/Cargo.lock b/Cargo.lock index e48f45e4..2e5ab8d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6590,7 +6590,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "1.43.16" +version = "1.43.19" dependencies = [ "anyhow", "arc-swap", @@ -6672,7 +6672,7 @@ dependencies = [ [[package]] name = "web3_proxy_cli" -version = "1.43.16" +version = "1.43.19" dependencies = [ "env_logger", "parking_lot", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 460569d5..18e8b800 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "1.43.16" +version = "1.43.19" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 9ea4dcf1..cbf1107d 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -139,14 +139,13 @@ impl RankedRpcs { let num_synced = rpcs.len(); - // TODO: do we need real data in here? if we are calling from_rpcs, we probably don't even track their block - let mut rpc_data = HashMap::, ConsensusRpcData>::with_capacity(num_synced); - - for rpc in rpcs.iter().cloned() { - let data = ConsensusRpcData::new(&rpc, &head_block); - - rpc_data.insert(rpc, data); - } + let rpc_data = Default::default(); + // TODO: do we need real data in rpc_data? if we are calling from_rpcs, we probably don't even track their block + // let mut rpc_data = HashMap::, ConsensusRpcData>::with_capacity(num_synced); + // for rpc in rpcs.iter().cloned() { + // let data = ConsensusRpcData::new(&rpc, &head_block); + // rpc_data.insert(rpc, data); + // } let sort_mode = SortMethod::Shuffle; @@ -245,51 +244,67 @@ impl RankedRpcs { return None; } + let head_block = self.head_block.number(); + // these are bigger than we need, but how much does that matter? let mut inner = Vec::with_capacity(self.num_active_rpcs()); let mut outer = Vec::with_capacity(self.num_active_rpcs()); - // TODO: what if min is set to some future block? - let max_block_needed = web3_request - .max_block_needed() - .or_else(|| web3_request.head_block.as_ref().map(|x| x.number())); - let min_block_needed = web3_request.min_block_needed(); - - for rpc in &self.inner { - match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed) { - ShouldWaitForBlock::NeverReady => continue, - ShouldWaitForBlock::Ready => inner.push(rpc.clone()), - ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()), - } - } - - let mut rng = nanorand::tls_rng(); - - // TODO: use web3_request.start_instant? I think we want it to be now - let now = Instant::now(); + let max_block_needed = web3_request.max_block_needed(); match self.sort_mode { SortMethod::Shuffle => { - // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest + // if we are shuffling, it is because we don't watch the head_blocks of the rpcs + // clone all of the rpcs + self.inner.clone_into(&mut inner); + + let mut rng = nanorand::tls_rng(); + + // we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream` + // TODO: use web3_request.start_instant? I think we want it to be as recent as possible + let now = Instant::now(); + inner.sort_by_cached_key(|x| { x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now) }); - outer.sort_by_cached_key(|x| { - x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now) - }); } SortMethod::Sort => { - // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest + // TODO: what if min is set to some future block? + let min_block_needed = web3_request.min_block_needed(); + + // TODO: max lag from config + let recent_block_needed = head_block.saturating_sub(U64::from(5)); + + for rpc in &self.inner { + if self.has_block_data(rpc, recent_block_needed) { + match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed) + { + ShouldWaitForBlock::NeverReady => continue, + ShouldWaitForBlock::Ready => inner.push(rpc.clone()), + ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()), + } + } + } + + let now = Instant::now(); + + // we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream` inner.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now)); outer.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now)); } } - Some(RpcsForRequest { - inner, - outer, - request: web3_request.clone(), - }) + if inner.is_empty() && outer.is_empty() { + warn!(?inner, ?outer, %web3_request, %head_block, "no rpcs for request"); + None + } else { + trace!(?inner, ?outer, %web3_request, "for_request"); + Some(RpcsForRequest { + inner, + outer, + request: web3_request.clone(), + }) + } } pub fn all(&self) -> &[Arc] { @@ -1001,6 +1016,7 @@ impl RpcsForRequest { stream! { loop { let mut earliest_retry_at = None; + let mut wait_for_sync = None; // first check the inners // TODO: DRY @@ -1031,6 +1047,14 @@ impl RpcsForRequest { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); continue; } + Ok(OpenRequestResult::Lagged(..)) => { + trace!("{} is lagged. will not work now", best_rpc); + // this will probably always be the same block, right? + if wait_for_sync.is_none() { + wait_for_sync = Some(best_rpc); + } + continue; + } Ok(OpenRequestResult::NotReady) => { // TODO: log a warning? emit a stat? trace!("best_rpc not ready: {}", best_rpc); @@ -1068,6 +1092,7 @@ impl RpcsForRequest { } } + // TODO: if block_needed, do something with it here. not sure how best to subscribe if let Some(retry_at) = earliest_retry_at { if self.request.expire_instant <= retry_at { break; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 2bfefcbc..0baf07ca 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -595,139 +595,6 @@ impl Web3Rpcs { .into()) } - // /// be sure there is a timeout on this or it might loop forever - // #[allow(clippy::too_many_arguments)] - // pub async fn xxx_try_send_all_synced_connections( - // self: &Arc, - // web3_request: &Arc, - // max_wait: Option, - // error_level: Option, - // max_sends: Option, - // ) -> Web3ProxyResult { - // let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); - - // // todo!() we are inconsistent with max_wait and web3_request.expires_at - // let start = Instant::now(); - - // loop { - // if let Some(max_wait) = max_wait { - // if start.elapsed() > max_wait { - // break; - // } - // } - - // match self - // .all_connections(web3_request, max_sends, error_level) - // .await - // { - // Ok(active_request_handles) => { - // let mut only_backups_used = true; - - // web3_request - // .backend_requests - // .lock() - // .extend(active_request_handles.iter().map(|x| { - // let rpc = x.clone_connection(); - - // if !rpc.backup { - // // TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more - // only_backups_used = false; - // } - - // rpc - // })); - - // warn!("move this to where we turn RequestMetadata into a Stat"); - // web3_request - // .response_from_backup_rpc - // .store(only_backups_used, Ordering::Relaxed); - - // let x = self - // .try_send_parallel_requests(active_request_handles, max_wait) - // .await?; - - // // TODO: count the number of successes and possibly retry if there weren't enough - - // return Ok(x); - // } - // Err(None) => { - // warn!( - // ?self, - // min_block_needed=?web3_request.min_block_needed(), - // max_block_needed=?web3_request.max_block_needed(), - // "No servers in sync on! Retrying", - // ); - - // // TODO: if this times out, i think we drop this - // web3_request.no_servers.fetch_add(1, Ordering::Relaxed); - - // let max_sleep = if let Some(max_wait) = max_wait { - // start + max_wait - // } else { - // break; - // }; - - // select! { - // _ = sleep_until(max_sleep) => { - // // rpcs didn't change and we have waited too long. break to return an error - // warn!(?self, "timeout waiting for try_send_all_synced_connections!"); - // break; - // }, - // _ = watch_consensus_rpcs.changed() => { - // // consensus rpcs changed! - // watch_consensus_rpcs.borrow_and_update(); - - // // continue to try again - // continue; - // } - // } - // } - // Err(Some(retry_at)) => { - // web3_request.no_servers.fetch_add(1, Ordering::Relaxed); - - // if let Some(max_wait) = max_wait { - // if start.elapsed() > max_wait { - // warn!( - // ?self, - // "All rate limits exceeded. And sleeping would take too long" - // ); - // break; - // } - - // warn!("All rate limits exceeded. Sleeping"); - - // // TODO: only make one of these sleep_untils - - // let break_at = start + max_wait; - - // if break_at <= retry_at { - // select! { - // _ = sleep_until(break_at) => {break} - // _ = watch_consensus_rpcs.changed() => { - // watch_consensus_rpcs.borrow_and_update(); - // } - // } - // } else { - // select! { - // _ = sleep_until(retry_at) => {} - // _ = watch_consensus_rpcs.changed() => { - // watch_consensus_rpcs.borrow_and_update(); - // } - // } - // } - - // continue; - // } else { - // warn!(?self, "all rate limits exceeded"); - // break; - // } - // } - // } - // } - - // Err(Web3ProxyError::NoServersSynced) - // } - #[allow(clippy::too_many_arguments)] pub async fn try_proxy_connection( &self, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index f2fd3cb8..61c2b979 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -252,7 +252,8 @@ impl Web3Rpc { /// TODO: tests on this! /// TODO: should tier or block number take priority? /// TODO: should this return a struct that implements sorting traits? - /// TODO: move this to consensus.rs + /// TODO: better return type! + /// TODO: move this to consensus.rs? fn sort_on( &self, max_block: Option, @@ -281,6 +282,7 @@ impl Web3Rpc { /// This is useful when you care about latency over spreading the load /// For example, use this when selecting rpcs for balanced_rpcs /// TODO: move this to consensus.rs? + /// TODO: better return type! pub fn sort_for_load_balancing_on( &self, max_block: Option, @@ -292,9 +294,13 @@ impl Web3Rpc { let sort_on = self.sort_on(max_block, start_instant); // TODO: i think median is better than weighted at this point. we save checking weighted for the very end - let median_latency = OrderedFloat::from(self.median_latency.as_ref().unwrap().seconds()); + let median_latency = self + .median_latency + .as_ref() + .map(|x| x.seconds()) + .unwrap_or_default(); - let x = (sort_on, median_latency); + let x = (sort_on, OrderedFloat::from(median_latency)); trace!("sort_for_load_balancing {}: {:?}", self, x); @@ -304,8 +310,8 @@ impl Web3Rpc { /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency /// This is useful when you care about spreading the load over latency. /// For example, use this when selecting rpcs for protected_rpcs - /// TODO: move this to consensus.rs - /// TODO: this return type is too complex + /// TODO: move this to consensus.rs? + /// TODO: better return type pub fn shuffle_for_load_balancing_on( &self, max_block: Option, @@ -458,7 +464,8 @@ impl Web3Rpc { } true } else { - false + // do we want true or false here? false is accurate, but it stops the proxy from sending any requests so I think we want to lie + true } } @@ -872,8 +879,6 @@ impl Web3Rpc { break; } - // this should always work - // todo!("this has a bug! it gets full very quickly when no one is subscribed!"); pending_txid_firehose.send(x).await; } } else { @@ -1017,6 +1022,14 @@ impl Web3Rpc { sleep_until(retry_at).await; } + Ok(OpenRequestResult::Lagged(now_synced_f)) => { + select! { + _ = now_synced_f => {} + _ = sleep_until(web3_request.expire_instant) => { + break; + } + } + } Ok(OpenRequestResult::NotReady) => { // TODO: when can this happen? log? emit a stat? trace!("{} has no handle ready", self); @@ -1119,11 +1132,54 @@ impl Web3Rpc { web3_request: &Arc, error_handler: Option, ) -> Web3ProxyResult { + // TODO: check a boolean set by health checks? // TODO: if websocket is reconnecting, return an error? - // TODO: if this server can't handle this request because it isn't synced, return an error + // make sure this block has the oldest block that this request needs + // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check + if let Some(block_needed) = web3_request.min_block_needed() { + if !self.has_block_data(block_needed) { + return Ok(OpenRequestResult::NotReady); + } + } - // check shared rate limits + // make sure this block has the newest block that this request needs + // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check + if let Some(block_needed) = web3_request.max_block_needed() { + if !self.has_block_data(block_needed) { + let clone = self.clone(); + let expire_instant = web3_request.expire_instant; + + let synced_f = async move { + let mut head_block_receiver = + clone.head_block_sender.as_ref().unwrap().subscribe(); + + // TODO: if head_block is far behind block_needed, retrurn now + + loop { + select! { + _ = head_block_receiver.changed() => { + if let Some(head_block) = head_block_receiver.borrow_and_update().clone() { + if head_block.number() >= block_needed { + // the block we needed has arrived! + break; + } + } + } + _ = sleep_until(expire_instant) => { + return Err(Web3ProxyError::NoServersSynced); + } + } + } + + Ok(()) + }; + + return Ok(OpenRequestResult::Lagged(Box::pin(synced_f))); + } + } + + // check rate limits match self.try_throttle().await? { RedisRateLimitResult::Allowed(_) => {} RedisRateLimitResult::RetryAt(retry_at, _) => { diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 9e7298a6..ae4e8371 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -10,21 +10,26 @@ use entities::revert_log; use entities::sea_orm_active_enums::Method; use ethers::providers::ProviderError; use ethers::types::{Address, Bytes}; +use futures::Future; use http::StatusCode; use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait}; use nanorand::Rng; use serde_json::json; +use std::pin::Pin; use std::sync::atomic; use std::sync::Arc; use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, trace, warn, Level}; -#[derive(Debug, From)] +#[derive(From)] pub enum OpenRequestResult { Handle(OpenRequestHandle), /// Unable to start a request. Retry at the given time. RetryAt(Instant), - /// Unable to start a request because no servers are synced + /// The rpc are not synced, but they should be soon. + /// You should wait for the given block number. + Lagged(Pin> + Send>>), + /// Unable to start a request because no servers are synced or the necessary data has been pruned NotReady, } diff --git a/web3_proxy_cli/Cargo.toml b/web3_proxy_cli/Cargo.toml index 98f3ae51..903145aa 100644 --- a/web3_proxy_cli/Cargo.toml +++ b/web3_proxy_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy_cli" -version = "1.43.16" +version = "1.43.19" edition = "2021" default-run = "web3_proxy_cli"