From a5e324a6925da4057beba6b0d6e636e8358e1bd1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 14 Jun 2022 07:13:42 +0000 Subject: [PATCH] newPendingTransactions --- web3-proxy/src/app.rs | 60 ++++++++++++++++++++++++----- web3-proxy/src/connections.rs | 7 ++++ web3-proxy/src/frontend/ws_proxy.rs | 11 ++++-- 3 files changed, 64 insertions(+), 14 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 91a01d98..2a1469d7 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -187,12 +187,12 @@ impl Web3ProxyApp { } pub async fn eth_subscribe( - &self, + self: Arc, payload: JsonRpcRequest, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now subscription_tx: flume::Sender, ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { - let (subscription_handle, subscription_registration) = AbortHandle::new_pair(); + let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); // TODO: this only needs to be unique per connection. we don't need it globably unique let subscription_id = self @@ -203,7 +203,7 @@ impl Web3ProxyApp { // save the id so we can use it in the response let id = payload.id.clone(); - let subscription_future = { + let subscription_join_handle = { let subscription_id = subscription_id.clone(); match payload.params.as_deref().unwrap().get() { @@ -211,7 +211,7 @@ impl Web3ProxyApp { let head_block_receiver = self.head_block_receiver.clone(); trace!(?subscription_id, "new heads subscription"); - async move { + tokio::spawn(async move { let mut head_block_receiver = Abortable::new( WatchStream::new(head_block_receiver), subscription_registration, @@ -237,21 +237,61 @@ impl Web3ProxyApp { } trace!(?subscription_id, "closed new heads subscription"); - } + }) } - r#"["pendingTransactions"]"# => { - unimplemented!("pendingTransactions") + r#"["newPendingTransactions"]"# => { + let pending_tx_receiver = self.pending_tx_receiver.clone(); + + trace!(?subscription_id, "pending transactions subscription"); + tokio::spawn(async move { + while let Ok(new_tx_state) = pending_tx_receiver.recv_async().await { + let new_tx = match new_tx_state { + TxState::Confirmed(..) => continue, + TxState::Orphaned(tx_hash, _block_hash) => { + self.balanced_rpcs.get_pending_tx(&tx_hash) + } + TxState::Pending(tx_hash) => { + self.balanced_rpcs.get_pending_tx(&tx_hash) + } + }; + + if new_tx.is_none() { + continue; + } + + let new_tx = &*new_tx.unwrap(); + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let msg = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + // upstream just sends the txid, but we want to send the whole transaction + "result": new_tx, + }, + }); + + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + + if subscription_tx.send_async(msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } + + trace!(?subscription_id, "closed new heads subscription"); + }) } _ => return Err(anyhow::anyhow!("unimplemented")), } }; - // TODO: what should we do with this handle? i think its fine to just drop it - tokio::spawn(subscription_future); + // TODO: do something with subscription_join_handle? let response = JsonRpcForwardedResponse::from_string(subscription_id, id); - Ok((subscription_handle, response)) + Ok((subscription_abort_handle, response)) } pub fn get_balanced_rpcs(&self) -> &Web3Connections { diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index e97e8546..49f296c9 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -238,6 +238,13 @@ impl Web3Connections { Ok(()) } + pub fn get_pending_tx( + &self, + tx_hash: &TxHash, + ) -> Option> { + self.pending_transactions.get(tx_hash) + } + pub fn get_head_block_hash(&self) -> H256 { *self.synced_connections.load().get_head_block_hash() } diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 59e925d3..0d0c0065 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -37,7 +37,7 @@ async fn proxy_web3_socket(app: Extension>, socket: WebSocket) } async fn handle_socket_payload( - app: &Web3ProxyApp, + app: Arc, payload: &str, response_tx: &flume::Sender, subscriptions: &mut HashMap, @@ -48,7 +48,10 @@ async fn handle_socket_payload( let response: anyhow::Result = match &payload.method[..] { "eth_subscribe" => { - let response = app.eth_subscribe(payload, response_tx.clone()).await; + let response = app + .clone() + .eth_subscribe(payload, response_tx.clone()) + .await; match response { Ok((handle, response)) => { @@ -115,7 +118,7 @@ async fn read_web3_socket( // new message from our client. forward to a backend and then send it through response_tx let response_msg = match msg { Message::Text(payload) => { - handle_socket_payload(app.0.as_ref(), &payload, &response_tx, &mut subscriptions) + handle_socket_payload(app.0.clone(), &payload, &response_tx, &mut subscriptions) .await } Message::Ping(x) => Message::Pong(x), @@ -130,7 +133,7 @@ async fn read_web3_socket( Message::Binary(mut payload) => { let payload = from_utf8_mut(&mut payload).unwrap(); - handle_socket_payload(app.0.as_ref(), payload, &response_tx, &mut subscriptions) + handle_socket_payload(app.0.clone(), payload, &response_tx, &mut subscriptions) .await } };