diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index fe9033d6..1b5940d1 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -1076,7 +1076,7 @@ impl App {
         self: &Arc<Self>,
         web3_request: &Arc<ValidatedRequest>,
     ) -> Web3ProxyResult<jsonrpc::SingleResponse<Arc<RawValue>>> {
-        if self.protected_rpcs.is_empty().await {
+        if self.protected_rpcs.is_empty() {
             self.balanced_rpcs.request_with_metadata(web3_request).await
         } else {
             self.protected_rpcs
diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs
index d7b5e100..f7bb77ca 100644
--- a/web3_proxy/src/frontend/rpc_proxy_ws.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs
@@ -319,7 +319,7 @@ async fn websocket_proxy_web3_rpc(
 ) -> Web3ProxyResult<jsonrpc::Response> {
     match &json_request.method[..] {
         "eth_subscribe" => {
-            // TODO: this needs a permit
+            // todo!(this needs a permit)
             let web3_request = ValidatedRequest::new_with_app(
                 app,
                 authorization,
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index cee32503..80823f96 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -384,7 +384,7 @@ impl ConsensusFinder {
         let consensus_head_block = new_ranked_rpcs.head_block.clone();
         let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
         let num_active_rpcs = self.len();
-        let total_rpcs = web3_rpcs.len().await;
+        let total_rpcs = web3_rpcs.len();
 
         let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
 
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 60344136..5c2d740a 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -17,13 +17,14 @@ use futures_util::future::join_all;
 use hashbrown::HashMap;
 use http::StatusCode;
 use moka::future::CacheBuilder;
+use parking_lot::RwLock;
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
 use std::borrow::Cow;
 use std::fmt::{self, Display};
 use std::sync::Arc;
-use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
+use tokio::sync::{mpsc, watch};
 use tokio::time::{sleep_until, Duration, Instant};
 use tokio::{pin, select};
 use tracing::{debug, error, info, trace, warn};
@@ -37,7 +38,7 @@ pub struct Web3Rpcs {
     pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
     /// any requests will be forwarded to one (or more) of these connections
     /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
-    pub(crate) by_name: AsyncRwLock<HashMap<String, Arc<Web3Rpc>>>,
+    pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
     /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
     /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
     /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@@ -130,7 +131,7 @@ impl Web3Rpcs {
             watch::channel(Default::default());
 
         // by_name starts empty. self.apply_server_configs will add to it
-        let by_name = AsyncRwLock::new(HashMap::new());
+        let by_name = RwLock::new(HashMap::new());
 
         let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());
 
@@ -252,11 +253,7 @@ impl Web3Rpcs {
             Ok((new_rpc, _handle)) => {
                 // web3 connection worked
 
-                // we don't remove it yet because we want the new one connected first
-                let old_rpc = {
-                    let by_name = self.by_name.read().await;
-                    by_name.get(&new_rpc.name).cloned()
-                };
+                let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone);
 
                 // clean up the old rpc if it exists
                 if let Some(old_rpc) = old_rpc {
@@ -284,10 +281,7 @@
 
                     // new rpc is synced (or old one was not synced). update the local map
                     // make sure that any new requests use the new connection
-                    self.by_name
-                        .write()
-                        .await
-                        .insert(new_rpc.name.clone(), new_rpc);
+                    self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
 
                     // tell the old rpc to disconnect
                     if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
@@ -295,10 +289,7 @@
                         disconnect_sender.send_replace(true);
                     }
                 } else {
-                    self.by_name
-                        .write()
-                        .await
-                        .insert(new_rpc.name.clone(), new_rpc);
+                    self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
                 }
             }
             Err(err) => {
@@ -311,12 +302,12 @@
         }
 
         // TODO: remove any RPCs that were part of the config, but are now removed
-        let active_names: Vec<_> = self.by_name.read().await.keys().cloned().collect();
+        let active_names: Vec<_> = self.by_name.read().keys().cloned().collect();
         for name in active_names {
             if names_to_keep.contains(&name) {
                 continue;
             }
-            if let Some(old_rpc) = self.by_name.write().await.remove(&name) {
+            if let Some(old_rpc) = self.by_name.write().remove(&name) {
                 if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
                     debug!("telling {} to disconnect. no longer needed", old_rpc);
                     disconnect_sender.send_replace(true);
@@ -324,7 +315,7 @@
             }
         }
 
-        let num_rpcs = self.len().await;
+        let num_rpcs = self.len();
 
         if num_rpcs < self.min_synced_rpcs {
             return Err(Web3ProxyError::NotEnoughRpcs {
@@ -336,16 +327,16 @@
         Ok(())
     }
 
-    pub async fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
-        self.by_name.read().await.get(conn_name).cloned()
+    pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
+        self.by_name.read().get(conn_name).cloned()
     }
 
-    pub async fn len(&self) -> usize {
-        self.by_name.read().await.len()
+    pub fn len(&self) -> usize {
+        self.by_name.read().len()
     }
 
-    pub async fn is_empty(&self) -> bool {
-        self.by_name.read().await.is_empty()
+    pub fn is_empty(&self) -> bool {
+        self.by_name.read().is_empty()
     }
 
     /// TODO: rename to be consistent between "head" and "synced"
@@ -407,30 +398,25 @@
         web3_request: &Arc<ValidatedRequest>,
     ) -> Web3ProxyResult<OpenRequestHandle> {
         // TODO: by_name might include things that are on a forked
-
-        let option_ranked_rpcs = self.watch_ranked_rpcs.borrow().clone();
-
-        let ranked_rpcs: Arc<RankedRpcs> = if let Some(ranked_rpcs) = option_ranked_rpcs {
-            ranked_rpcs
-        } else if self.watch_head_block.is_some() {
-            // if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong
-            return Err(Web3ProxyError::NoServersSynced);
-        } else {
-            // no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar
-            // TODO: return a future that resolves once we do have something?
-            let rpcs = {
-                let by_name = self.by_name.read().await;
-                by_name.values().cloned().collect()
-            };
-
-            if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) {
-                Arc::new(x)
-            } else {
-                // i doubt we will ever get here
-                // TODO: return a future that resolves once we do have something?
+        let ranked_rpcs: Arc<RankedRpcs> =
+            if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() {
+                ranked_rpcs
+            } else if self.watch_head_block.is_some() {
+                // if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong
                 return Err(Web3ProxyError::NoServersSynced);
-            }
-        };
+            } else {
+                // no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar
+                // TODO: return a future that resolves once we do have something?
+                let rpcs = self.by_name.read().values().cloned().collect();
+
+                if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) {
+                    Arc::new(x)
+                } else {
+                    // i doubt we will ever get here
+                    // TODO: return a future that resolves once we do have something?
+                    return Err(Web3ProxyError::NoServersSynced);
+                }
+            };
 
         match ranked_rpcs.for_request(web3_request) {
             None => Err(Web3ProxyError::NoServersSynced),
@@ -592,10 +578,9 @@ impl fmt::Debug for Web3Rpcs {
         // TODO: the default formatter takes forever to write. this is too quiet though
         let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some();
 
-        // todo!(get names)
-        // let names = self.by_name.blocking_read();
-        // let names = names.values().map(|x| x.name.as_str()).collect::<Vec<_>>();
-        let names = ();
+        let names = self.by_name.read();
+
+        let names = names.values().map(|x| x.name.as_str()).collect::<Vec<_>>();
 
         let head_block = self.head_block();
 
@@ -615,11 +600,9 @@ impl Serialize for Web3Rpcs {
         let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
 
         {
-            // todo!(get rpcs)
-            // let by_name = self.by_name.read().await;
-            // let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
+            let by_name = self.by_name.read();
+            let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
             // TODO: coordinate with frontend team to rename "conns" to "rpcs"
-            let rpcs = ();
             state.serialize_field("conns", &rpcs)?;
         }
 
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index fe296024..c9f81945 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -1315,6 +1315,7 @@ impl Serialize for Web3Rpc {
     where
         S: Serializer,
     {
+        // 15 if we bring head_delay back
        let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
 
         // the url is excluded because it likely includes private information. just show the name that we use in keys
diff --git a/web3_proxy_cli/src/test_utils/app.rs b/web3_proxy_cli/src/test_utils/app.rs
index db904449..3294e84a 100644
--- a/web3_proxy_cli/src/test_utils/app.rs
+++ b/web3_proxy_cli/src/test_utils/app.rs
@@ -151,9 +151,7 @@ impl TestApp {
         let start = Instant::now();
         while frontend_port == 0 {
             // we have to give it some time because it might have to do migrations
-            // we also added an arbitrary delay so that all the servers would definitely have time to respond.
-            // TODO: that arbitrary delay needs tuning
-            if start.elapsed() > Duration::from_secs(60) {
+            if start.elapsed() > Duration::from_secs(10) {
                 panic!("took too long to start!");
             }
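
The pattern behind most of these hunks: `fmt::Debug` and `Serialize` are synchronous traits, so while `by_name` sat behind a `tokio::sync::RwLock` those impls had no way to `.await` the lock (hence the old `let names = ();` and `let rpcs = ();` placeholders). A `parking_lot::RwLock` can be locked from sync code; the trade-off is that its guard must not be held across an `.await`, so callers clone the `Arc` out instead. A minimal sketch of that pattern, using hypothetical `Registry`/`Rpc` types rather than the real `web3_proxy` ones:

    // Hypothetical stand-ins for Web3Rpcs/Web3Rpc; only the locking pattern
    // is taken from the diff above.
    use std::collections::HashMap;
    use std::fmt;
    use std::sync::Arc;

    use parking_lot::RwLock;

    struct Rpc {
        name: String,
    }

    struct Registry {
        by_name: RwLock<HashMap<String, Arc<Rpc>>>,
    }

    impl Registry {
        // the read guard is a temporary dropped at the end of the statement,
        // so the returned Arc can safely cross `.await` points
        fn get(&self, name: &str) -> Option<Arc<Rpc>> {
            self.by_name.read().get(name).cloned()
        }

        fn len(&self) -> usize {
            self.by_name.read().len()
        }
    }

    // a sync lock makes sync trait impls straightforward
    impl fmt::Debug for Registry {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let by_name = self.by_name.read();
            let names = by_name.values().map(|x| x.name.as_str()).collect::<Vec<_>>();
            f.debug_struct("Registry").field("names", &names).finish()
        }
    }

This mirrors the new doc comment on `by_name` ("if you need it across awaits, clone the arc"): holding a `parking_lot` guard across an `.await` risks deadlocking other tasks on the same thread, and the guard is not `Send`, so the enclosing future could not be spawned on a multi-threaded runtime. Cloning the `Arc` out of the map is the safe way to keep an entry alive across awaits.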