eth_subscribe rpc_accounting logging

This commit is contained in:
Bryan Stitt 2022-11-19 22:05:51 +00:00
parent eb96ab0a6c
commit 217a7b3fd3
8 changed files with 250 additions and 80 deletions

20
TODO.md

@ -241,13 +241,9 @@ These are roughly in order of completition
- [x] web3_proxy_cli command should read database settings from config
- [x] cli command to change user_tier by key
- [x] cache the status page for a second
- [ ] request accounting for websockets
- [ ] weighted random choice should still prioritize non-archive servers
- maybe shuffle randomly and then sort by (block_limit, random_index)?
- maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough?
- [x] request accounting for websockets
- [ ] add block timestamp to the /status page
- [ ] probably need to turn more sentry log integrations (like anyhow) on!
- [ ] tests should use `test-env-log = "0.2.8"`
- [ ] be sure to save the timestamp in a way that our request routing logic can make use of it
- [ ] change invite codes to set the user_tier
- [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!"
@ -256,6 +252,15 @@ These are roughly in order of completition
These are not yet ordered. There might be duplicates. We might not actually need all of these.
- [ ] eth_subscribe rpc_accounting has everything as cache_hits. should we instead count it as one background request?
- [ ] implement filters
- [ ] implement remaining subscriptions
- would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead
- [ ] tests should use `test-env-log = "0.2.8"`
- [ ] weighted random choice should still prioritize non-archive servers
- maybe shuffle randomly and then sort by (block_limit, random_index)?
- maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough?
- [ ] some places we call it "accounting" others a "stat". be consistent
- [ ] cli commands to search users by key
- [-] more trace logging
- [-] add configurable size limits to all the Caches
@ -342,9 +347,6 @@ These are not yet ordered. There might be duplicates. We might not actually need
- eth_sendRawTransaction should accept "INTERNAL_ERROR: existing tx with same hash" as a successful response. we just want to be sure that the server has our tx and in this case, it does.
- [ ] EIP1271 for siwe
- [ ] Limited throughput during high traffic
- [ ] implement filters and other unimplemented rpc methods
- multiple teams need log filters and subscriptions.
- would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead
- [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json
- [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge
- [ ] user login should also return a jwt (jsonwebtoken rust crate should make it easy)

@ -55,6 +55,9 @@ pub static APP_USER_AGENT: &str = concat!(
env!("CARGO_PKG_VERSION"),
);
/// TODO: allow customizing the request period?
pub static REQUEST_PERIOD: u64 = 60;
/// block hash, method, params
// TODO: better name
type ResponseCacheKey = (H256, String, Option<String>);
@ -277,7 +280,7 @@ impl Web3ProxyApp {
Some(db_conn)
} else {
info!("no database");
warn!("no database. some features will be disabled");
None
};
@ -579,11 +582,19 @@ impl Web3ProxyApp {
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
authorization: Arc<Authorization>,
payload: JsonRpcRequest,
request_json: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: flume::Sender<Message>,
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
// TODO: this is not efficient
let request_bytes = serde_json::to_string(&request_json)
.context("finding request size")?
.len();
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap());
let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();
// TODO: this only needs to be unique per connection. we don't need it globably unique
@ -591,15 +602,17 @@ impl Web3ProxyApp {
let subscription_id = U64::from(subscription_id);
// save the id so we can use it in the response
let id = payload.id.clone();
let id = request_json.id.clone();
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
match payload.params {
Some(x) if x == json!(["newHeads"]) => {
match request_json.params.as_ref() {
Some(x) if x == &json!(["newHeads"]) => {
let authorization = authorization.clone();
let head_block_receiver = self.head_block_receiver.clone();
let stat_sender = self.stat_sender.clone();
// trace!("new heads subscription. id={:?}", subscription_id);
trace!("newHeads subscription {:?}", subscription_id);
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
@ -607,8 +620,12 @@ impl Web3ProxyApp {
);
while let Some(new_head) = head_block_receiver.next().await {
// TODO: what should the payload for RequestMetadata be?
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
@ -618,22 +635,45 @@ impl Web3ProxyApp {
},
});
// TODO: do clients support binary messages?
let msg = Message::Text(
serde_json::to_string(&msg).expect("this should always be valid json"),
);
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
if response_sender.send_async(msg).await.is_err() {
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newHeads)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
}
// trace!("closed new heads subscription. id={:?}", subscription_id);
trace!("closed newHeads subscription. id={:?}", subscription_id);
});
}
Some(x) if x == json!(["newPendingTransactions"]) => {
Some(x) if x == &json!(["newPendingTransactions"]) => {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let authorization = authorization.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
@ -641,13 +681,16 @@ impl Web3ProxyApp {
);
trace!(
"pending transactions subscription id: {:?}",
"pending newPendingTransactions subscription id: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
@ -655,7 +698,7 @@ impl Web3ProxyApp {
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
@ -664,32 +707,66 @@ impl Web3ProxyApp {
},
});
let msg =
Message::Text(serde_json::to_string(&msg).expect("we made this `msg`"));
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
if response_sender.send_async(msg).await.is_err() {
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
}
// trace!(?subscription_id, "closed new heads subscription");
trace!(
"closed newPendingTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == json!(["newPendingFullTransactions"]) => {
Some(x) if x == &json!(["newPendingFullTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
// // trace!(?subscription_id, "pending transactions subscription");
trace!(
"pending newPendingFullTransactions subscription: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
@ -697,7 +774,7 @@ impl Web3ProxyApp {
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
@ -707,22 +784,49 @@ impl Web3ProxyApp {
},
});
let msg = Message::Text(
serde_json::to_string(&msg).expect("we made this message"),
);
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
if response_sender.send_async(msg).await.is_err() {
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingFullTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingFullTransactions: {:?}",
err
);
}
}
}
// trace!(?subscription_id, "closed new heads subscription");
trace!(
"closed newPendingFullTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == json!(["newPendingRawTransactions"]) => {
Some(x) if x == &json!(["newPendingRawTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
@ -737,6 +841,9 @@ impl Web3ProxyApp {
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
@ -744,7 +851,7 @@ impl Web3ProxyApp {
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
@ -754,17 +861,42 @@ impl Web3ProxyApp {
},
});
let msg = Message::Text(
serde_json::to_string(&msg).expect("this message was just built"),
);
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
if response_sender.send_async(msg).await.is_err() {
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingRawTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingRawTransactions: {:?}",
err
);
}
}
}
trace!("closed new heads subscription: {:?}", subscription_id);
trace!(
"closed newPendingRawTransactions subscription: {:?}",
subscription_id
);
});
}
_ => return Err(anyhow::anyhow!("unimplemented")),
@ -774,8 +906,21 @@ impl Web3ProxyApp {
let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id);
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
request_json.method.clone(),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!("stat_sender failed inside websocket: {:?}", err);
}
}
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
Ok((subscription_abort_handle, response))
}
@ -870,8 +1015,7 @@ impl Web3ProxyApp {
) -> anyhow::Result<JsonRpcForwardedResponse> {
// trace!("Received request: {:?}", request);
// TODO: allow customizing the period?
let request_metadata = Arc::new(RequestMetadata::new(60, &request)?);
let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
// save the id so we can attach it to the response
// TODO: instead of cloning, take the id out?
@ -1149,7 +1293,7 @@ impl Web3ProxyApp {
method.to_string(),
authorization.clone(),
request_metadata,
&response,
response.num_bytes(),
);
stat_sender
@ -1169,7 +1313,7 @@ impl Web3ProxyApp {
request.method,
authorization.clone(),
request_metadata,
&response,
response.num_bytes(),
);
stat_sender

@ -1,5 +1,4 @@
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::jsonrpc::JsonRpcForwardedResponse;
use axum::headers::Origin;
use chrono::{TimeZone, Utc};
use derive_more::From;
@ -251,18 +250,12 @@ impl ProxyResponseAggregate {
}
impl ProxyResponseStat {
// TODO: should RequestMetadata be in an arc? or can we handle refs here?
pub fn new(
method: String,
authorization: Arc<Authorization>,
metadata: Arc<RequestMetadata>,
response: &JsonRpcForwardedResponse,
response_bytes: usize,
) -> Self {
// TODO: do this without serializing to a string. this is going to slow us down!
let response_bytes = serde_json::to_string(response)
.expect("serializing here should always work")
.len() as u64;
let archive_request = metadata.archive_request.load(Ordering::Acquire);
let backend_requests = metadata.backend_requests.load(Ordering::Acquire);
// let period_seconds = metadata.period_seconds;
@ -274,6 +267,8 @@ impl ProxyResponseStat {
// TODO: timestamps could get confused by leap seconds. need tokio time instead
let response_millis = metadata.start_instant.elapsed().as_millis() as u64;
let response_bytes = response_bytes as u64;
Self {
authorization,
archive_request,

@ -2,7 +2,6 @@
use super::errors::FrontendErrorResponse;
use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT};
use crate::jsonrpc::JsonRpcRequest;
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::authorization::Bearer;
@ -76,12 +75,9 @@ pub struct RequestMetadata {
}
impl RequestMetadata {
pub fn new(period_seconds: u64, request: &JsonRpcRequest) -> anyhow::Result<Self> {
pub fn new(period_seconds: u64, request_bytes: usize) -> anyhow::Result<Self> {
// TODO: how can we do this without turning it into a string first. this is going to slow us down!
let request_bytes = serde_json::to_string(request)
.context("finding request size")?
.len()
.try_into()?;
let request_bytes = request_bytes as u64;
let new = Self {
start_instant: Instant::now(),

@ -2,8 +2,14 @@
//!
//! WebSockets are the preferred method of receiving requests, but not all clients have good support.
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization};
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata};
use super::errors::{FrontendErrorResponse, FrontendResult};
use crate::app::REQUEST_PERIOD;
use crate::app_stats::ProxyResponseStat;
use crate::{
app::Web3ProxyApp,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
};
use axum::headers::{Origin, Referer, UserAgent};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
@ -21,20 +27,14 @@ use futures::{
use handlebars::Handlebars;
use hashbrown::HashMap;
use http::StatusCode;
use log::{error, info, trace};
use log::{error, info, trace, warn};
use serde_json::{json, value::RawValue};
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use crate::{
app::Web3ProxyApp,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
};
/// Public entrypoint for WebSocket JSON-RPC requests.
/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token.
#[debug_handler]
pub async fn websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@ -178,18 +178,15 @@ async fn handle_socket_payload(
[..]
{
"eth_subscribe" => {
// TODO: what should go in this span?
let response = app
match app
.eth_subscribe(
authorization.clone(),
json_request,
subscription_count,
response_sender.clone(),
)
.await;
match response {
.await
{
Ok((handle, response)) => {
// TODO: better key
subscriptions.insert(
@ -208,10 +205,15 @@ async fn handle_socket_payload(
}
}
"eth_unsubscribe" => {
// TODO: how should handle rate limits and stats on this?
// TODO: handle invalid params
// TODO: move this logic into the app?
let request_bytes = json_request.num_bytes();
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap());
let subscription_id = json_request.params.unwrap().to_string();
// TODO: is this the right response?
let partial_response = match subscriptions.remove(&subscription_id) {
None => false,
Some(handle) => {
@ -223,6 +225,20 @@ async fn handle_socket_payload(
let response =
JsonRpcForwardedResponse::from_value(json!(partial_response), id.clone());
if let Some(stat_sender) = app.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
json_request.method.clone(),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!("stat_sender failed during eth_unsubscribe: {:?}", err);
}
}
Ok(response.into())
}
_ => {
@ -234,6 +250,7 @@ async fn handle_socket_payload(
(id, response)
}
Err(err) => {
// TODO: move this logic somewhere else and just set id to None here
let id = RawValue::from_string("null".to_string()).expect("null can always be a value");
(id, Err(err.into()))
}

@ -438,7 +438,6 @@ pub async fn user_balance_get(
/// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_balance_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,

@ -182,6 +182,15 @@ impl fmt::Debug for JsonRpcForwardedResponse {
}
}
impl JsonRpcRequest {
pub fn num_bytes(&self) -> usize {
// TODO: not sure how to do this without wasting a ton of allocations
serde_json::to_string(self)
.expect("this should always be valid json")
.len()
}
}
impl JsonRpcForwardedResponse {
pub fn from_anyhow_error(
err: anyhow::Error,
@ -307,6 +316,13 @@ impl JsonRpcForwardedResponse {
Err(e) => Self::from_ethers_error(e, id),
}
}
pub fn num_bytes(&self) -> usize {
// TODO: not sure how to do this without wasting a ton of allocations
serde_json::to_string(self)
.expect("this should always be valid json")
.len()
}
}
/// JSONRPC Responses can include one or many response objects.

@ -266,6 +266,7 @@ impl Web3Connection {
Some(x) => x.num,
};
// this rpc doesn't have that block yet. still syncing
if needed_block_num > &newest_block_num {
return false;
}