diff --git a/Cargo.lock b/Cargo.lock index 79ff44fe..6f9a2d7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5533,7 +5533,6 @@ dependencies = [ "num", "num-traits", "parking_lot 0.12.1", - "petgraph", "proctitle", "redis-rate-limiter", "regex", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 4770a7b9..aefd9e52 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -47,7 +47,6 @@ num = "0.4.0" # TODO: import num_traits from sea-orm so we always have the same version num-traits = "0.2.15" parking_lot = { version = "0.12.1", features = ["arc_lock"] } -petgraph = "0.6.2" proctitle = "0.1.1" # TODO: regex has several "perf" features that we might want to use regex = "1.7.0" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 4fc49b7f..35b8ec21 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -9,7 +9,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; -use crate::rpcs::blockchain::{ArcBlock, BlockId}; +use crate::rpcs::blockchain::{ArcBlock, SavedBlock}; use crate::rpcs::connections::Web3Connections; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; @@ -859,18 +859,18 @@ impl Web3ProxyApp { // emit stats // TODO: if no servers synced, wait for them to be synced? - let head_block_id = self + let head_block = self .balanced_rpcs .head_block_id() .context("no servers synced")?; // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different - let request_block_id = if let Some(request_block_needed) = block_needed( + let request_block = if let Some(request_block_needed) = block_needed( authorization, method, request.params.as_mut(), - head_block_id.num, + head_block.number(), &self.balanced_rpcs, ) .await? @@ -887,18 +887,20 @@ impl Web3ProxyApp { .store(true, atomic::Ordering::Relaxed); } - BlockId { - num: request_block_needed, - hash: request_block_hash, - } + let request_block = self + .balanced_rpcs + .block(authorization, &request_block_hash, None) + .await?; + + SavedBlock::new(request_block) } else { - head_block_id + head_block }; // TODO: struct for this? // TODO: this can be rather large. is that okay? let cache_key = ( - request_block_id.hash, + request_block.hash(), request.method.clone(), request.params.clone().map(|x| x.to_string()), ); @@ -919,7 +921,7 @@ impl Web3ProxyApp { &authorization, request, Some(&request_metadata), - Some(&request_block_id.num), + Some(&request_block.number()), ) .await?; diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 5ddb5da7..69cba460 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -10,7 +10,8 @@ use anyhow::Context; use futures::StreamExt; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; +use num::Zero; use parking_lot::deadlock; use std::fs; use std::path::Path; @@ -78,6 +79,7 @@ fn run( let app_frontend_port = cli_config.port; let app_prometheus_port = cli_config.prometheus_port; + // start the main app let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; @@ -90,7 +92,6 @@ fn run( )); // if everything is working, these should both run forever - // TODO: join these instead and use shutdown handler properly. probably use tokio's ctrl+c helper tokio::select! { x = flatten_handles(spawned_app.app_handles) => { match x { @@ -139,16 +140,29 @@ fn run( warn!("shutdown sender err={:?}", err); }; - // wait on all the important background tasks (like saving stats to the database) to complete + // wait for things like saving stats to the database to complete + info!("waiting on important background tasks"); + let mut background_errors = 0; while let Some(x) = spawned_app.background_handles.next().await { match x { - Err(e) => return Err(e.into()), - Ok(Err(e)) => return Err(e), + Err(e) => { + error!("{:?}", e); + background_errors += 1; + } + Ok(Err(e)) => { + error!("{:?}", e); + background_errors += 1; + } Ok(Ok(_)) => continue, } } - info!("finished"); + if background_errors.is_zero() { + info!("finished"); + } else { + // TODO: collect instead? + error!("finished with errors!") + } Ok(()) }) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 8e0354d2..3159dfe2 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -7,7 +7,6 @@ use crate::{ config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, }; use anyhow::Context; -use chrono::{DateTime, NaiveDateTime, Utc}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; @@ -27,20 +26,75 @@ pub type BlockHashesCache = Cache Self { + // TODO: read this from a global config. different chains should probably have different gaps. + let allowed_lag: u64 = 60; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("there should always be time"); + + // TODO: get this from config + // TODO: is this safe enough? what if something about the chain is actually lagged? what if its a chain like BTC with 10 minute blocks? + let oldest_allowed = now - Duration::from_secs(allowed_lag); + + let block_timestamp = Duration::from_secs(block.timestamp.as_u64()); + + // TODO: recalculate this every time? + let lag = if block_timestamp < oldest_allowed { + // this server is still syncing from too far away to serve requests + // u64 is safe because ew checked equality above + (oldest_allowed - block_timestamp).as_secs() as u64 + } else { + 0 + }; + + Self { block, lag } + } + + pub fn hash(&self) -> H256 { + self.block.hash.unwrap() + } + + // TODO: return as U64 or u64? + pub fn number(&self) -> U64 { + self.block.number.unwrap() + } + + /// When the block was received, this node was still syncing + pub fn was_syncing(&self) -> bool { + // TODO: margin should come from a global config + self.lag > 60 + } +} + +impl From for SavedBlock { + fn from(x: ArcBlock) -> Self { + SavedBlock::new(x) + } +} + +impl Display for SavedBlock { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} ({})", self.num, self.hash) + write!(f, "{} ({})", self.number(), self.hash())?; + + if self.was_syncing() { + write!(f, " (behind by {} seconds)", self.lag)?; + } + + Ok(()) } } impl Web3Connections { - /// add a block to our map and it's hash to our graphmap of the blockchain - + /// add a block to our mappings and track the heaviest chain pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash.as_ref().context("no block hash")?; @@ -51,38 +105,22 @@ impl Web3Connections { return Ok(()); } - let mut blockchain = self.blockchain_graphmap.write().await; - let block_num = block.number.as_ref().context("no block num")?; // TODO: think more about heaviest_chain. would be better to do the check inside this function if heaviest_chain { // this is the only place that writes to block_numbers - // its inside a write lock on blockchain_graphmap, so i think there is no race // multiple inserts should be okay though + // TODO: info that there was a fork? self.block_numbers.insert(*block_num, *block_hash).await; } - if blockchain.contains_node(*block_hash) { - // trace!(%block_hash, %block_num, "block already saved"); - return Ok(()); - } - - // trace!(%block_hash, %block_num, "saving new block"); - - // TODO: this block is very likely already in block_hashes + // this block is very likely already in block_hashes + // TODO: use their get_with self.block_hashes - .insert(*block_hash, block.to_owned()) + .get_with(*block_hash, async move { block.clone() }) .await; - blockchain.add_node(*block_hash); - - // what should edge weight be? and should the nodes be the blocks instead? - // we store parent_hash -> hash because the block already stores the parent_hash - blockchain.add_edge(block.parent_hash, *block_hash, 0); - - // TODO: prune blockchain to only keep a configurable (256 on ETH?) number of blocks? - Ok(()) } @@ -164,7 +202,7 @@ impl Web3Connections { // be sure the requested block num exists let head_block_num = self.head_block_num().context("no servers in sync")?; - // TODO: not 64 on all chains? get from config? + // TODO: geth does 64, erigon does 90k. sometimes we run a mix let archive_needed = num < &(head_block_num - U64::from(64)); if num > &head_block_num { @@ -206,7 +244,7 @@ impl Web3Connections { // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain self.save_block(&block, true).await?; - Ok((block, true)) + Ok((block, archive_needed)) } pub(super) async fn process_incoming_blocks( @@ -223,7 +261,10 @@ impl Web3Connections { let mut connection_heads = HashMap::new(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { + let new_block = new_block.map(Into::into); + let rpc_name = rpc.name.clone(); + if let Err(err) = self .process_block_from_rpc( authorization, @@ -252,51 +293,49 @@ impl Web3Connections { &self, authorization: &Arc, connection_heads: &mut HashMap, - rpc_head_block: Option, + rpc_head_block: Option, rpc: Arc, head_block_sender: &watch::Sender, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // add the rpc's block to connection_heads, or remove the rpc from connection_heads - let rpc_head_id = match rpc_head_block { + let rpc_head_block = match rpc_head_block { Some(rpc_head_block) => { - let rpc_head_num = rpc_head_block.number.unwrap(); - let rpc_head_hash = rpc_head_block.hash.unwrap(); + let rpc_head_num = rpc_head_block.number(); + let rpc_head_hash = rpc_head_block.hash(); // we don't know if its on the heaviest chain yet - self.save_block(&rpc_head_block, false).await?; - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .context("no time")?; - - // TODO: get this from config - let oldest_allowed = now - Duration::from_secs(120); - - let block_timestamp = Duration::from_secs(rpc_head_block.timestamp.as_u64()); - - if block_timestamp < oldest_allowed { - let behind_secs = (oldest_allowed - block_timestamp).as_secs(); + self.save_block(&rpc_head_block.block, false).await?; + if rpc_head_block.was_syncing() { if connection_heads.remove(&rpc.name).is_some() { - warn!("{} is behind by {} seconds", &rpc.name, behind_secs); + warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.lag); }; None } else { - connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); + if let Some(prev_hash) = + connection_heads.insert(rpc.name.to_owned(), rpc_head_hash) + { + if prev_hash == rpc_head_hash { + // this block was already sent by this node. return early + return Ok(()); + } + } - Some(BlockId { - hash: rpc_head_hash, - num: rpc_head_num, - }) + // TODO: should we just keep the ArcBlock here? + Some(rpc_head_block) } } None => { // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect // // trace!(%rpc, "Block without number or hash!"); - connection_heads.remove(&rpc.name); + if connection_heads.remove(&rpc.name).is_none() { + // this connection was already removed. + // return early. no need to process synced connections + return Ok(()); + } None } @@ -451,24 +490,19 @@ impl Web3Connections { .filter_map(|conn_name| self.conns.get(conn_name).cloned()) .collect(); - let consensus_head_block = maybe_head_block; - - let consensus_head_hash = consensus_head_block + let consensus_head_hash = maybe_head_block .hash .expect("head blocks always have hashes"); - let consensus_head_num = consensus_head_block + let consensus_head_num = maybe_head_block .number .expect("head blocks always have numbers"); let num_consensus_rpcs = conns.len(); - let consensus_head_block_id = BlockId { - hash: consensus_head_hash, - num: consensus_head_num, - }; + let consensus_head_block: SavedBlock = maybe_head_block.into(); let new_synced_connections = SyncedConnections { - head_block_id: Some(consensus_head_block_id.clone()), + head_block_id: Some(consensus_head_block.clone()), conns, }; @@ -484,35 +518,35 @@ impl Web3Connections { num_consensus_rpcs, num_connection_heads, total_conns, - consensus_head_block_id, + consensus_head_block, rpc ); - self.save_block(&consensus_head_block, true).await?; + self.save_block(&consensus_head_block.block, true).await?; head_block_sender - .send(consensus_head_block) + .send(consensus_head_block.block) .context("head_block_sender sending consensus_head_block")?; } - Some(old_block_id) => { + Some(old_head_block) => { // TODO: do this log item better - let rpc_head_str = rpc_head_id + let rpc_head_str = rpc_head_block .map(|x| x.to_string()) .unwrap_or_else(|| "None".to_string()); - match consensus_head_block_id.num.cmp(&old_block_id.num) { + match consensus_head_block.number().cmp(&old_head_block.number()) { Ordering::Equal => { // TODO: if rpc_block_id != consensus_head_block_id, do a different log? // multiple blocks with the same fork! - if consensus_head_block_id.hash == old_block_id.hash { + if consensus_head_block.hash() == old_head_block.hash() { // no change in hash. no need to use head_block_sender debug!( "con {}/{}/{} con_head={} rpc={} rpc_head={}", num_consensus_rpcs, num_connection_heads, total_conns, - consensus_head_block_id, + consensus_head_block, rpc, rpc_head_str ) @@ -523,17 +557,17 @@ impl Web3Connections { num_consensus_rpcs, num_connection_heads, total_conns, - consensus_head_block_id, - old_block_id, + consensus_head_block, + old_head_block, rpc_head_str, rpc ); - self.save_block(&consensus_head_block, true) + self.save_block(&consensus_head_block.block, true) .await .context("save consensus_head_block as heaviest chain")?; - head_block_sender.send(consensus_head_block).context( + head_block_sender.send(consensus_head_block.block).context( "head_block_sender sending consensus_head_block", )?; } @@ -541,15 +575,17 @@ impl Web3Connections { Ordering::Less => { // this is unlikely but possible // TODO: better log - warn!("chain rolled back {}/{}/{} con_head={} old_head={} rpc_head={} rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block_id, old_block_id, rpc_head_str, rpc); + warn!("chain rolled back {}/{}/{} con_head={} old_head={} rpc_head={} rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block, old_head_block, rpc_head_str, rpc); // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems slike a good idea - self.save_block(&consensus_head_block, true).await.context( - "save_block sending consensus_head_block as heaviest chain", - )?; + self.save_block(&consensus_head_block.block, true) + .await + .context( + "save_block sending consensus_head_block as heaviest chain", + )?; head_block_sender - .send(consensus_head_block) + .send(consensus_head_block.block) .context("head_block_sender sending consensus_head_block")?; } Ordering::Greater => { @@ -558,14 +594,14 @@ impl Web3Connections { num_consensus_rpcs, num_connection_heads, total_conns, - consensus_head_block_id, + consensus_head_block, rpc_head_str, rpc ); - self.save_block(&consensus_head_block, true).await?; + self.save_block(&consensus_head_block.block, true).await?; - head_block_sender.send(consensus_head_block)?; + head_block_sender.send(consensus_head_block.block)?; } } } diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 96b9272d..29cb80c9 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,5 +1,5 @@ ///! Rate-limited communication with a web3 provider. -use super::blockchain::{ArcBlock, BlockHashesCache, BlockId}; +use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock}; use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; @@ -54,7 +54,7 @@ pub struct Web3Connection { /// Lower weight are higher priority when sending requests. 0 to 99. pub(super) weight: f64, /// TODO: should this be an AsyncRwLock? - pub(super) head_block_id: RwLock>, + pub(super) head_block: RwLock>, pub(super) open_request_handle_metrics: Arc, } @@ -111,7 +111,7 @@ impl Web3Connection { hard_limit, soft_limit, block_data_limit, - head_block_id: RwLock::new(Default::default()), + head_block: RwLock::new(Default::default()), weight, open_request_handle_metrics, }; @@ -119,6 +119,7 @@ impl Web3Connection { let new_connection = Arc::new(new_connection); // connect to the server (with retries) + // TODO: PROBLEM! THIS RETRIES FOREVER AND BLOCKS THE APP STARTING new_connection .retrying_reconnect(block_sender.as_ref(), false) .await?; @@ -148,11 +149,11 @@ impl Web3Connection { chain_id, found_chain_id ) - .context(format!("failed @ {}", new_connection))); + .context(format!("failed @ {}", new_connection.name))); } } Err(e) => { - let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection)); + let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection.name)); return Err(e); } } @@ -212,7 +213,7 @@ impl Web3Connection { // TODO: binary search between 90k and max? // TODO: start at 0 or 1 for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { - let mut head_block_id = self.head_block_id.read().clone(); + let mut head_block_id = self.head_block.read().clone(); // TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender? while head_block_id.is_none() { @@ -221,9 +222,9 @@ impl Web3Connection { // TODO: sleep for the block time, or maybe subscribe to a channel instead of this simple pull sleep(Duration::from_secs(13)).await; - head_block_id = self.head_block_id.read().clone(); + head_block_id = self.head_block.read().clone(); } - let head_block_num = head_block_id.expect("is_none was checked above").num; + let head_block_num = head_block_id.expect("is_none was checked above").number(); // TODO: subtract 1 from block_data_limit for safety? let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into()); @@ -274,9 +275,9 @@ impl Web3Connection { } pub fn has_block_data(&self, needed_block_num: &U64) -> bool { - let head_block_num = match self.head_block_id.read().clone() { + let head_block_num = match self.head_block.read().clone() { None => return false, - Some(x) => x.num, + Some(x) => x.number(), }; // this rpc doesn't have that block yet. still syncing @@ -374,7 +375,7 @@ impl Web3Connection { // reset sync status { - let mut head_block_id = self.head_block_id.write(); + let mut head_block_id = self.head_block.write(); *head_block_id = None; } @@ -415,75 +416,60 @@ impl Web3Connection { block_sender: &flume::Sender, block_map: BlockHashesCache, ) -> anyhow::Result<()> { - match new_head_block { + let new_head_block = match new_head_block { Ok(None) => { - // TODO: i think this should clear the local block and then update over the block sender - warn!("unsynced server {}", self); - { - let mut head_block_id = self.head_block_id.write(); + let mut head_block_id = self.head_block.write(); + + if head_block_id.is_none() { + // we previously sent a None. return early + return Ok(()); + } + warn!("{} is not synced!", self); *head_block_id = None; } - block_sender - .send_async((None, self.clone())) - .await - .context("clearing block_sender")?; + None } Ok(Some(new_head_block)) => { - // TODO: is unwrap_or_default ok? we might have an empty block - let new_hash = new_head_block.hash.unwrap_or_default(); + let new_hash = new_head_block + .hash + .context("sending block to connections")?; // if we already have this block saved, set new_head_block to that arc. otherwise store this copy let new_head_block = block_map .get_with(new_hash, async move { new_head_block }) .await; - let new_num = new_head_block.number.unwrap_or_default(); - // save the block so we don't send the same one multiple times // also save so that archive checks can know how far back to query { - let mut head_block_id = self.head_block_id.write(); + let mut head_block = self.head_block.write(); - if head_block_id.is_none() { - *head_block_id = Some(BlockId { - hash: new_hash, - num: new_num, - }); - } else { - head_block_id.as_mut().map(|x| { - x.hash = new_hash; - x.num = new_num; - x - }); - } + let _ = head_block.insert(new_head_block.clone().into()); } - // send the block off to be saved - block_sender - .send_async((Some(new_head_block), self.clone())) - .await - .context("block_sender")?; + Some(new_head_block) } Err(err) => { warn!("unable to get block from {}. err={:?}", self, err); { - let mut head_block_id = self.head_block_id.write(); + let mut head_block_id = self.head_block.write(); *head_block_id = None; } - // send an empty block to take this server out of rotation - // TODO: this is NOT working!!!! - block_sender - .send_async((None, self.clone())) - .await - .context("block_sender")?; + None } - } + }; + + // send an empty block to take this server out of rotation + block_sender + .send_async((new_head_block, self.clone())) + .await + .context("block_sender")?; Ok(()) } @@ -965,7 +951,7 @@ impl Serialize for Web3Connection { &self.frontend_requests.load(atomic::Ordering::Relaxed), )?; - let head_block_id = &*self.head_block_id.read(); + let head_block_id = &*self.head_block.read(); state.serialize_field("head_block_id", head_block_id)?; state.end() @@ -999,13 +985,20 @@ impl fmt::Display for Web3Connection { mod tests { #![allow(unused_imports)] use super::*; + use ethers::types::Block; #[test] fn test_archive_node_has_block_data() { - let head_block = BlockId { - hash: H256::random(), - num: 1_000_000.into(), + let random_block = Block { + hash: Some(H256::random()), + number: Some(1_000_000.into()), + // TODO: timestamp? + ..Default::default() }; + + let random_block = Arc::new(random_block); + + let head_block = SavedBlock::new(random_block); let block_data_limit = u64::MAX; let metrics = OpenRequestHandleMetrics::default(); @@ -1023,23 +1016,25 @@ mod tests { soft_limit: 1_000, block_data_limit: block_data_limit.into(), weight: 100.0, - head_block_id: RwLock::new(Some(head_block.clone())), + head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(metrics), }; assert!(x.has_block_data(&0.into())); assert!(x.has_block_data(&1.into())); - assert!(x.has_block_data(&head_block.num)); - assert!(!x.has_block_data(&(head_block.num + 1))); - assert!(!x.has_block_data(&(head_block.num + 1000))); + assert!(x.has_block_data(&head_block.number())); + assert!(!x.has_block_data(&(head_block.number() + 1))); + assert!(!x.has_block_data(&(head_block.number() + 1000))); } #[test] fn test_pruned_node_has_block_data() { - let head_block = BlockId { - hash: H256::random(), - num: 1_000_000.into(), - }; + let head_block: SavedBlock = Arc::new(Block { + hash: Some(H256::random()), + number: Some(1_000_000.into()), + ..Default::default() + }) + .into(); let block_data_limit = 64; @@ -1059,16 +1054,16 @@ mod tests { soft_limit: 1_000, block_data_limit: block_data_limit.into(), weight: 100.0, - head_block_id: RwLock::new(Some(head_block.clone())), + head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(metrics), }; assert!(!x.has_block_data(&0.into())); assert!(!x.has_block_data(&1.into())); - assert!(!x.has_block_data(&(head_block.num - block_data_limit - 1))); - assert!(x.has_block_data(&(head_block.num - block_data_limit))); - assert!(x.has_block_data(&head_block.num)); - assert!(!x.has_block_data(&(head_block.num + 1))); - assert!(!x.has_block_data(&(head_block.num + 1000))); + assert!(!x.has_block_data(&(head_block.number() - block_data_limit - 1))); + assert!(x.has_block_data(&(head_block.number() - block_data_limit))); + assert!(x.has_block_data(&head_block.number())); + assert!(!x.has_block_data(&(head_block.number() + 1))); + assert!(!x.has_block_data(&(head_block.number() + 1000))); } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index b153d189..d5492920 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -21,7 +21,6 @@ use hashbrown::HashMap; use log::{error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, ConcurrentCacheExt}; -use petgraph::graphmap::DiGraphMap; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -30,7 +29,6 @@ use std::fmt; use std::sync::atomic::Ordering; use std::sync::Arc; use thread_fast_rng::rand::seq::SliceRandom; -use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; @@ -48,9 +46,6 @@ pub struct Web3Connections { pub(super) block_hashes: BlockHashesCache, /// blocks on the heaviest chain pub(super) block_numbers: Cache, - /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? - /// TODO: what should we use for edges? - pub(super) blockchain_graphmap: AsyncRwLock>, pub(super) min_head_rpcs: usize, pub(super) min_sum_soft_limit: u32, } @@ -209,7 +204,6 @@ impl Web3Connections { pending_transactions, block_hashes, block_numbers, - blockchain_graphmap: Default::default(), min_sum_soft_limit, min_head_rpcs, }); @@ -846,15 +840,15 @@ impl Serialize for Web3Connections { } mod tests { - #![allow(unused_imports)] - use std::time::{SystemTime, UNIX_EPOCH}; - // TODO: why is this allow needed? does tokio::test get in the way somehow? + #![allow(unused_imports)] use super::*; - use crate::rpcs::{blockchain::BlockId, provider::Web3Provider}; + use crate::rpcs::{blockchain::SavedBlock, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; + use tokio::sync::RwLock as AsyncRwLock; #[tokio::test] async fn test_server_selection_by_height() { @@ -886,19 +880,13 @@ mod tests { ..Default::default() }; - // TODO: write a impl From for Block -> BlockId? - let lagged_block_id = BlockId { - hash: lagged_block.hash.unwrap(), - num: lagged_block.number.unwrap(), - }; - let head_block_id = BlockId { - hash: head_block.hash.unwrap(), - num: head_block.number.unwrap(), - }; - let lagged_block = Arc::new(lagged_block); let head_block = Arc::new(head_block); + // TODO: write a impl From for Block -> BlockId? + let lagged_block: SavedBlock = lagged_block.into(); + let head_block: SavedBlock = head_block.into(); + let block_data_limit = u64::MAX; let head_rpc = Web3Connection { @@ -914,7 +902,7 @@ mod tests { soft_limit: 1_000, block_data_limit: block_data_limit.into(), weight: 100.0, - head_block_id: RwLock::new(Some(head_block_id)), + head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), }; @@ -931,15 +919,15 @@ mod tests { soft_limit: 1_000, block_data_limit: block_data_limit.into(), weight: 100.0, - head_block_id: RwLock::new(Some(lagged_block_id)), + head_block: RwLock::new(Some(lagged_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), }; - assert!(head_rpc.has_block_data(&lagged_block.number.unwrap())); - assert!(head_rpc.has_block_data(&head_block.number.unwrap())); + assert!(head_rpc.has_block_data(&lagged_block.number())); + assert!(head_rpc.has_block_data(&head_block.number())); - assert!(lagged_rpc.has_block_data(&lagged_block.number.unwrap())); - assert!(!lagged_rpc.has_block_data(&head_block.number.unwrap())); + assert!(lagged_rpc.has_block_data(&lagged_block.number())); + assert!(!lagged_rpc.has_block_data(&head_block.number())); let head_rpc = Arc::new(head_rpc); let lagged_rpc = Arc::new(lagged_rpc); @@ -961,7 +949,6 @@ mod tests { block_numbers: Cache::builder() .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - blockchain_graphmap: Default::default(), min_head_rpcs: 1, min_sum_soft_limit: 1, }; @@ -1020,7 +1007,7 @@ mod tests { assert!(matches!(x, OpenRequestResult::NotSynced)); // add lagged blocks to the conns. both servers should be allowed - conns.save_block(&lagged_block, true).await.unwrap(); + conns.save_block(&lagged_block.block, true).await.unwrap(); conns .process_block_from_rpc( @@ -1048,7 +1035,7 @@ mod tests { assert_eq!(conns.num_synced_rpcs(), 2); // add head block to the conns. lagged_rpc should not be available - conns.save_block(&head_block, true).await.unwrap(); + conns.save_block(&head_block.block, true).await.unwrap(); conns .process_block_from_rpc( @@ -1109,7 +1096,7 @@ mod tests { .as_secs() .into(); - let head_block: Block = Block { + let head_block = Block { hash: Some(H256::random()), number: Some(1_000_000.into()), parent_hash: H256::random(), @@ -1117,13 +1104,7 @@ mod tests { ..Default::default() }; - // TODO: write a impl From for Block -> BlockId? - let head_block_id = BlockId { - hash: head_block.hash.unwrap(), - num: head_block.number.unwrap(), - }; - - let head_block = Arc::new(head_block); + let head_block: SavedBlock = Arc::new(head_block).into(); let pruned_rpc = Web3Connection { name: "pruned".to_string(), @@ -1138,7 +1119,7 @@ mod tests { soft_limit: 3_000, block_data_limit: 64.into(), weight: 1.0, - head_block_id: RwLock::new(Some(head_block_id.clone())), + head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), }; @@ -1156,12 +1137,12 @@ mod tests { block_data_limit: u64::MAX.into(), // TODO: does weight = 0 work? weight: 0.01, - head_block_id: RwLock::new(Some(head_block_id)), + head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), }; - assert!(pruned_rpc.has_block_data(&head_block.number.unwrap())); - assert!(archive_rpc.has_block_data(&head_block.number.unwrap())); + assert!(pruned_rpc.has_block_data(&head_block.number())); + assert!(archive_rpc.has_block_data(&head_block.number())); assert!(!pruned_rpc.has_block_data(&1.into())); assert!(archive_rpc.has_block_data(&1.into())); @@ -1185,7 +1166,6 @@ mod tests { block_numbers: Cache::builder() .max_capacity(10) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - blockchain_graphmap: Default::default(), min_head_rpcs: 1, min_sum_soft_limit: 3_000, }; @@ -1223,7 +1203,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block let best_head_server = conns - .best_synced_backend_connection(&authorization, None, &[], head_block.number.as_ref()) + .best_synced_backend_connection(&authorization, None, &[], Some(&head_block.number())) .await; assert!(matches!( diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs index 7050dd07..7101ffdd 100644 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -1,4 +1,4 @@ -use super::blockchain::BlockId; +use super::blockchain::SavedBlock; use super::connection::Web3Connection; use super::connections::Web3Connections; use ethers::prelude::{H256, U64}; @@ -11,7 +11,7 @@ use std::sync::Arc; #[derive(Clone, Default, Serialize)] pub struct SyncedConnections { // TODO: store ArcBlock instead? - pub(super) head_block_id: Option, + pub(super) head_block_id: Option, // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] pub(super) conns: Vec>, @@ -29,7 +29,7 @@ impl fmt::Debug for SyncedConnections { } impl Web3Connections { - pub fn head_block_id(&self) -> Option { + pub fn head_block_id(&self) -> Option { self.synced_connections.load().head_block_id.clone() } @@ -38,7 +38,7 @@ impl Web3Connections { .load() .head_block_id .as_ref() - .map(|head_block_id| head_block_id.hash) + .map(|head_block_id| head_block_id.hash()) } pub fn head_block_num(&self) -> Option { @@ -46,7 +46,7 @@ impl Web3Connections { .load() .head_block_id .as_ref() - .map(|head_block_id| head_block_id.num) + .map(|head_block_id| head_block_id.number()) } pub fn synced(&self) -> bool {