From 4ca292c3e1c473ccd837399236edb7c8a3c5de75 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 16 Jun 2022 02:53:37 +0000 Subject: [PATCH] getting closer --- Cargo.lock | 3 + TODO.md | 3 + web3-proxy/Cargo.toml | 2 +- web3-proxy/src/app.rs | 73 +++++++++--- web3-proxy/src/config.rs | 7 +- web3-proxy/src/connection.rs | 104 +++++++++-------- web3-proxy/src/connections.rs | 158 ++++++++++++++++++-------- web3-proxy/src/frontend/errors.rs | 3 +- web3-proxy/src/frontend/http.rs | 3 +- web3-proxy/src/frontend/http_proxy.rs | 5 +- web3-proxy/src/frontend/ws_proxy.rs | 15 +-- web3-proxy/src/main.rs | 23 ++-- 12 files changed, 262 insertions(+), 137 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 473004f2..5662ccd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,6 +73,9 @@ name = "anyhow" version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +dependencies = [ + "backtrace", +] [[package]] name = "arc-swap" diff --git a/TODO.md b/TODO.md index 5e3fb031..687869fd 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,8 @@ # Todo +- [ ] it works for a few seconds and then gets stuck on something +- [ ] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more +- [ ] its working with one backend node, but multiple breaks. something to do with pending transactions - [ ] if web3 proxy gets an http error back, retry another node - [x] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions - [x] refactor Connections::spawn. have it return a handle that is selecting on those handles? diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index e177cca1..4cdc765b 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0.57" +anyhow = { version = "1.0.57", features = ["backtrace"] } arc-swap = "1.5.0" argh = "0.1.7" axum = { version = "0.5.7", features = ["serde_json", "tokio-tungstenite", "ws"] } diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index eb4f6720..b91255ec 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -1,9 +1,3 @@ -use crate::config::Web3ConnectionConfig; -use crate::connections::Web3Connections; -use crate::jsonrpc::JsonRpcForwardedResponse; -use crate::jsonrpc::JsonRpcForwardedResponseEnum; -use crate::jsonrpc::JsonRpcRequest; -use crate::jsonrpc::JsonRpcRequestEnum; use axum::extract::ws::Message; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; @@ -20,12 +14,19 @@ use std::fmt; use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::watch; +use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::WatchStream; use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; +use crate::config::Web3ConnectionConfig; +use crate::connections::Web3Connections; +use crate::jsonrpc::JsonRpcForwardedResponse; +use crate::jsonrpc::JsonRpcForwardedResponseEnum; +use crate::jsonrpc::JsonRpcRequest; +use crate::jsonrpc::JsonRpcRequestEnum; + static APP_USER_AGENT: &str = concat!( "satoshiandkin/", env!("CARGO_PKG_NAME"), @@ -53,9 +54,11 @@ pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result } } +#[derive(Clone)] pub enum TxState { - Confirmed(Transaction), + Known(TxHash), Pending(Transaction), + Confirmed(Transaction), Orphaned(Transaction), } @@ -70,9 +73,9 @@ pub struct Web3ProxyApp { incoming_requests: ActiveRequestsMap, response_cache: ResponseLrcCache, // don't drop this or the sender will stop working + // TODO: broadcast channel instead? head_block_receiver: watch::Receiver>, - // TODO: i think we want a TxState enum for Confirmed(TxHash, BlockHash) or Pending(TxHash) or Orphan(TxHash, BlockHash) - pending_tx_receiver: flume::Receiver, + pending_tx_sender: broadcast::Sender, next_subscription_id: AtomicUsize, } @@ -122,7 +125,9 @@ impl Web3ProxyApp { // TODO: subscribe to pending transactions on the private rpcs, too? let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); - let (pending_tx_sender, pending_tx_receiver) = flume::unbounded(); + // TODO: will one receiver lagging be okay? + let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16); + drop(pending_tx_receiver); // TODO: attach context to this error let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( @@ -131,7 +136,7 @@ impl Web3ProxyApp { http_client.as_ref(), rate_limiter.as_ref(), Some(head_block_sender), - Some(pending_tx_sender), + Some(pending_tx_sender.clone()), ) .await?; @@ -165,7 +170,7 @@ impl Web3ProxyApp { incoming_requests: Default::default(), response_cache: Default::default(), head_block_receiver, - pending_tx_receiver, + pending_tx_sender, next_subscription_id: 1.into(), }; @@ -241,12 +246,48 @@ impl Web3ProxyApp { }) } r#"["newPendingTransactions"]"# => { - let pending_tx_receiver = self.pending_tx_receiver.clone(); + let mut pending_tx_receiver = self.pending_tx_sender.subscribe(); trace!(?subscription_id, "pending transactions subscription"); tokio::spawn(async move { - while let Ok(new_tx_state) = pending_tx_receiver.recv_async().await { + while let Ok(new_tx_state) = pending_tx_receiver.recv().await { let new_tx = match new_tx_state { + TxState::Known(..) => continue, + TxState::Confirmed(..) => continue, + TxState::Orphaned(tx) => tx, + TxState::Pending(tx) => tx, + }; + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let msg = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + "result": new_tx.hash, + }, + }); + + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + + if subscription_tx.send_async(msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } + + trace!(?subscription_id, "closed new heads subscription"); + }) + } + r#"["newPendingFullTransactions"]"# => { + // TODO: too much copy/pasta with newPendingTransactions + let mut pending_tx_receiver = self.pending_tx_sender.subscribe(); + + trace!(?subscription_id, "pending transactions subscription"); + tokio::spawn(async move { + while let Ok(new_tx_state) = pending_tx_receiver.recv().await { + let new_tx = match new_tx_state { + TxState::Known(..) => continue, TxState::Confirmed(..) => continue, TxState::Orphaned(tx) => tx, TxState::Pending(tx) => tx, @@ -282,6 +323,8 @@ impl Web3ProxyApp { let response = JsonRpcForwardedResponse::from_string(subscription_id, id); + // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? + Ok((subscription_abort_handle, response)) } diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 5920c0dc..ca5719fb 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -1,12 +1,13 @@ -use crate::app::AnyhowJoinHandle; -use crate::connection::Web3Connection; -use crate::Web3ProxyApp; use argh::FromArgs; use ethers::prelude::{Block, TxHash}; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; +use crate::app::AnyhowJoinHandle; +use crate::connection::Web3Connection; +use crate::Web3ProxyApp; + #[derive(Debug, FromArgs)] /// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. pub struct CliConfig { diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 46e4308f..d5b6dc27 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -1,6 +1,8 @@ ///! Rate-limited communication with a web3 provider +use anyhow::Context; use derive_more::From; use ethers::prelude::{Block, Middleware, ProviderError, TxHash}; +use futures::future::try_join_all; use futures::StreamExt; use redis_cell_client::RedisCellClient; use serde::ser::{SerializeStruct, Serializer}; @@ -9,13 +11,12 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; -use tokio::sync::oneshot; use tokio::sync::RwLock; use tokio::task; use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; use tracing::{error, info, instrument, trace, warn}; -use crate::app::AnyhowJoinHandle; +use crate::app::{flatten_handle, AnyhowJoinHandle}; /// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] @@ -203,7 +204,7 @@ impl Web3Connection { #[instrument(skip_all)] pub async fn reconnect( self: &Arc, - block_sender: &flume::Sender<(Block, Arc)>, + block_sender: Option, Arc)>>, ) -> anyhow::Result<()> { // websocket doesn't need the http client let http_client = None; @@ -214,10 +215,14 @@ impl Web3Connection { *provider = None; // tell the block subscriber that we are at 0 - block_sender - .send_async((Block::default(), self.clone())) - .await?; + if let Some(block_sender) = block_sender { + block_sender + .send_async((Block::default(), self.clone())) + .await + .context("block_sender at 0")?; + } + // TODO: if this fails, keep retrying let new_provider = Web3Provider::from_str(&self.url, http_client).await?; *provider = Some(Arc::new(new_provider)); @@ -249,7 +254,10 @@ impl Web3Connection { match block { Ok(block) => { // TODO: i'm pretty sure we don't need send_async, but double check - block_sender.send_async((block, self.clone())).await?; + block_sender + .send_async((block, self.clone())) + .await + .context("block_sender")?; } Err(e) => { warn!("unable to get block from {}: {}", self, e); @@ -265,53 +273,52 @@ impl Web3Connection { tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<()> { - match (block_sender, tx_id_sender) { - (None, None) => { - // TODO: is there a better way to make a channel that is never ready? - let (tx, rx) = oneshot::channel::<()>(); - rx.await?; - drop(tx); + loop { + let mut futures = vec![]; + + if let Some(block_sender) = &block_sender { + let f = self.clone().subscribe_new_heads(block_sender.clone()); + + futures.push(flatten_handle(tokio::spawn(f))); } - (Some(block_sender), Some(tx_id_sender)) => { - // TODO: make these abortable so that if one fails the other can be cancelled? - loop { - let new_heads = { - let clone = self.clone(); - let block_sender = block_sender.clone(); - clone.subscribe_new_heads(block_sender) - }; + if let Some(tx_id_sender) = &tx_id_sender { + let f = self + .clone() + .subscribe_pending_transactions(tx_id_sender.clone()); - let pending_txs = { - let clone = self.clone(); - let tx_id_sender = tx_id_sender.clone(); + futures.push(flatten_handle(tokio::spawn(f))); + } - clone.subscribe_pending_transactions(tx_id_sender) - }; + if futures.is_empty() { + // TODO: is there a better way to make a channel that is never ready? + info!(?self, "no-op subscription"); + return Ok(()); + } - match tokio::try_join!(new_heads, pending_txs) { - Ok(_) => break, - Err(err) => { - if reconnect { - // TODO: exponential backoff - // TODO: share code with new heads subscription - warn!( - "subscription exited. Attempting to reconnect in 1 second. {:?}", err - ); - sleep(Duration::from_secs(1)).await; + match try_join_all(futures).await { + Ok(_) => break, + Err(err) => { + if reconnect { + // TODO: exponential backoff + let retry_in = Duration::from_secs(1); + warn!( + ?self, + "subscription exited. Attempting to reconnect in {:?}. {:?}", + retry_in, + err + ); + sleep(retry_in).await; - // TODO: loop on reconnecting! do not return with a "?" here - // TODO: this isn't going to work. it will get in a loop with newHeads - self.reconnect(&block_sender).await?; - } else { - error!("subscription exited. {:?}", err); - break; - } - } - }; + // TODO: loop on reconnecting! do not return with a "?" here + // TODO: this isn't going to work. it will get in a loop with newHeads + self.reconnect(block_sender.clone()).await?; + } else { + error!(?self, ?err, "subscription exited"); + return Err(err); + } } } - _ => panic!(), } Ok(()) @@ -463,13 +470,14 @@ impl Web3Connection { // TODO: query existing pending txs? // TODO: should the stream have a timeout on it here? - // TODO: although reconnects will make this less of an issue + // TODO: i don't think loop match is what we want. i think while let would be better loop { match stream.next().await { Some(pending_tx_id) => { tx_id_sender .send_async((pending_tx_id, self.clone())) - .await?; + .await + .context("tx_id_sender")?; } None => { warn!("subscription ended"); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 5bb7c175..16d26527 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -1,10 +1,12 @@ ///! Load balanced communication with a group of web3 providers +use anyhow::Context; use arc_swap::ArcSwap; use counter::Counter; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; use derive_more::From; use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256}; +use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; @@ -16,12 +18,12 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::sync::Arc; use std::time::Duration; -use tokio::sync::watch; +use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::sleep; use tracing::{debug, info, info_span, instrument, trace, warn}; -use crate::app::{AnyhowJoinHandle, TxState}; +use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; @@ -34,6 +36,7 @@ struct SyncedConnections { // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] inner: BTreeSet>, + // TODO: use petgraph for keeping track of the chain so we can do better fork handling } impl fmt::Debug for SyncedConnections { @@ -54,7 +57,7 @@ impl SyncedConnections { pub struct Web3Connections { inner: Vec>, synced_connections: ArcSwap, - pending_transactions: DashMap, + pending_transactions: DashMap, } impl Serialize for Web3Connections { @@ -89,7 +92,7 @@ impl Web3Connections { http_client: Option<&reqwest::Client>, rate_limiter: Option<&redis_cell_client::MultiplexedConnection>, head_block_sender: Option>>, - pending_tx_sender: Option>, + pending_tx_sender: Option>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let num_connections = server_configs.len(); @@ -153,15 +156,86 @@ impl Web3Connections { Ok((connections, handle)) } + async fn send_transaction( + self: Arc, + rpc: Arc, + pending_tx_id: TxHash, + pending_tx_sender: broadcast::Sender, + ) -> anyhow::Result<()> { + for i in 0..30 { + // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? + match self.pending_transactions.entry(pending_tx_id) { + DashMapEntry::Occupied(_entry) => { + // TODO: if its occupied, but still only "Known", multiple nodes have this transaction. ask both + return Ok(()); + } + DashMapEntry::Vacant(entry) => { + let request_handle = rpc.wait_for_request_handle().await; + + // TODO: how many retries? + // TODO: use a generic retry provider instead? + let tx_result = request_handle + .request("eth_getTransactionByHash", (pending_tx_id,)) + .await; + + // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself + // TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) + let pending_transaction = match tx_result { + Ok(tx) => Some(tx), + Err(err) => { + trace!( + ?i, + ?err, + ?pending_tx_id, + "error getting transaction by hash" + ); + + // TODO: how long? exponential backoff? + sleep(Duration::from_millis(100)).await; + continue; + } + }; + + trace!(?pending_transaction, "pending"); + + let pending_transaction: Transaction = pending_transaction.unwrap(); + + // TODO: do not unwrap. orphans might make this unsafe + let tx_state = match &pending_transaction.block_hash { + Some(_block_hash) => { + // the transaction is already confirmed. no need to save in the pending_transactions map + TxState::Confirmed(pending_transaction) + } + None => { + let state = TxState::Pending(pending_transaction); + entry.insert(state.clone()); + state + } + }; + + // TODO: maybe we should just send the txid and they can get it from the dashmap? + let _ = pending_tx_sender.send(tx_state); + + info!(?pending_tx_id, "sent"); + + return Ok(()); + } + } + } + + info!(?pending_tx_id, "not found"); + Ok(()) + } + /// subscribe to all the backend rpcs async fn subscribe( self: Arc, pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, block_receiver: flume::Receiver<(Block, Arc)>, head_block_sender: Option>>, - pending_tx_sender: Option>, + pending_tx_sender: Option>, ) -> anyhow::Result<()> { - let mut futures = FuturesUnordered::new(); + let mut futures = vec![]; // setup the transaction funnel // it skips any duplicates (unless they are being orphaned) @@ -171,39 +245,23 @@ impl Web3Connections { // TODO: do something with the handle so we can catch any errors let clone = self.clone(); let handle = task::spawn(async move { - while let Ok((pending_transaction_id, rpc)) = - pending_tx_id_receiver.recv_async().await - { - match clone.pending_transactions.entry(pending_transaction_id) { - DashMapEntry::Occupied(_entry) => continue, - DashMapEntry::Vacant(entry) => { - let request_handle = rpc.wait_for_request_handle().await; + while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { + // TODO: spawn this + let f = clone.clone().send_transaction( + rpc, + pending_tx_id, + pending_tx_sender.clone(), + ); - let pending_transaction: Transaction = request_handle - .request("eth_getTransactionByHash", (pending_transaction_id,)) - .await?; - - trace!(?pending_transaction, "pending"); - - // TODO: do not unwrap. orphans might make this unsafe - let tx_state = match &pending_transaction.block_hash { - Some(_block_hash) => TxState::Confirmed(pending_transaction), - None => { - entry.insert(pending_transaction.clone()); - TxState::Pending(pending_transaction) - } - }; - - // TODO: maybe we should just send the txid and they can get it from the dashmap? - pending_tx_sender.send_async(tx_state).await?; - } - } + tokio::spawn(f); } Ok(()) }); - futures.push(handle); + futures.push(flatten_handle(handle)); + } else { + unimplemented!(); } // setup the block funnel @@ -218,7 +276,7 @@ impl Web3Connections { .await }); - futures.push(handle); + futures.push(flatten_handle(handle)); } if futures.is_empty() { @@ -226,8 +284,8 @@ impl Web3Connections { unimplemented!("every second, check that the provider is still connected"); } - if let Some(Err(e)) = futures.next().await { - return Err(e.into()); + if let Err(e) = try_join_all(futures).await { + return Err(e); } info!("subscriptions over: {:?}", self); @@ -310,7 +368,7 @@ impl Web3Connections { block_receiver: flume::Receiver<(Block, Arc)>, head_block_sender: watch::Sender>, // TODO: use pending_tx_sender - pending_tx_sender: Option>, + pending_tx_sender: Option>, ) -> anyhow::Result<()> { let total_rpcs = self.inner.len(); @@ -323,8 +381,11 @@ impl Web3Connections { let new_block_num = match new_block.number { Some(x) => x.as_u64(), None => { - // TODO: wth. how is this happening? need more logs - warn!(?new_block, "Block without number!"); + // block without a number is expected a node is syncing or + if new_block.hash.is_some() { + // this seems unlikely, but i'm pretty sure we see it + warn!(?new_block, "Block without number!"); + } continue; } }; @@ -360,7 +421,9 @@ impl Web3Connections { // TODO: if the parent hash isn't our previous best block, ignore it pending_synced_connections.head_block_hash = new_block_hash; - head_block_sender.send(new_block.clone())?; + head_block_sender + .send(new_block.clone()) + .context("head_block_sender")?; // TODO: mark all transactions as confirmed // TODO: mark any orphaned transactions as unconfirmed @@ -415,7 +478,9 @@ impl Web3Connections { // TODO: do this more efficiently? if pending_synced_connections.head_block_hash != most_common_head_hash { - head_block_sender.send(new_block.clone())?; + head_block_sender + .send(new_block.clone()) + .context("head_block_sender")?; pending_synced_connections.head_block_hash = most_common_head_hash; } @@ -449,15 +514,10 @@ impl Web3Connections { // TODO: what if the hashes don't match? if pending_synced_connections.head_block_hash == new_block_hash { // mark all transactions in the block as confirmed - if let Some(pending_tx_sender) = &pending_tx_sender { - // TODO: we need new_block to be the new_head_block + if pending_tx_sender.is_some() { for tx_hash in &new_block.transactions { - match self.pending_transactions.remove(tx_hash) { - Some((_tx_id, tx)) => { - pending_tx_sender.send_async(TxState::Confirmed(tx)).await?; - } - None => continue, - } + // TODO: should we mark as confirmed via pending_tx_sender so that orphans are easier? + let _ = self.pending_transactions.remove(tx_hash); } }; diff --git a/web3-proxy/src/frontend/errors.rs b/web3-proxy/src/frontend/errors.rs index 6ac9353c..3ce04ced 100644 --- a/web3-proxy/src/frontend/errors.rs +++ b/web3-proxy/src/frontend/errors.rs @@ -1,8 +1,9 @@ -use crate::jsonrpc::JsonRpcForwardedResponse; use axum::{http::StatusCode, response::IntoResponse, Json}; use serde_json::value::RawValue; use tracing::warn; +use crate::jsonrpc::JsonRpcForwardedResponse; + /// TODO: pretty 404 page? or us a json error fine? pub async fn handler_404() -> impl IntoResponse { let err = anyhow::anyhow!("nothing to see here"); diff --git a/web3-proxy/src/frontend/http.rs b/web3-proxy/src/frontend/http.rs index f313f7f8..4bab6278 100644 --- a/web3-proxy/src/frontend/http.rs +++ b/web3-proxy/src/frontend/http.rs @@ -1,8 +1,9 @@ -use crate::app::Web3ProxyApp; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use serde_json::json; use std::sync::Arc; +use crate::app::Web3ProxyApp; + /// a page for configuring your wallet with all the rpcs /// TODO: check auth (from authp?) here /// TODO: return actual html diff --git a/web3-proxy/src/frontend/http_proxy.rs b/web3-proxy/src/frontend/http_proxy.rs index 0b02ff91..a3ad33fb 100644 --- a/web3-proxy/src/frontend/http_proxy.rs +++ b/web3-proxy/src/frontend/http_proxy.rs @@ -1,8 +1,9 @@ -use super::errors::handle_anyhow_error; -use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use std::sync::Arc; +use super::errors::handle_anyhow_error; +use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; + pub async fn proxy_web3_rpc( payload: Json, app: Extension>, diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 0d0c0065..1486e1ab 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -1,7 +1,3 @@ -use crate::{ - app::Web3ProxyApp, - jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, -}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, response::IntoResponse, @@ -16,7 +12,12 @@ use hashbrown::HashMap; use serde_json::value::RawValue; use std::str::from_utf8_mut; use std::sync::Arc; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; + +use crate::{ + app::Web3ProxyApp, + jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, +}; pub async fn websocket_handler( app: Extension>, @@ -154,8 +155,8 @@ async fn write_web3_socket( ) { while let Ok(msg) = response_rx.recv_async().await { // a response is ready. write it to ws_tx - if ws_tx.send(msg).await.is_err() { - // TODO: log the error + if let Err(err) = ws_tx.send(msg).await { + warn!(?err, "unable to write to websocket"); break; }; } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 970303c5..5d5a9f98 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -7,17 +7,18 @@ mod connections; mod frontend; mod jsonrpc; -use crate::app::{flatten_handle, Web3ProxyApp}; -use crate::config::{CliConfig, RpcConfig}; use parking_lot::deadlock; use std::fs; use std::sync::atomic::{self, AtomicUsize}; use std::thread; use std::time::Duration; use tokio::runtime; -use tracing::{error, info, trace}; +use tracing::{info, trace}; use tracing_subscriber::EnvFilter; +use crate::app::{flatten_handle, Web3ProxyApp}; +use crate::config::{CliConfig, RpcConfig}; + fn main() -> anyhow::Result<()> { // if RUST_LOG isn't set, configure a default // TODO: is there a better way to do this? @@ -86,15 +87,17 @@ fn main() -> anyhow::Result<()> { let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app)); - match tokio::try_join!(flatten_handle(app_handle), flatten_handle(frontend_handle)) { - Ok(_) => { - // do something with the values - info!("app completed") + // if everything is working, these should both run forever + tokio::select! { + x = flatten_handle(app_handle) => { + // TODO: error log if error + info!(?x, "app_handle exited"); } - Err(err) => { - error!(?err, "app failed"); + x = flatten_handle(frontend_handle) => { + // TODO: error log if error + info!(?x, "frontend exited"); } - } + }; Ok(()) })