From 3abfeaab0b4b8acf48efb263b4b290c43ab37de0 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 5 Jun 2022 20:39:58 +0000 Subject: [PATCH] fix id --- web3-proxy/src/app.rs | 19 +++++++++++-------- web3-proxy/src/frontend/ws_proxy.rs | 8 +------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 164e8444..1bd1eb72 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -13,7 +13,6 @@ use futures::stream::StreamExt; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; use serde_json::json; -use serde_json::value::RawValue; use std::fmt; use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; @@ -141,7 +140,6 @@ impl Web3ProxyApp { pub async fn eth_subscribe( &self, - id: Box, payload: JsonRpcRequest, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now subscription_tx: flume::Sender, @@ -154,11 +152,18 @@ impl Web3ProxyApp { let subscription_id = format!("{:#x}", subscription_id); + // save the id so we can use it in the response + let id = payload.id.clone(); + let f = { - let head_block_receiver = self.head_block_receiver.clone(); let subscription_id = subscription_id.clone(); - if payload.params.as_deref().unwrap().get() == r#"["newHeads"]"# { + let id = payload.id; + let params = payload.params.as_deref().unwrap().get(); + + if params == r#"["newHeads"]"# { + let head_block_receiver = self.head_block_receiver.clone(); + info!("received new heads subscription"); async move { let mut head_block_receiver = Abortable::new( @@ -167,9 +172,9 @@ impl Web3ProxyApp { ); while let Some(new_head) = head_block_receiver.next().await { - // TODO: this String to RawValue probably not efficient, but it works for now - // TODO: make a struct for this? + // TODO: make a struct for this? using JsonRpcForwardedResponse let msg = json!({ + "id": id, "jsonrpc": "2.0", "method":"eth_subscription", "params": { @@ -178,8 +183,6 @@ impl Web3ProxyApp { }, }); - // let msg = JsonRpcForwardedResponse::from_json(&msg, id.clone()); - let msg = Message::Text(serde_json::to_string(&msg).unwrap()); subscription_tx.send_async(msg).await.unwrap(); diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 3e8d5198..1f842cdf 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -49,10 +49,7 @@ async fn handle_socket_payload( let response: anyhow::Result = if payload.method == "eth_subscribe" { - // TODO: if we pass eth_subscribe the response_tx, we - let response = app - .eth_subscribe(id.clone(), payload, response_tx.clone()) - .await; + let response = app.eth_subscribe(payload, response_tx.clone()).await; match response { Ok((handle, response)) => { @@ -79,9 +76,6 @@ async fn handle_socket_payload( Ok(response.into()) } else { - // TODO: if this is a subscription request, we need to do some special handling. something with channels - // TODO: just handle subscribe_newBlock - app.proxy_web3_rpc(payload.into()).await };