diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ec1d5802..462b5a59 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -21,7 +21,6 @@ use crate::rpcs::consensus::RankedRpcs; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; use crate::rpcs::provider::{connect_http, EthersHttpProvider}; -use crate::rpcs::transactions::TxStatus; use crate::stats::{AppStat, FlushedStats, StatBuffer}; use anyhow::Context; use axum::http::StatusCode; @@ -29,7 +28,7 @@ use chrono::Utc; use deferred_rate_limiter::DeferredRateLimiter; use entities::user; use ethers::core::utils::keccak256; -use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; +use ethers::prelude::{Address, Bytes, Transaction, H256, U64}; use ethers::types::U256; use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; @@ -87,9 +86,6 @@ pub struct Web3ProxyApp { /// don't drop this or the sender will stop working /// TODO: broadcast channel instead? pub watch_consensus_head_receiver: watch::Receiver>, - /// rpc clients that subscribe to pendingTransactions use this channel - /// This is the Sender so that new channels can subscribe to it - pending_tx_sender: broadcast::Sender, /// Optional database for users and accounting pub db_conn: Option, /// Optional read-only database for users and accounting @@ -107,9 +103,6 @@ pub struct Web3ProxyApp { /// rate limit the login endpoint /// we do this because each pending login is a row in the database pub login_rate_limiter: Option, - /// store pending transactions that we've seen so that we don't send duplicates to subscribers - /// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries - pub pending_transactions: Cache, /// Send private requests (like eth_sendRawTransaction) to all these servers /// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests pub private_rpcs: Option>, @@ -453,25 +446,6 @@ impl Web3ProxyApp { } let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None); - // TODO: will one receiver lagging be okay? how big should this be? - let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); - - // TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that - // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them? - // TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks. - // TODO: we should still have some sort of expiration or maximum size limit for the map - drop(pending_tx_receiver); - - // TODO: capacity from configs - // all these are the same size, so no need for a weigher - // TODO: this used to have a time_to_idle - // TODO: different chains might handle this differently - // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer - // TODO: this used to be time_to_update, but - let pending_transactions = CacheBuilder::new(10_000) - .name("pending_transactions") - .time_to_live(Duration::from_secs(300)) - .build(); // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: we should emit stats to calculate a more accurate expected cache size @@ -499,13 +473,10 @@ impl Web3ProxyApp { let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( chain_id, - db_conn.clone(), top_config.app.max_head_block_lag, top_config.app.min_synced_rpcs, top_config.app.min_sum_soft_limit, "balanced rpcs".to_string(), - pending_transactions.clone(), - Some(pending_tx_sender.clone()), Some(watch_consensus_head_sender), ) .await @@ -520,19 +491,13 @@ impl Web3ProxyApp { None } else { // TODO: do something with the spawn handle - // TODO: Merge - // let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn( let (private_rpcs, private_handle, _) = Web3Rpcs::spawn( chain_id, - db_conn.clone(), // private rpcs don't get subscriptions, so no need for max_head_block_lag None, 0, 0, "protected rpcs".to_string(), - pending_transactions.clone(), - // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have - None, // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs // they also often have low rate limits // however, they are well connected to miners/validators. so maybe using them as a safety check would be good @@ -556,14 +521,11 @@ impl Web3ProxyApp { // TODO: do something with the spawn handle let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn( chain_id, - db_conn.clone(), // bundler_4337_rpcs don't get subscriptions, so no need for max_head_block_lag None, 0, 0, "eip4337 rpcs".to_string(), - pending_transactions.clone(), - None, None, ) .await @@ -595,8 +557,6 @@ impl Web3ProxyApp { jsonrpc_response_cache, kafka_producer, login_rate_limiter, - pending_transactions, - pending_tx_sender, private_rpcs, prometheus_port: prometheus_port.clone(), rpc_secret_key_cache, @@ -1184,8 +1144,6 @@ impl Web3ProxyApp { // TODO: don't clone into a new string? let request_method = method.to_string(); - let authorization = request_metadata.authorization.clone().unwrap_or_default(); - // TODO: serve net_version without querying the backend // TODO: don't force RawValue let response_data: JsonRpcResponseEnum> = match request_method.as_ref() { @@ -1608,7 +1566,6 @@ impl Web3ProxyApp { // TODO: add a stat for archive vs full since they should probably cost different // TODO: this cache key can be rather large. is that okay? let cache_key: Option = match CacheMode::new( - &authorization, method, params, &head_block, diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index ab58b6aa..830059b8 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -6,7 +6,6 @@ use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMe use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::response_cache::JsonRpcResponseEnum; -use crate::rpcs::transactions::TxStatus; use axum::extract::ws::{CloseFrame, Message}; use deferred_rate_limiter::DeferredRateLimitResult; use ethers::types::U64; @@ -19,7 +18,7 @@ use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::Instant; -use tokio_stream::wrappers::{BroadcastStream, WatchStream}; +use tokio_stream::wrappers::WatchStream; use tracing::{error, trace}; impl Web3ProxyApp { @@ -124,218 +123,8 @@ impl Web3ProxyApp { trace!("closed newHeads subscription {:?}", subscription_id); }); - } else if subscribe_to == "newPendingTransactions" { - let pending_tx_receiver = self.pending_tx_sender.subscribe(); - let app = self.clone(); - - let mut pending_tx_receiver = Abortable::new( - BroadcastStream::new(pending_tx_receiver), - subscription_registration, - ); - - trace!( - "pending newPendingTransactions subscription id: {:?}", - subscription_id - ); - - tokio::spawn(async move { - while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let subscription_request_metadata = RequestMetadata::new( - &app, - authorization.clone(), - RequestOrMethod::Method("eth_subscribe(newPendingTransactions)", 0), - None, - ) - .await; - - if let Some(close_message) = app - .rate_limit_close_websocket(&subscription_request_metadata) - .await - { - let _ = response_sender.send(close_message); - break; - } - - let new_tx = match new_tx_state { - TxStatus::Pending(tx) => tx, - TxStatus::Confirmed(..) => continue, - TxStatus::Orphaned(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - "result": new_tx.hash, - }, - }); - - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); - - // TODO: test that this len is the same as JsonRpcForwardedResponseEnum.num_bytes() - let response_bytes = response_str.len(); - - subscription_request_metadata.add_response(response_bytes); - - // TODO: do clients support binary messages? reply with binary if thats what we were sent - let response_msg = Message::Text(response_str); - - if response_sender.send(response_msg).is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - } - - trace!( - "closed newPendingTransactions subscription: {:?}", - subscription_id - ); - }); - } else if subscribe_to == "newPendingFullTransactions" { - // TODO: too much copy/pasta with newPendingTransactions - let pending_tx_receiver = self.pending_tx_sender.subscribe(); - let app = self.clone(); - - let mut pending_tx_receiver = Abortable::new( - BroadcastStream::new(pending_tx_receiver), - subscription_registration, - ); - - trace!( - "pending newPendingFullTransactions subscription: {:?}", - subscription_id - ); - - tokio::spawn(async move { - while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let subscription_request_metadata = RequestMetadata::new( - &app, - authorization.clone(), - RequestOrMethod::Method("eth_subscribe(newPendingFullTransactions)", 0), - None, - ) - .await; - - if let Some(close_message) = app - .rate_limit_close_websocket(&subscription_request_metadata) - .await - { - let _ = response_sender.send(close_message); - break; - } - - let new_tx = match new_tx_state { - TxStatus::Pending(tx) => tx, - TxStatus::Confirmed(..) => continue, - TxStatus::Orphaned(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - // upstream just sends the txid, but we want to send the whole transaction - "result": new_tx, - }, - }); - - subscription_request_metadata.add_response(&response_json); - - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); - - // TODO: do clients support binary messages? - let response_msg = Message::Text(response_str); - - if response_sender.send(response_msg).is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - } - - trace!( - "closed newPendingFullTransactions subscription: {:?}", - subscription_id - ); - }); - } else if subscribe_to == "newPendingRawTransactions" { - // TODO: too much copy/pasta with newPendingTransactions - let pending_tx_receiver = self.pending_tx_sender.subscribe(); - let app = self.clone(); - - let mut pending_tx_receiver = Abortable::new( - BroadcastStream::new(pending_tx_receiver), - subscription_registration, - ); - - trace!( - "pending transactions subscription id: {:?}", - subscription_id - ); - - tokio::spawn(async move { - while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let subscription_request_metadata = RequestMetadata::new( - &app, - authorization.clone(), - "eth_subscribe(newPendingRawTransactions)", - None, - ) - .await; - - if let Some(close_message) = app - .rate_limit_close_websocket(&subscription_request_metadata) - .await - { - let _ = response_sender.send(close_message); - break; - } - - let new_tx = match new_tx_state { - TxStatus::Pending(tx) => tx, - TxStatus::Confirmed(..) => continue, - TxStatus::Orphaned(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - // upstream just sends the txid, but we want to send the raw transaction - "result": new_tx.rlp(), - }, - }); - - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); - - // we could use response.num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); - - // TODO: do clients support binary messages? - let response_msg = Message::Text(response_str); - - if response_sender.send(response_msg).is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - - subscription_request_metadata.add_response(response_bytes); - } - - trace!( - "closed newPendingRawTransactions subscription: {:?}", - subscription_id - ); - }); } else { + // TODO: make sure this gets a CU cost of unimplemented instead of the normal eth_subscribe cost? return Err(Web3ProxyError::NotImplemented( subscribe_to.to_owned().into(), )); diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index f363fa3b..24641529 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,4 +1,5 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. +use crate::rpcs::many::Web3Rpcs; use crate::{ errors::{Web3ProxyError, Web3ProxyResult}, rpcs::blockchain::Web3ProxyBlock, @@ -10,11 +11,8 @@ use ethers::{ types::H256, }; use serde_json::json; -use std::sync::Arc; use tracing::{error, trace, warn}; -use crate::{frontend::authorization::Authorization, rpcs::many::Web3Rpcs}; - #[allow(non_snake_case)] pub fn BlockNumber_to_U64(block_num: BlockNumber, latest_block: &U64) -> (U64, bool) { match block_num { @@ -67,7 +65,6 @@ impl From<&Web3ProxyBlock> for BlockNumAndHash { /// modify params to always have a block hash and not "latest" /// TODO: this should replace all block numbers with hashes, not just "latest" pub async fn clean_block_number( - authorization: &Arc, params: &mut serde_json::Value, block_param_id: usize, latest_block: &Web3ProxyBlock, @@ -101,7 +98,7 @@ pub async fn clean_block_number( serde_json::from_value(block_hash).context("decoding blockHash")?; let block = rpcs - .block(authorization, &block_hash, None, Some(3), None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block number from hash")?; @@ -115,12 +112,12 @@ pub async fn clean_block_number( // TODO: move this to a helper function? if let Ok(block_num) = serde_json::from_value::(x.clone()) { let (block_hash, _) = rpcs - .block_hash(authorization, &block_num) + .block_hash(&block_num) .await .context("fetching block hash from number")?; let block = rpcs - .block(authorization, &block_hash, None, Some(3), None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block from hash")?; @@ -136,12 +133,12 @@ pub async fn clean_block_number( (latest_block.into(), change) } else { let (block_hash, _) = rpcs - .block_hash(authorization, &block_num) + .block_hash(&block_num) .await .context("fetching block hash from number")?; let block = rpcs - .block(authorization, &block_hash, None, Some(3), None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block from hash")?; @@ -149,7 +146,7 @@ pub async fn clean_block_number( } } else if let Ok(block_hash) = serde_json::from_value::(x.clone()) { let block = rpcs - .block(authorization, &block_hash, None, Some(3), None) + .block(&block_hash, None, Some(3), None) .await .context("fetching block number from hash")?; @@ -193,13 +190,12 @@ pub enum CacheMode { impl CacheMode { pub async fn new( - authorization: &Arc, method: &str, params: &mut serde_json::Value, head_block: &Web3ProxyBlock, rpcs: &Web3Rpcs, ) -> Self { - match Self::try_new(authorization, method, params, head_block, rpcs).await { + match Self::try_new(method, params, head_block, rpcs).await { Ok(x) => x, Err(err) => { warn!(?err, "unable to determine cache mode from params"); @@ -209,7 +205,6 @@ impl CacheMode { } pub async fn try_new( - authorization: &Arc, method: &str, params: &mut serde_json::Value, head_block: &Web3ProxyBlock, @@ -284,7 +279,7 @@ impl CacheMode { *x = json!(block_num); } - let (block_hash, _) = rpcs.block_hash(authorization, &block_num).await?; + let (block_hash, _) = rpcs.block_hash(&block_num).await?; BlockNumAndHash(block_num, block_hash) } else { @@ -304,7 +299,7 @@ impl CacheMode { *x = json!(block_num); } - let (block_hash, _) = rpcs.block_hash(authorization, &block_num).await?; + let (block_hash, _) = rpcs.block_hash(&block_num).await?; BlockNumAndHash(block_num, block_hash) } else { @@ -366,7 +361,7 @@ impl CacheMode { } }; - match clean_block_number(authorization, params, block_param_id, head_block, rpcs).await { + match clean_block_number(params, block_param_id, head_block, rpcs).await { Ok(block) => Ok(CacheMode::Cache { block, cache_errors: true, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 56bdc17f..7d51b8cb 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -284,7 +284,6 @@ impl Web3RpcConfig { blocks_by_hash_cache: BlocksByHashCache, block_sender: Option>, max_head_block_age: Duration, - tx_id_sender: Option>, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { warn!(extra=?self.extra.keys(), "unknown Web3RpcConfig fields!"); @@ -301,7 +300,6 @@ impl Web3RpcConfig { blocks_by_hash_cache, block_sender, max_head_block_age, - tx_id_sender, ) .await } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index aa7b19b7..377af889 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -2,10 +2,8 @@ use super::consensus::ConsensusFinder; use super::many::Web3Rpcs; use super::one::Web3Rpc; -use super::transactions::TxStatus; use crate::config::{average_block_interval, BlockAndRpc}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; -use crate::frontend::authorization::Authorization; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use moka::future::Cache; @@ -15,7 +13,7 @@ use serde_json::json; use std::hash::Hash; use std::time::Duration; use std::{fmt::Display, sync::Arc}; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::mpsc; use tokio::time::timeout; use tracing::{debug, error, warn}; @@ -267,7 +265,6 @@ impl Web3Rpcs { /// Will query a specific node or the best available. pub async fn block( &self, - authorization: &Arc, hash: &H256, rpc: Option<&Arc>, max_tries: Option, @@ -301,11 +298,10 @@ impl Web3Rpcs { let mut block: Option = if let Some(rpc) = rpc { // ask a specific rpc - // TODO: request_with_metadata would probably be better than authorized_request - rpc.authorized_request::<_, Option>( + // this doesn't have retries, so we do retries with `self.internal_request` below (note the "self" vs "rpc") + rpc.internal_request::<_, Option>( "eth_getBlockByHash", &get_block_params, - authorization, None, max_tries, max_wait, @@ -339,12 +335,8 @@ impl Web3Rpcs { } /// Convenience method to get the cannonical block at a given block height. - pub async fn block_hash( - &self, - authorization: &Arc, - num: &U64, - ) -> Web3ProxyResult<(H256, u64)> { - let (block, block_depth) = self.cannonical_block(authorization, num).await?; + pub async fn block_hash(&self, num: &U64) -> Web3ProxyResult<(H256, u64)> { + let (block, block_depth) = self.cannonical_block(num).await?; let hash = *block.hash(); @@ -353,11 +345,7 @@ impl Web3Rpcs { /// Get the heaviest chain's block from cache or backend rpc /// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this! - pub async fn cannonical_block( - &self, - authorization: &Arc, - num: &U64, - ) -> Web3ProxyResult<(Web3ProxyBlock, u64)> { + pub async fn cannonical_block(&self, num: &U64) -> Web3ProxyResult<(Web3ProxyBlock, u64)> { // we only have blocks by hash now // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) @@ -376,17 +364,23 @@ impl Web3Rpcs { .web3_context("no consensus head block")? .number(); - loop { - if num <= &head_block_num { - break; + if *num > head_block_num { + // if num is too far in the future, error now + if *num - head_block_num > self.max_head_block_lag { + return Err(Web3ProxyError::UnknownBlockNumber { + known: head_block_num, + unknown: *num, + }); } - debug!(%head_block_num, %num, "waiting for future block"); + while *num > head_block_num { + debug!(%head_block_num, %num, "waiting for future block"); - consensus_head_receiver.changed().await?; + consensus_head_receiver.changed().await?; - if let Some(head) = consensus_head_receiver.borrow_and_update().as_ref() { - head_block_num = *head.number(); + if let Some(head) = consensus_head_receiver.borrow_and_update().as_ref() { + head_block_num = *head.number(); + } } } @@ -397,9 +391,7 @@ impl Web3Rpcs { if let Some(block_hash) = self.blocks_by_number.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: configurable max wait and rpc - let block = self - .block(authorization, &block_hash, None, Some(3), None) - .await?; + let block = self.block(&block_hash, None, Some(3), None).await?; return Ok((block, block_depth)); } @@ -426,11 +418,7 @@ impl Web3Rpcs { pub(super) async fn process_incoming_blocks( &self, - authorization: &Arc, mut block_receiver: mpsc::UnboundedReceiver, - // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed - // Geth's subscriptions have the same potential for skipping blocks. - pending_tx_sender: Option>, ) -> Web3ProxyResult<()> { let mut consensus_finder = ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag)); @@ -449,13 +437,7 @@ impl Web3Rpcs { // TODO: what timeout on this? match timeout( Duration::from_secs(1), - consensus_finder.process_block_from_rpc( - self, - authorization, - new_block, - rpc, - &pending_tx_sender, - ), + consensus_finder.process_block_from_rpc(self, new_block, rpc), ) .await { @@ -493,7 +475,7 @@ impl Web3Rpcs { // TODO: what timeout on this? match timeout( Duration::from_secs(2), - consensus_finder.refresh(self, authorization, None, None), + consensus_finder.refresh(self, None, None), ) .await { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index c165028a..76bf16ed 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -1,9 +1,7 @@ use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; -use super::transactions::TxStatus; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; -use crate::frontend::authorization::Authorization; use base64::engine::general_purpose; use derive_more::Constructor; use ethers::prelude::{H256, U64}; @@ -16,7 +14,6 @@ use serde::Serialize; use std::cmp::{Ordering, Reverse}; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::sync::broadcast; use tokio::time::Instant; use tracing::{debug, enabled, info, trace, warn, Level}; @@ -423,12 +420,11 @@ impl ConsensusFinder { pub(super) async fn refresh( &mut self, web3_rpcs: &Web3Rpcs, - authorization: &Arc, rpc: Option<&Arc>, new_block: Option, ) -> Web3ProxyResult { let new_ranked_rpcs = match self - .find_consensus_connections(authorization, web3_rpcs) + .find_consensus_connections(web3_rpcs) .await .web3_context("error while finding consensus head block!")? { @@ -616,10 +612,8 @@ impl ConsensusFinder { pub(super) async fn process_block_from_rpc( &mut self, web3_rpcs: &Web3Rpcs, - authorization: &Arc, new_block: Option, rpc: Arc, - _pending_tx_sender: &Option>, ) -> Web3ProxyResult { // TODO: how should we handle an error here? if !self @@ -632,8 +626,7 @@ impl ConsensusFinder { return Ok(false); } - self.refresh(web3_rpcs, authorization, Some(&rpc), new_block) - .await + self.refresh(web3_rpcs, Some(&rpc), new_block).await } fn remove(&mut self, rpc: &Arc) -> Option { @@ -797,7 +790,6 @@ impl ConsensusFinder { pub async fn find_consensus_connections( &mut self, - authorization: &Arc, web3_rpcs: &Web3Rpcs, ) -> Web3ProxyResult> { self.update_tiers().await?; @@ -868,10 +860,7 @@ impl ConsensusFinder { let parent_hash = block_to_check.parent_hash(); - match web3_rpcs - .block(authorization, parent_hash, Some(rpc), Some(1), None) - .await - { + match web3_rpcs.block(parent_hash, Some(rpc), Some(1), None).await { Ok(parent_block) => block_to_check = parent_block, Err(err) => { debug!( diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5652bf08..988942a6 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -4,23 +4,21 @@ use super::consensus::{RankedRpcs, ShouldWaitForBlock}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; -use crate::config::{average_block_interval, BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; +use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::frontend::status::MokaCacheSerializer; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; -use crate::rpcs::transactions::TxStatus; use counter::Counter; use derive_more::From; -use ethers::prelude::{ProviderError, TxHash, U64}; +use ethers::prelude::{ProviderError, U64}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; use itertools::Itertools; -use migration::sea_orm::DatabaseConnection; -use moka::future::{Cache, CacheBuilder}; +use moka::future::CacheBuilder; use parking_lot::RwLock; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; @@ -31,7 +29,7 @@ use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::select; -use tokio::sync::{broadcast, mpsc, watch, RwLock as AsyncRwLock}; +use tokio::sync::{mpsc, watch}; use tokio::time::{sleep, sleep_until, Duration, Instant}; use tracing::{debug, error, info, trace, warn}; @@ -53,10 +51,6 @@ pub struct Web3Rpcs { pub(crate) watch_ranked_rpcs: watch::Sender>>, /// this head receiver makes it easy to wait until there is a new block pub(super) watch_head_block: Option>>, - /// keep track of transactions that we have sent through subscriptions - pub(super) pending_transaction_cache: Cache, - pub(super) pending_tx_id_receiver: AsyncRwLock>, - pub(super) pending_tx_id_sender: mpsc::UnboundedSender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans pub(super) blocks_by_hash: BlocksByHashCache, @@ -78,20 +72,16 @@ impl Web3Rpcs { #[allow(clippy::too_many_arguments)] pub async fn spawn( chain_id: u64, - db_conn: Option, max_head_block_lag: Option, min_head_rpcs: usize, min_sum_soft_limit: u32, name: String, - pending_transaction_cache: Cache, - pending_tx_sender: Option>, watch_consensus_head_sender: Option>>, ) -> anyhow::Result<( Arc, Web3ProxyJoinHandle<()>, watch::Receiver>>, )> { - let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); let (block_sender, block_receiver) = mpsc::unbounded_channel::(); // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes @@ -132,19 +122,14 @@ impl Web3Rpcs { min_synced_rpcs: min_head_rpcs, min_sum_soft_limit, name, - pending_transaction_cache, - pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), - pending_tx_id_sender, watch_head_block: watch_consensus_head_sender, watch_ranked_rpcs: watch_consensus_rpcs_sender, }); - let authorization = Arc::new(Authorization::internal(db_conn)?); - let handle = { let connections = connections.clone(); - tokio::spawn(connections.subscribe(authorization, block_receiver, pending_tx_sender)) + tokio::spawn(connections.subscribe(block_receiver)) }; Ok((connections, handle, consensus_connections_watcher)) @@ -205,7 +190,6 @@ impl Web3Rpcs { None }; - let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone()); let blocks_by_hash_cache = self.blocks_by_hash.clone(); debug!("spawning tasks for {}", server_name); @@ -222,7 +206,6 @@ impl Web3Rpcs { blocks_by_hash_cache, block_sender, self.max_head_block_age, - pending_tx_id_sender, )); Some(handle) @@ -328,51 +311,42 @@ impl Web3Rpcs { /// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, - authorization: Arc, block_receiver: mpsc::UnboundedReceiver, - pending_tx_sender: Option>, ) -> Web3ProxyResult<()> { let mut futures = vec![]; - // setup the transaction funnel - // it skips any duplicates (unless they are being orphaned) - // fetches new transactions from the notifying rpc - // forwards new transacitons to pending_tx_receipt_sender - if let Some(pending_tx_sender) = pending_tx_sender.clone() { - let clone = self.clone(); - let authorization = authorization.clone(); - let handle = tokio::task::spawn(async move { - // TODO: set up this future the same as the block funnel - while let Some((pending_tx_id, rpc)) = - clone.pending_tx_id_receiver.write().await.recv().await - { - let f = clone.clone().process_incoming_tx_id( - authorization.clone(), - rpc, - pending_tx_id, - pending_tx_sender.clone(), - ); - tokio::spawn(f); - } + // // setup the transaction funnel + // // it skips any duplicates (unless they are being orphaned) + // // fetches new transactions from the notifying rpc + // // forwards new transacitons to pending_tx_receipt_sender + // if let Some(pending_tx_sender) = pending_tx_sender.clone() { + // let clone = self.clone(); + // let handle = tokio::task::spawn(async move { + // // TODO: set up this future the same as the block funnel + // while let Some((pending_tx_id, rpc)) = + // clone.pending_tx_id_receiver.write().await.recv().await + // { + // let f = clone.clone().process_incoming_tx_id( + // rpc, + // pending_tx_id, + // pending_tx_sender.clone(), + // ); + // tokio::spawn(f); + // } - Ok(()) - }); + // Ok(()) + // }); - futures.push(flatten_handle(handle)); - } + // futures.push(flatten_handle(handle)); + // } // setup the block funnel if self.watch_head_block.is_some() { let connections = Arc::clone(&self); - let pending_tx_sender = pending_tx_sender.clone(); let handle = tokio::task::Builder::default() .name("process_incoming_blocks") - .spawn(async move { - connections - .process_incoming_blocks(&authorization, block_receiver, pending_tx_sender) - .await - })?; + .spawn(async move { connections.process_incoming_blocks(block_receiver).await })?; futures.push(flatten_handle(handle)); } @@ -1353,7 +1327,6 @@ impl Serialize for Web3Rpcs { &( MokaCacheSerializer(&self.blocks_by_hash), MokaCacheSerializer(&self.blocks_by_number), - MokaCacheSerializer(&self.pending_transaction_cache), ), )?; @@ -1382,7 +1355,7 @@ mod tests { use ethers::types::H256; use ethers::types::{Block, U256}; use latency::PeakEwmaLatency; - use moka::future::CacheBuilder; + use moka::future::{Cache, CacheBuilder}; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::trace; @@ -1539,7 +1512,6 @@ mod tests { let lagged_rpc = Arc::new(lagged_rpc); let (block_sender, _block_receiver) = mpsc::unbounded_channel(); - let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1557,11 +1529,6 @@ mod tests { name: "test".to_string(), watch_head_block: Some(watch_consensus_head_sender), watch_ranked_rpcs, - pending_transaction_cache: CacheBuilder::new(100) - .time_to_live(Duration::from_secs(60)) - .build(), - pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), - pending_tx_id_sender, blocks_by_hash: CacheBuilder::new(100) .time_to_live(Duration::from_secs(60)) .build(), @@ -1576,18 +1543,16 @@ mod tests { min_sum_soft_limit: 1, }; - let authorization = Arc::new(Authorization::internal(None).unwrap()); - let mut consensus_finder = ConsensusFinder::new(None, None); consensus_finder - .process_block_from_rpc(&rpcs, &authorization, None, lagged_rpc.clone(), &None) + .process_block_from_rpc(&rpcs, None, lagged_rpc.clone()) .await .expect( "its lagged, but it should still be seen as consensus if its the first to report", ); consensus_finder - .process_block_from_rpc(&rpcs, &authorization, None, head_rpc.clone(), &None) + .process_block_from_rpc(&rpcs, None, head_rpc.clone()) .await .unwrap(); @@ -1634,10 +1599,8 @@ mod tests { consensus_finder .process_block_from_rpc( &rpcs, - &authorization, Some(lagged_block.clone().try_into().unwrap()), lagged_rpc.clone(), - &None, ) .await .unwrap(); @@ -1655,16 +1618,14 @@ mod tests { consensus_finder .process_block_from_rpc( &rpcs, - &authorization, Some(lagged_block.clone().try_into().unwrap()), head_rpc.clone(), - &None, ) .await .unwrap(); // TODO: how do we spawn this and wait for it to process things? subscribe and watch consensus connections? - // rpcs.process_incoming_blocks(&authorization, block_receiver, pending_tx_sender) + // rpcs.process_incoming_blocks(block_receiver, pending_tx_sender) assert!(head_rpc.has_block_data(lagged_block.number.as_ref().unwrap())); assert!(!head_rpc.has_block_data(head_block.number.as_ref().unwrap())); @@ -1688,10 +1649,8 @@ mod tests { consensus_finder .process_block_from_rpc( &rpcs, - &authorization, Some(head_block.clone().try_into().unwrap()), head_rpc.clone(), - &None, ) .await .unwrap(); @@ -1809,7 +1768,6 @@ mod tests { let archive_rpc = Arc::new(archive_rpc); let (block_sender, _) = mpsc::unbounded_channel(); - let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1826,11 +1784,6 @@ mod tests { name: "test".to_string(), watch_head_block: Some(watch_consensus_head_sender), watch_ranked_rpcs, - pending_transaction_cache: CacheBuilder::new(100) - .time_to_live(Duration::from_secs(120)) - .build(), - pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), - pending_tx_id_sender, blocks_by_hash: CacheBuilder::new(100) .time_to_live(Duration::from_secs(120)) .build(), @@ -1843,19 +1796,11 @@ mod tests { max_head_block_lag: 5.into(), }; - let authorization = Arc::new(Authorization::internal(None).unwrap()); - let mut connection_heads = ConsensusFinder::new(None, None); // min sum soft limit will require 2 servers let x = connection_heads - .process_block_from_rpc( - &rpcs, - &authorization, - Some(head_block.clone()), - pruned_rpc.clone(), - &None, - ) + .process_block_from_rpc(&rpcs, Some(head_block.clone()), pruned_rpc.clone()) .await .unwrap(); assert!(!x); @@ -1863,13 +1808,7 @@ mod tests { assert_eq!(rpcs.num_synced_rpcs(), 0); let x = connection_heads - .process_block_from_rpc( - &rpcs, - &authorization, - Some(head_block.clone()), - archive_rpc.clone(), - &None, - ) + .process_block_from_rpc(&rpcs, Some(head_block.clone()), archive_rpc.clone()) .await .unwrap(); assert!(x); @@ -1993,7 +1932,6 @@ mod tests { let mock_erigon_archive = Arc::new(mock_erigon_archive); let (block_sender, _) = mpsc::unbounded_channel(); - let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel(); let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -2014,9 +1952,6 @@ mod tests { name: "test".to_string(), watch_head_block: Some(watch_consensus_head_sender), watch_ranked_rpcs, - pending_transaction_cache: Cache::new(10_000), - pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver), - pending_tx_id_sender, blocks_by_hash: Cache::new(10_000), blocks_by_number: Cache::new(10_000), min_synced_rpcs: 1, @@ -2025,29 +1960,15 @@ mod tests { max_head_block_lag: 5.into(), }; - let authorization = Arc::new(Authorization::internal(None).unwrap()); - let mut consensus_finder = ConsensusFinder::new(None, None); consensus_finder - .process_block_from_rpc( - &rpcs, - &authorization, - Some(block_1.clone()), - mock_geth.clone(), - &None, - ) + .process_block_from_rpc(&rpcs, Some(block_1.clone()), mock_geth.clone()) .await .unwrap(); consensus_finder - .process_block_from_rpc( - &rpcs, - &authorization, - Some(block_2.clone()), - mock_erigon_archive.clone(), - &None, - ) + .process_block_from_rpc(&rpcs, Some(block_2.clone()), mock_erigon_archive.clone()) .await .unwrap(); diff --git a/web3_proxy/src/rpcs/mod.rs b/web3_proxy/src/rpcs/mod.rs index 41b7a6ea..6d2ba81f 100644 --- a/web3_proxy/src/rpcs/mod.rs +++ b/web3_proxy/src/rpcs/mod.rs @@ -5,4 +5,3 @@ pub mod many; pub mod one; pub mod provider; pub mod request; -pub mod transactions; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index a582864e..cb833a02 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -10,7 +10,7 @@ use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; -use ethers::prelude::{Bytes, Middleware, TxHash, U64}; +use ethers::prelude::{Bytes, Middleware, U64}; use ethers::types::{Address, Transaction, U256}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -93,14 +93,13 @@ impl Web3Rpc { name: String, chain_id: u64, db_conn: Option, - // optional because this is only used for http providers. websocket providers don't use it + // optional because this is only used for http providers. websocket-only providers don't use it http_client: Option, redis_pool: Option, block_interval: Duration, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, max_head_block_age: Duration, - tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); @@ -126,12 +125,6 @@ impl Web3Rpc { } }; - let tx_id_sender = if config.subscribe_txs { - tx_id_sender - } else { - None - }; - let backup = config.backup; let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into(); @@ -210,18 +203,11 @@ impl Web3Rpc { // subscribe to new blocks and new transactions // subscribing starts the connection (with retries) - // TODO: make transaction subscription optional (just pass None for tx_id_sender) let handle = { let new_connection = new_connection.clone(); tokio::spawn(async move { - // TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff new_connection - .subscribe_with_reconnect( - block_map, - block_and_rpc_sender, - chain_id, - tx_id_sender, - ) + .subscribe_with_reconnect(block_map, block_and_rpc_sender, chain_id) .await }) }; @@ -596,23 +582,18 @@ impl Web3Rpc { Ok(()) } + /// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff #[allow(clippy::too_many_arguments)] async fn subscribe_with_reconnect( self: Arc, block_map: BlocksByHashCache, block_and_rpc_sender: Option>, chain_id: u64, - tx_id_sender: Option)>>, ) -> Web3ProxyResult<()> { loop { if let Err(err) = self .clone() - .subscribe( - block_map.clone(), - block_and_rpc_sender.clone(), - chain_id, - tx_id_sender.clone(), - ) + .subscribe(block_map.clone(), block_and_rpc_sender.clone(), chain_id) .await { if self.should_disconnect() { @@ -645,7 +626,6 @@ impl Web3Rpc { block_map: BlocksByHashCache, block_and_rpc_sender: Option>, chain_id: u64, - tx_id_sender: Option)>>, ) -> Web3ProxyResult<()> { let error_handler = if self.backup { Some(RequestErrorHandler::DebugLevel) @@ -766,18 +746,6 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } - // subscribe pending transactions - // TODO: make this opt-in. its a lot of bandwidth - if let Some(tx_id_sender) = tx_id_sender { - let subscribe_stop_rx = subscribe_stop_tx.subscribe(); - - let f = self - .clone() - .subscribe_pending_transactions(tx_id_sender, subscribe_stop_rx); - - futures.push(flatten_handle(tokio::spawn(f))); - } - // exit if any of the futures exit let first_exit = futures.next().await; @@ -809,11 +777,16 @@ impl Web3Rpc { trace!("subscribing to new heads on {}", self); // TODO: different handler depending on backup or not - let error_handler = None; - let authorization = Default::default(); + let error_handler = if self.backup { + Some(Level::DEBUG.into()) + } else { + Some(Level::ERROR.into()) + }; if let Some(ws_provider) = self.ws_provider.load().as_ref() { // todo: move subscribe_blocks onto the request handle + let authorization = Default::default(); + let active_request_handle = self .wait_for_request_handle(&authorization, None, error_handler) .await; @@ -826,11 +799,10 @@ impl Web3Rpc { // TODO: how does this get wrapped in an arc? does ethers handle that? // TODO: send this request to the ws_provider instead of the http_provider let latest_block: Result, _> = self - .authorized_request( + .internal_request( "eth_getBlockByNumber", &("latest", false), - &authorization, - Some(Level::WARN.into()), + error_handler, Some(2), Some(Duration::from_secs(5)), ) @@ -863,11 +835,10 @@ impl Web3Rpc { } let block_result = self - .authorized_request::<_, Option>( + .internal_request::<_, Option>( "eth_getBlockByNumber", &("latest", false), - &authorization, - Some(Level::WARN.into()), + error_handler, Some(2), Some(Duration::from_secs(5)), ) @@ -894,71 +865,6 @@ impl Web3Rpc { } } - /// Turn on the firehose of pending transactions - async fn subscribe_pending_transactions( - self: Arc, - tx_id_sender: mpsc::UnboundedSender<(TxHash, Arc)>, - mut subscribe_stop_rx: watch::Receiver, - ) -> Web3ProxyResult<()> { - // TODO: check that it actually changed to true - loop { - if *subscribe_stop_rx.borrow_and_update() { - break; - } - - subscribe_stop_rx.changed().await?; - } - - /* - trace!("watching pending transactions on {}", self); - // TODO: does this keep the lock open for too long? - match provider.as_ref() { - Web3Provider::Http(_provider) => { - // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints - self.wait_for_disconnect().await?; - } - Web3Provider::Both(_, client) | Web3Provider::Ws(client) => { - // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle - let active_request_handle = self - .wait_for_request_handle(&authorization, None, Some(provider.clone())) - .await?; - - let mut stream = client.subscribe_pending_txs().await?; - - drop(active_request_handle); - - while let Some(pending_tx_id) = stream.next().await { - tx_id_sender - .send_async((pending_tx_id, self.clone())) - .await - .context("tx_id_sender")?; - - // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription - - // TODO: select on this instead of checking every loop - if self.should_disconnect() { - break; - } - } - - // TODO: is this always an error? - // TODO: we probably don't want a warn and to return error - debug!("pending_transactions subscription ended on {}", self); - } - #[cfg(test)] - Web3Provider::Mock => { - self.wait_for_disconnect().await?; - } - } - */ - - if *subscribe_stop_rx.borrow() { - Ok(()) - } else { - Err(anyhow!("pending_transactions subscription exited. reconnect needed").into()) - } - } - pub async fn wait_for_request_handle( self: &Arc, authorization: &Arc, diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs deleted file mode 100644 index a980fcbd..00000000 --- a/web3_proxy/src/rpcs/transactions.rs +++ /dev/null @@ -1,118 +0,0 @@ -//! Load balanced communication with a group of web3 providers -use super::many::Web3Rpcs; -use super::one::Web3Rpc; -use super::request::OpenRequestResult; -use crate::errors::Web3ProxyResult; -use crate::frontend::authorization::Authorization; -use ethers::prelude::{ProviderError, Transaction, TxHash}; -use std::sync::Arc; -use tokio::sync::broadcast; -use tracing::{debug, trace, Level}; - -// TODO: think more about TxState -#[derive(Clone)] -pub enum TxStatus { - Pending(Transaction), - Confirmed(Transaction), - Orphaned(Transaction), -} - -impl Web3Rpcs { - async fn query_transaction_status( - &self, - authorization: &Arc, - rpc: Arc, - pending_tx_id: TxHash, - ) -> Result, ProviderError> { - // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) - // TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains - // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself - // TODO: if one rpc fails, try another? - // TODO: try_request_handle, or wait_for_request_handle? I think we want wait here - let tx: Transaction = match rpc - .try_request_handle(authorization, Some(Level::WARN.into())) - .await - { - Ok(OpenRequestResult::Handle(handle)) => { - handle - .request("eth_getTransactionByHash", &(pending_tx_id,)) - .await? - } - Ok(_) => { - // TODO: actually retry? - return Ok(None); - } - Err(err) => { - trace!( - "cancelled funneling transaction {} from {}: {:?}", - pending_tx_id, - rpc, - err, - ); - return Ok(None); - } - }; - - match &tx.block_hash { - Some(_block_hash) => { - // the transaction is already confirmed. no need to save in the pending_transactions map - Ok(Some(TxStatus::Confirmed(tx))) - } - None => Ok(Some(TxStatus::Pending(tx))), - } - } - - /// dedupe transaction and send them to any listening clients - pub(super) async fn process_incoming_tx_id( - self: Arc, - authorization: Arc, - rpc: Arc, - pending_tx_id: TxHash, - pending_tx_sender: broadcast::Sender, - ) -> Web3ProxyResult<()> { - // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout - // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory - // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? - - if pending_tx_sender.receiver_count() == 0 { - // no receivers, so no point in querying to get the full transaction - return Ok(()); - } - - // trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); - if self.pending_transaction_cache.get(&pending_tx_id).is_some() { - // this transaction has already been processed - return Ok(()); - } - - // query the rpc for this transaction - // it is possible that another rpc is also being queried. thats fine. we want the fastest response - match self - .query_transaction_status(&authorization, rpc.clone(), pending_tx_id) - .await - { - Ok(Some(tx_state)) => { - let _ = pending_tx_sender.send(tx_state); - - trace!("sent tx {:?}", pending_tx_id); - - // we sent the transaction. return now. don't break looping because that gives a warning - return Ok(()); - } - Ok(None) => {} - Err(err) => { - trace!("failed fetching transaction {:?}: {:?}", pending_tx_id, err); - // unable to update the entry. sleep and try again soon - // TODO: retry with exponential backoff with jitter starting from a much smaller time - // sleep(Duration::from_millis(100)).await; - } - } - - // warn is too loud. this is somewhat common - // "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#" - // sometimes it's been pending for many hours - // sometimes it's maybe something else? - debug!("txid {} not found on {}", pending_tx_id, rpc); - Ok(()) - } -} diff --git a/web3_proxy/tests/test_admins.rs b/web3_proxy/tests/test_admins.rs index f31d9b16..f23dcc30 100644 --- a/web3_proxy/tests/test_admins.rs +++ b/web3_proxy/tests/test_admins.rs @@ -17,12 +17,6 @@ use tracing::info; #[ignore = "under construction"] #[test_log::test(tokio::test)] async fn test_admin_imitate_user() { - let a: TestAnvil = TestAnvil::spawn(31337).await; - - let db = TestMysql::spawn().await; - - let x = TestApp::spawn(&a, Some(&db)).await; - todo!(); } @@ -75,10 +69,5 @@ async fn test_admin_grant_credits() { #[ignore = "under construction"] #[test_log::test(tokio::test)] async fn test_admin_change_user_tier() { - let a = TestAnvil::spawn(31337).await; - let db = TestMysql::spawn().await; - - let x = TestApp::spawn(&a, Some(&db)).await; - todo!(); }