From 1ecd852401780d87687697d00ebdafbc91876a6f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 14 Jun 2022 06:42:52 +0000 Subject: [PATCH] simple cache --- web3-proxy/src/app.rs | 14 ++++++++---- web3-proxy/src/connections.rs | 40 ++++++++++++++++++++++++++--------- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 82ee895a..056c9050 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -5,8 +5,8 @@ use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; use axum::extract::ws::Message; +use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; -use ethers::prelude::Transaction; use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; @@ -52,6 +52,12 @@ pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result } } +pub enum TxState { + Confirmed(TxHash, H256), + Pending(TxHash), + Orphaned(TxHash, H256), +} + /// The application // TODO: this debug impl is way too verbose. make something smaller // TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs @@ -65,7 +71,7 @@ pub struct Web3ProxyApp { // don't drop this or the sender will stop working head_block_receiver: watch::Receiver>, // TODO: i think we want a TxState enum for Confirmed(TxHash, BlockHash) or Pending(TxHash) or Orphan(TxHash, BlockHash) - pending_tx_receiver: flume::Receiver, + pending_tx_receiver: flume::Receiver, next_subscription_id: AtomicUsize, } @@ -393,10 +399,10 @@ impl Web3ProxyApp { let (incoming_tx, incoming_rx) = watch::channel(true); let mut other_incoming_rx = None; match self.incoming_requests.entry(cache_key.clone()) { - dashmap::mapref::entry::Entry::Occupied(entry) => { + DashMapEntry::Occupied(entry) => { other_incoming_rx = Some(entry.get().clone()); } - dashmap::mapref::entry::Entry::Vacant(entry) => { + DashMapEntry::Vacant(entry) => { entry.insert(incoming_rx); } } diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index cf5e6b13..e97e8546 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -1,6 +1,8 @@ ///! Load balanced communication with a group of web3 providers use arc_swap::ArcSwap; use counter::Counter; +use dashmap::mapref::entry::Entry as DashMapEntry; +use dashmap::DashMap; use derive_more::From; use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256}; use futures::stream::FuturesUnordered; @@ -19,7 +21,7 @@ use tokio::task; use tokio::time::sleep; use tracing::{debug, info, info_span, instrument, trace, warn}; -use crate::app::AnyhowJoinHandle; +use crate::app::{AnyhowJoinHandle, TxState}; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; @@ -52,6 +54,7 @@ impl SyncedConnections { pub struct Web3Connections { inner: Vec>, synced_connections: ArcSwap, + pending_transactions: DashMap, } impl Serialize for Web3Connections { @@ -86,7 +89,7 @@ impl Web3Connections { http_client: Option<&reqwest::Client>, rate_limiter: Option<&redis_cell_client::MultiplexedConnection>, head_block_sender: Option>>, - pending_tx_sender: Option>, + pending_tx_sender: Option>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let num_connections = server_configs.len(); @@ -129,6 +132,7 @@ impl Web3Connections { let connections = Arc::new(Self { inner: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), + pending_transactions: Default::default(), }); let handle = { @@ -157,7 +161,7 @@ impl Web3Connections { pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, block_receiver: flume::Receiver<(Block, Arc)>, head_block_sender: Option>>, - pending_tx_sender: Option>, + pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = FuturesUnordered::new(); @@ -167,21 +171,37 @@ impl Web3Connections { // forwards new transacitons to pending_tx_receipt_sender if let Some(pending_tx_sender) = pending_tx_sender { // TODO: do something with the handle so we can catch any errors + let clone = self.clone(); let handle = task::spawn(async move { while let Ok((pending_transaction_id, rpc)) = pending_tx_id_receiver.recv_async().await { - unimplemented!("de-dedup the pending txid"); + match clone.pending_transactions.entry(pending_transaction_id) { + DashMapEntry::Occupied(_entry) => continue, + DashMapEntry::Vacant(entry) => { + let request_handle = rpc.wait_for_request_handle().await; - let request_handle = rpc.wait_for_request_handle().await; + let pending_transaction: Transaction = request_handle + .request("eth_getTransactionByHash", (pending_transaction_id,)) + .await?; - let pending_transaction: Transaction = request_handle - .request("eth_getTransactionByHash", (pending_transaction_id,)) - .await?; + trace!(?pending_transaction, "pending"); - trace!(?pending_transaction, "pending"); + // TODO: do not unwrap. orphans might make this unsafe + let tx_state = match &pending_transaction.block_hash { + Some(block_hash) => { + TxState::Confirmed(pending_transaction_id, *block_hash) + } + None => { + entry.insert(pending_transaction); + TxState::Pending(pending_transaction_id) + } + }; - pending_tx_sender.send_async(pending_transaction).await?; + // TODO: maybe we should just send the txid and they can get it from the dashmap? + pending_tx_sender.send_async(tx_state).await?; + } + } } Ok(())