From 217a7b3fd3cf0a3de75164d0c2c099e823bf7902 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 19 Nov 2022 22:05:51 +0000 Subject: [PATCH] eth_subscribe rpc_accounting logging --- TODO.md | 20 +- web3_proxy/src/app.rs | 224 +++++++++++++++++++---- web3_proxy/src/app_stats.rs | 11 +- web3_proxy/src/frontend/authorization.rs | 8 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 49 +++-- web3_proxy/src/frontend/users.rs | 1 - web3_proxy/src/jsonrpc.rs | 16 ++ web3_proxy/src/rpcs/connection.rs | 1 + 8 files changed, 250 insertions(+), 80 deletions(-) diff --git a/TODO.md b/TODO.md index 6dd4e419..604d66c4 100644 --- a/TODO.md +++ b/TODO.md @@ -241,13 +241,9 @@ These are roughly in order of completition - [x] web3_proxy_cli command should read database settings from config - [x] cli command to change user_tier by key - [x] cache the status page for a second -- [ ] request accounting for websockets -- [ ] weighted random choice should still prioritize non-archive servers - - maybe shuffle randomly and then sort by (block_limit, random_index)? - - maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough? +- [x] request accounting for websockets - [ ] add block timestamp to the /status page -- [ ] probably need to turn more sentry log integrations (like anyhow) on! -- [ ] tests should use `test-env-log = "0.2.8"` + - [ ] be sure to save the timestamp in a way that our request routing logic can make use of it - [ ] change invite codes to set the user_tier - [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!" @@ -256,6 +252,15 @@ These are roughly in order of completition These are not yet ordered. There might be duplicates. We might not actually need all of these. +- [ ] eth_subscribe rpc_accounting has everything as cache_hits. should we instead count it as one background request? +- [ ] implement filters +- [ ] implement remaining subscriptions + - would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead +- [ ] tests should use `test-env-log = "0.2.8"` +- [ ] weighted random choice should still prioritize non-archive servers + - maybe shuffle randomly and then sort by (block_limit, random_index)? + - maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough? +- [ ] some places we call it "accounting" others a "stat". be consistent - [ ] cli commands to search users by key - [-] more trace logging - [-] add configurable size limits to all the Caches @@ -342,9 +347,6 @@ These are not yet ordered. There might be duplicates. We might not actually need - eth_sendRawTransaction should accept "INTERNAL_ERROR: existing tx with same hash" as a successful response. we just want to be sure that the server has our tx and in this case, it does. - [ ] EIP1271 for siwe - [ ] Limited throughput during high traffic -- [ ] implement filters and other unimplemented rpc methods - - multiple teams need log filters and subscriptions. - - would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead - [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json - [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge - [ ] user login should also return a jwt (jsonwebtoken rust crate should make it easy) diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 3c2c08ee..42a7dc03 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -55,6 +55,9 @@ pub static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); +/// TODO: allow customizing the request period? +pub static REQUEST_PERIOD: u64 = 60; + /// block hash, method, params // TODO: better name type ResponseCacheKey = (H256, String, Option); @@ -277,7 +280,7 @@ impl Web3ProxyApp { Some(db_conn) } else { - info!("no database"); + warn!("no database. some features will be disabled"); None }; @@ -579,11 +582,19 @@ impl Web3ProxyApp { pub async fn eth_subscribe<'a>( self: &'a Arc, authorization: Arc, - payload: JsonRpcRequest, + request_json: JsonRpcRequest, subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now response_sender: flume::Sender, ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { + // TODO: this is not efficient + let request_bytes = serde_json::to_string(&request_json) + .context("finding request size")? + .len(); + + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap()); + let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); // TODO: this only needs to be unique per connection. we don't need it globably unique @@ -591,15 +602,17 @@ impl Web3ProxyApp { let subscription_id = U64::from(subscription_id); // save the id so we can use it in the response - let id = payload.id.clone(); + let id = request_json.id.clone(); // TODO: calling json! on every request is probably not fast. but we can only match against // TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into - match payload.params { - Some(x) if x == json!(["newHeads"]) => { + match request_json.params.as_ref() { + Some(x) if x == &json!(["newHeads"]) => { + let authorization = authorization.clone(); let head_block_receiver = self.head_block_receiver.clone(); + let stat_sender = self.stat_sender.clone(); - // trace!("new heads subscription. id={:?}", subscription_id); + trace!("newHeads subscription {:?}", subscription_id); tokio::spawn(async move { let mut head_block_receiver = Abortable::new( WatchStream::new(head_block_receiver), @@ -607,8 +620,12 @@ impl Web3ProxyApp { ); while let Some(new_head) = head_block_receiver.next().await { + // TODO: what should the payload for RequestMetadata be? + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ + let response_json = json!({ "jsonrpc": "2.0", "method":"eth_subscription", "params": { @@ -618,22 +635,45 @@ impl Web3ProxyApp { }, }); - // TODO: do clients support binary messages? - let msg = Message::Text( - serde_json::to_string(&msg).expect("this should always be valid json"), - ); + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); - if response_sender.send_async(msg).await.is_err() { + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newHeads)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingTransactions: {:?}", + err + ); + } + } } - // trace!("closed new heads subscription. id={:?}", subscription_id); + trace!("closed newHeads subscription. id={:?}", subscription_id); }); } - Some(x) if x == json!(["newPendingTransactions"]) => { + Some(x) if x == &json!(["newPendingTransactions"]) => { let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let stat_sender = self.stat_sender.clone(); + let authorization = authorization.clone(); let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), @@ -641,13 +681,16 @@ impl Web3ProxyApp { ); trace!( - "pending transactions subscription id: {:?}", + "pending newPendingTransactions subscription id: {:?}", subscription_id ); // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, TxStatus::Confirmed(..) => continue, @@ -655,7 +698,7 @@ impl Web3ProxyApp { }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ + let response_json = json!({ "jsonrpc": "2.0", "method": "eth_subscription", "params": { @@ -664,32 +707,66 @@ impl Web3ProxyApp { }, }); - let msg = - Message::Text(serde_json::to_string(&msg).expect("we made this `msg`")); + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); - if response_sender.send_async(msg).await.is_err() { + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newPendingTransactions)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingTransactions: {:?}", + err + ); + } + } } - // trace!(?subscription_id, "closed new heads subscription"); + trace!( + "closed newPendingTransactions subscription: {:?}", + subscription_id + ); }); } - Some(x) if x == json!(["newPendingFullTransactions"]) => { + Some(x) if x == &json!(["newPendingFullTransactions"]) => { // TODO: too much copy/pasta with newPendingTransactions + let authorization = authorization.clone(); let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let stat_sender = self.stat_sender.clone(); let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), subscription_registration, ); - // // trace!(?subscription_id, "pending transactions subscription"); + trace!( + "pending newPendingFullTransactions subscription: {:?}", + subscription_id + ); // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, TxStatus::Confirmed(..) => continue, @@ -697,7 +774,7 @@ impl Web3ProxyApp { }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ + let response_json = json!({ "jsonrpc": "2.0", "method": "eth_subscription", "params": { @@ -707,22 +784,49 @@ impl Web3ProxyApp { }, }); - let msg = Message::Text( - serde_json::to_string(&msg).expect("we made this message"), - ); + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); - if response_sender.send_async(msg).await.is_err() { + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newPendingFullTransactions)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingFullTransactions: {:?}", + err + ); + } + } } - // trace!(?subscription_id, "closed new heads subscription"); + trace!( + "closed newPendingFullTransactions subscription: {:?}", + subscription_id + ); }); } - Some(x) if x == json!(["newPendingRawTransactions"]) => { + Some(x) if x == &json!(["newPendingRawTransactions"]) => { // TODO: too much copy/pasta with newPendingTransactions + let authorization = authorization.clone(); let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let stat_sender = self.stat_sender.clone(); let mut pending_tx_receiver = Abortable::new( BroadcastStream::new(pending_tx_receiver), @@ -737,6 +841,9 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, TxStatus::Confirmed(..) => continue, @@ -744,7 +851,7 @@ impl Web3ProxyApp { }; // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ + let response_json = json!({ "jsonrpc": "2.0", "method": "eth_subscription", "params": { @@ -754,17 +861,42 @@ impl Web3ProxyApp { }, }); - let msg = Message::Text( - serde_json::to_string(&msg).expect("this message was just built"), - ); + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); - if response_sender.send_async(msg).await.is_err() { + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newPendingRawTransactions)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingRawTransactions: {:?}", + err + ); + } + } } - trace!("closed new heads subscription: {:?}", subscription_id); + trace!( + "closed newPendingRawTransactions subscription: {:?}", + subscription_id + ); }); } _ => return Err(anyhow::anyhow!("unimplemented")), @@ -774,8 +906,21 @@ impl Web3ProxyApp { let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id); - // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? + if let Some(stat_sender) = self.stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + request_json.method.clone(), + authorization.clone(), + request_metadata, + response.num_bytes(), + ); + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!("stat_sender failed inside websocket: {:?}", err); + } + } + + // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? Ok((subscription_abort_handle, response)) } @@ -870,8 +1015,7 @@ impl Web3ProxyApp { ) -> anyhow::Result { // trace!("Received request: {:?}", request); - // TODO: allow customizing the period? - let request_metadata = Arc::new(RequestMetadata::new(60, &request)?); + let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?); // save the id so we can attach it to the response // TODO: instead of cloning, take the id out? @@ -1149,7 +1293,7 @@ impl Web3ProxyApp { method.to_string(), authorization.clone(), request_metadata, - &response, + response.num_bytes(), ); stat_sender @@ -1169,7 +1313,7 @@ impl Web3ProxyApp { request.method, authorization.clone(), request_metadata, - &response, + response.num_bytes(), ); stat_sender diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index a22199c8..70551df1 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -1,5 +1,4 @@ use crate::frontend::authorization::{Authorization, RequestMetadata}; -use crate::jsonrpc::JsonRpcForwardedResponse; use axum::headers::Origin; use chrono::{TimeZone, Utc}; use derive_more::From; @@ -251,18 +250,12 @@ impl ProxyResponseAggregate { } impl ProxyResponseStat { - // TODO: should RequestMetadata be in an arc? or can we handle refs here? pub fn new( method: String, authorization: Arc, metadata: Arc, - response: &JsonRpcForwardedResponse, + response_bytes: usize, ) -> Self { - // TODO: do this without serializing to a string. this is going to slow us down! - let response_bytes = serde_json::to_string(response) - .expect("serializing here should always work") - .len() as u64; - let archive_request = metadata.archive_request.load(Ordering::Acquire); let backend_requests = metadata.backend_requests.load(Ordering::Acquire); // let period_seconds = metadata.period_seconds; @@ -274,6 +267,8 @@ impl ProxyResponseStat { // TODO: timestamps could get confused by leap seconds. need tokio time instead let response_millis = metadata.start_instant.elapsed().as_millis() as u64; + let response_bytes = response_bytes as u64; + Self { authorization, archive_request, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index eda0cb6f..06fa11ec 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -2,7 +2,6 @@ use super::errors::FrontendErrorResponse; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; -use crate::jsonrpc::JsonRpcRequest; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::authorization::Bearer; @@ -76,12 +75,9 @@ pub struct RequestMetadata { } impl RequestMetadata { - pub fn new(period_seconds: u64, request: &JsonRpcRequest) -> anyhow::Result { + pub fn new(period_seconds: u64, request_bytes: usize) -> anyhow::Result { // TODO: how can we do this without turning it into a string first. this is going to slow us down! - let request_bytes = serde_json::to_string(request) - .context("finding request size")? - .len() - .try_into()?; + let request_bytes = request_bytes as u64; let new = Self { start_instant: Instant::now(), diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 0f9ef556..46423842 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -2,8 +2,14 @@ //! //! WebSockets are the preferred method of receiving requests, but not all clients have good support. -use super::authorization::{ip_is_authorized, key_is_authorized, Authorization}; +use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata}; use super::errors::{FrontendErrorResponse, FrontendResult}; +use crate::app::REQUEST_PERIOD; +use crate::app_stats::ProxyResponseStat; +use crate::{ + app::Web3ProxyApp, + jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, +}; use axum::headers::{Origin, Referer, UserAgent}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, @@ -21,20 +27,14 @@ use futures::{ use handlebars::Handlebars; use hashbrown::HashMap; use http::StatusCode; -use log::{error, info, trace}; +use log::{error, info, trace, warn}; use serde_json::{json, value::RawValue}; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; -use crate::{ - app::Web3ProxyApp, - jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, -}; - /// Public entrypoint for WebSocket JSON-RPC requests. /// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token. #[debug_handler] - pub async fn websocket_handler( Extension(app): Extension>, ClientIp(ip): ClientIp, @@ -178,18 +178,15 @@ async fn handle_socket_payload( [..] { "eth_subscribe" => { - // TODO: what should go in this span? - - let response = app + match app .eth_subscribe( authorization.clone(), json_request, subscription_count, response_sender.clone(), ) - .await; - - match response { + .await + { Ok((handle, response)) => { // TODO: better key subscriptions.insert( @@ -208,10 +205,15 @@ async fn handle_socket_payload( } } "eth_unsubscribe" => { - // TODO: how should handle rate limits and stats on this? - // TODO: handle invalid params + // TODO: move this logic into the app? + let request_bytes = json_request.num_bytes(); + + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap()); + let subscription_id = json_request.params.unwrap().to_string(); + // TODO: is this the right response? let partial_response = match subscriptions.remove(&subscription_id) { None => false, Some(handle) => { @@ -223,6 +225,20 @@ async fn handle_socket_payload( let response = JsonRpcForwardedResponse::from_value(json!(partial_response), id.clone()); + if let Some(stat_sender) = app.stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + json_request.method.clone(), + authorization.clone(), + request_metadata, + response.num_bytes(), + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!("stat_sender failed during eth_unsubscribe: {:?}", err); + } + } + Ok(response.into()) } _ => { @@ -234,6 +250,7 @@ async fn handle_socket_payload( (id, response) } Err(err) => { + // TODO: move this logic somewhere else and just set id to None here let id = RawValue::from_string("null".to_string()).expect("null can always be a value"); (id, Err(err.into())) } diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 9377a916..1099c3b0 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -438,7 +438,6 @@ pub async fn user_balance_get( /// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] - pub async fn user_balance_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 5cd2706a..5034be37 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -182,6 +182,15 @@ impl fmt::Debug for JsonRpcForwardedResponse { } } +impl JsonRpcRequest { + pub fn num_bytes(&self) -> usize { + // TODO: not sure how to do this without wasting a ton of allocations + serde_json::to_string(self) + .expect("this should always be valid json") + .len() + } +} + impl JsonRpcForwardedResponse { pub fn from_anyhow_error( err: anyhow::Error, @@ -307,6 +316,13 @@ impl JsonRpcForwardedResponse { Err(e) => Self::from_ethers_error(e, id), } } + + pub fn num_bytes(&self) -> usize { + // TODO: not sure how to do this without wasting a ton of allocations + serde_json::to_string(self) + .expect("this should always be valid json") + .len() + } } /// JSONRPC Responses can include one or many response objects. diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index b181b705..ae18726a 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -266,6 +266,7 @@ impl Web3Connection { Some(x) => x.num, }; + // this rpc doesn't have that block yet. still syncing if needed_block_num > &newest_block_num { return false; }