closer websocket (not yet working)

This commit is contained in:
Bryan Stitt 2022-05-30 01:28:22 +00:00
parent 1945a353dd
commit 57f0993852
4 changed files with 118 additions and 55 deletions

@ -6,11 +6,14 @@ use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum; use crate::jsonrpc::JsonRpcRequestEnum;
use axum::extract::ws::Message; use axum::extract::ws::Message;
use dashmap::DashMap; use dashmap::DashMap;
use ethers::prelude::H256; use ethers::prelude::{Block, TxHash, H256};
use futures::future::join_all;
use futures::future::Abortable; use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
use futures::stream::StreamExt;
use futures::FutureExt;
use linkedhashmap::LinkedHashMap; use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock; use parking_lot::RwLock;
use serde_json::value::RawValue;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -46,6 +49,7 @@ pub struct Web3ProxyApp {
private_rpcs: Arc<Web3Connections>, private_rpcs: Arc<Web3Connections>,
incoming_requests: ActiveRequestsMap, incoming_requests: ActiveRequestsMap,
response_cache: ResponseLrcCache, response_cache: ResponseLrcCache,
head_block_receiver: flume::Receiver<Block<TxHash>>,
} }
impl fmt::Debug for Web3ProxyApp { impl fmt::Debug for Web3ProxyApp {
@ -97,11 +101,13 @@ impl Web3ProxyApp {
) )
.await?; .await?;
let (head_block_sender, head_block_receiver) = flume::unbounded();
// TODO: do this separately instead of during try_new // TODO: do this separately instead of during try_new
{ {
let balanced_rpcs = balanced_rpcs.clone(); let balanced_rpcs = balanced_rpcs.clone();
task::spawn(async move { task::spawn(async move {
balanced_rpcs.subscribe_heads().await; balanced_rpcs.subscribe_all_heads(head_block_sender).await;
}); });
} }
@ -124,17 +130,57 @@ impl Web3ProxyApp {
private_rpcs, private_rpcs,
incoming_requests: Default::default(), incoming_requests: Default::default(),
response_cache: Default::default(), response_cache: Default::default(),
head_block_receiver,
}) })
} }
pub async fn eth_subscribe( pub async fn eth_subscribe(
&self, &self,
id: Box<RawValue>,
payload: JsonRpcRequest, payload: JsonRpcRequest,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its way easier for now // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
// TODO: i think we will need to change this to support unsubscribe subscription_tx: flume::Sender<Message>,
subscription_tx: Abortable<flume::Sender<Message>>, ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
) -> anyhow::Result<JsonRpcForwardedResponse> { let (subscription_handle, subscription_registration) = AbortHandle::new_pair();
unimplemented!();
let f = {
let head_block_receiver = self.head_block_receiver.clone();
let id = id.clone();
if payload.params.as_deref().unwrap().to_string() == r#"["newHeads"]"# {
async move {
let mut head_block_receiver = Abortable::new(
head_block_receiver.into_recv_async().into_stream(),
subscription_registration,
);
while let Some(Ok(new_head)) = head_block_receiver.next().await {
// TODO: this String to RawValue probably not efficient, but it works for now
let msg = JsonRpcForwardedResponse::from_string(
serde_json::to_string(&new_head).unwrap(),
id.clone(),
);
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
info!(?msg);
subscription_tx.send_async(msg).await.unwrap();
}
}
} else {
return Err(anyhow::anyhow!("unimplemented"));
}
};
tokio::spawn(f);
// TODO: generate subscription_id as needed. atomic u16?
let subscription_id = "0xcd0c3e8af590364c09d0fa6a1210faf5".to_string();
let response = JsonRpcForwardedResponse::from_string(subscription_id, id);
Ok((subscription_handle, response))
} }
pub fn get_balanced_rpcs(&self) -> &Web3Connections { pub fn get_balanced_rpcs(&self) -> &Web3Connections {

@ -1,6 +1,6 @@
///! Rate-limited communication with a web3 provider ///! Rate-limited communication with a web3 provider
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256}; use ethers::prelude::{Block, Middleware, ProviderError, TxHash};
use futures::StreamExt; use futures::StreamExt;
use redis_cell_client::RedisCellClient; use redis_cell_client::RedisCellClient;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
@ -118,7 +118,7 @@ impl Web3Connection {
#[instrument(skip_all)] #[instrument(skip_all)]
pub async fn reconnect( pub async fn reconnect(
self: &Arc<Self>, self: &Arc<Self>,
block_sender: &flume::Sender<(u64, H256, usize)>, block_sender: &flume::Sender<(Block<TxHash>, usize)>,
rpc_id: usize, rpc_id: usize,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// websocket doesn't need the http client // websocket doesn't need the http client
@ -128,7 +128,7 @@ impl Web3Connection {
let mut provider = self.provider.write().await; let mut provider = self.provider.write().await;
// tell the block subscriber that we are at 0 // tell the block subscriber that we are at 0
block_sender.send_async((0, H256::zero(), rpc_id)).await?; block_sender.send_async((Block::default(), rpc_id)).await?;
let new_provider = Web3Provider::from_str(&self.url, http_client).await?; let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
@ -218,19 +218,13 @@ impl Web3Connection {
async fn send_block( async fn send_block(
self: &Arc<Self>, self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>, block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(u64, H256, usize)>, block_sender: &flume::Sender<(Block<TxHash>, usize)>,
rpc_id: usize, rpc_id: usize,
) { ) {
match block { match block {
Ok(block) => { Ok(block) => {
let block_number = block.number.unwrap().as_u64();
let block_hash = block.hash.unwrap();
// TODO: i'm pretty sure we don't need send_async, but double check // TODO: i'm pretty sure we don't need send_async, but double check
block_sender block_sender.send_async((block, rpc_id)).await.unwrap();
.send_async((block_number, block_hash, rpc_id))
.await
.unwrap();
} }
Err(e) => { Err(e) => {
warn!("unable to get block from {}: {}", self, e); warn!("unable to get block from {}: {}", self, e);
@ -244,7 +238,7 @@ impl Web3Connection {
pub async fn subscribe_new_heads( pub async fn subscribe_new_heads(
self: Arc<Self>, self: Arc<Self>,
rpc_id: usize, rpc_id: usize,
block_sender: flume::Sender<(u64, H256, usize)>, block_sender: flume::Sender<(Block<TxHash>, usize)>,
reconnect: bool, reconnect: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
loop { loop {

@ -2,7 +2,7 @@
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use counter::Counter; use counter::Counter;
use derive_more::From; use derive_more::From;
use ethers::prelude::{ProviderError, H256}; use ethers::prelude::{Block, ProviderError, TxHash, H256};
use futures::future::join_all; use futures::future::join_all;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
@ -113,9 +113,12 @@ impl Web3Connections {
Ok(connections) Ok(connections)
} }
pub async fn subscribe_heads(self: &Arc<Self>) { pub async fn subscribe_all_heads(
// TODO: benchmark bounded vs unbounded? self: &Arc<Self>,
let (block_sender, block_receiver) = flume::unbounded(); head_block_sender: flume::Sender<Block<TxHash>>,
) {
// TODO: benchmark bounded vs unbounded
let (block_sender, block_receiver) = flume::unbounded::<(Block<TxHash>, usize)>();
let mut handles = vec![]; let mut handles = vec![];
@ -146,7 +149,11 @@ impl Web3Connections {
let connections = Arc::clone(self); let connections = Arc::clone(self);
let handle = task::Builder::default() let handle = task::Builder::default()
.name("update_synced_rpcs") .name("update_synced_rpcs")
.spawn(async move { connections.update_synced_rpcs(block_receiver).await }); .spawn(async move {
connections
.update_synced_rpcs(block_receiver, head_block_sender)
.await
});
handles.push(handle); handles.push(handle);
@ -219,16 +226,19 @@ impl Web3Connections {
// we don't instrument here because we put a span inside the while loop // we don't instrument here because we put a span inside the while loop
async fn update_synced_rpcs( async fn update_synced_rpcs(
&self, &self,
block_receiver: flume::Receiver<(u64, H256, usize)>, block_receiver: flume::Receiver<(Block<TxHash>, usize)>,
head_block_sender: flume::Sender<Block<TxHash>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let total_rpcs = self.inner.len(); let total_rpcs = self.inner.len();
let mut connection_states: HashMap<usize, (u64, H256)> = let mut connection_states: HashMap<usize, _> = HashMap::with_capacity(total_rpcs);
HashMap::with_capacity(total_rpcs);
let mut pending_synced_connections = SyncedConnections::default(); let mut pending_synced_connections = SyncedConnections::default();
while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await { while let Ok((new_block, rpc_id)) = block_receiver.recv_async().await {
let new_block_num = new_block.number.unwrap().as_u64();
let new_block_hash = new_block.hash.unwrap();
// TODO: span with more in it? // TODO: span with more in it?
// TODO: make sure i'm doing this span right // TODO: make sure i'm doing this span right
// TODO: show the actual rpc url? // TODO: show the actual rpc url?
@ -256,6 +266,8 @@ impl Web3Connections {
// TODO: if the parent hash isn't our previous best block, ignore it // TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash; pending_synced_connections.head_block_hash = new_block_hash;
head_block_sender.send_async(new_block).await?;
} }
cmp::Ordering::Equal => { cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash { if new_block_hash == pending_synced_connections.head_block_hash {
@ -304,7 +316,12 @@ impl Web3Connections {
most_common_head_hash most_common_head_hash
); );
// TODO: do this more efficiently?
if pending_synced_connections.head_block_hash != most_common_head_hash {
head_block_sender.send_async(new_block).await?;
pending_synced_connections.head_block_hash = most_common_head_hash; pending_synced_connections.head_block_hash = most_common_head_hash;
}
pending_synced_connections.inner = synced_rpcs.into_iter().collect(); pending_synced_connections.inner = synced_rpcs.into_iter().collect();
} }
} }

@ -7,7 +7,6 @@ use axum::{
routing::{get, post}, routing::{get, post},
Extension, Json, Router, Extension, Json, Router,
}; };
use futures::future::{AbortHandle, Abortable};
use futures::stream::{SplitSink, SplitStream, StreamExt}; use futures::stream::{SplitSink, SplitStream, StreamExt};
use futures::SinkExt; use futures::SinkExt;
use hashbrown::HashMap; use hashbrown::HashMap;
@ -15,7 +14,7 @@ use serde_json::json;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tracing::warn; use tracing::{error, warn};
use crate::{ use crate::{
app::Web3ProxyApp, app::Web3ProxyApp,
@ -102,34 +101,38 @@ async fn read_web3_socket(
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = let response: anyhow::Result<JsonRpcForwardedResponseEnum> =
if payload.method == "eth_subscribe" { if payload.method == "eth_subscribe" {
let (subscription_handle, subscription_registration) = // TODO: if we pass eth_subscribe the response_tx, we
AbortHandle::new_pair(); let response = app
let subscription_response_tx =
Abortable::new(response_tx.clone(), subscription_registration);
let response: anyhow::Result<JsonRpcForwardedResponse> = app
.0 .0
.eth_subscribe(payload, subscription_response_tx) .eth_subscribe(id.clone(), payload, response_tx.clone())
.await .await;
.map(Into::into);
if let Ok(response) = &response { match response {
// TODO: better key. maybe we should just use u64 Ok((handle, response)) => {
// TODO: better key
subscriptions.insert( subscriptions.insert(
response.result.as_ref().unwrap().to_string(), response.result.as_ref().unwrap().to_string(),
subscription_handle, handle,
); );
}
response.map(JsonRpcForwardedResponseEnum::Single) Ok(response.into())
}
Err(err) => Err(err),
}
} else if payload.method == "eth_unsubscribe" { } else if payload.method == "eth_unsubscribe" {
let subscription_id = payload.params.unwrap().to_string(); let subscription_id = payload.params.unwrap().to_string();
subscriptions.remove(&subscription_id); let partial_response = match subscriptions.remove(&subscription_id)
{
None => "false",
Some(handle) => {
handle.abort();
"true"
}
};
let response = JsonRpcForwardedResponse::from_string( let response = JsonRpcForwardedResponse::from_string(
"true".to_string(), partial_response.to_string(),
id.clone(), id.clone(),
); );
@ -166,9 +169,12 @@ async fn read_web3_socket(
_ => unimplemented!(), _ => unimplemented!(),
}; };
if response_tx.send_async(response_msg).await.is_err() { match response_tx.send_async(response_msg).await {
// TODO: log the error Ok(_) => {}
Err(err) => {
error!("{}", err);
break; break;
}
}; };
} }
} }