diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index f5b58d78..499cbe01 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1100,7 +1100,7 @@ impl App { self: &Arc, web3_request: &Arc, ) -> Web3ProxyResult>> { - if self.protected_rpcs.is_empty() { + if self.protected_rpcs.is_empty().await { self.balanced_rpcs.request_with_metadata(web3_request).await } else { self.protected_rpcs diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 9b9e5ab5..ab6af228 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -82,7 +82,7 @@ impl App { continue; }; - // TODO: this needs a permit + // todo!(this needs a permit) let subscription_web3_request = ValidatedRequest::new_with_app( &app, authorization.clone(), diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 59f332fb..9f6e49c7 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1013,8 +1013,6 @@ impl Authorization { let a = Arc::new(a); - // todo!(semaphore permit) - Ok((a, p)) } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index b9eb04bb..cacd2691 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -384,7 +384,7 @@ impl ConsensusFinder { let consensus_head_block = new_ranked_rpcs.head_block.clone(); let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs(); let num_active_rpcs = self.len(); - let total_rpcs = web3_rpcs.len(); + let total_rpcs = web3_rpcs.len().await; let new_ranked_rpcs = Arc::new(new_ranked_rpcs); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 2e2a927e..4814b6f4 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -18,14 +18,13 @@ use futures_util::future::join_all; use hashbrown::HashMap; use http::StatusCode; use moka::future::CacheBuilder; -use parking_lot::RwLock; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use std::borrow::Cow; use std::fmt::{self, Display}; use std::sync::Arc; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock}; use tokio::time::{sleep_until, Duration, Instant}; use tokio::{pin, select}; use tracing::{debug, error, info, trace, warn}; @@ -39,7 +38,7 @@ pub struct Web3Rpcs { pub(crate) block_sender: mpsc::UnboundedSender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc - pub(crate) by_name: RwLock>>, + pub(crate) by_name: AsyncRwLock>>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? @@ -132,7 +131,7 @@ impl Web3Rpcs { watch::channel(Default::default()); // by_name starts empty. self.apply_server_configs will add to it - let by_name = RwLock::new(HashMap::new()); + let by_name = AsyncRwLock::new(HashMap::new()); let max_head_block_lag = max_head_block_lag.unwrap_or(5.into()); @@ -248,7 +247,11 @@ impl Web3Rpcs { Ok((new_rpc, _handle)) => { // web3 connection worked - let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone); + // we don't remove it yet because we want the new one connected first + let old_rpc = { + let by_name = self.by_name.read().await; + by_name.get(&new_rpc.name).cloned() + }; // clean up the old rpc if it exists if let Some(old_rpc) = old_rpc { @@ -276,7 +279,10 @@ impl Web3Rpcs { // new rpc is synced (or old one was not synced). update the local map // make sure that any new requests use the new connection - self.by_name.write().insert(new_rpc.name.clone(), new_rpc); + self.by_name + .write() + .await + .insert(new_rpc.name.clone(), new_rpc); // tell the old rpc to disconnect if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { @@ -284,7 +290,10 @@ impl Web3Rpcs { disconnect_sender.send_replace(true); } } else { - self.by_name.write().insert(new_rpc.name.clone(), new_rpc); + self.by_name + .write() + .await + .insert(new_rpc.name.clone(), new_rpc); } } Err(err) => { @@ -297,12 +306,12 @@ impl Web3Rpcs { } // TODO: remove any RPCs that were part of the config, but are now removed - let active_names: Vec<_> = self.by_name.read().keys().cloned().collect(); + let active_names: Vec<_> = self.by_name.read().await.keys().cloned().collect(); for name in active_names { if names_to_keep.contains(&name) { continue; } - if let Some(old_rpc) = self.by_name.write().remove(&name) { + if let Some(old_rpc) = self.by_name.write().await.remove(&name) { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { debug!("telling {} to disconnect. no longer needed", old_rpc); disconnect_sender.send_replace(true); @@ -310,7 +319,7 @@ impl Web3Rpcs { } } - let num_rpcs = self.len(); + let num_rpcs = self.len().await; if num_rpcs < self.min_synced_rpcs { return Err(Web3ProxyError::NotEnoughRpcs { @@ -322,16 +331,16 @@ impl Web3Rpcs { Ok(()) } - pub fn get(&self, conn_name: &str) -> Option> { - self.by_name.read().get(conn_name).cloned() + pub async fn get(&self, conn_name: &str) -> Option> { + self.by_name.read().await.get(conn_name).cloned() } - pub fn len(&self) -> usize { - self.by_name.read().len() + pub async fn len(&self) -> usize { + self.by_name.read().await.len() } - pub fn is_empty(&self) -> bool { - self.by_name.read().is_empty() + pub async fn is_empty(&self) -> bool { + self.by_name.read().await.is_empty() } /// TODO: rename to be consistent between "head" and "synced" @@ -443,26 +452,31 @@ impl Web3Rpcs { web3_request: &Arc, ) -> Web3ProxyResult { // TODO: by_name might include things that are on a forked - let ranked_rpcs: Arc = - if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() { - ranked_rpcs - } else if self.watch_head_block.is_some() { - // if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong - return Err(Web3ProxyError::NoServersSynced); - } else { - // no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar - // TODO: return a future that resolves once we do have something? - let rpcs = self.by_name.read().values().cloned().collect(); - if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) { - Arc::new(x) - } else { - // i doubt we will ever get here - // TODO: return a future that resolves once we do have something? - return Err(Web3ProxyError::NoServersSynced); - } + let option_ranked_rpcs = self.watch_ranked_rpcs.borrow().clone(); + + let ranked_rpcs: Arc = if let Some(ranked_rpcs) = option_ranked_rpcs { + ranked_rpcs + } else if self.watch_head_block.is_some() { + // if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong + return Err(Web3ProxyError::NoServersSynced); + } else { + // no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar + // TODO: return a future that resolves once we do have something? + let rpcs = { + let by_name = self.by_name.read().await; + by_name.values().cloned().collect() }; + if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) { + Arc::new(x) + } else { + // i doubt we will ever get here + // TODO: return a future that resolves once we do have something? + return Err(Web3ProxyError::NoServersSynced); + } + }; + match ranked_rpcs.for_request(web3_request) { None => Err(Web3ProxyError::NoServersSynced), Some(x) => Ok(x), @@ -623,9 +637,9 @@ impl fmt::Debug for Web3Rpcs { // TODO: the default formatter takes forever to write. this is too quiet though let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some(); - let names = self.by_name.read(); - - let names = names.values().map(|x| x.name.as_str()).collect::>(); + // let names = self.by_name.blocking_read(); + // let names = names.values().map(|x| x.name.as_str()).collect::>(); + let names = (); let head_block = self.head_block(); @@ -645,9 +659,10 @@ impl Serialize for Web3Rpcs { let mut state = serializer.serialize_struct("Web3Rpcs", 5)?; { - let by_name = self.by_name.read(); - let rpcs: Vec<&Arc> = by_name.values().collect(); + // let by_name = self.by_name.read().await; + // let rpcs: Vec<&Arc> = by_name.values().collect(); // TODO: coordinate with frontend team to rename "conns" to "rpcs" + let rpcs = (); state.serialize_field("conns", &rpcs)?; } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index f316c285..425aa286 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1386,7 +1386,6 @@ impl Serialize for Web3Rpc { where S: Serializer, { - // 14 if we bring head_delay back let mut state = serializer.serialize_struct("Web3Rpc", 14)?; // the url is excluded because it likely includes private information. just show the name that we use in keys diff --git a/web3_proxy_cli/src/sub_commands/proxyd.rs b/web3_proxy_cli/src/sub_commands/proxyd.rs index 9c1fc4e4..c3512a19 100644 --- a/web3_proxy_cli/src/sub_commands/proxyd.rs +++ b/web3_proxy_cli/src/sub_commands/proxyd.rs @@ -3,6 +3,7 @@ use std::sync::atomic::AtomicU16; use std::sync::Arc; use std::time::Duration; use std::{fs, thread}; +use tokio::time::sleep; use tracing::{error, info, trace, warn}; use web3_proxy::app::{flatten_handle, flatten_handles, App}; use web3_proxy::config::TopConfig; diff --git a/web3_proxy_cli/src/test_utils/app.rs b/web3_proxy_cli/src/test_utils/app.rs index 3294e84a..db904449 100644 --- a/web3_proxy_cli/src/test_utils/app.rs +++ b/web3_proxy_cli/src/test_utils/app.rs @@ -151,7 +151,9 @@ impl TestApp { let start = Instant::now(); while frontend_port == 0 { // we have to give it some time because it might have to do migrations - if start.elapsed() > Duration::from_secs(10) { + // we also added an arbitrary delay so that all the servers would definitely have time to respond. + // TODO: that arbitrary delay needs tuning + if start.elapsed() > Duration::from_secs(60) { panic!("took too long to start!"); }