Revert "try a RwLock (more should be done if this works)"
This reverts commit 9aa33dae2d3869965ba131e8e561338f2298fd56.
This commit is contained in:
parent
5df4bd8da8
commit
93533eabc7
@ -1076,7 +1076,7 @@ impl App {
|
|||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
web3_request: &Arc<ValidatedRequest>,
|
web3_request: &Arc<ValidatedRequest>,
|
||||||
) -> Web3ProxyResult<SingleResponse<Arc<RawValue>>> {
|
) -> Web3ProxyResult<SingleResponse<Arc<RawValue>>> {
|
||||||
if self.protected_rpcs.is_empty().await {
|
if self.protected_rpcs.is_empty() {
|
||||||
self.balanced_rpcs.request_with_metadata(web3_request).await
|
self.balanced_rpcs.request_with_metadata(web3_request).await
|
||||||
} else {
|
} else {
|
||||||
self.protected_rpcs
|
self.protected_rpcs
|
||||||
|
@ -319,7 +319,7 @@ async fn websocket_proxy_web3_rpc(
|
|||||||
) -> Web3ProxyResult<jsonrpc::Response> {
|
) -> Web3ProxyResult<jsonrpc::Response> {
|
||||||
match &json_request.method[..] {
|
match &json_request.method[..] {
|
||||||
"eth_subscribe" => {
|
"eth_subscribe" => {
|
||||||
// TODO: this needs a permit
|
// todo!(this needs a permit)
|
||||||
let web3_request = ValidatedRequest::new_with_app(
|
let web3_request = ValidatedRequest::new_with_app(
|
||||||
app,
|
app,
|
||||||
authorization,
|
authorization,
|
||||||
|
@ -384,7 +384,7 @@ impl ConsensusFinder {
|
|||||||
let consensus_head_block = new_ranked_rpcs.head_block.clone();
|
let consensus_head_block = new_ranked_rpcs.head_block.clone();
|
||||||
let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
|
let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
|
||||||
let num_active_rpcs = self.len();
|
let num_active_rpcs = self.len();
|
||||||
let total_rpcs = web3_rpcs.len().await;
|
let total_rpcs = web3_rpcs.len();
|
||||||
|
|
||||||
let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
|
let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
|
||||||
|
|
||||||
|
@ -17,13 +17,14 @@ use futures_util::future::join_all;
|
|||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
use moka::future::CacheBuilder;
|
use moka::future::CacheBuilder;
|
||||||
|
use parking_lot::RwLock;
|
||||||
use serde::ser::{SerializeStruct, Serializer};
|
use serde::ser::{SerializeStruct, Serializer};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::fmt::{self, Display};
|
use std::fmt::{self, Display};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
|
use tokio::sync::{mpsc, watch};
|
||||||
use tokio::time::{sleep_until, Duration, Instant};
|
use tokio::time::{sleep_until, Duration, Instant};
|
||||||
use tokio::{pin, select};
|
use tokio::{pin, select};
|
||||||
use tracing::{debug, error, info, trace, warn};
|
use tracing::{debug, error, info, trace, warn};
|
||||||
@ -37,7 +38,7 @@ pub struct Web3Rpcs {
|
|||||||
pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
|
pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
|
||||||
/// any requests will be forwarded to one (or more) of these connections
|
/// any requests will be forwarded to one (or more) of these connections
|
||||||
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
|
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
|
||||||
pub(crate) by_name: AsyncRwLock<HashMap<String, Arc<Web3Rpc>>>,
|
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
|
||||||
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
|
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
|
||||||
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
|
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
|
||||||
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
|
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
|
||||||
@ -130,7 +131,7 @@ impl Web3Rpcs {
|
|||||||
watch::channel(Default::default());
|
watch::channel(Default::default());
|
||||||
|
|
||||||
// by_name starts empty. self.apply_server_configs will add to it
|
// by_name starts empty. self.apply_server_configs will add to it
|
||||||
let by_name = AsyncRwLock::new(HashMap::new());
|
let by_name = RwLock::new(HashMap::new());
|
||||||
|
|
||||||
let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());
|
let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());
|
||||||
|
|
||||||
@ -252,11 +253,7 @@ impl Web3Rpcs {
|
|||||||
Ok((new_rpc, _handle)) => {
|
Ok((new_rpc, _handle)) => {
|
||||||
// web3 connection worked
|
// web3 connection worked
|
||||||
|
|
||||||
// we don't remove it yet because we want the new one connected first
|
let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone);
|
||||||
let old_rpc = {
|
|
||||||
let by_name = self.by_name.read().await;
|
|
||||||
by_name.get(&new_rpc.name).cloned()
|
|
||||||
};
|
|
||||||
|
|
||||||
// clean up the old rpc if it exists
|
// clean up the old rpc if it exists
|
||||||
if let Some(old_rpc) = old_rpc {
|
if let Some(old_rpc) = old_rpc {
|
||||||
@ -284,10 +281,7 @@ impl Web3Rpcs {
|
|||||||
|
|
||||||
// new rpc is synced (or old one was not synced). update the local map
|
// new rpc is synced (or old one was not synced). update the local map
|
||||||
// make sure that any new requests use the new connection
|
// make sure that any new requests use the new connection
|
||||||
self.by_name
|
self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.insert(new_rpc.name.clone(), new_rpc);
|
|
||||||
|
|
||||||
// tell the old rpc to disconnect
|
// tell the old rpc to disconnect
|
||||||
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
||||||
@ -295,10 +289,7 @@ impl Web3Rpcs {
|
|||||||
disconnect_sender.send_replace(true);
|
disconnect_sender.send_replace(true);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
self.by_name
|
self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.insert(new_rpc.name.clone(), new_rpc);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@ -311,12 +302,12 @@ impl Web3Rpcs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove any RPCs that were part of the config, but are now removed
|
// TODO: remove any RPCs that were part of the config, but are now removed
|
||||||
let active_names: Vec<_> = self.by_name.read().await.keys().cloned().collect();
|
let active_names: Vec<_> = self.by_name.read().keys().cloned().collect();
|
||||||
for name in active_names {
|
for name in active_names {
|
||||||
if names_to_keep.contains(&name) {
|
if names_to_keep.contains(&name) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if let Some(old_rpc) = self.by_name.write().await.remove(&name) {
|
if let Some(old_rpc) = self.by_name.write().remove(&name) {
|
||||||
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
|
||||||
debug!("telling {} to disconnect. no longer needed", old_rpc);
|
debug!("telling {} to disconnect. no longer needed", old_rpc);
|
||||||
disconnect_sender.send_replace(true);
|
disconnect_sender.send_replace(true);
|
||||||
@ -324,7 +315,7 @@ impl Web3Rpcs {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_rpcs = self.len().await;
|
let num_rpcs = self.len();
|
||||||
|
|
||||||
if num_rpcs < self.min_synced_rpcs {
|
if num_rpcs < self.min_synced_rpcs {
|
||||||
return Err(Web3ProxyError::NotEnoughRpcs {
|
return Err(Web3ProxyError::NotEnoughRpcs {
|
||||||
@ -336,16 +327,16 @@ impl Web3Rpcs {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
|
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
|
||||||
self.by_name.read().await.get(conn_name).cloned()
|
self.by_name.read().get(conn_name).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.by_name.read().await.len()
|
self.by_name.read().len()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
self.by_name.read().await.is_empty()
|
self.by_name.read().is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO: rename to be consistent between "head" and "synced"
|
/// TODO: rename to be consistent between "head" and "synced"
|
||||||
@ -407,30 +398,25 @@ impl Web3Rpcs {
|
|||||||
web3_request: &Arc<ValidatedRequest>,
|
web3_request: &Arc<ValidatedRequest>,
|
||||||
) -> Web3ProxyResult<RpcsForRequest> {
|
) -> Web3ProxyResult<RpcsForRequest> {
|
||||||
// TODO: by_name might include things that are on a forked
|
// TODO: by_name might include things that are on a forked
|
||||||
|
let ranked_rpcs: Arc<RankedRpcs> =
|
||||||
let option_ranked_rpcs = self.watch_ranked_rpcs.borrow().clone();
|
if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() {
|
||||||
|
ranked_rpcs
|
||||||
let ranked_rpcs: Arc<RankedRpcs> = if let Some(ranked_rpcs) = option_ranked_rpcs {
|
} else if self.watch_head_block.is_some() {
|
||||||
ranked_rpcs
|
// if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong
|
||||||
} else if self.watch_head_block.is_some() {
|
|
||||||
// if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong
|
|
||||||
return Err(Web3ProxyError::NoServersSynced);
|
|
||||||
} else {
|
|
||||||
// no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar
|
|
||||||
// TODO: return a future that resolves once we do have something?
|
|
||||||
let rpcs = {
|
|
||||||
let by_name = self.by_name.read().await;
|
|
||||||
by_name.values().cloned().collect()
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) {
|
|
||||||
Arc::new(x)
|
|
||||||
} else {
|
|
||||||
// i doubt we will ever get here
|
|
||||||
// TODO: return a future that resolves once we do have something?
|
|
||||||
return Err(Web3ProxyError::NoServersSynced);
|
return Err(Web3ProxyError::NoServersSynced);
|
||||||
}
|
} else {
|
||||||
};
|
// no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar
|
||||||
|
// TODO: return a future that resolves once we do have something?
|
||||||
|
let rpcs = self.by_name.read().values().cloned().collect();
|
||||||
|
|
||||||
|
if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) {
|
||||||
|
Arc::new(x)
|
||||||
|
} else {
|
||||||
|
// i doubt we will ever get here
|
||||||
|
// TODO: return a future that resolves once we do have something?
|
||||||
|
return Err(Web3ProxyError::NoServersSynced);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
match ranked_rpcs.for_request(web3_request) {
|
match ranked_rpcs.for_request(web3_request) {
|
||||||
None => Err(Web3ProxyError::NoServersSynced),
|
None => Err(Web3ProxyError::NoServersSynced),
|
||||||
@ -592,10 +578,9 @@ impl fmt::Debug for Web3Rpcs {
|
|||||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||||
let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some();
|
let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some();
|
||||||
|
|
||||||
// todo!(get names)
|
let names = self.by_name.read();
|
||||||
// let names = self.by_name.blocking_read();
|
|
||||||
// let names = names.values().map(|x| x.name.as_str()).collect::<Vec<_>>();
|
let names = names.values().map(|x| x.name.as_str()).collect::<Vec<_>>();
|
||||||
let names = ();
|
|
||||||
|
|
||||||
let head_block = self.head_block();
|
let head_block = self.head_block();
|
||||||
|
|
||||||
@ -615,11 +600,9 @@ impl Serialize for Web3Rpcs {
|
|||||||
let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
|
let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
|
||||||
|
|
||||||
{
|
{
|
||||||
// todo!(get rpcs)
|
let by_name = self.by_name.read();
|
||||||
// let by_name = self.by_name.read().await;
|
let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
|
||||||
// let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
|
|
||||||
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
|
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
|
||||||
let rpcs = ();
|
|
||||||
state.serialize_field("conns", &rpcs)?;
|
state.serialize_field("conns", &rpcs)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1315,6 +1315,7 @@ impl Serialize for Web3Rpc {
|
|||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
|
// 15 if we bring head_delay back
|
||||||
let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
|
let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
|
||||||
|
|
||||||
// the url is excluded because it likely includes private information. just show the name that we use in keys
|
// the url is excluded because it likely includes private information. just show the name that we use in keys
|
||||||
|
@ -151,9 +151,7 @@ impl TestApp {
|
|||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
while frontend_port == 0 {
|
while frontend_port == 0 {
|
||||||
// we have to give it some time because it might have to do migrations
|
// we have to give it some time because it might have to do migrations
|
||||||
// we also added an arbitrary delay so that all the servers would definitely have time to respond.
|
if start.elapsed() > Duration::from_secs(10) {
|
||||||
// TODO: that arbitrary delay needs tuning
|
|
||||||
if start.elapsed() > Duration::from_secs(60) {
|
|
||||||
panic!("took too long to start!");
|
panic!("took too long to start!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user