From 045065986ade8cca238c433a7b45f14376d74f59 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 4 Jan 2023 12:07:53 -0800 Subject: [PATCH] move allowed_lag around --- config/example.toml | 38 +++++++++++------------ web3_proxy/src/config.rs | 2 ++ web3_proxy/src/frontend/rpc_proxy_http.rs | 3 +- web3_proxy/src/rpcs/blockchain.rs | 18 ++++------- web3_proxy/src/rpcs/connection.rs | 12 +++++-- web3_proxy/src/rpcs/connections.rs | 27 ++++++++++++---- 6 files changed, 60 insertions(+), 40 deletions(-) diff --git a/config/example.toml b/config/example.toml index 729bd083..e2c9d8b7 100644 --- a/config/example.toml +++ b/config/example.toml @@ -54,50 +54,50 @@ response_cache_max_bytes = 10_000_000_000 display_name = "Ankr" url = "https://rpc.ankr.com/eth" soft_limit = 1_000 - weight = 0 + tier = 0 [balanced_rpcs.cloudflare] display_name = "Cloudflare" url = "https://cloudflare-eth.com" soft_limit = 1_000 - weight = 10 + tier = 1 [balanced_rpcs.blastapi] display_name = "Blast" url = "https://eth-mainnet.public.blastapi.io" soft_limit = 1_000 - weight = 10 + tier = 1 [balanced_rpcs.mycryptoapi] display_name = "MyCrypto" disabled = true url = "https://api.mycryptoapi.com/eth" soft_limit = 1_000 - weight = 25 + tier = 2 [balanced_rpcs.pokt-v1] display_name = "Pokt #1" url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79" soft_limit = 500 - weight = 25 + tier = 2 [balanced_rpcs.pokt] display_name = "Pokt #2" url = "https://eth-rpc.gateway.pokt.network" soft_limit = 500 - weight = 50 - - [balanced_rpcs.runonflux] - display_name = "Run on Flux (light)" - url = "https://ethereumnodelight.app.runonflux.io" - soft_limit = 1_000 - weight = 75 + tier = 3 [balanced_rpcs.linkpool] display_name = "Linkpool" url = "https://main-rpc.linkpool.io" soft_limit = 500 - weight = 75 + tier = 4 + + [balanced_rpcs.runonflux] + display_name = "Run on Flux (light)" + url = "https://ethereumnodelight.app.runonflux.io" + soft_limit = 1_000 + tier = 5 # load balanced light nodes are not very reliable [balanced_rpcs.linkpool-light] @@ -105,7 +105,7 @@ response_cache_max_bytes = 10_000_000_000 disabled = true url = "https://main-light.eth.linkpool.io" soft_limit = 100 - weight = 75 + tier = 5 [private_rpcs] @@ -116,32 +116,32 @@ response_cache_max_bytes = 10_000_000_000 display_name = "Eden network" url = "https://api.edennetwork.io/v1/" soft_limit = 1_805 - weight = 0 + tier = 0 [private_rpcs.eden_beta] disabled = true display_name = "Eden network beta" url = "https://api.edennetwork.io/v1/beta" soft_limit = 5_861 - weight = 0 + tier = 0 [private_rpcs.ethermine] disabled = true display_name = "Ethermine" url = "https://rpc.ethermine.org" soft_limit = 5_861 - weight = 0 + tier = 0 [private_rpcs.flashbots] disabled = true display_name = "Flashbots Fast" url = "https://rpc.flashbots.net/fast" soft_limit = 7_074 - weight = 0 + tier = 0 [private_rpcs.securerpc] disabled = true display_name = "SecureRPC" url = "https://gibson.securerpc.com/v1" soft_limit = 4_560 - weight = 0 + tier = 0 diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 41fa9793..9bb125e3 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -221,6 +221,7 @@ impl Web3ConnectionConfig { pub async fn spawn( self, name: String, + allowed_lag: u64, db_conn: Option, redis_pool: Option, chain_id: u64, @@ -257,6 +258,7 @@ impl Web3ConnectionConfig { Web3Connection::spawn( name, + allowed_lag, self.display_name, chain_id, db_conn, diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 8844eaa4..4dc02e21 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -10,6 +10,7 @@ use axum::{response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; use axum_macros::debug_handler; use itertools::Itertools; +use log::debug; use std::sync::Arc; /// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this. @@ -90,7 +91,7 @@ pub async fn proxy_web3_rpc_with_key( let headers = response.headers_mut(); - // TODO: special string if no rpcs were used (cache hit)? + // TODO: special string if no rpcs were used (cache hit)? or is an empty string fine? maybe the rpc name + "cached" let rpcs: String = rpcs.into_iter().map(|x| x.name.clone()).join(","); headers.insert( diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b50f52f5..456d21fd 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -29,7 +29,7 @@ pub type BlockHashesCache = Cache Self { - let mut x = Self { block, lag: 0 }; + let mut x = Self { block, age: 0 }; // no need to recalulate lag every time // if the head block gets too old, a health check restarts this connection - x.lag = x.lag(); + x.age = x.lag(); x } @@ -81,7 +81,7 @@ impl SavedBlock { /// When the block was received, this node was still syncing pub fn syncing(&self, allowed_lag: u64) -> bool { - self.lag > allowed_lag + self.age > allowed_lag } } @@ -93,13 +93,7 @@ impl From for SavedBlock { impl Display for SavedBlock { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} ({})", self.number(), self.hash())?; - - if self.syncing(0) { - write!(f, " (behind by {} seconds)", self.lag)?; - } - - Ok(()) + write!(f, "{} ({}, {}s old)", self.number(), self.hash(), self.age) } } @@ -320,7 +314,7 @@ impl Web3Connections { // TODO: don't default to 60. different chains are differen if rpc_head_block.syncing(60) { if connection_heads.remove(&rpc.name).is_some() { - warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.lag); + warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.age); } else { // we didn't remove anything and this block is old. exit early return Ok(()); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index e30a10b6..64ab933f 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -63,6 +63,7 @@ pub struct Web3Connection { pub name: String, pub display_name: Option, pub db_conn: Option, + pub(super) allowed_lag: u64, /// TODO: can we get this from the provider? do we even need it? pub(super) url: String, /// Some connections use an http_client. we keep a clone for reconnecting @@ -98,6 +99,7 @@ impl Web3Connection { #[allow(clippy::too_many_arguments)] pub async fn spawn( name: String, + allowed_lag: u64, display_name: Option, chain_id: u64, db_conn: Option, @@ -135,6 +137,7 @@ impl Web3Connection { let new_connection = Self { name, + allowed_lag, db_conn: db_conn.clone(), display_name, http_client, @@ -587,6 +590,8 @@ impl Web3Connection { reconnect: bool, tx_id_sender: Option)>>, ) -> anyhow::Result<()> { + let allowed_lag = self.allowed_lag; + loop { let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); @@ -641,10 +646,10 @@ impl Web3Connection { if let Some(x) = &*conn.head_block.read() { // if this block is too old, return an error so we reconnect let current_lag = x.lag(); - if current_lag > 0 { + if current_lag > allowed_lag { let level = if warned == 0 { log::Level::Warn - } else if current_lag % 1000 == 0 { + } else if warned % 100 == 0 { log::Level::Debug } else { log::Level::Trace @@ -1208,6 +1213,7 @@ mod tests { let x = Web3Connection { name: "name".to_string(), + allowed_lag: 10, db_conn: None, display_name: None, url: "ws://example.com".to_string(), @@ -1255,6 +1261,7 @@ mod tests { // TODO: this is getting long. have a `impl Default` let x = Web3Connection { name: "name".to_string(), + allowed_lag: 10, db_conn: None, display_name: None, url: "ws://example.com".to_string(), @@ -1306,6 +1313,7 @@ mod tests { let x = Web3Connection { name: "name".to_string(), + allowed_lag: 10, db_conn: None, display_name: None, url: "ws://example.com".to_string(), diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index f2497515..154cd818 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -88,6 +88,9 @@ impl Web3Connections { } }; + // TODO: this might be too aggressive. think about this more + let allowed_lag = ((expected_block_time_ms * 3) as f64 / 1000.0).round() as u64; + let http_interval_sender = if http_client.is_some() { let (sender, receiver) = broadcast::channel(1); @@ -151,6 +154,7 @@ impl Web3Connections { server_config .spawn( server_name, + allowed_lag, db_conn, redis_pool, chain_id, @@ -428,7 +432,7 @@ impl Web3Connections { match x_head_block { None => continue, Some(x_head) => { - let key = (x_head.number(), u64::MAX - x.tier); + let key = (Some(x_head.number()), u64::MAX - x.tier); m.entry(key).or_insert_with(Vec::new).push(x); } @@ -450,8 +454,6 @@ impl Web3Connections { return Ok(OpenRequestResult::NotReady); } - let head_num = head_block.number(); - let mut m = BTreeMap::new(); for x in synced_connections @@ -459,7 +461,7 @@ impl Web3Connections { .iter() .filter(|x| !skip.contains(x)) { - let key = (head_num, u64::MAX - x.tier); + let key = (None, u64::MAX - x.tier); m.entry(key).or_insert_with(Vec::new).push(x.clone()); } @@ -472,6 +474,7 @@ impl Web3Connections { for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() { // under heavy load, it is possible for even our best server to be negative let mut minimum = f64::MAX; + let mut maximum = f64::MIN; // we sort on a combination of values. cache them here so that we don't do this math multiple times. let mut available_request_map: HashMap<_, f64> = usable_rpcs @@ -487,13 +490,20 @@ impl Web3Connections { trace!("available requests on {}: {}", rpc, available_requests); - minimum = available_requests.min(minimum); + minimum = minimum.min(available_requests); + maximum = maximum.max(available_requests); (rpc, available_requests) }) .collect(); trace!("minimum available requests: {}", minimum); + trace!("maximum available requests: {}", minimum); + + if maximum < 0.0 { + // TODO: if maximum < 0 and there are other tiers on the same block, we should include them now + warn!("soft limits overloaded: {} to {}", minimum, maximum) + } // choose_multiple_weighted can't have negative numbers. shift up if any are negative // TODO: is this a correct way to shift? @@ -513,7 +523,7 @@ impl Web3Connections { let sorted_rpcs = { if usable_rpcs.len() == 1 { - // TODO: return now instead? + // TODO: return now instead? we shouldn't need another alloc vec![usable_rpcs.get(0).expect("there should be 1")] } else { let mut rng = thread_fast_rng::thread_fast_rng(); @@ -935,6 +945,8 @@ mod tests { let head_rpc = Web3Connection { name: "synced".to_string(), + // TODO: what should this be? + allowed_lag: 10, db_conn: None, display_name: None, url: "ws://example.com/synced".to_string(), @@ -954,6 +966,7 @@ mod tests { let lagged_rpc = Web3Connection { name: "lagged".to_string(), + allowed_lag: 10, db_conn: None, display_name: None, url: "ws://example.com/lagged".to_string(), @@ -1157,6 +1170,7 @@ mod tests { let pruned_rpc = Web3Connection { name: "pruned".to_string(), + allowed_lag: 10, db_conn: None, display_name: None, url: "ws://example.com/pruned".to_string(), @@ -1176,6 +1190,7 @@ mod tests { let archive_rpc = Web3Connection { name: "archive".to_string(), + allowed_lag: 10, db_conn: None, display_name: None, url: "ws://example.com/archive".to_string(),