try a RwLock (more should be done if this works)

This commit is contained in:
Bryan Stitt 2023-10-11 10:55:45 -07:00
parent 28687f153f
commit 9aa33dae2d
8 changed files with 60 additions and 45 deletions

View File

@ -1100,7 +1100,7 @@ impl App {
self: &Arc<Self>, self: &Arc<Self>,
web3_request: &Arc<ValidatedRequest>, web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<SingleResponse<Arc<RawValue>>> { ) -> Web3ProxyResult<SingleResponse<Arc<RawValue>>> {
if self.protected_rpcs.is_empty() { if self.protected_rpcs.is_empty().await {
self.balanced_rpcs.request_with_metadata(web3_request).await self.balanced_rpcs.request_with_metadata(web3_request).await
} else { } else {
self.protected_rpcs self.protected_rpcs

View File

@ -82,7 +82,7 @@ impl App {
continue; continue;
}; };
// TODO: this needs a permit // todo!(this needs a permit)
let subscription_web3_request = ValidatedRequest::new_with_app( let subscription_web3_request = ValidatedRequest::new_with_app(
&app, &app,
authorization.clone(), authorization.clone(),

View File

@ -1013,8 +1013,6 @@ impl Authorization {
let a = Arc::new(a); let a = Arc::new(a);
// todo!(semaphore permit)
Ok((a, p)) Ok((a, p))
} }
} }

View File

@ -384,7 +384,7 @@ impl ConsensusFinder {
let consensus_head_block = new_ranked_rpcs.head_block.clone(); let consensus_head_block = new_ranked_rpcs.head_block.clone();
let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs(); let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
let num_active_rpcs = self.len(); let num_active_rpcs = self.len();
let total_rpcs = web3_rpcs.len(); let total_rpcs = web3_rpcs.len().await;
let new_ranked_rpcs = Arc::new(new_ranked_rpcs); let new_ranked_rpcs = Arc::new(new_ranked_rpcs);

View File

@ -18,14 +18,13 @@ use futures_util::future::join_all;
use hashbrown::HashMap; use hashbrown::HashMap;
use http::StatusCode; use http::StatusCode;
use moka::future::CacheBuilder; use moka::future::CacheBuilder;
use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::borrow::Cow; use std::borrow::Cow;
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
use tokio::time::{sleep_until, Duration, Instant}; use tokio::time::{sleep_until, Duration, Instant};
use tokio::{pin, select}; use tokio::{pin, select};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -39,7 +38,7 @@ pub struct Web3Rpcs {
pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>, pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections /// any requests will be forwarded to one (or more) of these connections
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>, pub(crate) by_name: AsyncRwLock<HashMap<String, Arc<Web3Rpc>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@ -132,7 +131,7 @@ impl Web3Rpcs {
watch::channel(Default::default()); watch::channel(Default::default());
// by_name starts empty. self.apply_server_configs will add to it // by_name starts empty. self.apply_server_configs will add to it
let by_name = RwLock::new(HashMap::new()); let by_name = AsyncRwLock::new(HashMap::new());
let max_head_block_lag = max_head_block_lag.unwrap_or(5.into()); let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());
@ -248,7 +247,11 @@ impl Web3Rpcs {
Ok((new_rpc, _handle)) => { Ok((new_rpc, _handle)) => {
// web3 connection worked // web3 connection worked
let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone); // we don't remove it yet because we want the new one connected first
let old_rpc = {
let by_name = self.by_name.read().await;
by_name.get(&new_rpc.name).cloned()
};
// clean up the old rpc if it exists // clean up the old rpc if it exists
if let Some(old_rpc) = old_rpc { if let Some(old_rpc) = old_rpc {
@ -276,7 +279,10 @@ impl Web3Rpcs {
// new rpc is synced (or old one was not synced). update the local map // new rpc is synced (or old one was not synced). update the local map
// make sure that any new requests use the new connection // make sure that any new requests use the new connection
self.by_name.write().insert(new_rpc.name.clone(), new_rpc); self.by_name
.write()
.await
.insert(new_rpc.name.clone(), new_rpc);
// tell the old rpc to disconnect // tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
@ -284,7 +290,10 @@ impl Web3Rpcs {
disconnect_sender.send_replace(true); disconnect_sender.send_replace(true);
} }
} else { } else {
self.by_name.write().insert(new_rpc.name.clone(), new_rpc); self.by_name
.write()
.await
.insert(new_rpc.name.clone(), new_rpc);
} }
} }
Err(err) => { Err(err) => {
@ -297,12 +306,12 @@ impl Web3Rpcs {
} }
// TODO: remove any RPCs that were part of the config, but are now removed // TODO: remove any RPCs that were part of the config, but are now removed
let active_names: Vec<_> = self.by_name.read().keys().cloned().collect(); let active_names: Vec<_> = self.by_name.read().await.keys().cloned().collect();
for name in active_names { for name in active_names {
if names_to_keep.contains(&name) { if names_to_keep.contains(&name) {
continue; continue;
} }
if let Some(old_rpc) = self.by_name.write().remove(&name) { if let Some(old_rpc) = self.by_name.write().await.remove(&name) {
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
debug!("telling {} to disconnect. no longer needed", old_rpc); debug!("telling {} to disconnect. no longer needed", old_rpc);
disconnect_sender.send_replace(true); disconnect_sender.send_replace(true);
@ -310,7 +319,7 @@ impl Web3Rpcs {
} }
} }
let num_rpcs = self.len(); let num_rpcs = self.len().await;
if num_rpcs < self.min_synced_rpcs { if num_rpcs < self.min_synced_rpcs {
return Err(Web3ProxyError::NotEnoughRpcs { return Err(Web3ProxyError::NotEnoughRpcs {
@ -322,16 +331,16 @@ impl Web3Rpcs {
Ok(()) Ok(())
} }
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> { pub async fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.read().get(conn_name).cloned() self.by_name.read().await.get(conn_name).cloned()
} }
pub fn len(&self) -> usize { pub async fn len(&self) -> usize {
self.by_name.read().len() self.by_name.read().await.len()
} }
pub fn is_empty(&self) -> bool { pub async fn is_empty(&self) -> bool {
self.by_name.read().is_empty() self.by_name.read().await.is_empty()
} }
/// TODO: rename to be consistent between "head" and "synced" /// TODO: rename to be consistent between "head" and "synced"
@ -443,26 +452,31 @@ impl Web3Rpcs {
web3_request: &Arc<ValidatedRequest>, web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<RpcsForRequest> { ) -> Web3ProxyResult<RpcsForRequest> {
// TODO: by_name might include things that are on a forked // TODO: by_name might include things that are on a forked
let ranked_rpcs: Arc<RankedRpcs> =
if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() {
ranked_rpcs
} else if self.watch_head_block.is_some() {
// if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong
return Err(Web3ProxyError::NoServersSynced);
} else {
// no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar
// TODO: return a future that resolves once we do have something?
let rpcs = self.by_name.read().values().cloned().collect();
if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) { let option_ranked_rpcs = self.watch_ranked_rpcs.borrow().clone();
Arc::new(x)
} else { let ranked_rpcs: Arc<RankedRpcs> = if let Some(ranked_rpcs) = option_ranked_rpcs {
// i doubt we will ever get here ranked_rpcs
// TODO: return a future that resolves once we do have something? } else if self.watch_head_block.is_some() {
return Err(Web3ProxyError::NoServersSynced); // if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong
} return Err(Web3ProxyError::NoServersSynced);
} else {
// no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar
// TODO: return a future that resolves once we do have something?
let rpcs = {
let by_name = self.by_name.read().await;
by_name.values().cloned().collect()
}; };
if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) {
Arc::new(x)
} else {
// i doubt we will ever get here
// TODO: return a future that resolves once we do have something?
return Err(Web3ProxyError::NoServersSynced);
}
};
match ranked_rpcs.for_request(web3_request) { match ranked_rpcs.for_request(web3_request) {
None => Err(Web3ProxyError::NoServersSynced), None => Err(Web3ProxyError::NoServersSynced),
Some(x) => Ok(x), Some(x) => Ok(x),
@ -623,9 +637,9 @@ impl fmt::Debug for Web3Rpcs {
// TODO: the default formatter takes forever to write. this is too quiet though // TODO: the default formatter takes forever to write. this is too quiet though
let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some(); let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some();
let names = self.by_name.read(); // let names = self.by_name.blocking_read();
// let names = names.values().map(|x| x.name.as_str()).collect::<Vec<_>>();
let names = names.values().map(|x| x.name.as_str()).collect::<Vec<_>>(); let names = ();
let head_block = self.head_block(); let head_block = self.head_block();
@ -645,9 +659,10 @@ impl Serialize for Web3Rpcs {
let mut state = serializer.serialize_struct("Web3Rpcs", 5)?; let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
{ {
let by_name = self.by_name.read(); // let by_name = self.by_name.read().await;
let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect(); // let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
// TODO: coordinate with frontend team to rename "conns" to "rpcs" // TODO: coordinate with frontend team to rename "conns" to "rpcs"
let rpcs = ();
state.serialize_field("conns", &rpcs)?; state.serialize_field("conns", &rpcs)?;
} }

View File

@ -1386,7 +1386,6 @@ impl Serialize for Web3Rpc {
where where
S: Serializer, S: Serializer,
{ {
// 14 if we bring head_delay back
let mut state = serializer.serialize_struct("Web3Rpc", 14)?; let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
// the url is excluded because it likely includes private information. just show the name that we use in keys // the url is excluded because it likely includes private information. just show the name that we use in keys

View File

@ -3,6 +3,7 @@ use std::sync::atomic::AtomicU16;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{fs, thread}; use std::{fs, thread};
use tokio::time::sleep;
use tracing::{error, info, trace, warn}; use tracing::{error, info, trace, warn};
use web3_proxy::app::{flatten_handle, flatten_handles, App}; use web3_proxy::app::{flatten_handle, flatten_handles, App};
use web3_proxy::config::TopConfig; use web3_proxy::config::TopConfig;

View File

@ -151,7 +151,9 @@ impl TestApp {
let start = Instant::now(); let start = Instant::now();
while frontend_port == 0 { while frontend_port == 0 {
// we have to give it some time because it might have to do migrations // we have to give it some time because it might have to do migrations
if start.elapsed() > Duration::from_secs(10) { // we also added an arbitrary delay so that all the servers would definitely have time to respond.
// TODO: that arbitrary delay needs tuning
if start.elapsed() > Duration::from_secs(60) {
panic!("took too long to start!"); panic!("took too long to start!");
} }