From 70a43005423db1aa7d66c254be1c12380eb9fd4f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 29 Apr 2022 22:21:40 +0000 Subject: [PATCH] sort be percents --- src/block_watcher.rs | 8 +++---- src/main.rs | 26 +++++++++++++---------- src/provider.rs | 49 ++++++++++++++++++++++++++++++++++--------- src/provider_tiers.rs | 21 ++++++++++--------- 4 files changed, 69 insertions(+), 35 deletions(-) diff --git a/src/block_watcher.rs b/src/block_watcher.rs index 66c1f627..cfe9561e 100644 --- a/src/block_watcher.rs +++ b/src/block_watcher.rs @@ -27,14 +27,14 @@ impl Ord for SyncStatus { fn cmp(&self, other: &Self) -> cmp::Ordering { match (self, other) { (SyncStatus::Synced(a), SyncStatus::Synced(b)) => a.cmp(b), - (SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater, - (SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less, - (SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal, (SyncStatus::Synced(_), SyncStatus::Behind(_)) => cmp::Ordering::Greater, + (SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater, (SyncStatus::Behind(_), SyncStatus::Synced(_)) => cmp::Ordering::Less, - (SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater, (SyncStatus::Behind(a), SyncStatus::Behind(b)) => a.cmp(b), + (SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater, + (SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less, (SyncStatus::Unknown, SyncStatus::Behind(_)) => cmp::Ordering::Less, + (SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal, } } } diff --git a/src/main.rs b/src/main.rs index d6909d42..5524c8e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,8 +47,8 @@ impl fmt::Debug for Web3ProxyApp { impl Web3ProxyApp { async fn try_new( allowed_lag: u64, - balanced_rpc_tiers: Vec>, - private_rpcs: Vec<(&str, u32)>, + balanced_rpc_tiers: Vec)>>, + private_rpcs: Vec<(&str, u32, Option)>, ) -> anyhow::Result { let clock = QuantaClock::default(); @@ -242,7 
+242,7 @@ impl Web3ProxyApp { json!({ // TODO: re-use their jsonrpc? - "jsonrpc": json_body.jsonrpc, + "jsonrpc": "2.0", "id": json_body.id, "result": partial_response }) @@ -250,7 +250,7 @@ impl Web3ProxyApp { Err(e) => { // TODO: what is the proper format for an error? json!({ - "jsonrpc": json_body.jsonrpc, + "jsonrpc": "2.0", "id": json_body.id, "error": format!("{}", e) }) @@ -312,23 +312,27 @@ async fn main() { allowed_lag, vec![ // local nodes - vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)], + vec![ + ("ws://10.11.12.16:8545", 68_800, None), + ("ws://10.11.12.16:8946", 152_138, None), + ], // paid nodes // TODO: add paid nodes (with rate limits) // vec![ // // chainstack.com archive + // // moralis free (25/sec rate limit) // ], // free nodes // vec![ - // // ("https://main-rpc.linkpool.io", 0), // linkpool is slow and often offline - // ("https://rpc.ankr.com/eth", 0), + // // ("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline + // ("https://rpc.ankr.com/eth", 23_967, None), // ], ], vec![ - // ("https://api.edennetwork.io/v1/", 0), - // ("https://api.edennetwork.io/v1/beta", 0), - // ("https://rpc.ethermine.org/", 0), - // ("https://rpc.flashbots.net", 0), + // ("https://api.edennetwork.io/v1/", 1_805, None), + // ("https://api.edennetwork.io/v1/beta", 300, None), + // ("https://rpc.ethermine.org/", 5_861, None), + // ("https://rpc.flashbots.net", 7074, None), ], ) .await diff --git a/src/provider.rs b/src/provider.rs index d3dfe3cb..6d9aca4d 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -10,7 +10,7 @@ use governor::RateLimiter; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use std::fmt; -use std::sync::atomic::{self, AtomicUsize}; +use std::sync::atomic::{self, AtomicU32}; use std::time::Duration; use std::{cmp::Ordering, sync::Arc}; use tokio::time::interval; @@ -23,7 +23,6 @@ type Web3RateLimiter = #[derive(Clone, Deserialize)] pub struct JsonRpcRequest { - pub jsonrpc: 
Box, pub id: Box, pub method: String, pub params: Box, @@ -38,9 +37,9 @@ impl fmt::Debug for JsonRpcRequest { } } +// TODO: check for errors too! #[derive(Clone, Deserialize, Serialize)] pub struct JsonRpcForwardedResponse { - pub jsonrpc: Box, pub id: Box, pub result: Box, } @@ -124,12 +123,19 @@ impl Web3Provider { } /// An active connection to a Web3Rpc -#[derive(Debug)] pub struct Web3Connection { /// keep track of currently open requests. We sort on this - active_requests: AtomicUsize, + active_requests: AtomicU32, provider: Arc, ratelimiter: Option, + /// used for load balancing to the least loaded server + soft_limit: f32, +} + +impl fmt::Debug for Web3Connection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Web3Connection").finish_non_exhaustive() + } } impl Web3Connection { @@ -142,7 +148,8 @@ impl Web3Connection { url_str: String, http_client: Option, block_watcher_sender: BlockWatcherSender, - ratelimiter: Option, + hard_rate_limiter: Option, + soft_limit: f32, ) -> anyhow::Result { let provider = if url_str.starts_with("http") { let url: url::Url = url_str.parse()?; @@ -184,7 +191,8 @@ impl Web3Connection { Ok(Web3Connection { active_requests: Default::default(), provider, - ratelimiter, + ratelimiter: hard_rate_limiter, + soft_limit, }) } @@ -223,12 +231,33 @@ impl Eq for Web3Connection {} impl Ord for Web3Connection { fn cmp(&self, other: &Self) -> std::cmp::Ordering { // TODO: what atomic ordering?! - self.active_requests - .load(atomic::Ordering::Acquire) - .cmp(&other.active_requests.load(atomic::Ordering::Acquire)) + let a = self.active_requests.load(atomic::Ordering::Acquire); + let b = other.active_requests.load(atomic::Ordering::Acquire); + + // TODO: how should we include the soft limit? 
floats are slower than integer math + let a = a as f32 / self.soft_limit; + let b = b as f32 / other.soft_limit; + + a.partial_cmp(&b).unwrap() } } +/** + +Let's say geth has 1000 active requests and erigon also has 1000 active requests + +geth is much faster, so we want it to get the rest + +geth's soft limit is 120k. erigon's soft limit is 60k + +120 vs 60 + +Let's say geth has 100k active requests and erigon has 100k active requests. same soft limits + +0.8333 + + */ + impl PartialOrd for Web3Connection { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs index fc3c5767..b82f7a51 100644 --- a/src/provider_tiers.rs +++ b/src/provider_tiers.rs @@ -123,7 +123,7 @@ impl fmt::Debug for Web3ProviderTier { impl Web3ProviderTier { pub async fn try_new( - servers: Vec<(&str, u32)>, + servers: Vec<(&str, u32, Option)>, http_client: Option, block_watcher: Arc, clock: &QuantaClock, @@ -131,11 +131,11 @@ impl Web3ProviderTier { let mut rpcs: Vec = vec![]; let mut connections = HashMap::new(); - for (s, limit) in servers.into_iter() { + for (s, soft_limit, hard_limit) in servers.into_iter() { rpcs.push(s.to_string()); - let ratelimiter = if limit > 0 { - let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap()); + let hard_rate_limiter = if let Some(hard_limit) = hard_limit { + let quota = governor::Quota::per_second(NonZeroU32::new(hard_limit).unwrap()); let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock); @@ -148,7 +148,8 @@ impl Web3ProviderTier { s.to_string(), http_client.clone(), block_watcher.clone_sender(), - ratelimiter, + hard_rate_limiter, + soft_limit as f32, ) .await?; @@ -192,7 +193,7 @@ impl Web3ProviderTier { .collect(); // sort rpcs by their sync status - // TODO: if we only changed one row, we don't need to sort the whole thing. i think we can do this better + // TODO: if we only changed one entry, we don't need to sort the whole thing. 
we can do this better available_rpcs.sort_unstable_by(|a, b| { let a_synced = sync_status.get(a).unwrap(); let b_synced = sync_status.get(b).unwrap(); @@ -221,11 +222,11 @@ impl Web3ProviderTier { // TODO: we don't want to sort on active connections. we want to sort on remaining capacity for connections. for example, geth can handle more than erigon synced_rpcs.sort_unstable_by(|a, b| { + let a = self.connections.get(a).unwrap(); + let b = self.connections.get(b).unwrap(); + // sort on active connections - self.connections - .get(a) - .unwrap() - .cmp(self.connections.get(b).unwrap()) + a.cmp(b) }); for selected_rpc in synced_rpcs.iter() {