per connection subscription id
This commit is contained in:
parent
6fe58bafb4
commit
df15353a83
4
TODO.md
4
TODO.md
@ -89,10 +89,10 @@
|
||||
- [ ] add the backend server to the header?
|
||||
- [ ] think more about how multiple rpc tiers should work
|
||||
- maybe always try at least two servers in parallel? and then return the first? or only if the first one doesn't respond very quickly? this doubles our request load though.
|
||||
- [ ] one proxy for mulitple chains?
|
||||
- [ ] one proxy for multiple chains?
|
||||
- [ ] zero downtime deploys
|
||||
- [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes?
|
||||
- [ ] subscription id should be per connection, not global
|
||||
- [x] subscription id should be per connection, not global
|
||||
- [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master)
|
||||
- [ ] subscribe to pending transactions and build an intelligent gas estimator
|
||||
- [ ] include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load
|
||||
|
@ -96,7 +96,6 @@ pub struct Web3ProxyApp {
|
||||
pending_tx_sender: broadcast::Sender<TxState>,
|
||||
pending_transactions: Arc<DashMap<TxHash, TxState>>,
|
||||
public_rate_limiter: Option<RedisCellClient>,
|
||||
next_subscription_id: AtomicUsize,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Web3ProxyApp {
|
||||
@ -227,7 +226,6 @@ impl Web3ProxyApp {
|
||||
pending_tx_sender,
|
||||
pending_transactions,
|
||||
public_rate_limiter,
|
||||
next_subscription_id: 1.into(),
|
||||
};
|
||||
|
||||
let app = Arc::new(app);
|
||||
@ -242,21 +240,20 @@ impl Web3ProxyApp {
|
||||
pub async fn eth_subscribe(
|
||||
self: Arc<Self>,
|
||||
payload: JsonRpcRequest,
|
||||
subscription_count: &AtomicUsize,
|
||||
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
|
||||
subscription_tx: flume::Sender<Message>,
|
||||
response_sender: flume::Sender<Message>,
|
||||
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
|
||||
let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();
|
||||
|
||||
// TODO: this only needs to be unique per connection. we don't need it globably unique
|
||||
let subscription_id = self
|
||||
.next_subscription_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
let subscription_id = format!("{:#x}", subscription_id);
|
||||
|
||||
// save the id so we can use it in the response
|
||||
let id = payload.id.clone();
|
||||
|
||||
// TODO: calling json! on every request is not fast.
|
||||
// TODO: calling json! on every request is probably not fast.
|
||||
match payload.params {
|
||||
Some(x) if x == json!(["newHeads"]) => {
|
||||
let head_block_receiver = self.head_block_receiver.clone();
|
||||
@ -283,7 +280,7 @@ impl Web3ProxyApp {
|
||||
|
||||
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
|
||||
|
||||
if subscription_tx.send_async(msg).await.is_err() {
|
||||
if response_sender.send_async(msg).await.is_err() {
|
||||
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
|
||||
break;
|
||||
};
|
||||
@ -295,13 +292,13 @@ impl Web3ProxyApp {
|
||||
Some(x) if x == json!(["newPendingTransactions"]) => {
|
||||
let pending_tx_receiver = self.pending_tx_sender.subscribe();
|
||||
|
||||
let subscription_id = subscription_id.clone();
|
||||
|
||||
let mut pending_tx_receiver = Abortable::new(
|
||||
BroadcastStream::new(pending_tx_receiver),
|
||||
subscription_registration,
|
||||
);
|
||||
|
||||
let subscription_id = subscription_id.clone();
|
||||
|
||||
trace!(?subscription_id, "pending transactions subscription");
|
||||
tokio::spawn(async move {
|
||||
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
|
||||
@ -323,7 +320,7 @@ impl Web3ProxyApp {
|
||||
|
||||
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
|
||||
|
||||
if subscription_tx.send_async(msg).await.is_err() {
|
||||
if response_sender.send_async(msg).await.is_err() {
|
||||
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
|
||||
break;
|
||||
};
|
||||
@ -336,13 +333,13 @@ impl Web3ProxyApp {
|
||||
// TODO: too much copy/pasta with newPendingTransactions
|
||||
let pending_tx_receiver = self.pending_tx_sender.subscribe();
|
||||
|
||||
let subscription_id = subscription_id.clone();
|
||||
|
||||
let mut pending_tx_receiver = Abortable::new(
|
||||
BroadcastStream::new(pending_tx_receiver),
|
||||
subscription_registration,
|
||||
);
|
||||
|
||||
let subscription_id = subscription_id.clone();
|
||||
|
||||
trace!(?subscription_id, "pending transactions subscription");
|
||||
|
||||
// TODO: do something with this handle?
|
||||
@ -367,7 +364,7 @@ impl Web3ProxyApp {
|
||||
|
||||
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
|
||||
|
||||
if subscription_tx.send_async(msg).await.is_err() {
|
||||
if response_sender.send_async(msg).await.is_err() {
|
||||
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
|
||||
break;
|
||||
};
|
||||
@ -411,7 +408,7 @@ impl Web3ProxyApp {
|
||||
|
||||
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
|
||||
|
||||
if subscription_tx.send_async(msg).await.is_err() {
|
||||
if response_sender.send_async(msg).await.is_err() {
|
||||
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
|
||||
break;
|
||||
};
|
||||
|
@ -13,7 +13,6 @@ use std::sync::atomic::{self, AtomicU32};
|
||||
use std::{cmp::Ordering, sync::Arc};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task;
|
||||
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
|
||||
use tracing::{error, info, instrument, trace, warn};
|
||||
|
||||
@ -42,7 +41,7 @@ impl Web3Provider {
|
||||
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
||||
|
||||
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
|
||||
// TODO: i don't think this interval matters, but it should probably come from config
|
||||
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
|
||||
ethers::providers::Provider::new(provider)
|
||||
.interval(Duration::from_secs(13))
|
||||
.into()
|
||||
@ -50,9 +49,8 @@ impl Web3Provider {
|
||||
// TODO: wrapper automatically reconnect
|
||||
let provider = ethers::providers::Ws::connect(url_str).await?;
|
||||
|
||||
// TODO: make sure this automatically reconnects
|
||||
|
||||
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
|
||||
// TODO: i don't think this interval matters
|
||||
ethers::providers::Provider::new(provider)
|
||||
.interval(Duration::from_secs(1))
|
||||
.into()
|
||||
@ -376,8 +374,8 @@ impl Web3Connection {
|
||||
|
||||
self.send_block(block, &block_sender).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed getting latest block from {}: {:?}", self, e);
|
||||
Err(err) => {
|
||||
warn!(?err, "Rate limited on latest block from {}", self);
|
||||
}
|
||||
}
|
||||
|
||||
@ -407,20 +405,11 @@ impl Web3Connection {
|
||||
|
||||
// TODO: should the stream have a timeout on it here?
|
||||
// TODO: although reconnects will make this less of an issue
|
||||
loop {
|
||||
match stream.next().await {
|
||||
Some(new_block) => {
|
||||
self.send_block(Ok(new_block), &block_sender).await?;
|
||||
|
||||
// TODO: really not sure about this
|
||||
task::yield_now().await;
|
||||
}
|
||||
None => {
|
||||
warn!("subscription ended");
|
||||
break;
|
||||
}
|
||||
}
|
||||
while let Some(new_block) = stream.next().await {
|
||||
self.send_block(Ok(new_block), &block_sender).await?;
|
||||
}
|
||||
|
||||
warn!(?self, "subscription ended");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,9 +10,9 @@ use futures::{
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use serde_json::value::RawValue;
|
||||
use std::str::from_utf8_mut;
|
||||
use std::sync::Arc;
|
||||
use tracing::{error, info, trace, warn};
|
||||
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
use crate::{
|
||||
app::Web3ProxyApp,
|
||||
@ -40,7 +40,8 @@ async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket) {
|
||||
async fn handle_socket_payload(
|
||||
app: Arc<Web3ProxyApp>,
|
||||
payload: &str,
|
||||
response_tx: &flume::Sender<Message>,
|
||||
response_sender: &flume::Sender<Message>,
|
||||
subscription_count: &AtomicUsize,
|
||||
subscriptions: &mut HashMap<String, AbortHandle>,
|
||||
) -> Message {
|
||||
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
|
||||
@ -51,7 +52,7 @@ async fn handle_socket_payload(
|
||||
"eth_subscribe" => {
|
||||
let response = app
|
||||
.clone()
|
||||
.eth_subscribe(payload, response_tx.clone())
|
||||
.eth_subscribe(payload, subscription_count, response_sender.clone())
|
||||
.await;
|
||||
|
||||
match response {
|
||||
@ -111,15 +112,23 @@ async fn handle_socket_payload(
|
||||
async fn read_web3_socket(
|
||||
app: Arc<Web3ProxyApp>,
|
||||
mut ws_rx: SplitStream<WebSocket>,
|
||||
response_tx: flume::Sender<Message>,
|
||||
response_sender: flume::Sender<Message>,
|
||||
) {
|
||||
let mut subscriptions = HashMap::new();
|
||||
let subscription_count = AtomicUsize::new(1);
|
||||
|
||||
while let Some(Ok(msg)) = ws_rx.next().await {
|
||||
// new message from our client. forward to a backend and then send it through response_tx
|
||||
let response_msg = match msg {
|
||||
Message::Text(payload) => {
|
||||
handle_socket_payload(app.clone(), &payload, &response_tx, &mut subscriptions).await
|
||||
handle_socket_payload(
|
||||
app.clone(),
|
||||
&payload,
|
||||
&response_sender,
|
||||
&subscription_count,
|
||||
&mut subscriptions,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Message::Ping(x) => Message::Pong(x),
|
||||
Message::Pong(x) => {
|
||||
@ -133,11 +142,18 @@ async fn read_web3_socket(
|
||||
Message::Binary(mut payload) => {
|
||||
let payload = from_utf8_mut(&mut payload).unwrap();
|
||||
|
||||
handle_socket_payload(app.clone(), payload, &response_tx, &mut subscriptions).await
|
||||
handle_socket_payload(
|
||||
app.clone(),
|
||||
payload,
|
||||
&response_sender,
|
||||
&subscription_count,
|
||||
&mut subscriptions,
|
||||
)
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
match response_tx.send_async(response_msg).await {
|
||||
match response_sender.send_async(response_msg).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
@ -151,11 +167,16 @@ async fn write_web3_socket(
|
||||
response_rx: flume::Receiver<Message>,
|
||||
mut ws_tx: SplitSink<WebSocket, Message>,
|
||||
) {
|
||||
// TODO: increment counter for open websockets
|
||||
|
||||
while let Ok(msg) = response_rx.recv_async().await {
|
||||
// a response is ready. write it to ws_tx
|
||||
if let Err(err) = ws_tx.send(msg).await {
|
||||
warn!(?err, "unable to write to websocket");
|
||||
// this isn't a problem. this is common and happens whenever a client disconnects
|
||||
trace!(?err, "unable to write to websocket");
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
// TODO: decrement counter for open websockets
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user