Deduped broadcast channel (#209)

* upgrade and start adding deduped_broadcast

* basic test

* accept arg for cache ttl

* first pass at bringing the tx firehose back

* try_send instead of send since there aren't always receivers

* deduped broadcaster needs the try send

* track counts and include in /status

* use config for enable subscribe_txs
This commit is contained in:
Bryan Stitt 2023-09-13 12:05:47 -07:00 committed by GitHub
parent 4d0149346d
commit 1fd8f6f383
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 511 additions and 103 deletions

42
Cargo.lock generated

@ -1522,6 +1522,16 @@ dependencies = [
"uuid 1.4.1",
]
[[package]]
name = "deduped_broadcast"
version = "0.1.0"
dependencies = [
"moka",
"serde",
"tokio",
"tracing",
]
[[package]]
name = "deferred-rate-limiter"
version = "0.2.0"
@ -4351,7 +4361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
dependencies = [
"once_cell",
"toml_edit",
"toml_edit 0.19.15",
]
[[package]]
@ -6666,7 +6676,19 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
"toml_edit 0.19.15",
]
[[package]]
name = "toml"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c226a7bba6d859b63c92c4b4fe69c5b6b72d0cb897dbc8e6012298e6154cb56e"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.20.0",
]
[[package]]
@ -6691,6 +6713,19 @@ dependencies = [
"winnow",
]
[[package]]
name = "toml_edit"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ff63e60a958cefbb518ae1fd6566af80d9d4be430a33f3723dfc47d1d411d95"
dependencies = [
"indexmap 2.0.0",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
]
[[package]]
name = "tonic"
version = "0.9.2"
@ -7280,6 +7315,7 @@ dependencies = [
"chrono",
"console-subscriber",
"counter",
"deduped_broadcast",
"deferred-rate-limiter",
"derivative",
"derive_more",
@ -7334,7 +7370,7 @@ dependencies = [
"tokio",
"tokio-console",
"tokio-stream",
"toml 0.7.8",
"toml 0.8.0",
"tower",
"tower-http",
"tracing",

@ -1,5 +1,6 @@
[workspace]
members = [
"deduped_broadcast",
"deferred-rate-limiter",
"entities",
"latency",

@ -90,11 +90,7 @@ $ websocat ws://127.0.0.1:8544
{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newHeads"]}
{"jsonrpc": "2.0", "id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
{"jsonrpc": "2.0", "id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]}
{"jsonrpc": "2.0", "id": 4, "method": "eth_subscribe", "params": ["newPendingRawTransactions"]}
{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
```
You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start proxies for many chains.

@ -0,0 +1,12 @@
[package]
name = "deduped_broadcast"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tracing = "0.1"
moka = { version = "0.11.3", default-features = false, features = ["atomic64", "future", "parking_lot", "quanta", "triomphe"] }
serde = "1"
tokio = { version = "1.32.0", features = ["full"] }

@ -0,0 +1,210 @@
use moka::future::{Cache, CacheBuilder};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{
collections::hash_map::DefaultHasher,
fmt::Debug,
hash::{Hash, Hasher},
};
use tokio::sync::{broadcast, mpsc};
struct DedupedBroadcasterTask<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
unfiltered_rx: mpsc::Receiver<T>,
broadcast_filtered_tx: Arc<broadcast::Sender<T>>,
cache: Cache<u64, ()>,
total_unfiltered: Arc<AtomicUsize>,
total_filtered: Arc<AtomicUsize>,
total_broadcasts: Arc<AtomicUsize>,
}
pub struct DedupedBroadcaster<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
/// takes in things to broadcast. Can include duplicates, but only one will be sent.
unfiltered_tx: mpsc::Sender<T>,
/// subscribe to this to get deduplicated items
broadcast_filtered_tx: Arc<broadcast::Sender<T>>,
total_unfiltered: Arc<AtomicUsize>,
total_filtered: Arc<AtomicUsize>,
total_broadcasts: Arc<AtomicUsize>,
}
impl<T> DedupedBroadcasterTask<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
/// forward things from input_receiver to output_sender if they aren't in the cache
async fn run(mut self) {
while let Some(item) = self.unfiltered_rx.recv().await {
let mut hasher = DefaultHasher::new();
item.hash(&mut hasher);
let hashed = hasher.finish();
self.total_unfiltered
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// we don't actually care what the return value is. we just want to send only if the cache is empty
// TODO: count new vs unique
self.cache
.get_with(hashed, async {
self.total_filtered
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Ok(x) = self.broadcast_filtered_tx.send(item) {
self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
}
})
.await;
}
}
}
impl<T> DedupedBroadcaster<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
pub fn new(capacity: usize, cache_capacity: u64, cache_ttl: Option<Duration>) -> Self {
let (unfiltered_tx, unfiltered_rx) = mpsc::channel::<T>(capacity);
let (broadcast_filtered_tx, _) = broadcast::channel(capacity);
let mut cache = CacheBuilder::new(cache_capacity);
if let Some(cache_ttl) = cache_ttl {
cache = cache.time_to_live(cache_ttl);
}
let cache = cache.build();
let broadcast_filtered_tx = Arc::new(broadcast_filtered_tx);
let total_unfiltered = Arc::new(AtomicUsize::new(0));
let total_filtered = Arc::new(AtomicUsize::new(0));
let total_broadcasts = Arc::new(AtomicUsize::new(0));
let task = DedupedBroadcasterTask {
unfiltered_rx,
broadcast_filtered_tx: broadcast_filtered_tx.clone(),
cache,
total_unfiltered: total_unfiltered.clone(),
total_filtered: total_filtered.clone(),
total_broadcasts: total_broadcasts.clone(),
};
// TODO: do something with the handle?
tokio::task::spawn(task.run());
Self {
unfiltered_tx,
broadcast_filtered_tx,
total_unfiltered,
total_filtered,
total_broadcasts,
}
}
pub fn sender(&self) -> &mpsc::Sender<T> {
&self.unfiltered_tx
}
pub fn subscribe(&self) -> broadcast::Receiver<T> {
self.broadcast_filtered_tx.subscribe()
}
}
impl<T> Debug for DedupedBroadcaster<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DedupedBroadcaster")
.field(
"total_unfiltered",
&self.total_unfiltered.load(Ordering::Relaxed),
)
.field(
"total_filtered",
&self.total_filtered.load(Ordering::Relaxed),
)
.field(
"total_broadcasts",
&self.total_broadcasts.load(Ordering::Relaxed),
)
.field(
"subscriptions",
&self.broadcast_filtered_tx.receiver_count(),
)
.finish_non_exhaustive()
}
}
impl<T> Serialize for DedupedBroadcaster<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("DedupedBroadcaster", 4)?;
state.serialize_field(
"total_unfiltered",
&self.total_unfiltered.load(Ordering::Relaxed),
)?;
state.serialize_field(
"total_filtered",
&self.total_filtered.load(Ordering::Relaxed),
)?;
state.serialize_field(
"total_broadcasts",
&self.total_broadcasts.load(Ordering::Relaxed),
)?;
state.serialize_field(
"subscriptions",
&self.broadcast_filtered_tx.receiver_count(),
)?;
state.end()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::task::yield_now;
#[tokio::test]
async fn test_deduped_broadcaster() {
let broadcaster = DedupedBroadcaster::new(10, 10, None);
let mut receiver_1 = broadcaster.subscribe();
let _receiver_2 = broadcaster.subscribe();
broadcaster.sender().send(1).await.unwrap();
broadcaster.sender().send(1).await.unwrap();
broadcaster.sender().send(2).await.unwrap();
broadcaster.sender().send(1).await.unwrap();
broadcaster.sender().send(2).await.unwrap();
broadcaster.sender().send(3).await.unwrap();
broadcaster.sender().send(3).await.unwrap();
yield_now().await;
assert_eq!(receiver_1.recv().await.unwrap(), 1);
assert_eq!(receiver_1.recv().await.unwrap(), 2);
assert_eq!(receiver_1.recv().await.unwrap(), 3);
yield_now().await;
assert_eq!(broadcaster.total_unfiltered.load(Ordering::Relaxed), 7);
assert_eq!(broadcaster.total_filtered.load(Ordering::Relaxed), 3);
assert_eq!(broadcaster.total_broadcasts.load(Ordering::Relaxed), 6);
}
}

@ -14,6 +14,7 @@ tests-needing-docker = []
tokio-console = ["dep:tokio-console", "dep:console-subscriber"]
[dependencies]
deduped_broadcast = { path = "../deduped_broadcast" }
deferred-rate-limiter = { path = "../deferred-rate-limiter" }
entities = { path = "../entities" }
latency = { path = "../latency" }
@ -90,7 +91,7 @@ time_03 = { package = "time", version = "0.3" }
tokio = { version = "1.32.0", features = ["full", "tracing"] }
tokio-console = { version = "0.1.9", optional = true }
tokio-stream = { version = "0.1.14", features = ["sync"] }
toml = "0.7.8"
toml = "0.8.0"
tower = { version = "0.4.13", features = ["timeout", "tracing"] }
tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] }
tracing = "0.1"

@ -26,11 +26,11 @@ use crate::stats::{AppStat, FlushedStats, StatBuffer};
use anyhow::Context;
use axum::http::StatusCode;
use chrono::Utc;
use deduped_broadcast::DedupedBroadcaster;
use deferred_rate_limiter::DeferredRateLimiter;
use entities::user;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, H256, U64};
use ethers::types::U256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U256, U64};
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
@ -88,6 +88,8 @@ pub struct Web3ProxyApp {
/// don't drop this or the sender will stop working
/// TODO: broadcast channel instead?
pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
/// rpc clients that subscribe to newPendingTransactions use this channel
pub pending_txid_firehose: deduped_broadcast::DedupedBroadcaster<TxHash>,
pub hostname: Option<String>,
pub frontend_port: Arc<AtomicU16>,
/// rate limit anonymous users
@ -407,6 +409,9 @@ impl Web3ProxyApp {
let chain_id = top_config.app.chain_id;
let deduped_txid_firehose =
DedupedBroadcaster::new(10_000, 10_000, Some(Duration::from_secs(5 * 60)));
// TODO: remove this. it should only be done by apply_top_config
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
chain_id,
@ -415,6 +420,7 @@ impl Web3ProxyApp {
top_config.app.min_sum_soft_limit,
"balanced rpcs".into(),
Some(watch_consensus_head_sender),
Some(deduped_txid_firehose.sender().clone()),
)
.await
.web3_context("spawning balanced rpcs")?;
@ -441,6 +447,7 @@ impl Web3ProxyApp {
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None,
None,
)
.await
.web3_context("spawning private_rpcs")?;
@ -466,6 +473,7 @@ impl Web3ProxyApp {
0,
"eip4337 rpcs".into(),
None,
None,
)
.await
.web3_context("spawning bundler_4337_rpcs")?;
@ -483,6 +491,7 @@ impl Web3ProxyApp {
balanced_rpcs,
bundler_4337_rpcs,
config: top_config.app.clone(),
pending_txid_firehose: deduped_txid_firehose,
frontend_port: frontend_port.clone(),
frontend_ip_rate_limiter,
frontend_registered_user_rate_limiter,

@ -18,6 +18,7 @@ use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::WatchStream;
use tracing::{error, trace};
@ -56,13 +57,17 @@ impl Web3ProxyApp {
Web3ProxyError::BadRequest("unable to subscribe using these params".into())
})?;
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: calling `json!` on every request is probably not fast. but it works for now
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
if subscribe_to == "newHeads" {
// TODO: DRY This up. lots of duplication between newHeads and newPendingTransactions
match subscribe_to {
"newHeads" => {
let head_block_receiver = self.watch_consensus_head_receiver.clone();
let app = self.clone();
tokio::spawn(async move {
trace!("newHeads subscription {:?}", subscription_id);
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
@ -123,12 +128,79 @@ impl Web3ProxyApp {
trace!("closed newHeads subscription {:?}", subscription_id);
});
} else {
}
// TODO: bring back the other custom subscription types that had the full transaction object
"newPendingTransactions" => {
let pending_txid_firehose = self.pending_txid_firehose.subscribe();
let app = self.clone();
tokio::spawn(async move {
let mut pending_txid_firehose = Abortable::new(
BroadcastStream::new(pending_txid_firehose),
subscription_registration,
);
while let Some(Ok(new_txid)) = pending_txid_firehose.next().await {
// TODO: include the head_block here?
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingTransactions)", 0),
None,
)
.await;
// check if we should close the websocket connection
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send(close_message).await;
break;
}
// TODO: make a struct/helper function for this
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_txid,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
subscription_request_metadata.add_response(response_bytes);
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(
"closed newPendingTransactions subscription {:?}",
subscription_id
);
});
}
_ => {
// TODO: make sure this gets a CU cost of unimplemented instead of the normal eth_subscribe cost?
return Err(Web3ProxyError::MethodNotFound(
subscribe_to.to_owned().into(),
));
}
};
// TODO: do something with subscription_join_handle?

@ -437,7 +437,7 @@ mod test {
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let (empty, _handle, _ranked_rpc_reciver) =
Web3Rpcs::spawn(1, None, 1, 1, "test".into(), None)
Web3Rpcs::spawn(1, None, 1, 1, "test".into(), None, None)
.await
.unwrap();

@ -396,6 +396,7 @@ impl Web3RpcConfig {
http_client: Option<reqwest::Client>,
blocks_by_hash_cache: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehouse_sender: Option<mpsc::Sender<TxHash>>,
max_head_block_age: Duration,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
if !self.extra.is_empty() {
@ -413,6 +414,7 @@ impl Web3RpcConfig {
block_interval,
blocks_by_hash_cache,
block_and_rpc_sender,
pending_txid_firehouse_sender,
max_head_block_age,
)
.await

@ -207,6 +207,7 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
"head_block_hash": head_block.as_ref().map(|x| x.hash()),
"hostname": app.hostname,
"payment_factory_address": app.config.deposit_factory_contract,
"pending_txid_firehose": app.pending_txid_firehose,
"private_rpcs": app.private_rpcs,
"version": APP_USER_AGENT,
});

@ -12,7 +12,7 @@ use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use counter::Counter;
use derive_more::From;
use ethers::prelude::U64;
use ethers::prelude::{TxHash, U64};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
@ -67,6 +67,8 @@ pub struct Web3Rpcs {
/// how old our consensus head block we can be before we stop serving requests
/// calculated based on max_head_block_lag and averge block times
pub(super) max_head_block_age: Duration,
/// all of the pending txids for all of the rpcs. this still has duplicates
pub(super) pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
}
impl Web3Rpcs {
@ -78,6 +80,7 @@ impl Web3Rpcs {
min_sum_soft_limit: u32,
name: Cow<'static, str>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
) -> anyhow::Result<(
Arc<Self>,
Web3ProxyJoinHandle<()>,
@ -124,6 +127,7 @@ impl Web3Rpcs {
min_synced_rpcs: min_head_rpcs,
min_sum_soft_limit,
name,
pending_txid_firehose_sender,
watch_head_block: watch_consensus_head_sender,
watch_ranked_rpcs: watch_consensus_rpcs_sender,
});
@ -187,7 +191,7 @@ impl Web3Rpcs {
let http_client = app.http_client.clone();
let vredis_pool = app.vredis_pool.clone();
let block_sender = if self.watch_head_block.is_some() {
let block_and_rpc_sender = if self.watch_head_block.is_some() {
Some(self.block_sender.clone())
} else {
None
@ -207,7 +211,8 @@ impl Web3Rpcs {
block_interval,
http_client,
blocks_by_hash_cache,
block_sender,
block_and_rpc_sender,
self.pending_txid_firehose_sender.clone(),
self.max_head_block_age,
));
@ -318,30 +323,7 @@ impl Web3Rpcs {
) -> Web3ProxyResult<()> {
let mut futures = vec![];
// // setup the transaction funnel
// // it skips any duplicates (unless they are being orphaned)
// // fetches new transactions from the notifying rpc
// // forwards new transacitons to pending_tx_receipt_sender
// if let Some(pending_tx_sender) = pending_tx_sender.clone() {
// let clone = self.clone();
// let handle = tokio::task::spawn(async move {
// // TODO: set up this future the same as the block funnel
// while let Some((pending_tx_id, rpc)) =
// clone.pending_tx_id_receiver.write().await.recv().await
// {
// let f = clone.clone().process_incoming_tx_id(
// rpc,
// pending_tx_id,
// pending_tx_sender.clone(),
// );
// tokio::spawn(f);
// }
// Ok(())
// });
// futures.push(flatten_handle(handle));
// }
// TODO: do we need anything here to set up the transaction funnel
// setup the block funnel
if self.watch_head_block.is_some() {
@ -1546,6 +1528,7 @@ mod tests {
max_head_block_age: Duration::from_secs(60),
// TODO: test max_head_block_lag?
max_head_block_lag: 5.into(),
pending_txid_firehose_sender: None,
min_synced_rpcs: 1,
min_sum_soft_limit: 1,
};
@ -1795,6 +1778,7 @@ mod tests {
min_sum_soft_limit: 4_000,
min_synced_rpcs: 1,
name: "test".into(),
pending_txid_firehose_sender: None,
watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs,
};
@ -1959,6 +1943,7 @@ mod tests {
min_sum_soft_limit: 1_000,
min_synced_rpcs: 1,
name: "test".into(),
pending_txid_firehose_sender: None,
watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs,
};

@ -10,8 +10,7 @@ use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption;
use ethers::prelude::{Bytes, Middleware, U64};
use ethers::types::{Address, Transaction, U256};
use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
@ -39,6 +38,7 @@ pub struct Web3Rpc {
pub block_interval: Duration,
pub display_name: Option<String>,
pub db_conn: Option<DatabaseConnection>,
pub subscribe_txs: bool,
/// most all requests prefer use the http_provider
pub(super) http_provider: Option<EthersHttpProvider>,
/// the websocket url is only used for subscriptions
@ -100,6 +100,7 @@ impl Web3Rpc {
block_interval: Duration,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
max_head_block_age: Duration,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
let created_at = Instant::now();
@ -200,6 +201,7 @@ impl Web3Rpc {
peak_latency: Some(peak_latency),
median_latency: Some(median_request_latency),
soft_limit: config.soft_limit,
subscribe_txs: config.subscribe_txs,
ws_url,
disconnect_watch: Some(disconnect_watch),
..Default::default()
@ -213,7 +215,12 @@ impl Web3Rpc {
let new_connection = new_connection.clone();
tokio::spawn(async move {
new_connection
.subscribe_with_reconnect(block_map, block_and_rpc_sender, chain_id)
.subscribe_with_reconnect(
block_map,
block_and_rpc_sender,
pending_txid_firehose_sender,
chain_id,
)
.await
})
};
@ -588,12 +595,18 @@ impl Web3Rpc {
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
chain_id: u64,
) -> Web3ProxyResult<()> {
loop {
if let Err(err) = self
.clone()
.subscribe(block_map.clone(), block_and_rpc_sender.clone(), chain_id)
.subscribe(
block_map.clone(),
block_and_rpc_sender.clone(),
pending_txid_firehose_sender.clone(),
chain_id,
)
.await
{
if self.should_disconnect() {
@ -625,6 +638,7 @@ impl Web3Rpc {
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
chain_id: u64,
) -> Web3ProxyResult<()> {
let error_handler = if self.backup {
@ -750,6 +764,20 @@ impl Web3Rpc {
futures.push(flatten_handle(tokio::spawn(f)));
}
// subscribe to new transactions
if let Some(pending_txid_firehose) = pending_txid_firehose_sender.clone() {
let clone = self.clone();
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
let f = async move {
clone
.subscribe_new_transactions(pending_txid_firehose, subscribe_stop_rx)
.await
};
futures.push(flatten_handle(tokio::spawn(f)));
}
// exit if any of the futures exit
// TODO: have an enum for which one exited?
let first_exit = futures.next().await;
@ -772,6 +800,60 @@ impl Web3Rpc {
Ok(())
}
async fn subscribe_new_transactions(
self: &Arc<Self>,
pending_txid_firehose: mpsc::Sender<TxHash>,
mut subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
trace!("subscribing to new transactions on {}", self);
// rpcs opt-into subscribing to transactions. its a lot of bandwidth
if !self.subscribe_txs {
loop {
if *subscribe_stop_rx.borrow() {
trace!("stopping ws block subscription on {}", self);
return Ok(());
}
subscribe_stop_rx.changed().await?;
}
}
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
// todo: move subscribe_blocks onto the request handle
let authorization = Default::default();
let error_handler = Some(Level::ERROR.into());
let active_request_handle = self
.wait_for_request_handle(&authorization, None, error_handler)
.await;
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
drop(active_request_handle);
while let Some(x) = pending_txs_sub.next().await {
if *subscribe_stop_rx.borrow() {
// TODO: this is checking way too often. have this on a timer instead
trace!("stopping ws block subscription on {}", self);
break;
}
// this should always work
if let Err(err) = pending_txid_firehose.try_send(x) {
error!(
?err,
"pending_txid_firehose failed sending. it must be full"
);
}
}
} else {
unimplemented!();
}
Ok(())
}
/// Subscribe to new blocks.
async fn subscribe_new_heads(
self: &Arc<Self>,

@ -198,6 +198,7 @@ impl StatBuffer {
}
}
// TODO: wait on the frontend to shutdown
// TODO: wait on all websockets to close
// TODO: wait on all pending external requests to finish
info!("waiting 5 seconds for remaining stats to arrive");