instrument with spans and allow skipping jsonrpc

Bryan Stitt 2022-08-16 04:56:01 +00:00
parent 3b23ed1f8e
commit d822c607d9
6 changed files with 61 additions and 30 deletions

14 TODO.md

@@ -76,13 +76,14 @@
- redis-cell was giving me weird errors and it isn't worth debugging right now.
- [x] create user script should allow setting the api key
- [x] attach a request id to every web request
- [ ] attach user id (not IP!) to each request
- [-] basic request method stats
- [x] attach user id (not IP!) to each request
- [x] fantom_1 | 2022-08-10T22:19:43.522465Z WARN web3_proxy::jsonrpc: forwarding error err=missing field `jsonrpc` at line 1 column 60
- [x] i think the server isn't following the spec. we need a context attached to more errors so we know which one
- [x] make jsonrpc default to "2.0" (including the custom deserializer that handles the RawValues)
- [-] basic request method stats (using the user_id and other fields that are in the tracing frame)
- [ ] use siwe messages and signatures for sign up and login
- [ ] fantom_1 | 2022-08-10T22:19:43.522465Z WARN web3_proxy::jsonrpc: forwarding error err=missing field `jsonrpc` at line 1 column 60
- [ ] i think the server isn't following the spec. we need a context attached to this error so we know which one
- [ ] maybe make jsonrpc an Option
- [ ] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check
- i think there is also a bug because 0xllam4 ran a benchmark and got a bunch of errors back
## V1
@@ -127,7 +128,8 @@
- [ ] Public bsc server got “0” for block data limit (ninicoin)
- [ ] If we need an archive server and no servers in sync, exit immediately with an error instead of waiting 60 seconds
- [ ] 60 second timeout is too short. Maybe keep that for the free tier and use a larger timeout for paid. Problem is that some queries can take over 1000 seconds
- [ ] refactor from_anyhow_error to have consistent error codes and http codes
- [ ] refactor from_anyhow_error to have consistent error codes and http codes. maybe implement the Error trait
- [ ] when handling errors from axum parsing the Json...Enum, the errors don't get wrapped in json. i think we need a Layer (see the sketch below this list)
new endpoints for users:
- think about where to put this. a separate app might be better
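
One way to tackle the axum error-wrapping item above (a sketch, not part of this commit): instead of a full `Layer`, axum can hand the handler the parse failure directly via a `Result` extractor, so the rejection can be wrapped in a jsonrpc-style error body. The handler name and response shape here are hypothetical.

```rust
use axum::{extract::rejection::JsonRejection, http::StatusCode, response::IntoResponse, Json};
use serde_json::{json, Value};

// hypothetical handler: if the body fails to parse, wrap the rejection in a
// jsonrpc-style error object instead of returning axum's plain-text response
async fn proxy_handler(payload: Result<Json<Value>, JsonRejection>) -> impl IntoResponse {
    match payload {
        Ok(Json(request)) => {
            // ...forward `request` to the real proxy logic here...
            (StatusCode::OK, Json(json!({ "jsonrpc": "2.0", "id": request["id"], "result": null })))
        }
        Err(rejection) => {
            // -32700 is the jsonrpc "Parse error" code
            let body = json!({
                "jsonrpc": "2.0",
                "id": null,
                "error": { "code": -32700, "message": rejection.to_string() },
            });
            (StatusCode::BAD_REQUEST, Json(body))
        }
    }
}
```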

@@ -511,6 +511,7 @@ impl Web3Connection {
self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
// TODO: move this data into a span?
info!("watching {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
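
The TODO in this hunk hints at the pattern the rest of the commit uses: put identifying data in a span so every event inside the task carries it. A minimal standalone sketch (the names are placeholders, not the repo's):

```rust
use tracing::{info, info_span, Instrument};

// hypothetical: wrap the watch loop in a span keyed by the connection name,
// so the "watching ..." data rides along on every event logged inside it
async fn watch_connection(conn_name: String) {
    let span = info_span!("new_heads", conn = %conn_name);
    async move {
        // this event (and any others in the loop) carries conn=...
        info!("watching");
    }
    .instrument(span)
    .await;
}
```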

@@ -28,6 +28,10 @@ pub async fn public_proxy_web3_rpc(
let protocol = Protocol::HTTP;
let user_id = 0;
let user_span = error_span!("user", user_id, ?protocol);
/*
// TODO: move this to a helper function (or two). have it fetch method, protocol, etc. from tracing?
match &payload {
JsonRpcRequestEnum::Batch(batch) => {
// TODO: use inc_by if possible? need to group them by rpc_method
@@ -59,14 +63,11 @@ pub async fn public_proxy_web3_rpc(
.inc();
}
};
let user_id = 0;
let user_span = error_span!("user", user_id);
*/
match app.proxy_web3_rpc(payload).instrument(user_span).await {
Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
Err(err) => anyhow_error_into_response(None, None, err).into_response(),
Err(err) => anyhow_error_into_response(None, None, err),
}
}
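
For context on what the `error_span!`/`instrument` pair above buys (a sketch under assumptions, not repo code): the fields live on the span, so downstream code never has to thread `user_id` or `protocol` through its arguments, and `error_span!` gives the span the ERROR level so it stays enabled under even a restrictive level filter. A fmt subscriber prints the fields as a `user{...}` prefix on every event inside the instrumented future.

```rust
use tracing::{error_span, info, Instrument};

async fn forward(user_id: u64) {
    let protocol = "HTTP"; // stand-in for the Protocol enum
    let user_span = error_span!("user", user_id, ?protocol);
    async {
        // with tracing_subscriber::fmt this shows up inside
        // user{user_id=... protocol="HTTP"}
        info!("forwarding request");
    }
    .instrument(user_span)
    .await;
}
```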
@@ -81,10 +82,14 @@ pub async fn user_proxy_web3_rpc(
Err(err_response) => return err_response,
_ => unimplemented!(),
},
Err(err) => return anyhow_error_into_response(None, None, err).into_response(),
Err(err) => return anyhow_error_into_response(None, None, err),
};
match app.proxy_web3_rpc(payload).await {
let protocol = Protocol::HTTP;
let user_span = error_span!("user", user_id, ?protocol);
match app.proxy_web3_rpc(payload).instrument(user_span).await {
Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
Err(err) => anyhow_error_into_response(None, None, err),
}

@@ -15,12 +15,13 @@ use hashbrown::HashMap;
use serde_json::{json, value::RawValue};
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use tracing::{error, info, trace};
use tracing::{error, error_span, info, trace, Instrument};
use uuid::Uuid;
use crate::{
app::Web3ProxyApp,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
stats::Protocol,
};
use super::{errors::anyhow_error_into_response, rate_limit::RateLimitResult};
@@ -39,9 +40,14 @@ pub async fn public_websocket_handler(
Err(err) => return anyhow_error_into_response(None, None, err).into_response(),
};
let user_id = 0;
let protocol = Protocol::Websocket;
let user_span = error_span!("user", user_id, ?protocol);
match ws_upgrade {
Some(ws) => ws
.on_upgrade(|socket| proxy_web3_socket(app, socket, 0))
.on_upgrade(|socket| proxy_web3_socket(app, socket).instrument(user_span))
.into_response(),
None => {
// this is not a websocket. redirect to a friendly page
@@ -65,10 +71,15 @@ pub async fn user_websocket_handler(
Err(err) => return anyhow_error_into_response(None, None, err).into_response(),
};
let protocol = Protocol::Websocket;
// log the id, not the address. we don't want to expose the user's address
// TODO: a type that wraps Address and censors it? would protect us from accidentally logging addresses
let user_span = error_span!("user", user_id, ?protocol);
match ws_upgrade {
Some(ws_upgrade) => {
ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, socket, user_id))
}
Some(ws_upgrade) => ws_upgrade
.on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span)),
None => {
// TODO: store this on the app and use register_template?
let reg = Handlebars::new();
@@ -88,15 +99,15 @@ pub async fn user_websocket_handler(
}
}
async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket, user_id: u64) {
async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket) {
// split the websocket so we can read and write concurrently
let (ws_tx, ws_rx) = socket.split();
// create a channel so our reader and writer can communicate. todo: benchmark different channels
let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_receiver, user_id, ws_tx));
tokio::spawn(read_web3_socket(app, user_id, ws_rx, response_sender));
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(app, ws_rx, response_sender));
}
/// websockets support a few more methods than http clients
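
One detail worth flagging with this refactor (a sketch, not something this diff does): a tracing span does not automatically follow a future into `tokio::spawn`, so if the reader and writer tasks should keep the user span's fields, the spawned futures would need to be instrumented explicitly, for example:

```rust
use tracing::{Instrument, Span};

// hypothetical helper: spawn a task that stays inside whatever span is
// current at the call site (e.g. the "user" span)
fn spawn_in_current_span<F>(fut: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    tokio::spawn(fut.instrument(Span::current()));
}
```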
@@ -106,18 +117,22 @@ async fn handle_socket_payload(
response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize,
subscriptions: &mut HashMap<String, AbortHandle>,
user_id: u64,
) -> Message {
// TODO: do any clients send batches over websockets?
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
Ok(payload) => {
// TODO: should we use this id for the subscription id? it should be unique and would mean we don't need an atomic
let id = payload.id.clone();
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &payload.method[..] {
"eth_subscribe" => {
// TODO: what should go in this span?
let span = error_span!("eth_subscribe");
let response = app
.clone()
.eth_subscribe(payload, subscription_count, response_sender.clone())
.instrument(span)
.await;
match response {
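
A hedged answer to the "what should go in this span?" TODO (the field names are guesses, not the repo's): recording the request id and the requested subscription kind would let later subscription events tie back to this call.

```rust
use tracing::error_span;

// hypothetical: both fields use the &str shorthand on the span
fn subscribe_span(request_id: &str, subscription_kind: &str) -> tracing::Span {
    error_span!("eth_subscribe", request_id, subscription_kind)
}
```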
@@ -175,7 +190,6 @@ async fn handle_socket_payload(
async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
user_id: u64,
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
) {
@@ -192,7 +206,6 @@ async fn read_web3_socket(
&response_sender,
&subscription_count,
&mut subscriptions,
user_id,
)
.await
}
@@ -215,7 +228,6 @@ async fn read_web3_socket(
&response_sender,
&subscription_count,
&mut subscriptions,
user_id,
)
.await
}
@@ -233,7 +245,6 @@ async fn read_web3_socket(
async fn write_web3_socket(
response_rx: flume::Receiver<Message>,
user_id: u64,
mut ws_tx: SplitSink<WebSocket, Message>,
) {
// TODO: increment counter for open websockets

@@ -6,10 +6,17 @@ use serde_json::value::RawValue;
use std::fmt;
use tracing::warn;
// this is used by serde
#[allow(dead_code)]
fn default_jsonrpc() -> String {
"2.0".to_string()
}
#[derive(Clone, serde::Deserialize)]
pub struct JsonRpcRequest {
// TODO: skip jsonrpc entirely? it's against the spec to drop it, but some servers are bad
pub jsonrpc: Option<Box<RawValue>>,
#[serde(default = "default_jsonrpc")]
pub jsonrpc: String,
/// id could be a stricter type, but many rpcs do things against the spec
pub id: Box<RawValue>,
pub method: String,
@@ -112,8 +119,10 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
}
}
// TODO: some providers don't follow the spec and don't include the jsonrpc key
let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?;
// some providers don't follow the spec and don't include the jsonrpc key
// i think "2.0" should be a fine default to handle these incompatible clones
let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".to_string());
// TODO: Errors returned by the try operator get shown in an ugly way
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
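
A standalone sketch of the new behaviour (this struct is a simplified stand-in, not JsonRpcRequest itself, which pairs the same "2.0" fallback with the custom deserializer above): a payload missing `jsonrpc`, like the one in the WARN log quoted in TODO.md, now parses instead of failing with ``missing field `jsonrpc```.

```rust
fn default_jsonrpc() -> String {
    "2.0".to_string()
}

// simplified stand-in for JsonRpcRequest
#[derive(serde::Deserialize)]
struct Request {
    #[serde(default = "default_jsonrpc")]
    jsonrpc: String,
    id: u64,
    method: String,
}

fn main() {
    // no "jsonrpc" key: previously an error, now defaults to "2.0"
    let r: Request = serde_json::from_str(r#"{"id":1,"method":"eth_blockNumber"}"#).unwrap();
    assert_eq!(r.jsonrpc, "2.0");
    assert_eq!(r.method, "eth_blockNumber");
}
```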
@@ -139,6 +148,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
}
}
// TODO: impl Error on this?
/// All jsonrpc errors use this structure
#[derive(Serialize, Clone)]
pub struct JsonRpcErrorData {
@@ -154,6 +164,8 @@ pub struct JsonRpcErrorData {
/// A complete response
#[derive(Clone, Serialize)]
pub struct JsonRpcForwardedResponse {
// TODO: make jsonrpc a &str?
#[serde(default = "default_jsonrpc")]
pub jsonrpc: String,
pub id: Box<RawValue>,
#[serde(skip_serializing_if = "Option::is_none")]

@@ -21,7 +21,7 @@ pub struct ProxyRequestLabels {
pub user_id: u64,
}
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, Encode)]
pub enum Protocol {
HTTP,
Websocket,
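
Why Debug gets added here (a small sketch, not repo code): tracing's `?field` shorthand records a value through its `Debug` impl, so the `error_span!("user", user_id, ?protocol)` calls in the handlers above only compile once `Protocol` derives `Debug`.

```rust
use tracing::error_span;

#[derive(Clone, Debug)]
enum Protocol {
    HTTP,
    Websocket,
}

fn user_span(user_id: u64, protocol: &Protocol) -> tracing::Span {
    // `user_id` uses the name shorthand; `?protocol` records via Debug,
    // which is what requires the derive above
    error_span!("user", user_id, ?protocol)
}
```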