by_name needs a lock to prevent races during disconnects

Bryan Stitt 2023-06-16 10:40:02 -07:00
parent 7c876ac561
commit af7759e491
3 changed files with 28 additions and 32 deletions
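The race this commit closes comes from the removed ArcSwap update pattern: load_full the map, clone it, insert into the clone, then store the clone back. Two tasks doing that concurrently (for example a reconnect and a disconnect) can both start from the same snapshot, and whichever store lands second silently discards the other's change. Below is a minimal sketch of the locked pattern the diff moves to, using parking_lot::RwLock as the diff does; the Registry and Rpc types are simplified stand-ins for Web3Rpcs and Web3Rpc, not the real structs.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

/// Hypothetical stand-in for Web3Rpc.
struct Rpc {
    name: String,
}

/// Hypothetical stand-in for the parts of Web3Rpcs touched by this commit.
struct Registry {
    /// Mirrors the new `by_name` field. The lock is synchronous, so guards
    /// must not be held across `.await`; clone the inner `Arc` out instead.
    by_name: RwLock<HashMap<String, Arc<Rpc>>>,
}

impl Registry {
    /// Like `Web3Rpcs::get`: clone the `Arc` under the read guard, then drop the guard.
    fn get(&self, name: &str) -> Option<Arc<Rpc>> {
        self.by_name.read().get(name).map(Arc::clone)
    }

    /// Like the connect path after this commit: the write guard makes the
    /// insert atomic, so a concurrent insert or remove cannot be lost the way
    /// a load-clone-store round trip on an ArcSwap snapshot could be.
    fn insert(&self, rpc: Arc<Rpc>) -> Option<Arc<Rpc>> {
        self.by_name.write().insert(rpc.name.clone(), rpc)
    }

    /// Like `Web3Rpcs::len`: a short read-locked query.
    fn len(&self) -> usize {
        self.by_name.read().len()
    }
}
```

Using parking_lot's synchronous RwLock rather than an async lock matches the TODO added on the field: guards here are held only long enough to clone an `Arc` or read a length, so they never need to cross an `.await`.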

@@ -904,13 +904,6 @@ impl Web3ProxyApp {
}
};
// app.pending_transactions.sync();
// app.rpc_secret_key_cache.sync();
// "pending_transactions_count": app.pending_transactions.entry_count(),
// "pending_transactions_size": app.pending_transactions.weighted_size(),
// "user_cache_count": app.rpc_secret_key_cache.entry_count(),
// "user_cache_size": app.rpc_secret_key_cache.weighted_size(),
#[derive(Serialize)]
struct CombinedMetrics {
recent_ip_counts: RecentCounts,

@@ -476,7 +476,7 @@ impl Web3Rpcs {
let consensus_head_block = new_consensus_rpcs.head_block.clone();
let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs();
let num_active_rpcs = consensus_finder.len();
let total_rpcs = self.by_name.load().len();
let total_rpcs = self.len();
let new_consensus_rpcs = Arc::new(new_consensus_rpcs);

@@ -11,7 +11,6 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
use ethers::prelude::{ProviderError, TxHash, U64};
@@ -23,6 +22,7 @@ use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, CacheBuilder};
use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@@ -38,11 +38,13 @@ use tokio::time::{sleep, sleep_until, Duration, Instant};
/// A collection of web3 connections. Sends requests either to the current best server or to all servers.
#[derive(From)]
pub struct Web3Rpcs {
/// TODO: this should be a Cow
pub(crate) name: String,
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
pub(crate) by_name: ArcSwap<HashMap<String, Arc<Web3Rpc>>>,
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@@ -215,15 +217,10 @@ impl Web3Rpcs {
match x {
Ok(Ok((rpc, _handle))) => {
// web3 connection worked
let mut new_by_name = (*self.by_name.load_full()).clone();
// make sure that any new requests use the new connection
let old_rpc = new_by_name.insert(rpc.name.clone(), rpc.clone());
// clean up the old rpc if it exists
let old_rpc = self.by_name.read().get(&rpc.name).map(Arc::clone);
// update the arc swap
self.by_name.store(Arc::new(new_by_name));
// clean up the old rpc
if let Some(old_rpc) = old_rpc {
trace!("old_rpc: {}", old_rpc);
@@ -241,11 +238,17 @@
}
}
// new rpc is synced (or old one was not synced). update the local map
// make sure that any new requests use the new connection
self.by_name.write().insert(rpc.name.clone(), rpc);
// tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
trace!("telling {} to disconnect", old_rpc);
disconnect_sender.send_replace(true);
}
} else {
self.by_name.write().insert(rpc.name.clone(), rpc);
}
}
Ok(Err(err)) => {
@@ -261,7 +264,7 @@
}
}
let num_rpcs = self.by_name.load().len();
let num_rpcs = self.len();
if num_rpcs < self.min_synced_rpcs {
return Err(Web3ProxyError::NotEnoughRpcs {
@@ -274,15 +277,15 @@
}
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.load().get(conn_name).cloned()
self.by_name.read().get(conn_name).map(Arc::clone)
}
pub fn len(&self) -> usize {
self.by_name.load().len()
self.by_name.read().len()
}
pub fn is_empty(&self) -> bool {
self.by_name.load().is_empty()
self.by_name.read().is_empty()
}
pub fn min_head_rpcs(&self) -> usize {
@@ -512,9 +515,9 @@ impl Web3Rpcs {
if self.watch_consensus_head_sender.is_none() {
// this Web3Rpcs is not tracking head blocks. pick any server
let by_name = self.by_name.load();
let mut potential_rpcs: Vec<_> = by_name
let mut potential_rpcs: Vec<_> = self
.by_name
.read()
.values()
.filter(|rpc| !skip_rpcs.contains(rpc))
.filter(|rpc| {
@@ -553,7 +556,7 @@ impl Web3Rpcs {
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let mut potential_rpcs = Vec::with_capacity(self.by_name.load().len());
let mut potential_rpcs = Vec::with_capacity(self.len());
loop {
let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone();
@@ -751,7 +754,7 @@ impl Web3Rpcs {
let mut max_count = if let Some(max_count) = max_count {
max_count
} else {
self.by_name.load().len()
self.len()
};
trace!("max_count: {}", max_count);
@@ -777,7 +780,7 @@ impl Web3Rpcs {
// if there aren't enough synced connections, include more connections
// TODO: only do this sorting if the synced_rpcs isn't enough
let mut all_rpcs: Vec<_> = self.by_name.load().values().cloned().collect();
let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));
trace!("all_rpcs: {:#?}", all_rpcs);
@@ -1071,7 +1074,7 @@ impl Web3Rpcs {
return Err(err.into());
}
let num_conns = self.by_name.load().len();
let num_conns = self.len();
let num_skipped = skip_rpcs.len();
let needed = min_block_needed.max(max_block_needed);
@@ -1269,7 +1272,7 @@ impl Serialize for Web3Rpcs {
let mut state = serializer.serialize_struct("Web3Rpcs", 8)?;
{
let by_name = self.by_name.load();
let by_name = self.by_name.read();
let rpcs: Vec<&Web3Rpc> = by_name.values().map(|x| x.as_ref()).collect();
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
state.serialize_field("conns", &rpcs)?;
@@ -1509,7 +1512,7 @@ mod tests {
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender: block_sender.clone(),
by_name: ArcSwap::from_pointee(rpcs_by_name),
by_name: RwLock::new(rpcs_by_name),
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@@ -1789,7 +1792,7 @@ mod tests {
let rpcs = Web3Rpcs {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
by_name: RwLock::new(rpcs_by_name),
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@@ -1975,7 +1978,7 @@ mod tests {
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
by_name: RwLock::new(rpcs_by_name),
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,