diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 914cfc0a..642752ce 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -6,10 +6,11 @@ use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; use axum::extract::ws::Message; use dashmap::DashMap; -use ethers::prelude::TransactionReceipt; +use ethers::prelude::Transaction; use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; +use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; @@ -19,6 +20,7 @@ use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; +use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::WatchStream; use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; @@ -40,6 +42,16 @@ type ResponseLrcCache = RwLock type ActiveRequestsMap = DashMap>; +pub type AnyhowJoinHandle = JoinHandle>; + +pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result { + match handle.await { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(err), + Err(err) => Err(err.into()), + } +} + /// The application // TODO: this debug impl is way too verbose. make something smaller // TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs @@ -53,7 +65,7 @@ pub struct Web3ProxyApp { // don't drop this or the sender will stop working head_block_receiver: watch::Receiver>, // TODO: i think we want a TxState enum for Confirmed(TxHash, BlockHash) or Pending(TxHash) or Orphan(TxHash, BlockHash) - pending_tx_receipt_receiver: flume::Receiver, + pending_tx_receiver: flume::Receiver, next_subscription_id: AtomicUsize, } @@ -70,7 +82,9 @@ impl Web3ProxyApp { redis_address: Option, balanced_rpcs: Vec, private_rpcs: Vec, - ) -> anyhow::Result> { + ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + let mut handles = FuturesUnordered::new(); + // make a http shared client // TODO: how should we configure the connection pool? // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server @@ -100,35 +114,41 @@ impl Web3ProxyApp { // TODO: subscribe to pending transactions on the private rpcs, too? let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); - let (pending_tx_receipt_sender, pending_tx_receipt_receiver) = flume::unbounded(); + let (pending_tx_sender, pending_tx_receiver) = flume::unbounded(); // TODO: attach context to this error - let balanced_rpcs = Web3Connections::spawn( + let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( chain_id, balanced_rpcs, http_client.as_ref(), rate_limiter.as_ref(), Some(head_block_sender), - Some(pending_tx_receipt_sender), + Some(pending_tx_sender), ) .await?; + handles.push(balanced_handle); + let private_rpcs = if private_rpcs.is_empty() { warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); balanced_rpcs.clone() } else { // TODO: attach context to this error - Web3Connections::spawn( + let (private_rpcs, private_handle) = Web3Connections::spawn( chain_id, private_rpcs, http_client.as_ref(), rate_limiter.as_ref(), // subscribing to new heads here won't work well None, - // TODO: subscribe to pending transactions on the private rpcs, too? + // TODO: subscribe to pending transactions on the private rpcs? None, ) - .await? + .await?; + + handles.push(private_handle); + + private_rpcs }; let app = Web3ProxyApp { @@ -137,13 +157,26 @@ impl Web3ProxyApp { incoming_requests: Default::default(), response_cache: Default::default(), head_block_receiver, - pending_tx_receipt_receiver, + pending_tx_receiver, next_subscription_id: 1.into(), }; let app = Arc::new(app); - Ok(app) + // create a handle that returns on the first error + let handle = tokio::spawn(async move { + while let Some(x) = handles.next().await { + match x { + Err(e) => return Err(e.into()), + Ok(Err(e)) => return Err(e), + Ok(Ok(())) => {} + } + } + + Ok(()) + }); + + Ok((app, handle)) } pub async fn eth_subscribe( diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 4bb0e98f..5920c0dc 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -1,11 +1,12 @@ +use crate::app::AnyhowJoinHandle; +use crate::connection::Web3Connection; +use crate::Web3ProxyApp; use argh::FromArgs; +use ethers::prelude::{Block, TxHash}; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; -use crate::connection::Web3Connection; -use crate::Web3ProxyApp; - #[derive(Debug, FromArgs)] /// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. pub struct CliConfig { @@ -47,7 +48,7 @@ pub struct Web3ConnectionConfig { impl RpcConfig { /// Create a Web3ProxyApp from config // #[instrument(name = "try_build_RpcConfig", skip_all)] - pub async fn try_build(self) -> anyhow::Result> { + pub async fn spawn(self) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let balanced_rpcs = self.balanced_rpcs.into_values().collect(); let private_rpcs = if let Some(private_rpcs) = self.private_rpcs { @@ -69,13 +70,16 @@ impl RpcConfig { impl Web3ConnectionConfig { /// Create a Web3Connection from config // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] - pub async fn try_build( + pub async fn spawn( self, - redis_conn: Option<&redis_cell_client::MultiplexedConnection>, + rate_limiter: Option<&redis_cell_client::MultiplexedConnection>, chain_id: usize, http_client: Option<&reqwest::Client>, - ) -> anyhow::Result> { - let hard_rate_limit = self.hard_limit.map(|x| (x, redis_conn.unwrap())); + block_sender: Option, Arc)>>, + tx_id_sender: Option)>>, + reconnect: bool, + ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + let hard_rate_limit = self.hard_limit.map(|x| (x, rate_limiter.unwrap())); Web3Connection::spawn( chain_id, @@ -83,6 +87,9 @@ impl Web3ConnectionConfig { http_client, hard_rate_limit, self.soft_limit, + block_sender, + tx_id_sender, + reconnect, ) .await } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 5bfefc32..46e4308f 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -9,10 +9,13 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; +use tokio::sync::oneshot; use tokio::sync::RwLock; use tokio::task; use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; -use tracing::{info, instrument, trace, warn}; +use tracing::{error, info, instrument, trace, warn}; + +use crate::app::AnyhowJoinHandle; /// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] @@ -117,7 +120,8 @@ impl fmt::Display for Web3Connection { impl Web3Connection { /// Connect to a web3 rpc - #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] + // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] + #[allow(clippy::too_many_arguments)] pub async fn spawn( chain_id: usize, url_str: String, @@ -126,7 +130,10 @@ impl Web3Connection { hard_limit: Option<(u32, &redis_cell_client::MultiplexedConnection)>, // TODO: think more about this type soft_limit: u32, - ) -> anyhow::Result> { + block_sender: Option, Arc)>>, + tx_id_sender: Option)>>, + reconnect: bool, + ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| { // TODO: allow different max_burst and count_per_period and period let period = 1; @@ -172,7 +179,7 @@ impl Web3Connection { found_chain_id )); } else { - info!("Successful connection"); + info!(?connection, "success"); } } Err(e) => { @@ -181,7 +188,16 @@ impl Web3Connection { } } - Ok(connection) + let handle = { + let connection = connection.clone(); + tokio::spawn(async move { + connection + .subscribe(block_sender, tx_id_sender, reconnect) + .await + }) + }; + + Ok((connection, handle)) } #[instrument(skip_all)] @@ -243,50 +259,59 @@ impl Web3Connection { Ok(()) } - pub async fn subscribe( + async fn subscribe( self: Arc, - block_sender: flume::Sender<(Block, Arc)>, - tx_id_sender: flume::Sender<(TxHash, Arc)>, + block_sender: Option, Arc)>>, + tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<()> { - loop { - // TODO: make these abortable so that if one fails the other can be cancelled? + match (block_sender, tx_id_sender) { + (None, None) => { + // TODO: is there a better way to make a channel that is never ready? + let (tx, rx) = oneshot::channel::<()>(); + rx.await?; + drop(tx); + } + (Some(block_sender), Some(tx_id_sender)) => { + // TODO: make these abortable so that if one fails the other can be cancelled? + loop { + let new_heads = { + let clone = self.clone(); + let block_sender = block_sender.clone(); - let new_heads = { - let clone = self.clone(); - let block_sender = block_sender.clone(); + clone.subscribe_new_heads(block_sender) + }; - clone.subscribe_new_heads(block_sender) - }; + let pending_txs = { + let clone = self.clone(); + let tx_id_sender = tx_id_sender.clone(); - let pending_txs = { - let clone = self.clone(); - let tx_id_sender = tx_id_sender.clone(); + clone.subscribe_pending_transactions(tx_id_sender) + }; - clone.subscribe_pending_transactions(tx_id_sender) - }; + match tokio::try_join!(new_heads, pending_txs) { + Ok(_) => break, + Err(err) => { + if reconnect { + // TODO: exponential backoff + // TODO: share code with new heads subscription + warn!( + "subscription exited. Attempting to reconnect in 1 second. {:?}", err + ); + sleep(Duration::from_secs(1)).await; - tokio::select! { - _ = new_heads => { - info!(?self, "new heads subscription completed"); - } - _ = pending_txs => { - info!(?self, "pending transactions subscription completed"); + // TODO: loop on reconnecting! do not return with a "?" here + // TODO: this isn't going to work. it will get in a loop with newHeads + self.reconnect(&block_sender).await?; + } else { + error!("subscription exited. {:?}", err); + break; + } + } + }; } } - - if reconnect { - // TODO: exponential backoff - // TODO: share code with new heads subscription - warn!("pending transactions subscription exited. Attempting to reconnect in 1 second..."); - sleep(Duration::from_secs(1)).await; - - // TODO: loop on reconnecting! do not return with a "?" here - // TODO: this isn't going to work. it will get in a loop with newHeads - self.reconnect(&block_sender).await?; - } else { - break; - } + _ => panic!(), } Ok(()) @@ -299,7 +324,7 @@ impl Web3Connection { self: Arc, block_sender: flume::Sender<(Block, Arc)>, ) -> anyhow::Result<()> { - info!("Watching new_heads on {}", self); + info!("watching {}", self); // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { @@ -390,7 +415,7 @@ impl Web3Connection { self: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { - info!("watching pending transactions on {}", self); + info!("watching {}", self); // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { @@ -400,7 +425,7 @@ impl Web3Connection { // TODO: what should this interval be? probably automatically set to some fraction of block time // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - let mut interval = interval(Duration::from_secs(2)); + let mut interval = interval(Duration::from_secs(60)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); // TODO: create a filter @@ -410,6 +435,8 @@ impl Web3Connection { // TODO: if error or rate limit, increase interval? interval.tick().await; + // TODO: actually do something here + /* match self.try_request_handle().await { Ok(active_request_handle) => { // TODO: check the filter @@ -419,6 +446,7 @@ impl Web3Connection { warn!("Failed getting latest block from {}: {:?}", self, e); } } + */ } } Web3Provider::Ws(provider) => { @@ -469,7 +497,7 @@ impl Web3Connection { } } - // TODO: what should we do? + // TODO: what should we do? panic isn't ever what we want panic!("no request handle after 10 tries"); } diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 633acc7e..1a4f4f3e 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -2,7 +2,7 @@ use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; -use ethers::prelude::{Block, ProviderError, TransactionReceipt, TxHash, H256}; +use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; @@ -17,9 +17,9 @@ use std::time::Duration; use tokio::sync::watch; use tokio::task; use tokio::time::sleep; -use tracing::Instrument; use tracing::{debug, info, info_span, instrument, trace, warn}; +use crate::app::AnyhowJoinHandle; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; @@ -82,22 +82,38 @@ impl Web3Connections { // #[instrument(name = "spawn_Web3Connections", skip_all)] pub async fn spawn( chain_id: usize, - servers: Vec, + server_configs: Vec, http_client: Option<&reqwest::Client>, rate_limiter: Option<&redis_cell_client::MultiplexedConnection>, head_block_sender: Option>>, - pending_tx_receipt_sender: Option>, - ) -> anyhow::Result> { - let num_connections = servers.len(); + pending_tx_sender: Option>, + ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + let num_connections = server_configs.len(); + + let handles = FuturesUnordered::new(); + + // TODO: only create these if head_block_sender and pending_tx_sender are set + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, block_receiver) = flume::unbounded(); // turn configs into connections let mut connections = Vec::with_capacity(num_connections); - for server_config in servers.into_iter() { + for server_config in server_configs.into_iter() { match server_config - .try_build(rate_limiter, chain_id, http_client) + .spawn( + rate_limiter, + chain_id, + http_client, + Some(block_sender.clone()), + Some(pending_tx_id_sender.clone()), + true, + ) .await { - Ok(connection) => connections.push(connection), + Ok((connection, connection_handle)) => { + handles.push(connection_handle); + connections.push(connection) + } Err(e) => warn!("Unable to connect to a server! {:?}", e), } } @@ -119,57 +135,50 @@ impl Web3Connections { tokio::spawn(async move { connections - .subscribe(head_block_sender, pending_tx_receipt_sender) + .subscribe( + pending_tx_id_sender, + pending_tx_id_receiver, + block_receiver, + head_block_sender, + pending_tx_sender, + ) .await }) }; - Ok(connections) + Ok((connections, handle)) } /// subscribe to all the backend rpcs async fn subscribe( self: Arc, + pending_tx_id_sender: flume::Sender<(TxHash, Arc)>, + pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, + block_receiver: flume::Receiver<(Block, Arc)>, head_block_sender: Option>>, - pending_tx_receipt_sender: Option>, + pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = FuturesUnordered::new(); - // subscribe to pending transactions - let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (block_sender, block_receiver) = flume::unbounded(); - - // one future subscribes to pendingTransactions on all the rpcs. it sends them through the funnel - // TODO: do this only when someone is subscribed. otherwise this will be way too many queries - for (rpc_id, connection) in self.inner.iter().cloned().enumerate() { - let pending_tx_id_sender = pending_tx_id_sender.clone(); - let block_sender = block_sender.clone(); - - let handle = tokio::spawn(async move { - // loop to automatically reconnect - // TODO: make this cancellable? - // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - // TODO: proper span - connection - .subscribe(block_sender, pending_tx_id_sender, true) - .instrument(tracing::info_span!("rpc", ?rpc_id)) - .await - }); - - futures.push(handle); - } - - // the next future subscribes to the transaction funnel + // setup the transaction funnel // it skips any duplicates (unless they are being orphaned) // fetches new transactions from the notifying rpc // forwards new transacitons to pending_tx_receipt_sender - { + if let Some(pending_tx_sender) = pending_tx_sender { // TODO: do something with the handle so we can catch any errors let handle = task::spawn(async move { while let Ok((pending_transaction_id, rpc)) = pending_tx_id_receiver.recv_async().await { - unimplemented!("de-dedup the pending txid") + let request_handle = rpc.wait_for_request_handle().await; + + let pending_transaction: Transaction = request_handle + .request("eth_getTransactionByHash", (pending_transaction_id,)) + .await?; + + // unimplemented!("de-dedup the pending txid"); + + pending_tx_sender.send_async(pending_transaction).await?; } Ok(()) @@ -178,8 +187,7 @@ impl Web3Connections { futures.push(handle); } - // the next future subscribes to the block funnel - + // setup the block funnel if let Some(head_block_sender) = head_block_sender { let connections = Arc::clone(&self); let handle = task::Builder::default() @@ -193,10 +201,17 @@ impl Web3Connections { futures.push(handle); } + if futures.is_empty() { + // no transaction or block subscriptions. + unimplemented!("every second, check that the provider is still connected"); + } + if let Some(Err(e)) = futures.next().await { return Err(e.into()); } + info!("subscriptions over: {:?}", self); + Ok(()) } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index c65a6f2c..970303c5 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -7,18 +7,17 @@ mod connections; mod frontend; mod jsonrpc; +use crate::app::{flatten_handle, Web3ProxyApp}; +use crate::config::{CliConfig, RpcConfig}; use parking_lot::deadlock; use std::fs; use std::sync::atomic::{self, AtomicUsize}; use std::thread; use std::time::Duration; use tokio::runtime; -use tracing::{info, trace}; +use tracing::{error, info, trace}; use tracing_subscriber::EnvFilter; -use crate::app::Web3ProxyApp; -use crate::config::{CliConfig, RpcConfig}; - fn main() -> anyhow::Result<()> { // if RUST_LOG isn't set, configure a default // TODO: is there a better way to do this? @@ -83,8 +82,20 @@ fn main() -> anyhow::Result<()> { // spawn the root task rt.block_on(async { - let app = rpc_config.try_build().await?; + let (app, app_handle) = rpc_config.spawn().await?; - frontend::run(cli_config.port, app).await + let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app)); + + match tokio::try_join!(flatten_handle(app_handle), flatten_handle(frontend_handle)) { + Ok(_) => { + // do something with the values + info!("app completed") + } + Err(err) => { + error!(?err, "app failed"); + } + } + + Ok(()) }) }