From d822c607d9658c1f84d495c8628ba8c27c170d35 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 16 Aug 2022 04:56:01 +0000 Subject: [PATCH] instrument with spans and allow skipping jsonrpc --- TODO.md | 14 +++++----- web3_proxy/src/connection.rs | 1 + web3_proxy/src/frontend/http_proxy.rs | 19 +++++++++----- web3_proxy/src/frontend/ws_proxy.rs | 37 +++++++++++++++++---------- web3_proxy/src/jsonrpc.rs | 18 ++++++++++--- web3_proxy/src/stats.rs | 2 +- 6 files changed, 61 insertions(+), 30 deletions(-) diff --git a/TODO.md b/TODO.md index dbc59fae..f99181c1 100644 --- a/TODO.md +++ b/TODO.md @@ -76,13 +76,14 @@ - redis-cell was giving me weird errors and it isn't worth debugging it right now. - [x] create user script should allow setting the api key - [x] attach a request id to every web request -- [ ] attach user id (not IP!) to each request -- [-] basic request method stats +- [x] attach user id (not IP!) to each request +- [x] fantom_1 | 2022-08-10T22:19:43.522465Z WARN web3_proxy::jsonrpc: forwarding error err=missing field `jsonrpc` at line 1 column 60 + - [x] i think the server isn't following the spec. we need a context attached to more errors so we know which one + - [x] make jsonrpc default to "2.0" (including the custom deserializer that handles the RawValues) +- [-] basic request method stats (using the user_id and other fields that are in the tracing frame) - [ ] use siwe messages and signatures for sign up and login -- [ ] fantom_1 | 2022-08-10T22:19:43.522465Z WARN web3_proxy::jsonrpc: forwarding error err=missing field `jsonrpc` at line 1 column 60 - - [ ] i think the server isn't following the spec. we need a context attached to this error so we know which one - - [ ] maybe make jsonrpc an Option - [ ] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check + - i think there is also a bug because 0xllam4 ran a benchmark and get a bunch of errors back ## V1 @@ -127,7 +128,8 @@ - [ ] Public bsc server got “0” for block data limit (ninicoin) - [ ] If we need an archive server and no servers in sync, exit immediately with an error instead of waiting 60 seconds - [ ] 60 second timeout is too short. Maybe do that for free tier and larger timeout for paid. Problem is that some queries can take over 1000 seconds -- [ ] refactor from_anyhow_error to have consistent error codes and http codes +- [ ] refactor from_anyhow_error to have consistent error codes and http codes. maybe implement the Error trait +- [ ] when handling errors from axum parsing the Json...Enum, the errors don't get wrapped in json. i think we need a Layer new endpoints for users: - think about where to put this. a separate app might be better diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs index 45f6ab05..c225c9a7 100644 --- a/web3_proxy/src/connection.rs +++ b/web3_proxy/src/connection.rs @@ -511,6 +511,7 @@ impl Web3Connection { self: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { + // TODO: move this data into a span? info!("watching {}", self); // TODO: is a RwLock of an Option the right thing here? diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/http_proxy.rs index 961c9284..498f0155 100644 --- a/web3_proxy/src/frontend/http_proxy.rs +++ b/web3_proxy/src/frontend/http_proxy.rs @@ -28,6 +28,10 @@ pub async fn public_proxy_web3_rpc( let protocol = Protocol::HTTP; let user_id = 0; + let user_span = error_span!("user", user_id, ?protocol); + + /* + // TODO: move this to a helper function (or two). have it fetch method, protocol, etc. from tracing? match &payload { JsonRpcRequestEnum::Batch(batch) => { // TODO: use inc_by if possible? need to group them by rpc_method @@ -59,14 +63,11 @@ pub async fn public_proxy_web3_rpc( .inc(); } }; - - let user_id = 0; - - let user_span = error_span!("user", user_id); + */ match app.proxy_web3_rpc(payload).instrument(user_span).await { Ok(response) => (StatusCode::OK, Json(&response)).into_response(), - Err(err) => anyhow_error_into_response(None, None, err).into_response(), + Err(err) => anyhow_error_into_response(None, None, err), } } @@ -81,10 +82,14 @@ pub async fn user_proxy_web3_rpc( Err(err_response) => return err_response, _ => unimplemented!(), }, - Err(err) => return anyhow_error_into_response(None, None, err).into_response(), + Err(err) => return anyhow_error_into_response(None, None, err), }; - match app.proxy_web3_rpc(payload).await { + let protocol = Protocol::HTTP; + + let user_span = error_span!("user", user_id, ?protocol); + + match app.proxy_web3_rpc(payload).instrument(user_span).await { Ok(response) => (StatusCode::OK, Json(&response)).into_response(), Err(err) => anyhow_error_into_response(None, None, err), } diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index b7a55803..2285b614 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -15,12 +15,13 @@ use hashbrown::HashMap; use serde_json::{json, value::RawValue}; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; -use tracing::{error, info, trace}; +use tracing::{error, error_span, info, trace, Instrument}; use uuid::Uuid; use crate::{ app::Web3ProxyApp, jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, + stats::Protocol, }; use super::{errors::anyhow_error_into_response, rate_limit::RateLimitResult}; @@ -39,9 +40,14 @@ pub async fn public_websocket_handler( Err(err) => return anyhow_error_into_response(None, None, err).into_response(), }; + let user_id = 0; + let protocol = Protocol::Websocket; + + let user_span = error_span!("user", user_id, ?protocol); + match ws_upgrade { Some(ws) => ws - .on_upgrade(|socket| proxy_web3_socket(app, socket, 0)) + .on_upgrade(|socket| proxy_web3_socket(app, socket).instrument(user_span)) .into_response(), None => { // this is not a websocket. redirect to a friendly page @@ -65,10 +71,15 @@ pub async fn user_websocket_handler( Err(err) => return anyhow_error_into_response(None, None, err).into_response(), }; + let protocol = Protocol::Websocket; + + // log the id, not the address. we don't want to expose the user's address + // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses + let user_span = error_span!("user", user_id, ?protocol); + match ws_upgrade { - Some(ws_upgrade) => { - ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, socket, user_id)) - } + Some(ws_upgrade) => ws_upgrade + .on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span)), None => { // TODO: store this on the app and use register_template? let reg = Handlebars::new(); @@ -88,15 +99,15 @@ pub async fn user_websocket_handler( } } -async fn proxy_web3_socket(app: Arc, socket: WebSocket, user_id: u64) { +async fn proxy_web3_socket(app: Arc, socket: WebSocket) { // split the websocket so we can read and write concurrently let (ws_tx, ws_rx) = socket.split(); // create a channel for our reader and writer can communicate. todo: benchmark different channels let (response_sender, response_receiver) = flume::unbounded::(); - tokio::spawn(write_web3_socket(response_receiver, user_id, ws_tx)); - tokio::spawn(read_web3_socket(app, user_id, ws_rx, response_sender)); + tokio::spawn(write_web3_socket(response_receiver, ws_tx)); + tokio::spawn(read_web3_socket(app, ws_rx, response_sender)); } /// websockets support a few more methods than http clients @@ -106,18 +117,22 @@ async fn handle_socket_payload( response_sender: &flume::Sender, subscription_count: &AtomicUsize, subscriptions: &mut HashMap, - user_id: u64, ) -> Message { // TODO: do any clients send batches over websockets? let (id, response) = match serde_json::from_str::(payload) { Ok(payload) => { + // TODO: should we use this id for the subscription id? it should be unique and means we dont need an atomic let id = payload.id.clone(); let response: anyhow::Result = match &payload.method[..] { "eth_subscribe" => { + // TODO: what should go in this span? + let span = error_span!("eth_subscribe"); + let response = app .clone() .eth_subscribe(payload, subscription_count, response_sender.clone()) + .instrument(span) .await; match response { @@ -175,7 +190,6 @@ async fn handle_socket_payload( async fn read_web3_socket( app: Arc, - user_id: u64, mut ws_rx: SplitStream, response_sender: flume::Sender, ) { @@ -192,7 +206,6 @@ async fn read_web3_socket( &response_sender, &subscription_count, &mut subscriptions, - user_id, ) .await } @@ -215,7 +228,6 @@ async fn read_web3_socket( &response_sender, &subscription_count, &mut subscriptions, - user_id, ) .await } @@ -233,7 +245,6 @@ async fn read_web3_socket( async fn write_web3_socket( response_rx: flume::Receiver, - user_id: u64, mut ws_tx: SplitSink, ) { // TODO: increment counter for open websockets diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index a36e1f7a..2533c003 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -6,10 +6,17 @@ use serde_json::value::RawValue; use std::fmt; use tracing::warn; +// this is used by serde +#[allow(dead_code)] +fn default_jsonrpc() -> String { + "2.0".to_string() +} + #[derive(Clone, serde::Deserialize)] pub struct JsonRpcRequest { // TODO: skip jsonrpc entirely? its against spec to drop it, but some servers bad - pub jsonrpc: Option>, + #[serde(default = "default_jsonrpc")] + pub jsonrpc: String, /// id could be a stricter type, but many rpcs do things against the spec pub id: Box, pub method: String, @@ -112,8 +119,10 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { } } - // TODO: some providers don't follow the spec and dont include the jsonrpc key - let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?; + // some providers don't follow the spec and dont include the jsonrpc key + // i think "2.0" should be a fine default to handle these incompatible clones + let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".to_string()); + // TODO: Errors returned by the try operator get shown in an ugly way let id = id.ok_or_else(|| de::Error::missing_field("id"))?; let method = method.ok_or_else(|| de::Error::missing_field("method"))?; @@ -139,6 +148,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { } } +// TODO: impl Error on this? /// All jsonrpc errors use this structure #[derive(Serialize, Clone)] pub struct JsonRpcErrorData { @@ -154,6 +164,8 @@ pub struct JsonRpcErrorData { /// A complete response #[derive(Clone, Serialize)] pub struct JsonRpcForwardedResponse { + // TODO: jsonrpc a &str? + #[serde(default = "default_jsonrpc")] pub jsonrpc: String, pub id: Box, #[serde(skip_serializing_if = "Option::is_none")] diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs index ccab59a1..e8de4943 100644 --- a/web3_proxy/src/stats.rs +++ b/web3_proxy/src/stats.rs @@ -21,7 +21,7 @@ pub struct ProxyRequestLabels { pub user_id: u64, } -#[derive(Clone, Hash, PartialEq, Eq, Encode)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, Encode)] pub enum Protocol { HTTP, Websocket,