From 31f772591b591fb95270854984eecec9d18af768 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Mon, 10 Jul 2023 18:09:58 -0700
Subject: [PATCH] put rpcs in a rwlock again

---
 Dockerfile                  |  3 +++
 web3_proxy/src/app/mod.rs   |  3 +++
 web3_proxy/src/balance.rs   |  7 +++----
 web3_proxy/src/rpcs/many.rs | 86 ++++++++++++++-----------------------
 4 files changed, 42 insertions(+), 57 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 864b3f1c..791bd333 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -72,6 +72,7 @@ RUN --mount=type=cache,target=/root/.cargo/git \
     --mount=type=cache,target=/app/target \
     set -eux; \
     \
+    [ -e "$(pwd)/payment-contracts/src/contracts/mod.rs" ] || touch "$(pwd)/payment-contracts/build.rs"; \
     cargo --locked --verbose fetch
 
 # build tests (done its in own FROM so that it can run in parallel)
@@ -86,6 +87,7 @@ RUN --mount=type=cache,target=/root/.cargo/git \
     --mount=type=cache,target=/app/target \
     set -eux; \
     \
+    [ -e "$(pwd)/payment-contracts/src/contracts/mod.rs" ] || touch "$(pwd)/payment-contracts/build.rs"; \
     RUST_LOG=web3_proxy=trace,info \
     cargo \
     --frozen \
@@ -106,6 +108,7 @@ RUN --mount=type=cache,target=/root/.cargo/git \
     --mount=type=cache,target=/app/target \
     set -eux; \
     \
+    [ -e "$(pwd)/payment-contracts/src/contracts/mod.rs" ] || touch "$(pwd)/payment-contracts/build.rs"; \
     cargo install \
     --features "$WEB3_PROXY_FEATURES" \
     --frozen \
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index dc4d6b66..3cdbdf88 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -654,6 +654,7 @@ impl Web3ProxyApp {
     pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> {
         // TODO: also update self.config from new_top_config.app
+        info!("applying new config");
 
         // connect to the backends
         self.balanced_rpcs
@@ -685,6 +686,8 @@ impl Web3ProxyApp {
             }
         }
 
+        info!("config applied successfully");
+
         Ok(())
     }
diff --git a/web3_proxy/src/balance.rs b/web3_proxy/src/balance.rs
index 6c9ecdf3..b8dde440 100644
--- a/web3_proxy/src/balance.rs
+++ b/web3_proxy/src/balance.rs
@@ -1,5 +1,4 @@
 use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
-use crate::frontend::users::referral;
 use entities::{
     admin_increase_balance_receipt, increase_on_chain_balance_receipt, referee, referrer,
     rpc_accounting_v2, rpc_key, stripe_increase_balance_receipt,
@@ -147,7 +146,7 @@ impl Balance {
             .into_tuple()
             .one(db_conn)
             .await
-            .web3_context("fetching rpc_accounting_v2")?
+            .web3_context("fetching total_spent_paid_credits and total_spent")?
             .unwrap_or_default();
 
         let one_time_referee_bonus = referee::Entity::find()
@@ -160,7 +159,7 @@ impl Balance {
             .into_tuple()
             .one(db_conn)
             .await
-            .web3_context("fetching referee")?
+            .web3_context("fetching one time referee bonus")?
             .unwrap_or_default();
 
         let referal_bonus = referee::Entity::find()
@@ -177,7 +176,7 @@ impl Balance {
             .into_tuple()
             .one(db_conn)
             .await
-            .web3_context("fetching referee and referral_codes")?
+            .web3_context("fetching referal bonus")?
             .unwrap_or_default();
 
         let balance = Self {
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index ffe8c853..1f9e493b 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -20,7 +20,8 @@ use futures::StreamExt;
 use hashbrown::HashMap;
 use itertools::Itertools;
 use migration::sea_orm::DatabaseConnection;
-use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt};
+use moka::future::{Cache, CacheBuilder};
+use parking_lot::RwLock;
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
@@ -44,7 +45,7 @@ pub struct Web3Rpcs {
     pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
     /// any requests will be forwarded to one (or more) of these connections
     /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
-    pub(crate) by_name: Cache<String, Arc<Web3Rpc>>,
+    pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
     /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
     /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
     /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@@ -113,7 +114,7 @@ impl Web3Rpcs {
             watch::channel(Default::default());
 
         // by_name starts empty. self.apply_server_configs will add to it
-        let by_name = Cache::builder().build();
+        let by_name = RwLock::new(HashMap::new());
 
         let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());
 
@@ -225,18 +226,20 @@ impl Web3Rpcs {
 
         while let Some(x) = spawn_handles.next().await {
             match x {
-                Ok(Ok((rpc, _handle))) => {
+                Ok(Ok((new_rpc, _handle))) => {
                     // web3 connection worked
 
+                    let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone);
+
                     // clean up the old rpc if it exists
-                    if let Some(old_rpc) = self.by_name.get(&rpc.name) {
+                    if let Some(old_rpc) = old_rpc {
                         trace!("old_rpc: {}", old_rpc);
 
                         // if the old rpc was synced, wait for the new one to sync
                         if old_rpc.head_block.as_ref().unwrap().borrow().is_some() {
                             let mut new_head_receiver =
-                                rpc.head_block.as_ref().unwrap().subscribe();
-                            trace!("waiting for new {} connection to sync", rpc);
+                                new_rpc.head_block.as_ref().unwrap().subscribe();
+                            trace!("waiting for new {} connection to sync", new_rpc);
 
                             // TODO: maximum wait time
                             while new_head_receiver.borrow_and_update().is_none() {
@@ -248,8 +251,7 @@ impl Web3Rpcs {
 
                         // new rpc is synced (or old one was not synced). update the local map
                         // make sure that any new requests use the new connection
-                        self.by_name.insert(rpc.name.clone(), rpc).await;
-                        self.by_name.sync();
+                        self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
 
                         // tell the old rpc to disconnect
                         if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
@@ -257,8 +259,7 @@ impl Web3Rpcs {
                             disconnect_sender.send_replace(true);
                         }
                     } else {
-                        self.by_name.insert(rpc.name.clone(), rpc).await;
-                        self.by_name.sync();
+                        self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
                     }
                 }
                 Ok(Err(err)) => {
@@ -287,17 +288,15 @@ impl Web3Rpcs {
     }
 
     pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
-        self.by_name.get(conn_name)
+        self.by_name.read().get(conn_name).cloned()
     }
 
     pub fn len(&self) -> usize {
-        // // TODO: this seems not great. investigate better ways to do this
-        // self.by_name.sync();
-        self.by_name.entry_count() as usize
+        self.by_name.read().len()
    }
 
     pub fn is_empty(&self) -> bool {
-        self.len() > 0
+        self.by_name.read().is_empty()
     }
 
     /// TODO: rename to be consistent between "head" and "synced"
@@ -652,7 +651,7 @@ impl Web3Rpcs {
         let mut earliest_retry_at = None;
 
         // TODO: filter the rpcs with Ranked.will_work_now
-        let mut all_rpcs: Vec<_> = self.by_name.iter().map(|x| x.1).collect();
+        let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
 
         let mut max_count = if let Some(max_count) = max_count {
             max_count
@@ -1291,8 +1290,6 @@ impl Display for Web3Rpcs {
 impl fmt::Debug for Web3Rpcs {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // TODO: the default formatter takes forever to write. this is too quiet though
-        self.by_name.sync();
-
         let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some();
 
         f.debug_struct("Web3Rpcs")
@@ -1310,10 +1307,8 @@ impl Serialize for Web3Rpcs {
         let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;
 
         {
-            // TODO: i didn't think we'd need this, but it shouldn't hurt
-            self.by_name.sync();
-
-            let rpcs: Vec<Arc<Web3Rpc>> = self.by_name.iter().map(|x| x.1).collect();
+            let by_name = self.by_name.read();
+            let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
             // TODO: coordinate with frontend team to rename "conns" to "rpcs"
             state.serialize_field("conns", &rpcs)?;
         }
@@ -1528,19 +1523,14 @@ mod tests {
 
         let chain_id = 1;
 
-        let by_name = Cache::builder().build();
-        by_name
-            .insert(head_rpc.name.clone(), head_rpc.clone())
-            .await;
-        by_name
-            .insert(lagged_rpc.name.clone(), lagged_rpc.clone())
-            .await;
-        by_name.sync();
+        let mut by_name = HashMap::new();
+        by_name.insert(head_rpc.name.clone(), head_rpc.clone());
+        by_name.insert(lagged_rpc.name.clone(), lagged_rpc.clone());
 
         // TODO: make a Web3Rpcs::new
         let rpcs = Web3Rpcs {
             block_sender: block_sender.clone(),
-            by_name,
+            by_name: RwLock::new(by_name),
             chain_id,
             name: "test".to_string(),
             watch_head_block: Some(watch_consensus_head_sender),
@@ -1803,18 +1793,13 @@ mod tests {
 
         let chain_id = 1;
 
-        let by_name = Cache::builder().build();
-        by_name
-            .insert(pruned_rpc.name.clone(), pruned_rpc.clone())
-            .await;
-        by_name
-            .insert(archive_rpc.name.clone(), archive_rpc.clone())
-            .await;
-        by_name.sync();
+        let mut by_name = HashMap::new();
+        by_name.insert(pruned_rpc.name.clone(), pruned_rpc.clone());
+        by_name.insert(archive_rpc.name.clone(), archive_rpc.clone());
 
         let rpcs = Web3Rpcs {
             block_sender,
-            by_name,
+            by_name: RwLock::new(by_name),
             chain_id,
             name: "test".to_string(),
             watch_head_block: Some(watch_consensus_head_sender),
@@ -1992,22 +1977,17 @@ mod tests {
 
         let chain_id = 1;
 
-        let by_name = Cache::builder().build();
-        by_name
-            .insert(mock_geth.name.clone(), mock_geth.clone())
-            .await;
-        by_name
-            .insert(
-                mock_erigon_archive.name.clone(),
-                mock_erigon_archive.clone(),
-            )
-            .await;
-        by_name.sync();
+        let mut by_name = HashMap::new();
+        by_name.insert(mock_geth.name.clone(), mock_geth.clone());
+        by_name.insert(
+            mock_erigon_archive.name.clone(),
+            mock_erigon_archive.clone(),
+        );
 
         // TODO: make a Web3Rpcs::new
         let rpcs = Web3Rpcs {
             block_sender,
-            by_name,
+            by_name: RwLock::new(by_name),
             chain_id,
             name: "test".to_string(),
             watch_head_block: Some(watch_consensus_head_sender),
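
The core change above swaps moka's async Cache for a plain
parking_lot::RwLock around a HashMap. Below is a minimal sketch of that
pattern, assuming only parking_lot and the standard library; `Rpc` and
`Registry` are hypothetical stand-ins for the real Web3Rpc and Web3Rpcs
types, which carry many more fields. The key detail, matching the
`map(Arc::clone)` line in the diff, is that readers clone the Arc out of
the guard so the lock never crosses an await point.

    use std::{collections::HashMap, sync::Arc};

    use parking_lot::RwLock;

    struct Rpc {
        name: String,
    }

    struct Registry {
        by_name: RwLock<HashMap<String, Arc<Rpc>>>,
    }

    impl Registry {
        // Clone the Arc out of the read guard immediately so the lock is
        // released before any `.await`: parking_lot guards are not Send,
        // and holding a blocking lock across an await risks deadlock.
        fn get(&self, name: &str) -> Option<Arc<Rpc>> {
            self.by_name.read().get(name).cloned()
        }

        // Writers hold the lock only for the insert itself; the displaced
        // entry (if any) comes back, mirroring HashMap::insert.
        fn insert(&self, rpc: Arc<Rpc>) -> Option<Arc<Rpc>> {
            self.by_name.write().insert(rpc.name.clone(), rpc)
        }

        // len/is_empty become direct map queries with no entry_count()
        // or sync() bookkeeping. Note the diff also fixes a bug here: the
        // old is_empty returned `self.len() > 0`, the inverse of its name.
        fn is_empty(&self) -> bool {
            self.by_name.read().is_empty()
        }
    }

This trades moka's lock-free concurrent reads for simpler, strongly
consistent semantics: with only a handful of RPCs and rare writes (config
reloads), a RwLock-wrapped HashMap is cheap, and the Debug/Serialize impls
no longer need the sync() calls the diff deletes.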