diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index c7bf9cfb..4e5424b7 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -524,6 +524,7 @@ impl Web3ProxyApp { pub async fn proxy_web3_rpc( &self, request: JsonRpcRequestEnum, + user_id: u64, ) -> anyhow::Result { trace!(?request, "proxy_web3_rpc"); @@ -536,10 +537,10 @@ impl Web3ProxyApp { let response = match request { JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( - timeout(max_time, self.proxy_web3_rpc_request(request)).await??, + timeout(max_time, self.proxy_web3_rpc_request(request, user_id)).await??, ), JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( - timeout(max_time, self.proxy_web3_rpc_requests(requests)).await??, + timeout(max_time, self.proxy_web3_rpc_requests(requests, user_id)).await??, ), }; @@ -552,6 +553,7 @@ impl Web3ProxyApp { async fn proxy_web3_rpc_requests( &self, requests: Vec, + user_id: u64, ) -> anyhow::Result> { // TODO: we should probably change ethers-rs to support this directly // we cut up the request and send to potentually different servers. this could be a problem. @@ -561,7 +563,7 @@ impl Web3ProxyApp { let responses = join_all( requests .into_iter() - .map(|request| self.proxy_web3_rpc_request(request)) + .map(|request| self.proxy_web3_rpc_request(request, user_id)) .collect::>(), ) .await; @@ -623,6 +625,7 @@ impl Web3ProxyApp { async fn proxy_web3_rpc_request( &self, mut request: JsonRpcRequest, + user_id: u64, ) -> anyhow::Result { trace!("Received request: {:?}", request); @@ -699,6 +702,7 @@ impl Web3ProxyApp { | "shh_post" | "shh_uninstallFilter" | "shh_version" => { + // TODO: client error stat // TODO: proper error code return Err(anyhow::anyhow!("unsupported")); } @@ -708,10 +712,18 @@ impl Web3ProxyApp { | "eth_newBlockFilter" | "eth_newFilter" | "eth_newPendingTransactionFilter" - | "eth_uninstallFilter" => return Err(anyhow::anyhow!("not yet implemented")), + | "eth_uninstallFilter" => { + // TODO: unsupported command stat + return Err(anyhow::anyhow!("not yet implemented")); + } // some commands can use local data or caches - "eth_accounts" => serde_json::Value::Array(vec![]), + "eth_accounts" => { + // no stats on this. its cheap + serde_json::Value::Array(vec![]) + } "eth_blockNumber" => { + // TODO: emit stats + let head_block_number = self.balanced_rpcs.head_block_num(); // TODO: technically, block 0 is okay. i guess we should be using an option @@ -727,19 +739,23 @@ impl Web3ProxyApp { "eth_coinbase" => { // no need for serving coinbase // we could return a per-user payment address here, but then we might leak that to dapps + // no stats on this. its cheap json!(Address::zero()) } // TODO: eth_estimateGas using anvil? // TODO: eth_gasPrice that does awesome magic to predict the future "eth_hashrate" => { + // no stats on this. its cheap json!(U64::zero()) } "eth_mining" => { + // no stats on this. its cheap json!(false) } // TODO: eth_sendBundle (flashbots command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { + // emit stats return self .private_rpcs .try_send_all_upstream_servers(request, None) @@ -747,16 +763,25 @@ impl Web3ProxyApp { .await; } "eth_syncing" => { + // no stats on this. its cheap // TODO: return a real response if all backends are syncing or if no servers in sync json!(false) } "net_listening" => { + // no stats on this. its cheap // TODO: only if there are some backends on balanced_rpcs? json!(true) } - "net_peerCount" => self.balanced_rpcs.num_synced_rpcs().into(), - "web3_clientVersion" => serde_json::Value::String(APP_USER_AGENT.to_string()), + "net_peerCount" => { + // emit stats + self.balanced_rpcs.num_synced_rpcs().into() + } + "web3_clientVersion" => { + // no stats on this. its cheap + serde_json::Value::String(APP_USER_AGENT.to_string()) + } "web3_sha3" => { + // emit stats // returns Keccak-256 (not the standardized SHA3-256) of the given data. match &request.params { Some(serde_json::Value::Array(params)) => { @@ -778,6 +803,8 @@ impl Web3ProxyApp { // TODO: web3_sha3? // anything else gets sent to backend rpcs and cached method => { + // emit stats + let head_block_number = self.balanced_rpcs.head_block_num(); // we do this check before checking caches because it might modify the request params @@ -789,6 +816,7 @@ impl Web3ProxyApp { trace!(?min_block_needed, ?method); + // TODO: emit a stat on error. maybe with .map_err? let (cache_key, cache_result) = self.cached_response(min_block_needed, &request).await?; diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/http_proxy.rs index 3ccc4436..6774db73 100644 --- a/web3_proxy/src/frontend/http_proxy.rs +++ b/web3_proxy/src/frontend/http_proxy.rs @@ -7,6 +7,7 @@ use uuid::Uuid; use super::errors::anyhow_error_into_response; use super::rate_limit::RateLimitResult; +use crate::stats::{Protocol, ProxyRequestLabels}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; pub async fn public_proxy_web3_rpc( @@ -23,7 +24,42 @@ pub async fn public_proxy_web3_rpc( Err(err) => return anyhow_error_into_response(None, None, err).into_response(), }; - match app.proxy_web3_rpc(payload).await { + let user_id = 0; + let protocol = Protocol::HTTP; + + match &payload { + JsonRpcRequestEnum::Batch(batch) => { + // TODO: use inc_by if possible? + for single in batch { + let rpc_method = single.method.clone(); + + let _count = app + .stats + .proxy_requests + .get_or_create(&ProxyRequestLabels { + protocol: protocol.clone(), + rpc_method, + user_id, + }) + .inc(); + } + } + JsonRpcRequestEnum::Single(single) => { + let rpc_method = single.method.clone(); + + let _count = app + .stats + .proxy_requests + .get_or_create(&ProxyRequestLabels { + protocol, + rpc_method, + user_id, + }) + .inc(); + } + }; + + match app.proxy_web3_rpc(payload, user_id).await { Ok(response) => (StatusCode::OK, Json(&response)).into_response(), Err(err) => anyhow_error_into_response(None, None, err).into_response(), } @@ -34,7 +70,7 @@ pub async fn user_proxy_web3_rpc( Extension(app): Extension>, Path(user_key): Path, ) -> Response { - let _user_id = match app.rate_limit_by_key(user_key).await { + let user_id = match app.rate_limit_by_key(user_key).await { Ok(x) => match x.try_into_response().await { Ok(RateLimitResult::AllowedUser(x)) => x, Err(err_response) => return err_response, @@ -43,7 +79,7 @@ pub async fn user_proxy_web3_rpc( Err(err) => return anyhow_error_into_response(None, None, err).into_response(), }; - match app.proxy_web3_rpc(payload).await { + match app.proxy_web3_rpc(payload, user_id).await { Ok(response) => (StatusCode::OK, Json(&response)).into_response(), Err(err) => anyhow_error_into_response(None, None, err), } diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index fb175fad..92f6af13 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -41,7 +41,7 @@ pub async fn public_websocket_handler( match ws_upgrade { Some(ws) => ws - .on_upgrade(|socket| proxy_web3_socket(app, socket)) + .on_upgrade(|socket| proxy_web3_socket(app, socket, 0)) .into_response(), None => { // this is not a websocket. redirect to a friendly page @@ -66,7 +66,9 @@ pub async fn user_websocket_handler( }; match ws_upgrade { - Some(ws_upgrade) => ws_upgrade.on_upgrade(|socket| proxy_web3_socket(app, socket)), + Some(ws_upgrade) => { + ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, socket, user_id)) + } None => { // TODO: store this on the app and use register_template? let reg = Handlebars::new(); @@ -86,15 +88,15 @@ pub async fn user_websocket_handler( } } -async fn proxy_web3_socket(app: Arc, socket: WebSocket) { +async fn proxy_web3_socket(app: Arc, socket: WebSocket, user_id: u64) { // split the websocket so we can read and write concurrently let (ws_tx, ws_rx) = socket.split(); // create a channel for our reader and writer can communicate. todo: benchmark different channels - let (response_tx, response_rx) = flume::unbounded::(); + let (response_sender, response_receiver) = flume::unbounded::(); - tokio::spawn(write_web3_socket(response_rx, ws_tx)); - tokio::spawn(read_web3_socket(app, ws_rx, response_tx)); + tokio::spawn(write_web3_socket(response_receiver, user_id, ws_tx)); + tokio::spawn(read_web3_socket(app, user_id, ws_rx, response_sender)); } /// websockets support a few more methods than http clients @@ -104,6 +106,7 @@ async fn handle_socket_payload( response_sender: &flume::Sender, subscription_count: &AtomicUsize, subscriptions: &mut HashMap, + user_id: u64, ) -> Message { // TODO: do any clients send batches over websockets? let (id, response) = match serde_json::from_str::(payload) { @@ -129,13 +132,15 @@ async fn handle_socket_payload( } } "eth_unsubscribe" => { + // TODO: how should handle rate limits and stats on this? + let subscription_id = payload.params.unwrap().to_string(); let partial_response = match subscriptions.remove(&subscription_id) { - None => "false", + None => false, Some(handle) => { handle.abort(); - "true" + true } }; @@ -144,7 +149,7 @@ async fn handle_socket_payload( Ok(response.into()) } - _ => app.proxy_web3_rpc(payload.into()).await, + _ => app.proxy_web3_rpc(payload.into(), user_id).await, }; (id, response) @@ -170,6 +175,7 @@ async fn handle_socket_payload( async fn read_web3_socket( app: Arc, + user_id: u64, mut ws_rx: SplitStream, response_sender: flume::Sender, ) { @@ -186,6 +192,7 @@ async fn read_web3_socket( &response_sender, &subscription_count, &mut subscriptions, + user_id, ) .await } @@ -208,6 +215,7 @@ async fn read_web3_socket( &response_sender, &subscription_count, &mut subscriptions, + user_id, ) .await } @@ -225,6 +233,7 @@ async fn read_web3_socket( async fn write_web3_socket( response_rx: flume::Receiver, + user_id: u64, mut ws_tx: SplitSink, ) { // TODO: increment counter for open websockets diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs index 23a24b8f..8ea94f54 100644 --- a/web3_proxy/src/stats.rs +++ b/web3_proxy/src/stats.rs @@ -1,7 +1,7 @@ -use axum::headers::{ContentType, HeaderName}; +use axum::headers::HeaderName; use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; -use axum::{routing::get, Extension, Router, TypedHeader}; +use axum::{routing::get, Extension, Router}; use prometheus_client::encoding::text::encode; use prometheus_client::encoding::text::Encode; use prometheus_client::metrics::counter::Counter; @@ -13,10 +13,10 @@ use tracing::info; #[derive(Clone, Hash, PartialEq, Eq, Encode)] pub struct ProxyRequestLabels { - protocol: Protocol, - rpc_method: String, + pub protocol: Protocol, + pub rpc_method: String, /// anonymous is user 0 - user_id: u64, + pub user_id: u64, } #[derive(Clone, Hash, PartialEq, Eq, Encode)]