flume seems faster

This commit is contained in:
Bryan Stitt 2022-05-03 05:13:53 +00:00
parent 0d5424cf7d
commit 5a9bcd5dc3
5 changed files with 71 additions and 52 deletions

34
Cargo.lock generated

@ -1196,6 +1196,19 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
[[package]]
name = "flume"
version = "0.10.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.3",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1913,6 +1926,15 @@ dependencies = [
"twoway",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "native-tls"
version = "0.2.10"
@ -2612,7 +2634,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@ -3036,6 +3058,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.5.4"
@ -3760,6 +3791,7 @@ dependencies = [
"argh",
"derive_more",
"ethers",
"flume",
"futures",
"fxhash",
"governor",

@ -11,6 +11,7 @@ argh = "0.1.7"
anyhow = "1.0.57"
derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
flume = "0.10.12"
futures = { version = "0.3.21", features = ["thread-pool"] }
fxhash = "0.2.1"
governor = { version = "0.4.2", features = ["dashmap", "std"] }
@ -21,7 +22,7 @@ reqwest = { version = "0.11.10", features = ["json", "rustls"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.80", default-features = false, features = ["alloc"] }
toml = "*"
toml = "0.5.9"
tracing = "0.1.34"
tracing-subscriber = "0.3.11"
url = "2.2.2"

@ -10,7 +10,6 @@ use governor::RateLimiter;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::time::Duration;
@ -23,39 +22,7 @@ use crate::connections::Web3Connections;
type Web3RateLimiter =
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
#[derive(Clone, Deserialize)]
pub struct JsonRpcRequest {
pub id: Box<RawValue>,
pub method: String,
pub params: Box<RawValue>,
}
impl fmt::Debug for JsonRpcRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("JsonRpcRequest")
.field("id", &self.id)
.finish_non_exhaustive()
}
}
// TODO: check for errors too!
#[derive(Clone, Deserialize, Serialize)]
pub struct JsonRpcForwardedResponse {
pub id: Box<RawValue>,
pub result: Box<RawValue>,
}
impl fmt::Debug for JsonRpcForwardedResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("JsonRpcForwardedResponse")
.field("id", &self.id)
.finish_non_exhaustive()
}
}
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
pub enum Web3Provider {
Http(ethers::providers::Provider<ethers::providers::Http>),
@ -96,12 +63,6 @@ impl fmt::Display for Web3Connection {
}
}
impl Hash for Web3Connection {
fn hash<H: Hasher>(&self, state: &mut H) {
self.url.hash(state);
}
}
impl Web3Connection {
/// Connect to a web3 rpc and subscribe to new heads
pub async fn try_new(
@ -302,3 +263,35 @@ impl PartialEq for Web3Connection {
== other.active_requests.load(atomic::Ordering::Acquire)
}
}
#[derive(Clone, Deserialize)]
pub struct JsonRpcRequest {
pub id: Box<RawValue>,
pub method: String,
pub params: Box<RawValue>,
}
impl fmt::Debug for JsonRpcRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("JsonRpcRequest")
.field("id", &self.id)
.finish_non_exhaustive()
}
}
// TODO: check for errors too!
#[derive(Clone, Deserialize, Serialize)]
pub struct JsonRpcForwardedResponse {
pub id: Box<RawValue>,
pub result: Box<RawValue>,
}
impl fmt::Debug for JsonRpcForwardedResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("JsonRpcForwardedResponse")
.field("id", &self.id)
.finish_non_exhaustive()
}
}

@ -10,7 +10,6 @@ use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::warn;
use crate::config::Web3ConnectionConfig;
@ -93,8 +92,6 @@ impl Web3Connections {
});
}
// TODO: listen on a receiver mpsc channel?
Ok(connections)
}
@ -120,7 +117,7 @@ impl Web3Connections {
connections: Vec<Arc<Web3Connection>>,
method: String,
params: Box<RawValue>,
response_sender: mpsc::UnboundedSender<anyhow::Result<JsonRpcForwardedResponse>>,
response_sender: flume::Sender<anyhow::Result<JsonRpcForwardedResponse>>,
) -> anyhow::Result<()> {
let mut unordered_futures = FuturesUnordered::new();

@ -10,7 +10,6 @@ use std::fmt;
use std::fs;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::warn;
use warp::Filter;
@ -99,7 +98,7 @@ impl Web3ProxyApp {
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
match private_rpcs.get_upstream_servers() {
Ok(upstream_servers) => {
let (tx, mut rx) = mpsc::unbounded_channel();
let (tx, rx) = flume::unbounded();
let connections = private_rpcs.clone();
let method = json_body.method.clone();
@ -112,10 +111,7 @@ impl Web3ProxyApp {
});
// wait for the first response
let response = rx
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("no successful response"))?;
let response = rx.recv_async().await?;
if let Ok(partial_response) = response {
let response = json!({