improve fork logic again

Bryan Stitt 2022-09-05 16:25:21 +00:00
parent 2092e74fd3
commit 068c05cf4f
6 changed files with 48 additions and 41 deletions

Cargo.lock (generated)

@@ -2680,9 +2680,9 @@ dependencies = [
 [[package]]
 name = "moka"
-version = "0.9.3"
+version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a89c33e91526792a0260425073c3db0b472cdca2cc6fcaa666dd6e65450462a"
+checksum = "dd8256cf9fa396576521e5e94affadb95818ec48f5dbedca36714387688aea29"
 dependencies = [
 "async-io",
 "async-lock",

@@ -34,7 +34,7 @@ flume = "0.10.14"
 futures = { version = "0.3.24", features = ["thread-pool"] }
 hashbrown = { version = "0.12.3", features = ["serde"] }
 http = "0.2.8"
-moka = { version = "0.9.3", default-features = false, features = ["future"] }
+moka = { version = "0.9.4", default-features = false, features = ["future"] }
 notify = "5.0.0"
 num = "0.4.0"
 parking_lot = { version = "0.12.1", features = ["arc_lock"] }
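
Note: the only change here is the moka bump from 0.9.3 to 0.9.4, keeping the `future` feature that provides the async-aware cache this project depends on. For orientation, a minimal sketch of that API (illustrative only; assumes the `moka` and `tokio` crates):

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    // an async-aware cache; in moka 0.9, `insert` is async and `get` is sync
    let cache: Cache<u64, String> = Cache::new(10_000);

    cache.insert(15_000_000, "block header json".to_string()).await;

    // `get` clones the cached value out if the key is present
    assert_eq!(cache.get(&15_000_000).as_deref(), Some("block header json"));
}
```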

@@ -39,7 +39,7 @@ use tokio::sync::{broadcast, watch};
 use tokio::task::JoinHandle;
 use tokio::time::{timeout, Instant};
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
-use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
+use tracing::{info, info_span, instrument, trace, warn, Instrument};
 use uuid::Uuid;
 // TODO: make this customizable?
@@ -151,7 +151,6 @@ impl Web3ProxyApp {
 Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
 )> {
 // safety checks on the config
-debug!("redirect_user_url: {}", top_config.app.redirect_user_url);
 assert!(
 top_config.app.redirect_user_url.contains("{{user_id}}"),
 "redirect user url must contain \"{{user_id}}\""

@@ -313,7 +313,7 @@ impl Web3Connections {
 // clone to release the read lock on self.block_hashes
 if let Some(mut maybe_head_block) = highest_work_block {
 // track rpcs on this heaviest chain so we can build a new SyncedConnections
-let mut heavy_rpcs: Vec<&Arc<Web3Connection>> = vec![];
+let mut heavy_rpcs = HashSet::<&String>::new();
 // a running total of the soft limits covered by the heavy rpcs
 let mut heavy_sum_soft_limit: u32 = 0;
 // TODO: also track heavy_sum_hard_limit?
@@ -335,7 +335,7 @@ impl Web3Connections {
 }
 if let Some(rpc) = self.conns.get(conn_name) {
-heavy_rpcs.push(rpc);
+heavy_rpcs.insert(conn_name);
 heavy_sum_soft_limit += rpc.soft_limit;
 } else {
 warn!("connection missing")
@@ -366,9 +366,27 @@ impl Web3Connections {
 break;
 }
 }
 }
+// TODO: if heavy_rpcs.is_empty, try another method of finding the head block
+// we've done all the searching for the heaviest block that we can
+if heavy_rpcs.is_empty() {
+// if we get here, something is wrong. clear synced connections
+let empty_synced_connections = SyncedConnections::default();
+let old_synced_connections = self
+.synced_connections
+.swap(Arc::new(empty_synced_connections));
+// TODO: log different things depending on old_synced_connections
+warn!("no consensus head!");
+} else {
 // success! this block has enough soft limit and nodes on it (or on later blocks)
-let conns = heavy_rpcs.into_iter().cloned().collect();
+let conns: Vec<Arc<Web3Connection>> = heavy_rpcs
+.into_iter()
+.filter_map(|conn_name| self.conns.get(conn_name).cloned())
+.collect();
 let heavy_block = maybe_head_block;
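
Note: the rewritten branch makes both outcomes explicit: with no consensus, an empty `SyncedConnections` is swapped in and a warning logged; with consensus, the set of names is resolved back to `Arc<Web3Connection>`s via `filter_map`. The `swap` call implies `synced_connections` is an atomically swappable pointer; a minimal sketch of that pattern, assuming the `arc-swap` crate:

```rust
use arc_swap::ArcSwap;
use std::sync::Arc;

#[derive(Default)]
struct SyncedConnections {
    head_block_num: u64,
}

fn main() {
    // readers always see a consistent snapshot; writers replace it atomically
    let synced = ArcSwap::from_pointee(SyncedConnections {
        head_block_num: 15_000_000,
    });

    // no consensus: publish an empty state, keep the old one for logging
    let old = synced.swap(Arc::new(SyncedConnections::default()));
    assert_eq!(old.head_block_num, 15_000_000);
    assert_eq!(synced.load().head_block_num, 0);
}
```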
@@ -377,6 +395,11 @@ impl Web3Connections {
 debug_assert_ne!(heavy_num, U64::zero());
+// TODO: add these to the log messages
+let num_consensus_rpcs = conns.len();
+let num_connection_heads = connection_heads.len();
+let total_conns = self.conns.len();
 let heavy_block_id = BlockId {
 hash: heavy_hash,
 num: heavy_num,
@@ -391,14 +414,10 @@ impl Web3Connections {
 .synced_connections
 .swap(Arc::new(new_synced_connections));
-// TODO: add these to the log messages
-let num_connection_heads = connection_heads.len();
-let total_conns = self.conns.len();
 // TODO: if the rpc_head_block != heavy, log something somewhere in here
 match &old_synced_connections.head_block_id {
 None => {
-debug!(block=%heavy_block_id, %rpc, "first consensus head");
+debug!(block=%heavy_block_id, %rpc, "first consensus head {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
 self.save_block(&rpc_head_block, true).await?;
@@ -410,10 +429,10 @@ impl Web3Connections {
 // multiple blocks with the same fork!
 if heavy_block_id.hash == old_block_id.hash {
 // no change in hash. no need to use head_block_sender
-debug!(head=%heavy_block_id, old=%old_block_id, %rpc, "con block")
+debug!(head=%heavy_block_id, %rpc, "con block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
 } else {
 // hash changed
-info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block");
+info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
 // todo!("handle equal by updating the canonical chain");
 self.save_block(&rpc_head_block, true).await?;
@@ -424,7 +443,7 @@ impl Web3Connections {
 Ordering::Less => {
 // this is unlikely but possible
 // TODO: better log
-warn!(head=%heavy_block_id, %rpc, "chain rolled back");
+warn!(head=%heavy_block_id, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
 self.save_block(&rpc_head_block, true).await?;
@@ -432,7 +451,7 @@ impl Web3Connections {
 head_block_sender.send(heavy_block)?;
 }
 Ordering::Greater => {
-debug!(head=%heavy_block_id, %rpc, "new block");
+debug!(head=%heavy_block_id, %rpc, "new block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
 // todo!("handle greater by adding this block and any missing parents to the canonical chain");
@@ -443,18 +462,7 @@ impl Web3Connections {
 }
 }
 }
-return Ok(());
-}
-// if we get here, something is wrong. clear synced connections
-let empty_synced_connections = SyncedConnections::default();
-let old_synced_connections = self
-.synced_connections
-.swap(Arc::new(empty_synced_connections));
-// TODO: log different things depending on old_synced_connections
 }
 Ok(())

@@ -157,7 +157,7 @@ impl Web3Connection {
 Ok((new_connection, handle))
 }
-#[instrument]
+#[instrument(skip_all)]
 async fn check_block_data_limit(self: &Arc<Self>) -> anyhow::Result<Option<u64>> {
 let mut limit = None;
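
Note: `#[instrument]` records every argument into the span using its `Debug` impl, which is noisy for `Arc<Self>` and fails to compile for non-`Debug` types; `skip_all` keeps the span but records no arguments. A sketch:

```rust
use tracing::instrument;

struct BigConfig {/* no Debug impl, and too noisy to log anyway */}

// the "connect" span is still created, but no arguments are recorded,
// so BigConfig doesn't need to implement Debug
#[instrument(skip_all)]
fn connect(cfg: &BigConfig, url: &str) {
    let _ = cfg;
    tracing::info!(%url, "connecting");
}

fn main() {
    tracing_subscriber::fmt().init();
    connect(&BigConfig {}, "ws://localhost:8545");
}
```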
@@ -166,7 +166,7 @@ impl Web3Connection {
 // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
 while head_block_num == U64::zero() {
-warn!("no head block");
+warn!(rpc=%self, "no head block yet. retrying");
 // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender?
 sleep(Duration::from_secs(1)).await;
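
Note: this loop polls once per second until a head block number arrives; the TODO suggests subscribing instead. A sketch of the watch-channel alternative it hints at (simplified to `u64`, assuming `tokio`):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u64); // 0 = no head block yet

    // some other task publishes the head block number once it is known
    tokio::spawn(async move {
        tx.send(15_000_000).unwrap();
    });

    // wait for a nonzero head block instead of sleep-polling
    while *rx.borrow() == 0 {
        rx.changed().await.unwrap();
    }
    println!("head block: {}", *rx.borrow());
}
```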
@@ -191,7 +191,7 @@ impl Web3Connection {
 )
 .await;
-trace!(?archive_result, %self);
+trace!(?archive_result, rpc=%self);
 if archive_result.is_ok() {
 limit = Some(block_data_limit);
@@ -298,7 +298,7 @@ impl Web3Connection {
 // self got the head block first. unfortunately it's missing a necessary field
 // keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed.
 // there are other clients and we might have to use a third party without the td fix.
-trace!(rpc=?self, ?new_hash, "total_difficulty missing");
+trace!(rpc=%self, ?new_hash, "total_difficulty missing");
 // todo: this can wait forever!
 let complete_head_block: Block<TxHash> = self
 .wait_for_request_handle()
@@ -382,7 +382,7 @@ impl Web3Connection {
 if futures.is_empty() {
 // TODO: is there a better way to make a channel that is never ready?
-info!(?self, "no-op subscription");
+info!(rpc=%self, "no-op subscription");
 return Ok(());
 }
@@ -393,7 +393,7 @@ impl Web3Connection {
 // TODO: exponential backoff
 let retry_in = Duration::from_secs(1);
 warn!(
-?self,
+rpc=%self,
 "subscription exited. Attempting to reconnect in {:?}. {:?}",
 retry_in,
 err
@@ -404,7 +404,7 @@ impl Web3Connection {
 // TODO: this isn't going to work. it will get in a loop with newHeads
 self.reconnect(block_sender.clone()).await?;
 } else {
-error!(?self, ?err, "subscription exited");
+error!(rpc=%self, ?err, "subscription exited");
 return Err(err);
 }
 }
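
Note: the reconnect path still sleeps a flat second between attempts; the `// TODO: exponential backoff` above marks the intended improvement. One hypothetical shape for that delay schedule (not this repo's code):

```rust
use std::time::Duration;

/// Doubles the delay on each failed attempt, capped so a long outage
/// doesn't push retries out indefinitely. Purely illustrative.
fn retry_delay(attempt: u32) -> Duration {
    let base = Duration::from_secs(1);
    let cap = Duration::from_secs(60);
    base.saturating_mul(2u32.saturating_pow(attempt)).min(cap)
}

fn main() {
    for attempt in 0..8 {
        println!("attempt {attempt}: wait {:?}", retry_delay(attempt));
    }
}
```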
@@ -490,12 +490,12 @@ impl Web3Connection {
 broadcast::error::RecvError::Lagged(lagged) => {
 // querying the block was delayed
 // this can happen if tokio is very busy or waiting for request limits took too long
-warn!(?err, ?self, "http interval lagging by {}!", lagged);
+warn!(?err, rpc=%self, "http interval lagging by {}!", lagged);
 }
 }
 }
-trace!(?self, "ok http interval");
+trace!(rpc=%self, "ok http interval");
 }
 }
 Web3Provider::Ws(provider) => {
@@ -526,7 +526,7 @@ impl Web3Connection {
 .await?;
 }
-warn!(?self, "subscription ended");
+warn!(rpc=%self, "subscription ended");
 }
 }
 }
@@ -588,7 +588,7 @@ impl Web3Connection {
 // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
 }
-warn!(?self, "subscription ended");
+warn!(rpc=%self, "subscription ended");
 }
 }
 }
@@ -644,7 +644,7 @@ impl Web3Connection {
 // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
 // TODO: use tracing better
 // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
-warn!(?retry_at, ?self, "Exhausted rate limit");
+warn!(?retry_at, rpc=%self, "Exhausted rate limit");
 return Ok(OpenRequestResult::RetryAt(retry_at.into()));
 }
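
Note: most hunks in this file make the same mechanical change: `?self` (anonymous field, `Debug` formatting) becomes `rpc=%self` (named field, `Display` formatting), giving shorter output and a stable `rpc` field to filter on. A self-contained illustration:

```rust
use std::fmt;
use tracing::warn;

#[derive(Debug)]
struct Web3Connection {
    name: String,
    url: String,
}

impl fmt::Display for Web3Connection {
    // Display is the short, human-friendly form
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.name)
    }
}

fn main() {
    tracing_subscriber::fmt().init();

    let conn = Web3Connection {
        name: "local-erigon".into(),
        url: "ws://localhost:8545".into(),
    };

    // `?conn` records the verbose Debug form under the field name "conn";
    // `rpc=%conn` records the Display form under the explicit field "rpc"
    warn!(?conn, "old style");
    warn!(rpc = %conn, "new style");
}
```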

@@ -58,7 +58,7 @@ impl OpenRequestHandle {
 while provider.is_none() {
 match self.0.provider.read().await.as_ref() {
 None => {
-warn!(rpc=?self.0, "no provider!");
+warn!(rpc=%self.0, "no provider!");
 // TODO: how should this work? a reconnect should be in progress. but maybe force one now?
 // TODO: sleep how long? subscribe to something instead?
 sleep(Duration::from_millis(100)).await