From afb3dde4b33b86093cc0e751cdc61aff057991bd Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 18 Jun 2022 07:06:54 +0000 Subject: [PATCH] newPendingRawTransactions --- Cargo.lock | 4 ++-- TODO.md | 1 + web3-proxy/Cargo.toml | 2 +- web3-proxy/src/app.rs | 44 +++++++++++++++++++++++++++++++++++ web3-proxy/src/connection.rs | 2 +- web3-proxy/src/connections.rs | 15 ++++-------- 6 files changed, 53 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5662ccd3..b473c957 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3544,9 +3544,9 @@ dependencies = [ [[package]] name = "tower" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", diff --git a/TODO.md b/TODO.md index d846ce19..76ed2f1b 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,6 @@ # Todo +- [ ] quick requests/second timer until we have real stats - [x] it works for a few seconds and then gets stuck on something. - [x] its working with one backend node, but multiple breaks. something to do with pending transactions - [x] dashmap entry api is easy to deadlock! be careful with it! diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 4cdc765b..d88650ff 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -35,5 +35,5 @@ tracing = "0.1.35" # TODO: tracing-subscriber has serde and serde_json features that we might want to use tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] } url = "2.2.2" -tower = "0.4.12" +tower = "0.4.13" tokio-stream = { version = "0.1.9", features = ["sync"] } diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 8df99651..cb465649 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -355,6 +355,50 @@ impl Web3ProxyApp { trace!(?subscription_id, "closed new heads subscription"); }); } + r#"["newPendingRawTransactions"]"# => { + // TODO: too much copy/pasta with newPendingTransactions + let pending_tx_receiver = self.pending_tx_sender.subscribe(); + + let mut pending_tx_receiver = Abortable::new( + BroadcastStream::new(pending_tx_receiver), + subscription_registration, + ); + + let subscription_id = subscription_id.clone(); + + trace!(?subscription_id, "pending transactions subscription"); + + // TODO: do something with this handle? + tokio::spawn(async move { + while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let new_tx = match new_tx_state { + TxState::Pending(tx) => tx, + TxState::Confirmed(..) => continue, + TxState::Orphaned(tx) => tx, + }; + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let msg = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + // upstream just sends the txid, but we want to send the whole transaction + "result": new_tx.rlp(), + }, + }); + + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + + if subscription_tx.send_async(msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } + + trace!(?subscription_id, "closed new heads subscription"); + }); + } _ => return Err(anyhow::anyhow!("unimplemented")), } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index eecc9834..3eb3f16a 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -427,7 +427,7 @@ impl Web3Connection { // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { match &*provider { - Web3Provider::Http(_provider) => { + Web3Provider::Http(provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints // TODO: what should this interval be? probably automatically set to some fraction of block time // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 0749fea0..f8a8d748 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -205,13 +205,13 @@ impl Web3Connections { // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); - if self.pending_transactions.contains_key(&pending_tx_id) { - // this transaction has already been processed + if pending_tx_sender.receiver_count() == 0 { + // no receivers, so no point in querying to get the full transaction return Ok(()); } - if pending_tx_sender.receiver_count() == 0 { - // no receivers, so no point in querying to get the full transaction + if self.pending_transactions.contains_key(&pending_tx_id) { + // this transaction has already been processed return Ok(()); } @@ -310,13 +310,6 @@ impl Web3Connections { Ok(()) } - // pub fn get_pending_tx( - // &self, - // tx_hash: &TxHash, - // ) -> Option> { - // self.pending_transactions.get(tx_hash) - // } - pub fn get_head_block_hash(&self) -> H256 { *self.synced_connections.load().get_head_block_hash() }