From 510612d343fc51338a8a4282dcc229b50097835b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 12 May 2023 15:47:01 -0700 Subject: [PATCH] use kanal instead of flume or tokio channels (#68) --- Cargo.lock | 129 ++++-------------- TODO.md | 2 +- latency/Cargo.toml | 1 + latency/src/peak_ewma/mod.rs | 20 +-- rate-counter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app/mod.rs | 2 +- web3_proxy/src/app/ws.rs | 10 +- .../src/bin/web3_proxy_cli/sentryd/mod.rs | 7 +- web3_proxy/src/config.rs | 4 +- web3_proxy/src/frontend/authorization.rs | 7 +- web3_proxy/src/frontend/errors.rs | 10 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 12 +- web3_proxy/src/rpcs/blockchain.rs | 4 +- web3_proxy/src/rpcs/many.rs | 26 ++-- web3_proxy/src/rpcs/one.rs | 24 ++-- web3_proxy/src/stats/mod.rs | 1 - web3_proxy/src/stats/stat_buffer.rs | 8 +- 18 files changed, 97 insertions(+), 174 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5832affc..8753b543 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -792,16 +792,6 @@ dependencies = [ "cc", ] -[[package]] -name = "codespan-reporting" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" -dependencies = [ - "termcolor", - "unicode-width", -] - [[package]] name = "coins-bip32" version = "0.8.3" @@ -1204,50 +1194,6 @@ dependencies = [ "cipher 0.4.4", ] -[[package]] -name = "cxx" -version = "1.0.94" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f61f1b6389c3fe1c316bf8a4dccc90a38208354b330925bce1f74a6c4756eb93" -dependencies = [ - "cc", - "cxxbridge-flags", - "cxxbridge-macro", - "link-cplusplus", -] - -[[package]] -name = "cxx-build" -version = "1.0.94" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cee708e8962df2aeb38f594aae5d827c022b6460ac71a7a3e2c3c2aae5a07b" -dependencies = [ - "cc", - "codespan-reporting", - "once_cell", - "proc-macro2", - "quote", - "scratch", - "syn 2.0.15", -] - -[[package]] -name = "cxxbridge-flags" -version = "1.0.94" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7944172ae7e4068c533afbb984114a56c46e9ccddda550499caa222902c7f7bb" - -[[package]] -name = "cxxbridge-macro" -version = "1.0.94" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.15", -] - [[package]] name = "dashmap" version = "4.0.2" @@ -2052,19 +1998,6 @@ dependencies = [ "miniz_oxide 0.7.1", ] -[[package]] -name = "flume" -version = "0.10.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "pin-project", - "spin 0.9.8", -] - [[package]] name = "fnv" version = "1.0.7" @@ -2376,9 +2309,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.18" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21" +checksum = "d357c7ae988e7d2182f7d7871d0b963962420b0678b0997ce7de72001aeab782" dependencies = [ "bytes", "fnv", @@ -2712,12 +2645,11 @@ dependencies = [ [[package]] name = "iana-time-zone-haiku" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" dependencies = [ - "cxx", - "cxx-build", + "cc", ] [[package]] @@ -2976,6 +2908,16 @@ dependencies = [ "signature 2.1.0", ] +[[package]] +name = "kanal" +version = "0.1.0-pre8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "keccak" version = "0.1.4" @@ -3021,6 +2963,7 @@ name = "latency" version = "0.1.0" dependencies = [ "ewma", + "kanal", "log", "serde", "tokio", @@ -3069,15 +3012,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "link-cplusplus" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5" -dependencies = [ - "cc", -] - [[package]] name = "linux-raw-sys" version = "0.3.7" @@ -3274,15 +3208,6 @@ dependencies = [ "uuid 1.3.2", ] -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom", -] - [[package]] name = "native-tls" version = "0.2.11" @@ -4277,7 +4202,7 @@ dependencies = [ name = "rate-counter" version = "0.1.0" dependencies = [ - "flume", + "kanal", "tokio", ] @@ -4323,9 +4248,9 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.3.0+1.9.2" +version = "4.4.0+1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4" +checksum = "87ac9d87c3aba1748e3112318459f2ac8bff80bfff7359e338e0463549590249" dependencies = [ "cmake", "libc", @@ -4777,12 +4702,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "scratch" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1" - [[package]] name = "scrypt" version = "0.10.0" @@ -5004,9 +4923,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.8.2" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254" +checksum = "ca2855b3715770894e67cbfa3df957790aa0c9edc3bf06efa1a84d77fa0839d1" dependencies = [ "bitflags", "core-foundation", @@ -5017,9 +4936,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4" +checksum = "f51d0c0d83bec45f16480d0ce0058397a69e48fcdc52d1dc8855fb68acbd31a7" dependencies = [ "core-foundation-sys", "libc", @@ -6658,7 +6577,6 @@ dependencies = [ "ethers", "ewma", "fdlimit", - "flume", "fstrings", "futures", "gethostname", @@ -6673,6 +6591,7 @@ dependencies = [ "influxdb2-structmap", "ipnet", "itertools", + "kanal", "latency", "listenfd", "log", diff --git a/TODO.md b/TODO.md index b405b251..690007b4 100644 --- a/TODO.md +++ b/TODO.md @@ -189,7 +189,7 @@ These are roughly in order of completition - [x] graceful shutdown. 
stop taking new requests and don't stop until all outstanding queries are handled - https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs - we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop - - need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true + - need an kanal::watch on unflushed stats that we can subscribe to. wait for it to flip to true - [x] don't use unix timestamps for response_millis since leap seconds will confuse it - [x] config to allow origins even on the anonymous endpoints - [x] send logs to sentry diff --git a/latency/Cargo.toml b/latency/Cargo.toml index eb51eba9..8583e9ab 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] ewma = "0.1.1" +kanal = "0.1.0-pre8" log = "0.4.17" serde = { version = "1.0.163", features = [] } tokio = { version = "1.28.1", features = ["full"] } diff --git a/latency/src/peak_ewma/mod.rs b/latency/src/peak_ewma/mod.rs index 9b6f2f8b..533ff3af 100644 --- a/latency/src/peak_ewma/mod.rs +++ b/latency/src/peak_ewma/mod.rs @@ -2,9 +2,8 @@ mod rtt_estimate; use std::sync::Arc; +use kanal::SendError; use log::error; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; use tokio::task::JoinHandle; use tokio::time::{Duration, Instant}; @@ -20,7 +19,7 @@ pub struct PeakEwmaLatency { /// Join handle for the latency calculation task pub join_handle: JoinHandle<()>, /// Send to update with each request duration - request_tx: mpsc::Sender, + request_tx: kanal::AsyncSender, /// Latency average and last update time rtt_estimate: Arc, /// Decay time @@ -34,7 +33,7 @@ impl PeakEwmaLatency { /// average latency. pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self { debug_assert!(decay_ns > 0.0, "decay_ns must be positive"); - let (request_tx, request_rx) = mpsc::channel(buf_size); + let (request_tx, request_rx) = kanal::bounded_async(buf_size); let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency)); let task = PeakEwmaLatencyTask { request_rx, @@ -71,16 +70,19 @@ impl PeakEwmaLatency { /// Should only be called from the Web3Rpc that owns it. pub fn report(&self, duration: Duration) { match self.request_tx.try_send(duration) { - Ok(()) => {} - Err(TrySendError::Full(_)) => { + Ok(true) => {} + Ok(false) => { // We don't want to block if the channel is full, just // report the error error!("Latency report channel full"); // TODO: could we spawn a new tokio task to report tthis later? 
} - Err(TrySendError::Closed(_)) => { + Err(SendError::Closed) => { unreachable!("Owner should keep channel open"); } + Err(SendError::ReceiveClosed) => { + unreachable!("Receiver should keep channel open"); + } }; //.expect("Owner should keep channel open"); } @@ -90,7 +92,7 @@ impl PeakEwmaLatency { #[derive(Debug)] struct PeakEwmaLatencyTask { /// Receive new request timings for update - request_rx: mpsc::Receiver, + request_rx: kanal::AsyncReceiver, /// Current estimate and update time rtt_estimate: Arc, /// Last update time, used for decay calculation @@ -102,7 +104,7 @@ struct PeakEwmaLatencyTask { impl PeakEwmaLatencyTask { /// Run the loop for updating latency async fn run(mut self) { - while let Some(rtt) = self.request_rx.recv().await { + while let Ok(rtt) = self.request_rx.recv().await { self.update(rtt); } } diff --git a/rate-counter/Cargo.toml b/rate-counter/Cargo.toml index 7bf027e4..6e25777a 100644 --- a/rate-counter/Cargo.toml +++ b/rate-counter/Cargo.toml @@ -5,5 +5,5 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -flume = "0.10.14" +kanal = "0.1.0-pre8" tokio = { version = "1.28.1", features = ["time"] } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 4883f9a2..4d4a926c 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -43,7 +43,6 @@ env_logger = "0.10.0" ethers = { version = "2.0.4", default-features = false, features = ["rustls", "ws"] } ewma = "0.1.1" fdlimit = "0.2.1" -flume = "0.10.14" fstrings = "0.2" futures = { version = "0.3.28", features = ["thread-pool"] } gethostname = "0.4.2" @@ -58,6 +57,7 @@ influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rust influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"} ipnet = "2.7.2" itertools = "0.10.5" +kanal = "0.1.0-pre8" listenfd = "1.0.1" log = "0.4.17" mimalloc = { version = "0.1.37", optional = true} diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 63c3f9f5..09dfb2ed 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -265,7 +265,7 @@ pub struct Web3ProxyApp { Cache, hashbrown::hash_map::DefaultHashBuilder>, pub kafka_producer: Option, /// channel for sending stats in a background task - pub stat_sender: Option>, + pub stat_sender: Option>, } /// flatten a JoinError into an anyhow error diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index db348577..6e8e61f1 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -24,7 +24,7 @@ impl Web3ProxyApp { jsonrpc_request: JsonRpcRequest, subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now - response_sender: flume::Sender, + response_sender: kanal::AsyncSender, ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { let request_metadata = RequestMetadata::new( self, @@ -94,7 +94,7 @@ impl Web3ProxyApp { // TODO: can we check a content type header? let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).await.is_err() { // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; @@ -158,7 +158,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? 
let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -221,7 +221,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -285,7 +285,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs index 5e0af642..a1d9f50d 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs @@ -11,7 +11,6 @@ use log::{error, info}; use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; use serde_json::json; use std::time::Duration; -use tokio::sync::mpsc; use tokio::time::{interval, MissedTickBehavior}; use web3_proxy::{config::TopConfig, pagerduty::pagerduty_alert}; @@ -116,7 +115,7 @@ impl SentrydSubCommand { let mut handles = FuturesUnordered::new(); // channels and a task for sending errors to logs/pagerduty - let (error_sender, mut error_receiver) = mpsc::channel::(10); + let (error_sender, error_receiver) = kanal::bounded_async::(10); { let error_handler_f = async move { @@ -124,7 +123,7 @@ impl SentrydSubCommand { info!("set PAGERDUTY_INTEGRATION_KEY to send create alerts for errors"); } - while let Some(err) = error_receiver.recv().await { + while let Ok(err) = error_receiver.recv().await { log::log!(err.level, "check failed: {:#?}", err); if matches!(err.level, log::Level::Error) { @@ -258,7 +257,7 @@ async fn a_loop( class: &str, seconds: u64, error_level: log::Level, - error_sender: mpsc::Sender, + error_sender: kanal::AsyncSender, f: impl Fn(SentrydErrorBuilder) -> T, ) -> anyhow::Result<()> where diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 05a947d5..51fd4099 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -285,8 +285,8 @@ impl Web3RpcConfig { http_client: Option, http_interval_sender: Option>>, blocks_by_hash_cache: BlocksByHashCache, - block_sender: Option>, - tx_id_sender: Option>, + block_sender: Option>, + tx_id_sender: Option>, reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { if !self.extra.is_empty() { diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 5d951956..bb1d6ecb 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -267,7 +267,7 @@ pub struct RequestMetadata { pub kafka_debug_logger: Option>, /// Channel to send stats to - pub stat_sender: Option>, + pub stat_sender: Option>, } impl Default for RequestMetadata { @@ -457,8 +457,11 @@ impl RequestMetadata { let stat: AppStat = stat.into(); + // can't use async because a Drop can call this + let stat_sender = stat_sender.to_sync(); + if let Err(err) = 
stat_sender.send(stat) { - error!("failed sending stats for {:?}: {:?}", err.0, err); + error!("failed sending stat: {:?}", err); // TODO: return it? that seems like it might cause an infinite loop }; diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index b7d1a669..1ae29e23 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -51,7 +51,6 @@ pub enum Web3ProxyError { EthersHttpClientError(ethers::prelude::HttpClientError), EthersProviderError(ethers::prelude::ProviderError), EthersWsClientError(ethers::prelude::WsClientError), - FlumeRecvError(flume::RecvError), GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), @@ -78,6 +77,8 @@ pub enum Web3ProxyError { #[display(fmt = "{:?}", _0)] #[error(ignore)] JsonRpcForwardedError(JsonRpcForwardedResponse), + KanalReceiveError(kanal::ReceiveError), + KanalSendError(kanal::SendError), #[display(fmt = "{:?}", _0)] #[error(ignore)] MsgPackEncode(rmp_serde::encode::Error), @@ -112,7 +113,6 @@ pub enum Web3ProxyError { #[from(ignore)] RefererNotAllowed(headers::Referer), SemaphoreAcquireError(AcquireError), - SendAppStatError(flume::SendError), SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] @@ -261,8 +261,8 @@ impl Web3ProxyError { ), ) } - Self::FlumeRecvError(err) => { - warn!("FlumeRecvError err={:#?}", err); + Self::KanalReceiveError(err) => { + warn!("KanalRecvError err={:#?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( @@ -701,7 +701,7 @@ impl Web3ProxyError { ), ) } - Self::SendAppStatError(err) => { + Self::KanalSendError(err) => { error!("SendAppStatError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index e0522b00..e562f297 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -311,7 +311,7 @@ async fn proxy_web3_socket( let (ws_tx, ws_rx) = socket.split(); // create a channel for our reader and writer can communicate. todo: benchmark different channels - let (response_sender, response_receiver) = flume::unbounded::(); + let (response_sender, response_receiver) = kanal::unbounded_async::(); tokio::spawn(write_web3_socket(response_receiver, ws_tx)); tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender)); @@ -323,7 +323,7 @@ async fn handle_socket_payload( app: Arc, authorization: &Arc, payload: &str, - response_sender: &flume::Sender, + response_sender: &kanal::AsyncSender, subscription_count: &AtomicUsize, subscriptions: Arc>>, ) -> Web3ProxyResult<(Message, Option)> { @@ -452,7 +452,7 @@ async fn read_web3_socket( app: Arc, authorization: Arc, mut ws_rx: SplitStream, - response_sender: flume::Sender, + response_sender: kanal::AsyncSender, ) { // RwLock should be fine here. 
a user isn't going to be opening tons of subscriptions let subscriptions = Arc::new(RwLock::new(HashMap::new())); @@ -528,7 +528,7 @@ async fn read_web3_socket( } }; - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).await.is_err() { let _ = close_sender.send(true); return; }; @@ -549,13 +549,13 @@ async fn read_web3_socket( } async fn write_web3_socket( - response_rx: flume::Receiver, + response_rx: kanal::AsyncReceiver, mut ws_tx: SplitSink, ) { // TODO: increment counter for open websockets // TODO: is there any way to make this stream receive. - while let Ok(msg) = response_rx.recv_async().await { + while let Ok(msg) = response_rx.recv().await { // a response is ready // TODO: poke rate limits for this user? diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 3d6ac3dd..dc1ef8c2 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -365,7 +365,7 @@ impl Web3Rpcs { pub(super) async fn process_incoming_blocks( &self, authorization: &Arc, - block_receiver: flume::Receiver, + block_receiver: kanal::AsyncReceiver, // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. pending_tx_sender: Option>, @@ -373,7 +373,7 @@ impl Web3Rpcs { let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag); loop { - match block_receiver.recv_async().await { + match block_receiver.recv().await { Ok((new_block, rpc)) => { let rpc_name = rpc.name.clone(); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 9d27ad43..6addeb6f 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -43,7 +43,7 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh #[derive(From)] pub struct Web3Rpcs { /// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them - pub(crate) block_sender: flume::Sender<(Option, Arc)>, + pub(crate) block_sender: kanal::AsyncSender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections pub(crate) by_name: ArcSwap>>, /// notify all http providers to check their blocks at the same time @@ -57,8 +57,8 @@ pub struct Web3Rpcs { pub(super) watch_consensus_head_sender: Option>>, pub(super) pending_transaction_cache: Cache, - pub(super) pending_tx_id_receiver: flume::Receiver, - pub(super) pending_tx_id_sender: flume::Sender, + pub(super) pending_tx_id_receiver: kanal::AsyncReceiver, + pub(super) pending_tx_id_sender: kanal::AsyncSender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans pub(super) blocks_by_hash: BlocksByHashCache, @@ -94,8 +94,8 @@ impl Web3Rpcs { watch::Receiver>>, // watch::Receiver>, )> { - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (block_sender, block_receiver) = flume::unbounded::(); + let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async(); + let (block_sender, block_receiver) = kanal::unbounded_async::(); // TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check? 
let expected_block_time_ms = match chain_id { @@ -347,7 +347,7 @@ impl Web3Rpcs { async fn subscribe( self: Arc, authorization: Arc, - block_receiver: flume::Receiver, + block_receiver: kanal::AsyncReceiver, pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -362,7 +362,7 @@ impl Web3Rpcs { let pending_tx_id_receiver = self.pending_tx_id_receiver.clone(); let handle = tokio::task::spawn(async move { // TODO: set up this future the same as the block funnel - while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { + while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv().await { let f = clone.clone().process_incoming_tx_id( authorization.clone(), rpc, @@ -1391,8 +1391,8 @@ mod tests { (lagged_rpc.name.clone(), lagged_rpc.clone()), ]); - let (block_sender, _block_receiver) = flume::unbounded(); - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, _block_receiver) = kanal::unbounded_async(); + let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async(); let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1643,8 +1643,8 @@ mod tests { (archive_rpc.name.clone(), archive_rpc.clone()), ]); - let (block_sender, _) = flume::unbounded(); - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, _) = kanal::unbounded_async(); + let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async(); let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1807,8 +1807,8 @@ mod tests { ), ]); - let (block_sender, _) = flume::unbounded(); - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, _) = kanal::unbounded_async(); + let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async(); let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index c541c0da..01575384 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -98,8 +98,8 @@ impl Web3Rpc { http_interval_sender: Option>>, redis_pool: Option, block_map: BlocksByHashCache, - block_sender: Option>, - tx_id_sender: Option)>>, + block_sender: Option>, + tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let created_at = Instant::now(); @@ -388,7 +388,7 @@ impl Web3Rpc { /// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time. 
pub async fn retrying_connect( self: &Arc, - block_sender: Option<&flume::Sender>, + block_sender: Option<&kanal::AsyncSender>, chain_id: u64, db_conn: Option<&DatabaseConnection>, delay_start: bool, @@ -451,7 +451,7 @@ impl Web3Rpc { /// connect to the web3 provider async fn connect( self: &Arc, - block_sender: Option<&flume::Sender>, + block_sender: Option<&kanal::AsyncSender>, chain_id: u64, db_conn: Option<&DatabaseConnection>, ) -> anyhow::Result<()> { @@ -473,7 +473,7 @@ impl Web3Rpc { // tell the block subscriber that this rpc doesn't have any blocks if let Some(block_sender) = block_sender { block_sender - .send_async((None, self.clone())) + .send((None, self.clone())) .await .context("block_sender during connect")?; } @@ -588,7 +588,7 @@ impl Web3Rpc { pub(crate) async fn send_head_block_result( self: &Arc, new_head_block: Result, ProviderError>, - block_sender: &flume::Sender, + block_sender: &kanal::AsyncSender, block_map: BlocksByHashCache, ) -> anyhow::Result<()> { let new_head_block = match new_head_block { @@ -651,7 +651,7 @@ impl Web3Rpc { // send an empty block to take this server out of rotation block_sender - .send_async((new_head_block, self.clone())) + .send((new_head_block, self.clone())) .await .context("block_sender")?; @@ -670,11 +670,11 @@ impl Web3Rpc { self: Arc, authorization: &Arc, block_map: BlocksByHashCache, - block_sender: Option>, + block_sender: Option>, chain_id: u64, disconnect_receiver: watch::Receiver, http_interval_sender: Option>>, - tx_id_sender: Option)>>, + tx_id_sender: Option)>>, ) -> anyhow::Result<()> { let error_handler = if self.backup { RequestErrorHandler::DebugLevel @@ -895,7 +895,7 @@ impl Web3Rpc { self: Arc, authorization: Arc, http_interval_receiver: Option>, - block_sender: flume::Sender, + block_sender: kanal::AsyncSender, block_map: BlocksByHashCache, ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); @@ -1090,7 +1090,7 @@ impl Web3Rpc { async fn subscribe_pending_transactions( self: Arc, authorization: Arc, - tx_id_sender: flume::Sender<(TxHash, Arc)>, + tx_id_sender: kanal::AsyncSender<(TxHash, Arc)>, ) -> anyhow::Result<()> { // TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big // TODO: timeout @@ -1115,7 +1115,7 @@ impl Web3Rpc { while let Some(pending_tx_id) = stream.next().await { tx_id_sender - .send_async((pending_tx_id, self.clone())) + .send((pending_tx_id, self.clone())) .await .context("tx_id_sender")?; diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 930f9e04..14759611 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -672,7 +672,6 @@ impl RpcQueryStats { method: Option<&str>, ) -> Decimal { // some methods should be free. 
there might be cases where method isn't set (though they should be uncommon) - // TODO: get this list from config (and add more to it) if let Some(method) = method.as_ref() { if ["eth_chainId"].contains(method) { return 0.into(); diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index aaca71cd..e71f90af 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -30,7 +30,7 @@ pub struct BufferedRpcQueryStats { #[derive(From)] pub struct SpawnedStatBuffer { - pub stat_sender: flume::Sender, + pub stat_sender: kanal::AsyncSender, /// these handles are important and must be allowed to finish pub background_handle: JoinHandle>, } @@ -65,7 +65,7 @@ impl StatBuffer { return Ok(None); } - let (stat_sender, stat_receiver) = flume::unbounded(); + let (stat_sender, stat_receiver) = kanal::unbounded_async(); let timestamp_precision = TimestampPrecision::Seconds; let mut new = Self { @@ -94,7 +94,7 @@ impl StatBuffer { async fn aggregate_and_save_loop( &mut self, bucket: String, - stat_receiver: flume::Receiver, + stat_receiver: kanal::AsyncReceiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { let mut tsdb_save_interval = @@ -107,7 +107,7 @@ impl StatBuffer { loop { tokio::select! { - stat = stat_receiver.recv_async() => { + stat = stat_receiver.recv() => { // info!("Received stat"); // save the stat to a buffer match stat {
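
Notes on the kanal API this patch leans on: kanal::bounded_async / kanal::unbounded_async hand back an (AsyncSender, AsyncReceiver) pair, and AsyncReceiver::recv() resolves to a Result that becomes Err once every sender is gone. That is why the tokio-mpsc-style `while let Some(..)` loops above become `while let Ok(..)`. A minimal runnable sketch against kanal 0.1.0-pre8 (names here are illustrative, not from the repo):

    use tokio::time::Duration;

    #[tokio::main]
    async fn main() {
        // bounded_async(n) is the drop-in for tokio's mpsc::channel(n)
        let (tx, rx) = kanal::bounded_async::<Duration>(8);

        let reader = tokio::spawn(async move {
            // recv() returns Err(ReceiveError) after all senders drop,
            // which is what ends this loop
            while let Ok(d) = rx.recv().await {
                println!("observed {:?}", d);
            }
        });

        tx.send(Duration::from_millis(5)).await.unwrap();
        drop(tx); // closing the last sender lets the reader task finish
        reader.await.unwrap();
    }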
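
The report() change in latency/src/peak_ewma/mod.rs depends on kanal's non-blocking send signature: try_send returns Result<bool, SendError> rather than tokio's Result<(), TrySendError>. Ok(true) means the value was delivered or queued, Ok(false) means the bounded buffer was full, and the SendError variants (Closed, ReceiveClosed) only appear once one side of the channel has shut down. Roughly:

    use kanal::SendError;
    use tokio::time::Duration;

    // mirrors PeakEwmaLatency::report: callable from sync code because
    // try_send never awaits
    fn report(request_tx: &kanal::AsyncSender<Duration>, duration: Duration) {
        match request_tx.try_send(duration) {
            Ok(true) => {}
            Ok(false) => {
                // buffer full: drop this sample instead of blocking
                eprintln!("latency report channel full");
            }
            Err(SendError::Closed) | Err(SendError::ReceiveClosed) => {
                // only possible once the receiving task is gone
                eprintln!("latency report channel closed");
            }
        }
    }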
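
The authorization.rs hunk handles a constraint the old flume code never hit, since flume's plain send() is already synchronous: RequestMetadata sends its final stat from code reachable via Drop, which cannot .await, so the async sender is converted with to_sync() and the send happens synchronously. A sketch of the same pattern, where Stat and Metadata are hypothetical stand-ins for AppStat and RequestMetadata:

    struct Stat(u64);

    struct Metadata {
        stat_sender: kanal::AsyncSender<Stat>,
    }

    impl Drop for Metadata {
        fn drop(&mut self) {
            // Drop is synchronous: clone the async handle and convert it
            // to the sync flavor so the send needs no .await
            let stat_sender = self.stat_sender.clone().to_sync();
            if let Err(err) = stat_sender.send(Stat(1)) {
                eprintln!("failed sending stat: {:?}", err);
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = kanal::unbounded_async::<Stat>();
        drop(Metadata { stat_sender: tx.clone() }); // runs the Drop impl
        println!("got stat {}", rx.recv().await.unwrap().0);
    }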