From 52a9ba604c1155265e225fcee7b4ce1cb3e7a9c3 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Thu, 19 Jan 2023 03:05:39 -0800
Subject: [PATCH] remove allowed lag

---
 TODO.md                            |   1 +
 web3_proxy/src/app/mod.rs          |  15 ----
 web3_proxy/src/config.rs           |   2 -
 web3_proxy/src/rpcs/blockchain.rs  |   9 +--
 web3_proxy/src/rpcs/connection.rs  |  86 ++---------------------
 web3_proxy/src/rpcs/connections.rs | 108 +++++++++++++++++------------
 6 files changed, 74 insertions(+), 147 deletions(-)

diff --git a/TODO.md b/TODO.md
index afe04f5d..693b2179 100644
--- a/TODO.md
+++ b/TODO.md
@@ -307,6 +307,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
 - [x] improve rate limiting on websockets
 - [x] retry another server if we get a jsonrpc response error about rate limits
 - [x] major refactor to only use backup servers when absolutely necessary
+- [x] remove allowed lag
 - [-] proxy mode for benchmarking all backends
 - [-] proxy mode for sending to multiple backends
 - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 055694f3..81968a6b 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -190,7 +190,6 @@ pub struct Web3ProxyApp {
     head_block_receiver: watch::Receiver<ArcBlock>,
     pending_tx_sender: broadcast::Sender<TxStatus>,
     pub config: AppConfig,
-    pub allowed_lag: u64,
     pub db_conn: Option<DatabaseConnection>,
     pub db_replica: Option<DatabaseReplica>,
     /// prometheus metrics
@@ -687,20 +686,8 @@ impl Web3ProxyApp {
             .time_to_idle(Duration::from_secs(120))
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());

-        // TODO: get this out of the toml instead
-        let allowed_lag = match top_config.app.chain_id {
-            1 => 60,
-            137 => 10,
-            250 => 10,
-            _ => {
-                warn!("defaulting allowed lag to 60");
-                60
-            }
-        };
-
         let app = Self {
             config: top_config.app,
-            allowed_lag,
             balanced_rpcs,
             private_rpcs,
             response_cache,
@@ -1432,7 +1419,6 @@ impl Web3ProxyApp {
             .balanced_rpcs
             .try_proxy_connection(
                 proxy_mode,
-                self.allowed_lag,
                 &authorization,
                 request,
                 Some(&request_metadata),
@@ -1459,7 +1445,6 @@ impl Web3ProxyApp {
                 self.balanced_rpcs
                     .try_proxy_connection(
                         proxy_mode,
-                        self.allowed_lag,
                         &authorization,
                         request,
                         Some(&request_metadata),
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index 20aabee3..397a9c60 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -223,7 +223,6 @@ impl Web3ConnectionConfig {
     pub async fn spawn(
         self,
         name: String,
-        allowed_lag: u64,
         db_conn: Option<DatabaseConnection>,
         redis_pool: Option<RedisPool>,
         chain_id: u64,
@@ -262,7 +261,6 @@ impl Web3ConnectionConfig {

         Web3Connection::spawn(
             name,
-            allowed_lag,
             self.display_name,
             chain_id,
             db_conn,
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index f611c7d9..56bbf045 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -78,11 +78,6 @@ impl SavedBlock {
     pub fn number(&self) -> U64 {
         self.block.number.expect("saved blocks must have a number")
     }
-
-    /// When the block was received, this node was still syncing
-    pub fn syncing(&self, allowed_lag: u64) -> bool {
-        self.age > allowed_lag
-    }
 }

 impl From<ArcBlock> for SavedBlock {
@@ -172,7 +167,7 @@ impl Web3Connections {
         // TODO: request_metadata? maybe we should put it in the authorization?
         // TODO: don't hard code allowed lag
         let response = self
-            .try_send_best_consensus_head_connection(60, authorization, request, None, None)
+            .try_send_best_consensus_head_connection(authorization, request, None, None)
             .await?;

         let block = response.result.context("failed fetching block")?;
@@ -248,7 +243,7 @@ impl Web3Connections {
         // TODO: if error, retry?
         // TODO: request_metadata or authorization?
         let response = self
-            .try_send_best_consensus_head_connection(60, authorization, request, None, Some(num))
+            .try_send_best_consensus_head_connection(authorization, request, None, Some(num))
             .await?;

         let raw_block = response.result.context("no block result")?;
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 070c88b8..ea3f9c67 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -63,7 +63,6 @@ pub struct Web3Connection {
     pub name: String,
     pub display_name: Option<String>,
     pub db_conn: Option<DatabaseConnection>,
-    pub(super) allowed_lag: u64,
     /// TODO: can we get this from the provider? do we even need it?
     pub(super) url: String,
     /// Some connections use an http_client. we keep a clone for reconnecting
@@ -101,7 +100,6 @@ impl Web3Connection {
     #[allow(clippy::too_many_arguments)]
     pub async fn spawn(
         name: String,
-        allowed_lag: u64,
         display_name: Option<String>,
         chain_id: u64,
         db_conn: Option<DatabaseConnection>,
@@ -140,7 +138,6 @@ impl Web3Connection {

         let new_connection = Self {
             name,
-            allowed_lag,
             db_conn: db_conn.clone(),
             display_name,
             http_client,
@@ -195,25 +192,7 @@ impl Web3Connection {
             return Ok(None);
         }

-        // check if we are synced
-        let head_block: ArcBlock = self
-            .wait_for_request_handle(authorization, Duration::from_secs(30), true)
-            .await?
-            .request::<_, Option<_>>(
-                "eth_getBlockByNumber",
-                &json!(("latest", false)),
-                // error here are expected, so keep the level low
-                Level::Warn.into(),
-            )
-            .await?
-            .context("no block during check_block_data_limit!")?;
-
-        if SavedBlock::from(head_block).syncing(60) {
-            // if the node is syncing, we can't check its block data limit
-            return Ok(None);
-        }
-
-        // TODO: add SavedBlock to self? probably best not to. we might not get marked Ready
+        // TODO: check eth_syncing. if it is not false, return Ok(None)

         let mut limit = None;
@@ -296,27 +275,10 @@ impl Web3Connection {
         self.block_data_limit.load(atomic::Ordering::Acquire).into()
     }

-    pub fn syncing(&self, allowed_lag: u64) -> bool {
-        match self.head_block.read().clone() {
-            None => true,
-            Some(x) => x.syncing(allowed_lag),
-        }
-    }
-
     pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
         let head_block_num = match self.head_block.read().clone() {
             None => return false,
-            Some(x) => {
-                // TODO: this 60 second limit is causing our polygons to fall behind. change this to number of blocks?
-                // TODO: sometimes blocks might actually just take longer than 60 seconds
-                if x.syncing(60) {
-                    // skip syncing nodes. even though they might be able to serve a query,
-                    // latency will be poor and it will get in the way of them syncing further
-                    return false;
-                }
-
-                x.number()
-            }
+            Some(x) => x.number(),
         };

         // this rpc doesn't have that block yet. still syncing
@@ -548,7 +510,7 @@ impl Web3Connection {
             let _ = head_block.insert(new_head_block.clone().into());
         }

-        if self.block_data_limit() == U64::zero() && !self.syncing(1) {
+        if self.block_data_limit() == U64::zero() {
             let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
             if let Err(err) = self.check_block_data_limit(&authorization).await {
                 warn!(
@@ -596,8 +558,6 @@ impl Web3Connection {
         reconnect: bool,
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
     ) -> anyhow::Result<()> {
-        let allowed_lag = self.allowed_lag;
-
         loop {
             let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());

@@ -629,8 +589,6 @@ impl Web3Connection {
                 let health_sleep_seconds = 10;
                 sleep(Duration::from_secs(health_sleep_seconds)).await;

-                let mut warned = 0;
-
                 loop {
                     // TODO: what if we just happened to have this check line up with another restart?
                     // TODO: think more about this
@@ -649,38 +607,6 @@ impl Web3Connection {
                     }
                     // trace!("health check on {}. unlocked", conn);

-                    if let Some(x) = &*conn.head_block.read() {
-                        // if this block is too old, return an error so we reconnect
-                        let current_lag = x.lag();
-                        if current_lag > allowed_lag {
-                            let level = if warned == 0 {
-                                if conn.backup {
-                                    log::Level::Info
-                                } else {
-                                    log::Level::Warn
-                                }
-                            } else if warned % 100 == 0 {
-                                log::Level::Debug
-                            } else {
-                                log::Level::Trace
-                            };
-
-                            log::log!(
-                                level,
-                                "{} is lagged {} secs: {} {}",
-                                conn,
-                                current_lag,
-                                x.number(),
-                                x.hash(),
-                            );
-
-                            warned += 1;
-                        } else {
-                            // reset warnings now that we are connected
-                            warned = 0;
-                        }
-                    }
-
                     sleep(Duration::from_secs(health_sleep_seconds)).await;
                 }
             };
@@ -1222,7 +1148,6 @@ mod tests {

         let x = Web3Connection {
             name: "name".to_string(),
-            allowed_lag: 10,
             db_conn: None,
             display_name: None,
             url: "ws://example.com".to_string(),
@@ -1271,7 +1196,6 @@ mod tests {
         // TODO: this is getting long. have a `impl Default`
         let x = Web3Connection {
             name: "name".to_string(),
-            allowed_lag: 10,
             db_conn: None,
             display_name: None,
             url: "ws://example.com".to_string(),
@@ -1299,6 +1223,8 @@ mod tests {
         assert!(!x.has_block_data(&(head_block.number() + 1000)));
     }

+    /*
+    // TODO: think about how to bring the concept of a "lagged" node back
     #[test]
     fn test_lagged_node_not_has_block_data() {
         let now: U256 = SystemTime::now()
             .duration_since(UNIX_EPOCH)
@@ -1324,7 +1250,6 @@ mod tests {

         let x = Web3Connection {
             name: "name".to_string(),
-            allowed_lag: 10,
             db_conn: None,
             display_name: None,
             url: "ws://example.com".to_string(),
@@ -1349,4 +1274,5 @@ mod tests {
         assert!(!x.has_block_data(&(head_block.number() + 1)));
         assert!(!x.has_block_data(&(head_block.number() + 1000)));
     }
+    */
 }
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 93493716..99eb61a8 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -89,9 +89,6 @@ impl Web3Connections {
             }
         };

-        // TODO: this might be too aggressive. think about this more
-        let allowed_lag = ((expected_block_time_ms * 3) as f64 / 1000.0).round() as u64;
-
         let http_interval_sender = if http_client.is_some() {
             let (sender, receiver) = broadcast::channel(1);

@@ -155,7 +152,6 @@ impl Web3Connections {
                     server_config
                         .spawn(
                             server_name,
-                            allowed_lag,
                             db_conn,
                             redis_pool,
                             chain_id,
@@ -408,10 +404,40 @@ impl Web3Connections {
         unimplemented!("this shouldn't be possible")
     }

-    /// get the best available rpc server with the consensus head block. it might have blocks after the consensus head
     pub async fn best_consensus_head_connection(
         &self,
-        allowed_lag: u64,
+        authorization: &Arc<Authorization>,
+        request_metadata: Option<&Arc<RequestMetadata>>,
+        skip: &[Arc<Web3Connection>],
+        min_block_needed: Option<&U64>,
+    ) -> anyhow::Result<OpenRequestResult> {
+        if let Ok(without_backups) = self
+            ._best_consensus_head_connection(
+                false,
+                authorization,
+                request_metadata,
+                skip,
+                min_block_needed,
+            )
+            .await
+        {
+            return Ok(without_backups);
+        }
+
+        self._best_consensus_head_connection(
+            true,
+            authorization,
+            request_metadata,
+            skip,
+            min_block_needed,
+        )
+        .await
+    }
+
+    /// get the best available rpc server with the consensus head block. it might have blocks after the consensus head
+    async fn _best_consensus_head_connection(
+        &self,
+        allow_backups: bool,
         authorization: &Arc<Authorization>,
         request_metadata: Option<&Arc<RequestMetadata>>,
         skip: &[Arc<Web3Connection>],
         min_block_needed: Option<&U64>,
     ) -> anyhow::Result<OpenRequestResult> {
         let usable_rpcs_by_head_num_and_weight: BTreeMap<
             (Option<U64>, u64),
             Vec<Arc<Web3Connection>>,
         > = if let Some(min_block_needed) = min_block_needed {
-            // need a potentially old block. check all the rpcs
+            // need a potentially old block. check all the rpcs. prefer the most synced
             let mut m = BTreeMap::new();

             for x in self
                 .conns
                 .values()
+                .filter(|x| if allow_backups { true } else { !x.backup })
                 .filter(|x| !skip.contains(x))
                 .filter(|x| x.has_block_data(min_block_needed))
                 .cloned()
@@ -448,15 +475,7 @@ impl Web3Connections {
             // need latest. filter the synced rpcs
             let synced_connections = self.synced_connections.load();

-            let head_block = match synced_connections.head_block.as_ref() {
-                None => return Ok(OpenRequestResult::NotReady),
-                Some(x) => x,
-            };
-
-            // TODO: self.allowed_lag instead of taking as an arg
-            if head_block.syncing(allowed_lag) {
-                return Ok(OpenRequestResult::NotReady);
-            }
+            // TODO: if head_block is super old. emit an error!

             let mut m = BTreeMap::new();
@@ -575,7 +594,7 @@ impl Web3Connections {
             None => {
                 // none of the servers gave us a time to retry at

-                // TODO: bring this back?
+                // TODO: bring this back? need to think about how to do this with `allow_backups`
                 // we could return an error here, but maybe waiting a second will fix the problem
                 // TODO: configurable max wait? the whole max request time, or just some portion?
                 // let handle = sorted_rpcs
@@ -605,6 +624,24 @@ impl Web3Connections {
         authorization: &Arc<Authorization>,
         block_needed: Option<&U64>,
         max_count: Option<usize>,
+    ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
+        if let Ok(without_backups) = self
+            ._all_synced_connections(false, authorization, block_needed, max_count)
+            .await
+        {
+            return Ok(without_backups);
+        }
+
+        self._all_synced_connections(true, authorization, block_needed, max_count)
+            .await
+    }
+
+    async fn _all_synced_connections(
+        &self,
+        allow_backups: bool,
+        authorization: &Arc<Authorization>,
+        block_needed: Option<&U64>,
+        max_count: Option<usize>,
     ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
         let mut earliest_retry_at = None;
         // TODO: with capacity?
@@ -621,12 +658,14 @@ impl Web3Connections {
                 break;
             }

+            if !allow_backups && connection.backup {
+                continue;
+            }
+
             if let Some(block_needed) = block_needed {
                 if !connection.has_block_data(block_needed) {
                     continue;
                 }
-            } else if connection.syncing(30) {
-                continue;
             }

             // check rate limits and increment our connection counter
@@ -663,10 +702,8 @@ impl Web3Connections {
     }

     /// be sure there is a timeout on this or it might loop forever
-    /// TODO: do not take allowed_lag here. have it be on the connections struct instead
     pub async fn try_send_best_consensus_head_connection(
         &self,
-        allowed_lag: u64,
         authorization: &Arc<Authorization>,
         request: JsonRpcRequest,
         request_metadata: Option<&Arc<RequestMetadata>>,
@@ -682,7 +719,6 @@ impl Web3Connections {
             }
             match self
                 .best_consensus_head_connection(
-                    allowed_lag,
                     authorization,
                     request_metadata,
                     &skip_rpcs,
@@ -903,7 +939,6 @@ impl Web3Connections {
     pub async fn try_proxy_connection(
         &self,
         proxy_mode: ProxyMode,
-        allowed_lag: u64,
         authorization: &Arc<Authorization>,
         request: JsonRpcRequest,
         request_metadata: Option<&Arc<RequestMetadata>>,
@@ -912,7 +947,6 @@ impl Web3Connections {
         match proxy_mode {
             ProxyMode::Best => {
                 self.try_send_best_consensus_head_connection(
-                    allowed_lag,
                     authorization,
                     request,
                     request_metadata,
@@ -1014,8 +1048,6 @@ mod tests {

         let head_rpc = Web3Connection {
             name: "synced".to_string(),
-            // TODO: what should this be?
-            allowed_lag: 10,
             db_conn: None,
             display_name: None,
             url: "ws://example.com/synced".to_string(),
@@ -1036,7 +1068,6 @@ mod tests {

         let lagged_rpc = Web3Connection {
             name: "lagged".to_string(),
-            allowed_lag: 10,
             db_conn: None,
             display_name: None,
             url: "ws://example.com/lagged".to_string(),
@@ -1129,9 +1160,8 @@ mod tests {
         );

         // best_synced_backend_connection requires servers to be synced with the head block
-        // TODO: don't hard code allowed_lag
         let x = conns
-            .best_consensus_head_connection(60, &authorization, None, &[], None)
+            .best_consensus_head_connection(&authorization, None, &[], None)
             .await
             .unwrap();
@@ -1186,21 +1216,21 @@ mod tests {

         assert!(matches!(
             conns
-                .best_consensus_head_connection(60, &authorization, None, &[], None)
+                .best_consensus_head_connection(&authorization, None, &[], None)
                 .await,
             Ok(OpenRequestResult::Handle(_))
         ));

         assert!(matches!(
             conns
-                .best_consensus_head_connection(60, &authorization, None, &[], Some(&0.into()))
+                .best_consensus_head_connection(&authorization, None, &[], Some(&0.into()))
                 .await,
             Ok(OpenRequestResult::Handle(_))
         ));

         assert!(matches!(
             conns
-                .best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into()))
+                .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()))
                 .await,
             Ok(OpenRequestResult::Handle(_))
         ));
@@ -1208,7 +1238,7 @@ mod tests {
         // future block should not get a handle
         assert!(matches!(
             conns
-                .best_consensus_head_connection(60, &authorization, None, &[], Some(&2.into()))
+                .best_consensus_head_connection(&authorization, None, &[], Some(&2.into()))
                 .await,
             Ok(OpenRequestResult::NotReady)
         ));
@@ -1241,7 +1271,6 @@ mod tests {

         let pruned_rpc = Web3Connection {
             name: "pruned".to_string(),
-            allowed_lag: 10,
             db_conn: None,
             display_name: None,
             url: "ws://example.com/pruned".to_string(),
@@ -1262,7 +1291,6 @@ mod tests {

         let archive_rpc = Web3Connection {
             name: "archive".to_string(),
-            allowed_lag: 10,
             db_conn: None,
             display_name: None,
             url: "ws://example.com/archive".to_string(),
@@ -1343,13 +1371,7 @@ mod tests {

         // best_synced_backend_connection requires servers to be synced with the head block
         let best_head_server = conns
-            .best_consensus_head_connection(
-                60,
-                &authorization,
-                None,
-                &[],
-                Some(&head_block.number()),
-            )
+            .best_consensus_head_connection(&authorization, None, &[], Some(&head_block.number()))
             .await;

         assert!(matches!(
             best_head_server,
             Ok(OpenRequestResult::Handle(_))
         ));

         let best_archive_server = conns
-            .best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into()))
+            .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()))
             .await;

         match best_archive_server {
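
The TODO this patch adds in check_block_data_limit ("check eth_syncing. if it is not false, return Ok(None)") describes a follow-up that the patch itself does not implement. A minimal sketch of that check, reusing wait_for_request_handle and the request helper the same way the removed "check if we are synced" block did, and assuming the eth_syncing response deserializes into ethers' SyncingStatus type (the variable names here are illustrative, not from this patch):

    use ethers::types::SyncingStatus;

    // sketch only: inside Web3Connection::check_block_data_limit, where the
    // removed head-block age check used to be
    let syncing_status: SyncingStatus = self
        .wait_for_request_handle(authorization, Duration::from_secs(30), true)
        .await?
        .request(
            "eth_syncing",
            // eth_syncing takes no parameters
            &json!([]),
            // errors here are expected, so keep the log level low
            Level::Warn.into(),
        )
        .await?;

    if !matches!(syncing_status, SyncingStatus::IsFalse) {
        // the node is still syncing, so its block data limit can't be measured yet
        return Ok(None);
    }

Unlike the removed syncing(60) heuristic, which inferred sync state from the head block's age against a hard-coded per-chain threshold, eth_syncing asks the node directly; that is what lets this commit drop allowed_lag everywhere and fall back to backup servers only when the non-backup pass fails.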