by_name needs to a lock

This commit is contained in:
Bryan Stitt 2023-02-26 20:00:13 -08:00
parent e96f09a9c4
commit bf79d677b0
4 changed files with 42 additions and 100 deletions

@ -677,3 +677,5 @@ in another repo: event subscriber
- [ ] give public_recent_ips_salt a better, more general, name
- [ ] include tier in the head block logs?
- [ ] i think i use FuturesUnordered when a try_join_all might be better
- [ ] since we are read-heavy on our configs, maybe we should use a cache
- "using a thread local storage and explicit types" https://docs.rs/arc-swap/latest/arc_swap/cache/struct.Cache.html

@ -8,7 +8,6 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use anyhow::{anyhow, Context};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::HashSet;
use log::{debug, error, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
@ -339,18 +338,7 @@ impl Web3Rpcs {
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
let configured_tiers: Vec<u64> = self
.by_name
.values()
.map(|x| x.tier)
.collect::<HashSet<u64>>()
.into_iter()
.collect();
let mut connection_heads =
ConsensusFinder::new(&configured_tiers, self.max_block_age, self.max_block_lag);
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
loop {
match block_receiver.recv_async().await {
@ -385,13 +373,13 @@ impl Web3Rpcs {
&self,
authorization: &Arc<Authorization>,
consensus_finder: &mut ConsensusFinder,
rpc_head_block: Option<Web3ProxyBlock>,
new_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: how should we handle an error here?
if !consensus_finder
.update_rpc(rpc_head_block.clone(), rpc.clone(), self)
.update_rpc(new_block.clone(), rpc.clone(), self)
.await
.context("failed to update rpc")?
{
@ -422,7 +410,7 @@ impl Web3Rpcs {
.all_rpcs_group()
.map(|x| x.len())
.unwrap_or_default();
let total_rpcs = self.by_name.len();
let total_rpcs = self.by_name.read().len();
let old_consensus_head_connections = self
.watch_consensus_rpcs_sender
@ -462,7 +450,7 @@ impl Web3Rpcs {
}
Some(old_head_block) => {
// TODO: do this log item better
let rpc_head_str = rpc_head_block
let rpc_head_str = new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
@ -578,7 +566,7 @@ impl Web3Rpcs {
}
} else {
// TODO: do this log item better
let rpc_head_str = rpc_head_block
let rpc_head_str = new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());

@ -141,54 +141,6 @@ impl ConnectionsGroup {
self.rpc_name_to_block.insert(rpc.name.clone(), block)
}
// // TODO: do this during insert/remove?
// pub(self) async fn highest_block(
// &self,
// authorization: &Arc<Authorization>,
// web3_rpcs: &Web3Rpcs,
// ) -> Option<ArcBlock> {
// let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len());
// let mut highest_block = None::<ArcBlock>;
// for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
// // don't waste time checking the same hash multiple times
// if checked_heads.contains(rpc_head_hash) {
// continue;
// }
// let rpc_block = match web3_rpcs
// .get_block_from_rpc(rpc_name, rpc_head_hash, authorization)
// .await
// {
// Ok(x) => x,
// Err(err) => {
// warn!(
// "failed getting block {} from {} while finding highest block number: {:?}",
// rpc_head_hash, rpc_name, err,
// );
// continue;
// }
// };
// checked_heads.insert(rpc_head_hash);
// // if this is the first block we've tried
// // or if this rpc's newest block has a higher number
// // we used to check total difficulty, but that isn't a thing anymore on ETH
// // TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it
// let highest_num = highest_block
// .as_ref()
// .map(|x| x.number.expect("blocks here should always have a number"));
// let rpc_num = rpc_block.as_ref().number;
// if rpc_num > highest_num {
// highest_block = Some(rpc_block);
// }
// }
// highest_block
// }
/// min_consensus_block_num keeps us from ever going backwards.
/// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data.
pub(self) async fn consensus_head_connections(
@ -266,7 +218,7 @@ impl ConnectionsGroup {
continue;
}
if let Some(rpc) = web3_rpcs.by_name.get(rpc_name.as_str()) {
if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name.as_str()) {
if backup_rpcs_voted.is_some() {
// backups already voted for a head block. don't change it
} else {
@ -280,7 +232,7 @@ impl ConnectionsGroup {
} else {
// i don't think this is an error. i think its just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.by_name);
debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name);
}
}
@ -363,7 +315,7 @@ impl ConnectionsGroup {
// success! this block has enough soft limit and nodes on it (or on later blocks)
let rpcs: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
.into_iter()
.filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned())
.filter_map(|conn_name| web3_rpcs.by_name.read().get(conn_name).cloned())
.collect();
#[cfg(debug_assertions)]
@ -396,20 +348,16 @@ pub struct ConsensusFinder {
}
impl ConsensusFinder {
pub fn new(
configured_tiers: &[u64],
max_block_age: Option<u64>,
max_block_lag: Option<U64>,
) -> Self {
// TODO: what's a good capacity for this?
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
// TODO: what's a good capacity for this? it shouldn't need to be very large
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
let first_seen = Cache::builder()
.max_capacity(16)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// TODO: this will need some thought when config reloading is written
let tiers = configured_tiers
.iter()
.map(|x| (*x, ConnectionsGroup::new(first_seen.clone())))
// TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading
let tiers = (0..10)
.map(|x| (x, ConnectionsGroup::new(first_seen.clone())))
.collect();
Self {

@ -21,6 +21,7 @@ use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, ConcurrentCacheExt};
use ordered_float::OrderedFloat;
use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@ -31,7 +32,7 @@ use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use std::{cmp, fmt};
use thread_fast_rng::rand::seq::SliceRandom;
use tokio::sync::{broadcast, watch, RwLock as AsyncRwLock};
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@ -41,7 +42,9 @@ pub struct Web3Rpcs {
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
pub(crate) by_name: HashMap<String, Arc<Web3Rpc>>,
/// TODO: i tried to make this an AsyncRwLock, but then we have trouble serializing it
/// TODO: maybe an ArcSwap would be better. writes are rare
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
pub(crate) http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
@ -269,16 +272,13 @@ impl Web3Rpcs {
})
.collect();
// map of connection names to their connection
let connections = AsyncRwLock::new(HashMap::new());
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((connection, _handle))) => {
// web3 connection worked
let old_rpc = connections
let old_rpc = self
.by_name
.write()
.await
.insert(connection.name.clone(), connection);
if let Some(old_rpc) = old_rpc {
@ -303,11 +303,11 @@ impl Web3Rpcs {
Ok(())
}
pub fn get(&self, conn_name: &str) -> Option<&Arc<Web3Rpc>> {
self.by_name.get(conn_name)
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.read().get(conn_name).cloned()
}
pub async fn min_head_rpcs(&self) -> usize {
pub fn min_head_rpcs(&self) -> usize {
self.min_head_rpcs
}
@ -526,6 +526,7 @@ impl Web3Rpcs {
// TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
for x in self
.by_name
.read()
.values()
.filter(|x| {
// TODO: move a bunch of this onto a rpc.is_synced function
@ -744,7 +745,7 @@ impl Web3Rpcs {
let mut max_count = if let Some(max_count) = max_count {
max_count
} else {
self.by_name.len()
self.by_name.read().len()
};
let mut tried = HashSet::new();
@ -756,7 +757,7 @@ impl Web3Rpcs {
// if there aren't enough synced connections, include more connections
// TODO: only do this sorting if the synced_conns isn't enough
let mut all_conns: Vec<_> = self.by_name.values().cloned().collect();
let mut all_conns: Vec<_> = self.by_name.read().values().cloned().collect();
all_conns.sort_by_cached_key(rpc_sync_status_sort_key);
for connection in itertools::chain(synced_conns, all_conns) {
@ -835,7 +836,7 @@ impl Web3Rpcs {
loop {
let num_skipped = skip_rpcs.len();
if num_skipped == self.by_name.len() {
if num_skipped >= self.by_name.read().len() {
break;
}
@ -1008,7 +1009,7 @@ impl Web3Rpcs {
return Ok(r);
}
let num_conns = self.by_name.len();
let num_conns = self.by_name.read().len();
let num_skipped = skip_rpcs.len();
if num_skipped == 0 {
@ -1171,9 +1172,12 @@ impl Serialize for Web3Rpcs {
{
let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;
let rpcs: Vec<&Web3Rpc> = self.by_name.values().map(|x| x.as_ref()).collect();
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
state.serialize_field("conns", &rpcs)?;
{
let by_name = self.by_name.read();
let rpcs: Vec<&Web3Rpc> = by_name.values().map(|x| x.as_ref()).collect();
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
state.serialize_field("conns", &rpcs)?;
}
{
let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone();
@ -1387,7 +1391,7 @@ mod tests {
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: rpcs_by_name,
by_name: RwLock::new(rpcs_by_name),
http_interval_sender: None,
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1409,7 +1413,7 @@ mod tests {
let authorization = Arc::new(Authorization::internal(None).unwrap());
let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
let mut consensus_finder = ConsensusFinder::new(None, None);
// process None so that
rpcs.process_block_from_rpc(
@ -1588,7 +1592,7 @@ mod tests {
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: rpcs_by_name,
by_name: RwLock::new(rpcs_by_name),
http_interval_sender: None,
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1608,7 +1612,7 @@ mod tests {
let authorization = Arc::new(Authorization::internal(None).unwrap());
let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
let mut connection_heads = ConsensusFinder::new(None, None);
// min sum soft limit will require tier 2
rpcs.process_block_from_rpc(