From 57f0993852853a39680e62331c4c11665a338b32 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 30 May 2022 01:28:22 +0000 Subject: [PATCH] closer websocket (not yet working) --- web3-proxy/src/app.rs | 62 ++++++++++++++++++++++++++++++----- web3-proxy/src/connection.rs | 18 ++++------ web3-proxy/src/connections.rs | 37 +++++++++++++++------ web3-proxy/src/frontend.rs | 56 +++++++++++++++++-------------- 4 files changed, 118 insertions(+), 55 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 9b0134f8..262b21ec 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -6,11 +6,14 @@ use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; use axum::extract::ws::Message; use dashmap::DashMap; -use ethers::prelude::H256; -use futures::future::join_all; +use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; +use futures::future::{join_all, AbortHandle}; +use futures::stream::StreamExt; +use futures::FutureExt; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; +use serde_json::value::RawValue; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -46,6 +49,7 @@ pub struct Web3ProxyApp { private_rpcs: Arc, incoming_requests: ActiveRequestsMap, response_cache: ResponseLrcCache, + head_block_receiver: flume::Receiver>, } impl fmt::Debug for Web3ProxyApp { @@ -97,11 +101,13 @@ impl Web3ProxyApp { ) .await?; + let (head_block_sender, head_block_receiver) = flume::unbounded(); + // TODO: do this separately instead of during try_new { let balanced_rpcs = balanced_rpcs.clone(); task::spawn(async move { - balanced_rpcs.subscribe_heads().await; + balanced_rpcs.subscribe_all_heads(head_block_sender).await; }); } @@ -124,17 +130,57 @@ impl Web3ProxyApp { private_rpcs, incoming_requests: Default::default(), response_cache: Default::default(), + head_block_receiver, }) } pub async fn eth_subscribe( &self, + id: Box, payload: JsonRpcRequest, - // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its way easier for now - // TODO: i think we will need to change this to support unsubscribe - subscription_tx: Abortable>, - ) -> anyhow::Result { - unimplemented!(); + // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now + subscription_tx: flume::Sender, + ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { + let (subscription_handle, subscription_registration) = AbortHandle::new_pair(); + + let f = { + let head_block_receiver = self.head_block_receiver.clone(); + let id = id.clone(); + + if payload.params.as_deref().unwrap().to_string() == r#"["newHeads"]"# { + async move { + let mut head_block_receiver = Abortable::new( + head_block_receiver.into_recv_async().into_stream(), + subscription_registration, + ); + + while let Some(Ok(new_head)) = head_block_receiver.next().await { + // TODO: this String to RawValue probably not efficient, but it works for now + let msg = JsonRpcForwardedResponse::from_string( + serde_json::to_string(&new_head).unwrap(), + id.clone(), + ); + + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + + info!(?msg); + + subscription_tx.send_async(msg).await.unwrap(); + } + } + } else { + return Err(anyhow::anyhow!("unimplemented")); + } + }; + + tokio::spawn(f); + + // TODO: generate subscription_id as needed. atomic u16? + let subscription_id = "0xcd0c3e8af590364c09d0fa6a1210faf5".to_string(); + + let response = JsonRpcForwardedResponse::from_string(subscription_id, id); + + Ok((subscription_handle, response)) } pub fn get_balanced_rpcs(&self) -> &Web3Connections { diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 99e9bb35..a025bb7c 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -1,6 +1,6 @@ ///! Rate-limited communication with a web3 provider use derive_more::From; -use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256}; +use ethers::prelude::{Block, Middleware, ProviderError, TxHash}; use futures::StreamExt; use redis_cell_client::RedisCellClient; use serde::ser::{SerializeStruct, Serializer}; @@ -118,7 +118,7 @@ impl Web3Connection { #[instrument(skip_all)] pub async fn reconnect( self: &Arc, - block_sender: &flume::Sender<(u64, H256, usize)>, + block_sender: &flume::Sender<(Block, usize)>, rpc_id: usize, ) -> anyhow::Result<()> { // websocket doesn't need the http client @@ -128,7 +128,7 @@ impl Web3Connection { let mut provider = self.provider.write().await; // tell the block subscriber that we are at 0 - block_sender.send_async((0, H256::zero(), rpc_id)).await?; + block_sender.send_async((Block::default(), rpc_id)).await?; let new_provider = Web3Provider::from_str(&self.url, http_client).await?; @@ -218,19 +218,13 @@ impl Web3Connection { async fn send_block( self: &Arc, block: Result, ProviderError>, - block_sender: &flume::Sender<(u64, H256, usize)>, + block_sender: &flume::Sender<(Block, usize)>, rpc_id: usize, ) { match block { Ok(block) => { - let block_number = block.number.unwrap().as_u64(); - let block_hash = block.hash.unwrap(); - // TODO: i'm pretty sure we don't need send_async, but double check - block_sender - .send_async((block_number, block_hash, rpc_id)) - .await - .unwrap(); + block_sender.send_async((block, rpc_id)).await.unwrap(); } Err(e) => { warn!("unable to get block from {}: {}", self, e); @@ -244,7 +238,7 @@ impl Web3Connection { pub async fn subscribe_new_heads( self: Arc, rpc_id: usize, - block_sender: flume::Sender<(u64, H256, usize)>, + block_sender: flume::Sender<(Block, usize)>, reconnect: bool, ) -> anyhow::Result<()> { loop { diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 473f68dc..0953908e 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -2,7 +2,7 @@ use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; -use ethers::prelude::{ProviderError, H256}; +use ethers::prelude::{Block, ProviderError, TxHash, H256}; use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -113,9 +113,12 @@ impl Web3Connections { Ok(connections) } - pub async fn subscribe_heads(self: &Arc) { - // TODO: benchmark bounded vs unbounded? - let (block_sender, block_receiver) = flume::unbounded(); + pub async fn subscribe_all_heads( + self: &Arc, + head_block_sender: flume::Sender>, + ) { + // TODO: benchmark bounded vs unbounded + let (block_sender, block_receiver) = flume::unbounded::<(Block, usize)>(); let mut handles = vec![]; @@ -146,7 +149,11 @@ impl Web3Connections { let connections = Arc::clone(self); let handle = task::Builder::default() .name("update_synced_rpcs") - .spawn(async move { connections.update_synced_rpcs(block_receiver).await }); + .spawn(async move { + connections + .update_synced_rpcs(block_receiver, head_block_sender) + .await + }); handles.push(handle); @@ -219,16 +226,19 @@ impl Web3Connections { // we don't instrument here because we put a span inside the while loop async fn update_synced_rpcs( &self, - block_receiver: flume::Receiver<(u64, H256, usize)>, + block_receiver: flume::Receiver<(Block, usize)>, + head_block_sender: flume::Sender>, ) -> anyhow::Result<()> { let total_rpcs = self.inner.len(); - let mut connection_states: HashMap = - HashMap::with_capacity(total_rpcs); + let mut connection_states: HashMap = HashMap::with_capacity(total_rpcs); let mut pending_synced_connections = SyncedConnections::default(); - while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await { + while let Ok((new_block, rpc_id)) = block_receiver.recv_async().await { + let new_block_num = new_block.number.unwrap().as_u64(); + let new_block_hash = new_block.hash.unwrap(); + // TODO: span with more in it? // TODO: make sure i'm doing this span right // TODO: show the actual rpc url? @@ -256,6 +266,8 @@ impl Web3Connections { // TODO: if the parent hash isn't our previous best block, ignore it pending_synced_connections.head_block_hash = new_block_hash; + + head_block_sender.send_async(new_block).await?; } cmp::Ordering::Equal => { if new_block_hash == pending_synced_connections.head_block_hash { @@ -304,7 +316,12 @@ impl Web3Connections { most_common_head_hash ); - pending_synced_connections.head_block_hash = most_common_head_hash; + // TODO: do this more efficiently? + if pending_synced_connections.head_block_hash != most_common_head_hash { + head_block_sender.send_async(new_block).await?; + pending_synced_connections.head_block_hash = most_common_head_hash; + } + pending_synced_connections.inner = synced_rpcs.into_iter().collect(); } } diff --git a/web3-proxy/src/frontend.rs b/web3-proxy/src/frontend.rs index 1bcc9ce2..a7c3643c 100644 --- a/web3-proxy/src/frontend.rs +++ b/web3-proxy/src/frontend.rs @@ -7,7 +7,6 @@ use axum::{ routing::{get, post}, Extension, Json, Router, }; -use futures::future::{AbortHandle, Abortable}; use futures::stream::{SplitSink, SplitStream, StreamExt}; use futures::SinkExt; use hashbrown::HashMap; @@ -15,7 +14,7 @@ use serde_json::json; use serde_json::value::RawValue; use std::net::SocketAddr; use std::sync::Arc; -use tracing::warn; +use tracing::{error, warn}; use crate::{ app::Web3ProxyApp, @@ -102,34 +101,38 @@ async fn read_web3_socket( let response: anyhow::Result = if payload.method == "eth_subscribe" { - let (subscription_handle, subscription_registration) = - AbortHandle::new_pair(); - - let subscription_response_tx = - Abortable::new(response_tx.clone(), subscription_registration); - - let response: anyhow::Result = app + // TODO: if we pass eth_subscribe the response_tx, we + let response = app .0 - .eth_subscribe(payload, subscription_response_tx) - .await - .map(Into::into); + .eth_subscribe(id.clone(), payload, response_tx.clone()) + .await; - if let Ok(response) = &response { - // TODO: better key. maybe we should just use u64 - subscriptions.insert( - response.result.as_ref().unwrap().to_string(), - subscription_handle, - ); + match response { + Ok((handle, response)) => { + // TODO: better key + subscriptions.insert( + response.result.as_ref().unwrap().to_string(), + handle, + ); + + Ok(response.into()) + } + Err(err) => Err(err), } - - response.map(JsonRpcForwardedResponseEnum::Single) } else if payload.method == "eth_unsubscribe" { let subscription_id = payload.params.unwrap().to_string(); - subscriptions.remove(&subscription_id); + let partial_response = match subscriptions.remove(&subscription_id) + { + None => "false", + Some(handle) => { + handle.abort(); + "true" + } + }; let response = JsonRpcForwardedResponse::from_string( - "true".to_string(), + partial_response.to_string(), id.clone(), ); @@ -166,9 +169,12 @@ async fn read_web3_socket( _ => unimplemented!(), }; - if response_tx.send_async(response_msg).await.is_err() { - // TODO: log the error - break; + match response_tx.send_async(response_msg).await { + Ok(_) => {} + Err(err) => { + error!("{}", err); + break; + } }; } }