From dd674bb90077692d4f944fb95bb6c4de93600029 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 16 Jun 2022 20:57:48 +0000 Subject: [PATCH] only subscribe when someone is listening --- web3-proxy/src/app.rs | 237 ++++++++++++++------------ web3-proxy/src/connections.rs | 100 +++++------ web3-proxy/src/frontend/http.rs | 6 +- web3-proxy/src/frontend/http_proxy.rs | 2 +- 4 files changed, 171 insertions(+), 174 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 977d18b8..8df99651 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -3,8 +3,8 @@ use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; use ethers::prelude::Transaction; use ethers::prelude::{Block, TxHash, H256}; +use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; -use futures::future::{try_join_all, Abortable}; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::Future; @@ -19,7 +19,7 @@ use std::time::Duration; use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::timeout; -use tokio_stream::wrappers::WatchStream; +use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; use crate::config::Web3ConnectionConfig; @@ -56,23 +56,23 @@ pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result } } -pub async fn flatten_handles( - mut handles: FuturesUnordered>, +pub async fn flatten_handles( + mut handles: FuturesUnordered>, ) -> anyhow::Result<()> { while let Some(x) = handles.next().await { match x { Err(e) => return Err(e.into()), Ok(Err(e)) => return Err(e), - Ok(Ok(())) => {} + Ok(Ok(_)) => {} } } Ok(()) } +// TODO: think more about TxState. d #[derive(Clone)] pub enum TxState { - Known(TxHash), Pending(Transaction), Confirmed(Transaction), Orphaned(Transaction), @@ -93,7 +93,6 @@ pub struct Web3ProxyApp { // TODO: broadcast channel instead? head_block_receiver: watch::Receiver>, pending_tx_sender: broadcast::Sender, - pending_tx_receiver: broadcast::Receiver, pending_transactions: Arc>, next_subscription_id: AtomicUsize, } @@ -106,6 +105,10 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { + pub fn get_pending_transactions(&self) -> &DashMap { + &self.pending_transactions + } + pub async fn spawn( chain_id: usize, redis_address: Option, @@ -193,6 +196,9 @@ impl Web3ProxyApp { private_rpcs }; + // TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that + drop(pending_tx_receiver); + let app = Web3ProxyApp { balanced_rpcs, private_rpcs, @@ -200,7 +206,6 @@ impl Web3ProxyApp { response_cache: Default::default(), head_block_receiver, pending_tx_sender, - pending_tx_receiver, pending_transactions, next_subscription_id: 1.into(), }; @@ -231,115 +236,127 @@ impl Web3ProxyApp { // save the id so we can use it in the response let id = payload.id.clone(); - let subscription_join_handle = { - let subscription_id = subscription_id.clone(); + match payload.params.as_deref().unwrap().get() { + r#"["newHeads"]"# => { + let head_block_receiver = self.head_block_receiver.clone(); - match payload.params.as_deref().unwrap().get() { - r#"["newHeads"]"# => { - let head_block_receiver = self.head_block_receiver.clone(); + let subscription_id = subscription_id.clone(); - trace!(?subscription_id, "new heads subscription"); - tokio::spawn(async move { - let mut head_block_receiver = Abortable::new( - WatchStream::new(head_block_receiver), - subscription_registration, - ); + trace!(?subscription_id, "new heads subscription"); + tokio::spawn(async move { + let mut head_block_receiver = Abortable::new( + WatchStream::new(head_block_receiver), + subscription_registration, + ); - while let Some(new_head) = head_block_receiver.next().await { - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ - "jsonrpc": "2.0", - "method":"eth_subscription", - "params": { - "subscription": subscription_id, - "result": new_head, - }, - }); + while let Some(new_head) = head_block_receiver.next().await { + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let msg = json!({ + "jsonrpc": "2.0", + "method":"eth_subscription", + "params": { + "subscription": subscription_id, + "result": new_head, + }, + }); - let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - if subscription_tx.send_async(msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - } + if subscription_tx.send_async(msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } - trace!(?subscription_id, "closed new heads subscription"); - }) - } - r#"["newPendingTransactions"]"# => { - let mut pending_tx_receiver = self.pending_tx_sender.subscribe(); - - trace!(?subscription_id, "pending transactions subscription"); - tokio::spawn(async move { - while let Ok(new_tx_state) = pending_tx_receiver.recv().await { - let new_tx = match new_tx_state { - TxState::Known(..) => continue, - TxState::Confirmed(..) => continue, - TxState::Orphaned(tx) => tx, - TxState::Pending(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - "result": new_tx.hash, - }, - }); - - let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - - if subscription_tx.send_async(msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - } - - trace!(?subscription_id, "closed new heads subscription"); - }) - } - r#"["newPendingFullTransactions"]"# => { - // TODO: too much copy/pasta with newPendingTransactions - let mut pending_tx_receiver = self.pending_tx_sender.subscribe(); - - trace!(?subscription_id, "pending transactions subscription"); - tokio::spawn(async move { - while let Ok(new_tx_state) = pending_tx_receiver.recv().await { - let new_tx = match new_tx_state { - TxState::Known(..) => continue, - TxState::Confirmed(..) => continue, - TxState::Orphaned(tx) => tx, - TxState::Pending(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - // upstream just sends the txid, but we want to send the whole transaction - "result": new_tx, - }, - }); - - let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - - if subscription_tx.send_async(msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - } - - trace!(?subscription_id, "closed new heads subscription"); - }) - } - _ => return Err(anyhow::anyhow!("unimplemented")), + trace!(?subscription_id, "closed new heads subscription"); + }); } - }; + r#"["newPendingTransactions"]"# => { + let pending_tx_receiver = self.pending_tx_sender.subscribe(); + + let mut pending_tx_receiver = Abortable::new( + BroadcastStream::new(pending_tx_receiver), + subscription_registration, + ); + + let subscription_id = subscription_id.clone(); + + trace!(?subscription_id, "pending transactions subscription"); + tokio::spawn(async move { + while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let new_tx = match new_tx_state { + TxState::Pending(tx) => tx, + TxState::Confirmed(..) => continue, + TxState::Orphaned(tx) => tx, + }; + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let msg = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + "result": new_tx.hash, + }, + }); + + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + + if subscription_tx.send_async(msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } + + trace!(?subscription_id, "closed new heads subscription"); + }); + } + r#"["newPendingFullTransactions"]"# => { + // TODO: too much copy/pasta with newPendingTransactions + let pending_tx_receiver = self.pending_tx_sender.subscribe(); + + let mut pending_tx_receiver = Abortable::new( + BroadcastStream::new(pending_tx_receiver), + subscription_registration, + ); + + let subscription_id = subscription_id.clone(); + + trace!(?subscription_id, "pending transactions subscription"); + + // TODO: do something with this handle? + tokio::spawn(async move { + while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let new_tx = match new_tx_state { + TxState::Pending(tx) => tx, + TxState::Confirmed(..) => continue, + TxState::Orphaned(tx) => tx, + }; + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let msg = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + // upstream just sends the txid, but we want to send the whole transaction + "result": new_tx, + }, + }); + + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + + if subscription_tx.send_async(msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } + + trace!(?subscription_id, "closed new heads subscription"); + }); + } + _ => return Err(anyhow::anyhow!("unimplemented")), + } // TODO: do something with subscription_join_handle? diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index e2029f73..98134ec8 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -2,7 +2,6 @@ use anyhow::Context; use arc_swap::ArcSwap; use counter::Counter; -use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; use derive_more::From; use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256}; @@ -165,6 +164,7 @@ impl Web3Connections { ) -> Result { // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) + // TODO: maximum wait time let pending_transaction: Transaction = rpc .wait_for_request_handle() .await @@ -190,67 +190,44 @@ impl Web3Connections { pending_tx_sender: broadcast::Sender, ) -> anyhow::Result<()> { // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout - for i in 0..30 { - // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? - info!(?pending_tx_id, "checking pending_transactions"); - let tx_state = match self.pending_transactions.entry(pending_tx_id) { - DashMapEntry::Occupied(mut entry) => { - if let TxState::Known(_) = entry.get() { - // TODO: if its occupied, but still only "Known", multiple nodes have this transaction. ask both - match self._funnel_transaction(rpc.clone(), pending_tx_id).await { - Ok(tx_state) => { - entry.insert(tx_state.clone()); + // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory + // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? + trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); - Some(tx_state) - } - Err(err) => { - debug!( - ?i, - ?err, - ?pending_tx_id, - "failed sending transaction (retry/race)" - ); - - None - } - } - } else { - None - } - } - DashMapEntry::Vacant(entry) => { - // TODO: how many retries? - // TODO: use a generic retry provider instead? - match self._funnel_transaction(rpc.clone(), pending_tx_id).await { - Ok(tx_state) => { - entry.insert(tx_state.clone()); - - Some(tx_state) - } - Err(err) => { - debug!(?i, ?err, ?pending_tx_id, "failed sending transaction"); - - None - } - } - } - }; - - if let Some(tx_state) = tx_state { - let _ = pending_tx_sender.send(tx_state); - - info!(?pending_tx_id, "sent"); - - // since we sent a transaction, we should return - return Ok(()); - } - - // unable to update the entry. sleep and try again soon - // TODO: exponential backoff with jitter starting from a much smaller time - sleep(Duration::from_millis(3000)).await; + if self.pending_transactions.contains_key(&pending_tx_id) { + // this transaction has already been processed + return Ok(()); } - info!(?pending_tx_id, "not found"); + if pending_tx_sender.receiver_count() == 0 { + // no receivers, so no point in querying to get the full transaction + return Ok(()); + } + + // query the rpc for this transaction + // it is possible that another rpc is also being queried. thats fine. we want the fastest response + match self._funnel_transaction(rpc.clone(), pending_tx_id).await { + Ok(tx_state) => { + let _ = pending_tx_sender.send(tx_state); + + trace!(?pending_tx_id, "sent"); + + // we sent the transaction. return now. don't break looping because that gives a warning + return Ok(()); + } + Err(err) => { + trace!(?err, ?pending_tx_id, "failed fetching transaction"); + // unable to update the entry. sleep and try again soon + // TODO: retry with exponential backoff with jitter starting from a much smaller time + // sleep(Duration::from_millis(100)).await; + } + } + + // warn is too loud. this is somewhat common + // "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#" + // sometimes it's been pending for many hours + // sometimes it's maybe something else? + debug!(?pending_tx_id, "not found on {}", rpc); Ok(()) } @@ -544,8 +521,11 @@ impl Web3Connections { // mark all transactions in the block as confirmed if pending_tx_sender.is_some() { for tx_hash in &new_block.transactions { - // TODO: should we mark as confirmed via pending_tx_sender so that orphans are easier? + // TODO: should we mark as confirmed via pending_tx_sender? + // TODO: possible deadlock here! + // trace!("removing {}...", tx_hash); let _ = self.pending_transactions.remove(tx_hash); + // trace!("removed {}", tx_hash); } }; diff --git a/web3-proxy/src/frontend/http.rs b/web3-proxy/src/frontend/http.rs index 4bab6278..98ff1eae 100644 --- a/web3-proxy/src/frontend/http.rs +++ b/web3-proxy/src/frontend/http.rs @@ -13,17 +13,17 @@ pub async fn index() -> impl IntoResponse { /// Very basic status page pub async fn status(app: Extension>) -> impl IntoResponse { - let app = app.0.as_ref(); - + // TODO: what else should we include? uptime? prometheus? let balanced_rpcs = app.get_balanced_rpcs(); let private_rpcs = app.get_private_rpcs(); let num_active_requests = app.get_active_requests().len(); + let num_pending_transactions = app.get_pending_transactions().len(); - // TODO: what else should we include? uptime? prometheus? let body = json!({ "balanced_rpcs": balanced_rpcs, "private_rpcs": private_rpcs, "num_active_requests": num_active_requests, + "num_pending_transactions": num_pending_transactions, }); (StatusCode::INTERNAL_SERVER_ERROR, Json(body)) diff --git a/web3-proxy/src/frontend/http_proxy.rs b/web3-proxy/src/frontend/http_proxy.rs index a3ad33fb..6656b0c2 100644 --- a/web3-proxy/src/frontend/http_proxy.rs +++ b/web3-proxy/src/frontend/http_proxy.rs @@ -8,7 +8,7 @@ pub async fn proxy_web3_rpc( payload: Json, app: Extension>, ) -> impl IntoResponse { - match app.0.proxy_web3_rpc(payload.0).await { + match app.proxy_web3_rpc(payload.0).await { Ok(response) => (StatusCode::OK, Json(&response)).into_response(), Err(err) => handle_anyhow_error(err, None).await.into_response(), }