diff --git a/TODO.md b/TODO.md
index e34535c6..39ebdca0 100644
--- a/TODO.md
+++ b/TODO.md
@@ -147,12 +147,11 @@ These are roughly in order of completion
   - include the old head number and block in the log
 - [x] exponential backoff when reconnecting a connection
 - [x] once the merge happens, we don't want to use total difficulty and instead just care about the number
-- [-] rewrite rate limiting to have a tiered cache. do not put redis in the hot path
+- [x] rewrite rate limiting to have a tiered cache. do not put redis in the hot path
   - instead, we should check a local cache for the current rate limit (+1) and spawn an update to the local cache from redis in the background.
 - [x] when there are a LOT of concurrent requests, we see errors. i thought that was a problem with redis cell, but it happens with my simpler rate limit. now i think the problem is actually with bb8
   - https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html or https://crates.io/crates/deadpool-redis?
   - WARN http_request: redis_rate_limit::errors: redis error err=Response was of incompatible type: "Response type not string compatible." (response was int(500237)) id=01GC6514JWN5PS1NCWJCGJTC94 method=POST
-  - [ ] bring back redis-cell?
 - [ ] web3_proxy_error_count{path = "backend_rpc/request"} is inflated by a bunch of reverts. do not log reverts as warn.
   - erigon gives `method=eth_call reqid=986147 t=1.151551ms err="execution reverted"`
 - [ ] opt-in debug mode that inspects responses for reverts and saves the request to the database for the user
diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs
index f0d023c8..1fbd9de3 100644
--- a/deferred-rate-limiter/src/lib.rs
+++ b/deferred-rate-limiter/src/lib.rs
@@ -52,7 +52,7 @@ where
     /// TODO: max_per_period being None means two things. some places it means unlimited, but here it means to use the default. make an enum
     pub async fn throttle(
         &self,
-        key: &K,
+        key: K,
         max_per_period: Option<u64>,
         count: u64,
     ) -> anyhow::Result<DeferredRateLimitResult> {
@@ -76,7 +76,7 @@ where
             // set arc_deferred_rate_limit_result and return the count
             self.local_cache
-                .get_with(*key, async move {
+                .get_with(key, async move {
                     // we do not use the try operator here because we want to be okay with redis errors
                     let redis_count = match rrl
                         .throttle_label(&redis_key, Some(max_per_period), count)
@@ -167,9 +167,9 @@ where
                         Err(err) => {
                             // don't let redis errors block our users!
                             error!(
-                                // ?key, // TODO: this errors
+                                ?key,
                                 ?err,
-                                "unable to query rate limits. local cache available"
+                                "unable to query rate limits, but local cache is available"
                             );
                             // TODO: we need to start a timer that resets this count every minute
                             DeferredRateLimitResult::Allowed
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index a2d2846f..59b6bc4d 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -39,7 +39,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{broadcast, watch};
 use tokio::task::JoinHandle;
-use tokio::time::{timeout};
+use tokio::time::timeout;
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
 use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
 use uuid::Uuid;
@@ -166,7 +166,7 @@ impl Web3ProxyApp {
         let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();

         // first, we connect to mysql and make sure the latest migrations have run
-        let db_conn = if let Some(db_url) = &top_config.app.db_url {
+        let db_conn = if let Some(db_url) = top_config.app.db_url.clone() {
             let db_min_connections = top_config
                 .app
                 .db_min_connections
@@ -178,8 +178,7 @@ impl Web3ProxyApp {
                 .db_max_connections
                 .unwrap_or(db_min_connections * 2);

-            let db =
-                get_migrated_db(db_url.clone(), db_min_connections, redis_max_connections).await?;
+            let db = get_migrated_db(db_url, db_min_connections, redis_max_connections).await?;

             Some(db)
         } else {
@@ -220,17 +219,15 @@ impl Web3ProxyApp {
                 .redis_max_connections
                 .unwrap_or(num_workers * 2);

-            // TODO: what are reasonable timeouts
+            // TODO: what are reasonable timeouts?
             // TODO: set a wait timeout? maybe somehow just emit a warning if this is long
             let redis_pool = RedisConfig::from_url(redis_url)
                 .builder()?
-                .create_timeout(Some(Duration::from_secs(5)))
                 .max_size(redis_max_connections)
-                .recycle_timeout(Some(Duration::from_secs(5)))
                 .runtime(DeadpoolRuntime::Tokio1)
                 .build()?;

-            // test the pool
+            // test the redis pool
             if let Err(err) = redis_pool.get().await {
                 error!(
                     ?err,
@@ -484,7 +481,8 @@ impl Web3ProxyApp {
                         },
                     });

-                    let msg = Message::Text(serde_json::to_string(&msg).unwrap());
+                    let msg =
+                        Message::Text(serde_json::to_string(&msg).expect("we made this `msg`"));

                     if response_sender.send_async(msg).await.is_err() {
                         // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
@@ -526,7 +524,9 @@ impl Web3ProxyApp {
                         },
                     });

-                    let msg = Message::Text(serde_json::to_string(&msg).unwrap());
+                    let msg = Message::Text(
+                        serde_json::to_string(&msg).expect("we made this message"),
+                    );

                     if response_sender.send_async(msg).await.is_err() {
                         // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
@@ -667,6 +667,7 @@ impl Web3ProxyApp {
         trace!("Received request: {:?}", request);

         // save the id so we can attach it to the response
+        // TODO: instead of cloning, take the id out
         let request_id = request.id.clone();

         // TODO: if eth_chainId or net_version, serve those without querying the backend
@@ -834,7 +835,11 @@ impl Web3ProxyApp {
                             return Err(anyhow::anyhow!("invalid request"));
                         }

-                        let param = Bytes::from_str(params[0].as_str().unwrap())?;
+                        let param = Bytes::from_str(
+                            params[0]
+                                .as_str()
+                                .context("parsing params 0 into str then bytes")?,
+                        )?;

                         let hash = H256::from(keccak256(param));
@@ -881,6 +886,8 @@ impl Web3ProxyApp {
                     request.params.clone().map(|x| x.to_string()),
                 );

+                // TODO: remove their request id instead of cloning it
+
                 // TODO: move this caching outside this match and cache some of the other responses?
                 // TODO: cache the warp::reply to save us serializing every time?
                 let mut response = self
@@ -888,15 +895,24 @@ impl Web3ProxyApp {
                     .try_get_with(cache_key, async move {
                         // TODO: retry some failures automatically!
                         // TODO: try private_rpcs if all the balanced_rpcs fail!
-                        self.balanced_rpcs
+                        // TODO: put the hash here instead?
+                        let mut response = self
+                            .balanced_rpcs
                             .try_send_best_upstream_server(request, Some(&request_block_id.num))
-                            .await
+                            .await?;
+
+                        // discard their id by replacing it with an empty one
+                        response.id = Default::default();
+
+                        Ok::<_, anyhow::Error>(response)
                     })
                     .await
-                    .unwrap();
+                    .map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference"))
+                    .context("caching response")?;

-                // since this data came out of a cache, the id is likely wrong.
+                // since this data likely came out of a cache, the id is not going to match
                 // replace the id with our request's id.
+                // TODO: cache without the id
                 response.id = request_id;

                 return Ok(response);
diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs
index 52308036..c0194f9d 100644
--- a/web3_proxy/src/frontend/errors.rs
+++ b/web3_proxy/src/frontend/errors.rs
@@ -36,19 +36,19 @@ impl IntoResponse for FrontendErrorResponse {
                 (
                     StatusCode::INTERNAL_SERVER_ERROR,
                     JsonRpcForwardedResponse::from_string(
-                        // TODO: is it safe to expose all our anyhow strings?
+                        // TODO: is it safe to expose all of our anyhow strings?
                         err.to_string(),
                         Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
                         None,
                     ),
                 )
             }
-            // TODO: make this better
             Self::Box(err) => {
                 warn!(?err, "boxed");
                 (
                     StatusCode::INTERNAL_SERVER_ERROR,
                     JsonRpcForwardedResponse::from_str(
+                        // TODO: make this better. maybe include the error type?
                         "boxed error!",
                         Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
                         None,
@@ -66,17 +66,6 @@ impl IntoResponse for FrontendErrorResponse {
                     ),
                 )
             }
-            // Self::RedisRun(err) => {
-            //     warn!(?err, "redis run");
-            //     (
-            //         StatusCode::INTERNAL_SERVER_ERROR,
-            //         JsonRpcForwardedResponse::from_str(
-            //             "redis run error!",
-            //             Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
-            //             None,
-            //         ),
-            //     )
-            // }
             Self::Response(r) => {
                 debug_assert_ne!(r.status(), StatusCode::OK);
                 return r;
@@ -95,6 +84,7 @@ impl IntoResponse for FrontendErrorResponse {
             Self::RateLimitedIp(ip, retry_at) => {
                 // TODO: emit a stat
                 // TODO: include retry_at in the error
+                // TODO: if retry_at is None, give an unauthorized status code?
                 (
                     StatusCode::TOO_MANY_REQUESTS,
                     JsonRpcForwardedResponse::from_string(
@@ -127,6 +117,7 @@ impl IntoResponse for FrontendErrorResponse {
             ),
             Self::NotFound => {
                 // TODO: emit a stat?
+                // TODO: instead of an error, show a normal html page for 404
                 (
                     StatusCode::NOT_FOUND,
                     JsonRpcForwardedResponse::from_str(
diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs
index a4f7dda0..edf97d97 100644
--- a/web3_proxy/src/frontend/rate_limit.rs
+++ b/web3_proxy/src/frontend/rate_limit.rs
@@ -56,7 +56,7 @@ impl Web3ProxyApp {
         // TODO: have a local cache because if we hit redis too hard we get errors
         // TODO: query redis in the background so that users don't have to wait on this network request
         if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
-            match rate_limiter.throttle(&ip, None, 1).await {
+            match rate_limiter.throttle(ip, None, 1).await {
                 Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)),
                 Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
                     // TODO: set headers so they know when they can retry
@@ -117,6 +117,7 @@ impl Web3ProxyApp {
                 } else {
                     Some(requests_per_minute)
                 };
+
                 Ok(UserCacheValue::from((user_id, user_count_per_period)))
             }
             None => Ok(UserCacheValue::from((0, Some(0)))),
@@ -144,7 +145,7 @@ impl Web3ProxyApp {
             // user key is valid. now check rate limits
             if let Some(rate_limiter) = &self.frontend_key_rate_limiter {
                 match rate_limiter
-                    .throttle(&user_key, Some(user_count_per_period), 1)
+                    .throttle(user_key, Some(user_count_per_period), 1)
                     .await
                 {
                     Ok(DeferredRateLimitResult::Allowed) => {
diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs
index 83b0e212..fbbcfe5c 100644
--- a/web3_proxy/src/frontend/users.rs
+++ b/web3_proxy/src/frontend/users.rs
@@ -245,13 +245,12 @@ pub async fn post_login(
     // save the bearer token in redis with a long (7 or 30 day?) expiry. or in database?
     let mut redis_conn = app.redis_conn().await?;

+    // TODO: move this into a struct so this is less fragile
     let bearer_key = format!("bearer:{}", bearer_token);

     redis_conn.set(bearer_key, u.id.to_string()).await?;

-    // save the user data in redis with a short expiry
-    // TODO: we already have uk, so this could be more efficient. it works for now
-    app.user_data(uk.api_key).await?;
+    // TODO: save user_data. we already have uk, so this could be more efficient. it works for now

     Ok(response)
 }
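
For reference, a minimal sketch of the tiered-cache rate limiting described in the TODO.md entry above: count the request against an in-process counter and answer immediately, then reconcile with redis in a background task so redis latency and errors never sit in the hot path. This is not the deferred-rate-limiter crate's actual implementation; it assumes tokio and anyhow are available, and LocalFirstRateLimiter and fetch_count_from_redis are hypothetical stand-ins for the real cache and redis call.

// A simplified, local-first throttle. The real crate uses a shared async cache and a
// RedisRateLimiter; this sketch only illustrates the "check local, refresh from redis
// in the background" split.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
pub struct LocalFirstRateLimiter {
    // key -> requests counted locally during the current period
    counts: Arc<Mutex<HashMap<String, u64>>>,
}

impl LocalFirstRateLimiter {
    /// Returns true if the request is allowed. Never waits on redis.
    pub async fn throttle(&self, key: String, max_per_period: u64) -> bool {
        // 1. count the request locally and decide right away
        let local_count = {
            let mut counts = self.counts.lock().unwrap();
            let entry = counts.entry(key.clone()).or_insert(0);
            *entry += 1;
            *entry
        };

        // 2. refresh the local count from redis off the hot path. on a redis error we
        //    keep the local count, mirroring "don't let redis errors block our users"
        let counts = Arc::clone(&self.counts);
        tokio::spawn(async move {
            if let Ok(redis_count) = fetch_count_from_redis(&key).await {
                counts.lock().unwrap().insert(key, redis_count);
            }
        });

        local_count <= max_per_period
    }
}

// hypothetical placeholder for the authoritative redis-side counter (INCR/EXPIRE)
async fn fetch_count_from_redis(_key: &str) -> anyhow::Result<u64> {
    Ok(0)
}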