From 36bfd4bdcc9eea0654e63dd8fdcc1165eaf05350 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Thu, 16 Jun 2022 17:51:49 +0000
Subject: [PATCH] pass more handles around

---
 web3-proxy/src/app.rs         |  54 ++++++++++----
 web3-proxy/src/config.rs      |   9 ++-
 web3-proxy/src/connections.rs | 136 ++++++++++++++++++++--------------
 web3-proxy/src/main.rs        |   2 +-
 4 files changed, 129 insertions(+), 72 deletions(-)

diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index b91255ec..977d18b8 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -3,14 +3,16 @@ use dashmap::mapref::entry::Entry as DashMapEntry;
 use dashmap::DashMap;
 use ethers::prelude::Transaction;
 use ethers::prelude::{Block, TxHash, H256};
-use futures::future::Abortable;
 use futures::future::{join_all, AbortHandle};
+use futures::future::{try_join_all, Abortable};
 use futures::stream::FuturesUnordered;
 use futures::stream::StreamExt;
+use futures::Future;
 use linkedhashmap::LinkedHashMap;
 use parking_lot::RwLock;
 use serde_json::json;
 use std::fmt;
+use std::pin::Pin;
 use std::sync::atomic::{self, AtomicUsize};
 use std::sync::Arc;
 use std::time::Duration;
@@ -54,6 +56,20 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T
     }
 }
 
