move more things into their own files

This commit is contained in:
Bryan Stitt 2022-08-24 00:59:05 +00:00
parent 72312a686b
commit 04dc716250
7 changed files with 226 additions and 209 deletions

View File

@ -1,12 +1,21 @@
// TODO: this file is way too big now. move things into other modules
use crate::block_number::block_needed;
use crate::config::{AppConfig, TopConfig};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::transactions::TxStatus;
use crate::stats::AppStats;
use anyhow::Context;
use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use derive_more::From;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64};
use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
use fifomap::{FifoCountMap, FifoSizedMap};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
@ -35,15 +44,6 @@ use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{info, info_span, instrument, trace, warn, Instrument};
use uuid::Uuid;
use crate::block_number::block_needed;
use crate::config::{AppConfig, TopConfig};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::connections::Web3Connections;
use crate::stats::AppStats;
// TODO: make this customizable?
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
@ -52,7 +52,8 @@ static APP_USER_AGENT: &str = concat!(
env!("CARGO_PKG_VERSION"),
);
// block hash, method, params
/// block hash, method, params
// TODO: better name
type CacheKey = (H256, String, Option<String>);
type ResponseLrcCache = RwLock<FifoSizedMap<CacheKey, JsonRpcForwardedResponse>>;
@ -61,14 +62,6 @@ type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
// TODO: think more about TxState
#[derive(Clone)]
pub enum TxState {
Pending(Transaction),
Confirmed(Transaction),
Orphaned(Transaction),
}
#[derive(Clone, Copy, From)]
pub struct UserCacheValue {
pub expires_at: Instant,
@ -76,6 +69,30 @@ pub struct UserCacheValue {
pub user_count_per_period: u64,
}
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs: Arc<Web3Connections>,
/// Track active requests so that we don't send the same query to multiple backends
pub active_requests: ActiveRequestsMap,
response_cache: ResponseLrcCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
pub rate_limiter: Option<RedisRateLimit>,
pub redis_pool: Option<RedisPool>,
pub stats: AppStats,
pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
}
/// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures.
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
@ -127,37 +144,6 @@ pub async fn get_migrated_db(
Ok(db)
}
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs: Arc<Web3Connections>,
/// Track active requests so that we don't send the same query to multiple backends
pub active_requests: ActiveRequestsMap,
response_cache: ResponseLrcCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
pending_tx_sender: broadcast::Sender<TxState>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub pending_transactions: Arc<DashMap<TxHash, TxState>>,
pub rate_limiter: Option<RedisRateLimit>,
pub redis_pool: Option<RedisPool>,
pub stats: AppStats,
pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
}
impl fmt::Debug for Web3ProxyApp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
}
}
impl Web3ProxyApp {
pub async fn redis_conn(&self) -> anyhow::Result<PooledConnection<RedisConnectionManager>> {
match self.redis_pool.as_ref() {
@ -405,9 +391,9 @@ impl Web3ProxyApp {
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let new_tx = match new_tx_state {
TxState::Pending(tx) => tx,
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx) => tx,
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
@ -446,9 +432,9 @@ impl Web3ProxyApp {
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let new_tx = match new_tx_state {
TxState::Pending(tx) => tx,
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx) => tx,
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
@ -488,9 +474,9 @@ impl Web3ProxyApp {
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let new_tx = match new_tx_state {
TxState::Pending(tx) => tx,
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx) => tx,
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
@ -916,3 +902,10 @@ impl Web3ProxyApp {
Ok(response)
}
}
impl fmt::Debug for Web3ProxyApp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
}
}

View File

@ -2,7 +2,7 @@
use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::synced_connections::SyncedConnections;
use crate::app::TxState;
use super::transactions::TxStatus;
use crate::jsonrpc::JsonRpcRequest;
use anyhow::Context;
use ethers::prelude::{Block, TxHash, H256, U256, U64};
@ -151,13 +151,42 @@ impl Web3Connections {
Ok(block)
}
// TODO: rename this?
pub(super) async fn update_synced_rpcs(
&self,
block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
// TODO: head_block_sender should be a broadcast_sender like pending_tx_sender
head_block_sender: watch::Sender<Arc<Block<TxHash>>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads automatically
let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
self.recv_block_from_rpc(
&mut connection_heads,
new_block,
rpc,
&head_block_sender,
&pending_tx_sender,
)
.await?;
}
// TODO: if there was an error, we should return it
warn!("block_receiver exited!");
Ok(())
}
pub async fn recv_block_from_rpc(
&self,
connection_heads: &mut IndexMap<String, Arc<Block<TxHash>>>,
new_block: Arc<Block<TxHash>>,
rpc: Arc<Web3Connection>,
head_block_sender: &watch::Sender<Arc<Block<TxHash>>>,
pending_tx_sender: &Option<broadcast::Sender<TxState>>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let new_block_hash = if let Some(hash) = new_block.hash {
hash

View File

@ -371,8 +371,8 @@ impl Web3Connection {
let mut last_hash = H256::zero();
loop {
match self.try_request_handle().await {
Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => {
match self.try_open_request().await {
Ok(OpenRequestResult::Handle(active_request_handle)) => {
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
@ -520,30 +520,30 @@ impl Web3Connection {
}
/// be careful with this; it will wait forever!
// TODO: maximum wait time?
#[instrument(skip_all)]
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestHandle> {
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
loop {
match self.try_request_handle().await {
Ok(OpenRequestResult::ActiveRequest(handle)) => return Ok(handle),
match self.try_open_request().await {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
sleep_until(retry_at).await;
}
Ok(OpenRequestResult::None) => {
// TODO: when can this happen? emit a stat?
// TODO: instead of erroring, subscribe to the head block on this
// TODO: when can this happen? log? emit a stat?
// TODO: subscribe to the head block on this
// TODO: sleep how long? maybe just error?
sleep(Duration::from_secs(1)).await;
}
// Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")),
Err(err) => return Err(err),
}
}
}
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestResult> {
pub async fn try_open_request(self: &Arc<Self>) -> anyhow::Result<OpenRequestResult> {
// check that we are connected
if !self.has_provider().await {
// TODO: emit a stat?
@ -576,7 +576,7 @@ impl Web3Connection {
let handle = OpenRequestHandle::new(self.clone());
Ok(OpenRequestResult::ActiveRequest(handle))
Ok(OpenRequestResult::Handle(handle))
}
}

View File

@ -2,14 +2,15 @@
use super::connection::Web3Connection;
use super::request::{OpenRequestHandle, OpenRequestResult};
use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::Web3ConnectionConfig;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
use counter::Counter;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64};
use ethers::prelude::{Block, ProviderError, TxHash, H256, U64};
use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
@ -26,14 +27,14 @@ use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, instrument, trace, warn};
use tracing::{error, info, instrument, trace, warn};
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
pub(super) conns: IndexMap<String, Arc<Web3Connection>>,
pub(super) synced_connections: ArcSwap<SyncedConnections>,
pub(super) pending_transactions: Arc<DashMap<TxHash, TxState>>,
pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
/// only includes blocks on the main chain.
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
pub(super) chain_map: DashMap<U64, Arc<Block<TxHash>>>,
@ -51,8 +52,8 @@ impl Web3Connections {
http_client: Option<reqwest::Client>,
redis_client_pool: Option<redis_rate_limit::RedisPool>,
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
pending_tx_sender: Option<broadcast::Sender<TxState>>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) =
@ -182,97 +183,6 @@ impl Web3Connections {
Ok((connections, handle))
}
async fn _funnel_transaction(
&self,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
) -> Result<Option<TxState>, ProviderError> {
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: maximum wait time
let pending_transaction: Transaction = match rpc.try_request_handle().await {
Ok(OpenRequestResult::ActiveRequest(handle)) => {
handle
.request("eth_getTransactionByHash", (pending_tx_id,))
.await?
}
Ok(_) => {
// TODO: actually retry?
return Ok(None);
}
Err(err) => {
trace!(
?pending_tx_id,
?rpc,
?err,
"cancelled funneling transaction"
);
return Ok(None);
}
};
trace!(?pending_transaction, "pending");
match &pending_transaction.block_hash {
Some(_block_hash) => {
// the transaction is already confirmed. no need to save in the pending_transactions map
Ok(Some(TxState::Confirmed(pending_transaction)))
}
None => Ok(Some(TxState::Pending(pending_transaction))),
}
}
/// dedupe transaction and send them to any listening clients
async fn funnel_transaction(
self: Arc<Self>,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
pending_tx_sender: broadcast::Sender<TxState>,
) -> anyhow::Result<()> {
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
// TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
if pending_tx_sender.receiver_count() == 0 {
// no receivers, so no point in querying to get the full transaction
return Ok(());
}
trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
if self.pending_transactions.contains_key(&pending_tx_id) {
// this transaction has already been processed
return Ok(());
}
// query the rpc for this transaction
// it is possible that another rpc is also being queried. thats fine. we want the fastest response
match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
Ok(Some(tx_state)) => {
let _ = pending_tx_sender.send(tx_state);
trace!(?pending_tx_id, "sent");
// we sent the transaction. return now. don't break looping because that gives a warning
return Ok(());
}
Ok(None) => {}
Err(err) => {
trace!(?err, ?pending_tx_id, "failed fetching transaction");
// unable to update the entry. sleep and try again soon
// TODO: retry with exponential backoff with jitter starting from a much smaller time
// sleep(Duration::from_millis(100)).await;
}
}
// warn is too loud. this is somewhat common
// "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
// sometimes it's been pending for many hours
// sometimes it's maybe something else?
debug!(?pending_tx_id, "not found on {}", rpc);
Ok(())
}
/// subscribe to blocks and transactions from all the backend rpcs.
/// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver`
/// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender`
@ -281,7 +191,7 @@ impl Web3Connections {
pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
pending_tx_sender: Option<broadcast::Sender<TxState>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut futures = vec![];
@ -402,36 +312,6 @@ impl Web3Connections {
panic!("i don't think this is possible")
}
/// TODO: move parts of this onto SyncedConnections? it needs to be simpler
// we don't instrument here because we put a span inside the while loop
async fn update_synced_rpcs(
&self,
block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
// TODO: head_block_sender should be a broadcast_sender like pending_tx_sender
head_block_sender: watch::Sender<Arc<Block<TxHash>>>,
pending_tx_sender: Option<broadcast::Sender<TxState>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads automatically
let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
self.recv_block_from_rpc(
&mut connection_heads,
new_block,
rpc,
&head_block_sender,
&pending_tx_sender,
)
.await?;
}
// TODO: if there was an error, we should return it
warn!("block_receiver exited!");
Ok(())
}
/// get the best available rpc server
#[instrument(skip_all)]
pub async fn next_upstream_server(
@ -468,6 +348,7 @@ impl Web3Connections {
return Err(anyhow::anyhow!("not synced"));
}
// we sort on a bunch of values. cache them here so that we don't do this math multiple times.
let sort_cache: HashMap<_, _> = synced_rpcs
.iter()
.map(|rpc| {
@ -484,20 +365,20 @@ impl Web3Connections {
.collect();
synced_rpcs.sort_unstable_by(|a, b| {
let a_sorts = sort_cache.get(a).unwrap();
let b_sorts = sort_cache.get(b).unwrap();
let a_sorts = sort_cache.get(a).expect("sort_cache should always have a");
let b_sorts = sort_cache.get(b).expect("sort_cache should always have b");
// TODO: i'm comparing floats. crap
// partial_cmp because we are comparing floats
a_sorts.partial_cmp(b_sorts).unwrap_or(cmp::Ordering::Equal)
});
// now that the rpcs are sorted, try to get an active request handle for one of them
for rpc in synced_rpcs.into_iter() {
// increment our connection counter
match rpc.try_request_handle().await {
Ok(OpenRequestResult::ActiveRequest(handle)) => {
match rpc.try_open_request().await {
Ok(OpenRequestResult::Handle(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::ActiveRequest(handle));
return Ok(OpenRequestResult::Handle(handle));
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
@ -539,12 +420,12 @@ impl Web3Connections {
}
// check rate limits and increment our connection counter
match connection.try_request_handle().await {
match connection.try_open_request().await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle),
Ok(OpenRequestResult::None) => {
warn!("no request handle for {}", connection)
}
@ -579,7 +460,7 @@ impl Web3Connections {
.next_upstream_server(&skip_rpcs, min_block_needed)
.await
{
Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => {
Ok(OpenRequestResult::Handle(active_request_handle)) => {
// save the rpc in case we get an error and want to retry on another server
skip_rpcs.push(active_request_handle.clone_connection());

View File

@ -1,6 +1,8 @@
// TODO: all pub, or export useful things here instead?
pub mod blockchain;
pub mod connection;
pub mod connections;
pub mod provider;
pub mod request;
pub mod synced_connections;
pub mod transactions;

View File

@ -8,12 +8,14 @@ use tracing::{instrument, trace};
// TODO: rename this
pub enum OpenRequestResult {
ActiveRequest(OpenRequestHandle),
Handle(OpenRequestHandle),
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
/// Unable to start a request. Retrying will not succeed.
None,
}
/// Drop this once a connection completes
/// Make RPC requests through this handle and drop it when you are done.
pub struct OpenRequestHandle(Arc<Web3Connection>);
impl OpenRequestHandle {

View File

@ -0,0 +1,110 @@
///! Load balanced communication with a group of web3 providers
use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::request::OpenRequestResult;
use ethers::prelude::{ProviderError, Transaction, TxHash};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace};
// TODO: think more about TxState
#[derive(Clone)]
pub enum TxStatus {
Pending(Transaction),
Confirmed(Transaction),
Orphaned(Transaction),
}
impl Web3Connections {
async fn query_transaction_status(
&self,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
) -> Result<Option<TxStatus>, ProviderError> {
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
let tx: Transaction = match rpc.try_open_request().await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request("eth_getTransactionByHash", (pending_tx_id,))
.await?
}
Ok(_) => {
// TODO: actually retry?
return Ok(None);
}
Err(err) => {
trace!(
?pending_tx_id,
?rpc,
?err,
"cancelled funneling transaction"
);
return Ok(None);
}
};
match &tx.block_hash {
Some(_block_hash) => {
// the transaction is already confirmed. no need to save in the pending_transactions map
Ok(Some(TxStatus::Confirmed(tx)))
}
None => Ok(Some(TxStatus::Pending(tx))),
}
}
/// dedupe transaction and send them to any listening clients
pub(super) async fn funnel_transaction(
self: Arc<Self>,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
pending_tx_sender: broadcast::Sender<TxStatus>,
) -> anyhow::Result<()> {
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
// TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
if pending_tx_sender.receiver_count() == 0 {
// no receivers, so no point in querying to get the full transaction
return Ok(());
}
trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
if self.pending_transactions.contains_key(&pending_tx_id) {
// this transaction has already been processed
return Ok(());
}
// query the rpc for this transaction
// it is possible that another rpc is also being queried. thats fine. we want the fastest response
match self
.query_transaction_status(rpc.clone(), pending_tx_id)
.await
{
Ok(Some(tx_state)) => {
let _ = pending_tx_sender.send(tx_state);
trace!(?pending_tx_id, "sent");
// we sent the transaction. return now. don't break looping because that gives a warning
return Ok(());
}
Ok(None) => {}
Err(err) => {
trace!(?err, ?pending_tx_id, "failed fetching transaction");
// unable to update the entry. sleep and try again soon
// TODO: retry with exponential backoff with jitter starting from a much smaller time
// sleep(Duration::from_millis(100)).await;
}
}
// warn is too loud. this is somewhat common
// "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
// sometimes it's been pending for many hours
// sometimes it's maybe something else?
debug!(?pending_tx_id, "not found on {}", rpc);
Ok(())
}
}