From 0041709f3e74fa528a68dd77a185ad9e724c9de8 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Sun, 15 May 2022 06:27:13 +0000
Subject: [PATCH] check block hash and store block number in fewer places

---
 TODO.md                       |   2 +
 web3-proxy/src/connection.rs  |  83 +++++++----------
 web3-proxy/src/connections.rs | 167 +++++++++++++---------------------
 3 files changed, 101 insertions(+), 151 deletions(-)

diff --git a/TODO.md b/TODO.md
index 90f37552..0d1a0a99 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,5 +1,7 @@
 # Todo
 
+- [ ] eth_sendRawTransaction should return the most common result, not the first
+- [ ] if chain split detected, don't send transactions
 - [ ] endpoint for health checks. if no synced servers, give a 502 error
 - [ ] some production configs are occasionally stuck waiting at 100% cpu
   - looks like it's getting stuck on `futex(0x7fc15067b478, FUTEX_WAIT_PRIVATE, 1, NULL`
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index dbbc21ad..b7346ce9 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -1,6 +1,6 @@
 ///! Rate-limited communication with a web3 provider
 use derive_more::From;
-use ethers::prelude::Middleware;
+use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256};
 use futures::StreamExt;
 use governor::clock::{Clock, QuantaClock, QuantaInstant};
 use governor::middleware::NoOpMiddleware;
@@ -9,14 +9,12 @@ use governor::NotUntil;
 use governor::RateLimiter;
 use std::fmt;
 use std::num::NonZeroU32;
-use std::sync::atomic::{self, AtomicU32, AtomicU64};
+use std::sync::atomic::{self, AtomicU32};
 use std::time::Duration;
 use std::{cmp::Ordering, sync::Arc};
 use tokio::time::{interval, sleep, MissedTickBehavior};
 use tracing::{info, trace, warn};
 
-use crate::connections::Web3Connections;
-
 type Web3RateLimiter =
     RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
 
@@ -44,7 +42,6 @@ pub struct Web3Connection {
     ratelimiter: Option<Web3RateLimiter>,
     /// used for load balancing to the least loaded server
     soft_limit: u32,
-    head_block_number: AtomicU64,
    /// the same clock that is used by the rate limiter
     clock: QuantaClock,
 }
@@ -117,7 +114,6 @@ impl Web3Connection {
             provider,
             ratelimiter: hard_rate_limiter,
             soft_limit,
-            head_block_number: 0.into(),
         };
 
         let connection = Arc::new(connection);
@@ -158,11 +154,6 @@ impl Web3Connection {
         self.active_requests.load(atomic::Ordering::Acquire)
     }
 
-    #[inline]
-    pub fn head_block_number(&self) -> u64 {
-        self.head_block_number.load(atomic::Ordering::Acquire)
-    }
-
     #[inline]
     pub fn soft_limit(&self) -> u32 {
         self.soft_limit
@@ -173,11 +164,32 @@ impl Web3Connection {
         &self.url
     }
 
+    fn send_block(
+        self: &Arc<Self>,
+        block: Result<Block<TxHash>, ProviderError>,
+        block_sender: &flume::Sender<(u64, H256, Arc<Self>)>,
+    ) {
+        match block {
+            Ok(block) => {
+                let block_number = block.number.unwrap().as_u64();
+                let block_hash = block.hash.unwrap();
+
+                // TODO: i'm pretty sure we don't need send_async, but double check
+                block_sender
+                    .send((block_number, block_hash, self.clone()))
+                    .unwrap();
+            }
+            Err(e) => {
+                warn!("unable to get block from {}: {}", self, e);
+            }
+        }
+    }
+
     /// Subscribe to new blocks
     // #[instrument]
-    pub async fn new_heads(
+    pub async fn subscribe_new_heads(
         self: Arc<Self>,
-        connections: Option<Arc<Web3Connections>>,
+        block_sender: flume::Sender<(u64, H256, Arc<Self>)>,
     ) -> anyhow::Result<()> {
         info!("Watching new_heads on {}", self);
 
@@ -196,23 +208,14 @@ impl Web3Connection {
 
                     let active_request_handle = self.wait_for_request_handle().await;
 
-                    let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
+                    // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
+                    let block: Result<Block<TxHash>, _> = provider
+                        .request("eth_getBlockByNumber", ("latest", false))
+                        .await;
 
                     drop(active_request_handle);
 
-                    // TODO: only store if this isn't already stored?
-                    // TODO: also send something to the provider_tier so it can sort?
-                    let old_block_number = self
-                        .head_block_number
-                        .swap(block_number, atomic::Ordering::AcqRel);
-
-                    if old_block_number != block_number {
-                        if let Some(connections) = &connections {
-                            connections.update_synced_rpcs(&self).await?;
-                        } else {
-                            info!("new block on {}: {}", self, block_number);
-                        }
-                    }
+                    self.send_block(block, &block_sender);
                 }
             }
             Web3Provider::Ws(provider) => {
@@ -231,32 +234,16 @@ impl Web3Connection {
                     // there is a very small race condition here where the stream could send us a new block right now
                     // all it does is print "new block" for the same block as current block
                     // TODO: rate limit!
-                    let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
+                    let block: Result<Block<TxHash>, _> = provider
+                        .request("eth_getBlockByNumber", ("latest", false))
+                        .await;
 
                     drop(active_request_handle);
 
-                    // TODO: swap and check the result?
-                    self.head_block_number
-                        .store(block_number, atomic::Ordering::Release);
-
-                    if let Some(connections) = &connections {
-                        connections.update_synced_rpcs(&self).await?;
-                    } else {
-                        info!("new head block {} from {}", block_number, self);
-                    }
+                    self.send_block(block, &block_sender);
 
                     while let Some(new_block) = stream.next().await {
-                        let new_block_number = new_block.number.unwrap().as_u64();
-
-                        // TODO: only store if this isn't already stored?
-                        // TODO: also send something to the provider_tier so it can sort?
-                        // TODO: do we need this old block number check? its helpful on http, but here it shouldn't dupe except maybe on the first run
-                        self.head_block_number
-                            .fetch_max(new_block_number, atomic::Ordering::AcqRel);
-
-                        if let Some(connections) = &connections {
-                            connections.update_synced_rpcs(&self).await?;
-                        }
+                        self.send_block(Ok(new_block), &block_sender);
                     }
                 }
             }
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 9601cc70..e7fd1c87 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -1,5 +1,6 @@
 ///! Load balanced communication with a group of web3 providers
 use derive_more::From;
+use ethers::prelude::H256;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use governor::clock::{QuantaClock, QuantaInstant};
@@ -11,7 +12,7 @@ use std::fmt;
 use std::sync::atomic::{self, AtomicU64};
 use std::sync::Arc;
 use tokio::sync::RwLock;
-use tracing::{debug, info, instrument, trace, warn};
+use tracing::{debug, info, trace, warn};
 
 use crate::config::Web3ConnectionConfig;
 use crate::connection::{ActiveRequestHandle, Web3Connection};
@@ -19,7 +20,8 @@ use crate::connection::{ActiveRequestHandle, Web3Connection};
 
 #[derive(Clone, Default)]
 struct SyncedConnections {
     head_block_number: u64,
-    inner: Vec<usize>,
+    head_block_hash: H256,
+    inner: Vec<Arc<Web3Connection>>,
 }
 
 impl fmt::Debug for SyncedConnections {
@@ -35,6 +37,7 @@ impl SyncedConnections {
 
         Self {
             head_block_number: 0,
+            head_block_hash: Default::default(),
             inner,
         }
     }
@@ -91,16 +94,23 @@ impl Web3Connections {
         });
 
         if subscribe_heads {
+            let (block_sender, block_receiver) = flume::unbounded();
+
+            {
+                let connections = Arc::clone(&connections);
+                tokio::spawn(async move { connections.update_synced_rpcs(block_receiver).await });
+            }
+
             for connection in connections.inner.iter() {
                 // subscribe to new heads in a spawned future
                 // TODO: channel instead. then we can have one future with write access to a left-right?
                 let connection = Arc::clone(connection);
-                let connections = connections.clone();
+                let block_sender = block_sender.clone();
                 tokio::spawn(async move {
                     let url = connection.url().to_string();
 
                     // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
-                    if let Err(e) = connection.new_heads(Some(connections)).await {
+                    if let Err(e) = connection.subscribe_new_heads(block_sender).await {
                         warn!("new_heads error on {}: {:?}", url, e);
                     }
                 });
@@ -175,106 +185,56 @@ impl Web3Connections {
     }
 
     /// TODO: possible deadlock here. investigate more. probably refactor
-    #[instrument]
-    pub async fn update_synced_rpcs(&self, rpc: &Arc<Web3Connection>) -> anyhow::Result<()> {
-        info!("Locking synced_connections");
-        let mut synced_connections = self.synced_connections.write().await;
-        info!("Locked synced_connections");
+    // #[instrument]
+    pub async fn update_synced_rpcs(
+        &self,
+        block_receiver: flume::Receiver<(u64, H256, Arc<Web3Connection>)>,
+    ) -> anyhow::Result<()> {
+        while let Ok((new_block_num, new_block_hash, rpc)) = block_receiver.recv_async().await {
+            if new_block_num == 0 {
+                warn!("{} is still syncing", rpc);
+                continue;
+            }
 
-        let new_block = rpc.head_block_number();
+            // TODO: experiment with different locks and such here
+            let mut synced_connections = self.synced_connections.write().await;
 
-        if new_block == 0 {
-            warn!("{} is still syncing", rpc);
-            return Ok(());
-        }
+            // TODO: double check this logic
+            match new_block_num.cmp(&synced_connections.head_block_number) {
+                cmp::Ordering::Greater => {
+                    // the rpc's newest block is the new overall best block
+                    info!("new head block {} from {}", new_block_num, rpc);
 
-        let current_best_block_number = synced_connections.head_block_number;
+                    synced_connections.inner.clear();
+                    synced_connections.inner.push(rpc);
 
-        let overall_best_head_block = self.head_block_number();
-
-        // TODO: double check this logic
-        match (
-            new_block.cmp(&overall_best_head_block),
-            new_block.cmp(&current_best_block_number),
-        ) {
-            (cmp::Ordering::Greater, cmp::Ordering::Greater) => {
-                // the rpc's newest block is the new overall best block
-                synced_connections.inner.clear();
-
-                synced_connections.head_block_number = new_block;
-
-                // TODO: what ordering?
-                match self.best_head_block_number.compare_exchange(
-                    overall_best_head_block,
-                    new_block,
-                    atomic::Ordering::AcqRel,
-                    atomic::Ordering::Acquire,
-                ) {
-                    Ok(_) => {
-                        info!("new head block {} from {}", new_block, rpc);
-                    }
-                    Err(current_best_block_number) => {
-                        // actually, there was a race and this ended up not being the latest block. return now without adding this rpc to the synced list
-                        debug!(
-                            "behind {} on {:?}: {}",
-                            current_best_block_number, rpc, new_block
+                    synced_connections.head_block_number = new_block_num;
+                    synced_connections.head_block_hash = new_block_hash;
+                }
+                cmp::Ordering::Equal => {
+                    if new_block_hash != synced_connections.head_block_hash {
+                        // same height, but different chain
+                        warn!(
+                            "chain is forked! {} has {}. First #{} was {}",
+                            rpc, new_block_hash, new_block_num, synced_connections.head_block_hash
                         );
-                        return Ok(());
+                        continue;
                     }
+
+                    // do not clear synced_connections.
+                    // we just want to add this rpc to the end
+                    synced_connections.inner.push(rpc);
+                }
+                cmp::Ordering::Less => {
+                    // this isn't the best block in the tier. don't do anything
+                    continue;
+                }
             }
-            (cmp::Ordering::Equal, cmp::Ordering::Less) => {
-                // no need to do anything
-                return Ok(());
-            }
-            (cmp::Ordering::Greater, cmp::Ordering::Less) => {
-                // this isn't the best block in the tier. don't do anything
-                return Ok(());
-            }
-            (cmp::Ordering::Equal, cmp::Ordering::Equal) => {
-                // this rpc tier is synced, and it isn't the first to this block
-            }
-            (cmp::Ordering::Less, cmp::Ordering::Less) => {
-                // this rpc is behind the best and the tier. don't do anything
-                return Ok(());
-            }
-            (cmp::Ordering::Less, cmp::Ordering::Equal) => {
-                // this rpc is behind the best. but is an improvement for the tier
-                return Ok(());
-            }
-            (cmp::Ordering::Less, cmp::Ordering::Greater) => {
-                // this rpc is behind the best, but it is catching up
-                synced_connections.inner.clear();
-                synced_connections.head_block_number = new_block;
-
-                // return now because this isn't actually synced
-                return Ok(());
-            }
-            (cmp::Ordering::Equal, cmp::Ordering::Greater) => {
-                // we caught up to another tier
-                synced_connections.inner.clear();
-
-                synced_connections.head_block_number = new_block;
-            }
-            (cmp::Ordering::Greater, cmp::Ordering::Equal) => {
-                // TODO: what should we do? i think we got here because we aren't using atomics properly
-                // the overall block hasn't yet updated, but our internal block has
-                // TODO: maybe we should
-            }
+            // TODO: better log
+            trace!("Now synced: {:?}", synced_connections.inner);
         }
 
-        let rpc_index = self
-            .inner
-            .iter()
-            .position(|x| x.url() == rpc.url())
-            .unwrap();
-
-        // TODO: hopefully nothing ends up in here twice. Greater+Equal might do that to us
-        synced_connections.inner.push(rpc_index);
-
-        info!("Now synced {:?}: {:?}", self, synced_connections.inner);
-
         Ok(())
     }
 
@@ -291,13 +251,12 @@ impl Web3Connections {
         // let a = a as f32 / self.soft_limit as f32;
         // let b = b as f32 / other.soft_limit as f32;
 
-        let sort_cache: HashMap<usize, (f32, u32)> = synced_rpc_indexes
+        // TODO: better key!
+        let sort_cache: HashMap<String, (f32, u32)> = synced_rpc_indexes
             .iter()
-            .map(|synced_index| {
-                let key = *synced_index;
-
-                let connection = self.inner.get(*synced_index).unwrap();
-
+            .map(|connection| {
+                // TODO: better key!
+                let key = format!("{}", connection);
                 let active_requests = connection.active_requests();
                 let soft_limit = connection.soft_limit();
 
@@ -309,8 +268,12 @@ impl Web3Connections {
 
         // TODO: i think we might need to load active connections and then
         synced_rpc_indexes.sort_unstable_by(|a, b| {
-            let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap();
-            let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap();
+            // TODO: better keys
+            let a_key = format!("{}", a);
+            let b_key = format!("{}", b);
+
+            let (a_utilization, a_soft_limit) = sort_cache.get(&a_key).unwrap();
+            let (b_utilization, b_soft_limit) = sort_cache.get(&b_key).unwrap();
 
             // TODO: i'm comparing floats. crap
             match a_utilization
@@ -323,8 +286,6 @@ impl Web3Connections {
         });
 
         for selected_rpc in synced_rpc_indexes.into_iter() {
-            let selected_rpc = self.inner.get(selected_rpc).unwrap();
-
             // increment our connection counter
             match selected_rpc.try_request_handle() {
                 Err(not_until) => {
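
Note: the two `// TODO: i feel like this should be easier` comments above are asking how to fetch the latest block without a hand-rolled `eth_getBlockByNumber` request. ethers' `Middleware::get_block` accepts anything that converts into a `BlockId`, and `BlockNumber::Latest` converts, so the raw request can likely be replaced. A minimal standalone sketch, not part of this patch; the endpoint URL is a placeholder:

    use ethers::prelude::{BlockNumber, Http, Middleware, Provider};

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // placeholder endpoint; any JSON-RPC-over-HTTP url works here
        let provider = Provider::<Http>::try_from("http://localhost:8545")?;

        // get_block takes impl Into<BlockId>, and BlockNumber::Latest converts,
        // so this issues the same eth_getBlockByNumber("latest", false) call
        let block = provider.get_block(BlockNumber::Latest).await?;

        // the result is Option<Block<TxHash>>; number and hash can still be None
        // for a pending block, which is why the unwrap()s in send_block deserve care
        if let Some(block) = block {
            println!("latest block: number={:?} hash={:?}", block.number, block.hash);
        }

        Ok(())
    }

If the raw `provider.request(...)` form is kept instead, deserializing into `Block<TxHash>` as the patch does should be equivalent, since `get_block` wraps the same RPC method.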