+pub async fn flatten_handles<T>(
+    mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
+) -> anyhow::Result<()> {
+    while let Some(x) = handles.next().await {
+        match x {
+            Err(e) => return Err(e.into()),
+            Ok(Err(e)) => return Err(e),
+            Ok(Ok(())) => {}
+        }
+    }
+
+    Ok(())
+}
+
 #[derive(Clone)]
 pub enum TxState {
     Known(TxHash),
@@ -65,6 +81,7 @@
 /// The application
 // TODO: this debug impl is way too verbose. make something smaller
 // TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
+// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
 pub struct Web3ProxyApp {
     /// Send requests to the best server available
     balanced_rpcs: Arc<Web3Connections>,
@@ -76,6 +93,8 @@ pub struct Web3ProxyApp {
     // TODO: broadcast channel instead?
     head_block_receiver: watch::Receiver<Block<TxHash>>,
     pending_tx_sender: broadcast::Sender<TxState>,
+    pending_tx_receiver: broadcast::Receiver<TxState>,
+    pending_transactions: Arc<DashMap<TxHash, TxState>>,
     next_subscription_id: AtomicUsize,
 }
 
@@ -92,9 +111,12 @@ impl Web3ProxyApp {
         redis_address: Option<String>,
         balanced_rpcs: Vec<Web3ConnectionConfig>,
         private_rpcs: Vec<Web3ConnectionConfig>,
-    ) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
+    ) -> anyhow::Result<(
+        Arc<Web3ProxyApp>,
+        Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
+    )> {
         // TODO: try_join_all instead
-        let mut handles = FuturesUnordered::new();
+        let handles = FuturesUnordered::new();
 
         // make a http shared client
         // TODO: how should we configure the connection pool?
@@ -127,7 +149,12 @@ impl Web3ProxyApp {
         let (head_block_sender, head_block_receiver) = watch::channel(Block::default());
         // TODO: will one receiver lagging be okay?
         let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16);
-        drop(pending_tx_receiver);
+
+        let pending_transactions = Arc::new(DashMap::new());
+
+        // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
+        // TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks
+        // TODO: we should still have some sort of expiration or maximum size limit for the map
 
         // TODO: attach context to this error
         let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
@@ -137,6 +164,7 @@ impl Web3ProxyApp {
             rate_limiter.as_ref(),
             Some(head_block_sender),
             Some(pending_tx_sender.clone()),
+            pending_transactions.clone(),
         )
         .await?;
 
@@ -155,7 +183,8 @@ impl Web3ProxyApp {
             // subscribing to new heads here won't work well
             None,
             // TODO: subscribe to pending transactions on the private rpcs?
-            None,
+            Some(pending_tx_sender.clone()),
+            pending_transactions.clone(),
         )
         .await?;
 
@@ -171,23 +200,16 @@ impl Web3ProxyApp {
             response_cache: Default::default(),
             head_block_receiver,
             pending_tx_sender,
+            pending_tx_receiver,
+            pending_transactions,
             next_subscription_id: 1.into(),
         };
 
         let app = Arc::new(app);
 
         // create a handle that returns on the first error
-        let handle = tokio::spawn(async move {
-            while let Some(x) = handles.next().await {
-                match x {
-                    Err(e) => return Err(e.into()),
-                    Ok(Err(e)) => return Err(e),
-                    Ok(Ok(())) => {}
-                }
-            }
-
-            Ok(())
-        });
+        // TODO: move this to a helper. i think Web3Connections needs it too
+        let handle = Box::pin(flatten_handles(handles));
 
         Ok((app, handle))
     }
diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs
index ca5719fb..4f22bd9f 100644
--- a/web3-proxy/src/config.rs
+++ b/web3-proxy/src/config.rs
@@ -1,7 +1,9 @@
 use argh::FromArgs;
 use ethers::prelude::{Block, TxHash};
+use futures::Future;
 use serde::Deserialize;
 use std::collections::HashMap;
+use std::pin::Pin;
 use std::sync::Arc;
 
 use crate::app::AnyhowJoinHandle;
@@ -49,7 +51,12 @@ pub struct Web3ConnectionConfig {
 impl RpcConfig {
     /// Create a Web3ProxyApp from config
     // #[instrument(name = "try_build_RpcConfig", skip_all)]
-    pub async fn spawn(self) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
+    pub async fn spawn(
+        self,
+    ) -> anyhow::Result<(
+        Arc<Web3ProxyApp>,
+        Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
+    )> {
         let balanced_rpcs = self.balanced_rpcs.into_values().collect();
 
         let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 16d26527..e2029f73 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -21,7 +21,7 @@ use std::time::Duration;
 use tokio::sync::{broadcast, watch};
 use tokio::task;
 use tokio::time::sleep;
-use tracing::{debug, info, info_span, instrument, trace, warn};
+use tracing::{debug, error, info, info_span, instrument, trace, warn};
 
 use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
 use crate::config::Web3ConnectionConfig;
@@ -57,7 +57,7 @@ impl SyncedConnections {
 pub struct Web3Connections {
     inner: Vec<Arc<Web3Connection>>,
     synced_connections: ArcSwap<SyncedConnections>,
-    pending_transactions: DashMap<TxHash, TxState>,
+    pending_transactions: Arc<DashMap<TxHash, TxState>>,
 }
 
 impl Serialize for Web3Connections {
@@ -93,11 +93,12 @@ impl Web3Connections {
         rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
         head_block_sender: Option<watch::Sender<Block<TxHash>>>,
         pending_tx_sender: Option<broadcast::Sender<TxState>>,
+        pending_transactions: Arc<DashMap<TxHash, TxState>>,
     ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
         let num_connections = server_configs.len();
 
         // TODO: try_join_all
-        let handles = FuturesUnordered::new();
+        let mut handles = vec![];
 
         // TODO: only create these if head_block_sender and pending_tx_sender are set
         let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
@@ -118,7 +119,7 @@ impl Web3Connections {
                 .await
             {
                 Ok((connection, connection_handle)) => {
-                    handles.push(connection_handle);
+                    handles.push(flatten_handle(connection_handle));
                     connections.push(connection)
                 }
                 Err(e) => warn!("Unable to connect to a server! {:?}", e),
             }
@@ -135,13 +136,14 @@ impl Web3Connections {
         let connections = Arc::new(Self {
             inner: connections,
             synced_connections: ArcSwap::new(Arc::new(synced_connections)),
-            pending_transactions: Default::default(),
+            pending_transactions,
         });
 
         let handle = {
             let connections = connections.clone();
 
             tokio::spawn(async move {
+                // TODO: try_join_all with the other handles here
                 connections
                     .subscribe(
                         pending_tx_id_receiver,
@@ -156,71 +158,96 @@ impl Web3Connections {
         Ok((connections, handle))
     }
 
-    async fn send_transaction(
+    async fn _funnel_transaction(
+        &self,
+        rpc: Arc<Web3Connection>,
+        pending_tx_id: TxHash,
+    ) -> Result<TxState, ProviderError> {
+        // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
+        // TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
+        let pending_transaction: Transaction = rpc
+            .wait_for_request_handle()
+            .await
+            .request("eth_getTransactionByHash", (pending_tx_id,))
+            .await?;
+
+        trace!(?pending_transaction, "pending");
+
+        // TODO: do not unwrap. orphans might make this unsafe
+        match &pending_transaction.block_hash {
+            Some(_block_hash) => {
+                // the transaction is already confirmed. no need to save in the pending_transactions map
+                Ok(TxState::Confirmed(pending_transaction))
+            }
+            None => Ok(TxState::Pending(pending_transaction)),
+        }
+    }
+
+    async fn funnel_transaction(
         self: Arc<Self>,
         rpc: Arc<Web3Connection>,
         pending_tx_id: TxHash,
         pending_tx_sender: broadcast::Sender<TxState>,
     ) -> anyhow::Result<()> {
+        // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
         for i in 0..30 {
             // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
-            match self.pending_transactions.entry(pending_tx_id) {
-                DashMapEntry::Occupied(_entry) => {
-                    // TODO: if its occupied, but still only "Known", multiple nodes have this transaction. ask both
-                    return Ok(());
+            info!(?pending_tx_id, "checking pending_transactions");
+            let tx_state = match self.pending_transactions.entry(pending_tx_id) {
+                DashMapEntry::Occupied(mut entry) => {
+                    if let TxState::Known(_) = entry.get() {
+                        // TODO: if its occupied, but still only "Known", multiple nodes have this transaction. ask both
+                        match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
+                            Ok(tx_state) => {
+                                entry.insert(tx_state.clone());
+
+                                Some(tx_state)
+                            }
+                            Err(err) => {
+                                debug!(
+                                    ?i,
+                                    ?err,
+                                    ?pending_tx_id,
+                                    "failed sending transaction (retry/race)"
+                                );
+
+                                None
+                            }
+                        }
+                    } else {
+                        None
+                    }
                 }
                 DashMapEntry::Vacant(entry) => {
-                    let request_handle = rpc.wait_for_request_handle().await;
-
                     // TODO: how many retries?
                     // TODO: use a generic retry provider instead?
-                    let tx_result = request_handle
-                        .request("eth_getTransactionByHash", (pending_tx_id,))
-                        .await;
+                    match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
+                        Ok(tx_state) => {
+                            entry.insert(tx_state.clone());
 
-                    // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
-                    // TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
-                    let pending_transaction = match tx_result {
-                        Ok(tx) => Some(tx),
+                            Some(tx_state)
+                        }
                         Err(err) => {
-                            trace!(
-                                ?i,
-                                ?err,
-                                ?pending_tx_id,
-                                "error getting transaction by hash"
-                            );
+                            debug!(?i, ?err, ?pending_tx_id, "failed sending transaction");
 
-                            // TODO: how long? exponential backoff?
-                            sleep(Duration::from_millis(100)).await;
-                            continue;
+                            None
                         }
-                    };
-
-                    trace!(?pending_transaction, "pending");
-
-                    let pending_transaction: Transaction = pending_transaction.unwrap();
-
-                    // TODO: do not unwrap. orphans might make this unsafe
-                    let tx_state = match &pending_transaction.block_hash {
-                        Some(_block_hash) => {
-                            // the transaction is already confirmed. no need to save in the pending_transactions map
-                            TxState::Confirmed(pending_transaction)
-                        }
-                        None => {
-                            let state = TxState::Pending(pending_transaction);
-                            entry.insert(state.clone());
-                            state
-                        }
-                    };
-
-                    // TODO: maybe we should just send the txid and they can get it from the dashmap?
-                    let _ = pending_tx_sender.send(tx_state);
-
-                    info!(?pending_tx_id, "sent");
-
-                    return Ok(());
+                    }
                 }
+            };
+
+            if let Some(tx_state) = tx_state {
+                let _ = pending_tx_sender.send(tx_state);
+
+                info!(?pending_tx_id, "sent");
+
+                // since we sent a transaction, we should return
+                return Ok(());
             }
+
+            // unable to update the entry. sleep and try again soon
+            // TODO: exponential backoff with jitter starting from a much smaller time
+            sleep(Duration::from_millis(3000)).await;
         }
 
         info!(?pending_tx_id, "not found");
@@ -247,7 +274,7 @@ impl Web3Connections {
             let handle = task::spawn(async move {
                 while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
                     // TODO: spawn this
-                    let f = clone.clone().send_transaction(
+                    let f = clone.clone().funnel_transaction(
                         rpc,
                         pending_tx_id,
                         pending_tx_sender.clone(),
@@ -285,6 +312,7 @@ impl Web3Connections {
         }
 
         if let Err(e) = try_join_all(futures).await {
+            error!("subscriptions over: {:?}", self);
             return Err(e);
         }
 
diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs
index 5d5a9f98..56cc64b3 100644
--- a/web3-proxy/src/main.rs
+++ b/web3-proxy/src/main.rs
@@ -89,7 +89,7 @@ fn main() -> anyhow::Result<()> {
 
     // if everything is working, these should both run forever
     tokio::select! {
-        x = flatten_handle(app_handle) => {
+        x = app_handle => {
            // TODO: error log if error
            info!(?x, "app_handle exited");
        }