diff --git a/Cargo.lock b/Cargo.lock index d3cd223e..113e1202 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2163,19 +2163,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flume" -version = "0.10.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "pin-project", - "spin 0.9.8", -] - [[package]] name = "fnv" version = "1.0.7" @@ -3168,7 +3155,6 @@ checksum = "d3c48237b9604c5a4702de6b824e02006c3214327564636aef27c1028a8fa0ed" name = "latency" version = "0.1.0" dependencies = [ - "flume", "log", "portable-atomic", "serde", @@ -3421,9 +3407,6 @@ name = "nanorand" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom 0.2.10", -] [[package]] name = "native-tls" @@ -7072,7 +7055,6 @@ dependencies = [ "ethbloom", "ethers", "fdlimit", - "flume", "fstrings", "futures", "glob", diff --git a/TODO.md b/TODO.md index fc124fb4..e008a958 100644 --- a/TODO.md +++ b/TODO.md @@ -189,7 +189,7 @@ These are roughly in order of completition - [x] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled - https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs - we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop - - need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true + - need a tokio::sync::watch on unflushed stats that we can subscribe to. wait for it to flip to true - [x] don't use unix timestamps for response_millis since leap seconds will confuse it - [x] config to allow origins even on the anonymous endpoints - [x] send logs to sentry diff --git a/latency/Cargo.toml b/latency/Cargo.toml index 327746d8..c1199f38 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -flume = "0.10.14" log = "0.4.19" portable-atomic = { version = "1.3.3", features = ["float"] } serde = { version = "1.0.171", features = [] } diff --git a/latency/src/peak_ewma/mod.rs b/latency/src/peak_ewma/mod.rs index 2861df71..1e6cbaff 100644 --- a/latency/src/peak_ewma/mod.rs +++ b/latency/src/peak_ewma/mod.rs @@ -1,13 +1,12 @@ mod rtt_estimate; -use std::sync::Arc; - -use tokio::task::JoinHandle; -use tokio::time::{Duration, Instant}; -use tracing::{enabled, error, trace, Level}; - use self::rtt_estimate::AtomicRttEstimate; use crate::util::nanos::nanos; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio::time::{Duration, Instant}; +use tracing::{enabled, error, trace, Level}; /// Latency calculation using Peak EWMA algorithm /// @@ -18,7 +17,7 @@ pub struct PeakEwmaLatency { /// Join handle for the latency calculation task pub join_handle: JoinHandle<()>, /// Send to update with each request duration - request_tx: flume::Sender, + request_tx: mpsc::Sender, /// Latency average and last update time rtt_estimate: Arc, /// Decay time @@ -35,7 +34,7 @@ impl PeakEwmaLatency { debug_assert!(decay_ns > 0.0, "decay_ns must be positive"); - let (request_tx, request_rx) = flume::bounded(buf_size); + let (request_tx, request_rx) = mpsc::channel(buf_size); let rtt_estimate = 
Arc::new(AtomicRttEstimate::new(start_latency)); let task = PeakEwmaLatencyTask { request_rx, @@ -92,7 +91,7 @@ impl PeakEwmaLatency { #[derive(Debug)] struct PeakEwmaLatencyTask { /// Receive new request timings for update - request_rx: flume::Receiver, + request_rx: mpsc::Receiver, /// Current estimate and update time rtt_estimate: Arc, /// Last update time, used for decay calculation @@ -103,8 +102,8 @@ struct PeakEwmaLatencyTask { impl PeakEwmaLatencyTask { /// Run the loop for updating latency - async fn run(self) { - while let Ok(rtt) = self.request_rx.recv_async().await { + async fn run(mut self) { + while let Some(rtt) = self.request_rx.recv().await { self.update(rtt); } trace!("latency loop exited"); diff --git a/latency/src/rolling_quantile.rs b/latency/src/rolling_quantile.rs index 5d0f0e6d..ae1f793d 100644 --- a/latency/src/rolling_quantile.rs +++ b/latency/src/rolling_quantile.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use portable_atomic::{AtomicF32, Ordering}; use serde::ser::Serializer; use serde::Serialize; +use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time::Duration; use watermill::quantile::RollingQuantile; @@ -12,7 +13,7 @@ pub struct RollingQuantileLatency { /// Join handle for the latency calculation task. pub join_handle: JoinHandle<()>, /// Send to update with each request duration. - latency_tx: flume::Sender, + latency_tx: mpsc::Sender, /// rolling quantile latency in seconds. Updated async. seconds: Arc, } @@ -20,7 +21,7 @@ pub struct RollingQuantileLatency { /// Task to be spawned per-RollingMedianLatency for calculating the value struct RollingQuantileLatencyTask { /// Receive to update each request duration - latency_rx: flume::Receiver, + latency_rx: mpsc::Receiver, /// Current estimate and update time seconds: Arc, /// quantile value. @@ -29,7 +30,7 @@ struct RollingQuantileLatencyTask { impl RollingQuantileLatencyTask { fn new( - latency_rx: flume::Receiver, + latency_rx: mpsc::Receiver, seconds: Arc, q: f32, window_size: usize, @@ -45,7 +46,7 @@ impl RollingQuantileLatencyTask { /// Run the loop for updating latency. async fn run(mut self) { - while let Ok(rtt) = self.latency_rx.recv_async().await { + while let Some(rtt) = self.latency_rx.recv().await { self.update(rtt); } } @@ -88,7 +89,7 @@ impl RollingQuantileLatency { impl RollingQuantileLatency { pub async fn spawn(quantile_value: f32, window_size: usize) -> Self { // TODO: how should queue size and window size be related? 
- let (latency_tx, latency_rx) = flume::bounded(window_size); + let (latency_tx, latency_rx) = mpsc::channel(window_size); let seconds = Arc::new(AtomicF32::new(0.0)); diff --git a/quick_cache_ttl/Cargo.toml b/quick_cache_ttl/Cargo.toml index 6e0da4f4..5cdcc81c 100644 --- a/quick_cache_ttl/Cargo.toml +++ b/quick_cache_ttl/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -flume = "0.10.14" log = "0.4.18" quick_cache = "0.3.0" serde = "1" diff --git a/quick_cache_ttl/src/kq_cache.rs b/quick_cache_ttl/src/kq_cache.rs index b5a0b244..edde9ae9 100644 --- a/quick_cache_ttl/src/kq_cache.rs +++ b/quick_cache_ttl/src/kq_cache.rs @@ -18,7 +18,7 @@ pub struct KQCacheWithTTL { max_item_weight: NonZeroU32, name: &'static str, ttl: Duration, - tx: flume::Sender<(Instant, Key, Qey)>, + tx: mpsc::UnboundedSender<(Instant, Key, Qey)>, weighter: We, pub task_handle: JoinHandle<()>, @@ -27,7 +27,7 @@ pub struct KQCacheWithTTL { struct KQCacheWithTTLTask { cache: Arc>, name: &'static str, - rx: flume::Receiver<(Instant, Key, Qey)>, + rx: mpsc::UnboundedReceiver<(Instant, Key, Qey)>, } pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> { @@ -54,7 +54,7 @@ impl< hash_builder: B, ttl: Duration, ) -> Self { - let (tx, rx) = flume::unbounded(); + let (tx, rx) = mpsc::unbounded_channel(); let cache = KQCache::with( estimated_items_capacity, diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index d5cbecd1..1d95b568 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -52,7 +52,6 @@ derive_more = { version = "0.99.17", features = ["nightly"] } ethbloom = { version = "0.13.0" } ethers = { version = "2.0.7", default-features = false, features = ["rustls", "ws"] } fdlimit = "0.2.1" -flume = "0.10.14" fstrings = "0.2" futures = { version = "0.3.28" } glob = "0.3.1" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index c0ff46fb..7b652616 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -50,7 +50,7 @@ use std::str::FromStr; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::sync::{broadcast, watch, Semaphore, oneshot}; +use tokio::sync::{broadcast, mpsc, watch, Semaphore, oneshot}; use tokio::task::JoinHandle; use tokio::time::timeout; use tracing::{error, info, trace, warn, Level}; @@ -124,7 +124,7 @@ pub struct Web3ProxyApp { /// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
pub vredis_pool: Option, /// channel for sending stats in a background task - pub stat_sender: Option>, + pub stat_sender: Option>, /// Optional time series database for making pretty graphs that load quickly influxdb_client: Option, @@ -179,8 +179,8 @@ impl Web3ProxyApp { top_config: TopConfig, num_workers: usize, shutdown_sender: broadcast::Sender<()>, - flush_stat_buffer_sender: flume::Sender>, - flush_stat_buffer_receiver: flume::Receiver>, + flush_stat_buffer_sender: mpsc::Sender>, + flush_stat_buffer_receiver: mpsc::Receiver>, ) -> anyhow::Result { let stat_buffer_shutdown_receiver = shutdown_sender.subscribe(); let mut background_shutdown_receiver = shutdown_sender.subscribe(); diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 5b7d112d..ab58b6aa 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -17,6 +17,7 @@ use http::StatusCode; use serde_json::json; use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; +use tokio::sync::mpsc; use tokio::time::Instant; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tracing::{error, trace}; @@ -28,7 +29,7 @@ impl Web3ProxyApp { jsonrpc_request: JsonRpcRequest, subscription_count: &'a AtomicU64, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now - response_sender: flume::Sender, + response_sender: mpsc::UnboundedSender, ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { let request_metadata = RequestMetadata::new( self, @@ -87,7 +88,7 @@ impl Web3ProxyApp { .rate_limit_close_websocket(&subscription_request_metadata) .await { - let _ = response_sender.send_async(close_message).await; + let _ = response_sender.send(close_message); break; } @@ -112,7 +113,7 @@ impl Web3ProxyApp { // TODO: can we check a content type header? let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).is_err() { // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; @@ -151,7 +152,7 @@ impl Web3ProxyApp { .rate_limit_close_websocket(&subscription_request_metadata) .await { - let _ = response_sender.send_async(close_message).await; + let _ = response_sender.send(close_message); break; } @@ -182,7 +183,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? reply with binary if thats what we were sent let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -222,7 +223,7 @@ impl Web3ProxyApp { .rate_limit_close_websocket(&subscription_request_metadata) .await { - let _ = response_sender.send_async(close_message).await; + let _ = response_sender.send(close_message); break; } @@ -251,7 +252,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? 
break; }; @@ -291,7 +292,7 @@ impl Web3ProxyApp { .rate_limit_close_websocket(&subscription_request_metadata) .await { - let _ = response_sender.send_async(close_message).await; + let _ = response_sender.send(close_message); break; } @@ -321,7 +322,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 3644385d..f363fa3b 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -72,11 +72,11 @@ pub async fn clean_block_number( block_param_id: usize, latest_block: &Web3ProxyBlock, rpcs: &Web3Rpcs, -) -> anyhow::Result { +) -> Web3ProxyResult { match params.as_array_mut() { None => { // TODO: this needs the correct error code in the response - Err(anyhow::anyhow!("params not an array")) + Err(anyhow::anyhow!("params not an array").into()) } Some(params) => match params.get_mut(block_param_id) { None => { @@ -107,7 +107,7 @@ pub async fn clean_block_number( (BlockNumAndHash::from(&block), false) } else { - return Err(anyhow::anyhow!("blockHash missing")); + return Err(anyhow::anyhow!("blockHash missing").into()); } } else { // it might be a string like "latest" or a block number or a block hash @@ -157,7 +157,8 @@ pub async fn clean_block_number( } else { return Err(anyhow::anyhow!( "param not a block identifier, block number, or block hash" - )); + ) + .into()); } }; @@ -370,6 +371,13 @@ impl CacheMode { block, cache_errors: true, }), + Err(Web3ProxyError::NoBlocksKnown) => { + warn!(%method, ?params, "no servers available to get block from params"); + Ok(CacheMode::Cache { + block: head_block.into(), + cache_errors: true, + }) + } Err(err) => { error!(%method, ?params, ?err, "could not get block from params"); Ok(CacheMode::Cache { diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 47288200..a5ae2d71 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -11,6 +11,7 @@ use serde::Deserialize; use serde_inline_default::serde_inline_default; use std::sync::Arc; use std::time::Duration; +use tokio::sync::mpsc; use tracing::warn; pub type BlockAndRpc = (Option, Arc); @@ -278,9 +279,9 @@ impl Web3RpcConfig { block_interval: Duration, http_client: Option, blocks_by_hash_cache: BlocksByHashCache, - block_sender: Option>, + block_sender: Option>, max_head_block_age: Duration, - tx_id_sender: Option>, + tx_id_sender: Option>, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { warn!(extra=?self.extra.keys(), "unknown Web3RpcConfig fields!"); diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 49562e1f..b9eb81d9 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -63,7 +63,6 @@ pub enum Web3ProxyError { EthersHttpClient(ethers::prelude::HttpClientError), EthersProvider(ethers::prelude::ProviderError), EthersWsClient(ethers::prelude::WsClientError), - FlumeRecv(flume::RecvError), GasEstimateNotU256, HdrRecord(hdrhistogram::errors::RecordError), Headers(headers::Error), @@ -140,7 +139,6 @@ pub enum Web3ProxyError { #[from(ignore)] RefererNotAllowed(headers::Referer), SemaphoreAcquireError(AcquireError), - SendAppStatError(flume::SendError), SerdeJson(serde_json::Error), SiweVerification(VerificationError), 
/// simple way to return an error message to the user and an anyhow to our logs @@ -332,17 +330,6 @@ impl Web3ProxyError { ) } } - Self::FlumeRecv(err) => { - warn!(?err, "FlumeRecvError"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcErrorData { - message: "flume recv error!".into(), - code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: None, - }, - ) - } // Self::JsonRpcForwardedError(x) => (StatusCode::OK, x), Self::GasEstimateNotU256 => { trace!("GasEstimateNotU256"); @@ -886,17 +873,6 @@ impl Web3ProxyError { }, ) } - Self::SendAppStatError(err) => { - error!(?err, "SendAppStatError"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcErrorData { - message: "error stat_sender sending response_stat".into(), - code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: None, - }, - ) - } Self::SerdeJson(err) => { trace!(?err, "serde json"); ( diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 5b56182a..12315077 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -42,7 +42,7 @@ use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::sync::RwLock as AsyncRwLock; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; use tracing::{error, trace, warn}; @@ -346,7 +346,7 @@ pub struct RequestMetadata { pub kafka_debug_logger: Option>, /// Cancel-safe channel for sending stats to the buffer - pub stat_sender: Option>, + pub stat_sender: Option>, } impl Default for Authorization { @@ -517,7 +517,7 @@ impl RequestMetadata { let stat: AppStat = stat.into(); - if let Err(err) = stat_sender.try_send(stat) { + if let Err(err) = stat_sender.send(stat) { error!(?err, "failed sending stat"); // TODO: return it? that seems like it might cause an infinite loop // TODO: but dropping stats is bad... hmm... i guess better to undercharge customers than overcharge diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 20cd93e7..d1a8538e 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -34,7 +34,7 @@ use std::net::IpAddr; use std::str::from_utf8_mut; use std::sync::atomic::AtomicU64; use std::sync::Arc; -use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock as AsyncRwLock}; +use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock}; use tracing::{info, trace}; /// How to select backend servers for a request @@ -306,7 +306,8 @@ async fn proxy_web3_socket( let (ws_tx, ws_rx) = socket.split(); // create a channel for our reader and writer can communicate. todo: benchmark different channels - let (response_sender, response_receiver) = flume::unbounded::(); + // TODO: this should be bounded. 
async blocking on too many messages would be fine + let (response_sender, response_receiver) = mpsc::unbounded_channel::(); tokio::spawn(write_web3_socket(response_receiver, ws_tx)); tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender)); @@ -317,7 +318,7 @@ async fn handle_socket_payload( app: Arc, authorization: &Arc, payload: &str, - response_sender: &flume::Sender, + response_sender: &mpsc::UnboundedSender, subscription_count: &AtomicU64, subscriptions: Arc>>, ) -> Web3ProxyResult<(Message, Option)> { @@ -434,7 +435,7 @@ async fn read_web3_socket( app: Arc, authorization: Arc, mut ws_rx: SplitStream, - response_sender: flume::Sender, + response_sender: mpsc::UnboundedSender, ) { let subscriptions = Arc::new(AsyncRwLock::new(HashMap::new())); let subscription_count = Arc::new(AtomicU64::new(1)); @@ -519,7 +520,7 @@ async fn read_web3_socket( } }; - if response_sender.send_async(response_msg).await.is_err() { + if response_sender.send(response_msg).is_err() { let _ = close_sender.send(true); }; }; @@ -537,12 +538,12 @@ async fn read_web3_socket( } async fn write_web3_socket( - response_rx: flume::Receiver, + mut response_rx: mpsc::UnboundedReceiver, mut ws_tx: SplitSink, ) { // TODO: increment counter for open websockets - while let Ok(msg) = response_rx.recv_async().await { + while let Some(msg) = response_rx.recv().await { // a response is ready // we do not check rate limits here. they are checked before putting things into response_sender; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index c6314653..aa7b19b7 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -15,7 +15,7 @@ use serde_json::json; use std::hash::Hash; use std::time::Duration; use std::{fmt::Display, sync::Arc}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tokio::time::timeout; use tracing::{debug, error, warn}; @@ -427,7 +427,7 @@ impl Web3Rpcs { pub(super) async fn process_incoming_blocks( &self, authorization: &Arc, - block_receiver: flume::Receiver, + mut block_receiver: mpsc::UnboundedReceiver, // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. pending_tx_sender: Option>, @@ -441,8 +441,8 @@ impl Web3Rpcs { let mut had_first_success = false; loop { - match timeout(double_block_time, block_receiver.recv_async()).await { - Ok(Ok((new_block, rpc))) => { + match timeout(double_block_time, block_receiver.recv()).await { + Ok(Some((new_block, rpc))) => { let rpc_name = rpc.name.clone(); let rpc_is_backup = rpc.backup; @@ -485,10 +485,9 @@ impl Web3Rpcs { } } } - Ok(Err(err)) => { + Ok(None) => { // TODO: panic is probably too much, but getting here is definitely not good - error!("block_receiver on {} exited! {:#?}", self, err); - return Err(err.into()); + return Err(anyhow::anyhow!("block_receiver on {} exited", self).into()); } Err(_) => { // TODO: what timeout on this? 
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 43f45334..700e5733 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -31,7 +31,7 @@ use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::select; -use tokio::sync::{broadcast, watch}; +use tokio::sync::{broadcast, mpsc, watch, RwLock as AsyncRwLock}; use tokio::time::{sleep, sleep_until, Duration, Instant}; use tracing::{debug, error, info, trace, warn}; @@ -42,7 +42,7 @@ pub struct Web3Rpcs { pub(crate) name: String, pub(crate) chain_id: u64, /// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them - pub(crate) block_sender: flume::Sender<(Option, Arc)>, + pub(crate) block_sender: mpsc::UnboundedSender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc pub(crate) by_name: RwLock>>, @@ -55,8 +55,8 @@ pub struct Web3Rpcs { pub(super) watch_head_block: Option>>, /// keep track of transactions that we have sent through subscriptions pub(super) pending_transaction_cache: Cache, - pub(super) pending_tx_id_receiver: flume::Receiver, - pub(super) pending_tx_id_sender: flume::Sender, + pub(super) pending_tx_id_receiver: AsyncRwLock>, + pub(super) pending_tx_id_sender: mpsc::UnboundedSender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans pub(super) blocks_by_hash: BlocksByHashCache, @@ -91,8 +91,8 @@ impl Web3Rpcs { Web3ProxyJoinHandle<()>, watch::Receiver>>, )> { - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (block_sender, block_receiver) = flume::unbounded::(); + let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); + let (block_sender, block_receiver) = mpsc::unbounded_channel::(); // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // TODO: actual weighter on this @@ -133,7 +133,7 @@ impl Web3Rpcs { min_sum_soft_limit, name, pending_transaction_cache, - pending_tx_id_receiver, + pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), pending_tx_id_sender, watch_head_block: watch_consensus_head_sender, watch_ranked_rpcs: watch_consensus_rpcs_sender, @@ -329,7 +329,7 @@ impl Web3Rpcs { async fn subscribe( self: Arc, authorization: Arc, - block_receiver: flume::Receiver, + block_receiver: mpsc::UnboundedReceiver, pending_tx_sender: Option>, ) -> Web3ProxyResult<()> { let mut futures = vec![]; @@ -341,10 +341,11 @@ impl Web3Rpcs { if let Some(pending_tx_sender) = pending_tx_sender.clone() { let clone = self.clone(); let authorization = authorization.clone(); - let pending_tx_id_receiver = self.pending_tx_id_receiver.clone(); let handle = tokio::task::spawn(async move { // TODO: set up this future the same as the block funnel - while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { + while let Some((pending_tx_id, rpc)) = + clone.pending_tx_id_receiver.write().await.recv().await + { let f = clone.clone().process_incoming_tx_id( authorization.clone(), rpc, @@ -1323,7 +1324,7 @@ impl Serialize for Web3Rpcs { where S: Serializer, { - let mut state = serializer.serialize_struct("Web3Rpcs", 6)?; + let mut state = serializer.serialize_struct("Web3Rpcs", 5)?; { let by_name = self.by_name.read(); @@ -1352,8 +1353,6 @@ impl 
Serialize for Web3Rpcs { ), )?; - state.serialize_field("block_sender_len", &self.block_sender.len())?; - state.serialize_field( "watch_consensus_rpcs_receivers", &self.watch_ranked_rpcs.receiver_count(), @@ -1535,8 +1534,8 @@ mod tests { let head_rpc = Arc::new(head_rpc); let lagged_rpc = Arc::new(lagged_rpc); - let (block_sender, _block_receiver) = flume::unbounded(); - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, _block_receiver) = mpsc::unbounded_channel(); + let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1557,7 +1556,7 @@ mod tests { pending_transaction_cache: CacheBuilder::new(100) .time_to_live(Duration::from_secs(60)) .build(), - pending_tx_id_receiver, + pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), pending_tx_id_sender, blocks_by_hash: CacheBuilder::new(100) .time_to_live(Duration::from_secs(60)) @@ -1805,8 +1804,8 @@ mod tests { let pruned_rpc = Arc::new(pruned_rpc); let archive_rpc = Arc::new(archive_rpc); - let (block_sender, _) = flume::unbounded(); - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, _) = mpsc::unbounded_channel(); + let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1826,7 +1825,7 @@ mod tests { pending_transaction_cache: CacheBuilder::new(100) .time_to_live(Duration::from_secs(120)) .build(), - pending_tx_id_receiver, + pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), pending_tx_id_sender, blocks_by_hash: CacheBuilder::new(100) .time_to_live(Duration::from_secs(120)) @@ -1989,8 +1988,8 @@ mod tests { let mock_geth = Arc::new(mock_geth); let mock_erigon_archive = Arc::new(mock_erigon_archive); - let (block_sender, _) = flume::unbounded(); - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, _) = mpsc::unbounded_channel(); + let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -2012,7 +2011,7 @@ mod tests { watch_head_block: Some(watch_consensus_head_sender), watch_ranked_rpcs, pending_transaction_cache: Cache::new(10_000), - pending_tx_id_receiver, + pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), pending_tx_id_sender, blocks_by_hash: Cache::new(10_000), blocks_by_number: Cache::new(10_000), diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 9e4e60e9..675b33cb 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -26,7 +26,7 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; -use tokio::sync::{watch, RwLock as AsyncRwLock}; +use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock}; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{debug, error, info, trace, warn, Level}; use url::Url; @@ -98,9 +98,9 @@ impl Web3Rpc { redis_pool: Option, block_interval: Duration, block_map: BlocksByHashCache, - block_and_rpc_sender: Option>, + 
block_and_rpc_sender: Option>, max_head_block_age: Duration, - tx_id_sender: Option)>>, + tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); @@ -465,7 +465,7 @@ impl Web3Rpc { pub(crate) async fn send_head_block_result( self: &Arc, new_head_block: Web3ProxyResult>, - block_and_rpc_sender: &flume::Sender, + block_and_rpc_sender: &mpsc::UnboundedSender, block_map: &BlocksByHashCache, ) -> Web3ProxyResult<()> { let head_block_sender = self.head_block.as_ref().unwrap(); @@ -530,8 +530,7 @@ impl Web3Rpc { // tell web3rpcs about this rpc having this block block_and_rpc_sender - .send_async((new_head_block, self.clone())) - .await + .send((new_head_block, self.clone())) .context("block_and_rpc_sender failed sending")?; Ok(()) @@ -601,9 +600,9 @@ impl Web3Rpc { async fn subscribe_with_reconnect( self: Arc, block_map: BlocksByHashCache, - block_and_rpc_sender: Option>, + block_and_rpc_sender: Option>, chain_id: u64, - tx_id_sender: Option)>>, + tx_id_sender: Option)>>, ) -> Web3ProxyResult<()> { loop { if let Err(err) = self @@ -644,9 +643,9 @@ impl Web3Rpc { async fn subscribe( self: Arc, block_map: BlocksByHashCache, - block_and_rpc_sender: Option>, + block_and_rpc_sender: Option>, chain_id: u64, - tx_id_sender: Option)>>, + tx_id_sender: Option)>>, ) -> Web3ProxyResult<()> { let error_handler = if self.backup { Some(RequestErrorHandler::DebugLevel) @@ -803,7 +802,7 @@ impl Web3Rpc { /// Subscribe to new blocks. async fn subscribe_new_heads( self: &Arc, - block_sender: flume::Sender, + block_sender: mpsc::UnboundedSender, block_map: BlocksByHashCache, subscribe_stop_rx: watch::Receiver, ) -> Web3ProxyResult<()> { @@ -898,7 +897,7 @@ impl Web3Rpc { /// Turn on the firehose of pending transactions async fn subscribe_pending_transactions( self: Arc, - tx_id_sender: flume::Sender<(TxHash, Arc)>, + tx_id_sender: mpsc::UnboundedSender<(TxHash, Arc)>, mut subscribe_stop_rx: watch::Receiver, ) -> Web3ProxyResult<()> { // TODO: check that it actually changed to true diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 866d468b..b833da2b 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -10,7 +10,7 @@ use influxdb2::api::write::TimestampPrecision; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use std::time::Duration; -use tokio::sync::{broadcast, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{interval, sleep}; use tracing::{error, info, trace}; @@ -33,7 +33,7 @@ pub struct BufferedRpcQueryStats { #[derive(From)] pub struct SpawnedStatBuffer { - pub stat_sender: flume::Sender, + pub stat_sender: mpsc::UnboundedSender, /// these handles are important and must be allowed to finish pub background_handle: Web3ProxyJoinHandle<()>, } @@ -53,7 +53,7 @@ pub struct StatBuffer { tsdb_save_interval_seconds: u32, user_balance_cache: UserBalanceCache, - _flush_sender: flume::Sender>, + _flush_sender: mpsc::Sender>, } impl StatBuffer { @@ -69,8 +69,8 @@ impl StatBuffer { user_balance_cache: Option, shutdown_receiver: broadcast::Receiver<()>, tsdb_save_interval_seconds: u32, - flush_sender: flume::Sender>, - flush_receiver: flume::Receiver>, + flush_sender: mpsc::Sender>, + flush_receiver: mpsc::Receiver>, ) -> anyhow::Result> { if influxdb_bucket.is_none() { influxdb_client = None; @@ -80,7 +80,7 @@ impl StatBuffer { return Ok(None); } - let (stat_sender, stat_receiver) = flume::unbounded(); + let (stat_sender, 
stat_receiver) = mpsc::unbounded_channel(); let timestamp_precision = TimestampPrecision::Seconds; @@ -113,9 +113,9 @@ impl StatBuffer { async fn aggregate_and_save_loop( &mut self, - stat_receiver: flume::Receiver, + mut stat_receiver: mpsc::UnboundedReceiver, mut shutdown_receiver: broadcast::Receiver<()>, - flush_receiver: flume::Receiver>, + mut flush_receiver: mpsc::Receiver>, ) -> Web3ProxyResult<()> { let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); @@ -124,11 +124,11 @@ impl StatBuffer { loop { tokio::select! { - stat = stat_receiver.recv_async() => { + stat = stat_receiver.recv() => { // trace!("Received stat"); // save the stat to a buffer match stat { - Ok(AppStat::RpcQuery(stat)) => { + Some(AppStat::RpcQuery(stat)) => { if self.influxdb_client.is_some() { // TODO: round the timestamp at all? @@ -145,8 +145,8 @@ impl StatBuffer { self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat).await; } } - Err(err) => { - info!("error receiving stat: {}", err); + None => { + info!("error receiving stat"); break; } } @@ -165,9 +165,9 @@ impl StatBuffer { trace!("Saved {} stats to the tsdb", count); } } - x = flush_receiver.recv_async() => { + x = flush_receiver.recv() => { match x { - Ok(x) => { + Some(x) => { trace!("flush"); let tsdb_count = self.save_tsdb_stats().await; @@ -184,8 +184,9 @@ impl StatBuffer { error!(%tsdb_count, %relational_count, ?err, "unable to notify about flushed stats"); } } - Err(err) => { - error!(?err, "unable to flush stat buffer!"); + None => { + error!("unable to flush stat buffer!"); + break; } } } diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index 8440696b..e88c7202 100644 --- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -16,7 +16,7 @@ use migration::{Expr, Value}; use parking_lot::Mutex; use std::num::NonZeroU64; use std::sync::Arc; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tokio::time::Instant; use tracing::{error, info}; use ulid::Ulid; @@ -72,7 +72,7 @@ impl MigrateStatsToV2SubCommand { None => None, }; - let (flush_sender, flush_receiver) = flume::bounded(1); + let (flush_sender, flush_receiver) = mpsc::channel(1); // Spawn the stat-sender let emitter_spawn = StatBuffer::try_spawn( diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs index e55c4576..f36a7a87 100644 --- a/web3_proxy/src/sub_commands/proxyd.rs +++ b/web3_proxy/src/sub_commands/proxyd.rs @@ -12,9 +12,9 @@ use std::sync::Arc; use std::time::Duration; use std::{fs, thread}; use tokio::select; -use tokio::sync::{broadcast, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{sleep_until, Instant}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{error, info, trace, warn}; /// start the main proxy daemon #[derive(FromArgs, PartialEq, Debug, Eq)] @@ -42,7 +42,7 @@ impl ProxydSubCommand { let frontend_port = Arc::new(self.port.into()); let prometheus_port = Arc::new(self.prometheus_port.into()); - let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(8); + let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = mpsc::channel(8); Self::_main( top_config, @@ -66,8 +66,8 @@ impl ProxydSubCommand { prometheus_port: Arc, num_workers: usize, frontend_shutdown_sender: broadcast::Sender<()>, - flush_stat_buffer_sender: flume::Sender>, - 
flush_stat_buffer_receiver: flume::Receiver>, + flush_stat_buffer_sender: mpsc::Sender>, + flush_stat_buffer_receiver: mpsc::Receiver>, ) -> anyhow::Result<()> { // tokio has code for catching ctrl+c so we use that to shut down in most cases // frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index 1e75c030..c2b74f20 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -23,7 +23,7 @@ use tokio::{ process::Command as AsyncCommand, sync::{ broadcast::{self, error::SendError}, - oneshot, + mpsc, oneshot, }, task::JoinHandle, time::{sleep, Instant}, @@ -59,7 +59,7 @@ pub struct TestApp { pub proxy_provider: Provider, /// tell the app to flush stats to the database - flush_stat_buffer_sender: flume::Sender>, + flush_stat_buffer_sender: mpsc::Sender>, /// tell the app to shut down (use `self.stop()`). shutdown_sender: broadcast::Sender<()>, @@ -278,7 +278,7 @@ impl TestApp { let frontend_port_arc = Arc::new(AtomicU16::new(0)); let prometheus_port_arc = Arc::new(AtomicU16::new(0)); - let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(1); + let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = mpsc::channel(1); // spawn the app // TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop @@ -331,7 +331,7 @@ impl TestApp { pub async fn flush_stats(&self) -> anyhow::Result<(usize, usize)> { let (tx, rx) = oneshot::channel(); - self.flush_stat_buffer_sender.send(tx)?; + self.flush_stat_buffer_sender.send(tx).await?; let x = rx.await?;
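The flush path that the test exercises above is a request/response handshake over the new bounded channel: callers push a `oneshot::Sender` into the flush channel and then await the reply, while the stat buffer's loop receives that sender, saves its buffered stats, and answers with a summary. A minimal sketch of that handshake, assuming a stand-in `Flushed` struct in place of the proxy's real flush summary type:

```rust
// Sketch of the flush handshake: a oneshot reply channel travels over a bounded mpsc channel.
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Flushed {
    saved: usize,
}

async fn buffer_loop(mut flush_rx: mpsc::Receiver<oneshot::Sender<Flushed>>) {
    let mut pending = 0usize;
    while let Some(reply_tx) = flush_rx.recv().await {
        // pretend the buffered stats were written to the database here
        let _ = reply_tx.send(Flushed { saved: pending });
        pending = 0;
    }
}

#[tokio::main]
async fn main() {
    let (flush_tx, flush_rx) = mpsc::channel(8);
    let handle = tokio::spawn(buffer_loop(flush_rx));

    let (tx, rx) = oneshot::channel();
    flush_tx.send(tx).await.expect("buffer task alive");
    println!("{:?}", rx.await.expect("buffer replied"));

    drop(flush_tx); // closing the channel lets the buffer loop exit
    handle.await.unwrap();
}
```

Note that the bounded `mpsc::Sender::send` is async, which is why the test code changes from `send(tx)?` to `send(tx).await?`.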