From df15353a8378541abe2b3314d0ed586edff9fb3e Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 8 Jul 2022 22:14:45 +0000 Subject: [PATCH] per connection subscription id --- TODO.md | 4 +-- web3-proxy/src/app.rs | 27 +++++++++----------- web3-proxy/src/connection.rs | 27 ++++++-------------- web3-proxy/src/frontend/ws_proxy.rs | 39 ++++++++++++++++++++++------- 4 files changed, 52 insertions(+), 45 deletions(-) diff --git a/TODO.md b/TODO.md index 44f96366..a9ab9679 100644 --- a/TODO.md +++ b/TODO.md @@ -89,10 +89,10 @@ - [ ] add the backend server to the header? - [ ] think more about how multiple rpc tiers should work - maybe always try at least two servers in parallel? and then return the first? or only if the first one doesn't respond very quickly? this doubles our request load though. -- [ ] one proxy for mulitple chains? +- [ ] one proxy for multiple chains? - [ ] zero downtime deploys - [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes? -- [ ] subscription id should be per connection, not global +- [x] subscription id should be per connection, not global - [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master) - [ ] subscribe to pending transactions and build an intelligent gas estimator - [ ] include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 88a26498..8f741ce4 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -96,7 +96,6 @@ pub struct Web3ProxyApp { pending_tx_sender: broadcast::Sender, pending_transactions: Arc>, public_rate_limiter: Option, - next_subscription_id: AtomicUsize, } impl fmt::Debug for Web3ProxyApp { @@ -227,7 +226,6 @@ impl Web3ProxyApp { pending_tx_sender, pending_transactions, public_rate_limiter, - next_subscription_id: 1.into(), }; let app = Arc::new(app); @@ -242,21 +240,20 @@ impl Web3ProxyApp { pub async fn eth_subscribe( self: Arc, payload: JsonRpcRequest, + subscription_count: &AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now - subscription_tx: flume::Sender, + response_sender: flume::Sender, ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); // TODO: this only needs to be unique per connection. we don't need it globably unique - let subscription_id = self - .next_subscription_id - .fetch_add(1, atomic::Ordering::SeqCst); + let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst); let subscription_id = format!("{:#x}", subscription_id); // save the id so we can use it in the response let id = payload.id.clone(); - // TODO: calling json! on every request is not fast. + // TODO: calling json! on every request is probably not fast. match payload.params { Some(x) if x == json!(["newHeads"]) => { let head_block_receiver = self.head_block_receiver.clone(); @@ -283,7 +280,7 @@ impl Web3ProxyApp { let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - if subscription_tx.send_async(msg).await.is_err() { + if response_sender.send_async(msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -295,13 +292,13 @@ impl Web3ProxyApp { Some(x) if x == json!(["newPendingTransactions"]) => { let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let subscription_id = subscription_id.clone(); + let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), subscription_registration, ); - let subscription_id = subscription_id.clone(); - trace!(?subscription_id, "pending transactions subscription"); tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { @@ -323,7 +320,7 @@ impl Web3ProxyApp { let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - if subscription_tx.send_async(msg).await.is_err() { + if response_sender.send_async(msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -336,13 +333,13 @@ impl Web3ProxyApp { // TODO: too much copy/pasta with newPendingTransactions let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let subscription_id = subscription_id.clone(); + let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), subscription_registration, ); - let subscription_id = subscription_id.clone(); - trace!(?subscription_id, "pending transactions subscription"); // TODO: do something with this handle? @@ -367,7 +364,7 @@ impl Web3ProxyApp { let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - if subscription_tx.send_async(msg).await.is_err() { + if response_sender.send_async(msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -411,7 +408,7 @@ impl Web3ProxyApp { let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - if subscription_tx.send_async(msg).await.is_err() { + if response_sender.send_async(msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 3006fcf4..22377bbb 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -13,7 +13,6 @@ use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock; -use tokio::task; use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; use tracing::{error, info, instrument, trace, warn}; @@ -42,7 +41,7 @@ impl Web3Provider { let provider = ethers::providers::Http::new_with_client(url, http_client); // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - // TODO: i don't think this interval matters, but it should probably come from config + // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` ethers::providers::Provider::new(provider) .interval(Duration::from_secs(13)) .into() @@ -50,9 +49,8 @@ impl Web3Provider { // TODO: wrapper automatically reconnect let provider = ethers::providers::Ws::connect(url_str).await?; - // TODO: make sure this automatically reconnects - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) + // TODO: i don't think this interval matters ethers::providers::Provider::new(provider) .interval(Duration::from_secs(1)) .into() @@ -376,8 +374,8 @@ impl Web3Connection { self.send_block(block, &block_sender).await?; } - Err(e) => { - warn!("Failed getting latest block from {}: {:?}", self, e); + Err(err) => { + warn!(?err, "Rate limited on latest block from {}", self); } } @@ -407,20 +405,11 @@ impl Web3Connection { // TODO: should the stream have a timeout on it here? // TODO: although reconnects will make this less of an issue - loop { - match stream.next().await { - Some(new_block) => { - self.send_block(Ok(new_block), &block_sender).await?; - - // TODO: really not sure about this - task::yield_now().await; - } - None => { - warn!("subscription ended"); - break; - } - } + while let Some(new_block) = stream.next().await { + self.send_block(Ok(new_block), &block_sender).await?; } + + warn!(?self, "subscription ended"); } } } diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 01346b41..148d6090 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -10,9 +10,9 @@ use futures::{ }; use hashbrown::HashMap; use serde_json::value::RawValue; -use std::str::from_utf8_mut; use std::sync::Arc; -use tracing::{error, info, trace, warn}; +use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; +use tracing::{error, info, trace}; use crate::{ app::Web3ProxyApp, @@ -40,7 +40,8 @@ async fn proxy_web3_socket(app: Arc, socket: WebSocket) { async fn handle_socket_payload( app: Arc, payload: &str, - response_tx: &flume::Sender, + response_sender: &flume::Sender, + subscription_count: &AtomicUsize, subscriptions: &mut HashMap, ) -> Message { let (id, response) = match serde_json::from_str::(payload) { @@ -51,7 +52,7 @@ async fn handle_socket_payload( "eth_subscribe" => { let response = app .clone() - .eth_subscribe(payload, response_tx.clone()) + .eth_subscribe(payload, subscription_count, response_sender.clone()) .await; match response { @@ -111,15 +112,23 @@ async fn handle_socket_payload( async fn read_web3_socket( app: Arc, mut ws_rx: SplitStream, - response_tx: flume::Sender, + response_sender: flume::Sender, ) { let mut subscriptions = HashMap::new(); + let subscription_count = AtomicUsize::new(1); while let Some(Ok(msg)) = ws_rx.next().await { // new message from our client. forward to a backend and then send it through response_tx let response_msg = match msg { Message::Text(payload) => { - handle_socket_payload(app.clone(), &payload, &response_tx, &mut subscriptions).await + handle_socket_payload( + app.clone(), + &payload, + &response_sender, + &subscription_count, + &mut subscriptions, + ) + .await } Message::Ping(x) => Message::Pong(x), Message::Pong(x) => { @@ -133,11 +142,18 @@ async fn read_web3_socket( Message::Binary(mut payload) => { let payload = from_utf8_mut(&mut payload).unwrap(); - handle_socket_payload(app.clone(), payload, &response_tx, &mut subscriptions).await + handle_socket_payload( + app.clone(), + payload, + &response_sender, + &subscription_count, + &mut subscriptions, + ) + .await } }; - match response_tx.send_async(response_msg).await { + match response_sender.send_async(response_msg).await { Ok(_) => {} Err(err) => { error!("{}", err); @@ -151,11 +167,16 @@ async fn write_web3_socket( response_rx: flume::Receiver, mut ws_tx: SplitSink, ) { + // TODO: increment counter for open websockets + while let Ok(msg) = response_rx.recv_async().await { // a response is ready. write it to ws_tx if let Err(err) = ws_tx.send(msg).await { - warn!(?err, "unable to write to websocket"); + // this isn't a problem. this is common and happens whenever a client disconnects + trace!(?err, "unable to write to websocket"); break; }; } + + // TODO: decrement counter for open websockets }