no more overflow on the pending txid firehose

Bryan Stitt 2023-10-07 03:41:11 -07:00
parent 00991d67b8
commit cc2eb9e862
8 changed files with 136 additions and 161 deletions
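In short: the old design pushed every raw txid into a bounded mpsc channel and had a background task dedupe it with an lru cache, so the channel could fill up and overflow when nothing drained it fast enough. The new design dedupes at the call site with a moka cache and sends straight into a tokio broadcast channel, which never blocks the sender; slow subscribers just lag. A minimal sketch of the new pattern (assuming moka 0.12 and tokio; names are illustrative, not the exact items from deduped_broadcast):

// sketch only: dedupe-at-send with moka + tokio broadcast
use std::time::Duration;

use moka::future::{Cache, CacheBuilder};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // the broadcast sender never blocks: slow subscribers lag and drop old
    // txids instead of backpressuring (or overflowing) the producer
    let (tx, mut rx) = broadcast::channel::<u64>(100);

    // recently-seen txids live here; time_to_idle mirrors the 10 minute
    // idle timeout set in this commit
    let seen: Cache<u64, ()> = CacheBuilder::new(20_000)
        .time_to_idle(Duration::from_secs(10 * 60))
        .build();

    for txid in [1, 1, 2, 1, 2, 3, 3] {
        // get_with only runs the init future when the key is absent, so each
        // unique txid is broadcast at most once while it stays cached
        seen.get_with(txid, async {
            let _ = tx.send(txid);
        })
        .await;
    }
    drop(tx);

    let mut unique = Vec::new();
    while let Ok(txid) = rx.recv().await {
        unique.push(txid);
    }
    assert_eq!(unique, vec![1, 2, 3]);
}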

Cargo.lock (generated)

@@ -1288,7 +1288,7 @@ dependencies = [
name = "deduped_broadcast"
version = "0.2.2"
dependencies = [
"lru",
"moka",
"serde",
"tokio",
"tracing",
@@ -3055,15 +3055,6 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lru"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
dependencies = [
"hashbrown 0.14.1",
]
[[package]]
name = "mach2"
version = "0.4.1"

@@ -7,6 +7,6 @@ edition = "2021"
[dependencies]
tracing = "0.1"
lru = { version = "0.12.0" }
moka = { version = "0.12.1", features = ["future"] }
serde = "1"
tokio = { version = "1.32.0", features = ["full"] }

@@ -1,106 +1,65 @@
use moka::future::{Cache, CacheBuilder};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{
collections::hash_map::DefaultHasher,
fmt::Debug,
hash::{Hash, Hasher},
};
use tokio::sync::{broadcast, mpsc};
struct DedupedBroadcasterTask<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
unfiltered_rx: mpsc::Receiver<T>,
broadcast_filtered_tx: Arc<broadcast::Sender<T>>,
cache: lru::LruCache<u64, ()>,
total_unfiltered: Arc<AtomicUsize>,
total_filtered: Arc<AtomicUsize>,
total_broadcasts: Arc<AtomicUsize>,
}
use std::time::Duration;
use std::{fmt::Debug, hash::Hash};
use tokio::sync::broadcast;
pub struct DedupedBroadcaster<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
/// takes in things to broadcast. Can include duplicates, but only one will be sent.
unfiltered_tx: mpsc::Sender<T>,
/// subscribe to this to get deduplicated items
broadcast_filtered_tx: Arc<broadcast::Sender<T>>,
broadcast_filtered_tx: broadcast::Sender<T>,
cache: Cache<T, ()>,
total_unfiltered: Arc<AtomicUsize>,
total_filtered: Arc<AtomicUsize>,
total_broadcasts: Arc<AtomicUsize>,
}
impl<T> DedupedBroadcasterTask<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
/// forward things from input_receiver to output_sender if they aren't in the cache
async fn run(mut self) {
while let Some(item) = self.unfiltered_rx.recv().await {
let mut hasher = DefaultHasher::new();
item.hash(&mut hasher);
let hashed = hasher.finish();
self.total_unfiltered
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// we don't actually care what the return value is. we just want to send only if the item isn't already in the cache
// TODO: count new vs unique
self.cache.get_or_insert(hashed, || {
self.total_filtered
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Ok(x) = self.broadcast_filtered_tx.send(item) {
self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
}
});
}
}
}
impl<T> DedupedBroadcaster<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
T: Clone + Debug + Eq + Hash + PartialEq + Send + Sync + 'static,
{
pub fn new(capacity: usize, cache_capacity: usize) -> Self {
let (unfiltered_tx, unfiltered_rx) = mpsc::channel::<T>(capacity);
pub fn new(capacity: usize, cache_capacity: usize) -> Arc<Self> {
let (broadcast_filtered_tx, _) = broadcast::channel(capacity);
let cache = lru::LruCache::new(cache_capacity.try_into().unwrap());
let broadcast_filtered_tx = Arc::new(broadcast_filtered_tx);
let cache = CacheBuilder::new(cache_capacity as u64)
.time_to_idle(Duration::from_secs(10 * 60))
.name("DedupedBroadcaster")
.build();
let total_unfiltered = Arc::new(AtomicUsize::new(0));
let total_filtered = Arc::new(AtomicUsize::new(0));
let total_broadcasts = Arc::new(AtomicUsize::new(0));
let task = DedupedBroadcasterTask {
unfiltered_rx,
broadcast_filtered_tx: broadcast_filtered_tx.clone(),
let x = Self {
broadcast_filtered_tx,
cache,
total_unfiltered: total_unfiltered.clone(),
total_filtered: total_filtered.clone(),
total_broadcasts: total_broadcasts.clone(),
total_broadcasts,
total_filtered,
total_unfiltered,
};
// TODO: do something with the handle?
tokio::task::spawn(task.run());
Self {
unfiltered_tx,
broadcast_filtered_tx,
total_unfiltered,
total_filtered,
total_broadcasts,
}
Arc::new(x)
}
pub fn sender(&self) -> &mpsc::Sender<T> {
&self.unfiltered_tx
/// filter duplicates and send the rest to any subscribers
/// TODO: change this to be `send` and put a moka cache here instead of lru. then the de-dupe load will be spread across senders
pub async fn send(&self, item: T) {
self.total_unfiltered.fetch_add(1, Ordering::Relaxed);
self.cache
.get_with(item.clone(), async {
self.total_filtered.fetch_add(1, Ordering::Relaxed);
if let Ok(x) = self.broadcast_filtered_tx.send(item) {
self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
}
})
.await;
}
pub fn subscribe(&self) -> broadcast::Receiver<T> {
@@ -172,18 +131,19 @@ mod tests {
#[tokio::test]
async fn test_deduped_broadcaster() {
// TODO: what sizes?
let broadcaster = DedupedBroadcaster::new(10, 10);
let mut receiver_1 = broadcaster.subscribe();
let _receiver_2 = broadcaster.subscribe();
broadcaster.sender().send(1).await.unwrap();
broadcaster.sender().send(1).await.unwrap();
broadcaster.sender().send(2).await.unwrap();
broadcaster.sender().send(1).await.unwrap();
broadcaster.sender().send(2).await.unwrap();
broadcaster.sender().send(3).await.unwrap();
broadcaster.sender().send(3).await.unwrap();
broadcaster.send(1).await;
broadcaster.send(1).await;
broadcaster.send(2).await;
broadcaster.send(1).await;
broadcaster.send(2).await;
broadcaster.send(3).await;
broadcaster.send(3).await;
yield_now().await;
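One detail kept from the old task in the new send(): tokio's broadcast::Sender::send returns Ok(n) with the number of active receivers (which is what total_broadcasts accumulates) and Err only when nobody is currently subscribed, so ignoring the error is intentional. A small sketch of that behavior:

// sketch only: what broadcast::Sender::send returns
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, rx1) = broadcast::channel::<u32>(8);
    let rx2 = tx.subscribe();

    // Ok(n) carries the number of active receivers; this is the `x`
    // added to total_broadcasts
    assert_eq!(tx.send(7).unwrap(), 2);

    // with every receiver dropped, send() returns Err; the broadcaster
    // treats that as "nobody is listening" and moves on
    drop(rx1);
    drop(rx2);
    assert!(tx.send(8).is_err());
}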

@@ -87,7 +87,7 @@ pub struct Web3ProxyApp {
/// TODO: broadcast channel instead?
pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
/// rpc clients that subscribe to newPendingTransactions use this channel
pub pending_txid_firehose: deduped_broadcast::DedupedBroadcaster<TxHash>,
pub pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
pub hostname: Option<String>,
pub frontend_port: Arc<AtomicU16>,
/// rate limit anonymous users
@@ -437,7 +437,8 @@ impl Web3ProxyApp {
let chain_id = top_config.app.chain_id;
let deduped_txid_firehose = DedupedBroadcaster::new(5_000, 5_000);
// TODO: deduped_txid_firehose capacity from config
let deduped_txid_firehose = DedupedBroadcaster::new(100, 20_000);
// TODO: remove this. it should only be done by apply_top_config
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
@@ -447,7 +448,7 @@
top_config.app.min_sum_soft_limit,
"balanced rpcs".into(),
Some(watch_consensus_head_sender),
Some(deduped_txid_firehose.sender().clone()),
Some(deduped_txid_firehose.clone()),
)
.await
.web3_context("spawning balanced rpcs")?;
@@ -1414,6 +1415,10 @@ impl Web3ProxyApp {
// TODO: decode the transaction
// TODO: error if the chain_id is incorrect
// TODO: return now if already confirmed
// TODO: error if the nonce is way far in the future
// TODO: self.pending_txid_firehose.send(txid).await;
let response = self
.try_send_protected(
@@ -1466,6 +1471,8 @@
}
}
// TODO: if successful, send the txid to the pending transaction firehose
// emit transaction count stats
// TODO: use this cache to avoid sending duplicate transactions?
// TODO: different salt for ips and transactions?

@@ -61,6 +61,8 @@ impl Web3ProxyApp {
// TODO: DRY This up. lots of duplication between newHeads and newPendingTransactions
match subscribe_to {
"newHeads" => {
// we clone the watch before spawning so that there's less chance of missing anything
// TODO: watch receivers can miss a block. is that okay?
let head_block_receiver = self.watch_consensus_head_receiver.clone();
let app = self.clone();
let authorization = web3_request.authorization.clone();
@@ -144,6 +146,7 @@ impl Web3ProxyApp {
}
// TODO: bring back the other custom subscription types that had the full transaction object
"newPendingTransactions" => {
// we subscribe before spawning so that there's less chance of missing anything
let pending_txid_firehose = self.pending_txid_firehose.subscribe();
let app = self.clone();
let authorization = web3_request.authorization.clone();
@@ -154,61 +157,72 @@ impl Web3ProxyApp {
subscription_registration,
);
while let Some(Ok(new_txid)) = pending_txid_firehose.next().await {
// TODO: include the head_block here?
match Web3Request::new_with_app(
&app,
authorization.clone(),
None,
RequestOrMethod::Method(
"eth_subscribe(newPendingTransactions)".into(),
0,
),
None,
)
.await
{
while let Some(maybe_txid) = pending_txid_firehose.next().await {
match maybe_txid {
Err(err) => {
error!(?err, "error creating subscription_web3_request");
// what should we do to turn this error into a message for them?
break;
trace!(
?err,
"error inside newPendingTransactions. probably lagged"
);
continue;
}
Ok(subscription_web3_request) => {
// check if we should close the websocket connection
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_web3_request)
.await
Ok(new_txid) => {
// TODO: include the head_block here?
match Web3Request::new_with_app(
&app,
authorization.clone(),
None,
RequestOrMethod::Method(
"eth_subscribe(newPendingTransactions)".into(),
0,
),
None,
)
.await
{
let _ = response_sender.send(close_message).await;
break;
}
Err(err) => {
error!(?err, "error creating subscription_web3_request");
// what should we do to turn this error into a message for them?
break;
}
Ok(subscription_web3_request) => {
// check if we should close the websocket connection
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_web3_request)
.await
{
let _ = response_sender.send(close_message).await;
break;
}
// TODO: make a struct/helper function for this
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_txid,
},
});
// TODO: make a struct/helper function for this
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_txid,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
subscription_web3_request.add_response(response_bytes);
subscription_web3_request.add_response(response_bytes);
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
}
}
}
}
}
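The rewritten loop above treats an error from the firehose subscription as recoverable instead of breaking out of the websocket. With a broadcast-backed firehose, the expected error is a lagged receiver: the client fell more than the channel capacity behind and the oldest txids were dropped. Assuming the receiver is wrapped in tokio-stream's BroadcastStream (an assumption; it matches the `.next()` plus `Err` shape in the hunk above), a lag surfaces as Lagged and logging then continuing is the right response:

// sketch only: how a lagged broadcast receiver shows up through BroadcastStream
use tokio::sync::broadcast;
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel::<u32>(2);
    let mut stream = BroadcastStream::new(rx);

    // overfill the tiny channel so the receiver falls behind
    for i in 0..5 {
        let _ = tx.send(i);
    }
    drop(tx);

    while let Some(item) = stream.next().await {
        match item {
            // Lagged(n) says how many txids were skipped; the subscription
            // logs it at trace level and keeps going
            Err(BroadcastStreamRecvError::Lagged(skipped)) => {
                println!("skipped {skipped} pending txids");
            }
            Ok(txid) => println!("got {txid}"),
        }
    }
}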

@@ -3,6 +3,7 @@ use crate::compute_units::default_usd_per_cu;
use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use crate::rpcs::one::Web3Rpc;
use argh::FromArgs;
use deduped_broadcast::DedupedBroadcaster;
use ethers::prelude::{Address, TxHash};
use ethers::types::{U256, U64};
use hashbrown::HashMap;
@@ -426,7 +427,7 @@ impl Web3RpcConfig {
http_client: Option<reqwest::Client>,
blocks_by_hash_cache: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehouse_sender: Option<mpsc::Sender<TxHash>>,
pending_txid_firehouse: Option<Arc<DedupedBroadcaster<TxHash>>>,
max_head_block_age: Duration,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
if !self.extra.is_empty() {
@@ -444,7 +445,7 @@ impl Web3RpcConfig {
block_interval,
blocks_by_hash_cache,
block_and_rpc_sender,
pending_txid_firehouse_sender,
pending_txid_firehouse,
max_head_block_age,
)
.await

@@ -9,6 +9,7 @@ use crate::frontend::authorization::Web3Request;
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use deduped_broadcast::DedupedBroadcaster;
use derive_more::From;
use ethers::prelude::{TxHash, U64};
use futures::future::try_join_all;
@@ -62,7 +63,7 @@ pub struct Web3Rpcs {
/// calculated based on max_head_block_lag and average block times
pub(super) max_head_block_age: Duration,
/// all of the pending txids for all of the rpcs. this still has duplicates
pub(super) pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
pub(super) pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
}
/// this is a RankedRpcs that should be ready to use
@@ -102,7 +103,7 @@ impl Web3Rpcs {
min_sum_soft_limit: u32,
name: Cow<'static, str>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
) -> anyhow::Result<(
Arc<Self>,
Web3ProxyJoinHandle<()>,
@@ -149,7 +150,7 @@ impl Web3Rpcs {
min_synced_rpcs: min_head_rpcs,
min_sum_soft_limit,
name,
pending_txid_firehose_sender,
pending_txid_firehose,
watch_head_block: watch_consensus_head_sender,
watch_ranked_rpcs: watch_consensus_rpcs_sender,
});
@@ -234,7 +235,7 @@ impl Web3Rpcs {
http_client,
blocks_by_hash_cache,
block_and_rpc_sender,
self.pending_txid_firehose_sender.clone(),
self.pending_txid_firehose.clone(),
self.max_head_block_age,
));

@@ -10,6 +10,7 @@ use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption;
use deduped_broadcast::DedupedBroadcaster;
use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
@@ -104,7 +105,7 @@ impl Web3Rpc {
block_interval: Duration,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
max_head_block_age: Duration,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
let created_at = Instant::now();
@@ -222,7 +223,7 @@ impl Web3Rpc {
.subscribe_with_reconnect(
block_map,
block_and_rpc_sender,
pending_txid_firehose_sender,
pending_txid_firehose,
chain_id,
)
.await
@@ -624,7 +625,7 @@ impl Web3Rpc {
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
chain_id: u64,
) -> Web3ProxyResult<()> {
loop {
@@ -633,7 +634,7 @@ impl Web3Rpc {
.subscribe(
block_map.clone(),
block_and_rpc_sender.clone(),
pending_txid_firehose_sender.clone(),
pending_txid_firehose.clone(),
chain_id,
)
.await
@@ -667,7 +668,7 @@ impl Web3Rpc {
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
chain_id: u64,
) -> Web3ProxyResult<()> {
let error_handler = if self.backup {
@@ -792,7 +793,7 @@ impl Web3Rpc {
}
// subscribe to new transactions
if let Some(pending_txid_firehose) = pending_txid_firehose_sender.clone() {
if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
let clone = self.clone();
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
@@ -829,7 +830,7 @@ impl Web3Rpc {
async fn subscribe_new_transactions(
self: &Arc<Self>,
pending_txid_firehose: mpsc::Sender<TxHash>,
pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
mut subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
trace!("subscribing to new transactions on {}", self);
@@ -846,28 +847,28 @@ impl Web3Rpc {
}
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
// todo: move subscribe_blocks onto the request handle
// todo: move subscribe_blocks onto the request handle instead of having a separate wait_for_throttle
self.wait_for_throttle(Instant::now() + Duration::from_secs(5))
.await?;
// TODO: only subscribe if a user has subscribed
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
while let Some(x) = pending_txs_sub.next().await {
if *subscribe_stop_rx.borrow_and_update() {
// TODO: check this less often
if *subscribe_stop_rx.borrow() {
// TODO: this is checking way too often. have this on a timer instead
trace!("stopping ws block subscription on {}", self);
break;
}
// this should always work
if let Err(err) = pending_txid_firehose.try_send(x) {
error!(
?err,
"pending_txid_firehose failed sending. it must be full"
);
}
// todo!("this has a bug! it gets full very quickly when no one is subscribed!");
pending_txid_firehose.send(x).await;
}
} else {
// only websockets subscribe to pending transactions
// it's possible to do with http, but not recommended
// TODO: what should we do here?
loop {
if *subscribe_stop_rx.borrow_and_update() {