less authorization and remove pending txs for now

This commit is contained in:
Bryan Stitt 2023-07-13 10:58:22 -07:00
parent 53877ed2e5
commit dce3540ee3
11 changed files with 91 additions and 684 deletions

@ -21,7 +21,6 @@ use crate::rpcs::consensus::RankedRpcs;
use crate::rpcs::many::Web3Rpcs; use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc; use crate::rpcs::one::Web3Rpc;
use crate::rpcs::provider::{connect_http, EthersHttpProvider}; use crate::rpcs::provider::{connect_http, EthersHttpProvider};
use crate::rpcs::transactions::TxStatus;
use crate::stats::{AppStat, FlushedStats, StatBuffer}; use crate::stats::{AppStat, FlushedStats, StatBuffer};
use anyhow::Context; use anyhow::Context;
use axum::http::StatusCode; use axum::http::StatusCode;
@ -29,7 +28,7 @@ use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimiter; use deferred_rate_limiter::DeferredRateLimiter;
use entities::user; use entities::user;
use ethers::core::utils::keccak256; use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; use ethers::prelude::{Address, Bytes, Transaction, H256, U64};
use ethers::types::U256; use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp}; use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all; use futures::future::join_all;
@ -87,9 +86,6 @@ pub struct Web3ProxyApp {
/// don't drop this or the sender will stop working /// don't drop this or the sender will stop working
/// TODO: broadcast channel instead? /// TODO: broadcast channel instead?
pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>, pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
/// rpc clients that subscribe to pendingTransactions use this channel
/// This is the Sender so that new channels can subscribe to it
pending_tx_sender: broadcast::Sender<TxStatus>,
/// Optional database for users and accounting /// Optional database for users and accounting
pub db_conn: Option<DatabaseConnection>, pub db_conn: Option<DatabaseConnection>,
/// Optional read-only database for users and accounting /// Optional read-only database for users and accounting
@ -107,9 +103,6 @@ pub struct Web3ProxyApp {
/// rate limit the login endpoint /// rate limit the login endpoint
/// we do this because each pending login is a row in the database /// we do this because each pending login is a row in the database
pub login_rate_limiter: Option<RedisRateLimiter>, pub login_rate_limiter: Option<RedisRateLimiter>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
pub pending_transactions: Cache<TxHash, TxStatus>,
/// Send private requests (like eth_sendRawTransaction) to all these servers /// Send private requests (like eth_sendRawTransaction) to all these servers
/// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests /// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests
pub private_rpcs: Option<Arc<Web3Rpcs>>, pub private_rpcs: Option<Arc<Web3Rpcs>>,
@ -453,25 +446,6 @@ impl Web3ProxyApp {
} }
let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None); let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None);
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them?
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
drop(pending_tx_receiver);
// TODO: capacity from configs
// all these are the same size, so no need for a weigher
// TODO: this used to have a time_to_idle
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
// TODO: this used to be time_to_update, but
let pending_transactions = CacheBuilder::new(10_000)
.name("pending_transactions")
.time_to_live(Duration::from_secs(300))
.build();
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher // responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: we should emit stats to calculate a more accurate expected cache size // TODO: we should emit stats to calculate a more accurate expected cache size
@ -499,13 +473,10 @@ impl Web3ProxyApp {
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
chain_id, chain_id,
db_conn.clone(),
top_config.app.max_head_block_lag, top_config.app.max_head_block_lag,
top_config.app.min_synced_rpcs, top_config.app.min_synced_rpcs,
top_config.app.min_sum_soft_limit, top_config.app.min_sum_soft_limit,
"balanced rpcs".to_string(), "balanced rpcs".to_string(),
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
Some(watch_consensus_head_sender), Some(watch_consensus_head_sender),
) )
.await .await
@ -520,19 +491,13 @@ impl Web3ProxyApp {
None None
} else { } else {
// TODO: do something with the spawn handle // TODO: do something with the spawn handle
// TODO: Merge
// let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn(
let (private_rpcs, private_handle, _) = Web3Rpcs::spawn( let (private_rpcs, private_handle, _) = Web3Rpcs::spawn(
chain_id, chain_id,
db_conn.clone(),
// private rpcs don't get subscriptions, so no need for max_head_block_lag // private rpcs don't get subscriptions, so no need for max_head_block_lag
None, None,
0, 0,
0, 0,
"protected rpcs".to_string(), "protected rpcs".to_string(),
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits // they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good // however, they are well connected to miners/validators. so maybe using them as a safety check would be good
@ -556,14 +521,11 @@ impl Web3ProxyApp {
// TODO: do something with the spawn handle // TODO: do something with the spawn handle
let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn( let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn(
chain_id, chain_id,
db_conn.clone(),
// bundler_4337_rpcs don't get subscriptions, so no need for max_head_block_lag // bundler_4337_rpcs don't get subscriptions, so no need for max_head_block_lag
None, None,
0, 0,
0, 0,
"eip4337 rpcs".to_string(), "eip4337 rpcs".to_string(),
pending_transactions.clone(),
None,
None, None,
) )
.await .await
@ -595,8 +557,6 @@ impl Web3ProxyApp {
jsonrpc_response_cache, jsonrpc_response_cache,
kafka_producer, kafka_producer,
login_rate_limiter, login_rate_limiter,
pending_transactions,
pending_tx_sender,
private_rpcs, private_rpcs,
prometheus_port: prometheus_port.clone(), prometheus_port: prometheus_port.clone(),
rpc_secret_key_cache, rpc_secret_key_cache,
@ -1184,8 +1144,6 @@ impl Web3ProxyApp {
// TODO: don't clone into a new string? // TODO: don't clone into a new string?
let request_method = method.to_string(); let request_method = method.to_string();
let authorization = request_metadata.authorization.clone().unwrap_or_default();
// TODO: serve net_version without querying the backend // TODO: serve net_version without querying the backend
// TODO: don't force RawValue // TODO: don't force RawValue
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = match request_method.as_ref() { let response_data: JsonRpcResponseEnum<Arc<RawValue>> = match request_method.as_ref() {
@ -1608,7 +1566,6 @@ impl Web3ProxyApp {
// TODO: add a stat for archive vs full since they should probably cost different // TODO: add a stat for archive vs full since they should probably cost different
// TODO: this cache key can be rather large. is that okay? // TODO: this cache key can be rather large. is that okay?
let cache_key: Option<JsonRpcQueryCacheKey> = match CacheMode::new( let cache_key: Option<JsonRpcQueryCacheKey> = match CacheMode::new(
&authorization,
method, method,
params, params,
&head_block, &head_block,

@ -6,7 +6,6 @@ use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMe
use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequest;
use crate::response_cache::JsonRpcResponseEnum; use crate::response_cache::JsonRpcResponseEnum;
use crate::rpcs::transactions::TxStatus;
use axum::extract::ws::{CloseFrame, Message}; use axum::extract::ws::{CloseFrame, Message};
use deferred_rate_limiter::DeferredRateLimitResult; use deferred_rate_limiter::DeferredRateLimitResult;
use ethers::types::U64; use ethers::types::U64;
@ -19,7 +18,7 @@ use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::Instant; use tokio::time::Instant;
use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tokio_stream::wrappers::WatchStream;
use tracing::{error, trace}; use tracing::{error, trace};
impl Web3ProxyApp { impl Web3ProxyApp {
@ -124,218 +123,8 @@ impl Web3ProxyApp {
trace!("closed newHeads subscription {:?}", subscription_id); trace!("closed newHeads subscription {:?}", subscription_id);
}); });
} else if subscribe_to == "newPendingTransactions" {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending newPendingTransactions subscription id: {:?}",
subscription_id
);
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingTransactions)", 0),
None,
)
.await;
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send(close_message);
break;
}
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_tx.hash,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// TODO: test that this len is the same as JsonRpcForwardedResponseEnum.num_bytes()
let response_bytes = response_str.len();
subscription_request_metadata.add_response(response_bytes);
// TODO: do clients support binary messages? reply with binary if thats what we were sent
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(
"closed newPendingTransactions subscription: {:?}",
subscription_id
);
});
} else if subscribe_to == "newPendingFullTransactions" {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending newPendingFullTransactions subscription: {:?}",
subscription_id
);
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingFullTransactions)", 0),
None,
)
.await;
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send(close_message);
break;
}
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
"result": new_tx,
},
});
subscription_request_metadata.add_response(&response_json);
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(
"closed newPendingFullTransactions subscription: {:?}",
subscription_id
);
});
} else if subscribe_to == "newPendingRawTransactions" {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending transactions subscription id: {:?}",
subscription_id
);
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
"eth_subscribe(newPendingRawTransactions)",
None,
)
.await;
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send(close_message);
break;
}
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the raw transaction
"result": new_tx.rlp(),
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
subscription_request_metadata.add_response(response_bytes);
}
trace!(
"closed newPendingRawTransactions subscription: {:?}",
subscription_id
);
});
} else { } else {
// TODO: make sure this gets a CU cost of unimplemented instead of the normal eth_subscribe cost?
return Err(Web3ProxyError::NotImplemented( return Err(Web3ProxyError::NotImplemented(
subscribe_to.to_owned().into(), subscribe_to.to_owned().into(),
)); ));

@ -1,4 +1,5 @@
//! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match.
use crate::rpcs::many::Web3Rpcs;
use crate::{ use crate::{
errors::{Web3ProxyError, Web3ProxyResult}, errors::{Web3ProxyError, Web3ProxyResult},
rpcs::blockchain::Web3ProxyBlock, rpcs::blockchain::Web3ProxyBlock,
@ -10,11 +11,8 @@ use ethers::{
types::H256, types::H256,
}; };
use serde_json::json; use serde_json::json;
use std::sync::Arc;
use tracing::{error, trace, warn}; use tracing::{error, trace, warn};
use crate::{frontend::authorization::Authorization, rpcs::many::Web3Rpcs};
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub fn BlockNumber_to_U64(block_num: BlockNumber, latest_block: &U64) -> (U64, bool) { pub fn BlockNumber_to_U64(block_num: BlockNumber, latest_block: &U64) -> (U64, bool) {
match block_num { match block_num {
@ -67,7 +65,6 @@ impl From<&Web3ProxyBlock> for BlockNumAndHash {
/// modify params to always have a block hash and not "latest" /// modify params to always have a block hash and not "latest"
/// TODO: this should replace all block numbers with hashes, not just "latest" /// TODO: this should replace all block numbers with hashes, not just "latest"
pub async fn clean_block_number( pub async fn clean_block_number(
authorization: &Arc<Authorization>,
params: &mut serde_json::Value, params: &mut serde_json::Value,
block_param_id: usize, block_param_id: usize,
latest_block: &Web3ProxyBlock, latest_block: &Web3ProxyBlock,
@ -101,7 +98,7 @@ pub async fn clean_block_number(
serde_json::from_value(block_hash).context("decoding blockHash")?; serde_json::from_value(block_hash).context("decoding blockHash")?;
let block = rpcs let block = rpcs
.block(authorization, &block_hash, None, Some(3), None) .block(&block_hash, None, Some(3), None)
.await .await
.context("fetching block number from hash")?; .context("fetching block number from hash")?;
@ -115,12 +112,12 @@ pub async fn clean_block_number(
// TODO: move this to a helper function? // TODO: move this to a helper function?
if let Ok(block_num) = serde_json::from_value::<U64>(x.clone()) { if let Ok(block_num) = serde_json::from_value::<U64>(x.clone()) {
let (block_hash, _) = rpcs let (block_hash, _) = rpcs
.block_hash(authorization, &block_num) .block_hash(&block_num)
.await .await
.context("fetching block hash from number")?; .context("fetching block hash from number")?;
let block = rpcs let block = rpcs
.block(authorization, &block_hash, None, Some(3), None) .block(&block_hash, None, Some(3), None)
.await .await
.context("fetching block from hash")?; .context("fetching block from hash")?;
@ -136,12 +133,12 @@ pub async fn clean_block_number(
(latest_block.into(), change) (latest_block.into(), change)
} else { } else {
let (block_hash, _) = rpcs let (block_hash, _) = rpcs
.block_hash(authorization, &block_num) .block_hash(&block_num)
.await .await
.context("fetching block hash from number")?; .context("fetching block hash from number")?;
let block = rpcs let block = rpcs
.block(authorization, &block_hash, None, Some(3), None) .block(&block_hash, None, Some(3), None)
.await .await
.context("fetching block from hash")?; .context("fetching block from hash")?;
@ -149,7 +146,7 @@ pub async fn clean_block_number(
} }
} else if let Ok(block_hash) = serde_json::from_value::<H256>(x.clone()) { } else if let Ok(block_hash) = serde_json::from_value::<H256>(x.clone()) {
let block = rpcs let block = rpcs
.block(authorization, &block_hash, None, Some(3), None) .block(&block_hash, None, Some(3), None)
.await .await
.context("fetching block number from hash")?; .context("fetching block number from hash")?;
@ -193,13 +190,12 @@ pub enum CacheMode {
impl CacheMode { impl CacheMode {
pub async fn new( pub async fn new(
authorization: &Arc<Authorization>,
method: &str, method: &str,
params: &mut serde_json::Value, params: &mut serde_json::Value,
head_block: &Web3ProxyBlock, head_block: &Web3ProxyBlock,
rpcs: &Web3Rpcs, rpcs: &Web3Rpcs,
) -> Self { ) -> Self {
match Self::try_new(authorization, method, params, head_block, rpcs).await { match Self::try_new(method, params, head_block, rpcs).await {
Ok(x) => x, Ok(x) => x,
Err(err) => { Err(err) => {
warn!(?err, "unable to determine cache mode from params"); warn!(?err, "unable to determine cache mode from params");
@ -209,7 +205,6 @@ impl CacheMode {
} }
pub async fn try_new( pub async fn try_new(
authorization: &Arc<Authorization>,
method: &str, method: &str,
params: &mut serde_json::Value, params: &mut serde_json::Value,
head_block: &Web3ProxyBlock, head_block: &Web3ProxyBlock,
@ -284,7 +279,7 @@ impl CacheMode {
*x = json!(block_num); *x = json!(block_num);
} }
let (block_hash, _) = rpcs.block_hash(authorization, &block_num).await?; let (block_hash, _) = rpcs.block_hash(&block_num).await?;
BlockNumAndHash(block_num, block_hash) BlockNumAndHash(block_num, block_hash)
} else { } else {
@ -304,7 +299,7 @@ impl CacheMode {
*x = json!(block_num); *x = json!(block_num);
} }
let (block_hash, _) = rpcs.block_hash(authorization, &block_num).await?; let (block_hash, _) = rpcs.block_hash(&block_num).await?;
BlockNumAndHash(block_num, block_hash) BlockNumAndHash(block_num, block_hash)
} else { } else {
@ -366,7 +361,7 @@ impl CacheMode {
} }
}; };
match clean_block_number(authorization, params, block_param_id, head_block, rpcs).await { match clean_block_number(params, block_param_id, head_block, rpcs).await {
Ok(block) => Ok(CacheMode::Cache { Ok(block) => Ok(CacheMode::Cache {
block, block,
cache_errors: true, cache_errors: true,

@ -284,7 +284,6 @@ impl Web3RpcConfig {
blocks_by_hash_cache: BlocksByHashCache, blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>, block_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
max_head_block_age: Duration, max_head_block_age: Duration,
tx_id_sender: Option<mpsc::UnboundedSender<TxHashAndRpc>>,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
if !self.extra.is_empty() { if !self.extra.is_empty() {
warn!(extra=?self.extra.keys(), "unknown Web3RpcConfig fields!"); warn!(extra=?self.extra.keys(), "unknown Web3RpcConfig fields!");
@ -301,7 +300,6 @@ impl Web3RpcConfig {
blocks_by_hash_cache, blocks_by_hash_cache,
block_sender, block_sender,
max_head_block_age, max_head_block_age,
tx_id_sender,
) )
.await .await
} }

@ -2,10 +2,8 @@
use super::consensus::ConsensusFinder; use super::consensus::ConsensusFinder;
use super::many::Web3Rpcs; use super::many::Web3Rpcs;
use super::one::Web3Rpc; use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::config::{average_block_interval, BlockAndRpc}; use crate::config::{average_block_interval, BlockAndRpc};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64}; use ethers::prelude::{Block, TxHash, H256, U64};
use moka::future::Cache; use moka::future::Cache;
@ -15,7 +13,7 @@ use serde_json::json;
use std::hash::Hash; use std::hash::Hash;
use std::time::Duration; use std::time::Duration;
use std::{fmt::Display, sync::Arc}; use std::{fmt::Display, sync::Arc};
use tokio::sync::{broadcast, mpsc}; use tokio::sync::mpsc;
use tokio::time::timeout; use tokio::time::timeout;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
@ -267,7 +265,6 @@ impl Web3Rpcs {
/// Will query a specific node or the best available. /// Will query a specific node or the best available.
pub async fn block( pub async fn block(
&self, &self,
authorization: &Arc<Authorization>,
hash: &H256, hash: &H256,
rpc: Option<&Arc<Web3Rpc>>, rpc: Option<&Arc<Web3Rpc>>,
max_tries: Option<usize>, max_tries: Option<usize>,
@ -301,11 +298,10 @@ impl Web3Rpcs {
let mut block: Option<ArcBlock> = if let Some(rpc) = rpc { let mut block: Option<ArcBlock> = if let Some(rpc) = rpc {
// ask a specific rpc // ask a specific rpc
// TODO: request_with_metadata would probably be better than authorized_request // this doesn't have retries, so we do retries with `self.internal_request` below (note the "self" vs "rpc")
rpc.authorized_request::<_, Option<ArcBlock>>( rpc.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash", "eth_getBlockByHash",
&get_block_params, &get_block_params,
authorization,
None, None,
max_tries, max_tries,
max_wait, max_wait,
@ -339,12 +335,8 @@ impl Web3Rpcs {
} }
/// Convenience method to get the cannonical block at a given block height. /// Convenience method to get the cannonical block at a given block height.
pub async fn block_hash( pub async fn block_hash(&self, num: &U64) -> Web3ProxyResult<(H256, u64)> {
&self, let (block, block_depth) = self.cannonical_block(num).await?;
authorization: &Arc<Authorization>,
num: &U64,
) -> Web3ProxyResult<(H256, u64)> {
let (block, block_depth) = self.cannonical_block(authorization, num).await?;
let hash = *block.hash(); let hash = *block.hash();
@ -353,11 +345,7 @@ impl Web3Rpcs {
/// Get the heaviest chain's block from cache or backend rpc /// Get the heaviest chain's block from cache or backend rpc
/// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this! /// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this!
pub async fn cannonical_block( pub async fn cannonical_block(&self, num: &U64) -> Web3ProxyResult<(Web3ProxyBlock, u64)> {
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> Web3ProxyResult<(Web3ProxyBlock, u64)> {
// we only have blocks by hash now // we only have blocks by hash now
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>> // maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
@ -376,11 +364,16 @@ impl Web3Rpcs {
.web3_context("no consensus head block")? .web3_context("no consensus head block")?
.number(); .number();
loop { if *num > head_block_num {
if num <= &head_block_num { // if num is too far in the future, error now
break; if *num - head_block_num > self.max_head_block_lag {
return Err(Web3ProxyError::UnknownBlockNumber {
known: head_block_num,
unknown: *num,
});
} }
while *num > head_block_num {
debug!(%head_block_num, %num, "waiting for future block"); debug!(%head_block_num, %num, "waiting for future block");
consensus_head_receiver.changed().await?; consensus_head_receiver.changed().await?;
@ -389,6 +382,7 @@ impl Web3Rpcs {
head_block_num = *head.number(); head_block_num = *head.number();
} }
} }
}
let block_depth = (head_block_num - num).as_u64(); let block_depth = (head_block_num - num).as_u64();
@ -397,9 +391,7 @@ impl Web3Rpcs {
if let Some(block_hash) = self.blocks_by_number.get(num) { if let Some(block_hash) = self.blocks_by_number.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: configurable max wait and rpc // TODO: configurable max wait and rpc
let block = self let block = self.block(&block_hash, None, Some(3), None).await?;
.block(authorization, &block_hash, None, Some(3), None)
.await?;
return Ok((block, block_depth)); return Ok((block, block_depth));
} }
@ -426,11 +418,7 @@ impl Web3Rpcs {
pub(super) async fn process_incoming_blocks( pub(super) async fn process_incoming_blocks(
&self, &self,
authorization: &Arc<Authorization>,
mut block_receiver: mpsc::UnboundedReceiver<BlockAndRpc>, mut block_receiver: mpsc::UnboundedReceiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
let mut consensus_finder = let mut consensus_finder =
ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag)); ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag));
@ -449,13 +437,7 @@ impl Web3Rpcs {
// TODO: what timeout on this? // TODO: what timeout on this?
match timeout( match timeout(
Duration::from_secs(1), Duration::from_secs(1),
consensus_finder.process_block_from_rpc( consensus_finder.process_block_from_rpc(self, new_block, rpc),
self,
authorization,
new_block,
rpc,
&pending_tx_sender,
),
) )
.await .await
{ {
@ -493,7 +475,7 @@ impl Web3Rpcs {
// TODO: what timeout on this? // TODO: what timeout on this?
match timeout( match timeout(
Duration::from_secs(2), Duration::from_secs(2),
consensus_finder.refresh(self, authorization, None, None), consensus_finder.refresh(self, None, None),
) )
.await .await
{ {

@ -1,9 +1,7 @@
use super::blockchain::Web3ProxyBlock; use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs; use super::many::Web3Rpcs;
use super::one::Web3Rpc; use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use base64::engine::general_purpose; use base64::engine::general_purpose;
use derive_more::Constructor; use derive_more::Constructor;
use ethers::prelude::{H256, U64}; use ethers::prelude::{H256, U64};
@ -16,7 +14,6 @@ use serde::Serialize;
use std::cmp::{Ordering, Reverse}; use std::cmp::{Ordering, Reverse};
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use std::time::Duration; use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::Instant; use tokio::time::Instant;
use tracing::{debug, enabled, info, trace, warn, Level}; use tracing::{debug, enabled, info, trace, warn, Level};
@ -423,12 +420,11 @@ impl ConsensusFinder {
pub(super) async fn refresh( pub(super) async fn refresh(
&mut self, &mut self,
web3_rpcs: &Web3Rpcs, web3_rpcs: &Web3Rpcs,
authorization: &Arc<Authorization>,
rpc: Option<&Arc<Web3Rpc>>, rpc: Option<&Arc<Web3Rpc>>,
new_block: Option<Web3ProxyBlock>, new_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<bool> { ) -> Web3ProxyResult<bool> {
let new_ranked_rpcs = match self let new_ranked_rpcs = match self
.find_consensus_connections(authorization, web3_rpcs) .find_consensus_connections(web3_rpcs)
.await .await
.web3_context("error while finding consensus head block!")? .web3_context("error while finding consensus head block!")?
{ {
@ -616,10 +612,8 @@ impl ConsensusFinder {
pub(super) async fn process_block_from_rpc( pub(super) async fn process_block_from_rpc(
&mut self, &mut self,
web3_rpcs: &Web3Rpcs, web3_rpcs: &Web3Rpcs,
authorization: &Arc<Authorization>,
new_block: Option<Web3ProxyBlock>, new_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>, rpc: Arc<Web3Rpc>,
_pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<bool> { ) -> Web3ProxyResult<bool> {
// TODO: how should we handle an error here? // TODO: how should we handle an error here?
if !self if !self
@ -632,8 +626,7 @@ impl ConsensusFinder {
return Ok(false); return Ok(false);
} }
self.refresh(web3_rpcs, authorization, Some(&rpc), new_block) self.refresh(web3_rpcs, Some(&rpc), new_block).await
.await
} }
fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> { fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
@ -797,7 +790,6 @@ impl ConsensusFinder {
pub async fn find_consensus_connections( pub async fn find_consensus_connections(
&mut self, &mut self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs, web3_rpcs: &Web3Rpcs,
) -> Web3ProxyResult<Option<RankedRpcs>> { ) -> Web3ProxyResult<Option<RankedRpcs>> {
self.update_tiers().await?; self.update_tiers().await?;
@ -868,10 +860,7 @@ impl ConsensusFinder {
let parent_hash = block_to_check.parent_hash(); let parent_hash = block_to_check.parent_hash();
match web3_rpcs match web3_rpcs.block(parent_hash, Some(rpc), Some(1), None).await {
.block(authorization, parent_hash, Some(rpc), Some(1), None)
.await
{
Ok(parent_block) => block_to_check = parent_block, Ok(parent_block) => block_to_check = parent_block,
Err(err) => { Err(err) => {
debug!( debug!(

@ -4,23 +4,21 @@ use super::consensus::{RankedRpcs, ShouldWaitForBlock};
use super::one::Web3Rpc; use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle};
use crate::config::{average_block_interval, BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::frontend::status::MokaCacheSerializer; use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::transactions::TxStatus;
use counter::Counter; use counter::Counter;
use derive_more::From; use derive_more::From;
use ethers::prelude::{ProviderError, TxHash, U64}; use ethers::prelude::{ProviderError, U64};
use futures::future::try_join_all; use futures::future::try_join_all;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use hashbrown::HashMap; use hashbrown::HashMap;
use itertools::Itertools; use itertools::Itertools;
use migration::sea_orm::DatabaseConnection; use moka::future::CacheBuilder;
use moka::future::{Cache, CacheBuilder};
use parking_lot::RwLock; use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
@ -31,7 +29,7 @@ use std::fmt::{self, Display};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
use tokio::sync::{broadcast, mpsc, watch, RwLock as AsyncRwLock}; use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, sleep_until, Duration, Instant}; use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -53,10 +51,6 @@ pub struct Web3Rpcs {
pub(crate) watch_ranked_rpcs: watch::Sender<Option<Arc<RankedRpcs>>>, pub(crate) watch_ranked_rpcs: watch::Sender<Option<Arc<RankedRpcs>>>,
/// this head receiver makes it easy to wait until there is a new block /// this head receiver makes it easy to wait until there is a new block
pub(super) watch_head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>, pub(super) watch_head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// keep track of transactions that we have sent through subscriptions
pub(super) pending_transaction_cache: Cache<TxHash, TxStatus>,
pub(super) pending_tx_id_receiver: AsyncRwLock<mpsc::UnboundedReceiver<TxHashAndRpc>>,
pub(super) pending_tx_id_sender: mpsc::UnboundedSender<TxHashAndRpc>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including orphans /// all blocks, including orphans
pub(super) blocks_by_hash: BlocksByHashCache, pub(super) blocks_by_hash: BlocksByHashCache,
@ -78,20 +72,16 @@ impl Web3Rpcs {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn spawn( pub async fn spawn(
chain_id: u64, chain_id: u64,
db_conn: Option<DatabaseConnection>,
max_head_block_lag: Option<U64>, max_head_block_lag: Option<U64>,
min_head_rpcs: usize, min_head_rpcs: usize,
min_sum_soft_limit: u32, min_sum_soft_limit: u32,
name: String, name: String,
pending_transaction_cache: Cache<TxHash, TxStatus>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>, watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<( ) -> anyhow::Result<(
Arc<Self>, Arc<Self>,
Web3ProxyJoinHandle<()>, Web3ProxyJoinHandle<()>,
watch::Receiver<Option<Arc<RankedRpcs>>>, watch::Receiver<Option<Arc<RankedRpcs>>>,
)> { )> {
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (block_sender, block_receiver) = mpsc::unbounded_channel::<BlockAndRpc>(); let (block_sender, block_receiver) = mpsc::unbounded_channel::<BlockAndRpc>();
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
@ -132,19 +122,14 @@ impl Web3Rpcs {
min_synced_rpcs: min_head_rpcs, min_synced_rpcs: min_head_rpcs,
min_sum_soft_limit, min_sum_soft_limit,
name, name,
pending_transaction_cache,
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
watch_head_block: watch_consensus_head_sender, watch_head_block: watch_consensus_head_sender,
watch_ranked_rpcs: watch_consensus_rpcs_sender, watch_ranked_rpcs: watch_consensus_rpcs_sender,
}); });
let authorization = Arc::new(Authorization::internal(db_conn)?);
let handle = { let handle = {
let connections = connections.clone(); let connections = connections.clone();
tokio::spawn(connections.subscribe(authorization, block_receiver, pending_tx_sender)) tokio::spawn(connections.subscribe(block_receiver))
}; };
Ok((connections, handle, consensus_connections_watcher)) Ok((connections, handle, consensus_connections_watcher))
@ -205,7 +190,6 @@ impl Web3Rpcs {
None None
}; };
let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone());
let blocks_by_hash_cache = self.blocks_by_hash.clone(); let blocks_by_hash_cache = self.blocks_by_hash.clone();
debug!("spawning tasks for {}", server_name); debug!("spawning tasks for {}", server_name);
@ -222,7 +206,6 @@ impl Web3Rpcs {
blocks_by_hash_cache, blocks_by_hash_cache,
block_sender, block_sender,
self.max_head_block_age, self.max_head_block_age,
pending_tx_id_sender,
)); ));
Some(handle) Some(handle)
@ -328,51 +311,42 @@ impl Web3Rpcs {
/// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender` /// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender`
async fn subscribe( async fn subscribe(
self: Arc<Self>, self: Arc<Self>,
authorization: Arc<Authorization>,
block_receiver: mpsc::UnboundedReceiver<BlockAndRpc>, block_receiver: mpsc::UnboundedReceiver<BlockAndRpc>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
let mut futures = vec![]; let mut futures = vec![];
// setup the transaction funnel // // setup the transaction funnel
// it skips any duplicates (unless they are being orphaned) // // it skips any duplicates (unless they are being orphaned)
// fetches new transactions from the notifying rpc // // fetches new transactions from the notifying rpc
// forwards new transacitons to pending_tx_receipt_sender // // forwards new transacitons to pending_tx_receipt_sender
if let Some(pending_tx_sender) = pending_tx_sender.clone() { // if let Some(pending_tx_sender) = pending_tx_sender.clone() {
let clone = self.clone(); // let clone = self.clone();
let authorization = authorization.clone(); // let handle = tokio::task::spawn(async move {
let handle = tokio::task::spawn(async move { // // TODO: set up this future the same as the block funnel
// TODO: set up this future the same as the block funnel // while let Some((pending_tx_id, rpc)) =
while let Some((pending_tx_id, rpc)) = // clone.pending_tx_id_receiver.write().await.recv().await
clone.pending_tx_id_receiver.write().await.recv().await // {
{ // let f = clone.clone().process_incoming_tx_id(
let f = clone.clone().process_incoming_tx_id( // rpc,
authorization.clone(), // pending_tx_id,
rpc, // pending_tx_sender.clone(),
pending_tx_id, // );
pending_tx_sender.clone(), // tokio::spawn(f);
); // }
tokio::spawn(f);
}
Ok(()) // Ok(())
}); // });
futures.push(flatten_handle(handle)); // futures.push(flatten_handle(handle));
} // }
// setup the block funnel // setup the block funnel
if self.watch_head_block.is_some() { if self.watch_head_block.is_some() {
let connections = Arc::clone(&self); let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
let handle = tokio::task::Builder::default() let handle = tokio::task::Builder::default()
.name("process_incoming_blocks") .name("process_incoming_blocks")
.spawn(async move { .spawn(async move { connections.process_incoming_blocks(block_receiver).await })?;
connections
.process_incoming_blocks(&authorization, block_receiver, pending_tx_sender)
.await
})?;
futures.push(flatten_handle(handle)); futures.push(flatten_handle(handle));
} }
@ -1353,7 +1327,6 @@ impl Serialize for Web3Rpcs {
&( &(
MokaCacheSerializer(&self.blocks_by_hash), MokaCacheSerializer(&self.blocks_by_hash),
MokaCacheSerializer(&self.blocks_by_number), MokaCacheSerializer(&self.blocks_by_number),
MokaCacheSerializer(&self.pending_transaction_cache),
), ),
)?; )?;
@ -1382,7 +1355,7 @@ mod tests {
use ethers::types::H256; use ethers::types::H256;
use ethers::types::{Block, U256}; use ethers::types::{Block, U256};
use latency::PeakEwmaLatency; use latency::PeakEwmaLatency;
use moka::future::CacheBuilder; use moka::future::{Cache, CacheBuilder};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tracing::trace; use tracing::trace;
@ -1539,7 +1512,6 @@ mod tests {
let lagged_rpc = Arc::new(lagged_rpc); let lagged_rpc = Arc::new(lagged_rpc);
let (block_sender, _block_receiver) = mpsc::unbounded_channel(); let (block_sender, _block_receiver) = mpsc::unbounded_channel();
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1557,11 +1529,6 @@ mod tests {
name: "test".to_string(), name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender), watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs, watch_ranked_rpcs,
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60))
.build(),
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
blocks_by_hash: CacheBuilder::new(100) blocks_by_hash: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60)) .time_to_live(Duration::from_secs(60))
.build(), .build(),
@ -1576,18 +1543,16 @@ mod tests {
min_sum_soft_limit: 1, min_sum_soft_limit: 1,
}; };
let authorization = Arc::new(Authorization::internal(None).unwrap());
let mut consensus_finder = ConsensusFinder::new(None, None); let mut consensus_finder = ConsensusFinder::new(None, None);
consensus_finder consensus_finder
.process_block_from_rpc(&rpcs, &authorization, None, lagged_rpc.clone(), &None) .process_block_from_rpc(&rpcs, None, lagged_rpc.clone())
.await .await
.expect( .expect(
"its lagged, but it should still be seen as consensus if its the first to report", "its lagged, but it should still be seen as consensus if its the first to report",
); );
consensus_finder consensus_finder
.process_block_from_rpc(&rpcs, &authorization, None, head_rpc.clone(), &None) .process_block_from_rpc(&rpcs, None, head_rpc.clone())
.await .await
.unwrap(); .unwrap();
@ -1634,10 +1599,8 @@ mod tests {
consensus_finder consensus_finder
.process_block_from_rpc( .process_block_from_rpc(
&rpcs, &rpcs,
&authorization,
Some(lagged_block.clone().try_into().unwrap()), Some(lagged_block.clone().try_into().unwrap()),
lagged_rpc.clone(), lagged_rpc.clone(),
&None,
) )
.await .await
.unwrap(); .unwrap();
@ -1655,16 +1618,14 @@ mod tests {
consensus_finder consensus_finder
.process_block_from_rpc( .process_block_from_rpc(
&rpcs, &rpcs,
&authorization,
Some(lagged_block.clone().try_into().unwrap()), Some(lagged_block.clone().try_into().unwrap()),
head_rpc.clone(), head_rpc.clone(),
&None,
) )
.await .await
.unwrap(); .unwrap();
// TODO: how do we spawn this and wait for it to process things? subscribe and watch consensus connections? // TODO: how do we spawn this and wait for it to process things? subscribe and watch consensus connections?
// rpcs.process_incoming_blocks(&authorization, block_receiver, pending_tx_sender) // rpcs.process_incoming_blocks(block_receiver, pending_tx_sender)
assert!(head_rpc.has_block_data(lagged_block.number.as_ref().unwrap())); assert!(head_rpc.has_block_data(lagged_block.number.as_ref().unwrap()));
assert!(!head_rpc.has_block_data(head_block.number.as_ref().unwrap())); assert!(!head_rpc.has_block_data(head_block.number.as_ref().unwrap()));
@ -1688,10 +1649,8 @@ mod tests {
consensus_finder consensus_finder
.process_block_from_rpc( .process_block_from_rpc(
&rpcs, &rpcs,
&authorization,
Some(head_block.clone().try_into().unwrap()), Some(head_block.clone().try_into().unwrap()),
head_rpc.clone(), head_rpc.clone(),
&None,
) )
.await .await
.unwrap(); .unwrap();
@ -1809,7 +1768,6 @@ mod tests {
let archive_rpc = Arc::new(archive_rpc); let archive_rpc = Arc::new(archive_rpc);
let (block_sender, _) = mpsc::unbounded_channel(); let (block_sender, _) = mpsc::unbounded_channel();
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_ranked_rpcs, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1826,11 +1784,6 @@ mod tests {
name: "test".to_string(), name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender), watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs, watch_ranked_rpcs,
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120))
.build(),
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
blocks_by_hash: CacheBuilder::new(100) blocks_by_hash: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120)) .time_to_live(Duration::from_secs(120))
.build(), .build(),
@ -1843,19 +1796,11 @@ mod tests {
max_head_block_lag: 5.into(), max_head_block_lag: 5.into(),
}; };
let authorization = Arc::new(Authorization::internal(None).unwrap());
let mut connection_heads = ConsensusFinder::new(None, None); let mut connection_heads = ConsensusFinder::new(None, None);
// min sum soft limit will require 2 servers // min sum soft limit will require 2 servers
let x = connection_heads let x = connection_heads
.process_block_from_rpc( .process_block_from_rpc(&rpcs, Some(head_block.clone()), pruned_rpc.clone())
&rpcs,
&authorization,
Some(head_block.clone()),
pruned_rpc.clone(),
&None,
)
.await .await
.unwrap(); .unwrap();
assert!(!x); assert!(!x);
@ -1863,13 +1808,7 @@ mod tests {
assert_eq!(rpcs.num_synced_rpcs(), 0); assert_eq!(rpcs.num_synced_rpcs(), 0);
let x = connection_heads let x = connection_heads
.process_block_from_rpc( .process_block_from_rpc(&rpcs, Some(head_block.clone()), archive_rpc.clone())
&rpcs,
&authorization,
Some(head_block.clone()),
archive_rpc.clone(),
&None,
)
.await .await
.unwrap(); .unwrap();
assert!(x); assert!(x);
@ -1993,7 +1932,6 @@ mod tests {
let mock_erigon_archive = Arc::new(mock_erigon_archive); let mock_erigon_archive = Arc::new(mock_erigon_archive);
let (block_sender, _) = mpsc::unbounded_channel(); let (block_sender, _) = mpsc::unbounded_channel();
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_ranked_rpcs, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -2014,9 +1952,6 @@ mod tests {
name: "test".to_string(), name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender), watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs, watch_ranked_rpcs,
pending_transaction_cache: Cache::new(10_000),
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
blocks_by_hash: Cache::new(10_000), blocks_by_hash: Cache::new(10_000),
blocks_by_number: Cache::new(10_000), blocks_by_number: Cache::new(10_000),
min_synced_rpcs: 1, min_synced_rpcs: 1,
@ -2025,29 +1960,15 @@ mod tests {
max_head_block_lag: 5.into(), max_head_block_lag: 5.into(),
}; };
let authorization = Arc::new(Authorization::internal(None).unwrap());
let mut consensus_finder = ConsensusFinder::new(None, None); let mut consensus_finder = ConsensusFinder::new(None, None);
consensus_finder consensus_finder
.process_block_from_rpc( .process_block_from_rpc(&rpcs, Some(block_1.clone()), mock_geth.clone())
&rpcs,
&authorization,
Some(block_1.clone()),
mock_geth.clone(),
&None,
)
.await .await
.unwrap(); .unwrap();
consensus_finder consensus_finder
.process_block_from_rpc( .process_block_from_rpc(&rpcs, Some(block_2.clone()), mock_erigon_archive.clone())
&rpcs,
&authorization,
Some(block_2.clone()),
mock_erigon_archive.clone(),
&None,
)
.await .await
.unwrap(); .unwrap();

@ -5,4 +5,3 @@ pub mod many;
pub mod one; pub mod one;
pub mod provider; pub mod provider;
pub mod request; pub mod request;
pub mod transactions;

@ -10,7 +10,7 @@ use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler; use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::prelude::{Bytes, Middleware, U64};
use ethers::types::{Address, Transaction, U256}; use ethers::types::{Address, Transaction, U256};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
@ -93,14 +93,13 @@ impl Web3Rpc {
name: String, name: String,
chain_id: u64, chain_id: u64,
db_conn: Option<DatabaseConnection>, db_conn: Option<DatabaseConnection>,
// optional because this is only used for http providers. websocket providers don't use it // optional because this is only used for http providers. websocket-only providers don't use it
http_client: Option<reqwest::Client>, http_client: Option<reqwest::Client>,
redis_pool: Option<RedisPool>, redis_pool: Option<RedisPool>,
block_interval: Duration, block_interval: Duration,
block_map: BlocksByHashCache, block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>, block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
max_head_block_age: Duration, max_head_block_age: Duration,
tx_id_sender: Option<mpsc::UnboundedSender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
let created_at = Instant::now(); let created_at = Instant::now();
@ -126,12 +125,6 @@ impl Web3Rpc {
} }
}; };
let tx_id_sender = if config.subscribe_txs {
tx_id_sender
} else {
None
};
let backup = config.backup; let backup = config.backup;
let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into(); let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into();
@ -210,18 +203,11 @@ impl Web3Rpc {
// subscribe to new blocks and new transactions // subscribe to new blocks and new transactions
// subscribing starts the connection (with retries) // subscribing starts the connection (with retries)
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
let handle = { let handle = {
let new_connection = new_connection.clone(); let new_connection = new_connection.clone();
tokio::spawn(async move { tokio::spawn(async move {
// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff
new_connection new_connection
.subscribe_with_reconnect( .subscribe_with_reconnect(block_map, block_and_rpc_sender, chain_id)
block_map,
block_and_rpc_sender,
chain_id,
tx_id_sender,
)
.await .await
}) })
}; };
@ -596,23 +582,18 @@ impl Web3Rpc {
Ok(()) Ok(())
} }
/// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn subscribe_with_reconnect( async fn subscribe_with_reconnect(
self: Arc<Self>, self: Arc<Self>,
block_map: BlocksByHashCache, block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>, block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
chain_id: u64, chain_id: u64,
tx_id_sender: Option<mpsc::UnboundedSender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
loop { loop {
if let Err(err) = self if let Err(err) = self
.clone() .clone()
.subscribe( .subscribe(block_map.clone(), block_and_rpc_sender.clone(), chain_id)
block_map.clone(),
block_and_rpc_sender.clone(),
chain_id,
tx_id_sender.clone(),
)
.await .await
{ {
if self.should_disconnect() { if self.should_disconnect() {
@ -645,7 +626,6 @@ impl Web3Rpc {
block_map: BlocksByHashCache, block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>, block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
chain_id: u64, chain_id: u64,
tx_id_sender: Option<mpsc::UnboundedSender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
let error_handler = if self.backup { let error_handler = if self.backup {
Some(RequestErrorHandler::DebugLevel) Some(RequestErrorHandler::DebugLevel)
@ -766,18 +746,6 @@ impl Web3Rpc {
futures.push(flatten_handle(tokio::spawn(f))); futures.push(flatten_handle(tokio::spawn(f)));
} }
// subscribe pending transactions
// TODO: make this opt-in. its a lot of bandwidth
if let Some(tx_id_sender) = tx_id_sender {
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
let f = self
.clone()
.subscribe_pending_transactions(tx_id_sender, subscribe_stop_rx);
futures.push(flatten_handle(tokio::spawn(f)));
}
// exit if any of the futures exit // exit if any of the futures exit
let first_exit = futures.next().await; let first_exit = futures.next().await;
@ -809,11 +777,16 @@ impl Web3Rpc {
trace!("subscribing to new heads on {}", self); trace!("subscribing to new heads on {}", self);
// TODO: different handler depending on backup or not // TODO: different handler depending on backup or not
let error_handler = None; let error_handler = if self.backup {
let authorization = Default::default(); Some(Level::DEBUG.into())
} else {
Some(Level::ERROR.into())
};
if let Some(ws_provider) = self.ws_provider.load().as_ref() { if let Some(ws_provider) = self.ws_provider.load().as_ref() {
// todo: move subscribe_blocks onto the request handle // todo: move subscribe_blocks onto the request handle
let authorization = Default::default();
let active_request_handle = self let active_request_handle = self
.wait_for_request_handle(&authorization, None, error_handler) .wait_for_request_handle(&authorization, None, error_handler)
.await; .await;
@ -826,11 +799,10 @@ impl Web3Rpc {
// TODO: how does this get wrapped in an arc? does ethers handle that? // TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: send this request to the ws_provider instead of the http_provider // TODO: send this request to the ws_provider instead of the http_provider
let latest_block: Result<Option<ArcBlock>, _> = self let latest_block: Result<Option<ArcBlock>, _> = self
.authorized_request( .internal_request(
"eth_getBlockByNumber", "eth_getBlockByNumber",
&("latest", false), &("latest", false),
&authorization, error_handler,
Some(Level::WARN.into()),
Some(2), Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
@ -863,11 +835,10 @@ impl Web3Rpc {
} }
let block_result = self let block_result = self
.authorized_request::<_, Option<ArcBlock>>( .internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByNumber", "eth_getBlockByNumber",
&("latest", false), &("latest", false),
&authorization, error_handler,
Some(Level::WARN.into()),
Some(2), Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),
) )
@ -894,71 +865,6 @@ impl Web3Rpc {
} }
} }
/// Turn on the firehose of pending transactions
async fn subscribe_pending_transactions(
self: Arc<Self>,
tx_id_sender: mpsc::UnboundedSender<(TxHash, Arc<Self>)>,
mut subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
// TODO: check that it actually changed to true
loop {
if *subscribe_stop_rx.borrow_and_update() {
break;
}
subscribe_stop_rx.changed().await?;
}
/*
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
match provider.as_ref() {
Web3Provider::Http(_provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
self.wait_for_disconnect().await?;
}
Web3Provider::Both(_, client) | Web3Provider::Ws(client) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
.wait_for_request_handle(&authorization, None, Some(provider.clone()))
.await?;
let mut stream = client.subscribe_pending_txs().await?;
drop(active_request_handle);
while let Some(pending_tx_id) = stream.next().await {
tx_id_sender
.send_async((pending_tx_id, self.clone()))
.await
.context("tx_id_sender")?;
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
// TODO: select on this instead of checking every loop
if self.should_disconnect() {
break;
}
}
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
debug!("pending_transactions subscription ended on {}", self);
}
#[cfg(test)]
Web3Provider::Mock => {
self.wait_for_disconnect().await?;
}
}
*/
if *subscribe_stop_rx.borrow() {
Ok(())
} else {
Err(anyhow!("pending_transactions subscription exited. reconnect needed").into())
}
}
pub async fn wait_for_request_handle( pub async fn wait_for_request_handle(
self: &Arc<Self>, self: &Arc<Self>,
authorization: &Arc<Authorization>, authorization: &Arc<Authorization>,

@ -1,118 +0,0 @@
//! Load balanced communication with a group of web3 providers
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::request::OpenRequestResult;
use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::Authorization;
use ethers::prelude::{ProviderError, Transaction, TxHash};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace, Level};
// TODO: think more about TxState
#[derive(Clone)]
pub enum TxStatus {
Pending(Transaction),
Confirmed(Transaction),
Orphaned(Transaction),
}
impl Web3Rpcs {
async fn query_transaction_status(
&self,
authorization: &Arc<Authorization>,
rpc: Arc<Web3Rpc>,
pending_tx_id: TxHash,
) -> Result<Option<TxStatus>, ProviderError> {
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
// TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
let tx: Transaction = match rpc
.try_request_handle(authorization, Some(Level::WARN.into()))
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request("eth_getTransactionByHash", &(pending_tx_id,))
.await?
}
Ok(_) => {
// TODO: actually retry?
return Ok(None);
}
Err(err) => {
trace!(
"cancelled funneling transaction {} from {}: {:?}",
pending_tx_id,
rpc,
err,
);
return Ok(None);
}
};
match &tx.block_hash {
Some(_block_hash) => {
// the transaction is already confirmed. no need to save in the pending_transactions map
Ok(Some(TxStatus::Confirmed(tx)))
}
None => Ok(Some(TxStatus::Pending(tx))),
}
}
/// dedupe transaction and send them to any listening clients
pub(super) async fn process_incoming_tx_id(
self: Arc<Self>,
authorization: Arc<Authorization>,
rpc: Arc<Web3Rpc>,
pending_tx_id: TxHash,
pending_tx_sender: broadcast::Sender<TxStatus>,
) -> Web3ProxyResult<()> {
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
// TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
if pending_tx_sender.receiver_count() == 0 {
// no receivers, so no point in querying to get the full transaction
return Ok(());
}
// trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
if self.pending_transaction_cache.get(&pending_tx_id).is_some() {
// this transaction has already been processed
return Ok(());
}
// query the rpc for this transaction
// it is possible that another rpc is also being queried. thats fine. we want the fastest response
match self
.query_transaction_status(&authorization, rpc.clone(), pending_tx_id)
.await
{
Ok(Some(tx_state)) => {
let _ = pending_tx_sender.send(tx_state);
trace!("sent tx {:?}", pending_tx_id);
// we sent the transaction. return now. don't break looping because that gives a warning
return Ok(());
}
Ok(None) => {}
Err(err) => {
trace!("failed fetching transaction {:?}: {:?}", pending_tx_id, err);
// unable to update the entry. sleep and try again soon
// TODO: retry with exponential backoff with jitter starting from a much smaller time
// sleep(Duration::from_millis(100)).await;
}
}
// warn is too loud. this is somewhat common
// "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
// sometimes it's been pending for many hours
// sometimes it's maybe something else?
debug!("txid {} not found on {}", pending_tx_id, rpc);
Ok(())
}
}

@ -17,12 +17,6 @@ use tracing::info;
#[ignore = "under construction"] #[ignore = "under construction"]
#[test_log::test(tokio::test)] #[test_log::test(tokio::test)]
async fn test_admin_imitate_user() { async fn test_admin_imitate_user() {
let a: TestAnvil = TestAnvil::spawn(31337).await;
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
todo!(); todo!();
} }
@ -75,10 +69,5 @@ async fn test_admin_grant_credits() {
#[ignore = "under construction"] #[ignore = "under construction"]
#[test_log::test(tokio::test)] #[test_log::test(tokio::test)]
async fn test_admin_change_user_tier() { async fn test_admin_change_user_tier() {
let a = TestAnvil::spawn(31337).await;
let db = TestMysql::spawn().await;
let x = TestApp::spawn(&a, Some(&db)).await;
todo!(); todo!();
} }