From f09d836dfe52fd1f633a589d00dc2519d43173dd Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 25 Nov 2022 07:41:53 +0000
Subject: [PATCH] better weights

---
 web3_proxy/src/rpcs/connection.rs  |   8 +-
 web3_proxy/src/rpcs/connections.rs | 323 +++++++++++++++++++++++------
 web3_proxy/src/rpcs/request.rs     |   4 +-
 3 files changed, 268 insertions(+), 67 deletions(-)

diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index dcafa05d..42c3dd47 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -841,14 +841,16 @@ impl Web3Connection {
                     if retry_at > max_wait {
                         // break now since we will wait past our maximum wait time
+                        // TODO: don't use anyhow. use specific error type
                         return Err(anyhow::anyhow!("timeout waiting for request handle"));
                     }
 
                     sleep_until(retry_at).await;
                 }
-                Ok(OpenRequestResult::RetryNever) => {
+                Ok(OpenRequestResult::NotSynced) => {
                     // TODO: when can this happen? log? emit a stat?
                     // TODO: subscribe to the head block on this
                     // TODO: sleep how long? maybe just error?
+                    // TODO: don't use anyhow. use specific error type
                     return Err(anyhow::anyhow!("unable to retry for request handle"));
                 }
                 Err(err) => return Err(err),
@@ -864,7 +866,7 @@ impl Web3Connection {
         if !self.has_provider().await {
             // TODO: emit a stat?
             // TODO: wait until we have a provider?
-            return Ok(OpenRequestResult::RetryNever);
+            return Ok(OpenRequestResult::NotSynced);
         }
 
         // check rate limits
@@ -884,7 +886,7 @@ impl Web3Connection {
                 return Ok(OpenRequestResult::RetryAt(retry_at));
             }
             RedisRateLimitResult::RetryNever => {
-                return Ok(OpenRequestResult::RetryNever);
+                return Ok(OpenRequestResult::NotSynced);
             }
         }
     };
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index ad48089a..107e6f8e 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -19,7 +19,7 @@ use futures::future::{join_all, try_join_all};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use hashbrown::HashMap;
-use log::{error, info, warn, Level};
+use log::{error, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
 use moka::future::{Cache, ConcurrentCacheExt};
 use petgraph::graphmap::DiGraphMap;
@@ -394,10 +394,17 @@ impl Web3Connections {
         let min_block_needed = if let Some(min_block_needed) = min_block_needed {
             *min_block_needed
         } else {
-            // TODO: error or OpenRequestResult::NotSynced? and then we turn that into a 502?
-            self.head_block_num().context("no servers are synced")?
+            match self.head_block_num() {
+                Some(x) => x,
+                None => {
+                    trace!("no head block on {:?}", self);
+                    return Ok(OpenRequestResult::NotSynced);
+                }
+            }
         };
 
+        trace!("min block needed: {}", min_block_needed);
+
         // filter the synced rpcs
         // TODO: we are going to be checking "has_block_data" a lot now
         let head_rpcs: Vec<Arc<Web3Connection>> = self
@@ -410,68 +417,90 @@ impl Web3Connections {
             .cloned()
             .collect();
 
-        if head_rpcs.is_empty() {
-            // TODO: what should happen here? automatic retry?
-            // TODO: more detailed error
-            return Err(anyhow::anyhow!("no servers are synced"));
+        match head_rpcs.len() {
+            0 => {
+                trace!("no head rpcs: {:?}", self);
+                // TODO: what should happen here? automatic retry?
+                // TODO: more detailed error
+                return Ok(OpenRequestResult::NotSynced);
+            }
+            1 => {
+                let rpc = head_rpcs.get(0).expect("len is 1");
+
+                // TODO: try or wait for a request handle?
+                let handle = rpc
+                    .wait_for_request_handle(authorization, Duration::from_secs(60))
+                    .await?;
+
+                return Ok(OpenRequestResult::Handle(handle));
+            }
+            _ => {
+                // anything else and we need to pick with a weighted random chooser
+            }
         }
 
-        let mut minimum = 0.0;
+        let mut minimum = f64::MAX;
 
         // we sort on a bunch of values. cache them here so that we don't do this math multiple times.
-        let weight_map: HashMap<_, f64> = head_rpcs
+        let available_request_map: HashMap<_, f64> = head_rpcs
             .iter()
             .map(|rpc| {
-                // TODO: put this on the rpc object instead?
-                let weight = rpc.weight;
-                // TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that?
-                // TODO: get active requests out of redis?
-                // TODO: do something with hard limit instead?
-                let active_requests = rpc.active_requests();
-                let soft_limit = rpc.soft_limit;
+                // TODO: get active requests out of redis (that's definitely too slow)
+                // TODO: do something with hard limit instead? (but that is hitting redis too much)
+                let active_requests = rpc.active_requests() as f64;
+                let soft_limit = rpc.soft_limit as f64 * rpc.weight;
 
                 // TODO: maybe store weight as the percentile
-                let available_requests = soft_limit as f64 * weight - active_requests as f64;
+                let available_requests = soft_limit - active_requests;
 
-                if available_requests < 0.0 {
-                    minimum = available_requests.min(minimum);
-                }
+                trace!("available requests on {}: {}", rpc, available_requests);
+
+                // under heavy load, it is possible for even our best server to be negative
+                minimum = available_requests.min(minimum);
 
                 (rpc.clone(), available_requests)
             })
             .collect();
 
-        // we can't have negative numbers. shift up if any are negative
-        let weight_map: HashMap<_, f64> = if minimum < 0.0 {
-            weight_map
-        } else {
-            weight_map
+        trace!("minimum available requests: {}", minimum);
+
+        // weights can't have negative numbers. shift up if any are negative
+        let available_request_map: HashMap<_, f64> = if minimum < 0.0 {
+            available_request_map
                 .into_iter()
                 .map(|(rpc, weight)| {
                     // TODO: is simple addition the right way to shift everyone?
-                    let x = weight + minimum;
+                    // TODO: probably want something non-linear
+                    // minimum is negative, so we subtract
+                    let x = weight - minimum;
                     (rpc, x)
                 })
                 .collect()
+        } else {
+            available_request_map
         };
 
         let sorted_rpcs = {
-            let mut rng = thread_fast_rng::thread_fast_rng();
+            if head_rpcs.len() == 1 {
+                vec![head_rpcs.get(0).expect("there should be 1")]
+            } else {
+                let mut rng = thread_fast_rng::thread_fast_rng();
 
-            head_rpcs
-                .choose_multiple_weighted(&mut rng, head_rpcs.len(), |rpc| {
-                    *weight_map
-                        .get(rpc)
-                        .expect("rpc should always be in the weight map")
-                })
-                .unwrap()
-                .collect::<Vec<_>>()
+                head_rpcs
+                    .choose_multiple_weighted(&mut rng, head_rpcs.len(), |rpc| {
+                        *available_request_map
+                            .get(rpc)
+                            .expect("rpc should always be in the weight map")
+                    })
+                    .unwrap()
+                    .collect::<Vec<_>>()
+            }
         };
 
         // now that the rpcs are sorted, try to get an active request handle for one of them
-        for rpc in sorted_rpcs.into_iter() {
+        for rpc in sorted_rpcs.iter() {
            // increment our connection counter
             match rpc.try_request_handle(authorization).await {
                 Ok(OpenRequestResult::Handle(handle)) => {
@@ -481,7 +510,7 @@ impl Web3Connections {
                 Ok(OpenRequestResult::RetryAt(retry_at)) => {
                     earliest_retry_at = earliest_retry_at.min(Some(retry_at));
                 }
-                Ok(OpenRequestResult::RetryNever) => {
+                Ok(OpenRequestResult::NotSynced) => {
                     // TODO: log a warning?
                }
                 Err(err) => {
@@ -493,12 +522,20 @@ impl Web3Connections {
 
         match earliest_retry_at {
             None => {
+                // none of the servers gave us a time to retry at
                 if let Some(request_metadata) = request_metadata {
                     request_metadata.no_servers.fetch_add(1, Ordering::Release);
                 }
 
-                // TODO: error works, but maybe we should just wait a second?
-                Err(anyhow::anyhow!("no servers synced"))
+                // we could return an error here, but maybe waiting a second will fix the problem
+                // TODO: configurable max wait? the whole max request time, or just some portion?
+                let handle = sorted_rpcs
+                    .get(0)
+                    .expect("at least 1 is available")
+                    .wait_for_request_handle(authorization, Duration::from_secs(3))
+                    .await?;
+
+                Ok(OpenRequestResult::Handle(handle))
             }
             Some(earliest_retry_at) => {
                 warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
@@ -538,7 +575,7 @@ impl Web3Connections {
                     earliest_retry_at = earliest_retry_at.min(Some(retry_at));
                 }
                 Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle),
-                Ok(OpenRequestResult::RetryNever) => {
+                Ok(OpenRequestResult::NotSynced) => {
                     warn!("no request handle for {}", connection)
                 }
                 Err(err) => {
@@ -672,7 +709,7 @@ impl Web3Connections {
 
                         continue;
                     }
-                    OpenRequestResult::RetryNever => {
+                    OpenRequestResult::NotSynced => {
                         warn!("No server handles! {:?}", self);
 
                         if let Some(request_metadata) = request_metadata {
@@ -811,11 +848,11 @@ mod tests {
     use super::*;
     use crate::rpcs::{blockchain::BlockId, provider::Web3Provider};
     use ethers::types::Block;
-    use log::LevelFilter;
+    use log::{trace, LevelFilter};
     use parking_lot::RwLock;
 
     #[tokio::test]
-    async fn test_server_selection() {
+    async fn test_server_selection_by_height() {
         // TODO: do this better. can test_env_logger and tokio test be stacked?
         let _ = env_logger::builder()
             .filter_level(LevelFilter::Error)
@@ -960,11 +997,18 @@
         );
 
         // best_synced_backend_connection requires servers to be synced with the head block
-        // TODO: should this be an error, or a OpenRequestResult::NotSynced?
-        assert!(conns
-            .best_synced_backend_connection(&authorization, None, &[], lagged_block.number.as_ref())
-            .await
-            .is_err());
+        assert!(matches!(
+            conns
+                .best_synced_backend_connection(
+                    &authorization,
+                    None,
+                    &[],
+                    lagged_block.number.as_ref()
+                )
+                .await
+                .unwrap(),
+            OpenRequestResult::NotSynced
+        ));
 
         // add lagged blocks to the conns. both servers should be allowed
         conns.save_block(&lagged_block, true).await.unwrap();
@@ -1011,22 +1055,177 @@
 
         assert_eq!(conns.num_synced_rpcs(), 1);
 
-        // TODO: is_ok is too simple. make sure its head_rpc
-        assert!(conns
-            .best_synced_backend_connection(&authorization, None, &[], None)
+        assert!(matches!(
+            conns
+                .best_synced_backend_connection(&authorization, None, &[], None)
+                .await,
+            Ok(OpenRequestResult::Handle(_))
+        ));
+
+        assert!(matches!(
+            conns
+                .best_synced_backend_connection(&authorization, None, &[], Some(&0.into()))
+                .await,
+            Ok(OpenRequestResult::Handle(_))
+        ));
+
+        assert!(matches!(
+            conns
+                .best_synced_backend_connection(&authorization, None, &[], Some(&1.into()))
+                .await,
+            Ok(OpenRequestResult::Handle(_))
+        ));
+
+        // future block should not get a handle
+        assert!(matches!(
+            conns
+                .best_synced_backend_connection(&authorization, None, &[], Some(&2.into()))
+                .await,
+            Ok(OpenRequestResult::NotSynced)
+        ));
+    }
+
+    #[tokio::test]
+    async fn test_server_selection_by_archive() {
+        // TODO: do this better. can test_env_logger and tokio test be stacked?
+        let _ = env_logger::builder()
+            .filter_level(LevelFilter::Error)
+            .filter_module("web3_proxy", LevelFilter::Trace)
+            .is_test(true)
+            .try_init();
+
+        let head_block: Block<TxHash> = Block {
+            hash: Some(H256::random()),
+            number: Some(1_000_000.into()),
+            parent_hash: H256::random(),
+            ..Default::default()
+        };
+
+        // TODO: write an impl From for Block -> BlockId?
+        let head_block_id = BlockId {
+            hash: head_block.hash.unwrap(),
+            num: head_block.number.unwrap(),
+        };
+
+        let head_block = Arc::new(head_block);
+
+        let pruned_rpc = Web3Connection {
+            name: "pruned".to_string(),
+            display_name: None,
+            url: "ws://example.com/pruned".to_string(),
+            http_client: None,
+            active_requests: 0.into(),
+            frontend_requests: 0.into(),
+            internal_requests: 0.into(),
+            provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
+            hard_limit: None,
+            soft_limit: 3_000,
+            block_data_limit: 64.into(),
+            weight: 1.0,
+            head_block_id: RwLock::new(Some(head_block_id.clone())),
+            open_request_handle_metrics: Arc::new(Default::default()),
+        };
+
+        let archive_rpc = Web3Connection {
+            name: "archive".to_string(),
+            display_name: None,
+            url: "ws://example.com/archive".to_string(),
+            http_client: None,
+            active_requests: 0.into(),
+            frontend_requests: 0.into(),
+            internal_requests: 0.into(),
+            provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
+            hard_limit: None,
+            soft_limit: 1_000,
+            block_data_limit: u64::MAX.into(),
+            // TODO: does weight = 0 work?
+            weight: 0.01,
+            head_block_id: RwLock::new(Some(head_block_id)),
+            open_request_handle_metrics: Arc::new(Default::default()),
+        };
+
+        assert!(pruned_rpc.has_block_data(&head_block.number.unwrap()));
+        assert!(archive_rpc.has_block_data(&head_block.number.unwrap()));
+        assert!(!pruned_rpc.has_block_data(&1.into()));
+        assert!(archive_rpc.has_block_data(&1.into()));
+
+        let pruned_rpc = Arc::new(pruned_rpc);
+        let archive_rpc = Arc::new(archive_rpc);
+
+        let conns = HashMap::from([
+            (pruned_rpc.name.clone(), pruned_rpc.clone()),
+            (archive_rpc.name.clone(), archive_rpc.clone()),
+        ]);
+
+        let conns = Web3Connections {
+            conns,
+            synced_connections: Default::default(),
+            pending_transactions: Cache::builder()
+                .max_capacity(10)
+                .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
+            block_hashes: Cache::builder()
+                .max_capacity(10)
+                .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
+            block_numbers: Cache::builder()
+                .max_capacity(10)
+                .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
+            blockchain_graphmap: Default::default(),
+            min_head_rpcs: 1,
+            min_sum_soft_limit: 3_000,
+        };
+
+        let authorization = Arc::new(Authorization::internal(None).unwrap());
+
+        let (head_block_sender, _head_block_receiver) =
+            watch::channel::<ArcBlock>(Default::default());
+        let mut connection_heads = HashMap::new();
+
+        conns
+            .process_block_from_rpc(
+                &authorization,
+                &mut connection_heads,
+                Some(head_block.clone()),
+                pruned_rpc.clone(),
+                &head_block_sender,
+                &None,
+            )
+            .await
+            .unwrap();
+
+        conns
+            .process_block_from_rpc(
+                &authorization,
+                &mut connection_heads,
+                Some(head_block.clone()),
+                archive_rpc.clone(),
+                &head_block_sender,
+                &None,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(conns.num_synced_rpcs(), 2);
+
+        // best_synced_backend_connection requires servers to be synced with the head block
+        let best_head_server = conns
+            .best_synced_backend_connection(&authorization, None, &[], head_block.number.as_ref())
+            .await;
+
+        assert!(matches!(
+            best_head_server.unwrap(),
+            OpenRequestResult::Handle(_)
+        ));
+
+        let best_archive_server = conns
             .best_synced_backend_connection(&authorization, None, &[], Some(&1.into()))
-            .await
-            .is_ok());
-        assert!(conns
-            .best_synced_backend_connection(&authorization, None, &[], Some(&2.into()))
-            .await
-            .is_err());
+            .await;
+
+        match best_archive_server {
+            Ok(OpenRequestResult::Handle(x)) => {
+                assert_eq!(x.clone_connection().name, "archive".to_string())
+            }
+            x => {
+                panic!("unexpected result: {:?}", x);
+            }
+        }
     }
 }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 1b61a8df..7f347d5f 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -26,8 +26,8 @@ pub enum OpenRequestResult {
     Handle(OpenRequestHandle),
     /// Unable to start a request. Retry at the given time.
     RetryAt(Instant),
-    /// Unable to start a request. Retrying will not succeed.
-    RetryNever,
+    /// Unable to start a request because the server is not synced
+    NotSynced,
 }
 
 /// Make RPC requests through this handle and drop it when you are done.
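
Note on the weighted selection above: `best_synced_backend_connection` now scores each synced server as `soft_limit as f64 * weight - active_requests`, tracks the most negative score, shifts every score up by that minimum (weighted sampling cannot take negative weights), and then uses the shifted scores to pick a random try order. Below is a minimal standalone sketch of that ordering, assuming rand 0.8's `SliceRandom::choose_multiple_weighted` (the same method the patch reaches through `thread_fast_rng`); the server names and numbers are invented for illustration:

```rust
use rand::seq::SliceRandom;

fn main() {
    // score each server: (soft_limit * weight) - active_requests
    let servers = [
        ("pruned", 3_000.0_f64 * 1.0 - 3_500.0), // overloaded, so negative
        ("archive", 1_000.0 * 0.01 - 2.0),
        ("full", 2_000.0 * 1.0 - 500.0),
    ];

    // under heavy load even the best server can be negative,
    // so start from f64::MAX and take the running minimum
    let minimum = servers
        .iter()
        .map(|(_, available)| *available)
        .fold(f64::MAX, f64::min);

    // weighted sampling rejects negative weights, so shift everything up;
    // minimum is negative here, so subtracting it raises every score to >= 0
    let weighted: Vec<(&str, f64)> = servers
        .iter()
        .map(|(name, available)| {
            let shifted = if minimum < 0.0 {
                available - minimum
            } else {
                *available
            };
            (*name, shifted)
        })
        .collect();

    // weighted random order: servers with more headroom tend to come first,
    // but less-loaded losers still get an occasional turn, spreading the load
    let mut rng = rand::thread_rng();
    let try_order: Vec<_> = weighted
        .choose_multiple_weighted(&mut rng, weighted.len(), |(_, weight)| *weight)
        .unwrap()
        .collect();

    println!("try order: {:?}", try_order);
}
```

A linear shift keeps every weight non-negative (the most overloaded server lands exactly on zero, so it is effectively always tried last), but it also compresses the relative differences between the healthy servers, which is why the patch leaves a "probably want something non-linear" TODO.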
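The `RetryNever` -> `NotSynced` rename also pins down what each variant asks the caller to do. Here is a reduced mirror of the enum and the expected caller behavior, with a plain `String` standing in for the real `OpenRequestHandle` (hypothetical stand-ins, not the crate's actual types):

```rust
use std::time::{Duration, Instant};

// reduced stand-in for web3_proxy's OpenRequestResult after this patch
enum OpenRequestResult {
    // ready to go: send the request through this handle
    Handle(String),
    // rate limited: retrying this same server later can succeed
    RetryAt(Instant),
    // this server cannot serve the requested block; try a different server
    NotSynced,
}

fn dispatch(result: OpenRequestResult) {
    match result {
        OpenRequestResult::Handle(handle) => {
            println!("sending through {}", handle);
        }
        OpenRequestResult::RetryAt(retry_at) => {
            let wait = retry_at.saturating_duration_since(Instant::now());
            println!("sleeping {:?} before retrying this server", wait);
        }
        OpenRequestResult::NotSynced => {
            println!("skipping this server for this block");
        }
    }
}

fn main() {
    dispatch(OpenRequestResult::Handle("archive".to_string()));
    dispatch(OpenRequestResult::RetryAt(Instant::now() + Duration::from_secs(1)));
    dispatch(OpenRequestResult::NotSynced);
}
```

This matches how the loop in `best_synced_backend_connection` treats the three cases: use the handle immediately, fold `RetryAt` into `earliest_retry_at`, and skip `NotSynced` servers entirely.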