sort by percents

Bryan Stitt 2022-04-29 22:21:40 +00:00
parent e350f33616
commit 70a4300542
4 changed files with 69 additions and 35 deletions

View File

@@ -27,14 +27,14 @@ impl Ord for SyncStatus {
     fn cmp(&self, other: &Self) -> cmp::Ordering {
         match (self, other) {
             (SyncStatus::Synced(a), SyncStatus::Synced(b)) => a.cmp(b),
-            (SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
-            (SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less,
-            (SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal,
             (SyncStatus::Synced(_), SyncStatus::Behind(_)) => cmp::Ordering::Greater,
+            (SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
             (SyncStatus::Behind(_), SyncStatus::Synced(_)) => cmp::Ordering::Less,
-            (SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
             (SyncStatus::Behind(a), SyncStatus::Behind(b)) => a.cmp(b),
+            (SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
+            (SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less,
             (SyncStatus::Unknown, SyncStatus::Behind(_)) => cmp::Ordering::Less,
+            (SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal,
         }
     }
 }
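Reordering the match arms groups them by the left-hand variant but does not change the ordering itself: Unknown sorts lowest, then Behind (compared by its value), then Synced (compared by its value). The same ordering could also come from a derived Ord if the variants were declared worst-to-best; a minimal sketch, assuming the payloads are u64 block numbers (the payload type is not visible in this hunk):

use std::cmp::Ordering;

// Hypothetical stand-in for the enum above; the u64 payloads are an assumption.
// Declaring variants worst-to-best makes the derived Ord match the hand-written one.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum SyncStatus {
    Unknown,
    Behind(u64),
    Synced(u64),
}

fn main() {
    assert_eq!(SyncStatus::Unknown.cmp(&SyncStatus::Behind(90)), Ordering::Less);
    assert_eq!(SyncStatus::Behind(90).cmp(&SyncStatus::Synced(100)), Ordering::Less);
    assert_eq!(SyncStatus::Synced(99).cmp(&SyncStatus::Synced(100)), Ordering::Less);
}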

View File

@@ -47,8 +47,8 @@ impl fmt::Debug for Web3ProxyApp {
 impl Web3ProxyApp {
     async fn try_new(
         allowed_lag: u64,
-        balanced_rpc_tiers: Vec<Vec<(&str, u32)>>,
-        private_rpcs: Vec<(&str, u32)>,
+        balanced_rpc_tiers: Vec<Vec<(&str, u32, Option<u32>)>>,
+        private_rpcs: Vec<(&str, u32, Option<u32>)>,
     ) -> anyhow::Result<Web3ProxyApp> {
         let clock = QuantaClock::default();
@@ -242,7 +242,7 @@ impl Web3ProxyApp {
                     json!({
                         // TODO: re-use their jsonrpc?
-                        "jsonrpc": json_body.jsonrpc,
+                        "jsonrpc": "2.0",
                         "id": json_body.id,
                         "result": partial_response
                     })
@@ -250,7 +250,7 @@ impl Web3ProxyApp {
                 Err(e) => {
                     // TODO: what is the proper format for an error?
                     json!({
-                        "jsonrpc": json_body.jsonrpc,
+                        "jsonrpc": "2.0",
                         "id": json_body.id,
                         "error": format!("{}", e)
                     })
@@ -312,23 +312,27 @@ async fn main() {
         allowed_lag,
         vec![
             // local nodes
-            vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
+            vec![
+                ("ws://10.11.12.16:8545", 68_800, None),
+                ("ws://10.11.12.16:8946", 152_138, None),
+            ],
             // paid nodes
             // TODO: add paid nodes (with rate limits)
             // vec![
             //     // chainstack.com archive
+            //     // moralis free (25/sec rate limit)
             // ],
             // free nodes
             // vec![
-            //     // ("https://main-rpc.linkpool.io", 0), // linkpool is slow and often offline
-            //     ("https://rpc.ankr.com/eth", 0),
+            //     // ("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline
+            //     ("https://rpc.ankr.com/eth", 23_967, None),
             // ],
         ],
         vec![
-            // ("https://api.edennetwork.io/v1/", 0),
-            // ("https://api.edennetwork.io/v1/beta", 0),
-            // ("https://rpc.ethermine.org/", 0),
-            // ("https://rpc.flashbots.net", 0),
+            // ("https://api.edennetwork.io/v1/", 1_805, None),
+            // ("https://api.edennetwork.io/v1/beta", 300, None),
+            // ("https://rpc.ethermine.org/", 5_861, None),
+            // ("https://rpc.flashbots.net", 7074, None),
         ],
     )
     .await
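With the new try_new signature above, each server entry is a three-field tuple instead of (url, rate_limit): the second field is a soft limit used as a load-balancing weight, and the third is an optional hard rate limit in requests per second (see the provider tier changes below). A minimal sketch of the new shape; the values are the ones from this commit and the binding name is illustrative:

fn main() {
    // (url, soft_limit, hard_limit): soft_limit only weights the "least loaded
    // by percent" sort; hard_limit, when Some, becomes a per-second rate limit.
    let balanced_rpc_tiers: Vec<Vec<(&str, u32, Option<u32>)>> = vec![vec![
        ("ws://10.11.12.16:8545", 68_800, None),
        ("ws://10.11.12.16:8946", 152_138, None),
    ]];
    assert_eq!(balanced_rpc_tiers[0].len(), 2);
}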

View File

@@ -10,7 +10,7 @@ use governor::RateLimiter;
 use serde::{Deserialize, Serialize};
 use serde_json::value::RawValue;
 use std::fmt;
-use std::sync::atomic::{self, AtomicUsize};
+use std::sync::atomic::{self, AtomicU32};
 use std::time::Duration;
 use std::{cmp::Ordering, sync::Arc};
 use tokio::time::interval;
@@ -23,7 +23,6 @@ type Web3RateLimiter =
 #[derive(Clone, Deserialize)]
 pub struct JsonRpcRequest {
-    pub jsonrpc: Box<RawValue>,
     pub id: Box<RawValue>,
     pub method: String,
     pub params: Box<RawValue>,
@@ -38,9 +37,9 @@ impl fmt::Debug for JsonRpcRequest {
     }
 }
 
+// TODO: check for errors too!
 #[derive(Clone, Deserialize, Serialize)]
 pub struct JsonRpcForwardedResponse {
-    pub jsonrpc: Box<RawValue>,
     pub id: Box<RawValue>,
     pub result: Box<RawValue>,
 }
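With the jsonrpc member dropped from both structs, the version string on incoming requests is no longer captured (serde simply ignores the extra field), and responses are stamped with a literal "2.0" in the json! calls shown earlier. A small self-contained sketch of the request side, assuming serde_json's raw_value feature as implied by the RawValue import above:

use serde::Deserialize;
use serde_json::value::RawValue; // needs serde_json's "raw_value" feature

// Hypothetical mirror of the trimmed struct above, for illustration only.
#[derive(Deserialize)]
struct JsonRpcRequest {
    id: Box<RawValue>,
    method: String,
    params: Box<RawValue>,
}

fn main() -> Result<(), serde_json::Error> {
    // The "jsonrpc" member is still accepted, it just isn't stored anywhere.
    let body = r#"{"jsonrpc":"2.0","id":1,"method":"eth_blockNumber","params":[]}"#;
    let req: JsonRpcRequest = serde_json::from_str(body)?;
    assert_eq!(req.method, "eth_blockNumber");
    assert_eq!(req.id.get(), "1");
    Ok(())
}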
@@ -124,12 +123,19 @@ impl Web3Provider {
 }
 
 /// An active connection to a Web3Rpc
-#[derive(Debug)]
 pub struct Web3Connection {
     /// keep track of currently open requests. We sort on this
-    active_requests: AtomicUsize,
+    active_requests: AtomicU32,
     provider: Arc<Web3Provider>,
     ratelimiter: Option<Web3RateLimiter>,
+    /// used for load balancing to the least loaded server
+    soft_limit: f32,
+}
+
+impl fmt::Debug for Web3Connection {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Web3Connection").finish_non_exhaustive()
+    }
 }
 
 impl Web3Connection {
@@ -142,7 +148,8 @@ impl Web3Connection {
         url_str: String,
         http_client: Option<reqwest::Client>,
         block_watcher_sender: BlockWatcherSender,
-        ratelimiter: Option<Web3RateLimiter>,
+        hard_rate_limiter: Option<Web3RateLimiter>,
+        soft_limit: f32,
     ) -> anyhow::Result<Web3Connection> {
         let provider = if url_str.starts_with("http") {
             let url: url::Url = url_str.parse()?;
@@ -184,7 +191,8 @@ impl Web3Connection {
         Ok(Web3Connection {
             active_requests: Default::default(),
             provider,
-            ratelimiter,
+            ratelimiter: hard_rate_limiter,
+            soft_limit,
         })
     }
@@ -223,12 +231,33 @@ impl Eq for Web3Connection {}
 impl Ord for Web3Connection {
     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
         // TODO: what atomic ordering?!
-        self.active_requests
-            .load(atomic::Ordering::Acquire)
-            .cmp(&other.active_requests.load(atomic::Ordering::Acquire))
+        let a = self.active_requests.load(atomic::Ordering::Acquire);
+        let b = other.active_requests.load(atomic::Ordering::Acquire);
+
+        // TODO: how should we include the soft limit? floats are slower than integer math
+        let a = a as f32 / self.soft_limit;
+        let b = b as f32 / other.soft_limit;
+
+        a.partial_cmp(&b).unwrap()
     }
 }
 
+/**
+Lets say geth has 1000 active requests and erigon also has 1000 active requests
+geth is much faster, so we want it to get the rest
+geth's soft limit is 120k. erigon's soft limit is 60k
+120 vs 60
+
+Lets say geth has 100k active requests and erigon has 100k active requests. same soft limits
+0.8333
+*/
 impl PartialOrd for Web3Connection {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
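This is the heart of the commit: connections are no longer ranked by raw active_requests but by active_requests divided by soft_limit, i.e. by the fraction of each server's capacity currently in use. A small self-contained sketch of the same arithmetic, using the geth/erigon numbers from the comment above (the helper name is illustrative):

fn load_percent(active_requests: u32, soft_limit: f32) -> f32 {
    active_requests as f32 / soft_limit
}

fn main() {
    // 1000 active requests each, but geth's soft limit is 120k vs erigon's 60k,
    // so geth looks half as loaded and sorts first.
    assert!(load_percent(1_000, 120_000.0) < load_percent(1_000, 60_000.0));

    // At 100k active requests each, geth sits at roughly 0.8333 of its soft
    // limit while erigon is already past its own.
    assert!((load_percent(100_000, 120_000.0) - 0.8333).abs() < 1e-3);
    assert!(load_percent(100_000, 60_000.0) > 1.0);
}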

View File

@@ -123,7 +123,7 @@ impl fmt::Debug for Web3ProviderTier {
 impl Web3ProviderTier {
     pub async fn try_new(
-        servers: Vec<(&str, u32)>,
+        servers: Vec<(&str, u32, Option<u32>)>,
         http_client: Option<reqwest::Client>,
         block_watcher: Arc<BlockWatcher>,
         clock: &QuantaClock,
@@ -131,11 +131,11 @@ impl Web3ProviderTier {
         let mut rpcs: Vec<String> = vec![];
         let mut connections = HashMap::new();
 
-        for (s, limit) in servers.into_iter() {
+        for (s, soft_limit, hard_limit) in servers.into_iter() {
             rpcs.push(s.to_string());
 
-            let ratelimiter = if limit > 0 {
-                let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap());
+            let hard_rate_limiter = if let Some(hard_limit) = hard_limit {
+                let quota = governor::Quota::per_second(NonZeroU32::new(hard_limit).unwrap());
                 let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
@@ -148,7 +148,8 @@ impl Web3ProviderTier {
                 s.to_string(),
                 http_client.clone(),
                 block_watcher.clone_sender(),
-                ratelimiter,
+                hard_rate_limiter,
+                soft_limit as f32,
             )
             .await?;
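A consequence of the Option<u32> hard limit: a governor rate limiter is only built for servers that declare one, while the soft limit is never enforced at all, it only weights the load sort. A minimal sketch of that optional-limiter pattern, using governor's default clock rather than the shared QuantaClock from the code above (the 25/sec figure is the Moralis example from the comment in main):

use governor::{Quota, RateLimiter};
use std::num::NonZeroU32;

fn main() {
    // None means "no hard limit"; Some(25) models a free endpoint at 25 req/s.
    let hard_limit: Option<u32> = Some(25);

    let hard_rate_limiter = hard_limit
        .map(|limit| RateLimiter::direct(Quota::per_second(NonZeroU32::new(limit).unwrap())));

    if let Some(limiter) = &hard_rate_limiter {
        // check() consumes one permit if available; Err would mean "back off".
        assert!(limiter.check().is_ok());
    }
}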
@@ -192,7 +193,7 @@ impl Web3ProviderTier {
             .collect();
 
         // sort rpcs by their sync status
-        // TODO: if we only changed one row, we don't need to sort the whole thing. i think we can do this better
+        // TODO: if we only changed one entry, we don't need to sort the whole thing. we can do this better
         available_rpcs.sort_unstable_by(|a, b| {
             let a_synced = sync_status.get(a).unwrap();
             let b_synced = sync_status.get(b).unwrap();
@@ -221,11 +222,11 @@ impl Web3ProviderTier {
         // TODO: we don't want to sort on active connections. we want to sort on remaining capacity for connections. for example, geth can handle more than erigon
         synced_rpcs.sort_unstable_by(|a, b| {
+            let a = self.connections.get(a).unwrap();
+            let b = self.connections.get(b).unwrap();
+
             // sort on active connections
-            self.connections
-                .get(a)
-                .unwrap()
-                .cmp(self.connections.get(b).unwrap())
+            a.cmp(b)
         });
 
         for selected_rpc in synced_rpcs.iter() {