more logs and beginning of a health check

This commit is contained in:
Bryan Stitt 2022-09-06 16:49:07 +00:00
parent 83a7b03dea
commit 041dd5f317
3 changed files with 85 additions and 28 deletions

@ -244,7 +244,7 @@ impl Web3Connections {
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
match rpc_head_block {
let rpc_head_id = match rpc_head_block {
Some(rpc_head_block) => {
let rpc_head_num = rpc_head_block.number.unwrap();
let rpc_head_hash = rpc_head_block.hash.unwrap();
@ -254,11 +254,18 @@ impl Web3Connections {
debug!(%rpc, "still syncing");
connection_heads.remove(&rpc.name);
None
} else {
// we don't know if it's on the heaviest chain yet
self.save_block(&rpc_head_block, false).await?;
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
Some(BlockId {
hash: rpc_head_hash,
num: rpc_head_num,
})
}
}
None => {
@ -266,6 +273,8 @@ impl Web3Connections {
trace!(%rpc, "Block without number or hash!");
connection_heads.remove(&rpc.name);
None
}
};
@ -427,22 +436,29 @@ impl Web3Connections {
// TODO: if the rpc_head_block != heavy, log something somewhere in here
match &old_synced_connections.head_block_id {
None => {
debug!(block=%heavy_block_id, %rpc, "first consensus head {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(block=%heavy_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
self.save_block(&heavy_block, true).await?;
head_block_sender.send(heavy_block)?;
}
Some(old_block_id) => {
// TODO: do this log item better
let rpc_head_str = rpc_head_id
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
match heavy_block_id.num.cmp(&old_block_id.num) {
Ordering::Equal => {
// TODO: if rpc_block_id != heavy_block_id, do a different log
// multiple blocks with the same fork!
if heavy_block_id.hash == old_block_id.hash {
// no change in hash. no need to use head_block_sender
debug!(head=%heavy_block_id, %rpc, "con block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
} else {
// hash changed
info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
info!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, old=%old_block_id, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// todo!("handle equal by updating the canonical chain");
self.save_block(&heavy_block, true).await?;
@ -453,7 +469,7 @@ impl Web3Connections {
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!(head=%heavy_block_id, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
warn!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
self.save_block(&heavy_block, true).await?;
@ -461,7 +477,7 @@ impl Web3Connections {
head_block_sender.send(heavy_block)?;
}
Ordering::Greater => {
debug!(head=%heavy_block_id, %rpc, "new block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// todo!("handle greater by adding this block and any missing parents to the canonical chain");

@ -16,8 +16,8 @@ use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{broadcast, oneshot};
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
use tracing::{debug, error, info, instrument, trace, warn};
@ -409,18 +409,27 @@ impl Web3Connection {
futures.push(flatten_handle(tokio::spawn(f)));
}
let mut never_shot = None;
{
// TODO: move this into a proper function
let conn = self.clone();
// health check
let f = async move {
loop {
if let Some(provider) = conn.provider.read().await.as_ref() {
if provider.ready() {
trace!(rpc=%conn, "provider is ready");
} else {
warn!(rpc=%conn, "provider is NOT ready");
return Err(anyhow::anyhow!("provider is not ready"));
}
}
if futures.is_empty() {
info!(rpc=%self, "no-op subscription");
// TODO: is there a better way to make a channel that is never ready?
// TODO: this is wrong! we still need retries! have this do a health check on an interval instead
let (tx, rx) = oneshot::channel();
never_shot = Some(tx);
let f = async move { rx.await.map_err(Into::into) };
// TODO: how often?
// TODO: should we also check that the head block has changed recently?
// TODO: maybe instead we should do a simple subscription and follow that instead
sleep(Duration::from_secs(10)).await;
}
};
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -433,14 +442,13 @@ impl Web3Connection {
let retry_in = Duration::from_secs(1);
warn!(
rpc=%self,
"subscription exited. Attempting to reconnect in {:?}. {:?}",
retry_in,
err
?err,
?retry_in,
"subscription exited",
);
sleep(retry_in).await;
// TODO: loop on reconnecting! do not return with a "?" here
// TODO: this isn't going to work. it will get in a loop with newHeads
self.reconnect(block_sender.clone()).await?;
} else {
error!(rpc=%self, ?err, "subscription exited");
@ -448,11 +456,9 @@ impl Web3Connection {
}
}
}
drop(never_shot);
}
info!(rpc=%self, "subscription complete");
info!(rpc=%self, "all subscriptions complete");
Ok(())
}
@ -557,10 +563,29 @@ impl Web3Connection {
.await
.map(|x| Some(Arc::new(x)));
let mut last_hash = match &block {
Ok(Some(new_block)) => new_block
.hash
.expect("blocks should always have a hash here"),
_ => H256::zero(),
};
self.send_head_block_result(block, &block_sender, block_map.clone())
.await?;
while let Some(new_block) = stream.next().await {
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
.hash
.expect("blocks should always have a hash here");
if new_hash == last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue;
} else {
last_hash = new_hash;
}
self.send_head_block_result(
Ok(Some(Arc::new(new_block))),
&block_sender,
@ -569,7 +594,10 @@ impl Web3Connection {
.await?;
}
warn!(rpc=%self, "subscription ended");
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!(rpc=%self, "new_heads subscription ended");
return Err(anyhow::anyhow!("new_heads subscription ended"));
}
}
}
@ -631,7 +659,10 @@ impl Web3Connection {
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
}
warn!(rpc=%self, "subscription ended");
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!(rpc=%self, "pending_transactions subscription ended");
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
}
}
}

@ -1,7 +1,8 @@
use std::time::Duration;
use derive_more::From;
use tracing::{info_span, instrument, Instrument};
use ethers::providers::Middleware;
use tracing::{error_span, info_span, instrument, Instrument};
/// Use HTTP and WS providers.
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
@ -12,6 +13,14 @@ pub enum Web3Provider {
}
impl Web3Provider {
/// Report whether this provider is ready to be used.
///
/// The `Http` variant always reports `true`. The `Ws` variant forwards the
/// question to the `ready()` method of the value obtained from the wrapped
/// provider via `as_ref()`.
pub fn ready(&self) -> bool {
// TODO: i'm not sure if this is enough
match self {
// websocket: defer to the inner value's own readiness check
Self::Ws(ws) => ws.as_ref().ready(),
// http: nothing is checked here, so it is always considered ready
Self::Http(_) => true,
}
}
#[instrument]
pub async fn from_str(
url_str: &str,
@ -30,6 +39,7 @@ impl Web3Provider {
.interval(Duration::from_secs(13))
.into()
} else if url_str.starts_with("ws") {
// TODO: I don't think this instrument does much of anything. What level should it be?
let provider = ethers::providers::Ws::connect(url_str)
.instrument(info_span!("Web3Provider", %url_str))
.await?;