From 1fd8f6f383e3c4c067bc7feffc04810c19805104 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 13 Sep 2023 12:05:47 -0700 Subject: [PATCH] Deduped broadcast channel (#209) * upgrade and start adding deduped_broadcast * basic test * accept arg for cache ttl * first pass at bringing the tx firehose back * try_send instead of send since there aren't always receivers * deduped broadcaster needs the try send * track counts and include in /status * use config for enable subscribe_txs --- Cargo.lock | 42 +++++- Cargo.toml | 1 + README.md | 6 +- deduped_broadcast/Cargo.toml | 12 ++ deduped_broadcast/src/lib.rs | 210 ++++++++++++++++++++++++++++ web3_proxy/Cargo.toml | 3 +- web3_proxy/src/app/mod.rs | 13 +- web3_proxy/src/app/ws.rs | 192 +++++++++++++++++-------- web3_proxy/src/block_number.rs | 2 +- web3_proxy/src/config.rs | 2 + web3_proxy/src/frontend/status.rs | 1 + web3_proxy/src/rpcs/many.rs | 39 ++---- web3_proxy/src/rpcs/one.rs | 90 +++++++++++- web3_proxy/src/stats/stat_buffer.rs | 1 + 14 files changed, 511 insertions(+), 103 deletions(-) create mode 100644 deduped_broadcast/Cargo.toml create mode 100644 deduped_broadcast/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index e1945d28..6f18cf3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1522,6 +1522,16 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "deduped_broadcast" +version = "0.1.0" +dependencies = [ + "moka", + "serde", + "tokio", + "tracing", +] + [[package]] name = "deferred-rate-limiter" version = "0.2.0" @@ -4351,7 +4361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" dependencies = [ "once_cell", - "toml_edit", + "toml_edit 0.19.15", ] [[package]] @@ -6666,7 +6676,19 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit", + "toml_edit 0.19.15", +] + +[[package]] +name = "toml" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c226a7bba6d859b63c92c4b4fe69c5b6b72d0cb897dbc8e6012298e6154cb56e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit 0.20.0", ] [[package]] @@ -6691,6 +6713,19 @@ dependencies = [ "winnow", ] +[[package]] +name = "toml_edit" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ff63e60a958cefbb518ae1fd6566af80d9d4be430a33f3723dfc47d1d411d95" +dependencies = [ + "indexmap 2.0.0", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.9.2" @@ -7280,6 +7315,7 @@ dependencies = [ "chrono", "console-subscriber", "counter", + "deduped_broadcast", "deferred-rate-limiter", "derivative", "derive_more", @@ -7334,7 +7370,7 @@ dependencies = [ "tokio", "tokio-console", "tokio-stream", - "toml 0.7.8", + "toml 0.8.0", "tower", "tower-http", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 4b3c5dd7..b1eabaae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "deduped_broadcast", "deferred-rate-limiter", "entities", "latency", diff --git a/README.md b/README.md index 4dfda9d8..d326414d 100644 --- a/README.md +++ b/README.md @@ -90,11 +90,7 @@ $ websocat ws://127.0.0.1:8544 {"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newHeads"]} -{"jsonrpc": "2.0", "id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]} - -{"jsonrpc": "2.0", "id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]} - -{"jsonrpc": "2.0", "id": 4, "method": 
"eth_subscribe", "params": ["newPendingRawTransactions"]} +{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]} ``` You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start proxies for many chains. diff --git a/deduped_broadcast/Cargo.toml b/deduped_broadcast/Cargo.toml new file mode 100644 index 00000000..6924e1fb --- /dev/null +++ b/deduped_broadcast/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "deduped_broadcast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tracing = "0.1" +moka = { version = "0.11.3", default-features = false, features = ["atomic64", "future", "parking_lot", "quanta", "triomphe"] } +serde = "1" +tokio = { version = "1.32.0", features = ["full"] } diff --git a/deduped_broadcast/src/lib.rs b/deduped_broadcast/src/lib.rs new file mode 100644 index 00000000..cf290caa --- /dev/null +++ b/deduped_broadcast/src/lib.rs @@ -0,0 +1,210 @@ +use moka::future::{Cache, CacheBuilder}; +use serde::ser::SerializeStruct; +use serde::{Serialize, Serializer}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use std::{ + collections::hash_map::DefaultHasher, + fmt::Debug, + hash::{Hash, Hasher}, +}; +use tokio::sync::{broadcast, mpsc}; + +struct DedupedBroadcasterTask +where + T: Clone + Debug + Hash + Send + Sync + 'static, +{ + unfiltered_rx: mpsc::Receiver, + broadcast_filtered_tx: Arc>, + cache: Cache, + total_unfiltered: Arc, + total_filtered: Arc, + total_broadcasts: Arc, +} + +pub struct DedupedBroadcaster +where + T: Clone + Debug + Hash + Send + Sync + 'static, +{ + /// takes in things to broadcast. Can include duplicates, but only one will be sent. + unfiltered_tx: mpsc::Sender, + /// subscribe to this to get deduplicated items + broadcast_filtered_tx: Arc>, + total_unfiltered: Arc, + total_filtered: Arc, + total_broadcasts: Arc, +} + +impl DedupedBroadcasterTask +where + T: Clone + Debug + Hash + Send + Sync + 'static, +{ + /// forward things from input_receiver to output_sender if they aren't in the cache + async fn run(mut self) { + while let Some(item) = self.unfiltered_rx.recv().await { + let mut hasher = DefaultHasher::new(); + item.hash(&mut hasher); + let hashed = hasher.finish(); + + self.total_unfiltered + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + // we don't actually care what the return value is. 
we just want to send only if the cache is empty + // TODO: count new vs unique + self.cache + .get_with(hashed, async { + self.total_filtered + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + if let Ok(x) = self.broadcast_filtered_tx.send(item) { + self.total_broadcasts.fetch_add(x, Ordering::Relaxed); + } + }) + .await; + } + } +} + +impl DedupedBroadcaster +where + T: Clone + Debug + Hash + Send + Sync + 'static, +{ + pub fn new(capacity: usize, cache_capacity: u64, cache_ttl: Option) -> Self { + let (unfiltered_tx, unfiltered_rx) = mpsc::channel::(capacity); + let (broadcast_filtered_tx, _) = broadcast::channel(capacity); + + let mut cache = CacheBuilder::new(cache_capacity); + + if let Some(cache_ttl) = cache_ttl { + cache = cache.time_to_live(cache_ttl); + } + + let cache = cache.build(); + + let broadcast_filtered_tx = Arc::new(broadcast_filtered_tx); + + let total_unfiltered = Arc::new(AtomicUsize::new(0)); + let total_filtered = Arc::new(AtomicUsize::new(0)); + let total_broadcasts = Arc::new(AtomicUsize::new(0)); + + let task = DedupedBroadcasterTask { + unfiltered_rx, + broadcast_filtered_tx: broadcast_filtered_tx.clone(), + cache, + total_unfiltered: total_unfiltered.clone(), + total_filtered: total_filtered.clone(), + total_broadcasts: total_broadcasts.clone(), + }; + + // TODO: do something with the handle? + tokio::task::spawn(task.run()); + + Self { + unfiltered_tx, + broadcast_filtered_tx, + total_unfiltered, + total_filtered, + total_broadcasts, + } + } + + pub fn sender(&self) -> &mpsc::Sender { + &self.unfiltered_tx + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.broadcast_filtered_tx.subscribe() + } +} + +impl Debug for DedupedBroadcaster +where + T: Clone + Debug + Hash + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DedupedBroadcaster") + .field( + "total_unfiltered", + &self.total_unfiltered.load(Ordering::Relaxed), + ) + .field( + "total_filtered", + &self.total_filtered.load(Ordering::Relaxed), + ) + .field( + "total_broadcasts", + &self.total_broadcasts.load(Ordering::Relaxed), + ) + .field( + "subscriptions", + &self.broadcast_filtered_tx.receiver_count(), + ) + .finish_non_exhaustive() + } +} + +impl Serialize for DedupedBroadcaster +where + T: Clone + Debug + Hash + Send + Sync + 'static, +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("DedupedBroadcaster", 4)?; + + state.serialize_field( + "total_unfiltered", + &self.total_unfiltered.load(Ordering::Relaxed), + )?; + state.serialize_field( + "total_filtered", + &self.total_filtered.load(Ordering::Relaxed), + )?; + state.serialize_field( + "total_broadcasts", + &self.total_broadcasts.load(Ordering::Relaxed), + )?; + state.serialize_field( + "subscriptions", + &self.broadcast_filtered_tx.receiver_count(), + )?; + + state.end() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::task::yield_now; + + #[tokio::test] + async fn test_deduped_broadcaster() { + let broadcaster = DedupedBroadcaster::new(10, 10, None); + + let mut receiver_1 = broadcaster.subscribe(); + let _receiver_2 = broadcaster.subscribe(); + + broadcaster.sender().send(1).await.unwrap(); + broadcaster.sender().send(1).await.unwrap(); + broadcaster.sender().send(2).await.unwrap(); + broadcaster.sender().send(1).await.unwrap(); + broadcaster.sender().send(2).await.unwrap(); + broadcaster.sender().send(3).await.unwrap(); + broadcaster.sender().send(3).await.unwrap(); + + 
yield_now().await; + + assert_eq!(receiver_1.recv().await.unwrap(), 1); + assert_eq!(receiver_1.recv().await.unwrap(), 2); + assert_eq!(receiver_1.recv().await.unwrap(), 3); + + yield_now().await; + + assert_eq!(broadcaster.total_unfiltered.load(Ordering::Relaxed), 7); + assert_eq!(broadcaster.total_filtered.load(Ordering::Relaxed), 3); + assert_eq!(broadcaster.total_broadcasts.load(Ordering::Relaxed), 6); + } +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 731d9970..00ca220e 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -14,6 +14,7 @@ tests-needing-docker = [] tokio-console = ["dep:tokio-console", "dep:console-subscriber"] [dependencies] +deduped_broadcast = { path = "../deduped_broadcast" } deferred-rate-limiter = { path = "../deferred-rate-limiter" } entities = { path = "../entities" } latency = { path = "../latency" } @@ -90,7 +91,7 @@ time_03 = { package = "time", version = "0.3" } tokio = { version = "1.32.0", features = ["full", "tracing"] } tokio-console = { version = "0.1.9", optional = true } tokio-stream = { version = "0.1.14", features = ["sync"] } -toml = "0.7.8" +toml = "0.8.0" tower = { version = "0.4.13", features = ["timeout", "tracing"] } tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] } tracing = "0.1" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ee5e981b..408fc9fd 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -26,11 +26,11 @@ use crate::stats::{AppStat, FlushedStats, StatBuffer}; use anyhow::Context; use axum::http::StatusCode; use chrono::Utc; +use deduped_broadcast::DedupedBroadcaster; use deferred_rate_limiter::DeferredRateLimiter; use entities::user; use ethers::core::utils::keccak256; -use ethers::prelude::{Address, Bytes, Transaction, H256, U64}; -use ethers::types::U256; +use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U256, U64}; use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; use futures::stream::{FuturesUnordered, StreamExt}; @@ -88,6 +88,8 @@ pub struct Web3ProxyApp { /// don't drop this or the sender will stop working /// TODO: broadcast channel instead? pub watch_consensus_head_receiver: watch::Receiver>, + /// rpc clients that subscribe to newPendingTransactions use this channel + pub pending_txid_firehose: deduped_broadcast::DedupedBroadcaster, pub hostname: Option, pub frontend_port: Arc, /// rate limit anonymous users @@ -407,6 +409,9 @@ impl Web3ProxyApp { let chain_id = top_config.app.chain_id; + let deduped_txid_firehose = + DedupedBroadcaster::new(10_000, 10_000, Some(Duration::from_secs(5 * 60))); + // TODO: remove this. it should only be done by apply_top_config let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( chain_id, @@ -415,6 +420,7 @@ impl Web3ProxyApp { top_config.app.min_sum_soft_limit, "balanced rpcs".into(), Some(watch_consensus_head_sender), + Some(deduped_txid_firehose.sender().clone()), ) .await .web3_context("spawning balanced rpcs")?; @@ -441,6 +447,7 @@ impl Web3ProxyApp { // however, they are well connected to miners/validators. 
so maybe using them as a safety check would be good // TODO: but maybe we could include privates in the "backup" tier None, + None, ) .await .web3_context("spawning private_rpcs")?; @@ -466,6 +473,7 @@ impl Web3ProxyApp { 0, "eip4337 rpcs".into(), None, + None, ) .await .web3_context("spawning bundler_4337_rpcs")?; @@ -483,6 +491,7 @@ impl Web3ProxyApp { balanced_rpcs, bundler_4337_rpcs, config: top_config.app.clone(), + pending_txid_firehose: deduped_txid_firehose, frontend_port: frontend_port.clone(), frontend_ip_rate_limiter, frontend_registered_user_rate_limiter, diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 5ecc2d9b..e824c464 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -18,6 +18,7 @@ use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::Instant; +use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::WatchStream; use tracing::{error, trace}; @@ -56,79 +57,150 @@ impl Web3ProxyApp { Web3ProxyError::BadRequest("unable to subscribe using these params".into()) })?; - // TODO: calling json! on every request is probably not fast. but we can only match against + // TODO: calling `json!` on every request is probably not fast. but it works for now // TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into - if subscribe_to == "newHeads" { - let head_block_receiver = self.watch_consensus_head_receiver.clone(); - let app = self.clone(); + // TODO: DRY This up. lots of duplication between newHeads and newPendingTransactions + match subscribe_to { + "newHeads" => { + let head_block_receiver = self.watch_consensus_head_receiver.clone(); + let app = self.clone(); - tokio::spawn(async move { - let mut head_block_receiver = Abortable::new( - WatchStream::new(head_block_receiver), - subscription_registration, - ); + tokio::spawn(async move { + trace!("newHeads subscription {:?}", subscription_id); - while let Some(new_head) = head_block_receiver.next().await { - let new_head = if let Some(new_head) = new_head { - new_head - } else { - continue; - }; + let mut head_block_receiver = Abortable::new( + WatchStream::new(head_block_receiver), + subscription_registration, + ); - let subscription_request_metadata = RequestMetadata::new( - &app, - authorization.clone(), - RequestOrMethod::Method("eth_subscribe(newHeads)", 0), - Some(&new_head), - ) - .await; + while let Some(new_head) = head_block_receiver.next().await { + let new_head = if let Some(new_head) = new_head { + new_head + } else { + continue; + }; - if let Some(close_message) = app - .rate_limit_close_websocket(&subscription_request_metadata) - .await - { - let _ = response_sender.send(close_message).await; - break; + let subscription_request_metadata = RequestMetadata::new( + &app, + authorization.clone(), + RequestOrMethod::Method("eth_subscribe(newHeads)", 0), + Some(&new_head), + ) + .await; + + if let Some(close_message) = app + .rate_limit_close_websocket(&subscription_request_metadata) + .await + { + let _ = response_sender.send(close_message).await; + break; + } + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let response_json = json!({ + "jsonrpc": "2.0", + "method":"eth_subscription", + "params": { + "subscription": subscription_id, + // TODO: option to include full transaction objects instead of just the hashes? 
+ "result": new_head.block, + }, + }); + + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); + + // we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + // TODO: can we check a content type header? + let response_msg = Message::Text(response_str); + + if response_sender.send(response_msg).await.is_err() { + // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + + subscription_request_metadata.add_response(response_bytes); } - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method":"eth_subscription", - "params": { - "subscription": subscription_id, - // TODO: option to include full transaction objects instead of just the hashes? - "result": new_head.block, - }, - }); + trace!("closed newHeads subscription {:?}", subscription_id); + }); + } + // TODO: bring back the other custom subscription types that had the full transaction object + "newPendingTransactions" => { + let pending_txid_firehose = self.pending_txid_firehose.subscribe(); + let app = self.clone(); - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); + tokio::spawn(async move { + let mut pending_txid_firehose = Abortable::new( + BroadcastStream::new(pending_txid_firehose), + subscription_registration, + ); - // we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); + while let Some(Ok(new_txid)) = pending_txid_firehose.next().await { + // TODO: include the head_block here? + let subscription_request_metadata = RequestMetadata::new( + &app, + authorization.clone(), + RequestOrMethod::Method("eth_subscribe(newPendingTransactions)", 0), + None, + ) + .await; - // TODO: do clients support binary messages? - // TODO: can we check a content type header? - let response_msg = Message::Text(response_str); + // check if we should close the websocket connection + if let Some(close_message) = app + .rate_limit_close_websocket(&subscription_request_metadata) + .await + { + let _ = response_sender.send(close_message).await; + break; + } - if response_sender.send(response_msg).await.is_err() { - // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; + // TODO: make a struct/helper function for this + let response_json = json!({ + "jsonrpc": "2.0", + "method":"eth_subscription", + "params": { + "subscription": subscription_id, + "result": new_txid, + }, + }); - subscription_request_metadata.add_response(response_bytes); - } + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); - trace!("closed newHeads subscription {:?}", subscription_id); - }); - } else { - // TODO: make sure this gets a CU cost of unimplemented instead of the normal eth_subscribe cost? 
- return Err(Web3ProxyError::MethodNotFound( - subscribe_to.to_owned().into(), - )); - } + // we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + subscription_request_metadata.add_response(response_bytes); + + // TODO: do clients support binary messages? + // TODO: can we check a content type header? + let response_msg = Message::Text(response_str); + + if response_sender.send(response_msg).await.is_err() { + // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } + + trace!( + "closed newPendingTransactions subscription {:?}", + subscription_id + ); + }); + } + _ => { + // TODO: make sure this gets a CU cost of unimplemented instead of the normal eth_subscribe cost? + return Err(Web3ProxyError::MethodNotFound( + subscribe_to.to_owned().into(), + )); + } + }; // TODO: do something with subscription_join_handle? diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 2ec96eb6..eabca1a6 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -437,7 +437,7 @@ mod test { let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap(); let (empty, _handle, _ranked_rpc_reciver) = - Web3Rpcs::spawn(1, None, 1, 1, "test".into(), None) + Web3Rpcs::spawn(1, None, 1, 1, "test".into(), None, None) .await .unwrap(); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index f72d60c3..67c704a2 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -396,6 +396,7 @@ impl Web3RpcConfig { http_client: Option, blocks_by_hash_cache: BlocksByHashCache, block_and_rpc_sender: Option>, + pending_txid_firehouse_sender: Option>, max_head_block_age: Duration, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { @@ -413,6 +414,7 @@ impl Web3RpcConfig { block_interval, blocks_by_hash_cache, block_and_rpc_sender, + pending_txid_firehouse_sender, max_head_block_age, ) .await diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index e9d3dba9..f6467859 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -207,6 +207,7 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "head_block_hash": head_block.as_ref().map(|x| x.hash()), "hostname": app.hostname, "payment_factory_address": app.config.deposit_factory_contract, + "pending_txid_firehose": app.pending_txid_firehose, "private_rpcs": app.private_rpcs, "version": APP_USER_AGENT, }); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 99f76b47..e5f773e1 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -12,7 +12,7 @@ use crate::frontend::status::MokaCacheSerializer; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; use counter::Counter; use derive_more::From; -use ethers::prelude::U64; +use ethers::prelude::{TxHash, U64}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryFutureExt}; @@ -67,6 +67,8 @@ pub struct Web3Rpcs { /// how old our consensus head block we can be before we stop serving requests /// calculated based on max_head_block_lag and averge block times pub(super) max_head_block_age: Duration, + /// all of the pending txids for all of the rpcs. 
this still has duplicates + pub(super) pending_txid_firehose_sender: Option>, } impl Web3Rpcs { @@ -78,6 +80,7 @@ impl Web3Rpcs { min_sum_soft_limit: u32, name: Cow<'static, str>, watch_consensus_head_sender: Option>>, + pending_txid_firehose_sender: Option>, ) -> anyhow::Result<( Arc, Web3ProxyJoinHandle<()>, @@ -124,6 +127,7 @@ impl Web3Rpcs { min_synced_rpcs: min_head_rpcs, min_sum_soft_limit, name, + pending_txid_firehose_sender, watch_head_block: watch_consensus_head_sender, watch_ranked_rpcs: watch_consensus_rpcs_sender, }); @@ -187,7 +191,7 @@ impl Web3Rpcs { let http_client = app.http_client.clone(); let vredis_pool = app.vredis_pool.clone(); - let block_sender = if self.watch_head_block.is_some() { + let block_and_rpc_sender = if self.watch_head_block.is_some() { Some(self.block_sender.clone()) } else { None @@ -207,7 +211,8 @@ impl Web3Rpcs { block_interval, http_client, blocks_by_hash_cache, - block_sender, + block_and_rpc_sender, + self.pending_txid_firehose_sender.clone(), self.max_head_block_age, )); @@ -318,30 +323,7 @@ impl Web3Rpcs { ) -> Web3ProxyResult<()> { let mut futures = vec![]; - // // setup the transaction funnel - // // it skips any duplicates (unless they are being orphaned) - // // fetches new transactions from the notifying rpc - // // forwards new transacitons to pending_tx_receipt_sender - // if let Some(pending_tx_sender) = pending_tx_sender.clone() { - // let clone = self.clone(); - // let handle = tokio::task::spawn(async move { - // // TODO: set up this future the same as the block funnel - // while let Some((pending_tx_id, rpc)) = - // clone.pending_tx_id_receiver.write().await.recv().await - // { - // let f = clone.clone().process_incoming_tx_id( - // rpc, - // pending_tx_id, - // pending_tx_sender.clone(), - // ); - // tokio::spawn(f); - // } - - // Ok(()) - // }); - - // futures.push(flatten_handle(handle)); - // } + // TODO: do we need anything here to set up the transaction funnel // setup the block funnel if self.watch_head_block.is_some() { @@ -1546,6 +1528,7 @@ mod tests { max_head_block_age: Duration::from_secs(60), // TODO: test max_head_block_lag? 
max_head_block_lag: 5.into(), + pending_txid_firehose_sender: None, min_synced_rpcs: 1, min_sum_soft_limit: 1, }; @@ -1795,6 +1778,7 @@ mod tests { min_sum_soft_limit: 4_000, min_synced_rpcs: 1, name: "test".into(), + pending_txid_firehose_sender: None, watch_head_block: Some(watch_consensus_head_sender), watch_ranked_rpcs, }; @@ -1959,6 +1943,7 @@ mod tests { min_sum_soft_limit: 1_000, min_synced_rpcs: 1, name: "test".into(), + pending_txid_firehose_sender: None, watch_head_block: Some(watch_consensus_head_sender), watch_ranked_rpcs, }; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 1313b31d..464a28fc 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -10,8 +10,7 @@ use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; -use ethers::prelude::{Bytes, Middleware, U64}; -use ethers::types::{Address, Transaction, U256}; +use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64}; use futures::stream::FuturesUnordered; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; @@ -39,6 +38,7 @@ pub struct Web3Rpc { pub block_interval: Duration, pub display_name: Option, pub db_conn: Option, + pub subscribe_txs: bool, /// most all requests prefer use the http_provider pub(super) http_provider: Option, /// the websocket url is only used for subscriptions @@ -100,6 +100,7 @@ impl Web3Rpc { block_interval: Duration, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, + pending_txid_firehose_sender: Option>, max_head_block_age: Duration, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); @@ -200,6 +201,7 @@ impl Web3Rpc { peak_latency: Some(peak_latency), median_latency: Some(median_request_latency), soft_limit: config.soft_limit, + subscribe_txs: config.subscribe_txs, ws_url, disconnect_watch: Some(disconnect_watch), ..Default::default() @@ -213,7 +215,12 @@ impl Web3Rpc { let new_connection = new_connection.clone(); tokio::spawn(async move { new_connection - .subscribe_with_reconnect(block_map, block_and_rpc_sender, chain_id) + .subscribe_with_reconnect( + block_map, + block_and_rpc_sender, + pending_txid_firehose_sender, + chain_id, + ) .await }) }; @@ -588,12 +595,18 @@ impl Web3Rpc { self: Arc, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, + pending_txid_firehose_sender: Option>, chain_id: u64, ) -> Web3ProxyResult<()> { loop { if let Err(err) = self .clone() - .subscribe(block_map.clone(), block_and_rpc_sender.clone(), chain_id) + .subscribe( + block_map.clone(), + block_and_rpc_sender.clone(), + pending_txid_firehose_sender.clone(), + chain_id, + ) .await { if self.should_disconnect() { @@ -625,6 +638,7 @@ impl Web3Rpc { self: Arc, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, + pending_txid_firehose_sender: Option>, chain_id: u64, ) -> Web3ProxyResult<()> { let error_handler = if self.backup { @@ -750,6 +764,20 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } + // subscribe to new transactions + if let Some(pending_txid_firehose) = pending_txid_firehose_sender.clone() { + let clone = self.clone(); + let subscribe_stop_rx = subscribe_stop_tx.subscribe(); + + let f = async move { + clone + .subscribe_new_transactions(pending_txid_firehose, subscribe_stop_rx) + .await + }; + + futures.push(flatten_handle(tokio::spawn(f))); + } + // exit if any of the futures exit // TODO: have an enum for 
which one exited? let first_exit = futures.next().await; @@ -772,6 +800,60 @@ impl Web3Rpc { Ok(()) } + async fn subscribe_new_transactions( + self: &Arc, + pending_txid_firehose: mpsc::Sender, + mut subscribe_stop_rx: watch::Receiver, + ) -> Web3ProxyResult<()> { + trace!("subscribing to new transactions on {}", self); + + // rpcs opt-into subscribing to transactions. its a lot of bandwidth + if !self.subscribe_txs { + loop { + if *subscribe_stop_rx.borrow() { + trace!("stopping ws block subscription on {}", self); + return Ok(()); + } + subscribe_stop_rx.changed().await?; + } + } + + if let Some(ws_provider) = self.ws_provider.load().as_ref() { + // todo: move subscribe_blocks onto the request handle + let authorization = Default::default(); + + let error_handler = Some(Level::ERROR.into()); + + let active_request_handle = self + .wait_for_request_handle(&authorization, None, error_handler) + .await; + + let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?; + + drop(active_request_handle); + + while let Some(x) = pending_txs_sub.next().await { + if *subscribe_stop_rx.borrow() { + // TODO: this is checking way too often. have this on a timer instead + trace!("stopping ws block subscription on {}", self); + break; + } + + // this should always work + if let Err(err) = pending_txid_firehose.try_send(x) { + error!( + ?err, + "pending_txid_firehose failed sending. it must be full" + ); + } + } + } else { + unimplemented!(); + } + + Ok(()) + } + /// Subscribe to new blocks. async fn subscribe_new_heads( self: &Arc, diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 7127bfde..bae4ecbb 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -198,6 +198,7 @@ impl StatBuffer { } } + // TODO: wait on the frontend to shutdown // TODO: wait on all websockets to close // TODO: wait on all pending external requests to finish info!("waiting 5 seconds for remaining stats to arrive");
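
For reference, a minimal usage sketch of the `DedupedBroadcaster` API introduced in `deduped_broadcast/src/lib.rs` above. The item type, channel capacity, and TTL below are illustrative assumptions for a standalone example; the patch itself constructs the firehose in `web3_proxy/src/app/mod.rs` with a capacity of 10_000, a 10_000-entry cache, and a 5-minute TTL.

```rust
use std::time::Duration;

use deduped_broadcast::DedupedBroadcaster;

#[tokio::main]
async fn main() {
    // capacity 1_024, 10_000-entry dedup cache, 60s TTL -- illustrative values only
    let broadcaster = DedupedBroadcaster::new(1_024, 10_000, Some(Duration::from_secs(60)));

    // each consumer (e.g. a websocket subscription task) gets its own broadcast receiver
    let mut rx = broadcaster.subscribe();

    // producers push possibly-duplicated items into the unfiltered mpsc sender
    broadcaster.sender().send("0xabc").await.unwrap();
    broadcaster.sender().send("0xabc").await.unwrap(); // duplicate, dropped by the moka cache
    broadcaster.sender().send("0xdef").await.unwrap();

    // give the background task a chance to drain the channel, as the in-crate test does
    tokio::task::yield_now().await;

    // only one copy of each unique item reaches subscribers
    assert_eq!(rx.recv().await.unwrap(), "0xabc");
    assert_eq!(rx.recv().await.unwrap(), "0xdef");
}
```

On the producer side, the patch has each RPC's `subscribe_new_transactions` loop push hashes in with `try_send` rather than `send`, so a full firehose channel logs an error instead of blocking the websocket loop; on the consumer side, `eth_subscribe(newPendingTransactions)` clients each call `subscribe()` and read the already-deduplicated stream.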