thresholds and fork detection

Bryan Stitt 2022-07-25 00:27:00 +00:00
parent b70807f744
commit ee98de4065
6 changed files with 277 additions and 182 deletions

@@ -53,9 +53,11 @@
- [x] web3_sha3 rpc command
- [x] test that launches anvil and connects the proxy to it and does some basic queries
- [x] need to have some sort of shutdown signaling. doesn't need to be graceful at this point, but should be eventually
- [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced.
- [x] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced.
- thundering herd problem if we only allow a lag of 0 blocks
- we can improve this by only `publish`ing the sorted list once a threshold of total available soft and hard limits is passed. how can we do this without hammering redis? at least it's only once per block per server
- we can improve this by only publishing the synced connections once a threshold of total available soft and hard limits is passed (see the sketch after this list). how can we do this without hammering redis? at least it's only once per block per server
- [x] instead of tracking `pending_synced_connections`, have a mapping of where all connections are individually. then each change, re-check for consensus.
- [ ] synced connections swap threshold should come from config
- [ ] basic request method stats
- [ ] nice output when cargo doc is run
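A minimal sketch of the soft-limit threshold idea from the `publish` item above (function name and signature are illustrative, not from this codebase); in the connections diff below the same check shows up as `sum_soft_limit < min_soft_limit` with `min_soft_limit = total_soft_limit / 2`:

```rust
/// Sketch only: decide whether enough capacity agrees on a head block to publish it.
/// `head_soft_limits` are the soft limits of the rpcs on the candidate head;
/// `all_soft_limits` are the soft limits of every configured rpc.
fn should_publish(head_soft_limits: &[u32], all_soft_limits: &[u32]) -> bool {
    let total_soft_limit: u32 = all_soft_limits.iter().sum();
    let sum_soft_limit: u32 = head_soft_limits.iter().sum();
    // publish once at least half of the total capacity is on this head
    sum_soft_limit >= total_soft_limit / 2
}
```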
@@ -183,4 +185,5 @@ in another repo: event subscriber
2022-07-22T23:52:18.593956Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0xa906…5bc1 rpc=Web3Connection { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517
2022-07-22T23:52:18.983441Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517
2022-07-22T23:52:19.350720Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 1/2/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517
2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517
- [ ] threshold should check actual available request limits (if any) instead of just the soft limit
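One hedged way the unchecked item above could look (none of this exists in the commit): count only the headroom each rpc has left instead of its configured soft limit, so a saturated backend doesn't count toward the publish threshold. `available_requests` is a hypothetical helper built on the `soft_limit()` and `active_requests()` methods that do appear in this diff:

```rust
// hypothetical sketch: weight the publish threshold by remaining capacity
fn available_requests(rpc: &Web3Connection) -> u32 {
    // soft_limit() and active_requests() exist in this codebase; the subtraction is illustrative
    rpc.soft_limit().saturating_sub(rpc.active_requests() as u32)
}
```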

@@ -5,6 +5,7 @@ version: "3.4"
services:
dev-redis:
build: ./redis-cell-server/
# TODO: expose these ports?
dev-eth:
extends:

@@ -824,6 +824,7 @@ impl Web3ProxyApp {
"eth_blockNumber" => {
let head_block_number = self.balanced_rpcs.get_head_block_num();
// TODO: technically, block 0 is okay. i guess we should be using an option
if head_block_number.as_u64() == 0 {
return Err(anyhow::anyhow!("no servers synced"));
}
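A hedged sketch of the `Option` approach the TODO above mentions; `try_head_block_num` is a made-up name (only `get_head_block_num` appears in this commit):

```rust
// hypothetical: a None head means "no servers synced", so a genuine block 0 can still be served
let head_block_number = match self.balanced_rpcs.try_head_block_num() {
    Some(num) => num,
    None => return Err(anyhow::anyhow!("no servers synced")),
};
```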

@@ -268,13 +268,17 @@ impl Web3Connection {
Ok((new_connection, handle))
}
pub fn url(&self) -> &str {
&self.url
}
/// TODO: this might be too simple. different nodes can prune differently
pub fn get_block_data_limit(&self) -> U64 {
pub fn block_data_limit(&self) -> U64 {
self.block_data_limit.load(atomic::Ordering::Acquire).into()
}
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
let block_data_limit: U64 = self.get_block_data_limit();
let block_data_limit: U64 = self.block_data_limit();
let newest_block_num = self.head_block.read().1;
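The hunk cuts off before the actual comparison, but given the two values above, the check is presumably shaped like this sketch (not the verbatim code):

```rust
// sketch: a pruning node can only serve blocks within `block_data_limit` of its newest block
let oldest_available = newest_block_num.saturating_sub(block_data_limit);
*needed_block_num >= oldest_available && *needed_block_num <= newest_block_num
```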
@@ -301,7 +305,7 @@ impl Web3Connection {
*provider = None;
// tell the block subscriber that we are at 0
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = block_sender {
block_sender
.send_async((Arc::new(Block::default()), self.clone()))
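`Block::default()` carries neither a hash nor a number, so the subscriber in the connections diff below drops that rpc from its `connection_heads` map when the sentinel arrives, roughly:

```rust
// sketch of the receiver side: a default block means this rpc has no usable head right now
if new_block.hash.is_none() || new_block.number == Some(U64::zero()) {
    connection_heads.remove(rpc.url());
}
```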

@@ -23,7 +23,7 @@ use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{debug, error, info, info_span, instrument, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
use crate::config::Web3ConnectionConfig;
@@ -38,7 +38,7 @@ struct SyncedConnections {
// TODO: this should be able to serialize, but it isn't
// TODO: use linkedhashmap?
#[serde(skip_serializing)]
inner: IndexSet<Arc<Web3Connection>>,
conns: IndexSet<Arc<Web3Connection>>,
}
impl fmt::Debug for SyncedConnections {
@@ -106,7 +106,7 @@ impl BlockChain {
/// A collection of web3 connections. Sends requests either to the current best server or to all servers.
#[derive(From)]
pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
conns: IndexMap<String, Arc<Web3Connection>>,
synced_connections: ArcSwap<SyncedConnections>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
// TODO: i think chain is what we want, but i'm not sure how we'll use it yet
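Keying `conns` by url (an `IndexMap` instead of the old `Vec`) keeps iteration order stable for serialization while letting the head-block bookkeeping below map a url recorded in `connection_heads` straight back to its connection, e.g.:

```rust
// the lookup pattern this change enables (same shape as in the subscriber below)
if let Some(rpc) = self.conns.get(rpc_url) {
    total_soft_limit += rpc.soft_limit();
}
```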
@@ -120,11 +120,10 @@ impl Serialize for Web3Connections {
where
S: Serializer,
{
let inner: Vec<&Web3Connection> = self.inner.iter().map(|x| x.as_ref()).collect();
let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect();
// 2 is the number of fields in the serialized struct.
let mut state = serializer.serialize_struct("Web3Connections", 2)?;
state.serialize_field("rpcs", &inner)?;
state.serialize_field("conns", &conns)?;
state.serialize_field("synced_connections", &**self.synced_connections.load())?;
state.end()
}
@@ -134,7 +133,7 @@ impl fmt::Debug for Web3Connections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3Connections")
.field("inner", &self.inner)
.field("conns", &self.conns)
.finish_non_exhaustive()
}
}
@@ -214,7 +213,7 @@ impl Web3Connections {
})
.collect();
let mut connections = vec![];
let mut connections = IndexMap::new();
let mut handles = vec![];
// TODO: futures unordered?
@@ -222,7 +221,7 @@ impl Web3Connections {
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
match x {
Ok(Ok((connection, handle))) => {
connections.push(connection);
connections.insert(connection.url().to_string(), connection);
handles.push(handle);
}
Ok(Err(err)) => {
@@ -243,7 +242,7 @@ impl Web3Connections {
let synced_connections = SyncedConnections::default();
let connections = Arc::new(Self {
inner: connections,
conns: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions,
chain: Default::default(),
@@ -516,14 +515,14 @@ impl Web3Connections {
}
pub fn has_synced_rpcs(&self) -> bool {
if self.synced_connections.load().inner.is_empty() {
if self.synced_connections.load().conns.is_empty() {
return false;
}
self.get_head_block_num() > U64::zero()
}
pub fn num_synced_rpcs(&self) -> usize {
self.synced_connections.load().inner.len()
self.synced_connections.load().conns.len()
}
/// Send the same request to all the handles. Returning the most common success or most common error.
@@ -592,187 +591,273 @@ impl Web3Connections {
// TODO: use pending_tx_sender
pending_tx_sender: Option<broadcast::Sender<TxState>>,
) -> anyhow::Result<()> {
let total_rpcs = self.inner.len();
let total_rpcs = self.conns.len();
let mut connection_states: HashMap<Arc<Web3Connection>, _> =
HashMap::with_capacity(total_rpcs);
// keep a pending one so that we can delay publishing a new head block until multiple servers are synced
let mut pending_synced_connections = SyncedConnections::default();
// TODO: rpc name instead of url (will do this with config reload revamp)
// TODO: indexmap or hashmap? what hasher? with_capacity?
let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
let new_block_num = match new_block.number {
Some(x) => x.as_u64(),
None => {
// a block without a number is expected when a node is syncing or reconnecting
if new_block.hash.is_some() {
// this seems unlikely, but i'm pretty sure we see it
warn!(?new_block, "Block without number!");
}
continue;
}
let new_block_hash = if let Some(hash) = new_block.hash {
hash
} else {
warn!(%rpc, ?new_block, "Block without hash!");
connection_heads.remove(rpc.url());
continue;
};
// TODO: dry this with the code above
let new_block_num = if let Some(num) = new_block.number {
num
} else {
// this seems unlikely, but i'm pretty sure we have seen it
// maybe when a node is syncing or reconnecting?
warn!(%rpc, ?new_block, "Block without number!");
connection_heads.remove(rpc.url());
continue;
};
let new_block_hash = new_block.hash.unwrap();
// TODO: span with more in it?
// TODO: make sure i'm doing this span right
// TODO: show the actual rpc url?
let span = info_span!("block_receiver", ?rpc, new_block_num);
// TODO: clippy lint to make sure we don't hold this across an awaited future
let _enter = span.enter();
// TODO: what level?
// let _span = info_span!("block_receiver", %rpc, %new_block_num).entered();
if new_block_num == 0 {
warn!("still syncing");
if new_block_num == U64::zero() {
warn!(%rpc, %new_block_num, "still syncing");
connection_heads.remove(rpc.url());
} else {
// TODO: no clone? we end up with different blocks for every rpc
connection_heads.insert(rpc.url().to_string(), new_block.clone());
self.chain.add_block(new_block.clone(), false);
}
let mut new_head_block = false;
// iterate connection_heads to find the oldest block
let lowest_block_num = if let Some(lowest_block) = connection_heads
.values()
.min_by(|a, b| a.number.cmp(&b.number))
{
lowest_block.number.unwrap()
} else {
continue;
};
connection_states.insert(rpc.clone(), (new_block_num, new_block_hash));
// iterate connection_heads to find the consensus block
let mut rpcs_by_num = IndexMap::<U64, Vec<&str>>::new();
let mut blocks_by_hash = IndexMap::<H256, Arc<Block<TxHash>>>::new();
// block_hash => soft_limit, rpcs
// TODO: proper type for this?
let mut rpcs_by_hash = IndexMap::<H256, Vec<&str>>::new();
let mut total_soft_limit = 0;
// TODO: do something to update the synced blocks
match new_block_num.cmp(&pending_synced_connections.head_block_num) {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
// TODO: if trace, do the full block hash?
// TODO: only accept this block if it is a child of the current head_block
info!("new head: {}", new_block_hash);
for (rpc_url, head_block) in connection_heads.iter() {
if let Some(rpc) = self.conns.get(rpc_url) {
// we need the total soft limit in order to know when it's safe to update the backends
total_soft_limit += rpc.soft_limit();
pending_synced_connections.inner.clear();
pending_synced_connections.inner.insert(rpc);
let head_hash = head_block.hash.unwrap();
pending_synced_connections.head_block_num = new_block_num;
// save the block
blocks_by_hash
.entry(head_hash)
.or_insert_with(|| head_block.clone());
// TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash;
// add the rpc to all relevant block heights
let mut block = head_block.clone();
while block.number.unwrap() >= lowest_block_num {
let block_hash = block.hash.unwrap();
let block_num = block.number.unwrap();
// TODO: wait to send this until we publish
// save the rpcs and the sum of their soft limit by their head hash
let rpc_urls_by_hash =
rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new);
rpc_urls_by_hash.push(rpc_url);
// save the rpcs by their number
let rpc_urls_by_num = rpcs_by_num.entry(block_num).or_insert_with(Vec::new);
rpc_urls_by_num.push(rpc_url);
if let Some(parent) = self.chain.get_block(&block.parent_hash) {
// save the parent block
blocks_by_hash.insert(block.parent_hash, parent.clone());
block = parent
} else {
// log this? eventually we will hit a block we don't have, so it's not an error
break;
}
}
}
}
// TODO: configurable threshold. in test we have 4 servers so
// TODO:
// TODO: minimum total_soft_limit? without, when one server is in the loop
let min_soft_limit = total_soft_limit / 2;
struct State<'a> {
block: &'a Arc<Block<TxHash>>,
sum_soft_limit: u32,
conns: Vec<&'a str>,
}
impl<'a> State<'a> {
// TODO: there are sortable traits, but this seems simpler
fn sortable_values(&self) -> (&U64, &u32, usize, &H256) {
let block_num = self.block.number.as_ref().unwrap();
let sum_soft_limit = &self.sum_soft_limit;
let conns = self.conns.len();
let block_hash = self.block.hash.as_ref().unwrap();
(block_num, sum_soft_limit, conns, block_hash)
}
}
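Because `max_by` compares these tuples lexicographically, a higher block number always wins, then the head with the larger summed soft limit, then the one with more connections, and the hash only breaks exact ties. With illustrative stand-in values:

```rust
// stand-ins for (block_num, sum_soft_limit, conns, block_hash); the numbers are made up
let a = (15195517u64, 600u32, 3usize, 0xa9u8);
let b = (15195517u64, 200u32, 1usize, 0x70u8);
assert!(a > b); // same height, so the larger summed soft limit decides
```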
trace!(?rpcs_by_hash);
// TODO: i'm always getting None
if let Some(x) = rpcs_by_hash
.into_iter()
.filter_map(|(hash, conns)| {
// TODO: move this to `State::new` function on
let sum_soft_limit = conns
.iter()
.map(|rpc_url| {
if let Some(rpc) = self.conns.get(*rpc_url) {
rpc.soft_limit()
} else {
0
}
})
.sum();
if sum_soft_limit < min_soft_limit {
trace!(?sum_soft_limit, ?min_soft_limit, "sum_soft_limit too low");
None
} else {
let block = blocks_by_hash.get(&hash).unwrap();
Some(State {
block,
sum_soft_limit,
conns,
})
}
})
.max_by(|a, b| a.sortable_values().cmp(&b.sortable_values()))
{
let best_head_num = x.block.number.unwrap();
let best_head_hash = x.block.hash.unwrap();
let best_rpcs = x.conns;
let synced_rpcs = rpcs_by_num.remove(&best_head_num).unwrap();
if best_rpcs.len() == synced_rpcs.len() {
trace!(
"{}/{}/{}/{} rpcs have {}",
best_rpcs.len(),
synced_rpcs.len(),
connection_heads.len(),
total_rpcs,
best_head_hash
);
} else {
warn!(
"chain is forked! {} possible heads. {}/{}/{}/{} rpcs have {}",
"?", // TODO: how should we get this?
best_rpcs.len(),
synced_rpcs.len(),
connection_heads.len(),
total_rpcs,
best_head_hash
);
}
let num_best_rpcs = best_rpcs.len();
// TODO: do this without clone?
let conns = best_rpcs
.into_iter()
.map(|x| self.conns.get(x).unwrap().clone())
.collect();
let pending_synced_connections = SyncedConnections {
head_block_num: best_head_num.as_u64(),
head_block_hash: best_head_hash,
conns,
};
let current_head_block = self.get_head_block_hash();
let new_head_block =
pending_synced_connections.head_block_hash != current_head_block;
if new_head_block {
self.chain.add_block(new_block.clone(), true);
info!(
"{}/{} rpcs at {} ({}). publishing new head!",
pending_synced_connections.conns.len(),
self.conns.len(),
pending_synced_connections.head_block_hash,
pending_synced_connections.head_block_num,
);
// TODO: what if the hashes don't match?
if pending_synced_connections.head_block_hash == new_block_hash {
// mark all transactions in the block as confirmed
if pending_tx_sender.is_some() {
for tx_hash in &new_block.transactions {
// TODO: should we mark as confirmed via pending_tx_sender?
// TODO: possible deadlock here!
// trace!("removing {}...", tx_hash);
let _ = self.pending_transactions.remove(tx_hash);
// trace!("removed {}", tx_hash);
}
};
// TODO: mark any orphaned transactions as unconfirmed
}
} else {
// TODO: i'm seeing 4/4 print twice. maybe because of http providers?
// TODO: only do this log if there was a change
trace!(
"{}/{} rpcs at {} ({})",
pending_synced_connections.conns.len(),
self.conns.len(),
pending_synced_connections.head_block_hash,
pending_synced_connections.head_block_num,
);
}
// TODO: do this before or after processing all the transactions in this block?
// TODO: only swap if there is a change
trace!(?pending_synced_connections, "swapping");
self.synced_connections
.swap(Arc::new(pending_synced_connections));
if new_head_block {
// TODO: this will need a refactor to only send once a minimum threshold has this block
// TODO: move this onto self.chain
// TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail
head_block_sender
.send(new_block.clone())
.context("head_block_sender")?;
// TODO: mark all transactions as confirmed
// TODO: mark any orphaned transactions as unconfirmed
// TODO: do not mark canonical until a threshold of RPCs have this block!
new_head_block = true;
self.chain.add_block(new_block.clone(), new_head_block);
}
cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash {
// this rpc has caught up with the best known head
// do not clear synced_connections.
// we just want to add this rpc to the end
// TODO: HashSet here? i think we get dupes if we don't
pending_synced_connections.inner.insert(rpc);
} else {
// same height, but different chain
// check connection_states to see which head block is more popular!
// TODO: i don't think btreemap is what we want. i think we want indexmap or linkedhashmap
let mut rpc_ids_by_block =
IndexMap::<H256, Vec<Arc<Web3Connection>>>::new();
let mut counted_rpcs = 0;
for (rpc, (block_num, block_hash)) in connection_states.iter() {
if *block_num != new_block_num {
// this connection isn't synced. we don't care what hash it has
continue;
}
counted_rpcs += 1;
let count = rpc_ids_by_block
.entry(*block_hash)
.or_insert_with(|| Vec::with_capacity(total_rpcs - 1));
count.push(rpc.clone());
}
let most_common_head_hash = *rpc_ids_by_block
.iter()
.max_by(|a, b| a.1.len().cmp(&b.1.len()))
.map(|(k, _v)| k)
.unwrap();
let synced_rpcs = rpc_ids_by_block.remove(&most_common_head_hash).unwrap();
warn!(
"chain is forked! {} possible heads. {}/{}/{} rpcs have {}",
rpc_ids_by_block.len() + 1,
synced_rpcs.len(),
counted_rpcs,
total_rpcs,
most_common_head_hash
);
self.chain
.add_block(new_block.clone(), new_block_hash == most_common_head_hash);
// TODO: do this more efficiently?
if pending_synced_connections.head_block_hash != most_common_head_hash {
pending_synced_connections.head_block_hash = most_common_head_hash;
}
pending_synced_connections.inner = synced_rpcs.into_iter().collect();
}
}
cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything
if !pending_synced_connections.inner.remove(&rpc) {
// we didn't remove anything. nothing more to do
continue;
}
// TODO: insert the hash if it isn't known?
// we removed. don't continue so that we update self.synced_connections
}
}
// the synced connections have changed
if pending_synced_connections.inner.len() == total_rpcs {
// TODO: more metrics
trace!("all head: {}", new_block_hash);
} else {
trace!(
"rpcs at {}: {:?}",
pending_synced_connections.head_block_hash,
pending_synced_connections.inner
);
}
// TODO: what if the hashes don't match?
if pending_synced_connections.head_block_hash == new_block_hash {
// mark all transactions in the block as confirmed
if pending_tx_sender.is_some() {
for tx_hash in &new_block.transactions {
// TODO: should we mark as confirmed via pending_tx_sender?
// TODO: possible deadlock here!
// trace!("removing {}...", tx_hash);
let _ = self.pending_transactions.remove(tx_hash);
// trace!("removed {}", tx_hash);
}
};
// TODO: mark any orphaned transactions as unconfirmed
}
// TODO: only publish if there are x (default 50%) nodes synced to this block?
// TODO: do this before or after processing all the transactions in this block?
self.synced_connections
.swap(Arc::new(pending_synced_connections.clone()));
if new_head_block {
// TODO: this will need a refactor to only send once a minimum threshold has this block
// TODO: move this onto self.chain
// TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail
head_block_sender
.send(new_block.clone())
.context("head_block_sender")?;
// TODO: this is expected when we first start
// TODO: make sure self.synced_connections is empty
warn!("not enough rpcs in sync");
}
}
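Condensed, the new head selection in this hunk records each rpc's head by url, walks parent hashes back to the lowest reported block while grouping candidates by hash, discards any candidate whose summed soft limit is below half the total, and takes the maximum by (block number, summed soft limit, connection count, hash) before swapping `synced_connections`. A compressed restatement of the filter-and-pick step (a sketch of the code above, not new behavior):

```rust
// same selection as above, compressed for readability
let best = rpcs_by_hash
    .into_iter()
    .filter_map(|(hash, conn_urls)| {
        let sum_soft_limit: u32 = conn_urls
            .iter()
            .filter_map(|url| self.conns.get(*url).map(|rpc| rpc.soft_limit()))
            .sum();
        (sum_soft_limit >= min_soft_limit)
            .then(|| (blocks_by_hash.get(&hash).unwrap().clone(), sum_soft_limit, conn_urls))
    })
    .max_by_key(|(block, sum_soft_limit, conn_urls)| {
        (block.number.unwrap(), *sum_soft_limit, conn_urls.len(), block.hash.unwrap())
    });
```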
@@ -797,8 +882,8 @@ impl Web3Connections {
if let Some(min_block_needed) = min_block_needed {
// TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block
// TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then
self.inner
.iter()
self.conns
.values()
.filter(|x| x.has_block_data(min_block_needed))
.filter(|x| !skip.contains(x))
.cloned()
@@ -806,7 +891,7 @@ impl Web3Connections {
} else {
self.synced_connections
.load()
.inner
.conns
.iter()
.filter(|x| !skip.contains(x))
.cloned()
@@ -823,7 +908,7 @@ impl Web3Connections {
// TODO: get active requests and the soft limit out of redis?
let active_requests = rpc.active_requests();
let soft_limit = rpc.soft_limit();
let block_data_limit = rpc.get_block_data_limit();
let block_data_limit = rpc.block_data_limit();
let utilization = active_requests as f32 / soft_limit as f32;
@@ -870,7 +955,7 @@ impl Web3Connections {
// TODO: with capacity?
let mut selected_rpcs = vec![];
for connection in self.inner.iter() {
for connection in self.conns.values() {
if let Some(min_block_needed) = min_block_needed {
if !connection.has_block_data(min_block_needed) {
continue;
@@ -905,7 +990,7 @@ impl Web3Connections {
// TODO: maximum retries?
loop {
if skip_rpcs.len() == self.inner.len() {
if skip_rpcs.len() == self.conns.len() {
break;
}
match self

@@ -37,6 +37,7 @@ async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket) {
tokio::spawn(read_web3_socket(app, ws_rx, response_tx));
}
/// websockets support a few more methods than http clients
async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
payload: &str,