handle a missing block
This commit is contained in:
parent
ac6296c5ac
commit
010669cf81
@ -17,7 +17,7 @@ use serde::Serialize;
|
|||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::{cmp::Ordering, fmt::Display, sync::Arc};
|
use std::{cmp::Ordering, fmt::Display, sync::Arc};
|
||||||
use tokio::sync::{broadcast, watch};
|
use tokio::sync::{broadcast, watch};
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
pub type ArcBlock = Arc<Block<TxHash>>;
|
pub type ArcBlock = Arc<Block<TxHash>>;
|
||||||
|
|
||||||
@ -274,7 +274,8 @@ impl Web3Connections {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
warn!(%rpc, ?rpc_head_block, "Block without number or hash!");
|
// TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
|
||||||
|
info!(%rpc, "Block without number or hash!");
|
||||||
|
|
||||||
connection_heads.remove(&rpc.name);
|
connection_heads.remove(&rpc.name);
|
||||||
|
|
||||||
@ -294,7 +295,13 @@ impl Web3Connections {
|
|||||||
// don't check the same hash multiple times
|
// don't check the same hash multiple times
|
||||||
checked_heads.insert(rpc_head_hash);
|
checked_heads.insert(rpc_head_hash);
|
||||||
|
|
||||||
let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap();
|
let rpc_head_block = if let Some(x) = self.block_hashes.get(rpc_head_hash) {
|
||||||
|
x
|
||||||
|
} else {
|
||||||
|
// TODO: why does this happen?
|
||||||
|
warn!(%rpc_head_hash, %rpc, "No block found");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
match &rpc_head_block.total_difficulty {
|
match &rpc_head_block.total_difficulty {
|
||||||
None => {
|
None => {
|
||||||
|
@ -363,6 +363,7 @@ impl Web3Connection {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
async fn subscribe(
|
async fn subscribe(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
|
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
|
||||||
@ -429,7 +430,6 @@ impl Web3Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
|
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
|
||||||
/// TODO: instrument with the url
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
async fn subscribe_new_heads(
|
async fn subscribe_new_heads(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
@ -437,7 +437,7 @@ impl Web3Connection {
|
|||||||
block_sender: flume::Sender<BlockAndRpc>,
|
block_sender: flume::Sender<BlockAndRpc>,
|
||||||
block_map: BlockHashesMap,
|
block_map: BlockHashesMap,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
info!(?self, "watching new_heads");
|
info!(?self, "watching new heads");
|
||||||
|
|
||||||
// TODO: is a RwLock of an Option<Arc> the right thing here?
|
// TODO: is a RwLock of an Option<Arc> the right thing here?
|
||||||
if let Some(provider) = self.provider.read().await.clone() {
|
if let Some(provider) = self.provider.read().await.clone() {
|
||||||
@ -554,8 +554,7 @@ impl Web3Connection {
|
|||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
|
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// TODO: move this data into a span?
|
info!(?self, "watching pending transactions");
|
||||||
info!("watching {}", self);
|
|
||||||
|
|
||||||
// TODO: is a RwLock of an Option<Arc> the right thing here?
|
// TODO: is a RwLock of an Option<Arc> the right thing here?
|
||||||
if let Some(provider) = self.provider.read().await.clone() {
|
if let Some(provider) = self.provider.read().await.clone() {
|
||||||
|
Loading…
Reference in New Issue
Block a user