From 86f6b167616aad2b89923c335e63e2efcfc57941 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 7 Dec 2022 22:54:38 -0800
Subject: [PATCH] another pass at server selection

---
 TODO.md                            |   3 +
 web3_proxy/src/app/mod.rs          |   1 +
 web3_proxy/src/rpcs/connection.rs  |  15 +-
 web3_proxy/src/rpcs/connections.rs | 266 ++++++++++++++---------------
 web3_proxy/src/rpcs/request.rs     |   2 +-
 5 files changed, 151 insertions(+), 136 deletions(-)

diff --git a/TODO.md b/TODO.md
index c5569070..f65cee06 100644
--- a/TODO.md
+++ b/TODO.md
@@ -254,6 +254,9 @@ These are roughly in order of completition
   - need to do all the connections in parallel with spawns
 - [x] add block timestamp to the /status page
   - [x] be sure to save the timestamp in a way that our request routing logic can make use of it
+- [x] node selection still needs improvements. we still send to syncing nodes if they are close
+  - try consensus heads first! only if that is empty should we try others. and we should try them sorted by block height and then randomly chosen from there
+- [ ] having the whole block in status is very verbose. trim it down
 - [ ] `cost estimate` script
   - sum bytes and number of requests. prompt hosting costs. divide
 - [ ] `stat delay` script
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 6d1ab5b0..30417d9c 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -189,6 +189,7 @@ pub async fn get_migrated_db(
     min_connections: u32,
     max_connections: u32,
 ) -> anyhow::Result<DatabaseConnection> {
+    // TODO: this seems to fail silently
     let db_conn = get_db(db_url, min_connections, max_connections).await?;
 
     let db_backend = db_conn.get_database_backend();
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 006b43c9..48a23ed4 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -310,7 +310,7 @@ impl Web3Connection {
 
         let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
 
-        needed_block_num >= &oldest_block_num
+        *needed_block_num >= oldest_block_num
     }
 
     /// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
@@ -1017,6 +1017,19 @@ impl Web3Connection {
         // TODO? ready_provider: Option<&Arc<Web3Provider>>,
         allow_not_ready: bool,
     ) -> anyhow::Result<OpenRequestResult> {
+        // TODO: think more about this read block
+        if !allow_not_ready
+            && self
+                .provider_state
+                .read()
+                .await
+                .provider(allow_not_ready)
+                .await
+                .is_none()
+        {
+            return Ok(OpenRequestResult::NotReady);
+        }
+
         // check rate limits
         if let Some(ratelimiter) = self.hard_limit.as_ref() {
            // TODO: how should we know if we should set expire or not?
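A note for reviewers on the first connection.rs hunk above: `has_block_data` decides whether a server can still serve an old block by comparing the requested number against `head - block_data_limit`; the hunk only tightens that comparison to work on values instead of references. Here is a minimal, compilable sketch of the check using plain `u64`s (the real method works on ethers' `U64` values, and only its tail is shown in the hunk):

fn has_block_data(head_block_num: u64, block_data_limit: u64, needed_block_num: u64) -> bool {
    // the oldest block this node still stores. saturating_sub keeps an archive
    // node (limit = u64::MAX) pinned at genesis instead of underflowing.
    let oldest_block_num = head_block_num.saturating_sub(block_data_limit);

    needed_block_num >= oldest_block_num
}

fn main() {
    // a pruned node keeping ~128 blocks of state near head 16_000_000:
    assert!(has_block_data(16_000_000, 128, 15_999_900));
    assert!(!has_block_data(16_000_000, 128, 15_000_000));

    // an archive node advertises an effectively unlimited depth:
    assert!(has_block_data(16_000_000, u64::MAX, 1));
}
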
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index dbca28cf..5a79e868 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -10,6 +10,7 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
 use crate::frontend::authorization::{Authorization, RequestMetadata};
 use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
 use crate::rpcs::transactions::TxStatus;
+use anyhow::Context;
 use arc_swap::ArcSwap;
 use counter::Counter;
 use derive_more::From;
@@ -25,6 +26,7 @@ use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
 use serde_json::value::RawValue;
+use std::collections::BTreeMap;
 use std::fmt;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
@@ -371,163 +373,159 @@ impl Web3Connections {
         skip: &[Arc<Web3Connection>],
         min_block_needed: Option<&U64>,
     ) -> anyhow::Result<OpenRequestResult> {
+        let usable_rpcs_by_head_num: BTreeMap<U64, Vec<Arc<Web3Connection>>> =
+            if let Some(min_block_needed) = min_block_needed {
+                // need a potentially old block. check all the rpcs
+                // TODO: we are going to be checking "has_block_data" a lot now
+                let mut m = BTreeMap::new();
+
+                for x in self
+                    .conns
+                    .values()
+                    .filter(|x| !skip.contains(x))
+                    .filter(|x| x.has_block_data(min_block_needed))
+                    .cloned()
+                {
+                    let x_head_block = x.head_block.read().clone();
+
+                    match x_head_block {
+                        None => continue,
+                        Some(x_head) => {
+                            m.entry(x_head.number()).or_insert_with(Vec::new).push(x);
+                        }
+                    }
+                }
+
+                m
+            } else {
+                // need latest. filter the synced rpcs
+                // TODO: double check has_block_data?
+                let synced_connections = self.synced_connections.load();
+
+                let head_num = match synced_connections.head_block.as_ref() {
+                    None => return Ok(OpenRequestResult::NotReady),
+                    Some(x) => x.number(),
+                };
+
+                let c: Vec<_> = synced_connections
+                    .conns
+                    .iter()
+                    .filter(|x| !skip.contains(x))
+                    .cloned()
+                    .collect();
+
+                BTreeMap::from([(head_num, c)])
+            };
+
         let mut earliest_retry_at = None;
 
-        let usable_rpcs: Vec<Arc<Web3Connection>> = if let Some(min_block_needed) = min_block_needed
-        {
-            // need a potentially old block. check all the rpcs
-            // TODO: we are going to be checking "has_block_data" a lot now
-            self.conns
-                .values()
-                .filter(|x| !skip.contains(x))
-                .filter(|x| x.has_block_data(min_block_needed))
-                .cloned()
-                .collect()
-        } else {
-            // need latest. filter the synced rpcs
-            // TODO: double check has_block_data?
-            self.synced_connections
-                .load()
-                .conns
+        for usable_rpcs in usable_rpcs_by_head_num.into_values().rev() {
+            let mut minimum = f64::MAX;
+
+            // we sort on a combination of values. cache them here so that we don't do this math multiple times.
+            let mut available_request_map: HashMap<_, f64> = usable_rpcs
                 .iter()
-                .filter(|x| !skip.contains(x))
-                .cloned()
-                .collect()
-        };
+                .map(|rpc| {
+                    // TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that?
+                    // TODO: get active requests out of redis (that's definitely too slow)
+                    // TODO: do something with hard limit instead? (but that is hitting redis too much)
+                    let active_requests = rpc.active_requests() as f64;
+                    let soft_limit = rpc.soft_limit as f64 * rpc.weight;
 
-        match usable_rpcs.len() {
-            0 => {
-                warn!(
-                    "no rpcs @ {:?}: {:?} (skipped {:?})",
-                    min_block_needed,
-                    self.synced_connections.load(),
-                    skip.iter().map(|x| &x.name).collect::<Vec<_>>()
-                );
-                // TODO: what should happen here? automatic retry?
-                // TODO: more detailed error
-                return Ok(OpenRequestResult::NotReady);
+                    // TODO: maybe store weight as the percentile
+                    let available_requests = soft_limit - active_requests;
+
+                    trace!("available requests on {}: {}", rpc, available_requests);
+
+                    // under heavy load, it is possible for even our best server to be negative
+                    minimum = available_requests.min(minimum);
+
+                    // TODO: clone needed?
+                    (rpc, available_requests)
+                })
+                .collect();
+
+            trace!("minimum available requests: {}", minimum);
+
+            // weights can't have negative numbers. shift up if any are negative
+            if minimum < 0.0 {
+                available_request_map = available_request_map
+                    .into_iter()
+                    .map(|(rpc, weight)| {
+                        // TODO: is simple addition the right way to shift everyone?
+                        // TODO: probably want something non-linear
+                        // minimum is negative, so we subtract
+                        let x = weight - minimum;
+
+                        (rpc, x)
+                    })
+                    .collect()
             }
-            1 => {
-                let rpc = usable_rpcs.get(0).expect("len is 1");
-                // TODO: try or wait for a request handle?
-                let handle = rpc
-                    .wait_for_request_handle(authorization, Duration::from_secs(60), false)
-                    .await?;
+            let sorted_rpcs = {
+                if usable_rpcs.len() == 1 {
+                    // TODO: return now instead?
+                    vec![usable_rpcs.get(0).expect("there should be 1")]
+                } else {
+                    let mut rng = thread_fast_rng::thread_fast_rng();
 
-                return Ok(OpenRequestResult::Handle(handle));
-            }
-            _ => {
-                // anything else and we need to pick with a weighted random chooser
+                    // TODO: sort or weight the non-archive nodes to be first
+                    usable_rpcs
+                        .choose_multiple_weighted(&mut rng, usable_rpcs.len(), |rpc| {
+                            *available_request_map
+                                .get(rpc)
+                                .expect("rpc should always be in the weight map")
+                        })
+                        .unwrap()
+                        .collect::<Vec<_>>()
+                }
+            };
+
+            // now that the rpcs are sorted, try to get an active request handle for one of them
+            for best_rpc in sorted_rpcs.into_iter() {
+                // increment our connection counter
+                match best_rpc.try_request_handle(authorization, false).await {
+                    Ok(OpenRequestResult::Handle(handle)) => {
+                        // // trace!("next server on {:?}: {:?}", self, best_rpc);
+                        return Ok(OpenRequestResult::Handle(handle));
+                    }
+                    Ok(OpenRequestResult::RetryAt(retry_at)) => {
+                        earliest_retry_at = earliest_retry_at.min(Some(retry_at));
+                    }
+                    Ok(OpenRequestResult::NotReady) => {
+                        // TODO: log a warning?
+                    }
+                    Err(err) => {
+                        // TODO: log a warning?
+                        warn!("No request handle for {}. err={:?}", best_rpc, err)
+                    }
+                }
+            }
         }
 
-        let mut minimum = f64::MAX;
-
-        // we sort on a bunch of values. cache them here so that we don't do this math multiple times.
-        let available_request_map: HashMap<_, f64> = usable_rpcs
-            .iter()
-            .map(|rpc| {
-                // TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that?
-                // TODO: get active requests out of redis (that's definitely too slow)
-                // TODO: do something with hard limit instead? (but that is hitting redis too much)
-                let active_requests = rpc.active_requests() as f64;
-                let soft_limit = rpc.soft_limit as f64 * rpc.weight;
-
-                // TODO: maybe store weight as the percentile
-                let available_requests = soft_limit - active_requests;
-
-                trace!("available requests on {}: {}", rpc, available_requests);
-
-                // under heavy load, it is possible for even our best server to be negative
-                minimum = available_requests.min(minimum);
-
-                (rpc.clone(), available_requests)
-            })
-            .collect();
-
-        trace!("minimum available requests: {}", minimum);
-
-        // weights can't have negative numbers. shift up if any are negative
-        let available_request_map: HashMap<_, f64> = if minimum < 0.0 {
-            available_request_map
-                .into_iter()
-                .map(|(rpc, weight)| {
-                    // TODO: is simple addition the right way to shift everyone?
-                    // TODO: probably want something non-linear
-                    // minimum is negative, so we subtract
-                    let x = weight - minimum;
-
-                    (rpc, x)
-                })
-                .collect()
-        } else {
-            available_request_map
-        };
-
-        let sorted_rpcs = {
-            if usable_rpcs.len() == 1 {
-                vec![usable_rpcs.get(0).expect("there should be 1")]
-            } else {
-                let mut rng = thread_fast_rng::thread_fast_rng();
-
-                // TODO: sort or weight the non-archive nodes to be first
-                usable_rpcs
-                    .choose_multiple_weighted(&mut rng, usable_rpcs.len(), |rpc| {
-                        *available_request_map
-                            .get(rpc)
-                            .expect("rpc should always be in the weight map")
-                    })
-                    .unwrap()
-                    .collect::<Vec<_>>()
-            }
-        };
-
-        // now that the rpcs are sorted, try to get an active request handle for one of them
-        for rpc in sorted_rpcs.iter() {
-            // increment our connection counter
-            match rpc.try_request_handle(authorization, false).await {
-                Ok(OpenRequestResult::Handle(handle)) => {
-                    // // trace!("next server on {:?}: {:?}", self, rpc);
-                    return Ok(OpenRequestResult::Handle(handle));
-                }
-                Ok(OpenRequestResult::RetryAt(retry_at)) => {
-                    earliest_retry_at = earliest_retry_at.min(Some(retry_at));
-                }
-                Ok(OpenRequestResult::NotReady) => {
-                    // TODO: log a warning?
-                }
-                Err(err) => {
-                    // TODO: log a warning?
-                    warn!("No request handle for {}. err={:?}", rpc, err)
-                }
-            }
+        if let Some(request_metadata) = request_metadata {
+            request_metadata.no_servers.fetch_add(1, Ordering::Release);
         }
 
         match earliest_retry_at {
             None => {
                 // none of the servers gave us a time to retry at
-                if let Some(request_metadata) = request_metadata {
-                    request_metadata.no_servers.fetch_add(1, Ordering::Release);
-                }
+                // TODO: bring this back?
                 // we could return an error here, but maybe waiting a second will fix the problem
                 // TODO: configurable max wait? the whole max request time, or just some portion?
-                let handle = sorted_rpcs
-                    .get(0)
-                    .expect("at least 1 is available")
-                    .wait_for_request_handle(authorization, Duration::from_secs(3), false)
-                    .await?;
+                // let handle = sorted_rpcs
+                //     .get(0)
+                //     .expect("at least 1 is available")
+                //     .wait_for_request_handle(authorization, Duration::from_secs(3), false)
+                //     .await?;
+                // Ok(OpenRequestResult::Handle(handle))
 
-                Ok(OpenRequestResult::Handle(handle))
+                Ok(OpenRequestResult::NotReady)
             }
             Some(earliest_retry_at) => {
                 warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
 
-                if let Some(request_metadata) = request_metadata {
-                    request_metadata.no_servers.fetch_add(1, Ordering::Release);
-                }
-
                 Ok(OpenRequestResult::RetryAt(earliest_retry_at))
             }
         }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index e71ad04a..6ef3c97a 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -8,7 +8,7 @@ use entities::revert_log;
 use entities::sea_orm_active_enums::Method;
 use ethers::providers::{HttpClientError, ProviderError, WsClientError};
 use ethers::types::{Address, Bytes};
-use log::{debug, error, info, trace, warn, Level};
+use log::{debug, error, trace, warn, Level};
 use metered::metered;
 use metered::HitCount;
 use metered::ResponseTime;
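
A note for reviewers on the connections.rs change as a whole: selection now happens in two stages, exactly as the new TODO entry describes. Candidates are grouped by head block number into a BTreeMap, the groups are walked from the highest head downward via `.into_values().rev()`, and within a group servers are weighted by spare capacity (`soft_limit * weight - active_requests`, shifted up if any value goes negative) before `try_request_handle` is attempted on each in turn. The standalone sketch below mirrors that shape so it can be compiled and poked at in isolation; the `Rpc` struct and its fields are stand-ins, and it orders a group deterministically by capacity where the patch uses `choose_multiple_weighted` with a `thread_fast_rng` RNG.

use std::collections::BTreeMap;

// stand-in for Web3Connection: only the fields the selection logic looks at.
struct Rpc {
    name: &'static str,
    head_block: Option<u64>,
    active_requests: f64,
    soft_limit: f64,
    weight: f64,
}

impl Rpc {
    // spare capacity: soft limit scaled by weight, minus in-flight requests.
    // under heavy load this can go negative, which is why the patch shifts
    // every weight up by the minimum before the weighted shuffle.
    fn available_requests(&self) -> f64 {
        self.soft_limit * self.weight - self.active_requests
    }
}

// group candidates by head block number, walk the groups from the highest
// head downward, and prefer spare capacity inside each group. the real code
// keeps Arc<Web3Connection> values, shuffles each group with
// choose_multiple_weighted, and falls through to the next candidate when
// try_request_handle reports a rate limit.
fn best_rpc(rpcs: &[Rpc]) -> Option<&Rpc> {
    let mut by_head: BTreeMap<u64, Vec<&Rpc>> = BTreeMap::new();

    for rpc in rpcs {
        if let Some(head) = rpc.head_block {
            by_head.entry(head).or_default().push(rpc);
        }
    }

    // BTreeMap iterates in ascending key order, so reverse to try the
    // best-synced group first.
    for (_head, mut group) in by_head.into_iter().rev() {
        // descending by spare capacity (deterministic stand-in for the
        // weighted random shuffle in the patch).
        group.sort_by(|a, b| {
            b.available_requests()
                .partial_cmp(&a.available_requests())
                .unwrap()
        });

        // this is where the real code would open a request handle and skip
        // rate-limited servers; here we simply take the best-scored one.
        if let Some(rpc) = group.into_iter().next() {
            return Some(rpc);
        }
    }

    None
}

fn main() {
    let rpcs = [
        Rpc { name: "archive", head_block: Some(99), active_requests: 10.0, soft_limit: 1000.0, weight: 1.0 },
        Rpc { name: "full-a", head_block: Some(100), active_requests: 900.0, soft_limit: 1000.0, weight: 1.0 },
        Rpc { name: "full-b", head_block: Some(100), active_requests: 50.0, soft_limit: 500.0, weight: 1.0 },
    ];

    // "full-b" wins: it shares the highest head (100) and has the most spare capacity.
    println!("best: {:?}", best_rpc(&rpcs).map(|r| r.name));
}

Keying the map by block number is what makes "consensus heads first" cheap: a BTreeMap iterates in ascending key order, so reversing the iterator yields the best-synced group before any lagging ones, and lagging-but-close nodes are only considered once every server at the consensus head has been tried.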