diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b36ee81a..2c0ad614 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -244,7 +244,7 @@ impl Web3Connections { pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // add the rpc's block to connection_heads, or remove the rpc from connection_heads - match rpc_head_block { + let rpc_head_id = match rpc_head_block { Some(rpc_head_block) => { let rpc_head_num = rpc_head_block.number.unwrap(); let rpc_head_hash = rpc_head_block.hash.unwrap(); @@ -254,11 +254,18 @@ impl Web3Connections { debug!(%rpc, "still syncing"); connection_heads.remove(&rpc.name); + + None } else { // we don't know if its on the heaviest chain yet self.save_block(&rpc_head_block, false).await?; connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); + + Some(BlockId { + hash: rpc_head_hash, + num: rpc_head_num, + }) } } None => { @@ -266,6 +273,8 @@ impl Web3Connections { trace!(%rpc, "Block without number or hash!"); connection_heads.remove(&rpc.name); + + None } }; @@ -427,22 +436,29 @@ impl Web3Connections { // TODO: if the rpc_head_block != heavy, log something somewhere in here match &old_synced_connections.head_block_id { None => { - debug!(block=%heavy_block_id, %rpc, "first consensus head {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + debug!(block=%heavy_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); self.save_block(&heavy_block, true).await?; head_block_sender.send(heavy_block)?; } Some(old_block_id) => { + // TODO: do this log item better + let rpc_head_str = rpc_head_id + .map(|x| x.to_string()) + .unwrap_or_else(|| "None".to_string()); + match heavy_block_id.num.cmp(&old_block_id.num) { Ordering::Equal => { + // TODO: if rpc_block_id != heavy_block_id, do a different log + // multiple blocks with the same fork! if heavy_block_id.hash == old_block_id.hash { // no change in hash. no need to use head_block_sender - debug!(head=%heavy_block_id, %rpc, "con block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns) + debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns) } else { // hash changed - info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + info!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, old=%old_block_id, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); // todo!("handle equal by updating the cannonical chain"); self.save_block(&heavy_block, true).await?; @@ -453,7 +469,7 @@ impl Web3Connections { Ordering::Less => { // this is unlikely but possible // TODO: better log - warn!(head=%heavy_block_id, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + warn!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); self.save_block(&heavy_block, true).await?; @@ -461,7 +477,7 @@ impl Web3Connections { head_block_sender.send(heavy_block)?; } Ordering::Greater => { - debug!(head=%heavy_block_id, %rpc, "new block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); // todo!("handle greater by adding this block to and any missing parents to the cannonical chain"); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index dd9e62e9..2db78290 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -16,8 +16,8 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; +use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; -use tokio::sync::{broadcast, oneshot}; use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -409,18 +409,27 @@ impl Web3Connection { futures.push(flatten_handle(tokio::spawn(f))); } - let mut never_shot = None; + { + // TODO: move this into a proper function + let conn = self.clone(); + // health check + let f = async move { + loop { + if let Some(provider) = conn.provider.read().await.as_ref() { + if provider.ready() { + trace!(rpc=%conn, "provider is ready"); + } else { + warn!(rpc=%conn, "provider is NOT ready"); + return Err(anyhow::anyhow!("provider is not ready")); + } + } - if futures.is_empty() { - info!(rpc=%self, "no-op subscription"); - - // TODO: is there a better way to make a channel that is never ready? - // TODO: this is wrong! we still need retries! have this do a health check on an interval instead - let (tx, rx) = oneshot::channel(); - - never_shot = Some(tx); - - let f = async move { rx.await.map_err(Into::into) }; + // TODO: how often? + // TODO: should we also check that the head block has changed recently? + // TODO: maybe instead we should do a simple subscription and follow that instead + sleep(Duration::from_secs(10)).await; + } + }; futures.push(flatten_handle(tokio::spawn(f))); } @@ -433,14 +442,13 @@ impl Web3Connection { let retry_in = Duration::from_secs(1); warn!( rpc=%self, - "subscription exited. Attempting to reconnect in {:?}. {:?}", - retry_in, - err + ?err, + ?retry_in, + "subscription exited", ); sleep(retry_in).await; // TODO: loop on reconnecting! do not return with a "?" here - // TODO: this isn't going to work. it will get in a loop with newHeads self.reconnect(block_sender.clone()).await?; } else { error!(rpc=%self, ?err, "subscription exited"); @@ -448,11 +456,9 @@ impl Web3Connection { } } } - - drop(never_shot); } - info!(rpc=%self, "subscription complete"); + info!(rpc=%self, "all subscriptions complete"); Ok(()) } @@ -557,10 +563,29 @@ impl Web3Connection { .await .map(|x| Some(Arc::new(x))); + let mut last_hash = match &block { + Ok(Some(new_block)) => new_block + .hash + .expect("blocks should always have a hash here"), + _ => H256::zero(), + }; + self.send_head_block_result(block, &block_sender, block_map.clone()) .await?; while let Some(new_block) = stream.next().await { + // TODO: check the new block's hash to be sure we don't send dupes + let new_hash = new_block + .hash + .expect("blocks should always have a hash here"); + + if new_hash == last_hash { + // some rpcs like to give us duplicates. don't waste our time on them + continue; + } else { + last_hash = new_hash; + } + self.send_head_block_result( Ok(Some(Arc::new(new_block))), &block_sender, @@ -569,7 +594,10 @@ impl Web3Connection { .await?; } - warn!(rpc=%self, "subscription ended"); + // TODO: is this always an error? + // TODO: we probably don't want a warn and to return error + warn!(rpc=%self, "new_heads subscription ended"); + return Err(anyhow::anyhow!("new_heads subscription ended")); } } } @@ -631,7 +659,10 @@ impl Web3Connection { // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription } - warn!(rpc=%self, "subscription ended"); + // TODO: is this always an error? + // TODO: we probably don't want a warn and to return error + warn!(rpc=%self, "pending_transactions subscription ended"); + return Err(anyhow::anyhow!("pending_transactions subscription ended")); } } } diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index 5b4ba168..654d23ee 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,7 +1,8 @@ use std::time::Duration; use derive_more::From; -use tracing::{info_span, instrument, Instrument}; +use ethers::providers::Middleware; +use tracing::{error_span, info_span, instrument, Instrument}; /// Use HTTP and WS providers. // TODO: instead of an enum, I tried to use Box, but hit @@ -12,6 +13,14 @@ pub enum Web3Provider { } impl Web3Provider { + pub fn ready(&self) -> bool { + // TODO: i'm not sure if this is enough + match self { + Self::Http(_) => true, + Self::Ws(provider) => provider.as_ref().ready(), + } + } + #[instrument] pub async fn from_str( url_str: &str, @@ -30,6 +39,7 @@ impl Web3Provider { .interval(Duration::from_secs(13)) .into() } else if url_str.starts_with("ws") { + // TODO: i dont think this instrument does much of anything. what level should it be? let provider = ethers::providers::Ws::connect(url_str) .instrument(info_span!("Web3Provider", %url_str)) .await?;