pass user_id through to more places.

maybe we should pass a label around instead?
This commit is contained in:
Bryan Stitt 2022-08-12 22:12:46 +00:00
parent 52151f8b22
commit 1cf8226f4f
4 changed files with 97 additions and 24 deletions

View File

@ -524,6 +524,7 @@ impl Web3ProxyApp {
pub async fn proxy_web3_rpc( pub async fn proxy_web3_rpc(
&self, &self,
request: JsonRpcRequestEnum, request: JsonRpcRequestEnum,
user_id: u64,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> { ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
trace!(?request, "proxy_web3_rpc"); trace!(?request, "proxy_web3_rpc");
@ -536,10 +537,10 @@ impl Web3ProxyApp {
let response = match request { let response = match request {
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout(max_time, self.proxy_web3_rpc_request(request)).await??, timeout(max_time, self.proxy_web3_rpc_request(request, user_id)).await??,
), ),
JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
timeout(max_time, self.proxy_web3_rpc_requests(requests)).await??, timeout(max_time, self.proxy_web3_rpc_requests(requests, user_id)).await??,
), ),
}; };
@ -552,6 +553,7 @@ impl Web3ProxyApp {
async fn proxy_web3_rpc_requests( async fn proxy_web3_rpc_requests(
&self, &self,
requests: Vec<JsonRpcRequest>, requests: Vec<JsonRpcRequest>,
user_id: u64,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> { ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly // TODO: we should probably change ethers-rs to support this directly
// we cut up the request and send to potentually different servers. this could be a problem. // we cut up the request and send to potentually different servers. this could be a problem.
@ -561,7 +563,7 @@ impl Web3ProxyApp {
let responses = join_all( let responses = join_all(
requests requests
.into_iter() .into_iter()
.map(|request| self.proxy_web3_rpc_request(request)) .map(|request| self.proxy_web3_rpc_request(request, user_id))
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
) )
.await; .await;
@ -623,6 +625,7 @@ impl Web3ProxyApp {
async fn proxy_web3_rpc_request( async fn proxy_web3_rpc_request(
&self, &self,
mut request: JsonRpcRequest, mut request: JsonRpcRequest,
user_id: u64,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request); trace!("Received request: {:?}", request);
@ -699,6 +702,7 @@ impl Web3ProxyApp {
| "shh_post" | "shh_post"
| "shh_uninstallFilter" | "shh_uninstallFilter"
| "shh_version" => { | "shh_version" => {
// TODO: client error stat
// TODO: proper error code // TODO: proper error code
return Err(anyhow::anyhow!("unsupported")); return Err(anyhow::anyhow!("unsupported"));
} }
@ -708,10 +712,18 @@ impl Web3ProxyApp {
| "eth_newBlockFilter" | "eth_newBlockFilter"
| "eth_newFilter" | "eth_newFilter"
| "eth_newPendingTransactionFilter" | "eth_newPendingTransactionFilter"
| "eth_uninstallFilter" => return Err(anyhow::anyhow!("not yet implemented")), | "eth_uninstallFilter" => {
// TODO: unsupported command stat
return Err(anyhow::anyhow!("not yet implemented"));
}
// some commands can use local data or caches // some commands can use local data or caches
"eth_accounts" => serde_json::Value::Array(vec![]), "eth_accounts" => {
// no stats on this. its cheap
serde_json::Value::Array(vec![])
}
"eth_blockNumber" => { "eth_blockNumber" => {
// TODO: emit stats
let head_block_number = self.balanced_rpcs.head_block_num(); let head_block_number = self.balanced_rpcs.head_block_num();
// TODO: technically, block 0 is okay. i guess we should be using an option // TODO: technically, block 0 is okay. i guess we should be using an option
@ -727,19 +739,23 @@ impl Web3ProxyApp {
"eth_coinbase" => { "eth_coinbase" => {
// no need for serving coinbase // no need for serving coinbase
// we could return a per-user payment address here, but then we might leak that to dapps // we could return a per-user payment address here, but then we might leak that to dapps
// no stats on this. its cheap
json!(Address::zero()) json!(Address::zero())
} }
// TODO: eth_estimateGas using anvil? // TODO: eth_estimateGas using anvil?
// TODO: eth_gasPrice that does awesome magic to predict the future // TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => { "eth_hashrate" => {
// no stats on this. its cheap
json!(U64::zero()) json!(U64::zero())
} }
"eth_mining" => { "eth_mining" => {
// no stats on this. its cheap
json!(false) json!(false)
} }
// TODO: eth_sendBundle (flashbots command) // TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once // broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => { "eth_sendRawTransaction" => {
// emit stats
return self return self
.private_rpcs .private_rpcs
.try_send_all_upstream_servers(request, None) .try_send_all_upstream_servers(request, None)
@ -747,16 +763,25 @@ impl Web3ProxyApp {
.await; .await;
} }
"eth_syncing" => { "eth_syncing" => {
// no stats on this. its cheap
// TODO: return a real response if all backends are syncing or if no servers in sync // TODO: return a real response if all backends are syncing or if no servers in sync
json!(false) json!(false)
} }
"net_listening" => { "net_listening" => {
// no stats on this. its cheap
// TODO: only if there are some backends on balanced_rpcs? // TODO: only if there are some backends on balanced_rpcs?
json!(true) json!(true)
} }
"net_peerCount" => self.balanced_rpcs.num_synced_rpcs().into(), "net_peerCount" => {
"web3_clientVersion" => serde_json::Value::String(APP_USER_AGENT.to_string()), // emit stats
self.balanced_rpcs.num_synced_rpcs().into()
}
"web3_clientVersion" => {
// no stats on this. its cheap
serde_json::Value::String(APP_USER_AGENT.to_string())
}
"web3_sha3" => { "web3_sha3" => {
// emit stats
// returns Keccak-256 (not the standardized SHA3-256) of the given data. // returns Keccak-256 (not the standardized SHA3-256) of the given data.
match &request.params { match &request.params {
Some(serde_json::Value::Array(params)) => { Some(serde_json::Value::Array(params)) => {
@ -778,6 +803,8 @@ impl Web3ProxyApp {
// TODO: web3_sha3? // TODO: web3_sha3?
// anything else gets sent to backend rpcs and cached // anything else gets sent to backend rpcs and cached
method => { method => {
// emit stats
let head_block_number = self.balanced_rpcs.head_block_num(); let head_block_number = self.balanced_rpcs.head_block_num();
// we do this check before checking caches because it might modify the request params // we do this check before checking caches because it might modify the request params
@ -789,6 +816,7 @@ impl Web3ProxyApp {
trace!(?min_block_needed, ?method); trace!(?min_block_needed, ?method);
// TODO: emit a stat on error. maybe with .map_err?
let (cache_key, cache_result) = let (cache_key, cache_result) =
self.cached_response(min_block_needed, &request).await?; self.cached_response(min_block_needed, &request).await?;

View File

@ -7,6 +7,7 @@ use uuid::Uuid;
use super::errors::anyhow_error_into_response; use super::errors::anyhow_error_into_response;
use super::rate_limit::RateLimitResult; use super::rate_limit::RateLimitResult;
use crate::stats::{Protocol, ProxyRequestLabels};
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
pub async fn public_proxy_web3_rpc( pub async fn public_proxy_web3_rpc(
@ -23,7 +24,42 @@ pub async fn public_proxy_web3_rpc(
Err(err) => return anyhow_error_into_response(None, None, err).into_response(), Err(err) => return anyhow_error_into_response(None, None, err).into_response(),
}; };
match app.proxy_web3_rpc(payload).await { let user_id = 0;
let protocol = Protocol::HTTP;
match &payload {
JsonRpcRequestEnum::Batch(batch) => {
// TODO: use inc_by if possible?
for single in batch {
let rpc_method = single.method.clone();
let _count = app
.stats
.proxy_requests
.get_or_create(&ProxyRequestLabels {
protocol: protocol.clone(),
rpc_method,
user_id,
})
.inc();
}
}
JsonRpcRequestEnum::Single(single) => {
let rpc_method = single.method.clone();
let _count = app
.stats
.proxy_requests
.get_or_create(&ProxyRequestLabels {
protocol,
rpc_method,
user_id,
})
.inc();
}
};
match app.proxy_web3_rpc(payload, user_id).await {
Ok(response) => (StatusCode::OK, Json(&response)).into_response(), Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
Err(err) => anyhow_error_into_response(None, None, err).into_response(), Err(err) => anyhow_error_into_response(None, None, err).into_response(),
} }
@ -34,7 +70,7 @@ pub async fn user_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
Path(user_key): Path<Uuid>, Path(user_key): Path<Uuid>,
) -> Response { ) -> Response {
let _user_id = match app.rate_limit_by_key(user_key).await { let user_id = match app.rate_limit_by_key(user_key).await {
Ok(x) => match x.try_into_response().await { Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedUser(x)) => x, Ok(RateLimitResult::AllowedUser(x)) => x,
Err(err_response) => return err_response, Err(err_response) => return err_response,
@ -43,7 +79,7 @@ pub async fn user_proxy_web3_rpc(
Err(err) => return anyhow_error_into_response(None, None, err).into_response(), Err(err) => return anyhow_error_into_response(None, None, err).into_response(),
}; };
match app.proxy_web3_rpc(payload).await { match app.proxy_web3_rpc(payload, user_id).await {
Ok(response) => (StatusCode::OK, Json(&response)).into_response(), Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
Err(err) => anyhow_error_into_response(None, None, err), Err(err) => anyhow_error_into_response(None, None, err),
} }

View File

@ -41,7 +41,7 @@ pub async fn public_websocket_handler(
match ws_upgrade { match ws_upgrade {
Some(ws) => ws Some(ws) => ws
.on_upgrade(|socket| proxy_web3_socket(app, socket)) .on_upgrade(|socket| proxy_web3_socket(app, socket, 0))
.into_response(), .into_response(),
None => { None => {
// this is not a websocket. redirect to a friendly page // this is not a websocket. redirect to a friendly page
@ -66,7 +66,9 @@ pub async fn user_websocket_handler(
}; };
match ws_upgrade { match ws_upgrade {
Some(ws_upgrade) => ws_upgrade.on_upgrade(|socket| proxy_web3_socket(app, socket)), Some(ws_upgrade) => {
ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, socket, user_id))
}
None => { None => {
// TODO: store this on the app and use register_template? // TODO: store this on the app and use register_template?
let reg = Handlebars::new(); let reg = Handlebars::new();
@ -86,15 +88,15 @@ pub async fn user_websocket_handler(
} }
} }
async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket) { async fn proxy_web3_socket(app: Arc<Web3ProxyApp>, socket: WebSocket, user_id: u64) {
// split the websocket so we can read and write concurrently // split the websocket so we can read and write concurrently
let (ws_tx, ws_rx) = socket.split(); let (ws_tx, ws_rx) = socket.split();
// create a channel for our reader and writer can communicate. todo: benchmark different channels // create a channel for our reader and writer can communicate. todo: benchmark different channels
let (response_tx, response_rx) = flume::unbounded::<Message>(); let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_rx, ws_tx)); tokio::spawn(write_web3_socket(response_receiver, user_id, ws_tx));
tokio::spawn(read_web3_socket(app, ws_rx, response_tx)); tokio::spawn(read_web3_socket(app, user_id, ws_rx, response_sender));
} }
/// websockets support a few more methods than http clients /// websockets support a few more methods than http clients
@ -104,6 +106,7 @@ async fn handle_socket_payload(
response_sender: &flume::Sender<Message>, response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize, subscription_count: &AtomicUsize,
subscriptions: &mut HashMap<String, AbortHandle>, subscriptions: &mut HashMap<String, AbortHandle>,
user_id: u64,
) -> Message { ) -> Message {
// TODO: do any clients send batches over websockets? // TODO: do any clients send batches over websockets?
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) { let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
@ -129,13 +132,15 @@ async fn handle_socket_payload(
} }
} }
"eth_unsubscribe" => { "eth_unsubscribe" => {
// TODO: how should handle rate limits and stats on this?
let subscription_id = payload.params.unwrap().to_string(); let subscription_id = payload.params.unwrap().to_string();
let partial_response = match subscriptions.remove(&subscription_id) { let partial_response = match subscriptions.remove(&subscription_id) {
None => "false", None => false,
Some(handle) => { Some(handle) => {
handle.abort(); handle.abort();
"true" true
} }
}; };
@ -144,7 +149,7 @@ async fn handle_socket_payload(
Ok(response.into()) Ok(response.into())
} }
_ => app.proxy_web3_rpc(payload.into()).await, _ => app.proxy_web3_rpc(payload.into(), user_id).await,
}; };
(id, response) (id, response)
@ -170,6 +175,7 @@ async fn handle_socket_payload(
async fn read_web3_socket( async fn read_web3_socket(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
user_id: u64,
mut ws_rx: SplitStream<WebSocket>, mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>, response_sender: flume::Sender<Message>,
) { ) {
@ -186,6 +192,7 @@ async fn read_web3_socket(
&response_sender, &response_sender,
&subscription_count, &subscription_count,
&mut subscriptions, &mut subscriptions,
user_id,
) )
.await .await
} }
@ -208,6 +215,7 @@ async fn read_web3_socket(
&response_sender, &response_sender,
&subscription_count, &subscription_count,
&mut subscriptions, &mut subscriptions,
user_id,
) )
.await .await
} }
@ -225,6 +233,7 @@ async fn read_web3_socket(
async fn write_web3_socket( async fn write_web3_socket(
response_rx: flume::Receiver<Message>, response_rx: flume::Receiver<Message>,
user_id: u64,
mut ws_tx: SplitSink<WebSocket, Message>, mut ws_tx: SplitSink<WebSocket, Message>,
) { ) {
// TODO: increment counter for open websockets // TODO: increment counter for open websockets

View File

@ -1,7 +1,7 @@
use axum::headers::{ContentType, HeaderName}; use axum::headers::HeaderName;
use axum::http::HeaderValue; use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::{routing::get, Extension, Router, TypedHeader}; use axum::{routing::get, Extension, Router};
use prometheus_client::encoding::text::encode; use prometheus_client::encoding::text::encode;
use prometheus_client::encoding::text::Encode; use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::counter::Counter;
@ -13,10 +13,10 @@ use tracing::info;
#[derive(Clone, Hash, PartialEq, Eq, Encode)] #[derive(Clone, Hash, PartialEq, Eq, Encode)]
pub struct ProxyRequestLabels { pub struct ProxyRequestLabels {
protocol: Protocol, pub protocol: Protocol,
rpc_method: String, pub rpc_method: String,
/// anonymous is user 0 /// anonymous is user 0
user_id: u64, pub user_id: u64,
} }
#[derive(Clone, Hash, PartialEq, Eq, Encode)] #[derive(Clone, Hash, PartialEq, Eq, Encode)]