From 7a1433e3c9c4de9242c13e38d5d3e90f0f31fe20 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 14 Sep 2022 19:39:08 +0000 Subject: [PATCH] no longer need to use total difficulty on ETH 2.0 --- TODO.md | 8 +- redis-rate-limit/Cargo.toml | 2 +- web3_proxy/src/rpcs/blockchain.rs | 129 ++++++++++++++---------------- web3_proxy/src/rpcs/connection.rs | 40 +-------- 4 files changed, 73 insertions(+), 106 deletions(-) diff --git a/TODO.md b/TODO.md index 8b64f970..39d75063 100644 --- a/TODO.md +++ b/TODO.md @@ -144,8 +144,9 @@ These are roughly in order of completition - [x] it looks like our reconnect logic is not always firing. we need to make reconnect more robust! - i am pretty sure that this is actually servers that fail to connect on initial setup (maybe the rpcs that are on the wrong chain are just timing out and they aren't set to reconnect?) - [x] chain rolled back 1/1/1 con_head=15510065 (0xa4a3…d2d8) rpc_head=15510065 (0xa4a3…d2d8) rpc=local_erigon_archive - - include the old head number and block in the log -- [ ] rewrite rate limiting to have a tiered cache. do not put redis in the hot path + - include the old head number and block in the log +- [x] once the merge happens, we don't want to use total difficulty and instead just care about the number +- [-] rewrite rate limiting to have a tiered cache. do not put redis in the hot path - instead, we should check a local cache for the current rate limit (+1) and spawn an update to the local cache from redis in the background. - [x] when there are a LOT of concurrent requests, we see errors. i thought that was a problem with redis cell, but it happens with my simpler rate limit. now i think the problem is actually with bb8 - https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html or https://crates.io/crates/deadpool-redis? @@ -202,6 +203,9 @@ These are not yet ordered. - [ ] handle log subscriptions - probably as a paid feature - [ ] exponential backoff when reconnecting a connection +- [ ] on ETH, we no longer use total difficulty, but other chains might + - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header + - if total difficulty is set and non-zero, use it for consensus instead of just the number new endpoints for users (not totally sure about the exact paths, but these features are all needed): - [x] GET /u/:api_key diff --git a/redis-rate-limit/Cargo.toml b/redis-rate-limit/Cargo.toml index 428c78a2..c610ab60 100644 --- a/redis-rate-limit/Cargo.toml +++ b/redis-rate-limit/Cargo.toml @@ -8,4 +8,4 @@ edition = "2021" anyhow = "1.0.65" deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] } tracing = "0.1.36" -tokio = {version = "*" } +tokio = "1.21.1" diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 7846e3cb..6d31476e 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -48,11 +48,6 @@ impl Web3Connections { let block_num = block.number.as_ref().context("no block num")?; - let _block_td = block - .total_difficulty - .as_ref() - .expect("no block total difficulty. 
this is a bug!"); - let mut blockchain = self.blockchain_graphmap.write().await; // TODO: think more about heaviest_chain @@ -275,7 +270,7 @@ impl Web3Connections { // iterate the known heads to find the highest_work_block let mut checked_heads = HashSet::new(); - let mut highest_work_block: Option = None; + let mut highest_num_block: Option = None; for (conn_name, connection_head_hash) in connection_heads.iter() { if checked_heads.contains(connection_head_hash) { // we already checked this head from another rpc @@ -287,7 +282,7 @@ impl Web3Connections { let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) { x } else { - // TODO: why does this happen?!?! seems to only happen with uncled blocks. maybe a check on total difficulty skipped it? + // TODO: why does this happen?!?! seems to only happen with uncled blocks // TODO: maybe we should do get_with? // TODO: maybe we should just continue. this only seems to happen when an older block is received warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes. Fetching now"); @@ -298,41 +293,42 @@ impl Web3Connections { match self.block(connection_head_hash, conn_rpc).await { Ok(block) => block, Err(err) => { - warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashe"); + warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes"); continue; } } }; - match &conn_head_block.total_difficulty { + match &conn_head_block.number { None => { - panic!("block is missing total difficulty. this is a bug"); + panic!("block is missing number. this is a bug"); } - Some(td) => { + Some(conn_head_num) => { // if this is the first block we've tried - // or if this rpc's newest block has a higher total difficulty - if highest_work_block.is_none() - || td - > highest_work_block + // or if this rpc's newest block has a higher number + // we used to check total difficulty, but that isn't a thing anymore + if highest_num_block.is_none() + || conn_head_num + > highest_num_block .as_ref() .expect("there should always be a block here") - .total_difficulty + .number .as_ref() - .expect("there should always be total difficulty here") + .expect("there should always be number here") { - highest_work_block = Some(conn_head_block); + highest_num_block = Some(conn_head_block); } } } } // clone to release the read lock on self.block_hashes - if let Some(mut maybe_head_block) = highest_work_block { + if let Some(mut maybe_head_block) = highest_num_block { // track rpcs on this heaviest chain so we can build a new SyncedConnections - let mut heavy_rpcs = HashSet::<&String>::new(); - // a running total of the soft limits covered by the heavy rpcs - let mut heavy_sum_soft_limit: u32 = 0; - // TODO: also track heavy_sum_hard_limit? + let mut highest_rpcs = HashSet::<&String>::new(); + // a running total of the soft limits covered by the rpcs that agree on the head block + let mut highest_rpcs_sum_soft_limit: u32 = 0; + // TODO: also track highest_rpcs_sum_hard_limit? 
llama doesn't need this, so it can wait // check the highest work block for a set of rpcs that can serve our request load // if it doesn't have enough rpcs for our request load, check the parent block @@ -350,21 +346,21 @@ impl Web3Connections { // connection is not on the desired block continue; } - if heavy_rpcs.contains(conn_name) { + if highest_rpcs.contains(conn_name) { // connection is on a child block continue; } if let Some(rpc) = self.conns.get(conn_name) { - heavy_rpcs.insert(conn_name); - heavy_sum_soft_limit += rpc.soft_limit; + highest_rpcs.insert(conn_name); + highest_rpcs_sum_soft_limit += rpc.soft_limit; } else { warn!("connection missing") } } - if heavy_sum_soft_limit < self.min_sum_soft_limit - || heavy_rpcs.len() < self.min_synced_rpcs + if highest_rpcs_sum_soft_limit < self.min_sum_soft_limit + || highest_rpcs.len() < self.min_synced_rpcs { // not enough rpcs yet. check the parent if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash) @@ -373,29 +369,29 @@ impl Web3Connections { child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", ); - maybe_head_block = parent_block.clone(); + maybe_head_block = parent_block; continue; } else { warn!( "no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%", - heavy_sum_soft_limit, + highest_rpcs_sum_soft_limit, self.min_sum_soft_limit, - heavy_rpcs.len(), + highest_rpcs.len(), self.min_synced_rpcs, - heavy_sum_soft_limit * 100 / self.min_sum_soft_limit + highest_rpcs_sum_soft_limit * 100 / self.min_sum_soft_limit ); break; } } } - // TODO: if heavy_rpcs.is_empty, try another method of finding the head block + // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block let num_connection_heads = connection_heads.len(); let total_conns = self.conns.len(); // we've done all the searching for the heaviest block that we can - if heavy_rpcs.is_empty() { + if highest_rpcs.is_empty() { // if we get here, something is wrong. clear synced connections let empty_synced_connections = SyncedConnections::default(); @@ -406,35 +402,36 @@ impl Web3Connections { // TODO: log different things depending on old_synced_connections warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns); } else { - // TODO: this is too verbose. move to trace - // i think "conns" is somehow getting dupes - trace!(?heavy_rpcs); + trace!(?highest_rpcs); // TODO: if maybe_head_block.time() is old, ignore it // success! 
this block has enough soft limit and nodes on it (or on later blocks) - let conns: Vec> = heavy_rpcs + let conns: Vec> = highest_rpcs .into_iter() .filter_map(|conn_name| self.conns.get(conn_name).cloned()) .collect(); - let heavy_block = maybe_head_block; + let consensus_head_block = maybe_head_block; - let heavy_hash = heavy_block.hash.expect("head blocks always have hashes"); - let heavy_num = heavy_block.number.expect("head blocks always have numbers"); + let consensus_head_hash = consensus_head_block + .hash + .expect("head blocks always have hashes"); + let consensus_head_num = consensus_head_block + .number + .expect("head blocks always have numbers"); - debug_assert_ne!(heavy_num, U64::zero()); + debug_assert_ne!(consensus_head_num, U64::zero()); - // TODO: add these to the log messages let num_consensus_rpcs = conns.len(); - let heavy_block_id = BlockId { - hash: heavy_hash, - num: heavy_num, + let consensus_head_block_id = BlockId { + hash: consensus_head_hash, + num: consensus_head_num, }; let new_synced_connections = SyncedConnections { - head_block_id: Some(heavy_block_id.clone()), + head_block_id: Some(consensus_head_block_id.clone()), conns, }; @@ -442,14 +439,14 @@ impl Web3Connections { .synced_connections .swap(Arc::new(new_synced_connections)); - // TODO: if the rpc_head_block != heavy, log something somewhere in here + // TODO: if the rpc_head_block != consensus_head_block, log something? match &old_synced_connections.head_block_id { None => { - debug!(block=%heavy_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + debug!(block=%consensus_head_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); - self.save_block(&heavy_block, true).await?; + self.save_block(&consensus_head_block, true).await?; - head_block_sender.send(heavy_block)?; + head_block_sender.send(consensus_head_block)?; } Some(old_block_id) => { // TODO: do this log item better @@ -457,42 +454,40 @@ impl Web3Connections { .map(|x| x.to_string()) .unwrap_or_else(|| "None".to_string()); - match heavy_block_id.num.cmp(&old_block_id.num) { + match consensus_head_block_id.num.cmp(&old_block_id.num) { Ordering::Equal => { - // TODO: if rpc_block_id != heavy_block_id, do a different log + // TODO: if rpc_block_id != consensus_head_block_id, do a different log? // multiple blocks with the same fork! - if heavy_block_id.hash == old_block_id.hash { + if consensus_head_block_id.hash == old_block_id.hash { // no change in hash. 
no need to use head_block_sender
-                                    debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
+                                    debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
                                 } else {
                                     // hash changed
-                                    debug!(con_head=%heavy_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
+                                    debug!(con_head=%consensus_head_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                                     // todo!("handle equal by updating the cannonical chain");
-                                    self.save_block(&heavy_block, true).await?;
+                                    self.save_block(&consensus_head_block, true).await?;

-                                    head_block_sender.send(heavy_block)?;
+                                    head_block_sender.send(consensus_head_block)?;
                                 }
                             }
                             Ordering::Less => {
                                 // this is unlikely but possible
                                 // TODO: better log
-                                warn!(con_head=%heavy_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
+                                warn!(con_head=%consensus_head_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

-                                self.save_block(&heavy_block, true).await?;
+                                // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
+                                self.save_block(&consensus_head_block, true).await?;

-                                // todo!("handle less by removing higher blocks from the cannonical chain");
-                                head_block_sender.send(heavy_block)?;
+                                head_block_sender.send(consensus_head_block)?;
                             }
                             Ordering::Greater => {
-                                debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
+                                debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

-                                // todo!("handle greater by adding this block to and any missing parents to the cannonical chain");
+                                self.save_block(&consensus_head_block, true).await?;

-                                self.save_block(&heavy_block, true).await?;
-
-                                head_block_sender.send(heavy_block)?;
+                                head_block_sender.send(consensus_head_block)?;
                             }
                         }
                     }
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index c82d6f1d..79e3e1af 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -375,42 +375,10 @@ impl Web3Connection {
                         // TODO: is unwrap_or_default ok? we might have an empty block
                         let new_hash = new_head_block.hash.unwrap_or_default();

-                        // if we already have this block saved, set new_block_head to that arc and don't store this copy
-                        // TODO: small race here
-                        new_head_block = if let Some(existing_block) = block_map.get(&new_hash) {
-                            // we only save blocks with a total difficulty
-                            debug_assert!(existing_block.total_difficulty.is_some());
-                            existing_block
-                        } else if new_head_block.total_difficulty.is_some() {
-                            // this block has a total difficulty, it is safe to use
-                            block_map.insert(new_hash, new_head_block).await;
-
-                            // we get instead of return new_head_block just in case there was a race
-                            // TODO: but how bad is this race? it might be fine
-                            block_map.get(&new_hash).expect("we just inserted")
-                        } else {
-                            // Cache miss and NO TOTAL DIFFICULTY!
-                            // self got the head block first. unfortunately its missing a necessary field
-                            // keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed.
-                            // there are other clients and we might have to use a third party without the td fix.
-                            trace!(rpc=%self, ?new_hash, "total_difficulty missing");
-                            // todo: this can wait forever!
-                            let complete_head_block: Block<TxHash> = self
-                                .wait_for_request_handle()
-                                .await?
-                                .request("eth_getBlockByHash", (new_hash, false), false)
-                                .await?;
-
-                            debug_assert!(complete_head_block.total_difficulty.is_some());
-
-                            block_map
-                                .insert(new_hash, Arc::new(complete_head_block))
-                                .await;
-
-                            // we get instead of return new_head_block just in case there was a race
-                            // TODO: but how bad is this race? it might be fine
-                            block_map.get(&new_hash).expect("we just inserted")
-                        };
+                        // if we already have this block saved, set new_head_block to that arc. otherwise store this copy
+                        new_head_block = block_map
+                            .get_with(new_hash, async move { new_head_block })
+                            .await;

                         let new_num = new_head_block.number.unwrap_or_default();
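
The connection.rs change above leans entirely on the block cache to deduplicate new heads. Below is a minimal, self-contained sketch of that pattern. It is not part of the patch; it assumes block_map is a moka future::Cache (the get_with-with-a-future call above matches that API, and moka's "future" feature plus tokio are assumed as dependencies), and the key and block types are simplified stand-ins for H256 and ethers' Block<TxHash>.

// Sketch only: illustrative types, not the web3_proxy ones.
use std::sync::Arc;

use moka::future::Cache;

#[derive(Clone, Debug)]
struct Block {
    hash: u64,   // stand-in for H256
    number: u64, // stand-in for U64
}

#[tokio::main]
async fn main() {
    // roughly what the `block_map` argument provides
    let block_map: Cache<u64, Arc<Block>> = Cache::new(10_000);

    let new_head_block = Arc::new(Block {
        hash: 0xa4a3,
        number: 15_510_065,
    });
    let new_hash = new_head_block.hash;

    // if the hash is already cached, the existing Arc is returned and the
    // future's value is discarded; otherwise this value is inserted exactly
    // once, even when several callers race on the same hash.
    let new_head_block = block_map
        .get_with(new_hash, async move { new_head_block })
        .await;

    assert_eq!(new_head_block.number, 15_510_065);
}

Because get_with only stores the provided value when the hash is absent and hands every caller the same cached Arc, the hand-rolled get/insert/re-get sequence and the total-difficulty re-fetch removed above are no longer needed.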