//! Websocket-specific functions for the Web3ProxyApp use super::{Web3ProxyApp, REQUEST_PERIOD}; use crate::app_stats::ProxyResponseStat; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::rpcs::transactions::TxStatus; use anyhow::Context; use axum::extract::ws::Message; use ethers::prelude::U64; use futures::future::AbortHandle; use futures::future::Abortable; use futures::stream::StreamExt; use log::{trace, warn}; use serde_json::json; use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; impl Web3ProxyApp { // TODO: #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] pub async fn eth_subscribe<'a>( self: &'a Arc, authorization: Arc, request_json: JsonRpcRequest, subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now response_sender: flume::Sender, ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { // TODO: this is not efficient let request_bytes = serde_json::to_string(&request_json) .context("finding request size")? .len(); let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap()); let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); // TODO: this only needs to be unique per connection. we don't need it globably unique let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst); let subscription_id = U64::from(subscription_id); // save the id so we can use it in the response let id = request_json.id.clone(); // TODO: calling json! on every request is probably not fast. but we can only match against // TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into match request_json.params.as_ref() { Some(x) if x == &json!(["newHeads"]) => { let authorization = authorization.clone(); let head_block_receiver = self.head_block_receiver.clone(); let stat_sender = self.stat_sender.clone(); trace!("newHeads subscription {:?}", subscription_id); tokio::spawn(async move { let mut head_block_receiver = Abortable::new( WatchStream::new(head_block_receiver), subscription_registration, ); while let Some(new_head) = head_block_receiver.next().await { // TODO: what should the payload for RequestMetadata be? let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let response_json = json!({ "jsonrpc": "2.0", "method":"eth_subscription", "params": { "subscription": subscription_id, // TODO: option to include full transaction objects instead of just the hashes? "result": new_head.as_ref(), }, }); let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); // we could use response.num_bytes() here, but since we already have the string, this is easier let response_bytes = response_str.len(); // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = ProxyResponseStat::new( "eth_subscription(newHeads)".to_string(), authorization.clone(), request_metadata.clone(), response_bytes, ); if let Err(err) = stat_sender.send_async(response_stat.into()).await { // TODO: what should we do? warn!( "stat_sender failed inside newPendingTransactions: {:?}", err ); } } } trace!("closed newHeads subscription {:?}", subscription_id); }); } Some(x) if x == &json!(["newPendingTransactions"]) => { let pending_tx_receiver = self.pending_tx_sender.subscribe(); let stat_sender = self.stat_sender.clone(); let authorization = authorization.clone(); let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), subscription_registration, ); trace!( "pending newPendingTransactions subscription id: {:?}", subscription_id ); // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, TxStatus::Confirmed(..) => continue, TxStatus::Orphaned(tx) => tx, }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let response_json = json!({ "jsonrpc": "2.0", "method": "eth_subscription", "params": { "subscription": subscription_id, "result": new_tx.hash, }, }); let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); // we could use response.num_bytes() here, but since we already have the string, this is easier let response_bytes = response_str.len(); // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = ProxyResponseStat::new( "eth_subscription(newPendingTransactions)".to_string(), authorization.clone(), request_metadata.clone(), response_bytes, ); if let Err(err) = stat_sender.send_async(response_stat.into()).await { // TODO: what should we do? warn!( "stat_sender failed inside newPendingTransactions: {:?}", err ); } } } trace!( "closed newPendingTransactions subscription: {:?}", subscription_id ); }); } Some(x) if x == &json!(["newPendingFullTransactions"]) => { // TODO: too much copy/pasta with newPendingTransactions let authorization = authorization.clone(); let pending_tx_receiver = self.pending_tx_sender.subscribe(); let stat_sender = self.stat_sender.clone(); let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), subscription_registration, ); trace!( "pending newPendingFullTransactions subscription: {:?}", subscription_id ); // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, TxStatus::Confirmed(..) => continue, TxStatus::Orphaned(tx) => tx, }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let response_json = json!({ "jsonrpc": "2.0", "method": "eth_subscription", "params": { "subscription": subscription_id, // upstream just sends the txid, but we want to send the whole transaction "result": new_tx, }, }); let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); // we could use response.num_bytes() here, but since we already have the string, this is easier let response_bytes = response_str.len(); // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = ProxyResponseStat::new( "eth_subscription(newPendingFullTransactions)".to_string(), authorization.clone(), request_metadata.clone(), response_bytes, ); if let Err(err) = stat_sender.send_async(response_stat.into()).await { // TODO: what should we do? warn!( "stat_sender failed inside newPendingFullTransactions: {:?}", err ); } } } trace!( "closed newPendingFullTransactions subscription: {:?}", subscription_id ); }); } Some(x) if x == &json!(["newPendingRawTransactions"]) => { // TODO: too much copy/pasta with newPendingTransactions let authorization = authorization.clone(); let pending_tx_receiver = self.pending_tx_sender.subscribe(); let stat_sender = self.stat_sender.clone(); let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), subscription_registration, ); trace!( "pending transactions subscription id: {:?}", subscription_id ); // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, TxStatus::Confirmed(..) => continue, TxStatus::Orphaned(tx) => tx, }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let response_json = json!({ "jsonrpc": "2.0", "method": "eth_subscription", "params": { "subscription": subscription_id, // upstream just sends the txid, but we want to send the raw transaction "result": new_tx.rlp(), }, }); let response_str = serde_json::to_string(&response_json) .expect("this should always be valid json"); // we could use response.num_bytes() here, but since we already have the string, this is easier let response_bytes = response_str.len(); // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = ProxyResponseStat::new( "eth_subscription(newPendingRawTransactions)".to_string(), authorization.clone(), request_metadata.clone(), response_bytes, ); if let Err(err) = stat_sender.send_async(response_stat.into()).await { // TODO: what should we do? warn!( "stat_sender failed inside newPendingRawTransactions: {:?}", err ); } } } trace!( "closed newPendingRawTransactions subscription: {:?}", subscription_id ); }); } _ => return Err(anyhow::anyhow!("unimplemented")), } // TODO: do something with subscription_join_handle? let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id); if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = ProxyResponseStat::new( request_json.method.clone(), authorization.clone(), request_metadata, response.num_bytes(), ); if let Err(err) = stat_sender.send_async(response_stat.into()).await { // TODO: what should we do? warn!("stat_sender failed inside websocket: {:?}", err); } } // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? Ok((subscription_abort_handle, response)) } }