From 43846a7051e811988c85d15ff3777c42a4cfd424 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 22 Sep 2022 20:27:14 +0000 Subject: [PATCH] pass authorized_request through a bunch of places --- web3_proxy/src/app.rs | 42 ++++++++++++++++++----- web3_proxy/src/block_number.rs | 2 +- web3_proxy/src/frontend/errors.rs | 4 +-- web3_proxy/src/frontend/mod.rs | 2 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 20 +++++++++-- web3_proxy/src/frontend/rpc_proxy_ws.rs | 26 ++++++++++---- web3_proxy/src/rpcs/blockchain.rs | 15 +++++--- web3_proxy/src/rpcs/connection.rs | 21 +++++++----- web3_proxy/src/rpcs/connections.rs | 18 +++++++--- web3_proxy/src/rpcs/transactions.rs | 3 +- 10 files changed, 114 insertions(+), 39 deletions(-) diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 671cda31..b067ea66 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -2,6 +2,7 @@ use crate::block_number::block_needed; use crate::config::{AppConfig, TopConfig}; +use crate::frontend::authorization::AuthorizedRequest; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; @@ -12,6 +13,7 @@ use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; use anyhow::Context; use axum::extract::ws::Message; +use axum::headers::{Referer, UserAgent}; use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; use ethers::core::utils::keccak256; @@ -60,12 +62,18 @@ type ResponseCache = pub type AnyhowJoinHandle = JoinHandle>; -#[derive(Clone, Copy, Debug, From, Serialize)] +#[derive(Clone, Debug, From)] /// TODO: rename this? -pub struct UserData { +pub struct UserKeyData { pub user_key_id: u64, /// if None, allow unlimited queries pub user_count_per_period: Option, + /// if None, allow any Referer + pub allowed_referer: Option, + /// if None, allow any UserAgent + pub allowed_user_agent: Option, + /// if None, allow any IpAddr + pub allowed_ip: Option, } /// The application @@ -91,7 +99,7 @@ pub struct Web3ProxyApp { pub frontend_ip_rate_limiter: Option>, pub frontend_key_rate_limiter: Option>, pub redis_pool: Option, - pub user_cache: Cache, + pub user_cache: Cache, } /// flatten a JoinError into an anyhow error @@ -402,6 +410,7 @@ impl Web3ProxyApp { #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] pub async fn eth_subscribe<'a>( self: &'a Arc, + authorized_request: Arc, payload: JsonRpcRequest, subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now @@ -595,6 +604,7 @@ impl Web3ProxyApp { /// send the request or batch of requests to the approriate RPCs pub async fn proxy_web3_rpc( self: &Arc, + authorized_request: &Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { // TODO: this should probably be trace level @@ -607,10 +617,18 @@ impl Web3ProxyApp { let response = match request { JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( - timeout(max_time, self.proxy_web3_rpc_request(request)).await??, + timeout( + max_time, + self.proxy_web3_rpc_request(authorized_request, request), + ) + .await??, ), JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( - timeout(max_time, self.proxy_web3_rpc_requests(requests)).await??, + timeout( + max_time, + self.proxy_web3_rpc_requests(authorized_request, requests), + ) + .await??, ), }; @@ -624,6 +642,7 @@ impl Web3ProxyApp { /// TODO: make sure this isn't a problem async fn 
proxy_web3_rpc_requests( self: &Arc, + authorized_request: &Arc, requests: Vec, ) -> anyhow::Result> { // TODO: we should probably change ethers-rs to support this directly @@ -631,7 +650,7 @@ impl Web3ProxyApp { let responses = join_all( requests .into_iter() - .map(|request| self.proxy_web3_rpc_request(request)) + .map(|request| self.proxy_web3_rpc_request(authorized_request, request)) .collect::>(), ) .await; @@ -659,6 +678,7 @@ impl Web3ProxyApp { #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] async fn proxy_web3_rpc_request( self: &Arc, + authorized_request: &Arc, mut request: JsonRpcRequest, ) -> anyhow::Result { trace!("Received request: {:?}", request); @@ -791,7 +811,9 @@ impl Web3ProxyApp { // emit stats let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); - return rpcs.try_send_all_upstream_servers(request, None).await; + return rpcs + .try_send_all_upstream_servers(Some(authorized_request), request, None) + .await; } "eth_syncing" => { // no stats on this. its cheap @@ -890,7 +912,11 @@ impl Web3ProxyApp { // TODO: put the hash here instead? let mut response = self .balanced_rpcs - .try_send_best_upstream_server(request, Some(&request_block_id.num)) + .try_send_best_upstream_server( + Some(authorized_request), + request, + Some(&request_block_id.num), + ) .await?; // discard their id by replacing it with an empty diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 1ee2e9e5..fea48a9f 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -63,7 +63,7 @@ pub async fn clean_block_number( let block_hash: H256 = serde_json::from_value(block_hash).context("decoding blockHash")?; - let block = rpcs.block(&block_hash, None).await?; + let block = rpcs.block(None, &block_hash, None).await?; let block_num = block .number diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 5dbf2aaf..b372ab64 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -1,4 +1,4 @@ -use crate::{app::UserData, jsonrpc::JsonRpcForwardedResponse}; +use crate::{app::UserKeyData, jsonrpc::JsonRpcForwardedResponse}; use axum::{ http::StatusCode, response::{IntoResponse, Response}, @@ -21,7 +21,7 @@ pub enum FrontendErrorResponse { Redis(RedisError), Response(Response), Database(DbErr), - RateLimitedUser(UserData, Option), + RateLimitedUser(UserKeyData, Option), RateLimitedIp(IpAddr, Option), UnknownKey, NotFound, diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 48aea6df..ddc98890 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -1,4 +1,4 @@ -mod authorization; +pub mod authorization; mod errors; mod http; mod rpc_proxy_http; diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index e3980264..70370b6a 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -19,11 +19,19 @@ pub async fn public_proxy_web3_rpc( ) -> FrontendResult { let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let ip = ip_is_authorized(&app, ip) + let authorized_request = ip_is_authorized(&app, ip) .instrument(request_span.clone()) .await?; - let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await }); + let request_span = error_span!("request", ?authorized_request); + + let authorized_request = Arc::new(authorized_request); + + let f = tokio::spawn(async move { + 
app.proxy_web3_rpc(&authorized_request, payload) + .instrument(request_span) + .await + }); let response = f.await.unwrap()?; @@ -53,7 +61,13 @@ pub async fn user_proxy_web3_rpc( let request_span = error_span!("request", ?authorized_request); - let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await }); + let authorized_request = Arc::new(authorized_request); + + let f = tokio::spawn(async move { + app.proxy_web3_rpc(&authorized_request, payload) + .instrument(request_span) + .await + }); let response = f.await.unwrap()?; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 7d869936..4a65beae 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -37,6 +37,8 @@ pub async fn public_websocket_handler( let request_span = error_span!("request", ?authorized_request); + let authorized_request = Arc::new(authorized_request); + match ws_upgrade { Some(ws) => Ok(ws .on_upgrade(|socket| { @@ -68,10 +70,11 @@ pub async fn user_websocket_handler( ) .await?; - // log the id, not the address. we don't want to expose the user's address - // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses + // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info let request_span = error_span!("request", ?authorized_request); + let authorized_request = Arc::new(authorized_request); + match ws_upgrade { Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| { proxy_web3_socket(app, authorized_request, socket).instrument(request_span) @@ -97,7 +100,7 @@ pub async fn user_websocket_handler( async fn proxy_web3_socket( app: Arc, - authorized_request: AuthorizedRequest, + authorized_request: Arc, socket: WebSocket, ) { // split the websocket so we can read and write concurrently @@ -118,6 +121,7 @@ async fn proxy_web3_socket( /// websockets support a few more methods than http clients async fn handle_socket_payload( app: Arc, + authorized_request: Arc, payload: &str, response_sender: &flume::Sender, subscription_count: &AtomicUsize, @@ -135,7 +139,12 @@ async fn handle_socket_payload( let span = error_span!("eth_subscribe"); let response = app - .eth_subscribe(payload, subscription_count, response_sender.clone()) + .eth_subscribe( + authorized_request.clone(), + payload, + subscription_count, + response_sender.clone(), + ) .instrument(span) .await; @@ -168,7 +177,10 @@ async fn handle_socket_payload( Ok(response.into()) } - _ => app.proxy_web3_rpc(payload.into()).await, + _ => { + app.proxy_web3_rpc(&authorized_request, payload.into()) + .await + } }; (id, response) @@ -194,7 +206,7 @@ async fn handle_socket_payload( async fn read_web3_socket( app: Arc, - authorized_request: AuthorizedRequest, + authorized_request: Arc, mut ws_rx: SplitStream, response_sender: flume::Sender, ) { @@ -207,6 +219,7 @@ async fn read_web3_socket( Message::Text(payload) => { handle_socket_payload( app.clone(), + authorized_request.clone(), &payload, &response_sender, &subscription_count, @@ -229,6 +242,7 @@ async fn read_web3_socket( handle_socket_payload( app.clone(), + authorized_request.clone(), payload, &response_sender, &subscription_count, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index a7a59dfc..6ed4b562 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -2,6 +2,7 @@ use super::connection::Web3Connection; use 
super::connections::Web3Connections; use super::transactions::TxStatus; +use crate::frontend::authorization::AuthorizedRequest; use crate::{ config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, }; @@ -90,6 +91,7 @@ impl Web3Connections { /// Will query a specific node or the best available. pub async fn block( &self, + authorized_request: Option<&Arc>, hash: &H256, rpc: Option<&Arc>, ) -> anyhow::Result { @@ -104,7 +106,7 @@ impl Web3Connections { // TODO: if error, retry? let block: Block = match rpc { Some(rpc) => { - rpc.wait_for_request_handle(Duration::from_secs(30)) + rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30)) .await? .request("eth_getBlockByHash", &get_block_params, Level::ERROR.into()) .await? @@ -115,7 +117,9 @@ impl Web3Connections { let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params }); let request: JsonRpcRequest = serde_json::from_value(request)?; - let response = self.try_send_best_upstream_server(request, None).await?; + let response = self + .try_send_best_upstream_server(authorized_request, request, None) + .await?; let block = response.result.unwrap(); @@ -163,7 +167,8 @@ impl Web3Connections { // deref to not keep the lock open if let Some(block_hash) = self.block_numbers.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set - return self.block(&block_hash, None).await; + // TODO: pass authorized_request through here? + return self.block(None, &block_hash, None).await; } // block number not in cache. we need to ask an rpc for it @@ -173,7 +178,7 @@ impl Web3Connections { // TODO: if error, retry? let response = self - .try_send_best_upstream_server(request, Some(num)) + .try_send_best_upstream_server(None, request, Some(num)) .await?; let raw_block = response.result.context("no block result")?; @@ -290,7 +295,7 @@ impl Web3Connections { // this option should always be populated let conn_rpc = self.conns.get(conn_name); - match self.block(connection_head_hash, conn_rpc).await { + match self.block(None, connection_head_hash, conn_rpc).await { Ok(block) => block, Err(err) => { warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes"); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index a2507388..7d4b0b96 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -4,6 +4,7 @@ use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; +use crate::frontend::authorization::AuthorizedRequest; use anyhow::Context; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; @@ -114,7 +115,7 @@ impl Web3Connection { // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: what should the timeout be? let found_chain_id: Result = new_connection - .wait_for_request_handle(Duration::from_secs(30)) + .wait_for_request_handle(None, Duration::from_secs(30)) .await? .request("eth_chainId", &Option::None::<()>, Level::ERROR.into()) .await; @@ -200,7 +201,7 @@ impl Web3Connection { // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? 
let archive_result: Result = self - .wait_for_request_handle(Duration::from_secs(30)) + .wait_for_request_handle(None, Duration::from_secs(30)) .await? .request( "eth_getCode", @@ -530,7 +531,7 @@ impl Web3Connection { loop { // TODO: what should the max_wait be? - match self.wait_for_request_handle(Duration::from_secs(30)).await { + match self.wait_for_request_handle(None, Duration::from_secs(30)).await { Ok(active_request_handle) => { let block: Result, _> = active_request_handle .request( @@ -598,7 +599,7 @@ impl Web3Connection { Web3Provider::Ws(provider) => { // todo: move subscribe_blocks onto the request handle? let active_request_handle = - self.wait_for_request_handle(Duration::from_secs(30)).await; + self.wait_for_request_handle(None, Duration::from_secs(30)).await; let mut stream = provider.subscribe_blocks().await?; drop(active_request_handle); @@ -606,7 +607,7 @@ impl Web3Connection { // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block let block: Result, _> = self - .wait_for_request_handle(Duration::from_secs(30)) + .wait_for_request_handle(None, Duration::from_secs(30)) .await? .request( "eth_getBlockByNumber", @@ -697,7 +698,7 @@ impl Web3Connection { Web3Provider::Ws(provider) => { // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle let active_request_handle = - self.wait_for_request_handle(Duration::from_secs(30)).await; + self.wait_for_request_handle(None, Duration::from_secs(30)).await; let mut stream = provider.subscribe_pending_txs().await?; @@ -727,12 +728,13 @@ impl Web3Connection { #[instrument] pub async fn wait_for_request_handle( self: &Arc, + authorized_request: Option<&Arc>, max_wait: Duration, ) -> anyhow::Result { let max_wait = Instant::now() + max_wait; loop { - let x = self.try_request_handle().await; + let x = self.try_request_handle(authorized_request.clone()).await; trace!(?x, "try_request_handle"); @@ -760,7 +762,10 @@ impl Web3Connection { } #[instrument] - pub async fn try_request_handle(self: &Arc) -> anyhow::Result { + pub async fn try_request_handle( + self: &Arc, + authorized_user: Option<&Arc>, + ) -> anyhow::Result { // check that we are connected if !self.has_provider().await { // TODO: emit a stat? diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index eccbbebc..e987ecd9 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -7,6 +7,7 @@ use super::request::{ use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; +use crate::frontend::authorization::AuthorizedRequest; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; @@ -304,6 +305,7 @@ impl Web3Connections { /// Send the same request to all the handles. Returning the most common success or most common error. 
pub async fn try_send_parallel_requests( &self, + authorized_request: Option<&Arc>, active_request_handles: Vec, method: &str, // TODO: remove this box once i figure out how to do the options @@ -362,6 +364,7 @@ impl Web3Connections { /// get the best available rpc server pub async fn next_upstream_server( &self, + authorized_request: Option<&Arc>, skip: &[Arc], min_block_needed: Option<&U64>, ) -> anyhow::Result { @@ -420,7 +423,7 @@ impl Web3Connections { // now that the rpcs are sorted, try to get an active request handle for one of them for rpc in synced_rpcs.into_iter() { // increment our connection counter - match rpc.try_request_handle().await { + match rpc.try_request_handle(authorized_request.clone()).await { Ok(OpenRequestResult::Handle(handle)) => { trace!("next server on {:?}: {:?}", self, rpc); return Ok(OpenRequestResult::Handle(handle)); @@ -451,6 +454,7 @@ impl Web3Connections { // TODO: better type on this that can return an anyhow::Result pub async fn upstream_servers( &self, + authorized_request: Option<&Arc>, block_needed: Option<&U64>, ) -> Result, Option> { let mut earliest_retry_at = None; @@ -465,7 +469,7 @@ impl Web3Connections { } // check rate limits and increment our connection counter - match connection.try_request_handle().await { + match connection.try_request_handle(authorized_request).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); @@ -491,6 +495,7 @@ impl Web3Connections { /// be sure there is a timeout on this or it might loop forever pub async fn try_send_best_upstream_server( &self, + authorized_request: Option<&Arc>, request: JsonRpcRequest, min_block_needed: Option<&U64>, ) -> anyhow::Result { @@ -502,7 +507,7 @@ impl Web3Connections { break; } match self - .next_upstream_server(&skip_rpcs, min_block_needed) + .next_upstream_server(authorized_request, &skip_rpcs, min_block_needed) .await? { OpenRequestResult::Handle(active_request_handle) => { @@ -592,17 +597,22 @@ impl Web3Connections { #[instrument] pub async fn try_send_all_upstream_servers( &self, + authorized_request: Option<&Arc>, request: JsonRpcRequest, block_needed: Option<&U64>, ) -> anyhow::Result { loop { - match self.upstream_servers(block_needed).await { + match self + .upstream_servers(authorized_request.clone(), block_needed) + .await + { Ok(active_request_handles) => { // TODO: benchmark this compared to waiting on unbounded futures // TODO: do something with this handle? // TODO: this is not working right. simplify let quorum_response = self .try_send_parallel_requests( + authorized_request, active_request_handles, request.method.as_ref(), request.params.as_ref(), diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index 32a22108..a458b1b3 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -22,9 +22,10 @@ impl Web3Connections { pending_tx_id: TxHash, ) -> Result, ProviderError> { // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) + // TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: if one rpc fails, try another? 
-        let tx: Transaction = match rpc.try_request_handle().await {
+        let tx: Transaction = match rpc.try_request_handle(None).await {
             Ok(OpenRequestResult::Handle(handle)) => {
                 handle
                     .request(
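
Note (illustration only, not part of the patch): the change above threads an authorization context from the axum handlers down to the per-connection request handles. Below is a minimal, synchronous sketch of that shape with stand-in types; AuthorizedRequest is stubbed as a unit struct here, since its real definition lives in web3_proxy/src/frontend/authorization.rs and is not shown in this diff.

// Sketch only: simplified, synchronous stand-ins for the types touched by this patch.
// Uses the anyhow crate, which web3_proxy already depends on.
use std::sync::Arc;

#[derive(Debug)]
struct AuthorizedRequest; // placeholder; the real type is defined in frontend/authorization.rs

struct OpenRequestHandle;

struct Web3Connection;

impl Web3Connection {
    // Innermost layer: rate limiting / accounting can now see who is asking.
    fn try_request_handle(
        self: &Arc<Self>,
        authorized_request: Option<&Arc<AuthorizedRequest>>,
    ) -> anyhow::Result<OpenRequestHandle> {
        let _ = authorized_request; // per-key limits / stats would hook in here
        Ok(OpenRequestHandle)
    }
}

struct Web3Connections {
    conns: Vec<Arc<Web3Connection>>,
}

impl Web3Connections {
    // Middle layer: forwards the optional auth context to whichever rpc it picks.
    fn try_send_best_upstream_server(
        &self,
        authorized_request: Option<&Arc<AuthorizedRequest>>,
    ) -> anyhow::Result<OpenRequestHandle> {
        let rpc = self.conns.first().expect("no rpcs configured");
        rpc.try_request_handle(authorized_request)
    }
}

struct Web3ProxyApp {
    balanced_rpcs: Web3Connections,
}

impl Web3ProxyApp {
    // Outer layer: the frontend passes a borrowed Arc so it can keep its own clone for tracing.
    fn proxy_web3_rpc(
        self: &Arc<Self>,
        authorized_request: &Arc<AuthorizedRequest>,
    ) -> anyhow::Result<OpenRequestHandle> {
        self.balanced_rpcs
            .try_send_best_upstream_server(Some(authorized_request))
    }
}

fn main() -> anyhow::Result<()> {
    // Roughly what the handlers in rpc_proxy_http.rs now do: build the auth
    // context once, wrap it in an Arc, and hand references down the stack.
    let app = Arc::new(Web3ProxyApp {
        balanced_rpcs: Web3Connections {
            conns: vec![Arc::new(Web3Connection)],
        },
    });
    let authorized_request = Arc::new(AuthorizedRequest);
    let _handle = app.proxy_web3_rpc(&authorized_request)?;
    Ok(())
}

Taking Option<&Arc<AuthorizedRequest>> rather than a bare reference at the lower layers matches the patch: internal and background requests (block fetches, chain-id checks, pending-tx polling) simply pass None, as the new call sites in blockchain.rs, connection.rs, and transactions.rs do.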
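
Also illustration only: the new allowed_referer / allowed_user_agent / allowed_ip fields on UserKeyData are all optional, with None meaning "allow any". One way such per-key restrictions could be checked is sketched below; the actual enforcement would belong in frontend/authorization.rs (not touched by this diff), and the real struct stores axum::headers::Referer / UserAgent rather than the plain Strings used here.

// Sketch only: a simplified stand-in for UserKeyData keeping just the new allow-list fields.
use std::net::IpAddr;

struct UserKeyData {
    allowed_referer: Option<String>,
    allowed_user_agent: Option<String>,
    allowed_ip: Option<IpAddr>,
}

impl UserKeyData {
    // None on a field means "allow any"; Some(x) means the request must match x.
    fn allows(&self, referer: Option<&str>, user_agent: Option<&str>, ip: IpAddr) -> bool {
        let referer_ok = match &self.allowed_referer {
            None => true,
            Some(allowed) => referer == Some(allowed.as_str()),
        };
        let user_agent_ok = match &self.allowed_user_agent {
            None => true,
            Some(allowed) => user_agent == Some(allowed.as_str()),
        };
        let ip_ok = self.allowed_ip.map_or(true, |allowed| allowed == ip);

        referer_ok && user_agent_ok && ip_ok
    }
}

fn main() {
    let key = UserKeyData {
        allowed_referer: None, // any referer
        allowed_user_agent: Some("my-dapp/1.0".into()),
        allowed_ip: Some("10.0.0.1".parse().unwrap()),
    };

    assert!(key.allows(None, Some("my-dapp/1.0"), "10.0.0.1".parse().unwrap()));
    assert!(!key.allows(None, Some("curl/7.79"), "10.0.0.1".parse().unwrap()));
}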