From f9be6a2ef13da34035a20793703a4cf07f42c7c1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 24 Apr 2022 18:56:46 +0000 Subject: [PATCH] parallel processing --- Cargo.lock | 87 ++++++++++++++++++++++++++++ Cargo.toml | 2 + src/main.rs | 160 ++++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 207 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcda5846..0d569783 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6f8c380fa28aa1b36107cd97f0196474bb7241bb95a453c5c01a15ac74b2eac" +[[package]] +name = "atomic-counter" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019" + [[package]] name = "autocfg" version = "1.1.0" @@ -138,6 +144,16 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "crypto-common" version = "0.1.3" @@ -299,6 +315,12 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.21" @@ -338,6 +360,23 @@ dependencies = [ "wasi 0.10.2+wasi-snapshot-preview1", ] +[[package]] +name = "governor" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19775995ee20209163239355bc3ad2f33f83da35d9ef72dea26e5af753552c87" +dependencies = [ + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "quanta", + "rand", + "smallvec", +] + [[package]] name = "h2" version = "0.3.13" @@ -559,6 +598,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "matches" version = "0.1.9" @@ -646,6 +694,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "ntapi" version = "0.3.7" @@ -792,6 +852,22 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "quanta" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.2+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -837,6 +913,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -1464,8 +1549,10 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", + "atomic-counter", "dashmap", "futures", + "governor", "reqwest", "serde_json", "tokio", diff --git a/Cargo.toml b/Cargo.toml index ee19ec3d..e0738822 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,10 @@ edition = "2021" [dependencies] argh = "0.1" anyhow = "1.0" +atomic-counter = "1.0.1" dashmap = "5.2" futures = { version = "0.3.0", features = ["thread-pool"]} +governor = { version = "0.4.2", features = ["dashmap", "std"]} tokio = { version = "1", features = ["full"] } reqwest = { version = "0.11", features = ["json"] } serde_json = { version = "1.0", default-features = false, features = ["alloc"] } diff --git a/src/main.rs b/src/main.rs index 6964e3ac..7748e440 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,43 +1,64 @@ use dashmap::DashMap; -use futures::stream::FuturesUnordered; +use futures::stream; use futures::StreamExt; +use governor::clock::{QuantaClock, QuantaInstant}; +use governor::middleware::NoOpMiddleware; +use governor::state::{InMemoryState, NotKeyed}; +use governor::{NotUntil, RateLimiter}; +use std::num::NonZeroU32; use std::sync::Arc; use tokio::sync::RwLock; -use tokio::time::{sleep, Duration}; +// use tokio::time::{sleep, Duration}; use warp::Filter; +// TODO: what should this be? +const PARALLEL_REQUESTS: usize = 4; + +type RpcRateLimiter = + RateLimiter>; + /// Load balance to the least-connection rpc struct BalancedRpcs { rpcs: RwLock>, connections: DashMap, // TODO: what type? store with connections? - // ratelimits: DashMap, + // ratelimits: RateLimiter, dyn governor::clock::Clock>, + ratelimits: DashMap, } // TODO: also pass rate limits to this? -impl Into for Vec<&str> { +impl Into for Vec<(&str, u32)> { fn into(self) -> BalancedRpcs { let mut rpcs: Vec = vec![]; let connections = DashMap::new(); - // let ratelimits = DashMap::new(); + let ratelimits = DashMap::new(); - // TODO: i'm sure there is a better way to do this with more iterator things like collect, but this works - for s in self.into_iter() { + // TODO: where should we get the rate limits from? + // TODO: this is not going to work. we need different rate limits for different endpoints + + for (s, limit) in self.into_iter() { rpcs.push(s.to_string()); connections.insert(s.to_string(), 0); - // ratelimits.insert(s.to_string(), 0); + + if limit > 0 { + let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap()); + + let rate_limiter = governor::RateLimiter::direct(quota); + + ratelimits.insert(s.to_string(), rate_limiter); + } } BalancedRpcs { rpcs: RwLock::new(rpcs), connections, - // ratelimits, + ratelimits, } } } impl BalancedRpcs { - async fn get_upstream_server(&self) -> Option { + async fn get_upstream_server(&self) -> Result> { let mut balanced_rpcs = self.rpcs.write().await; balanced_rpcs.sort_unstable_by(|a, b| { @@ -47,15 +68,47 @@ impl BalancedRpcs { .cmp(&self.connections.get(b).unwrap()) }); - // TODO: don't just grab the first. check rate limits - if let Some(selected_rpc) = balanced_rpcs.first() { + let mut earliest_not_until = None; + + for selected_rpc in balanced_rpcs.iter() { + // check rate limits + match self.ratelimits.get(selected_rpc).unwrap().check() { + Ok(_) => { + // rate limit succeeded + } + Err(not_until) => { + // rate limit failed + // save the smallest not_until. if nothing succeeds, return an Err with not_until in it + if earliest_not_until.is_none() { + earliest_not_until = Some(not_until); + } else { + let earliest_possible = + earliest_not_until.as_ref().unwrap().earliest_possible(); + let new_earliest_possible = not_until.earliest_possible(); + + if earliest_possible > new_earliest_possible { + earliest_not_until = Some(not_until); + } + } + continue; + } + }; + + // increment our connection counter + // TODO: need to change this to be an atomic counter! let mut connections = self.connections.get_mut(selected_rpc).unwrap(); *connections += 1; - return Some(selected_rpc.clone()); + // return the selected RPC + return Ok(selected_rpc.clone()); } - None + // return the smallest not_until + if let Some(not_until) = earliest_not_until { + return Err(not_until); + } else { + unimplemented!(); + } } } @@ -71,7 +124,6 @@ impl Into for Vec<&str> { let mut rpcs: Vec = vec![]; // let ratelimits = DashMap::new(); - // TODO: i'm sure there is a better way to do this with more iterator things like collect, but this works for s in self.into_iter() { rpcs.push(s.to_string()); // ratelimits.insert(s.to_string(), 0); @@ -101,7 +153,7 @@ struct Web3ProxyState { } impl Web3ProxyState { - fn new(balanced_rpc_tiers: Vec>, private_rpcs: Vec<&str>) -> Web3ProxyState { + fn new(balanced_rpc_tiers: Vec>, private_rpcs: Vec<&str>) -> Web3ProxyState { // TODO: warn if no private relays Web3ProxyState { client: reqwest::Client::new(), @@ -129,7 +181,7 @@ impl Web3ProxyState { } else { // this is not a private transaction (or no private relays are configured) for balanced_rpcs in self.balanced_rpc_tiers.iter() { - if let Some(upstream_server) = balanced_rpcs.get_upstream_server().await { + if let Ok(upstream_server) = balanced_rpcs.get_upstream_server().await { // TODO: capture any errors. at least log them if let Ok(result) = self .try_send_requests(vec![upstream_server], &json_body) @@ -137,6 +189,8 @@ impl Web3ProxyState { { return Ok(result); } + } else { + // TODO: if we got an error. save the ratelimit NotUntil so we can sleep until then before trying again } } } @@ -150,33 +204,48 @@ impl Web3ProxyState { json_body: &serde_json::Value, ) -> anyhow::Result { // send the query to all the servers - let mut future_responses = FuturesUnordered::new(); - for upstream_server in upstream_servers.into_iter() { - let f = self.client.post(upstream_server).json(&json_body).send(); + let mut bodies = stream::iter(upstream_servers) + .map(|url| { + let client = self.client.clone(); + let json_body = json_body.clone(); + tokio::spawn(async move { + let resp = client.post(url).json(&json_body).send().await?; + resp.text().await + }) + }) + .buffer_unordered(PARALLEL_REQUESTS); - future_responses.push(f); - } + let mut oks = vec![]; + let mut errs = vec![]; - // start loading text responses - let mut future_text = FuturesUnordered::new(); - while let Some(request) = future_responses.next().await { - if let Ok(request) = request { - let f = request.text(); - - future_text.push(f); + while let Some(b) = bodies.next().await { + // TODO: reduce connection counter + match b { + Ok(Ok(b)) => { + // TODO: if "no block with that header", skip this response (maybe retry) + oks.push(b); + } + Ok(Err(e)) => { + // TODO: better errors + eprintln!("Got a reqwest::Error: {}", e); + errs.push(anyhow::anyhow!("Got a reqwest::Error")); + } + Err(e) => { + // TODO: better errors + eprintln!("Got a tokio::JoinError: {}", e); + errs.push(anyhow::anyhow!("Got a tokio::JoinError")); + } } } - // return the first response - while let Some(text) = future_text.next().await { - if let Ok(text) = text { - // TODO: if "no block with that header", skip this response (maybe retry) - return Ok(text); - } - // TODO: capture errors + // TODO: which response should we use? + if oks.len() > 0 { + return Ok(oks.pop().unwrap()); + } else if errs.len() > 0 { + return Err(errs.pop().unwrap()); + } else { + return Err(anyhow::anyhow!("no successful responses")); } - - Err(anyhow::anyhow!("no successful responses")) } } @@ -190,13 +259,20 @@ async fn main() { let state = Web3ProxyState::new( vec![ // local nodes - vec!["https://10.11.12.16:8545"], + vec![("https://10.11.12.16:8545", 0)], // paid nodes - // TODO: add them + // TODO: add paid nodes (with rate limits) // free nodes - vec!["https://main-rpc.linkpool.io", "https://rpc.ankr.com/eth"], + // TODO: add rate limits + vec![ + ("https://main-rpc.linkpool.io", 0), + ("https://rpc.ankr.com/eth", 0), + ], + ], + vec![ + "https://api.edennetwork.io/v1/beta", + "https://api.edennetwork.io/v1/", ], - vec!["https://api.edennetwork.io/v1/beta"], ); let state: Arc = Arc::new(state);