proper result

This commit is contained in:
Bryan Stitt 2022-05-30 18:23:55 +00:00
parent 09db979ba3
commit 2c478e8b61

View File

@ -12,6 +12,7 @@ use futures::future::{join_all, AbortHandle};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use linkedhashmap::LinkedHashMap; use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock; use parking_lot::RwLock;
use serde_json::json;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
@ -144,9 +145,12 @@ impl Web3ProxyApp {
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
let (subscription_handle, subscription_registration) = AbortHandle::new_pair(); let (subscription_handle, subscription_registration) = AbortHandle::new_pair();
// TODO: generate subscription_id as needed. atomic u16?
let subscription_id = "0xcd0c3e8af590364c09d0fa6a1210faf5".to_string();
let f = { let f = {
let head_block_receiver = self.head_block_receiver.clone(); let head_block_receiver = self.head_block_receiver.clone();
let id = id.clone(); let subscription_id = subscription_id.clone();
if payload.params.as_deref().unwrap().to_string() == r#"["newHeads"]"# { if payload.params.as_deref().unwrap().to_string() == r#"["newHeads"]"# {
info!("received new heads subscription"); info!("received new heads subscription");
@ -158,15 +162,20 @@ impl Web3ProxyApp {
while let Some(new_head) = head_block_receiver.next().await { while let Some(new_head) = head_block_receiver.next().await {
// TODO: this String to RawValue probably not efficient, but it works for now // TODO: this String to RawValue probably not efficient, but it works for now
let msg = JsonRpcForwardedResponse::from_string( // TODO: make a struct for this?
serde_json::to_string(&new_head).unwrap(), let msg = json!({
id.clone(), "jsonrpc": "2.0",
); "method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_head,
},
});
// let msg = JsonRpcForwardedResponse::from_json(&msg, id.clone());
let msg = Message::Text(serde_json::to_string(&msg).unwrap()); let msg = Message::Text(serde_json::to_string(&msg).unwrap());
info!(?msg);
subscription_tx.send_async(msg).await.unwrap(); subscription_tx.send_async(msg).await.unwrap();
} }
} }
@ -177,9 +186,6 @@ impl Web3ProxyApp {
tokio::spawn(f); tokio::spawn(f);
// TODO: generate subscription_id as needed. atomic u16?
let subscription_id = r#""0xcd0c3e8af590364c09d0fa6a1210faf5""#.to_string();
let response = JsonRpcForwardedResponse::from_string(subscription_id, id); let response = JsonRpcForwardedResponse::from_string(subscription_id, id);
Ok((subscription_handle, response)) Ok((subscription_handle, response))