From 04dc7162502eb33950905a88ac7a75955397c7bb Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 24 Aug 2022 00:59:05 +0000 Subject: [PATCH] move more things into their own files --- web3_proxy/src/app.rs | 111 ++++++++++---------- web3_proxy/src/rpcs/blockchain.rs | 33 +++++- web3_proxy/src/rpcs/connection.rs | 18 ++-- web3_proxy/src/rpcs/connections.rs | 155 ++++------------------------ web3_proxy/src/rpcs/mod.rs | 2 + web3_proxy/src/rpcs/request.rs | 6 +- web3_proxy/src/rpcs/transactions.rs | 110 ++++++++++++++++++++ 7 files changed, 226 insertions(+), 209 deletions(-) create mode 100644 web3_proxy/src/rpcs/transactions.rs diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 2dd59eaf..c61bc472 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -1,12 +1,21 @@ // TODO: this file is way too big now. move things into other modules +use crate::block_number::block_needed; +use crate::config::{AppConfig, TopConfig}; +use crate::jsonrpc::JsonRpcForwardedResponse; +use crate::jsonrpc::JsonRpcForwardedResponseEnum; +use crate::jsonrpc::JsonRpcRequest; +use crate::jsonrpc::JsonRpcRequestEnum; +use crate::rpcs::connections::Web3Connections; +use crate::rpcs::transactions::TxStatus; +use crate::stats::AppStats; use anyhow::Context; use axum::extract::ws::Message; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; use derive_more::From; use ethers::core::utils::keccak256; -use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64}; +use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64}; use fifomap::{FifoCountMap, FifoSizedMap}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; @@ -35,15 +44,6 @@ use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tracing::{info, info_span, instrument, trace, warn, Instrument}; use uuid::Uuid; -use crate::block_number::block_needed; -use crate::config::{AppConfig, TopConfig}; -use crate::jsonrpc::JsonRpcForwardedResponse; -use crate::jsonrpc::JsonRpcForwardedResponseEnum; -use crate::jsonrpc::JsonRpcRequest; -use crate::jsonrpc::JsonRpcRequestEnum; -use crate::rpcs::connections::Web3Connections; -use crate::stats::AppStats; - // TODO: make this customizable? static APP_USER_AGENT: &str = concat!( "satoshiandkin/", @@ -52,7 +52,8 @@ static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); -// block hash, method, params +/// block hash, method, params +// TODO: better name type CacheKey = (H256, String, Option); type ResponseLrcCache = RwLock>; @@ -61,14 +62,6 @@ type ActiveRequestsMap = DashMap>; pub type AnyhowJoinHandle = JoinHandle>; -// TODO: think more about TxState -#[derive(Clone)] -pub enum TxState { - Pending(Transaction), - Confirmed(Transaction), - Orphaned(Transaction), -} - #[derive(Clone, Copy, From)] pub struct UserCacheValue { pub expires_at: Instant, @@ -76,6 +69,30 @@ pub struct UserCacheValue { pub user_count_per_period: u64, } +/// The application +// TODO: this debug impl is way too verbose. 
make something smaller +// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard +pub struct Web3ProxyApp { + /// Send requests to the best server available + pub balanced_rpcs: Arc, + /// Send private requests (like eth_sendRawTransaction) to all these servers + pub private_rpcs: Arc, + /// Track active requests so that we don't send the same query to multiple backends + pub active_requests: ActiveRequestsMap, + response_cache: ResponseLrcCache, + // don't drop this or the sender will stop working + // TODO: broadcast channel instead? + head_block_receiver: watch::Receiver>>, + pending_tx_sender: broadcast::Sender, + pub config: AppConfig, + pub db_conn: Option, + pub pending_transactions: Arc>, + pub rate_limiter: Option, + pub redis_pool: Option, + pub stats: AppStats, + pub user_cache: RwLock>, +} + /// flatten a JoinError into an anyhow error /// Useful when joining multiple futures. pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result { @@ -127,37 +144,6 @@ pub async fn get_migrated_db( Ok(db) } -/// The application -// TODO: this debug impl is way too verbose. make something smaller -// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard -pub struct Web3ProxyApp { - /// Send requests to the best server available - pub balanced_rpcs: Arc, - /// Send private requests (like eth_sendRawTransaction) to all these servers - pub private_rpcs: Arc, - /// Track active requests so that we don't send the same query to multiple backends - pub active_requests: ActiveRequestsMap, - response_cache: ResponseLrcCache, - // don't drop this or the sender will stop working - // TODO: broadcast channel instead? - head_block_receiver: watch::Receiver>>, - pending_tx_sender: broadcast::Sender, - pub config: AppConfig, - pub db_conn: Option, - pub pending_transactions: Arc>, - pub rate_limiter: Option, - pub redis_pool: Option, - pub stats: AppStats, - pub user_cache: RwLock>, -} - -impl fmt::Debug for Web3ProxyApp { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3ProxyApp").finish_non_exhaustive() - } -} - impl Web3ProxyApp { pub async fn redis_conn(&self) -> anyhow::Result> { match self.redis_pool.as_ref() { @@ -405,9 +391,9 @@ impl Web3ProxyApp { tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { let new_tx = match new_tx_state { - TxState::Pending(tx) => tx, - TxState::Confirmed(..) => continue, - TxState::Orphaned(tx) => tx, + TxStatus::Pending(tx) => tx, + TxStatus::Confirmed(..) => continue, + TxStatus::Orphaned(tx) => tx, }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id @@ -446,9 +432,9 @@ impl Web3ProxyApp { tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { let new_tx = match new_tx_state { - TxState::Pending(tx) => tx, - TxState::Confirmed(..) => continue, - TxState::Orphaned(tx) => tx, + TxStatus::Pending(tx) => tx, + TxStatus::Confirmed(..) => continue, + TxStatus::Orphaned(tx) => tx, }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id @@ -488,9 +474,9 @@ impl Web3ProxyApp { tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { let new_tx = match new_tx_state { - TxState::Pending(tx) => tx, - TxState::Confirmed(..) 
=> continue, - TxState::Orphaned(tx) => tx, + TxStatus::Pending(tx) => tx, + TxStatus::Confirmed(..) => continue, + TxStatus::Orphaned(tx) => tx, }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id @@ -916,3 +902,10 @@ impl Web3ProxyApp { Ok(response) } } + +impl fmt::Debug for Web3ProxyApp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + f.debug_struct("Web3ProxyApp").finish_non_exhaustive() + } +} diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 492ad214..ec01ea63 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -2,7 +2,7 @@ use super::connection::Web3Connection; use super::connections::Web3Connections; use super::synced_connections::SyncedConnections; -use crate::app::TxState; +use super::transactions::TxStatus; use crate::jsonrpc::JsonRpcRequest; use anyhow::Context; use ethers::prelude::{Block, TxHash, H256, U256, U64}; @@ -151,13 +151,42 @@ impl Web3Connections { Ok(block) } + // TODO: rename this? + pub(super) async fn update_synced_rpcs( + &self, + block_receiver: flume::Receiver<(Arc>, Arc)>, + // TODO: head_block_sender should be a broadcast_sender like pending_tx_sender + head_block_sender: watch::Sender>>, + pending_tx_sender: Option>, + ) -> anyhow::Result<()> { + // TODO: indexmap or hashmap? what hasher? with_capacity? + // TODO: this will grow unbounded. prune old heads automatically + let mut connection_heads = IndexMap::>>::new(); + + while let Ok((new_block, rpc)) = block_receiver.recv_async().await { + self.recv_block_from_rpc( + &mut connection_heads, + new_block, + rpc, + &head_block_sender, + &pending_tx_sender, + ) + .await?; + } + + // TODO: if there was an error, we should return it + warn!("block_receiver exited!"); + + Ok(()) + } + pub async fn recv_block_from_rpc( &self, connection_heads: &mut IndexMap>>, new_block: Arc>, rpc: Arc, head_block_sender: &watch::Sender>>, - pending_tx_sender: &Option>, + pending_tx_sender: &Option>, ) -> anyhow::Result<()> { let new_block_hash = if let Some(hash) = new_block.hash { hash diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 600fc55a..a20e270c 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -371,8 +371,8 @@ impl Web3Connection { let mut last_hash = H256::zero(); loop { - match self.try_request_handle().await { - Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => { + match self.try_open_request().await { + Ok(OpenRequestResult::Handle(active_request_handle)) => { let block: Result, _> = active_request_handle .request("eth_getBlockByNumber", ("latest", false)) .await; @@ -520,30 +520,30 @@ impl Web3Connection { } /// be careful with this; it will wait forever! + // TODO: maximum wait time? #[instrument(skip_all)] pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { // TODO: maximum wait time? i think timeouts in other parts of the code are probably best loop { - match self.try_request_handle().await { - Ok(OpenRequestResult::ActiveRequest(handle)) => return Ok(handle), + match self.try_open_request().await { + Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? sleep_until(retry_at).await; } Ok(OpenRequestResult::None) => { - // TODO: when can this happen? emit a stat? 
- // TODO: instead of erroring, subscribe to the head block on this + // TODO: when can this happen? log? emit a stat? + // TODO: subscribe to the head block on this // TODO: sleep how long? maybe just error? sleep(Duration::from_secs(1)).await; } - // Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")), Err(err) => return Err(err), } } } - pub async fn try_request_handle(self: &Arc) -> anyhow::Result { + pub async fn try_open_request(self: &Arc) -> anyhow::Result { // check that we are connected if !self.has_provider().await { // TODO: emit a stat? @@ -576,7 +576,7 @@ impl Web3Connection { let handle = OpenRequestHandle::new(self.clone()); - Ok(OpenRequestResult::ActiveRequest(handle)) + Ok(OpenRequestResult::Handle(handle)) } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index f33f23a4..d9fc797f 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -2,14 +2,15 @@ use super::connection::Web3Connection; use super::request::{OpenRequestHandle, OpenRequestResult}; use super::synced_connections::SyncedConnections; -use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; +use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::Web3ConnectionConfig; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; +use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; use counter::Counter; use dashmap::DashMap; use derive_more::From; -use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64}; +use ethers::prelude::{Block, ProviderError, TxHash, H256, U64}; use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -26,14 +27,14 @@ use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior}; use tokio::time::{Duration, Instant}; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { pub(super) conns: IndexMap>, pub(super) synced_connections: ArcSwap, - pub(super) pending_transactions: Arc>, + pub(super) pending_transactions: Arc>, /// only includes blocks on the main chain. /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? pub(super) chain_map: DashMap>>, @@ -51,8 +52,8 @@ impl Web3Connections { http_client: Option, redis_client_pool: Option, head_block_sender: Option>>>, - pending_tx_sender: Option>, - pending_transactions: Arc>, + pending_tx_sender: Option>, + pending_transactions: Arc>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = @@ -182,97 +183,6 @@ impl Web3Connections { Ok((connections, handle)) } - async fn _funnel_transaction( - &self, - rpc: Arc, - pending_tx_id: TxHash, - ) -> Result, ProviderError> { - // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself - // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) 
- // TODO: maximum wait time - let pending_transaction: Transaction = match rpc.try_request_handle().await { - Ok(OpenRequestResult::ActiveRequest(handle)) => { - handle - .request("eth_getTransactionByHash", (pending_tx_id,)) - .await? - } - Ok(_) => { - // TODO: actually retry? - return Ok(None); - } - Err(err) => { - trace!( - ?pending_tx_id, - ?rpc, - ?err, - "cancelled funneling transaction" - ); - return Ok(None); - } - }; - - trace!(?pending_transaction, "pending"); - - match &pending_transaction.block_hash { - Some(_block_hash) => { - // the transaction is already confirmed. no need to save in the pending_transactions map - Ok(Some(TxState::Confirmed(pending_transaction))) - } - None => Ok(Some(TxState::Pending(pending_transaction))), - } - } - - /// dedupe transaction and send them to any listening clients - async fn funnel_transaction( - self: Arc, - rpc: Arc, - pending_tx_id: TxHash, - pending_tx_sender: broadcast::Sender, - ) -> anyhow::Result<()> { - // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout - // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory - // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? - - if pending_tx_sender.receiver_count() == 0 { - // no receivers, so no point in querying to get the full transaction - return Ok(()); - } - - trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); - - if self.pending_transactions.contains_key(&pending_tx_id) { - // this transaction has already been processed - return Ok(()); - } - - // query the rpc for this transaction - // it is possible that another rpc is also being queried. thats fine. we want the fastest response - match self._funnel_transaction(rpc.clone(), pending_tx_id).await { - Ok(Some(tx_state)) => { - let _ = pending_tx_sender.send(tx_state); - - trace!(?pending_tx_id, "sent"); - - // we sent the transaction. return now. don't break looping because that gives a warning - return Ok(()); - } - Ok(None) => {} - Err(err) => { - trace!(?err, ?pending_tx_id, "failed fetching transaction"); - // unable to update the entry. sleep and try again soon - // TODO: retry with exponential backoff with jitter starting from a much smaller time - // sleep(Duration::from_millis(100)).await; - } - } - - // warn is too loud. this is somewhat common - // "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#" - // sometimes it's been pending for many hours - // sometimes it's maybe something else? - debug!(?pending_tx_id, "not found on {}", rpc); - Ok(()) - } - /// subscribe to blocks and transactions from all the backend rpcs. /// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver` /// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender` @@ -281,7 +191,7 @@ impl Web3Connections { pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, block_receiver: flume::Receiver<(Arc>, Arc)>, head_block_sender: Option>>>, - pending_tx_sender: Option>, + pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -402,36 +312,6 @@ impl Web3Connections { panic!("i don't think this is possible") } - /// TODO: move parts of this onto SyncedConnections? 
it needs to be simpler - // we don't instrument here because we put a span inside the while loop - async fn update_synced_rpcs( - &self, - block_receiver: flume::Receiver<(Arc>, Arc)>, - // TODO: head_block_sender should be a broadcast_sender like pending_tx_sender - head_block_sender: watch::Sender>>, - pending_tx_sender: Option>, - ) -> anyhow::Result<()> { - // TODO: indexmap or hashmap? what hasher? with_capacity? - // TODO: this will grow unbounded. prune old heads automatically - let mut connection_heads = IndexMap::>>::new(); - - while let Ok((new_block, rpc)) = block_receiver.recv_async().await { - self.recv_block_from_rpc( - &mut connection_heads, - new_block, - rpc, - &head_block_sender, - &pending_tx_sender, - ) - .await?; - } - - // TODO: if there was an error, we should return it - warn!("block_receiver exited!"); - - Ok(()) - } - /// get the best available rpc server #[instrument(skip_all)] pub async fn next_upstream_server( @@ -468,6 +348,7 @@ impl Web3Connections { return Err(anyhow::anyhow!("not synced")); } + // we sort on a bunch of values. cache them here so that we don't do this math multiple times. let sort_cache: HashMap<_, _> = synced_rpcs .iter() .map(|rpc| { @@ -484,20 +365,20 @@ impl Web3Connections { .collect(); synced_rpcs.sort_unstable_by(|a, b| { - let a_sorts = sort_cache.get(a).unwrap(); - let b_sorts = sort_cache.get(b).unwrap(); + let a_sorts = sort_cache.get(a).expect("sort_cache should always have a"); + let b_sorts = sort_cache.get(b).expect("sort_cache should always have b"); - // TODO: i'm comparing floats. crap + // partial_cmp because we are comparing floats a_sorts.partial_cmp(b_sorts).unwrap_or(cmp::Ordering::Equal) }); // now that the rpcs are sorted, try to get an active request handle for one of them for rpc in synced_rpcs.into_iter() { // increment our connection counter - match rpc.try_request_handle().await { - Ok(OpenRequestResult::ActiveRequest(handle)) => { + match rpc.try_open_request().await { + Ok(OpenRequestResult::Handle(handle)) => { trace!("next server on {:?}: {:?}", self, rpc); - return Ok(OpenRequestResult::ActiveRequest(handle)); + return Ok(OpenRequestResult::Handle(handle)); } Ok(OpenRequestResult::RetryAt(retry_at)) => { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); @@ -539,12 +420,12 @@ impl Web3Connections { } // check rate limits and increment our connection counter - match connection.try_request_handle().await { + match connection.try_open_request().await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(OpenRequestResult::ActiveRequest(handle)) => selected_rpcs.push(handle), + Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle), Ok(OpenRequestResult::None) => { warn!("no request handle for {}", connection) } @@ -579,7 +460,7 @@ impl Web3Connections { .next_upstream_server(&skip_rpcs, min_block_needed) .await { - Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => { + Ok(OpenRequestResult::Handle(active_request_handle)) => { // save the rpc in case we get an error and want to retry on another server skip_rpcs.push(active_request_handle.clone_connection()); diff --git a/web3_proxy/src/rpcs/mod.rs b/web3_proxy/src/rpcs/mod.rs index c283a179..9a05f896 100644 --- a/web3_proxy/src/rpcs/mod.rs +++ b/web3_proxy/src/rpcs/mod.rs @@ -1,6 +1,8 @@ +// TODO: all pub, or export useful things here instead? 
pub mod blockchain; pub mod connection; pub mod connections; pub mod provider; pub mod request; pub mod synced_connections; +pub mod transactions; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index d67a2656..82190ed6 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -8,12 +8,14 @@ use tracing::{instrument, trace}; // TODO: rename this pub enum OpenRequestResult { - ActiveRequest(OpenRequestHandle), + Handle(OpenRequestHandle), + /// Unable to start a request. Retry at the given time. RetryAt(Instant), + /// Unable to start a request. Retrying will not succeed. None, } -/// Drop this once a connection completes +/// Make RPC requests through this handle and drop it when you are done. pub struct OpenRequestHandle(Arc); impl OpenRequestHandle { diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs new file mode 100644 index 00000000..55b3b4c4 --- /dev/null +++ b/web3_proxy/src/rpcs/transactions.rs @@ -0,0 +1,110 @@ +///! Load balanced communication with a group of web3 providers +use super::connection::Web3Connection; +use super::connections::Web3Connections; +use super::request::OpenRequestResult; +use ethers::prelude::{ProviderError, Transaction, TxHash}; +use std::sync::Arc; +use tokio::sync::broadcast; +use tracing::{debug, trace}; + +// TODO: think more about TxState +#[derive(Clone)] +pub enum TxStatus { + Pending(Transaction), + Confirmed(Transaction), + Orphaned(Transaction), +} + +impl Web3Connections { + async fn query_transaction_status( + &self, + rpc: Arc, + pending_tx_id: TxHash, + ) -> Result, ProviderError> { + // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) + // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself + // TODO: if one rpc fails, try another? + let tx: Transaction = match rpc.try_open_request().await { + Ok(OpenRequestResult::Handle(handle)) => { + handle + .request("eth_getTransactionByHash", (pending_tx_id,)) + .await? + } + Ok(_) => { + // TODO: actually retry? + return Ok(None); + } + Err(err) => { + trace!( + ?pending_tx_id, + ?rpc, + ?err, + "cancelled funneling transaction" + ); + return Ok(None); + } + }; + + match &tx.block_hash { + Some(_block_hash) => { + // the transaction is already confirmed. no need to save in the pending_transactions map + Ok(Some(TxStatus::Confirmed(tx))) + } + None => Ok(Some(TxStatus::Pending(tx))), + } + } + + /// dedupe transaction and send them to any listening clients + pub(super) async fn funnel_transaction( + self: Arc, + rpc: Arc, + pending_tx_id: TxHash, + pending_tx_sender: broadcast::Sender, + ) -> anyhow::Result<()> { + // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout + // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory + // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? 
+ + if pending_tx_sender.receiver_count() == 0 { + // no receivers, so no point in querying to get the full transaction + return Ok(()); + } + + trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); + + if self.pending_transactions.contains_key(&pending_tx_id) { + // this transaction has already been processed + return Ok(()); + } + + // query the rpc for this transaction + // it is possible that another rpc is also being queried. thats fine. we want the fastest response + match self + .query_transaction_status(rpc.clone(), pending_tx_id) + .await + { + Ok(Some(tx_state)) => { + let _ = pending_tx_sender.send(tx_state); + + trace!(?pending_tx_id, "sent"); + + // we sent the transaction. return now. don't break looping because that gives a warning + return Ok(()); + } + Ok(None) => {} + Err(err) => { + trace!(?err, ?pending_tx_id, "failed fetching transaction"); + // unable to update the entry. sleep and try again soon + // TODO: retry with exponential backoff with jitter starting from a much smaller time + // sleep(Duration::from_millis(100)).await; + } + } + + // warn is too loud. this is somewhat common + // "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#" + // sometimes it's been pending for many hours + // sometimes it's maybe something else? + debug!(?pending_tx_id, "not found on {}", rpc); + Ok(()) + } +}
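
Below the patch, a minimal self-contained sketch (not part of this commit) of the subscriber-side pattern around the renamed TxStatus enum. It mirrors the Pending/Confirmed/Orphaned match that app.rs now repeats in its three pending-transaction subscription handlers; the String payload, the channel capacity, and the println! forwarding are illustrative stand-ins for ethers::prelude::Transaction and the app's real broadcast wiring, so it builds with only tokio.

use tokio::sync::broadcast;

// stand-in for web3_proxy::rpcs::transactions::TxStatus, with String in place of
// ethers::prelude::Transaction so the sketch compiles without the ethers crate
#[derive(Clone)]
enum TxStatus {
    Pending(String),
    Confirmed(String),
    Orphaned(String),
}

#[tokio::main]
async fn main() {
    // stand-in for Web3ProxyApp::pending_tx_sender and a per-subscription receiver
    let (pending_tx_sender, mut pending_tx_receiver) = broadcast::channel::<TxStatus>(16);

    let subscriber = tokio::spawn(async move {
        // Confirmed transactions are skipped; Pending and Orphaned ones are forwarded,
        // matching the three eth_subscribe handlers in app.rs after the rename.
        while let Ok(new_tx_state) = pending_tx_receiver.recv().await {
            let new_tx = match new_tx_state {
                TxStatus::Pending(tx) => tx,
                TxStatus::Confirmed(..) => continue,
                TxStatus::Orphaned(tx) => tx,
            };
            println!("forwarding pending tx: {}", new_tx);
        }
    });

    pending_tx_sender
        .send(TxStatus::Pending("0xexampletxhash".to_string()))
        .expect("at least one receiver is subscribed");
    pending_tx_sender
        .send(TxStatus::Confirmed("0xexampletxhash".to_string()))
        .expect("at least one receiver is subscribed");

    // dropping the only sender closes the channel and ends the subscriber loop
    drop(pending_tx_sender);
    subscriber.await.unwrap();
}

Keeping the enum next to funnel_transaction() in rpcs/transactions.rs means the only thing app.rs needs from the new module is TxStatus itself, which is what the added `use crate::rpcs::transactions::TxStatus;` import reflects.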