From cc2eb9e8624aa76263adb6bf7c5f257c356058f6 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Sat, 7 Oct 2023 03:41:11 -0700
Subject: [PATCH] no more overflow on the pending txid firehose

---
 Cargo.lock                   |  11 +---
 deduped_broadcast/Cargo.toml |   2 +-
 deduped_broadcast/src/lib.rs | 120 ++++++++++++-----------------------
 web3_proxy/src/app/mod.rs    |  13 +++-
 web3_proxy/src/app/ws.rs     | 106 +++++++++++++++++--------------
 web3_proxy/src/config.rs     |   5 +-
 web3_proxy/src/rpcs/many.rs  |   9 +--
 web3_proxy/src/rpcs/one.rs   |  31 ++++-----
 8 files changed, 136 insertions(+), 161 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9e470220..f4a94690 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1288,7 +1288,7 @@ dependencies = [
 name = "deduped_broadcast"
 version = "0.2.2"
 dependencies = [
- "lru",
+ "moka",
 "serde",
 "tokio",
 "tracing",
@@ -3055,15 +3055,6 @@ version = "0.4.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
 
-[[package]]
-name = "lru"
-version = "0.12.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
-dependencies = [
- "hashbrown 0.14.1",
-]
-
 [[package]]
 name = "mach2"
 version = "0.4.1"
diff --git a/deduped_broadcast/Cargo.toml b/deduped_broadcast/Cargo.toml
index 376fa9e2..e6f10c70 100644
--- a/deduped_broadcast/Cargo.toml
+++ b/deduped_broadcast/Cargo.toml
@@ -7,6 +7,6 @@ edition = "2021"
 
 [dependencies]
 tracing = "0.1"
-lru = { version = "0.12.0" }
+moka = { version = "0.12.1", features = ["future"] }
 serde = "1"
 tokio = { version = "1.32.0", features = ["full"] }
diff --git a/deduped_broadcast/src/lib.rs b/deduped_broadcast/src/lib.rs
index 29936ca0..9829a6b8 100644
--- a/deduped_broadcast/src/lib.rs
+++ b/deduped_broadcast/src/lib.rs
@@ -1,106 +1,65 @@
+use moka::future::{Cache, CacheBuilder};
 use serde::ser::SerializeStruct;
 use serde::{Serialize, Serializer};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::{
-    collections::hash_map::DefaultHasher,
-    fmt::Debug,
-    hash::{Hash, Hasher},
-};
-use tokio::sync::{broadcast, mpsc};
-
-struct DedupedBroadcasterTask<T>
-where
-    T: Clone + Debug + Hash + Send + Sync + 'static,
-{
-    unfiltered_rx: mpsc::Receiver<T>,
-    broadcast_filtered_tx: Arc<broadcast::Sender<T>>,
-    cache: lru::LruCache<u64, ()>,
-    total_unfiltered: Arc<AtomicUsize>,
-    total_filtered: Arc<AtomicUsize>,
-    total_broadcasts: Arc<AtomicUsize>,
-}
+use std::time::Duration;
+use std::{fmt::Debug, hash::Hash};
+use tokio::sync::broadcast;
 
 pub struct DedupedBroadcaster<T>
 where
     T: Clone + Debug + Hash + Send + Sync + 'static,
 {
-    /// takes in things to broadcast. Can include duplicates, but only one will be sent.
-    unfiltered_tx: mpsc::Sender<T>,
     /// subscribe to this to get deduplicated items
-    broadcast_filtered_tx: Arc<broadcast::Sender<T>>,
+    broadcast_filtered_tx: broadcast::Sender<T>,
+    cache: Cache<T, ()>,
     total_unfiltered: Arc<AtomicUsize>,
     total_filtered: Arc<AtomicUsize>,
     total_broadcasts: Arc<AtomicUsize>,
 }
 
-impl<T> DedupedBroadcasterTask<T>
-where
-    T: Clone + Debug + Hash + Send + Sync + 'static,
-{
-    /// forward things from input_receiver to output_sender if they aren't in the cache
-    async fn run(mut self) {
-        while let Some(item) = self.unfiltered_rx.recv().await {
-            let mut hasher = DefaultHasher::new();
-            item.hash(&mut hasher);
-            let hashed = hasher.finish();
-
-            self.total_unfiltered
-                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
-            // we don't actually care what the return value is. we just want to send only if the cache is empty
-            // TODO: count new vs unique
-            self.cache.get_or_insert(hashed, || {
-                self.total_filtered
-                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
-                if let Ok(x) = self.broadcast_filtered_tx.send(item) {
-                    self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
-                }
-            });
-        }
-    }
-}
-
 impl<T> DedupedBroadcaster<T>
 where
-    T: Clone + Debug + Hash + Send + Sync + 'static,
+    T: Clone + Debug + Eq + Hash + PartialEq + Send + Sync + 'static,
 {
-    pub fn new(capacity: usize, cache_capacity: usize) -> Self {
-        let (unfiltered_tx, unfiltered_rx) = mpsc::channel::<T>(capacity);
+    pub fn new(capacity: usize, cache_capacity: usize) -> Arc<Self> {
         let (broadcast_filtered_tx, _) = broadcast::channel(capacity);
 
-        let cache = lru::LruCache::new(cache_capacity.try_into().unwrap());
-
-        let broadcast_filtered_tx = Arc::new(broadcast_filtered_tx);
+        let cache = CacheBuilder::new(cache_capacity as u64)
+            .time_to_idle(Duration::from_secs(10 * 60))
+            .name("DedupedBroadcaster")
+            .build();
 
         let total_unfiltered = Arc::new(AtomicUsize::new(0));
         let total_filtered = Arc::new(AtomicUsize::new(0));
         let total_broadcasts = Arc::new(AtomicUsize::new(0));
 
-        let task = DedupedBroadcasterTask {
-            unfiltered_rx,
-            broadcast_filtered_tx: broadcast_filtered_tx.clone(),
+        let x = Self {
+            broadcast_filtered_tx,
             cache,
-            total_unfiltered: total_unfiltered.clone(),
-            total_filtered: total_filtered.clone(),
-            total_broadcasts: total_broadcasts.clone(),
+            total_broadcasts,
+            total_filtered,
+            total_unfiltered,
         };
 
-        // TODO: do something with the handle?
-        tokio::task::spawn(task.run());
-
-        Self {
-            unfiltered_tx,
-            broadcast_filtered_tx,
-            total_unfiltered,
-            total_filtered,
-            total_broadcasts,
-        }
+        Arc::new(x)
     }
 
-    pub fn sender(&self) -> &mpsc::Sender<T> {
-        &self.unfiltered_tx
+    /// filter duplicates and send the rest to any subscribers
+    /// the moka cache replaces the old lru task, so the de-dupe load is spread across the senders
+    pub async fn send(&self, item: T) {
+        self.total_unfiltered.fetch_add(1, Ordering::Relaxed);
+
+        self.cache
+            .get_with(item.clone(), async {
+                self.total_filtered.fetch_add(1, Ordering::Relaxed);
+
+                if let Ok(x) = self.broadcast_filtered_tx.send(item) {
+                    self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
+                }
+            })
+            .await;
     }
 
     pub fn subscribe(&self) -> broadcast::Receiver<T> {
@@ -172,18 +131,19 @@ mod tests {
 
     #[tokio::test]
     async fn test_deduped_broadcaster() {
+        // TODO: what sizes?
         let broadcaster = DedupedBroadcaster::new(10, 10);
 
         let mut receiver_1 = broadcaster.subscribe();
         let _receiver_2 = broadcaster.subscribe();
 
-        broadcaster.sender().send(1).await.unwrap();
-        broadcaster.sender().send(1).await.unwrap();
-        broadcaster.sender().send(2).await.unwrap();
-        broadcaster.sender().send(1).await.unwrap();
-        broadcaster.sender().send(2).await.unwrap();
-        broadcaster.sender().send(3).await.unwrap();
-        broadcaster.sender().send(3).await.unwrap();
+        broadcaster.send(1).await;
+        broadcaster.send(1).await;
+        broadcaster.send(2).await;
+        broadcaster.send(1).await;
+        broadcaster.send(2).await;
+        broadcaster.send(3).await;
+        broadcaster.send(3).await;
 
         yield_now().await;
 
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index b455666d..2cf17fb1 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -87,7 +87,7 @@ pub struct Web3ProxyApp {
     /// TODO: broadcast channel instead?
     pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
     /// rpc clients that subscribe to newPendingTransactions use this channel
-    pub pending_txid_firehose: deduped_broadcast::DedupedBroadcaster<TxHash>,
+    pub pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
     pub hostname: Option<String>,
     pub frontend_port: Arc<AtomicU16>,
     /// rate limit anonymous users
@@ -437,7 +437,8 @@ impl Web3ProxyApp {
 
         let chain_id = top_config.app.chain_id;
 
-        let deduped_txid_firehose = DedupedBroadcaster::new(5_000, 5_000);
+        // TODO: deduped_txid_firehose capacity from config
+        let deduped_txid_firehose = DedupedBroadcaster::new(100, 20_000);
 
         // TODO: remove this. it should only be done by apply_top_config
         let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
@@ -447,7 +448,7 @@ impl Web3ProxyApp {
             top_config.app.min_sum_soft_limit,
             "balanced rpcs".into(),
             Some(watch_consensus_head_sender),
-            Some(deduped_txid_firehose.sender().clone()),
+            Some(deduped_txid_firehose.clone()),
         )
         .await
         .web3_context("spawning balanced rpcs")?;
@@ -1414,6 +1415,10 @@ impl Web3ProxyApp {
 
         // TODO: decode the transaction
         // TODO: error if the chain_id is incorrect
+        // TODO: return now if already confirmed
+        // TODO: error if the nonce is way far in the future
+
+        // TODO: self.pending_txid_firehose.send(txid).await;
 
         let response = self
             .try_send_protected(
@@ -1466,6 +1471,8 @@ impl Web3ProxyApp {
             }
         }
 
+        // TODO: if successful, send the txid to the pending transaction firehose
+
         // emit transaction count stats
         // TODO: use this cache to avoid sending duplicate transactions?
         // TODO: different salt for ips and transactions?
diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs
index a4bf0d94..eca58d01 100644
--- a/web3_proxy/src/app/ws.rs
+++ b/web3_proxy/src/app/ws.rs
@@ -61,6 +61,8 @@ impl Web3ProxyApp {
         // TODO: DRY This up. lots of duplication between newHeads and newPendingTransactions
         match subscribe_to {
             "newHeads" => {
+                // we clone the watch before spawning so that there's less chance of missing anything
+                // TODO: watch receivers can miss a block. is that okay?
                 let head_block_receiver = self.watch_consensus_head_receiver.clone();
                 let app = self.clone();
                 let authorization = web3_request.authorization.clone();
@@ -144,6 +146,7 @@ impl Web3ProxyApp {
             }
             // TODO: bring back the other custom subscription types that had the full transaction object
             "newPendingTransactions" => {
+                // we subscribe before spawning so that there's less chance of missing anything
                 let pending_txid_firehose = self.pending_txid_firehose.subscribe();
                 let app = self.clone();
                 let authorization = web3_request.authorization.clone();
                     subscription_registration,
                 );
 
-                while let Some(Ok(new_txid)) = pending_txid_firehose.next().await {
-                    // TODO: include the head_block here?
-                    match Web3Request::new_with_app(
-                        &app,
-                        authorization.clone(),
-                        None,
-                        RequestOrMethod::Method(
-                            "eth_subscribe(newPendingTransactions)".into(),
-                            0,
-                        ),
-                        None,
-                    )
-                    .await
-                    {
+                while let Some(maybe_txid) = pending_txid_firehose.next().await {
+                    match maybe_txid {
                         Err(err) => {
-                            error!(?err, "error creating subscription_web3_request");
-                            // what should we do to turn this error into a message for them?
-                            break;
+                            trace!(
+                                ?err,
+                                "error inside newPendingTransactions. probably lagged"
probably lagged" + ); + continue; } - Ok(subscription_web3_request) => { - // check if we should close the websocket connection - if let Some(close_message) = app - .rate_limit_close_websocket(&subscription_web3_request) - .await + Ok(new_txid) => { + // TODO: include the head_block here? + match Web3Request::new_with_app( + &app, + authorization.clone(), + None, + RequestOrMethod::Method( + "eth_subscribe(newPendingTransactions)".into(), + 0, + ), + None, + ) + .await { - let _ = response_sender.send(close_message).await; - break; - } + Err(err) => { + error!(?err, "error creating subscription_web3_request"); + // what should we do to turn this error into a message for them? + break; + } + Ok(subscription_web3_request) => { + // check if we should close the websocket connection + if let Some(close_message) = app + .rate_limit_close_websocket(&subscription_web3_request) + .await + { + let _ = response_sender.send(close_message).await; + break; + } - // TODO: make a struct/helper function for this - let response_json = json!({ - "jsonrpc": "2.0", - "method":"eth_subscription", - "params": { - "subscription": subscription_id, - "result": new_txid, - }, - }); + // TODO: make a struct/helper function for this + let response_json = json!({ + "jsonrpc": "2.0", + "method":"eth_subscription", + "params": { + "subscription": subscription_id, + "result": new_txid, + }, + }); - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); - // we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); + // we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); - subscription_web3_request.add_response(response_bytes); + subscription_web3_request.add_response(response_bytes); - // TODO: do clients support binary messages? - // TODO: can we check a content type header? - let response_msg = Message::Text(response_str); + // TODO: do clients support binary messages? + // TODO: can we check a content type header? + let response_msg = Message::Text(response_str); - if response_sender.send(response_msg).await.is_err() { - // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; + if response_sender.send(response_msg).await.is_err() { + // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? 
+ break; + } + } } } } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index c71b5c57..4ae81b02 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -3,6 +3,7 @@ use crate::compute_units::default_usd_per_cu; use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use crate::rpcs::one::Web3Rpc; use argh::FromArgs; +use deduped_broadcast::DedupedBroadcaster; use ethers::prelude::{Address, TxHash}; use ethers::types::{U256, U64}; use hashbrown::HashMap; @@ -426,7 +427,7 @@ impl Web3RpcConfig { http_client: Option, blocks_by_hash_cache: BlocksByHashCache, block_and_rpc_sender: Option>, - pending_txid_firehouse_sender: Option>, + pending_txid_firehouse: Option>>, max_head_block_age: Duration, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { @@ -444,7 +445,7 @@ impl Web3RpcConfig { block_interval, blocks_by_hash_cache, block_and_rpc_sender, - pending_txid_firehouse_sender, + pending_txid_firehouse, max_head_block_age, ) .await diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5fb267ff..844aa82b 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -9,6 +9,7 @@ use crate::frontend::authorization::Web3Request; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::frontend::status::MokaCacheSerializer; use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; +use deduped_broadcast::DedupedBroadcaster; use derive_more::From; use ethers::prelude::{TxHash, U64}; use futures::future::try_join_all; @@ -62,7 +63,7 @@ pub struct Web3Rpcs { /// calculated based on max_head_block_lag and averge block times pub(super) max_head_block_age: Duration, /// all of the pending txids for all of the rpcs. this still has duplicates - pub(super) pending_txid_firehose_sender: Option>, + pub(super) pending_txid_firehose: Option>>, } /// this is a RankedRpcs that should be ready to use @@ -102,7 +103,7 @@ impl Web3Rpcs { min_sum_soft_limit: u32, name: Cow<'static, str>, watch_consensus_head_sender: Option>>, - pending_txid_firehose_sender: Option>, + pending_txid_firehose: Option>>, ) -> anyhow::Result<( Arc, Web3ProxyJoinHandle<()>, @@ -149,7 +150,7 @@ impl Web3Rpcs { min_synced_rpcs: min_head_rpcs, min_sum_soft_limit, name, - pending_txid_firehose_sender, + pending_txid_firehose, watch_head_block: watch_consensus_head_sender, watch_ranked_rpcs: watch_consensus_rpcs_sender, }); @@ -234,7 +235,7 @@ impl Web3Rpcs { http_client, blocks_by_hash_cache, block_and_rpc_sender, - self.pending_txid_firehose_sender.clone(), + self.pending_txid_firehose.clone(), self.max_head_block_age, )); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index e58d28ea..ba9da46c 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -10,6 +10,7 @@ use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; +use deduped_broadcast::DedupedBroadcaster; use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -104,7 +105,7 @@ impl Web3Rpc { block_interval: Duration, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, - pending_txid_firehose_sender: Option>, + pending_txid_firehose: Option>>, max_head_block_age: Duration, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); @@ -222,7 +223,7 @@ impl Web3Rpc { 
                     .subscribe_with_reconnect(
                         block_map,
                         block_and_rpc_sender,
-                        pending_txid_firehose_sender,
+                        pending_txid_firehose,
                         chain_id,
                     )
                     .await
@@ -624,7 +625,7 @@ impl Web3Rpc {
         self: Arc<Self>,
         block_map: BlocksByHashCache,
         block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
-        pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
+        pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
         chain_id: u64,
     ) -> Web3ProxyResult<()> {
         loop {
@@ -633,7 +634,7 @@ impl Web3Rpc {
                 .subscribe(
                     block_map.clone(),
                     block_and_rpc_sender.clone(),
-                    pending_txid_firehose_sender.clone(),
+                    pending_txid_firehose.clone(),
                     chain_id,
                 )
                 .await
@@ -667,7 +668,7 @@ impl Web3Rpc {
         self: Arc<Self>,
         block_map: BlocksByHashCache,
         block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
-        pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
+        pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
         chain_id: u64,
     ) -> Web3ProxyResult<()> {
         let error_handler = if self.backup {
@@ -792,7 +793,7 @@ impl Web3Rpc {
         }
 
         // subscribe to new transactions
-        if let Some(pending_txid_firehose) = pending_txid_firehose_sender.clone() {
+        if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
             let clone = self.clone();
             let subscribe_stop_rx = subscribe_stop_tx.subscribe();
 
@@ -829,7 +830,7 @@ impl Web3Rpc {
 
     async fn subscribe_new_transactions(
         self: &Arc<Self>,
-        pending_txid_firehose: mpsc::Sender<TxHash>,
+        pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
         mut subscribe_stop_rx: watch::Receiver<bool>,
     ) -> Web3ProxyResult<()> {
         trace!("subscribing to new transactions on {}", self);
 
         }
 
         if let Some(ws_provider) = self.ws_provider.load().as_ref() {
-            // todo: move subscribe_blocks onto the request handle
+            // todo: move subscribe_blocks onto the request handle instead of having a separate wait_for_throttle
             self.wait_for_throttle(Instant::now() + Duration::from_secs(5))
                 .await?;
 
+            // TODO: only subscribe if a user has subscribed
             let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
 
             while let Some(x) = pending_txs_sub.next().await {
-                if *subscribe_stop_rx.borrow_and_update() {
+                // TODO: check this less often
+                if *subscribe_stop_rx.borrow() {
                     // TODO: this is checking way too often. have this on a timer instead
                     trace!("stopping ws block subscription on {}", self);
                     break;
                 }
 
                 // this should always work
-                if let Err(err) = pending_txid_firehose.try_send(x) {
-                    error!(
-                        ?err,
-                        "pending_txid_firehose failed sending. it must be full"
-                    );
-                }
+                // the old mpsc channel filled up very quickly when no one was subscribed. the moka cache doesn't have that problem
+                pending_txid_firehose.send(x).await;
             }
         } else {
+            // only websockets subscribe to pending transactions
+            // it's possible to do with http, but not recommended
             // TODO: what should we do here?
             loop {
                 if *subscribe_stop_rx.borrow_and_update() {