From 5be5128c936b489e164fda1e60652c3ff550ae39 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 3 Jan 2023 08:33:49 -0800
Subject: [PATCH] partial refactor of allowed lag

---
 web3_proxy/src/app/mod.rs          | 15 +++++++++++++
 web3_proxy/src/rpcs/blockchain.rs  | 26 +++++++++------------
 web3_proxy/src/rpcs/connection.rs  |  6 ++---
 web3_proxy/src/rpcs/connections.rs | 36 ++++++++++++++++++++++--------
 4 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 8d599074..436cbb68 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -184,6 +184,7 @@ pub struct Web3ProxyApp {
     head_block_receiver: watch::Receiver,
     pending_tx_sender: broadcast::Sender,
     pub config: AppConfig,
+    pub allowed_lag: u64,
     pub db_conn: Option,
     pub db_replica: Option,
     /// prometheus metrics
@@ -680,8 +681,20 @@ impl Web3ProxyApp {
             .time_to_idle(Duration::from_secs(120))
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
 
+        // TODO: get this out of the toml instead
+        let allowed_lag = match top_config.app.chain_id {
+            1 => 60,
+            137 => 10,
+            250 => 10,
+            _ => {
+                warn!("defaulting allowed lag to 60");
+                60
+            }
+        };
+
         let app = Self {
             config: top_config.app,
+            allowed_lag,
             balanced_rpcs,
             private_rpcs,
             response_cache,
@@ -1374,6 +1387,7 @@ impl Web3ProxyApp {
                 let mut response = self
                     .balanced_rpcs
                     .try_send_best_upstream_server(
+                        self.allowed_lag,
                         &authorization,
                         request,
                         Some(&request_metadata),
@@ -1397,6 +1411,7 @@
                 } else {
                     self.balanced_rpcs
                         .try_send_best_upstream_server(
+                            self.allowed_lag,
                             &authorization,
                             request,
                             Some(&request_metadata),
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index b43f7294..b50f52f5 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -55,23 +55,16 @@ impl SavedBlock {
     }
 
     pub fn lag(&self) -> u64 {
-        // TODO: read this from a global config. different chains should probably have different gaps.
-        let allowed_lag: u64 = 60;
-
         let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("there should always be time");
 
-        // TODO: get this from config
-        // TODO: is this safe enough? what if something about the chain is actually lagged? what if its a chain like BTC with 10 minute blocks?
-        let oldest_allowed = now - Duration::from_secs(allowed_lag);
-
         let block_timestamp = Duration::from_secs(self.block.timestamp.as_u64());
 
-        if block_timestamp < oldest_allowed {
+        if block_timestamp < now {
             // this server is still syncing from too far away to serve requests
             // u64 is safe because ew checked equality above
-            (oldest_allowed - block_timestamp).as_secs() as u64
+            (now - block_timestamp).as_secs() as u64
         } else {
             0
         }
@@ -87,9 +80,8 @@ impl SavedBlock {
     }
 
     /// When the block was received, this node was still syncing
-    pub fn syncing(&self) -> bool {
-        // TODO: margin should come from a global config
-        self.lag > 60
+    pub fn syncing(&self, allowed_lag: u64) -> bool {
+        self.lag > allowed_lag
     }
 }
 
@@ -103,7 +95,7 @@ impl Display for SavedBlock {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{} ({})", self.number(), self.hash())?;
 
-        if self.syncing() {
+        if self.syncing(0) {
             write!(f, " (behind by {} seconds)", self.lag)?;
         }
 
@@ -177,8 +169,9 @@ impl Web3Connections {
         let request: JsonRpcRequest = serde_json::from_value(request)?;
 
         // TODO: request_metadata? maybe we should put it in the authorization?
+        // TODO: don't hard code allowed lag
         let response = self
-            .try_send_best_upstream_server(authorization, request, None, None)
+            .try_send_best_upstream_server(60, authorization, request, None, None)
             .await?;
 
         let block = response.result.context("failed fetching block")?;
@@ -254,7 +247,7 @@
         // TODO: if error, retry?
         // TODO: request_metadata or authorization?
         let response = self
-            .try_send_best_upstream_server(authorization, request, None, Some(num))
+            .try_send_best_upstream_server(60, authorization, request, None, Some(num))
             .await?;
 
         let raw_block = response.result.context("no block result")?;
@@ -324,7 +317,8 @@
         // we don't know if its on the heaviest chain yet
         self.save_block(&rpc_head_block.block, false).await?;
 
-        if rpc_head_block.syncing() {
+        // TODO: don't default to 60. different chains are differen
+        if rpc_head_block.syncing(60) {
             if connection_heads.remove(&rpc.name).is_some() {
                 warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.lag);
             } else {
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 5f197201..1b0225f0 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -202,7 +202,7 @@ impl Web3Connection {
             .await?
             .context("no block during check_block_data_limit!")?;
 
-        if SavedBlock::from(head_block).syncing() {
+        if SavedBlock::from(head_block).syncing(60) {
             // if the node is syncing, we can't check its block data limit
             return Ok(None);
         }
@@ -289,7 +289,7 @@ impl Web3Connection {
     pub fn syncing(&self) -> bool {
         match self.head_block.read().clone() {
             None => true,
-            Some(x) => x.syncing(),
+            Some(x) => x.syncing(60),
         }
     }
 
@@ -297,7 +297,7 @@
         let head_block_num = match self.head_block.read().clone() {
             None => return false,
             Some(x) => {
-                if x.syncing() {
+                if x.syncing(60) {
                     // skip syncing nodes. even though they might be able to serve a query,
                     // latency will be poor and it will get in the way of them syncing further
                     return false;
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index a627b88b..a4a2b7ff 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -388,6 +388,7 @@ impl Web3Connections {
     /// get the best available rpc server
     pub async fn best_synced_backend_connection(
         &self,
+        allowed_lag: u64,
         authorization: &Arc,
         request_metadata: Option<&Arc>,
         skip: &[Arc],
@@ -422,11 +423,18 @@ impl Web3Connections {
         // TODO: double check has_block_data?
         let synced_connections = self.synced_connections.load();
 
-        let head_num = match synced_connections.head_block.as_ref() {
+        let head_block = match synced_connections.head_block.as_ref() {
             None => return Ok(OpenRequestResult::NotReady),
-            Some(x) => x.number(),
+            Some(x) => x,
         };
 
+        // TODO: different allowed_lag depending on the chain
+        if head_block.syncing(allowed_lag) {
+            return Ok(OpenRequestResult::NotReady);
+        }
+
+        let head_num = head_block.number();
+
         let c: Vec<_> = synced_connections
             .conns
             .iter()
@@ -607,8 +615,10 @@ impl Web3Connections {
     }
 
     /// be sure there is a timeout on this or it might loop forever
+    /// TODO: do not take allowed_lag here. have it be on the connections struct instead
     pub async fn try_send_best_upstream_server(
         &self,
+        allowed_lag: u64,
         authorization: &Arc,
         request: JsonRpcRequest,
         request_metadata: Option<&Arc>,
@@ -624,6 +634,7 @@ impl Web3Connections {
         }
         match self
             .best_synced_backend_connection(
+                allowed_lag,
                 authorization,
                 request_metadata,
                 &skip_rpcs,
@@ -1011,8 +1022,9 @@ mod tests {
         );
 
         // best_synced_backend_connection requires servers to be synced with the head block
+        // TODO: don't hard code allowed_lag
         let x = conns
-            .best_synced_backend_connection(&authorization, None, &[], None)
+            .best_synced_backend_connection(60, &authorization, None, &[], None)
            .await
            .unwrap();
 
@@ -1067,21 +1079,21 @@
 
         assert!(matches!(
             conns
-                .best_synced_backend_connection(&authorization, None, &[], None)
+                .best_synced_backend_connection(60, &authorization, None, &[], None)
                 .await,
             Ok(OpenRequestResult::Handle(_))
         ));
 
         assert!(matches!(
             conns
-                .best_synced_backend_connection(&authorization, None, &[], Some(&0.into()))
+                .best_synced_backend_connection(60, &authorization, None, &[], Some(&0.into()))
                 .await,
             Ok(OpenRequestResult::Handle(_))
         ));
 
         assert!(matches!(
             conns
-                .best_synced_backend_connection(&authorization, None, &[], Some(&1.into()))
+                .best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into()))
                 .await,
             Ok(OpenRequestResult::Handle(_))
         ));
@@ -1089,7 +1101,7 @@
         // future block should not get a handle
         assert!(matches!(
             conns
-                .best_synced_backend_connection(&authorization, None, &[], Some(&2.into()))
+                .best_synced_backend_connection(60, &authorization, None, &[], Some(&2.into()))
                 .await,
             Ok(OpenRequestResult::NotReady)
         ));
@@ -1219,7 +1231,13 @@
         // best_synced_backend_connection requires servers to be synced with the head block
         let best_head_server = conns
-            .best_synced_backend_connection(&authorization, None, &[], Some(&head_block.number()))
+            .best_synced_backend_connection(
+                60,
+                &authorization,
+                None,
+                &[],
+                Some(&head_block.number()),
+            )
             .await;
 
         assert!(matches!(
             best_head_server,
             Ok(OpenRequestResult::Handle(_))
         ));
 
         let best_archive_server = conns
-            .best_synced_backend_connection(&authorization, None, &[], Some(&1.into()))
+            .best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into()))
             .await;
 
         match best_archive_server {
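
Below is a minimal, self-contained sketch of the check this patch threads through the proxy. The names BlockStub and default_allowed_lag are made up for illustration: they stand in for SavedBlock and for the hard-coded chain_id match in app/mod.rs. The sketch only shows how a head block's age is measured against a per-chain allowed_lag to decide whether a node counts as syncing; it is not code from the repository.

// sketch only: BlockStub and default_allowed_lag are illustrative stand-ins,
// not types or functions from web3_proxy
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// stand-in for SavedBlock; only the header timestamp matters for lag
struct BlockStub {
    /// block timestamp in unix seconds
    timestamp: u64,
}

impl BlockStub {
    /// seconds this block is behind the local clock (0 if it is not behind)
    fn lag(&self) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("there should always be time");

        let block_timestamp = Duration::from_secs(self.timestamp);

        if block_timestamp < now {
            (now - block_timestamp).as_secs()
        } else {
            0
        }
    }

    /// a head block older than allowed_lag seconds means the node is still syncing
    fn syncing(&self, allowed_lag: u64) -> bool {
        self.lag() > allowed_lag
    }
}

/// per-chain defaults mirroring the hard-coded match in app/mod.rs above
fn default_allowed_lag(chain_id: u64) -> u64 {
    match chain_id {
        1 => 60,         // ethereum mainnet
        137 | 250 => 10, // polygon and fantom produce blocks much faster
        _ => 60,         // everything else falls back to a minute
    }
}

fn main() {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("there should always be time")
        .as_secs();

    // a head block seen 30 seconds ago
    let head = BlockStub {
        timestamp: now - 30,
    };

    // acceptable on mainnet (60s allowed), but "syncing" on polygon (10s allowed)
    assert!(!head.syncing(default_allowed_lag(1)));
    assert!(head.syncing(default_allowed_lag(137)));

    println!("head block lag: {}s", head.lag());
}

With these defaults, a head block that is 30 seconds old still passes on mainnet but counts as syncing on polygon or fantom, which is the case best_synced_backend_connection now answers with OpenRequestResult::NotReady instead of handing out a request handle.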