From fd1be8a702f9186b7d9a42225b0d6f262e788115 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 20 Dec 2022 10:54:13 -0800 Subject: [PATCH] move more into the spawned task --- TODO.md | 2 + web3_proxy/src/config.rs | 14 ----- web3_proxy/src/frontend/rpc_proxy_http.rs | 69 ++++++++++++++--------- web3_proxy/src/frontend/rpc_proxy_ws.rs | 3 - 4 files changed, 44 insertions(+), 44 deletions(-) diff --git a/TODO.md b/TODO.md index 02dea7a1..e00e735f 100644 --- a/TODO.md +++ b/TODO.md @@ -301,6 +301,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then - [ ] rate limiting/throttling on query_user_stats - [ ] minimum allowed query_start on query_user_stats +- [ ] setting request limits to None is broken. it does maxu64 and then internal deferred rate limiter counts try to *99/100 - [ ] during shutdown, mark the proxy unhealthy and send unsubscribe responses for any open websocket subscriptions - [ ] some chains still use total_difficulty. have total_difficulty be used only if the chain needs it - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header @@ -580,3 +581,4 @@ in another repo: event subscriber - [ ] some third party rpcs have limits on the size of eth_getLogs. include those limits in server config - [ ] some internal requests should go through app.proxy_rpc_request so that they get caching! - be careful not to make an infinite loop +- [ ] request timeout messages should include the request id diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 7a81bc20..0e7bd188 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -113,13 +113,11 @@ pub struct AppConfig { /// Concurrent request limit for anonymous users. /// Some(0) = block all requests /// None = allow all requests - #[serde(default = "default_public_max_concurrent_requests")] pub public_max_concurrent_requests: Option, /// Request limit for anonymous users. /// Some(0) = block all requests /// None = allow all requests - #[serde(default = "default_public_requests_per_period")] pub public_requests_per_period: Option, /// RPC responses are cached locally @@ -158,18 +156,6 @@ fn default_min_synced_rpcs() -> usize { 1 } -/// 0 blocks anonymous requests. -/// None allows unlimited concurrent requests -// TODO: what is a reasonable default? -fn default_public_max_concurrent_requests() -> Option { - Some(5) -} - -/// 0 blocks anonymous requests by default. -fn default_public_requests_per_period() -> Option { - Some(0) -} - /// Having a low amount of concurrent requests for bearer tokens keeps us from hammering the database. fn default_bearer_token_max_concurrent_requests() -> u64 { 2 diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 9015125c..060142d3 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -22,19 +22,20 @@ pub async fn proxy_web3_rpc( origin: Option>, Json(payload): Json, ) -> FrontendResult { - // TODO: do we care about keeping the TypedHeader wrapper? - let origin = origin.map(|x| x.0); - - // TODO: move ip_is_authorized/key_is_authorized into proxy_web3_rpc let f = tokio::spawn(async move { - let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?; + // TODO: do we care about keeping the TypedHeader wrapper? + let origin = origin.map(|x| x.0); + + let (authorization, semaphore) = ip_is_authorized(&app, ip, origin).await?; let authorization = Arc::new(authorization); - app.proxy_web3_rpc(authorization, payload).await + app.proxy_web3_rpc(authorization, payload) + .await + .map(|(x, y)| (x, y, semaphore)) }); - let (response, rpcs) = f.await??; + let (response, rpcs, _semaphore) = f.await??; let mut response = Json(&response).into_response(); @@ -65,27 +66,41 @@ pub async fn proxy_web3_rpc_with_key( Path(rpc_key): Path, Json(payload): Json, ) -> FrontendResult { - let rpc_key = rpc_key.parse()?; - - // keep the semaphore until the end of the response - let (authorization, _semaphore) = key_is_authorized( - &app, - rpc_key, - ip, - origin.map(|x| x.0), - referer.map(|x| x.0), - user_agent.map(|x| x.0), - ) - .await?; - - let authorization = Arc::new(authorization); - + // TODO: DRY w/ proxy_web3_rpc // the request can take a while, so we spawn so that we can start serving another request - // TODO: spawn even earlier? - let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await }); + let f = tokio::spawn(async move { + let rpc_key = rpc_key.parse()?; - // if this is an error, we are likely shutting down - let response = f.await??; + let (authorization, semaphore) = key_is_authorized( + &app, + rpc_key, + ip, + origin.map(|x| x.0), + referer.map(|x| x.0), + user_agent.map(|x| x.0), + ) + .await?; - Ok(Json(&response).into_response()) + let authorization = Arc::new(authorization); + + app.proxy_web3_rpc(authorization, payload) + .await + .map(|(x, y)| (x, y, semaphore)) + }); + + let (response, rpcs, _semaphore) = f.await??; + + let mut response = Json(&response).into_response(); + + let headers = response.headers_mut(); + + // TODO: special string if no rpcs were used (cache hit)? + let rpcs: String = rpcs.into_iter().map(|x| x.name.clone()).join(","); + + headers.insert( + "W3P-RPCs", + rpcs.parse().expect("W3P-RPCS should always parse"), + ); + + Ok(response) } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 4243cd4b..033486ba 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -33,7 +33,6 @@ use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; /// Public entrypoint for WebSocket JSON-RPC requests. -/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token. #[debug_handler] pub async fn websocket_handler( Extension(app): Extension>, @@ -41,8 +40,6 @@ pub async fn websocket_handler( origin: Option>, ws_upgrade: Option, ) -> FrontendResult { - // TODO: i don't like logging ips. move this to trace level? - let origin = origin.map(|x| x.0); let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;