fix where we sort and lock less
This commit is contained in:
parent
e8cc27bd37
commit
4eeacbb30c
@ -16,12 +16,41 @@ pub type BlockWatcherSender = mpsc::UnboundedSender<NewHead>;
|
||||
pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<NewHead>;
|
||||
|
||||
// TODO: ethers has a similar SyncingStatus
|
||||
#[derive(Eq)]
|
||||
pub enum SyncStatus {
|
||||
Synced(u64),
|
||||
Behind(u64),
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl Ord for SyncStatus {
|
||||
fn cmp(&self, other: &Self) -> cmp::Ordering {
|
||||
match (self, other) {
|
||||
(SyncStatus::Synced(a), SyncStatus::Synced(b)) => a.cmp(b),
|
||||
(SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
|
||||
(SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less,
|
||||
(SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal,
|
||||
(SyncStatus::Synced(_), SyncStatus::Behind(_)) => cmp::Ordering::Greater,
|
||||
(SyncStatus::Behind(_), SyncStatus::Synced(_)) => cmp::Ordering::Less,
|
||||
(SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
|
||||
(SyncStatus::Behind(a), SyncStatus::Behind(b)) => a.cmp(b),
|
||||
(SyncStatus::Unknown, SyncStatus::Behind(_)) => cmp::Ordering::Less,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for SyncStatus {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for SyncStatus {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.cmp(other) == cmp::Ordering::Equal
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlockWatcher {
|
||||
sender: BlockWatcherSender,
|
||||
/// this Mutex is locked over awaits, so we want an async lock
|
||||
|
28
src/main.rs
28
src/main.rs
@ -9,12 +9,11 @@ use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{mpsc, watch, RwLock};
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::time::sleep;
|
||||
use tracing::warn;
|
||||
use warp::Filter;
|
||||
|
||||
// use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
|
||||
use crate::block_watcher::BlockWatcher;
|
||||
use crate::provider::JsonRpcRequest;
|
||||
use crate::provider_tiers::Web3ProviderTier;
|
||||
@ -36,11 +35,6 @@ struct Web3ProxyApp {
|
||||
balanced_rpc_tiers: Arc<Vec<Web3ProviderTier>>,
|
||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||
private_rpcs: Option<Arc<Web3ProviderTier>>,
|
||||
/// write lock on these when all rate limits are hit
|
||||
/// this lock will be held open over an await, so use async locking
|
||||
balanced_rpc_ratelimiter_lock: RwLock<()>,
|
||||
/// this lock will be held open over an await, so use async locking
|
||||
private_rpcs_ratelimiter_lock: RwLock<()>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Web3ProxyApp {
|
||||
@ -165,8 +159,6 @@ impl Web3ProxyApp {
|
||||
clock,
|
||||
balanced_rpc_tiers,
|
||||
private_rpcs,
|
||||
balanced_rpc_ratelimiter_lock: Default::default(),
|
||||
private_rpcs_ratelimiter_lock: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@ -182,8 +174,6 @@ impl Web3ProxyApp {
|
||||
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
|
||||
loop {
|
||||
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
|
||||
let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
|
||||
|
||||
match private_rpcs.get_upstream_servers().await {
|
||||
Ok(upstream_servers) => {
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
@ -216,16 +206,10 @@ impl Web3ProxyApp {
|
||||
Err(not_until) => {
|
||||
// TODO: move this to a helper function
|
||||
// sleep (with a lock) until our rate limits should be available
|
||||
drop(read_lock);
|
||||
|
||||
if let Some(not_until) = not_until {
|
||||
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
|
||||
|
||||
let deadline = not_until.wait_time_from(self.clock.now());
|
||||
|
||||
sleep(deadline).await;
|
||||
|
||||
drop(write_lock);
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -234,8 +218,6 @@ impl Web3ProxyApp {
|
||||
// this is not a private transaction (or no private relays are configured)
|
||||
// try to send to each tier, stopping at the first success
|
||||
loop {
|
||||
let read_lock = self.balanced_rpc_ratelimiter_lock.read().await;
|
||||
|
||||
// there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
|
||||
let mut earliest_not_until = None;
|
||||
|
||||
@ -300,18 +282,10 @@ impl Web3ProxyApp {
|
||||
}
|
||||
|
||||
// we haven't returned an Ok, sleep and try again
|
||||
// TODO: move this to a helper function
|
||||
drop(read_lock);
|
||||
|
||||
// unwrap should be safe since we would have returned if it wasn't set
|
||||
if let Some(earliest_not_until) = earliest_not_until {
|
||||
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
|
||||
|
||||
let deadline = earliest_not_until.wait_time_from(self.clock.now());
|
||||
|
||||
sleep(deadline).await;
|
||||
|
||||
drop(write_lock);
|
||||
} else {
|
||||
// TODO: how long should we wait?
|
||||
// TODO: max wait time?
|
||||
|
@ -6,7 +6,6 @@ use futures::StreamExt;
|
||||
use governor::clock::{QuantaClock, QuantaInstant};
|
||||
use governor::NotUntil;
|
||||
use serde_json::value::RawValue;
|
||||
use std::cmp;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU32;
|
||||
@ -192,62 +191,22 @@ impl Web3ProviderTier {
|
||||
})
|
||||
.collect();
|
||||
|
||||
// sort rpcs by their sync status and active connections
|
||||
// sort rpcs by their sync status
|
||||
// TODO: if we only changed one row, we don't need to sort the whole thing. i think we can do this better
|
||||
available_rpcs.sort_unstable_by(|a, b| {
|
||||
let a_synced = sync_status.get(a).unwrap();
|
||||
let b_synced = sync_status.get(b).unwrap();
|
||||
|
||||
match (a_synced, b_synced) {
|
||||
(SyncStatus::Synced(a), SyncStatus::Synced(b)) => {
|
||||
if a != b {
|
||||
return a.cmp(b);
|
||||
}
|
||||
// else they are equal and we want to compare on active connections
|
||||
}
|
||||
(SyncStatus::Synced(_), SyncStatus::Unknown) => {
|
||||
return cmp::Ordering::Greater;
|
||||
}
|
||||
(SyncStatus::Unknown, SyncStatus::Synced(_)) => {
|
||||
return cmp::Ordering::Less;
|
||||
}
|
||||
(SyncStatus::Unknown, SyncStatus::Unknown) => {
|
||||
// neither rpc is synced
|
||||
// this means neither will have connections
|
||||
return cmp::Ordering::Equal;
|
||||
}
|
||||
(SyncStatus::Synced(_), SyncStatus::Behind(_)) => {
|
||||
return cmp::Ordering::Greater;
|
||||
}
|
||||
(SyncStatus::Behind(_), SyncStatus::Synced(_)) => {
|
||||
return cmp::Ordering::Less;
|
||||
}
|
||||
(SyncStatus::Behind(_), SyncStatus::Unknown) => {
|
||||
return cmp::Ordering::Greater;
|
||||
}
|
||||
(SyncStatus::Behind(a), SyncStatus::Behind(b)) => {
|
||||
if a != b {
|
||||
return a.cmp(b);
|
||||
}
|
||||
// else they are equal and we want to compare on active connections
|
||||
}
|
||||
(SyncStatus::Unknown, SyncStatus::Behind(_)) => {
|
||||
return cmp::Ordering::Less;
|
||||
}
|
||||
}
|
||||
|
||||
// sort on active connections
|
||||
self.connections
|
||||
.get(a)
|
||||
.unwrap()
|
||||
.cmp(self.connections.get(b).unwrap())
|
||||
a_synced.cmp(b_synced)
|
||||
});
|
||||
|
||||
// filter out
|
||||
// filter out unsynced rpcs
|
||||
let synced_rpcs: Vec<String> = available_rpcs
|
||||
.into_iter()
|
||||
.take_while(|rpc| matches!(sync_status.get(rpc).unwrap(), SyncStatus::Synced(_)))
|
||||
.collect();
|
||||
|
||||
// TODO: is arcswap the best type for this?
|
||||
self.synced_rpcs.swap(Arc::new(synced_rpcs));
|
||||
|
||||
Ok(())
|
||||
@ -257,7 +216,19 @@ impl Web3ProviderTier {
|
||||
pub async fn next_upstream_server(&self) -> Result<String, Option<NotUntil<QuantaInstant>>> {
|
||||
let mut earliest_not_until = None;
|
||||
|
||||
for selected_rpc in self.synced_rpcs.load().iter() {
|
||||
// TODO: this clone is probably not the best way to do this
|
||||
let mut synced_rpcs = Vec::clone(&*self.synced_rpcs.load());
|
||||
|
||||
// TODO: we don't want to sort on active connections. we want to sort on remaining capacity for connections. for example, geth can handle more than erigon
|
||||
synced_rpcs.sort_unstable_by(|a, b| {
|
||||
// sort on active connections
|
||||
self.connections
|
||||
.get(a)
|
||||
.unwrap()
|
||||
.cmp(self.connections.get(b).unwrap())
|
||||
});
|
||||
|
||||
for selected_rpc in synced_rpcs.iter() {
|
||||
// increment our connection counter
|
||||
if let Err(not_until) = self
|
||||
.connections
|
||||
|
Loading…
Reference in New Issue
Block a user