From 3d3e0c8c8b0762c01184695db27bcde355ea7b6c Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 14 Jun 2022 22:02:18 +0000 Subject: [PATCH] it works, but we need it to be optional --- TODO.md | 5 +-- web3-proxy/src/app.rs | 21 ++++-------- web3-proxy/src/connections.rs | 60 +++++++++++++++++++++-------------- 3 files changed, 46 insertions(+), 40 deletions(-) diff --git a/TODO.md b/TODO.md index ee1bcfca..5e3fb031 100644 --- a/TODO.md +++ b/TODO.md @@ -1,8 +1,8 @@ # Todo - [ ] if web3 proxy gets an http error back, retry another node -- [ ] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions -- [ ] refactor Connections::spawn. have it return a handle that is selecting on those handles? +- [x] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions +- [x] refactor Connections::spawn. have it return a handle that is selecting on those handles? - [x] support websocket clients - we support websockets for the backends already, but we need them for the frontend too - [ ] when block subscribers receive blocks, store them in a cache. use this cache instead of querying eth_getBlock @@ -44,6 +44,7 @@ - [ ] zero downtime deploys - [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes? - [ ] subscription id should be per connection, not global +- [ ] emit stats - [x] simple proxy - [x] better locking. when lots of requests come in, we seem to be in the way of block updates - [x] load balance between multiple RPC servers diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 2a1469d7..eb4f6720 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -7,6 +7,7 @@ use crate::jsonrpc::JsonRpcRequestEnum; use axum::extract::ws::Message; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; +use ethers::prelude::Transaction; use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; @@ -53,9 +54,9 @@ pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result } pub enum TxState { - Confirmed(TxHash, H256), - Pending(TxHash), - Orphaned(TxHash, H256), + Confirmed(Transaction), + Pending(Transaction), + Orphaned(Transaction), } /// The application @@ -247,20 +248,10 @@ impl Web3ProxyApp { while let Ok(new_tx_state) = pending_tx_receiver.recv_async().await { let new_tx = match new_tx_state { TxState::Confirmed(..) => continue, - TxState::Orphaned(tx_hash, _block_hash) => { - self.balanced_rpcs.get_pending_tx(&tx_hash) - } - TxState::Pending(tx_hash) => { - self.balanced_rpcs.get_pending_tx(&tx_hash) - } + TxState::Orphaned(tx) => tx, + TxState::Pending(tx) => tx, }; - if new_tx.is_none() { - continue; - } - - let new_tx = &*new_tx.unwrap(); - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let msg = json!({ "jsonrpc": "2.0", diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 49f296c9..5bb7c175 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -141,7 +141,6 @@ impl Web3Connections { tokio::spawn(async move { connections .subscribe( - pending_tx_id_sender, pending_tx_id_receiver, block_receiver, head_block_sender, @@ -157,7 +156,6 @@ impl Web3Connections { /// subscribe to all the backend rpcs async fn subscribe( self: Arc, - pending_tx_id_sender: flume::Sender<(TxHash, Arc)>, pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, block_receiver: flume::Receiver<(Block, Arc)>, head_block_sender: Option>>, @@ -169,7 +167,7 @@ impl Web3Connections { // it skips any duplicates (unless they are being orphaned) // fetches new transactions from the notifying rpc // forwards new transacitons to pending_tx_receipt_sender - if let Some(pending_tx_sender) = pending_tx_sender { + if let Some(pending_tx_sender) = pending_tx_sender.clone() { // TODO: do something with the handle so we can catch any errors let clone = self.clone(); let handle = task::spawn(async move { @@ -189,12 +187,10 @@ impl Web3Connections { // TODO: do not unwrap. orphans might make this unsafe let tx_state = match &pending_transaction.block_hash { - Some(block_hash) => { - TxState::Confirmed(pending_transaction_id, *block_hash) - } + Some(_block_hash) => TxState::Confirmed(pending_transaction), None => { - entry.insert(pending_transaction); - TxState::Pending(pending_transaction_id) + entry.insert(pending_transaction.clone()); + TxState::Pending(pending_transaction) } }; @@ -213,11 +209,12 @@ impl Web3Connections { // setup the block funnel if let Some(head_block_sender) = head_block_sender { let connections = Arc::clone(&self); + let pending_tx_sender = pending_tx_sender.clone(); let handle = task::Builder::default() .name("update_synced_rpcs") .spawn(async move { connections - .update_synced_rpcs(block_receiver, head_block_sender, pending_tx_id_sender) + .update_synced_rpcs(block_receiver, head_block_sender, pending_tx_sender) .await }); @@ -238,12 +235,12 @@ impl Web3Connections { Ok(()) } - pub fn get_pending_tx( - &self, - tx_hash: &TxHash, - ) -> Option> { - self.pending_transactions.get(tx_hash) - } + // pub fn get_pending_tx( + // &self, + // tx_hash: &TxHash, + // ) -> Option> { + // self.pending_transactions.get(tx_hash) + // } pub fn get_head_block_hash(&self) -> H256 { *self.synced_connections.load().get_head_block_hash() @@ -312,7 +309,8 @@ impl Web3Connections { &self, block_receiver: flume::Receiver<(Block, Arc)>, head_block_sender: watch::Sender>, - pending_tx_id_sender: flume::Sender<(TxHash, Arc)>, + // TODO: use pending_tx_sender + pending_tx_sender: Option>, ) -> anyhow::Result<()> { let total_rpcs = self.inner.len(); @@ -322,10 +320,10 @@ impl Web3Connections { let mut pending_synced_connections = SyncedConnections::default(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { - // TODO: wth. how is this happening? need more logs let new_block_num = match new_block.number { Some(x) => x.as_u64(), None => { + // TODO: wth. how is this happening? need more logs warn!(?new_block, "Block without number!"); continue; } @@ -341,7 +339,7 @@ impl Web3Connections { let _enter = span.enter(); if new_block_num == 0 { - warn!("rpc is still syncing"); + warn!("still syncing"); } connection_states.insert(rpc.clone(), (new_block_num, new_block_hash)); @@ -362,7 +360,7 @@ impl Web3Connections { // TODO: if the parent hash isn't our previous best block, ignore it pending_synced_connections.head_block_hash = new_block_hash; - head_block_sender.send(new_block)?; + head_block_sender.send(new_block.clone())?; // TODO: mark all transactions as confirmed // TODO: mark any orphaned transactions as unconfirmed @@ -417,11 +415,8 @@ impl Web3Connections { // TODO: do this more efficiently? if pending_synced_connections.head_block_hash != most_common_head_hash { - head_block_sender.send(new_block)?; + head_block_sender.send(new_block.clone())?; pending_synced_connections.head_block_hash = most_common_head_hash; - - // TODO: mark all transactions as confirmed - // TODO: mark any orphaned transactions as unconfirmed } pending_synced_connections.inner = synced_rpcs.into_iter().collect(); @@ -451,8 +446,27 @@ impl Web3Connections { synced_connections.inner ); + // TODO: what if the hashes don't match? + if pending_synced_connections.head_block_hash == new_block_hash { + // mark all transactions in the block as confirmed + if let Some(pending_tx_sender) = &pending_tx_sender { + // TODO: we need new_block to be the new_head_block + for tx_hash in &new_block.transactions { + match self.pending_transactions.remove(tx_hash) { + Some((_tx_id, tx)) => { + pending_tx_sender.send_async(TxState::Confirmed(tx)).await?; + } + None => continue, + } + } + }; + + // TODO: mark any orphaned transactions as unconfirmed + } + // TODO: only publish if there are x (default 2) nodes synced to this block? // do the arcswap + // TODO: do this before or after processing all the transactions in this block? self.synced_connections.swap(synced_connections); }