From 09db979ba3f00fc0a5e35f505db2a10a4b05602f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 30 May 2022 04:30:13 +0000 Subject: [PATCH] subscriptions getting closer --- Cargo.lock | 13 +++++++++++++ web3-proxy/Cargo.toml | 1 + web3-proxy/src/app.rs | 14 ++++++++------ web3-proxy/src/connection.rs | 13 ++++++++----- web3-proxy/src/connections.rs | 9 +++++---- web3-proxy/src/jsonrpc.rs | 2 ++ 6 files changed, 37 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5aef90d1..267cd223 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3473,6 +3473,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util 0.6.10", +] + [[package]] name = "tokio-tungstenite" version = "0.17.1" @@ -3928,6 +3940,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", "toml", "tower", "tracing", diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 14289968..b83c833b 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -36,3 +36,4 @@ tracing = "0.1.34" tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] } url = "2.2.2" tower = "0.4.12" +tokio-stream = { version = "0.1.8", features = ["sync"] } diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 262b21ec..ecb3edc8 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -10,7 +10,6 @@ use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; use futures::stream::StreamExt; -use futures::FutureExt; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; use serde_json::value::RawValue; @@ -20,6 +19,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio::task; use tokio::time::timeout; +use tokio_stream::wrappers::WatchStream; use tracing::{debug, info, instrument, trace, warn}; static APP_USER_AGENT: &str = concat!( @@ -49,7 +49,8 @@ pub struct Web3ProxyApp { private_rpcs: Arc, incoming_requests: ActiveRequestsMap, response_cache: ResponseLrcCache, - head_block_receiver: flume::Receiver>, + // don't drop this or the sender will stop working + head_block_receiver: watch::Receiver>, } impl fmt::Debug for Web3ProxyApp { @@ -101,7 +102,7 @@ impl Web3ProxyApp { ) .await?; - let (head_block_sender, head_block_receiver) = flume::unbounded(); + let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); // TODO: do this separately instead of during try_new { @@ -148,13 +149,14 @@ impl Web3ProxyApp { let id = id.clone(); if payload.params.as_deref().unwrap().to_string() == r#"["newHeads"]"# { + info!("received new heads subscription"); async move { let mut head_block_receiver = Abortable::new( - head_block_receiver.into_recv_async().into_stream(), + WatchStream::new(head_block_receiver), subscription_registration, ); - while let Some(Ok(new_head)) = head_block_receiver.next().await { + while let Some(new_head) = head_block_receiver.next().await { // TODO: this String to RawValue probably not efficient, but it works for now let msg = JsonRpcForwardedResponse::from_string( serde_json::to_string(&new_head).unwrap(), @@ -176,7 +178,7 @@ impl Web3ProxyApp { tokio::spawn(f); // TODO: generate subscription_id as needed. atomic u16? - let subscription_id = "0xcd0c3e8af590364c09d0fa6a1210faf5".to_string(); + let subscription_id = r#""0xcd0c3e8af590364c09d0fa6a1210faf5""#.to_string(); let response = JsonRpcForwardedResponse::from_string(subscription_id, id); diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index a025bb7c..5fb2b948 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -220,16 +220,18 @@ impl Web3Connection { block: Result, ProviderError>, block_sender: &flume::Sender<(Block, usize)>, rpc_id: usize, - ) { + ) -> anyhow::Result<()> { match block { Ok(block) => { // TODO: i'm pretty sure we don't need send_async, but double check - block_sender.send_async((block, rpc_id)).await.unwrap(); + block_sender.send_async((block, rpc_id)).await?; } Err(e) => { warn!("unable to get block from {}: {}", self, e); } } + + Ok(()) } /// Subscribe to new blocks. If `reconnect` is true, this runs forever. @@ -283,7 +285,7 @@ impl Web3Connection { last_hash = new_hash; } - self.send_block(block, &block_sender, rpc_id).await; + self.send_block(block, &block_sender, rpc_id).await?; } Err(e) => { warn!("Failed getting latest block from {}: {:?}", self, e); @@ -311,14 +313,15 @@ impl Web3Connection { .request("eth_getBlockByNumber", ("latest", false)) .await; - self.send_block(block, &block_sender, rpc_id).await; + self.send_block(block, &block_sender, rpc_id).await?; // TODO: should the stream have a timeout on it here? // TODO: although reconnects will make this less of an issue loop { match stream.next().await { Some(new_block) => { - self.send_block(Ok(new_block), &block_sender, rpc_id).await; + self.send_block(Ok(new_block), &block_sender, rpc_id) + .await?; // TODO: really not sure about this task::yield_now().await; diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 0953908e..5a43229a 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::sync::Arc; use std::time::Duration; +use tokio::sync::watch; use tokio::task; use tokio::time::sleep; use tracing::Instrument; @@ -115,7 +116,7 @@ impl Web3Connections { pub async fn subscribe_all_heads( self: &Arc, - head_block_sender: flume::Sender>, + head_block_sender: watch::Sender>, ) { // TODO: benchmark bounded vs unbounded let (block_sender, block_receiver) = flume::unbounded::<(Block, usize)>(); @@ -227,7 +228,7 @@ impl Web3Connections { async fn update_synced_rpcs( &self, block_receiver: flume::Receiver<(Block, usize)>, - head_block_sender: flume::Sender>, + head_block_sender: watch::Sender>, ) -> anyhow::Result<()> { let total_rpcs = self.inner.len(); @@ -267,7 +268,7 @@ impl Web3Connections { // TODO: if the parent hash isn't our previous best block, ignore it pending_synced_connections.head_block_hash = new_block_hash; - head_block_sender.send_async(new_block).await?; + head_block_sender.send(new_block)?; } cmp::Ordering::Equal => { if new_block_hash == pending_synced_connections.head_block_hash { @@ -318,7 +319,7 @@ impl Web3Connections { // TODO: do this more efficiently? if pending_synced_connections.head_block_hash != most_common_head_hash { - head_block_sender.send_async(new_block).await?; + head_block_sender.send(new_block)?; pending_synced_connections.head_block_hash = most_common_head_hash; } diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index 36e0c84c..b3881106 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -4,6 +4,7 @@ use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::Serialize; use serde_json::value::RawValue; use std::fmt; +use tracing::trace; #[derive(Clone, serde::Deserialize)] pub struct JsonRpcRequest { @@ -191,6 +192,7 @@ impl JsonRpcForwardedResponse { } pub fn from_string(partial_response: String, id: Box) -> Self { + trace!("partial_response: {}", partial_response); let partial_response = RawValue::from_string(partial_response).unwrap(); Self::from_response(partial_response, id)