no longer need to use total difficulty on ETH 2.0

Bryan Stitt 2022-09-14 19:39:08 +00:00
parent 31227d8c0c
commit 7a1433e3c9
4 changed files with 73 additions and 106 deletions

@@ -144,8 +144,9 @@ These are roughly in order of completion
- [x] it looks like our reconnect logic is not always firing. we need to make reconnect more robust!
- i am pretty sure that this is actually servers that fail to connect on initial setup (maybe the rpcs that are on the wrong chain are just timing out and they aren't set to reconnect?)
- [x] chain rolled back 1/1/1 con_head=15510065 (0xa4a3…d2d8) rpc_head=15510065 (0xa4a3…d2d8) rpc=local_erigon_archive
- include the old head number and block in the log
- [ ] rewrite rate limiting to have a tiered cache. do not put redis in the hot path
- include the old head number and block in the log
- [x] once the merge happens, we don't want to use total difficulty and instead just care about the number
- [-] rewrite rate limiting to have a tiered cache. do not put redis in the hot path
- instead, we should check a local cache for the current rate limit (+1) and spawn an update to the local cache from redis in the background (see the sketch after this hunk).
- [x] when there are a LOT of concurrent requests, we see errors. i thought that was a problem with redis cell, but it happens with my simpler rate limit. now i think the problem is actually with bb8
- https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html or https://crates.io/crates/deadpool-redis?
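
The tiered rate-limit item above describes checking a local cache on the hot path and only reconciling with redis in the background. A minimal sketch of that shape, using tokio and anyhow from the dependencies shown in this commit; the struct, its fields, and fetch_remote_count are illustrative assumptions, not the project's implementation:

// Sketch only: local-first rate limiting that keeps redis off the hot path.
// fetch_remote_count is a hypothetical stand-in for the real redis/deadpool-redis call.
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::RwLock;

#[derive(Clone)]
struct TieredRateLimiter {
    // key -> requests counted locally during the current window
    local_counts: Arc<RwLock<HashMap<String, u64>>>,
    max_per_window: u64,
}

impl TieredRateLimiter {
    async fn check(&self, key: &str) -> bool {
        // hot path: bump and read the local counter only
        let count = {
            let mut counts = self.local_counts.write().await;
            let entry = counts.entry(key.to_string()).or_insert(0);
            *entry += 1;
            *entry
        };

        // cold path: reconcile with redis in the background so the request never waits on it
        let limiter = self.clone();
        let key = key.to_string();
        tokio::spawn(async move {
            if let Ok(remote_count) = fetch_remote_count(&key).await {
                limiter.local_counts.write().await.insert(key, remote_count);
            }
        });

        count <= self.max_per_window
    }
}

// hypothetical placeholder for the real redis lookup
async fn fetch_remote_count(_key: &str) -> anyhow::Result<u64> {
    Ok(0)
}
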
@@ -202,6 +203,9 @@ These are not yet ordered.
- [ ] handle log subscriptions
- probably as a paid feature
- [ ] exponential backoff when reconnecting a connection
- [ ] on ETH, we no longer use total difficulty, but other chains might (see the sketch after this hunk)
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
- if total difficulty is set and non-zero, use it for consensus instead of just the number
new endpoints for users (not totally sure about the exact paths, but these features are all needed):
- [x] GET /u/:api_key
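
The "other chains might" item above amounts to choosing a per-block comparison key: use total difficulty when it is present and non-zero, otherwise fall back to the block number (fetching the full block first on non-ETH chains if the header lacks the field). A hedged sketch using ethers' Block<TxHash>; ConsensusKey and consensus_key are illustrative names, not types from this repo:

// Sketch of the fallback rule described in the TODO above.
// Within a single chain every head produces the same variant, so the derived
// ordering only ever compares like with like.
use ethers::types::{Block, TxHash, U256, U64};

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum ConsensusKey {
    // post-merge ETH (or a header missing the field): order by block number
    Number(U64),
    // chains that still report a meaningful total difficulty: order by it
    TotalDifficulty(U256),
}

fn consensus_key(block: &Block<TxHash>) -> ConsensusKey {
    match block.total_difficulty {
        // set and non-zero: keep using total difficulty for consensus
        Some(td) if !td.is_zero() => ConsensusKey::TotalDifficulty(td),
        // missing or zero: fall back to the number
        // (on a non-ETH chain the caller would fetch the full block first,
        // since the header alone may simply be missing the field)
        _ => ConsensusKey::Number(block.number.unwrap_or_default()),
    }
}
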

@@ -8,4 +8,4 @@ edition = "2021"
anyhow = "1.0.65"
deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] }
tracing = "0.1.36"
tokio = {version = "*" }
tokio = "1.21.1"

@@ -48,11 +48,6 @@ impl Web3Connections {
let block_num = block.number.as_ref().context("no block num")?;
let _block_td = block
.total_difficulty
.as_ref()
.expect("no block total difficulty. this is a bug!");
let mut blockchain = self.blockchain_graphmap.write().await;
// TODO: think more about heaviest_chain
@@ -275,7 +270,7 @@ impl Web3Connections {
// iterate the known heads to find the highest_work_block
let mut checked_heads = HashSet::new();
let mut highest_work_block: Option<ArcBlock> = None;
let mut highest_num_block: Option<ArcBlock> = None;
for (conn_name, connection_head_hash) in connection_heads.iter() {
if checked_heads.contains(connection_head_hash) {
// we already checked this head from another rpc
@@ -287,7 +282,7 @@ impl Web3Connections {
let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) {
x
} else {
// TODO: why does this happen?!?! seems to only happen with uncled blocks. maybe a check on total difficulty skipped it?
// TODO: why does this happen?!?! seems to only happen with uncled blocks
// TODO: maybe we should do get_with?
// TODO: maybe we should just continue. this only seems to happen when an older block is received
warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes. Fetching now");
@@ -298,41 +293,42 @@ impl Web3Connections {
match self.block(connection_head_hash, conn_rpc).await {
Ok(block) => block,
Err(err) => {
warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashe");
warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes");
continue;
}
}
};
match &conn_head_block.total_difficulty {
match &conn_head_block.number {
None => {
panic!("block is missing total difficulty. this is a bug");
panic!("block is missing number. this is a bug");
}
Some(td) => {
Some(conn_head_num) => {
// if this is the first block we've tried
// or if this rpc's newest block has a higher total difficulty
if highest_work_block.is_none()
|| td
> highest_work_block
// or if this rpc's newest block has a higher number
// we used to check total difficulty, but that isn't a thing anymore
if highest_num_block.is_none()
|| conn_head_num
> highest_num_block
.as_ref()
.expect("there should always be a block here")
.total_difficulty
.number
.as_ref()
.expect("there should always be total difficulty here")
.expect("there should always be number here")
{
highest_work_block = Some(conn_head_block);
highest_num_block = Some(conn_head_block);
}
}
}
}
// clone to release the read lock on self.block_hashes
if let Some(mut maybe_head_block) = highest_work_block {
if let Some(mut maybe_head_block) = highest_num_block {
// track rpcs on this heaviest chain so we can build a new SyncedConnections
let mut heavy_rpcs = HashSet::<&String>::new();
// a running total of the soft limits covered by the heavy rpcs
let mut heavy_sum_soft_limit: u32 = 0;
// TODO: also track heavy_sum_hard_limit?
let mut highest_rpcs = HashSet::<&String>::new();
// a running total of the soft limits covered by the rpcs that agree on the head block
let mut highest_rpcs_sum_soft_limit: u32 = 0;
// TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait
// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
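
The change above replaces the total-difficulty comparison with a plain block-number comparison when picking the best head reported by the connections. Distilled into a standalone helper (a sketch; ArcBlock is assumed to be Arc<Block<TxHash>> as it is used above, and the iterator stands in for the self.block_hashes lookups):

// Condensed form of the selection above: among the blocks the connections
// report as their heads, keep the one with the highest number.
use std::sync::Arc;

use ethers::types::{Block, TxHash};

type ArcBlock = Arc<Block<TxHash>>;

fn find_highest_num_block<'a>(heads: impl Iterator<Item = &'a ArcBlock>) -> Option<ArcBlock> {
    heads
        .max_by_key(|head| head.number.expect("head blocks always have numbers"))
        .cloned()
}
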
@@ -350,21 +346,21 @@ impl Web3Connections {
// connection is not on the desired block
continue;
}
if heavy_rpcs.contains(conn_name) {
if highest_rpcs.contains(conn_name) {
// connection is on a child block
continue;
}
if let Some(rpc) = self.conns.get(conn_name) {
heavy_rpcs.insert(conn_name);
heavy_sum_soft_limit += rpc.soft_limit;
highest_rpcs.insert(conn_name);
highest_rpcs_sum_soft_limit += rpc.soft_limit;
} else {
warn!("connection missing")
}
}
if heavy_sum_soft_limit < self.min_sum_soft_limit
|| heavy_rpcs.len() < self.min_synced_rpcs
if highest_rpcs_sum_soft_limit < self.min_sum_soft_limit
|| highest_rpcs.len() < self.min_synced_rpcs
{
// not enough rpcs yet. check the parent
if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash)
@@ -373,29 +369,29 @@ impl Web3Connections {
child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
);
maybe_head_block = parent_block.clone();
maybe_head_block = parent_block;
continue;
} else {
warn!(
"no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%",
heavy_sum_soft_limit,
highest_rpcs_sum_soft_limit,
self.min_sum_soft_limit,
heavy_rpcs.len(),
highest_rpcs.len(),
self.min_synced_rpcs,
heavy_sum_soft_limit * 100 / self.min_sum_soft_limit
highest_rpcs_sum_soft_limit * 100 / self.min_sum_soft_limit
);
break;
}
}
}
// TODO: if heavy_rpcs.is_empty, try another method of finding the head block
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
// we've done all the searching for the heaviest block that we can
if heavy_rpcs.is_empty() {
if highest_rpcs.is_empty() {
// if we get here, something is wrong. clear synced connections
let empty_synced_connections = SyncedConnections::default();
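
The loop above implements the rule in the comments: start from the highest head and, while the rpcs that agree on it don't cover min_sum_soft_limit (or min_synced_rpcs), step back to the parent block, giving up when there is no parent left. The same walk-back reduced to a sketch with a simplified stand-in type (the real code recomputes the agreeing rpcs and their soft limits at every block):

// Sketch of the walk-back above; Candidate is an illustrative stand-in type.
struct Candidate {
    sum_soft_limit: u32,
    num_rpcs: usize,
    parent: Option<Box<Candidate>>,
}

fn find_consensus(
    mut candidate: Candidate,
    min_sum_soft_limit: u32,
    min_synced_rpcs: usize,
) -> Option<Candidate> {
    loop {
        if candidate.sum_soft_limit >= min_sum_soft_limit && candidate.num_rpcs >= min_synced_rpcs {
            // enough capacity agrees on (or after) this block
            return Some(candidate);
        }
        // not enough rpcs yet: check the parent, or give up if there is none
        candidate = match candidate.parent.take() {
            Some(parent) => *parent,
            None => return None,
        };
    }
}
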
@@ -406,35 +402,36 @@ impl Web3Connections {
// TODO: log different things depending on old_synced_connections
warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns);
} else {
// TODO: this is too verbose. move to trace
// i think "conns" is somehow getting dupes
trace!(?heavy_rpcs);
trace!(?highest_rpcs);
// TODO: if maybe_head_block.time() is old, ignore it
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Connection>> = heavy_rpcs
let conns: Vec<Arc<Web3Connection>> = highest_rpcs
.into_iter()
.filter_map(|conn_name| self.conns.get(conn_name).cloned())
.collect();
let heavy_block = maybe_head_block;
let consensus_head_block = maybe_head_block;
let heavy_hash = heavy_block.hash.expect("head blocks always have hashes");
let heavy_num = heavy_block.number.expect("head blocks always have numbers");
let consensus_head_hash = consensus_head_block
.hash
.expect("head blocks always have hashes");
let consensus_head_num = consensus_head_block
.number
.expect("head blocks always have numbers");
debug_assert_ne!(heavy_num, U64::zero());
debug_assert_ne!(consensus_head_num, U64::zero());
// TODO: add these to the log messages
let num_consensus_rpcs = conns.len();
let heavy_block_id = BlockId {
hash: heavy_hash,
num: heavy_num,
let consensus_head_block_id = BlockId {
hash: consensus_head_hash,
num: consensus_head_num,
};
let new_synced_connections = SyncedConnections {
head_block_id: Some(heavy_block_id.clone()),
head_block_id: Some(consensus_head_block_id.clone()),
conns,
};
@@ -442,14 +439,14 @@ impl Web3Connections {
.synced_connections
.swap(Arc::new(new_synced_connections));
// TODO: if the rpc_head_block != heavy, log something somewhere in here
// TODO: if the rpc_head_block != consensus_head_block, log something?
match &old_synced_connections.head_block_id {
None => {
debug!(block=%heavy_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(block=%consensus_head_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
self.save_block(&heavy_block, true).await?;
self.save_block(&consensus_head_block, true).await?;
head_block_sender.send(heavy_block)?;
head_block_sender.send(consensus_head_block)?;
}
Some(old_block_id) => {
// TODO: do this log item better
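
The synced_connections.swap(Arc::new(...)) call above publishes the new consensus atomically and hands back the previous value for the comparisons that follow. Assuming the field is something like arc_swap::ArcSwap (an assumption; its concrete type isn't shown in this diff), the pattern in isolation:

// Minimal sketch of the publish/read pattern above, assuming arc_swap::ArcSwap.
use std::sync::Arc;

use arc_swap::ArcSwap;

struct SyncedConnections {
    // simplified: the real struct holds head_block_id and the agreeing conns
    head_number: u64,
}

// writer: swap in the new snapshot and get the old one back for logging/compares
fn publish(synced: &ArcSwap<SyncedConnections>, new: SyncedConnections) -> Arc<SyncedConnections> {
    synced.swap(Arc::new(new))
}

// readers: load() is lock-free, so request handling never blocks on the head-block task
fn read_head_number(synced: &ArcSwap<SyncedConnections>) -> u64 {
    synced.load().head_number
}
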
@@ -457,42 +454,40 @@ impl Web3Connections {
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
match heavy_block_id.num.cmp(&old_block_id.num) {
match consensus_head_block_id.num.cmp(&old_block_id.num) {
Ordering::Equal => {
// TODO: if rpc_block_id != heavy_block_id, do a different log
// TODO: if rpc_block_id != consensus_head_block_id, do a different log?
// multiple blocks with the same fork!
if heavy_block_id.hash == old_block_id.hash {
if consensus_head_block_id.hash == old_block_id.hash {
// no change in hash. no need to use head_block_sender
debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
} else {
// hash changed
debug!(con_head=%heavy_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(con_head=%consensus_head_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// todo!("handle equal by updating the canonical chain");
self.save_block(&heavy_block, true).await?;
self.save_block(&consensus_head_block, true).await?;
head_block_sender.send(heavy_block)?;
head_block_sender.send(consensus_head_block)?;
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!(con_head=%heavy_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
warn!(con_head=%consensus_head_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
self.save_block(&heavy_block, true).await?;
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
self.save_block(&consensus_head_block, true).await?;
// todo!("handle less by removing higher blocks from the canonical chain");
head_block_sender.send(heavy_block)?;
head_block_sender.send(consensus_head_block)?;
}
Ordering::Greater => {
debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// todo!("handle greater by adding this block and any missing parents to the canonical chain");
self.save_block(&consensus_head_block, true).await?;
self.save_block(&heavy_block, true).await?;
head_block_sender.send(heavy_block)?;
head_block_sender.send(consensus_head_block)?;
}
}
}

@@ -375,42 +375,10 @@ impl Web3Connection {
// TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block.hash.unwrap_or_default();
// if we already have this block saved, set new_block_head to that arc and don't store this copy
// TODO: small race here
new_head_block = if let Some(existing_block) = block_map.get(&new_hash) {
// we only save blocks with a total difficulty
debug_assert!(existing_block.total_difficulty.is_some());
existing_block
} else if new_head_block.total_difficulty.is_some() {
// this block has a total difficulty, it is safe to use
block_map.insert(new_hash, new_head_block).await;
// we get instead of return new_head_block just in case there was a race
// TODO: but how bad is this race? it might be fine
block_map.get(&new_hash).expect("we just inserted")
} else {
// Cache miss and NO TOTAL DIFFICULTY!
// self got the head block first. unfortunately its missing a necessary field
// keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed.
// there are other clients and we might have to use a third party without the td fix.
trace!(rpc=%self, ?new_hash, "total_difficulty missing");
// todo: this can wait forever!
let complete_head_block: Block<TxHash> = self
.wait_for_request_handle()
.await?
.request("eth_getBlockByHash", (new_hash, false), false)
.await?;
debug_assert!(complete_head_block.total_difficulty.is_some());
block_map
.insert(new_hash, Arc::new(complete_head_block))
.await;
// we get instead of return new_head_block just in case there was a race
// TODO: but how bad is this race? it might be fine
block_map.get(&new_hash).expect("we just inserted")
};
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
new_head_block = block_map
.get_with(new_hash, async move { new_head_block })
.await;
let new_num = new_head_block.number.unwrap_or_default();
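
The rewritten caching above leans on the block map's get_with: whichever task stores a block for a hash first wins, every caller ends up holding the same Arc, and the old total-difficulty-gated insert path disappears. A standalone sketch, assuming the map is a moka::future::Cache keyed by hash (an assumption; the concrete cache type isn't shown in this hunk):

// Sketch of the get-or-insert dedup above, assuming moka::future::Cache.
use std::sync::Arc;

use ethers::types::{Block, TxHash, H256};
use moka::future::Cache;

type ArcBlock = Arc<Block<TxHash>>;

async fn dedup_block(block_map: &Cache<H256, ArcBlock>, hash: H256, block: ArcBlock) -> ArcBlock {
    // if the hash is already cached, the existing Arc comes back and `block` is dropped;
    // otherwise this future resolves and this copy is inserted
    block_map.get_with(hash, async move { block }).await
}
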