From 5a9bcd5dc343e58f9e6faa90463f10370928f1cb Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 3 May 2022 05:13:53 +0000 Subject: [PATCH] flume seems faster --- Cargo.lock | 34 ++++++++++++++++++++- Cargo.toml | 3 +- src/connection.rs | 73 +++++++++++++++++++++------------------------- src/connections.rs | 5 +--- src/main.rs | 8 ++--- 5 files changed, 71 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 866dd939..a13f2d55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1196,6 +1196,19 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +[[package]] +name = "flume" +version = "0.10.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.3", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1913,6 +1926,15 @@ dependencies = [ "twoway", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -2612,7 +2634,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -3036,6 +3058,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.5.4" @@ -3760,6 +3791,7 @@ dependencies = [ "argh", "derive_more", "ethers", + "flume", "futures", "fxhash", "governor", diff --git a/Cargo.toml b/Cargo.toml index e5b45fd3..08cff05a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ argh = "0.1.7" anyhow = "1.0.57" derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } +flume = "0.10.12" futures = { version = "0.3.21", features = ["thread-pool"] } fxhash = "0.2.1" governor = { version = "0.4.2", features = ["dashmap", "std"] } @@ -21,7 +22,7 @@ reqwest = { version = "0.11.10", features = ["json", "rustls"] } rustc-hash = "1.1.0" serde = { version = "1.0.137", features = [] } serde_json = { version = "1.0.80", default-features = false, features = ["alloc"] } -toml = "*" +toml = "0.5.9" tracing = "0.1.34" tracing-subscriber = "0.3.11" url = "2.2.2" diff --git a/src/connection.rs b/src/connection.rs index 2ab661e9..848028f3 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -10,7 +10,6 @@ use governor::RateLimiter; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use std::fmt; -use std::hash::{Hash, Hasher}; use std::num::NonZeroU32; use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::time::Duration; @@ -23,39 +22,7 @@ use crate::connections::Web3Connections; type Web3RateLimiter = RateLimiter>; -#[derive(Clone, Deserialize)] -pub struct JsonRpcRequest { - pub id: Box, - pub method: String, - pub params: Box, -} - -impl fmt::Debug for JsonRpcRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("JsonRpcRequest") - .field("id", &self.id) - .finish_non_exhaustive() - } -} - -// TODO: check for errors too! -#[derive(Clone, Deserialize, Serialize)] -pub struct JsonRpcForwardedResponse { - pub id: Box, - pub result: Box, -} - -impl fmt::Debug for JsonRpcForwardedResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("JsonRpcForwardedResponse") - .field("id", &self.id) - .finish_non_exhaustive() - } -} - -// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 +/// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] pub enum Web3Provider { Http(ethers::providers::Provider), @@ -96,12 +63,6 @@ impl fmt::Display for Web3Connection { } } -impl Hash for Web3Connection { - fn hash(&self, state: &mut H) { - self.url.hash(state); - } -} - impl Web3Connection { /// Connect to a web3 rpc and subscribe to new heads pub async fn try_new( @@ -302,3 +263,35 @@ impl PartialEq for Web3Connection { == other.active_requests.load(atomic::Ordering::Acquire) } } + +#[derive(Clone, Deserialize)] +pub struct JsonRpcRequest { + pub id: Box, + pub method: String, + pub params: Box, +} + +impl fmt::Debug for JsonRpcRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + f.debug_struct("JsonRpcRequest") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +// TODO: check for errors too! +#[derive(Clone, Deserialize, Serialize)] +pub struct JsonRpcForwardedResponse { + pub id: Box, + pub result: Box, +} + +impl fmt::Debug for JsonRpcForwardedResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + f.debug_struct("JsonRpcForwardedResponse") + .field("id", &self.id) + .finish_non_exhaustive() + } +} diff --git a/src/connections.rs b/src/connections.rs index 5401c062..4392fe85 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -10,7 +10,6 @@ use serde_json::value::RawValue; use std::cmp; use std::fmt; use std::sync::Arc; -use tokio::sync::mpsc; use tracing::warn; use crate::config::Web3ConnectionConfig; @@ -93,8 +92,6 @@ impl Web3Connections { }); } - // TODO: listen on a receiver mpsc channel? - Ok(connections) } @@ -120,7 +117,7 @@ impl Web3Connections { connections: Vec>, method: String, params: Box, - response_sender: mpsc::UnboundedSender>, + response_sender: flume::Sender>, ) -> anyhow::Result<()> { let mut unordered_futures = FuturesUnordered::new(); diff --git a/src/main.rs b/src/main.rs index dc75bf3d..6d78fc3f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,6 @@ use std::fmt; use std::fs; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc; use tokio::time::sleep; use tracing::warn; use warp::Filter; @@ -99,7 +98,7 @@ impl Web3ProxyApp { // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit match private_rpcs.get_upstream_servers() { Ok(upstream_servers) => { - let (tx, mut rx) = mpsc::unbounded_channel(); + let (tx, rx) = flume::unbounded(); let connections = private_rpcs.clone(); let method = json_body.method.clone(); @@ -112,10 +111,7 @@ impl Web3ProxyApp { }); // wait for the first response - let response = rx - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("no successful response"))?; + let response = rx.recv_async().await?; if let Ok(partial_response) = response { let response = json!({