parallel processing

This commit is contained in:
Bryan Stitt 2022-04-24 18:56:46 +00:00
parent 5a6467fa3f
commit f9be6a2ef1
3 changed files with 207 additions and 42 deletions

87
Cargo.lock generated

@ -37,6 +37,12 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6f8c380fa28aa1b36107cd97f0196474bb7241bb95a453c5c01a15ac74b2eac"
[[package]]
name = "atomic-counter"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019"
[[package]]
name = "autocfg"
version = "1.1.0"
@ -138,6 +144,16 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "crypto-common"
version = "0.1.3"
@ -299,6 +315,12 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.21"
@ -338,6 +360,23 @@ dependencies = [
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
name = "governor"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19775995ee20209163239355bc3ad2f33f83da35d9ef72dea26e5af753552c87"
dependencies = [
"dashmap",
"futures",
"futures-timer",
"no-std-compat",
"nonzero_ext",
"parking_lot",
"quanta",
"rand",
"smallvec",
]
[[package]]
name = "h2"
version = "0.3.13"
@ -559,6 +598,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]]
name = "matches"
version = "0.1.9"
@ -646,6 +694,18 @@ dependencies = [
"tempfile",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "ntapi"
version = "0.3.7"
@ -792,6 +852,22 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quanta"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi 0.10.2+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -837,6 +913,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "raw-cpuid"
version = "10.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12"
dependencies = [
"bitflags",
]
[[package]]
name = "redox_syscall"
version = "0.2.13"
@ -1464,8 +1549,10 @@ version = "0.1.0"
dependencies = [
"anyhow",
"argh",
"atomic-counter",
"dashmap",
"futures",
"governor",
"reqwest",
"serde_json",
"tokio",

@ -8,8 +8,10 @@ edition = "2021"
[dependencies]
argh = "0.1"
anyhow = "1.0"
atomic-counter = "1.0.1"
dashmap = "5.2"
futures = { version = "0.3.0", features = ["thread-pool"]}
governor = { version = "0.4.2", features = ["dashmap", "std"]}
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }

@ -1,43 +1,64 @@
use dashmap::DashMap;
use futures::stream::FuturesUnordered;
use futures::stream;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{NotUntil, RateLimiter};
use std::num::NonZeroU32;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{sleep, Duration};
// use tokio::time::{sleep, Duration};
use warp::Filter;
// TODO: what should this be?
const PARALLEL_REQUESTS: usize = 4;
type RpcRateLimiter =
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
/// Load balance to the least-connection rpc
struct BalancedRpcs {
rpcs: RwLock<Vec<String>>,
connections: DashMap<String, u32>,
// TODO: what type? store with connections?
// ratelimits: DashMap<String, u32>,
// ratelimits: RateLimiter<K, DashMapStateStore<K>, dyn governor::clock::Clock>,
ratelimits: DashMap<String, RpcRateLimiter>,
}
// TODO: also pass rate limits to this?
impl Into<BalancedRpcs> for Vec<&str> {
impl Into<BalancedRpcs> for Vec<(&str, u32)> {
fn into(self) -> BalancedRpcs {
let mut rpcs: Vec<String> = vec![];
let connections = DashMap::new();
// let ratelimits = DashMap::new();
let ratelimits = DashMap::new();
// TODO: i'm sure there is a better way to do this with more iterator things like collect, but this works
for s in self.into_iter() {
// TODO: where should we get the rate limits from?
// TODO: this is not going to work. we need different rate limits for different endpoints
for (s, limit) in self.into_iter() {
rpcs.push(s.to_string());
connections.insert(s.to_string(), 0);
// ratelimits.insert(s.to_string(), 0);
if limit > 0 {
let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap());
let rate_limiter = governor::RateLimiter::direct(quota);
ratelimits.insert(s.to_string(), rate_limiter);
}
}
BalancedRpcs {
rpcs: RwLock::new(rpcs),
connections,
// ratelimits,
ratelimits,
}
}
}
impl BalancedRpcs {
async fn get_upstream_server(&self) -> Option<String> {
async fn get_upstream_server(&self) -> Result<String, NotUntil<QuantaInstant>> {
let mut balanced_rpcs = self.rpcs.write().await;
balanced_rpcs.sort_unstable_by(|a, b| {
@ -47,15 +68,47 @@ impl BalancedRpcs {
.cmp(&self.connections.get(b).unwrap())
});
// TODO: don't just grab the first. check rate limits
if let Some(selected_rpc) = balanced_rpcs.first() {
let mut earliest_not_until = None;
for selected_rpc in balanced_rpcs.iter() {
// check rate limits
match self.ratelimits.get(selected_rpc).unwrap().check() {
Ok(_) => {
// rate limit succeeded
}
Err(not_until) => {
// rate limit failed
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
if earliest_not_until.is_none() {
earliest_not_until = Some(not_until);
} else {
let earliest_possible =
earliest_not_until.as_ref().unwrap().earliest_possible();
let new_earliest_possible = not_until.earliest_possible();
if earliest_possible > new_earliest_possible {
earliest_not_until = Some(not_until);
}
}
continue;
}
};
// increment our connection counter
// TODO: need to change this to be an atomic counter!
let mut connections = self.connections.get_mut(selected_rpc).unwrap();
*connections += 1;
return Some(selected_rpc.clone());
// return the selected RPC
return Ok(selected_rpc.clone());
}
None
// return the smallest not_until
if let Some(not_until) = earliest_not_until {
return Err(not_until);
} else {
unimplemented!();
}
}
}
@ -71,7 +124,6 @@ impl Into<LoudRpcs> for Vec<&str> {
let mut rpcs: Vec<String> = vec![];
// let ratelimits = DashMap::new();
// TODO: i'm sure there is a better way to do this with more iterator things like collect, but this works
for s in self.into_iter() {
rpcs.push(s.to_string());
// ratelimits.insert(s.to_string(), 0);
@ -101,7 +153,7 @@ struct Web3ProxyState {
}
impl Web3ProxyState {
fn new(balanced_rpc_tiers: Vec<Vec<&str>>, private_rpcs: Vec<&str>) -> Web3ProxyState {
fn new(balanced_rpc_tiers: Vec<Vec<(&str, u32)>>, private_rpcs: Vec<&str>) -> Web3ProxyState {
// TODO: warn if no private relays
Web3ProxyState {
client: reqwest::Client::new(),
@ -129,7 +181,7 @@ impl Web3ProxyState {
} else {
// this is not a private transaction (or no private relays are configured)
for balanced_rpcs in self.balanced_rpc_tiers.iter() {
if let Some(upstream_server) = balanced_rpcs.get_upstream_server().await {
if let Ok(upstream_server) = balanced_rpcs.get_upstream_server().await {
// TODO: capture any errors. at least log them
if let Ok(result) = self
.try_send_requests(vec![upstream_server], &json_body)
@ -137,6 +189,8 @@ impl Web3ProxyState {
{
return Ok(result);
}
} else {
// TODO: if we got an error. save the ratelimit NotUntil so we can sleep until then before trying again
}
}
}
@ -150,33 +204,48 @@ impl Web3ProxyState {
json_body: &serde_json::Value,
) -> anyhow::Result<String> {
// send the query to all the servers
let mut future_responses = FuturesUnordered::new();
for upstream_server in upstream_servers.into_iter() {
let f = self.client.post(upstream_server).json(&json_body).send();
let mut bodies = stream::iter(upstream_servers)
.map(|url| {
let client = self.client.clone();
let json_body = json_body.clone();
tokio::spawn(async move {
let resp = client.post(url).json(&json_body).send().await?;
resp.text().await
})
})
.buffer_unordered(PARALLEL_REQUESTS);
future_responses.push(f);
}
let mut oks = vec![];
let mut errs = vec![];
// start loading text responses
let mut future_text = FuturesUnordered::new();
while let Some(request) = future_responses.next().await {
if let Ok(request) = request {
let f = request.text();
future_text.push(f);
while let Some(b) = bodies.next().await {
// TODO: reduce connection counter
match b {
Ok(Ok(b)) => {
// TODO: if "no block with that header", skip this response (maybe retry)
oks.push(b);
}
Ok(Err(e)) => {
// TODO: better errors
eprintln!("Got a reqwest::Error: {}", e);
errs.push(anyhow::anyhow!("Got a reqwest::Error"));
}
Err(e) => {
// TODO: better errors
eprintln!("Got a tokio::JoinError: {}", e);
errs.push(anyhow::anyhow!("Got a tokio::JoinError"));
}
}
}
// return the first response
while let Some(text) = future_text.next().await {
if let Ok(text) = text {
// TODO: if "no block with that header", skip this response (maybe retry)
return Ok(text);
}
// TODO: capture errors
// TODO: which response should we use?
if oks.len() > 0 {
return Ok(oks.pop().unwrap());
} else if errs.len() > 0 {
return Err(errs.pop().unwrap());
} else {
return Err(anyhow::anyhow!("no successful responses"));
}
Err(anyhow::anyhow!("no successful responses"))
}
}
@ -190,13 +259,20 @@ async fn main() {
let state = Web3ProxyState::new(
vec![
// local nodes
vec!["https://10.11.12.16:8545"],
vec![("https://10.11.12.16:8545", 0)],
// paid nodes
// TODO: add them
// TODO: add paid nodes (with rate limits)
// free nodes
vec!["https://main-rpc.linkpool.io", "https://rpc.ankr.com/eth"],
// TODO: add rate limits
vec![
("https://main-rpc.linkpool.io", 0),
("https://rpc.ankr.com/eth", 0),
],
],
vec![
"https://api.edennetwork.io/v1/beta",
"https://api.edennetwork.io/v1/",
],
vec!["https://api.edennetwork.io/v1/beta"],
);
let state: Arc<Web3ProxyState> = Arc::new(state);