subscriptions getting closer

This commit is contained in:
Bryan Stitt 2022-05-30 04:30:13 +00:00
parent 57f0993852
commit 09db979ba3
6 changed files with 37 additions and 15 deletions

13
Cargo.lock generated
View File

@ -3473,6 +3473,18 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util 0.6.10",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.17.1" version = "0.17.1"
@ -3928,6 +3940,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream",
"toml", "toml",
"tower", "tower",
"tracing", "tracing",

View File

@ -36,3 +36,4 @@ tracing = "0.1.34"
tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] } tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] }
url = "2.2.2" url = "2.2.2"
tower = "0.4.12" tower = "0.4.12"
tokio-stream = { version = "0.1.8", features = ["sync"] }

View File

@ -10,7 +10,6 @@ use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable; use futures::future::Abortable;
use futures::future::{join_all, AbortHandle}; use futures::future::{join_all, AbortHandle};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::FutureExt;
use linkedhashmap::LinkedHashMap; use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock; use parking_lot::RwLock;
use serde_json::value::RawValue; use serde_json::value::RawValue;
@ -20,6 +19,7 @@ use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::task; use tokio::task;
use tokio::time::timeout; use tokio::time::timeout;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, info, instrument, trace, warn}; use tracing::{debug, info, instrument, trace, warn};
static APP_USER_AGENT: &str = concat!( static APP_USER_AGENT: &str = concat!(
@ -49,7 +49,8 @@ pub struct Web3ProxyApp {
private_rpcs: Arc<Web3Connections>, private_rpcs: Arc<Web3Connections>,
incoming_requests: ActiveRequestsMap, incoming_requests: ActiveRequestsMap,
response_cache: ResponseLrcCache, response_cache: ResponseLrcCache,
head_block_receiver: flume::Receiver<Block<TxHash>>, // don't drop this or the sender will stop working
head_block_receiver: watch::Receiver<Block<TxHash>>,
} }
impl fmt::Debug for Web3ProxyApp { impl fmt::Debug for Web3ProxyApp {
@ -101,7 +102,7 @@ impl Web3ProxyApp {
) )
.await?; .await?;
let (head_block_sender, head_block_receiver) = flume::unbounded(); let (head_block_sender, head_block_receiver) = watch::channel(Block::default());
// TODO: do this separately instead of during try_new // TODO: do this separately instead of during try_new
{ {
@ -148,13 +149,14 @@ impl Web3ProxyApp {
let id = id.clone(); let id = id.clone();
if payload.params.as_deref().unwrap().to_string() == r#"["newHeads"]"# { if payload.params.as_deref().unwrap().to_string() == r#"["newHeads"]"# {
info!("received new heads subscription");
async move { async move {
let mut head_block_receiver = Abortable::new( let mut head_block_receiver = Abortable::new(
head_block_receiver.into_recv_async().into_stream(), WatchStream::new(head_block_receiver),
subscription_registration, subscription_registration,
); );
while let Some(Ok(new_head)) = head_block_receiver.next().await { while let Some(new_head) = head_block_receiver.next().await {
// TODO: this String to RawValue probably not efficient, but it works for now // TODO: this String to RawValue probably not efficient, but it works for now
let msg = JsonRpcForwardedResponse::from_string( let msg = JsonRpcForwardedResponse::from_string(
serde_json::to_string(&new_head).unwrap(), serde_json::to_string(&new_head).unwrap(),
@ -176,7 +178,7 @@ impl Web3ProxyApp {
tokio::spawn(f); tokio::spawn(f);
// TODO: generate subscription_id as needed. atomic u16? // TODO: generate subscription_id as needed. atomic u16?
let subscription_id = "0xcd0c3e8af590364c09d0fa6a1210faf5".to_string(); let subscription_id = r#""0xcd0c3e8af590364c09d0fa6a1210faf5""#.to_string();
let response = JsonRpcForwardedResponse::from_string(subscription_id, id); let response = JsonRpcForwardedResponse::from_string(subscription_id, id);

View File

@ -220,16 +220,18 @@ impl Web3Connection {
block: Result<Block<TxHash>, ProviderError>, block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(Block<TxHash>, usize)>, block_sender: &flume::Sender<(Block<TxHash>, usize)>,
rpc_id: usize, rpc_id: usize,
) { ) -> anyhow::Result<()> {
match block { match block {
Ok(block) => { Ok(block) => {
// TODO: i'm pretty sure we don't need send_async, but double check // TODO: i'm pretty sure we don't need send_async, but double check
block_sender.send_async((block, rpc_id)).await.unwrap(); block_sender.send_async((block, rpc_id)).await?;
} }
Err(e) => { Err(e) => {
warn!("unable to get block from {}: {}", self, e); warn!("unable to get block from {}: {}", self, e);
} }
} }
Ok(())
} }
/// Subscribe to new blocks. If `reconnect` is true, this runs forever. /// Subscribe to new blocks. If `reconnect` is true, this runs forever.
@ -283,7 +285,7 @@ impl Web3Connection {
last_hash = new_hash; last_hash = new_hash;
} }
self.send_block(block, &block_sender, rpc_id).await; self.send_block(block, &block_sender, rpc_id).await?;
} }
Err(e) => { Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e); warn!("Failed getting latest block from {}: {:?}", self, e);
@ -311,14 +313,15 @@ impl Web3Connection {
.request("eth_getBlockByNumber", ("latest", false)) .request("eth_getBlockByNumber", ("latest", false))
.await; .await;
self.send_block(block, &block_sender, rpc_id).await; self.send_block(block, &block_sender, rpc_id).await?;
// TODO: should the stream have a timeout on it here? // TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue // TODO: although reconnects will make this less of an issue
loop { loop {
match stream.next().await { match stream.next().await {
Some(new_block) => { Some(new_block) => {
self.send_block(Ok(new_block), &block_sender, rpc_id).await; self.send_block(Ok(new_block), &block_sender, rpc_id)
.await?;
// TODO: really not sure about this // TODO: really not sure about this
task::yield_now().await; task::yield_now().await;

View File

@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch;
use tokio::task; use tokio::task;
use tokio::time::sleep; use tokio::time::sleep;
use tracing::Instrument; use tracing::Instrument;
@ -115,7 +116,7 @@ impl Web3Connections {
pub async fn subscribe_all_heads( pub async fn subscribe_all_heads(
self: &Arc<Self>, self: &Arc<Self>,
head_block_sender: flume::Sender<Block<TxHash>>, head_block_sender: watch::Sender<Block<TxHash>>,
) { ) {
// TODO: benchmark bounded vs unbounded // TODO: benchmark bounded vs unbounded
let (block_sender, block_receiver) = flume::unbounded::<(Block<TxHash>, usize)>(); let (block_sender, block_receiver) = flume::unbounded::<(Block<TxHash>, usize)>();
@ -227,7 +228,7 @@ impl Web3Connections {
async fn update_synced_rpcs( async fn update_synced_rpcs(
&self, &self,
block_receiver: flume::Receiver<(Block<TxHash>, usize)>, block_receiver: flume::Receiver<(Block<TxHash>, usize)>,
head_block_sender: flume::Sender<Block<TxHash>>, head_block_sender: watch::Sender<Block<TxHash>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let total_rpcs = self.inner.len(); let total_rpcs = self.inner.len();
@ -267,7 +268,7 @@ impl Web3Connections {
// TODO: if the parent hash isn't our previous best block, ignore it // TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash; pending_synced_connections.head_block_hash = new_block_hash;
head_block_sender.send_async(new_block).await?; head_block_sender.send(new_block)?;
} }
cmp::Ordering::Equal => { cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash { if new_block_hash == pending_synced_connections.head_block_hash {
@ -318,7 +319,7 @@ impl Web3Connections {
// TODO: do this more efficiently? // TODO: do this more efficiently?
if pending_synced_connections.head_block_hash != most_common_head_hash { if pending_synced_connections.head_block_hash != most_common_head_hash {
head_block_sender.send_async(new_block).await?; head_block_sender.send(new_block)?;
pending_synced_connections.head_block_hash = most_common_head_hash; pending_synced_connections.head_block_hash = most_common_head_hash;
} }

View File

@ -4,6 +4,7 @@ use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::Serialize; use serde::Serialize;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::fmt; use std::fmt;
use tracing::trace;
#[derive(Clone, serde::Deserialize)] #[derive(Clone, serde::Deserialize)]
pub struct JsonRpcRequest { pub struct JsonRpcRequest {
@ -191,6 +192,7 @@ impl JsonRpcForwardedResponse {
} }
pub fn from_string(partial_response: String, id: Box<RawValue>) -> Self { pub fn from_string(partial_response: String, id: Box<RawValue>) -> Self {
trace!("partial_response: {}", partial_response);
let partial_response = RawValue::from_string(partial_response).unwrap(); let partial_response = RawValue::from_string(partial_response).unwrap();
Self::from_response(partial_response, id) Self::from_response(partial_response, id)