From 068c05cf4f0e11533f2cf404d4cd51805ef66236 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Mon, 5 Sep 2022 16:25:21 +0000
Subject: [PATCH] improve fork logic again

---
 Cargo.lock                        |  4 +--
 web3_proxy/Cargo.toml             |  2 +-
 web3_proxy/src/app.rs             |  3 +-
 web3_proxy/src/rpcs/blockchain.rs | 54 ++++++++++++++++++-------------
 web3_proxy/src/rpcs/connection.rs | 24 +++++++-------
 web3_proxy/src/rpcs/request.rs    |  2 +-
 6 files changed, 48 insertions(+), 41 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 214ae79a..fb35b64c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2680,9 +2680,9 @@ dependencies = [

 [[package]]
 name = "moka"
-version = "0.9.3"
+version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a89c33e91526792a0260425073c3db0b472cdca2cc6fcaa666dd6e65450462a"
+checksum = "dd8256cf9fa396576521e5e94affadb95818ec48f5dbedca36714387688aea29"
 dependencies = [
  "async-io",
  "async-lock",
diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml
index 794c2f1d..2bab0816 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -34,7 +34,7 @@ flume = "0.10.14"
 futures = { version = "0.3.24", features = ["thread-pool"] }
 hashbrown = { version = "0.12.3", features = ["serde"] }
 http = "0.2.8"
-moka = { version = "0.9.3", default-features = false, features = ["future"] }
+moka = { version = "0.9.4", default-features = false, features = ["future"] }
 notify = "5.0.0"
 num = "0.4.0"
 parking_lot = { version = "0.12.1", features = ["arc_lock"] }
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index d7287959..1cf30536 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -39,7 +39,7 @@ use tokio::sync::{broadcast, watch};
 use tokio::task::JoinHandle;
 use tokio::time::{timeout, Instant};
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
-use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
+use tracing::{info, info_span, instrument, trace, warn, Instrument};
 use uuid::Uuid;

 // TODO: make this customizable?
@@ -151,7 +151,6 @@ impl Web3ProxyApp {
         Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
     )> {
         // safety checks on the config
-        debug!("redirect_user_url: {}", top_config.app.redirect_user_url);
         assert!(
             top_config.app.redirect_user_url.contains("{{user_id}}"),
             "redirect user url must contain \"{{user_id}}\""
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 487a6ff4..2fff25ee 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -313,7 +313,7 @@ impl Web3Connections {
         // clone to release the read lock on self.block_hashes
         if let Some(mut maybe_head_block) = highest_work_block {
             // track rpcs on this heaviest chain so we can build a new SyncedConnections
-            let mut heavy_rpcs: Vec<&Arc<Web3Connection>> = vec![];
+            let mut heavy_rpcs = HashSet::<&String>::new();
             // a running total of the soft limits covered by the heavy rpcs
             let mut heavy_sum_soft_limit: u32 = 0;
             // TODO: also track heavy_sum_hard_limit?
@@ -335,7 +335,7 @@ impl Web3Connections {
                 }

                 if let Some(rpc) = self.conns.get(conn_name) {
-                    heavy_rpcs.push(rpc);
+                    heavy_rpcs.insert(conn_name);
                     heavy_sum_soft_limit += rpc.soft_limit;
                 } else {
                     warn!("connection missing")
@@ -366,9 +366,27 @@ impl Web3Connections {
                         break;
                     }
                 }

+            }
+            // TODO: if heavy_rpcs.is_empty, try another method of finding the head block
+
+            // we've done all the searching for the heaviest block that we can
+            if heavy_rpcs.is_empty() {
+                // if we get here, something is wrong. clear synced connections
+                let empty_synced_connections = SyncedConnections::default();
+
+                let old_synced_connections = self
+                    .synced_connections
+                    .swap(Arc::new(empty_synced_connections));
+
+                // TODO: log different things depending on old_synced_connections
+                warn!("no consensus head!");
+            } else {
                 // success! this block has enough soft limit and nodes on it (or on later blocks)
-                let conns = heavy_rpcs.into_iter().cloned().collect();
+                let conns: Vec<Arc<Web3Connection>> = heavy_rpcs
+                    .into_iter()
+                    .filter_map(|conn_name| self.conns.get(conn_name).cloned())
+                    .collect();

                 let heavy_block = maybe_head_block;

@@ -377,6 +395,11 @@
                 debug_assert_ne!(heavy_num, U64::zero());

+                // TODO: add these to the log messages
+                let num_consensus_rpcs = conns.len();
+                let num_connection_heads = connection_heads.len();
+                let total_conns = self.conns.len();
+
                 let heavy_block_id = BlockId {
                     hash: heavy_hash,
                     num: heavy_num,
@@ -391,14 +414,10 @@
                     .synced_connections
                     .swap(Arc::new(new_synced_connections));

-                // TODO: add these to the log messages
-                let num_connection_heads = connection_heads.len();
-                let total_conns = self.conns.len();
-
                 // TODO: if the rpc_head_block != heavy, log something somewhere in here
                 match &old_synced_connections.head_block_id {
                     None => {
-                        debug!(block=%heavy_block_id, %rpc, "first consensus head");
+                        debug!(block=%heavy_block_id, %rpc, "first consensus head {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                         self.save_block(&rpc_head_block, true).await?;

@@ -410,10 +429,10 @@
                         // multiple blocks with the same fork!
                         if heavy_block_id.hash == old_block_id.hash {
                             // no change in hash. no need to use head_block_sender
-                            debug!(head=%heavy_block_id, old=%old_block_id, %rpc, "con block")
+                            debug!(head=%heavy_block_id, %rpc, "con block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
                         } else {
                             // hash changed
-                            info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block");
+                            info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                             // todo!("handle equal by updating the cannonical chain");
                             self.save_block(&rpc_head_block, true).await?;
@@ -424,7 +443,7 @@
                     Ordering::Less => {
                         // this is unlikely but possible
                         // TODO: better log
-                        warn!(head=%heavy_block_id, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                         self.save_block(&rpc_head_block, true).await?;

@@ -432,7 +451,7 @@
                         head_block_sender.send(heavy_block)?;
                     }
                     Ordering::Greater => {
-                        debug!(head=%heavy_block_id, %rpc, "new block");
+                        debug!(head=%heavy_block_id, %rpc, "new block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                         // todo!("handle greater by adding this block to and any missing parents to the cannonical chain");

@@ -443,18 +462,7 @@
                     }
                 }
             }
-
-            return Ok(());
         }
-
-        // if we get here, something is wrong. clear synced connections
-        let empty_synced_connections = SyncedConnections::default();
-
-        let old_synced_connections = self
-            .synced_connections
-            .swap(Arc::new(empty_synced_connections));
-
-        // TODO: log different things depending on old_synced_connections
     }

     Ok(())
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 9675dddc..d2ab7d31 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -157,7 +157,7 @@ impl Web3Connection {
         Ok((new_connection, handle))
     }

-    #[instrument]
+    #[instrument(skip_all)]
     async fn check_block_data_limit(self: &Arc<Self>) -> anyhow::Result<Option<u64>> {
         let mut limit = None;

@@ -166,7 +166,7 @@ impl Web3Connection {
         // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
         while head_block_num == U64::zero() {
-            warn!("no head block");
+            warn!(rpc=%self, "no head block yet. retrying");

             // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender?
             sleep(Duration::from_secs(1)).await;
@@ -191,7 +191,7 @@ impl Web3Connection {
             )
             .await;

-            trace!(?archive_result, %self);
+            trace!(?archive_result, rpc=%self);

             if archive_result.is_ok() {
                 limit = Some(block_data_limit);
@@ -298,7 +298,7 @@ impl Web3Connection {
                     // self got the head block first. unfortunately its missing a necessary field
                     // keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed.
                     // there are other clients and we might have to use a third party without the td fix.
-                    trace!(rpc=?self, ?new_hash, "total_difficulty missing");
+                    trace!(rpc=%self, ?new_hash, "total_difficulty missing");
                     // todo: this can wait forever!
                     let complete_head_block: Block<TxHash> = self
                         .wait_for_request_handle()
@@ -382,7 +382,7 @@ impl Web3Connection {

         if futures.is_empty() {
             // TODO: is there a better way to make a channel that is never ready?
-            info!(?self, "no-op subscription");
+            info!(rpc=%self, "no-op subscription");
             return Ok(());
         }
@@ -393,7 +393,7 @@ impl Web3Connection {
                 // TODO: exponential backoff
                 let retry_in = Duration::from_secs(1);
                 warn!(
-                    ?self,
+                    rpc=%self,
                     "subscription exited. Attempting to reconnect in {:?}. {:?}",
                     retry_in,
                     err
                 );
                 sleep(retry_in).await;

                 // TODO: this isn't going to work. it will get in a loop with newHeads
                 self.reconnect(block_sender.clone()).await?;
             } else {
-                error!(?self, ?err, "subscription exited");
+                error!(rpc=%self, ?err, "subscription exited");
                 return Err(err);
             }
         }
@@ -490,12 +490,12 @@ impl Web3Connection {
                             broadcast::error::RecvError::Lagged(lagged) => {
                                 // querying the block was delayed
                                 // this can happen if tokio is very busy or waiting for requests limits took too long
-                                warn!(?err, ?self, "http interval lagging by {}!", lagged);
+                                warn!(?err, rpc=%self, "http interval lagging by {}!", lagged);
                             }
                         }
                     }

-                    trace!(?self, "ok http interval");
+                    trace!(rpc=%self, "ok http interval");
                 }
             }
             Web3Provider::Ws(provider) => {
@@ -526,7 +526,7 @@ impl Web3Connection {
                     .await?;
                 }

-                warn!(?self, "subscription ended");
+                warn!(rpc=%self, "subscription ended");
             }
         }
     }
@@ -588,7 +588,7 @@ impl Web3Connection {
                 // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
             }

-            warn!(?self, "subscription ended");
+            warn!(rpc=%self, "subscription ended");
         }
     }
 }
@@ -644,7 +644,7 @@ impl Web3Connection {
             // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
             // TODO: use tracing better
             // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
-            warn!(?retry_at, ?self, "Exhausted rate limit");
+            warn!(?retry_at, rpc=%self, "Exhausted rate limit");

             return Ok(OpenRequestResult::RetryAt(retry_at.into()));
         }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 168f9057..32667d19 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -58,7 +58,7 @@ impl OpenRequestHandle {
         while provider.is_none() {
             match self.0.provider.read().await.as_ref() {
                 None => {
-                    warn!(rpc=?self.0, "no provider!");
+                    warn!(rpc=%self.0, "no provider!");
                     // TODO: how should this work? a reconnect should be in progress. but maybe force one now?
                     // TODO: sleep how long? subscribe to something instead?
                     sleep(Duration::from_millis(100)).await
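
The heart of this patch is in blockchain.rs: instead of collecting `&Arc<Web3Connection>` handles while walking the chain, it collects connection *names* into a `HashSet` and only resolves the names back to handles once a consensus head with enough soft limit is found, swapping in `SyncedConnections::default()` when nothing qualifies. A minimal sketch of that shape; `Conn`, `find_consensus`, the `heads` list, and `min_sum_soft_limit` are simplified stand-ins, not the proxy's real API:

    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    // stand-in for Web3Connection; the real struct also carries providers, limits, etc.
    struct Conn {
        soft_limit: u32,
    }

    fn find_consensus(
        conns: &HashMap<String, Arc<Conn>>,
        heads: &[(String, u64)], // (rpc name, head block number), assumed shape
        min_sum_soft_limit: u32,
    ) -> Option<Vec<Arc<Conn>>> {
        // track rpc *names* on the heaviest chain, as the patch does
        let mut heavy_rpcs: HashSet<&String> = HashSet::new();
        let mut heavy_sum_soft_limit: u32 = 0;

        for (conn_name, _head_num) in heads {
            if let Some(rpc) = conns.get(conn_name) {
                // a HashSet keeps one rpc from being counted twice
                heavy_rpcs.insert(conn_name);
                heavy_sum_soft_limit += rpc.soft_limit;
            }
            if heavy_sum_soft_limit >= min_sum_soft_limit {
                break;
            }
        }

        if heavy_rpcs.is_empty() || heavy_sum_soft_limit < min_sum_soft_limit {
            // no consensus head; the patch swaps in SyncedConnections::default() here
            return None;
        }

        // resolve names -> handles, skipping any connection that disappeared meanwhile
        Some(
            heavy_rpcs
                .into_iter()
                .filter_map(|conn_name| conns.get(conn_name).cloned())
                .collect(),
        )
    }

Storing names rather than handles is what lets the `filter_map` in the patch tolerate a connection being removed between the search and the swap: a stale name simply resolves to nothing.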
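
The `match` on `heavy_block_id.num.cmp(&old_block_id.num)` is what drives the "con block" / "unc block" / "chain rolled back" / "new block" log lines above. A compact sketch of the four transitions; the `BlockId` alias and `classify_transition` are hypothetical:

    use std::cmp::Ordering;

    // (hash, number) pair standing in for the proxy's BlockId
    type BlockId = ([u8; 32], u64);

    fn classify_transition(new: &BlockId, old: &BlockId) -> &'static str {
        match new.1.cmp(&old.1) {
            // same height: either the same block again, or a different
            // hash at the same height, i.e. an uncle/fork ("unc block")
            Ordering::Equal => {
                if new.0 == old.0 {
                    "con block"
                } else {
                    "unc block"
                }
            }
            // consensus head moved backwards; unlikely, hence the warn!
            Ordering::Less => "chain rolled back",
            // the common case: the chain advanced
            Ordering::Greater => "new block",
        }
    }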
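
Most of the connection.rs churn swaps `?self` for `rpc=%self` in tracing macros: in `tracing`, the `?` sigil records a field with its `Debug` impl while `%` uses `Display`, so these logs now show a connection's short display name instead of a full struct dump. A minimal sketch; the `Rpc` type is a stand-in, and `tracing-subscriber` is assumed as the subscriber:

    use std::fmt;
    use tracing::warn;

    struct Rpc {
        name: &'static str,
    }

    impl fmt::Display for Rpc {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str(self.name)
        }
    }

    fn main() {
        tracing_subscriber::fmt::init();

        let rpc = Rpc { name: "example" };

        // `%rpc` records the field via Display -> rpc=example
        // `?rpc` would record via Debug (if derived), dumping the whole struct
        warn!(rpc = %rpc, "subscription ended");
    }

The related `#[instrument]` -> `#[instrument(skip_all)]` change is the same idea applied to spans: `skip_all` stops the span from capturing every function argument's Debug output.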