move more into the spawned task
This commit is contained in:
parent
28510f8bba
commit
fd1be8a702
2
TODO.md
2
TODO.md
@ -301,6 +301,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
|
||||
- [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then
|
||||
- [ ] rate limiting/throttling on query_user_stats
|
||||
- [ ] minimum allowed query_start on query_user_stats
|
||||
- [ ] setting request limits to None is broken. it does maxu64 and then internal deferred rate limiter counts try to *99/100
|
||||
- [ ] during shutdown, mark the proxy unhealthy and send unsubscribe responses for any open websocket subscriptions
|
||||
- [ ] some chains still use total_difficulty. have total_difficulty be used only if the chain needs it
|
||||
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
|
||||
@ -580,3 +581,4 @@ in another repo: event subscriber
|
||||
- [ ] some third party rpcs have limits on the size of eth_getLogs. include those limits in server config
|
||||
- [ ] some internal requests should go through app.proxy_rpc_request so that they get caching!
|
||||
- be careful not to make an infinite loop
|
||||
- [ ] request timeout messages should include the request id
|
||||
|
@ -113,13 +113,11 @@ pub struct AppConfig {
|
||||
/// Concurrent request limit for anonymous users.
|
||||
/// Some(0) = block all requests
|
||||
/// None = allow all requests
|
||||
#[serde(default = "default_public_max_concurrent_requests")]
|
||||
pub public_max_concurrent_requests: Option<usize>,
|
||||
|
||||
/// Request limit for anonymous users.
|
||||
/// Some(0) = block all requests
|
||||
/// None = allow all requests
|
||||
#[serde(default = "default_public_requests_per_period")]
|
||||
pub public_requests_per_period: Option<u64>,
|
||||
|
||||
/// RPC responses are cached locally
|
||||
@ -158,18 +156,6 @@ fn default_min_synced_rpcs() -> usize {
|
||||
1
|
||||
}
|
||||
|
||||
/// 0 blocks anonymous requests.
|
||||
/// None allows unlimited concurrent requests
|
||||
// TODO: what is a reasonable default?
|
||||
fn default_public_max_concurrent_requests() -> Option<usize> {
|
||||
Some(5)
|
||||
}
|
||||
|
||||
/// 0 blocks anonymous requests by default.
|
||||
fn default_public_requests_per_period() -> Option<u64> {
|
||||
Some(0)
|
||||
}
|
||||
|
||||
/// Having a low amount of concurrent requests for bearer tokens keeps us from hammering the database.
|
||||
fn default_bearer_token_max_concurrent_requests() -> u64 {
|
||||
2
|
||||
|
@ -22,19 +22,20 @@ pub async fn proxy_web3_rpc(
|
||||
origin: Option<TypedHeader<Origin>>,
|
||||
Json(payload): Json<JsonRpcRequestEnum>,
|
||||
) -> FrontendResult {
|
||||
// TODO: do we care about keeping the TypedHeader wrapper?
|
||||
let origin = origin.map(|x| x.0);
|
||||
|
||||
// TODO: move ip_is_authorized/key_is_authorized into proxy_web3_rpc
|
||||
let f = tokio::spawn(async move {
|
||||
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
|
||||
// TODO: do we care about keeping the TypedHeader wrapper?
|
||||
let origin = origin.map(|x| x.0);
|
||||
|
||||
let (authorization, semaphore) = ip_is_authorized(&app, ip, origin).await?;
|
||||
|
||||
let authorization = Arc::new(authorization);
|
||||
|
||||
app.proxy_web3_rpc(authorization, payload).await
|
||||
app.proxy_web3_rpc(authorization, payload)
|
||||
.await
|
||||
.map(|(x, y)| (x, y, semaphore))
|
||||
});
|
||||
|
||||
let (response, rpcs) = f.await??;
|
||||
let (response, rpcs, _semaphore) = f.await??;
|
||||
|
||||
let mut response = Json(&response).into_response();
|
||||
|
||||
@ -65,27 +66,41 @@ pub async fn proxy_web3_rpc_with_key(
|
||||
Path(rpc_key): Path<String>,
|
||||
Json(payload): Json<JsonRpcRequestEnum>,
|
||||
) -> FrontendResult {
|
||||
let rpc_key = rpc_key.parse()?;
|
||||
|
||||
// keep the semaphore until the end of the response
|
||||
let (authorization, _semaphore) = key_is_authorized(
|
||||
&app,
|
||||
rpc_key,
|
||||
ip,
|
||||
origin.map(|x| x.0),
|
||||
referer.map(|x| x.0),
|
||||
user_agent.map(|x| x.0),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let authorization = Arc::new(authorization);
|
||||
|
||||
// TODO: DRY w/ proxy_web3_rpc
|
||||
// the request can take a while, so we spawn so that we can start serving another request
|
||||
// TODO: spawn even earlier?
|
||||
let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await });
|
||||
let f = tokio::spawn(async move {
|
||||
let rpc_key = rpc_key.parse()?;
|
||||
|
||||
// if this is an error, we are likely shutting down
|
||||
let response = f.await??;
|
||||
let (authorization, semaphore) = key_is_authorized(
|
||||
&app,
|
||||
rpc_key,
|
||||
ip,
|
||||
origin.map(|x| x.0),
|
||||
referer.map(|x| x.0),
|
||||
user_agent.map(|x| x.0),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Json(&response).into_response())
|
||||
let authorization = Arc::new(authorization);
|
||||
|
||||
app.proxy_web3_rpc(authorization, payload)
|
||||
.await
|
||||
.map(|(x, y)| (x, y, semaphore))
|
||||
});
|
||||
|
||||
let (response, rpcs, _semaphore) = f.await??;
|
||||
|
||||
let mut response = Json(&response).into_response();
|
||||
|
||||
let headers = response.headers_mut();
|
||||
|
||||
// TODO: special string if no rpcs were used (cache hit)?
|
||||
let rpcs: String = rpcs.into_iter().map(|x| x.name.clone()).join(",");
|
||||
|
||||
headers.insert(
|
||||
"W3P-RPCs",
|
||||
rpcs.parse().expect("W3P-RPCS should always parse"),
|
||||
);
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
@ -33,7 +33,6 @@ use std::sync::Arc;
|
||||
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
|
||||
|
||||
/// Public entrypoint for WebSocket JSON-RPC requests.
|
||||
/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token.
|
||||
#[debug_handler]
|
||||
pub async fn websocket_handler(
|
||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||
@ -41,8 +40,6 @@ pub async fn websocket_handler(
|
||||
origin: Option<TypedHeader<Origin>>,
|
||||
ws_upgrade: Option<WebSocketUpgrade>,
|
||||
) -> FrontendResult {
|
||||
// TODO: i don't like logging ips. move this to trace level?
|
||||
|
||||
let origin = origin.map(|x| x.0);
|
||||
|
||||
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
|
||||
|
Loading…
Reference in New Issue
Block a user