This commit is contained in:
Bryan Stitt 2022-06-05 20:39:58 +00:00
parent 18a2d72b2c
commit 3abfeaab0b
2 changed files with 12 additions and 15 deletions

@ -13,7 +13,6 @@ use futures::stream::StreamExt;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
use serde_json::json;
use serde_json::value::RawValue;
use std::fmt;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
@ -141,7 +140,6 @@ impl Web3ProxyApp {
pub async fn eth_subscribe(
&self,
id: Box<RawValue>,
payload: JsonRpcRequest,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
subscription_tx: flume::Sender<Message>,
@ -154,11 +152,18 @@ impl Web3ProxyApp {
let subscription_id = format!("{:#x}", subscription_id);
// save the id so we can use it in the response
let id = payload.id.clone();
let f = {
let head_block_receiver = self.head_block_receiver.clone();
let subscription_id = subscription_id.clone();
if payload.params.as_deref().unwrap().get() == r#"["newHeads"]"# {
let id = payload.id;
let params = payload.params.as_deref().unwrap().get();
if params == r#"["newHeads"]"# {
let head_block_receiver = self.head_block_receiver.clone();
info!("received new heads subscription");
async move {
let mut head_block_receiver = Abortable::new(
@ -167,9 +172,9 @@ impl Web3ProxyApp {
);
while let Some(new_head) = head_block_receiver.next().await {
// TODO: this String to RawValue probably not efficient, but it works for now
// TODO: make a struct for this?
// TODO: make a struct for this? using JsonRpcForwardedResponse
let msg = json!({
"id": id,
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
@ -178,8 +183,6 @@ impl Web3ProxyApp {
},
});
// let msg = JsonRpcForwardedResponse::from_json(&msg, id.clone());
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
subscription_tx.send_async(msg).await.unwrap();

@ -49,10 +49,7 @@ async fn handle_socket_payload(
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = if payload.method
== "eth_subscribe"
{
// TODO: if we pass eth_subscribe the response_tx, we
let response = app
.eth_subscribe(id.clone(), payload, response_tx.clone())
.await;
let response = app.eth_subscribe(payload, response_tx.clone()).await;
match response {
Ok((handle, response)) => {
@ -79,9 +76,6 @@ async fn handle_socket_payload(
Ok(response.into())
} else {
// TODO: if this is a subscription request, we need to do some special handling. something with channels
// TODO: just handle subscribe_newBlock
app.proxy_web3_rpc(payload.into()).await
};