From 879c6e49f246ffa3c3a25653c53e9df7dd66b784 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 10 Sep 2022 00:12:14 +0000 Subject: [PATCH] dry errors so that rate limits dont log so much --- Cargo.lock | 1 + TODO.md | 10 +- redis-rate-limit/Cargo.toml | 3 + redis-rate-limit/src/lib.rs | 3 +- web3_proxy/src/frontend/errors.rs | 149 ++++++++++++------ web3_proxy/src/frontend/rate_limit.rs | 174 +++++++--------------- web3_proxy/src/frontend/rpc_proxy_http.rs | 4 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 6 +- web3_proxy/src/jsonrpc.rs | 30 ++-- web3_proxy/src/rpcs/connection.rs | 15 +- web3_proxy/src/rpcs/request.rs | 34 ++++- 11 files changed, 231 insertions(+), 198 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4615872..93f56ae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3745,6 +3745,7 @@ version = "0.2.0" dependencies = [ "anyhow", "bb8-redis", + "tokio", "tracing", ] diff --git a/TODO.md b/TODO.md index 2536a262..9077e8ed 100644 --- a/TODO.md +++ b/TODO.md @@ -132,7 +132,7 @@ - [-] use siwe messages and signatures for sign up and login - [ ] quick script that calls all the curve-api endpoints once and checks for success, then calls wrk to hammer it - [ ] https://github.com/curvefi/curve-api - - [ ] test /api/getGauges + - [ ] test /api/getGaugesmethod - usually times out after vercel's 60 second timeout - one time got: Error invalid Json response "" - [-] basic request method stats (using the user_id and other fields that are in the tracing frame) @@ -148,7 +148,7 @@ These are not yet ordered. -- [ ] favicon. +- [ ] favicon - eth_1 | 2022-09-07T17:10:48.431536Z WARN web3_proxy::jsonrpc: forwarding error err=nothing to see here - use the one on https://staging.llamanodes.com/ - [ ] page that prints a graphviz dotfile of the blockchain @@ -241,9 +241,11 @@ new endpoints for users: ## V2 -These are not - +These are not ordered. I think some rows also accidently got deleted here. Check git history. 
+- [ ] opt-in debug mode that inspects responses for reverts and gives more logs about the call + - this must be opt-in since it will slow things down and will make their calls less private + - erigon just gives `method=eth_call reqid=986147 t=1.151551ms err="execution reverted"` - [ ] jwt auth so people can easily switch from infura - [ ] most things that are cached locally should probably be in shared redis caches - [ ] automated soft limit diff --git a/redis-rate-limit/Cargo.toml b/redis-rate-limit/Cargo.toml index e1178aa0..4938b60a 100644 --- a/redis-rate-limit/Cargo.toml +++ b/redis-rate-limit/Cargo.toml @@ -8,3 +8,6 @@ edition = "2021" anyhow = "1.0.64" bb8-redis = "0.11.0" tracing = "0.1.36" + +# TODO: i'd prefer not to require tokio here, but we use tokio::time +tokio = "1.21.0" diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limit/src/lib.rs index 574c170b..eb6fd041 100644 --- a/redis-rate-limit/src/lib.rs +++ b/redis-rate-limit/src/lib.rs @@ -4,7 +4,8 @@ mod errors; use anyhow::Context; use bb8_redis::redis::pipe; use std::ops::Add; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::time::{Duration, Instant}; use tracing::{debug, trace}; pub use crate::errors::{RedisError, RedisErrorSink}; diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 542c324e..bda3509f 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -7,9 +7,9 @@ use axum::{ use derive_more::From; use redis_rate_limit::{bb8::RunError, RedisError}; use sea_orm::DbErr; -use serde_json::value::RawValue; -use std::error::Error; -use tracing::instrument; +use std::{error::Error, net::IpAddr}; +use tokio::time::Instant; +use tracing::{instrument, warn}; // TODO: take "IntoResult" instead? 
pub type FrontendResult = Result; @@ -18,65 +18,122 @@ pub type FrontendResult = Result; pub enum FrontendErrorResponse { Anyhow(anyhow::Error), Box(Box), - // TODO: should we box these instead? Redis(RedisError), RedisRun(RunError), Response(Response), Database(DbErr), + RateLimitedUser(u64, Option), + RateLimitedIp(IpAddr, Option), + NotFound, } impl IntoResponse for FrontendErrorResponse { fn into_response(self) -> Response { - let null_id = RawValue::from_string("null".to_string()).unwrap(); - - // TODO: think more about this. this match should probably give us http and jsonrpc codes - let err = match self { - Self::Anyhow(err) => err, - Self::Box(err) => anyhow::anyhow!("Boxed error: {:?}", err), - Self::Redis(err) => err.into(), - Self::RedisRun(err) => err.into(), + // TODO: include the request id in these so that users can give us something that will point to logs + let (status_code, response) = match self { + Self::Anyhow(err) => { + warn!(?err, "anyhow"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "anyhow error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + // TODO: make this better + Self::Box(err) => { + warn!(?err, "boxed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "boxed error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::Redis(err) => { + warn!(?err, "redis"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "redis error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::RedisRun(err) => { + warn!(?err, "redis run"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "redis run error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::Response(r) => { + debug_assert_ne!(r.status(), StatusCode::OK); return r; } - Self::Database(err) => err.into(), + 
Self::Database(err) => { + warn!(?err, "database"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "database error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::RateLimitedIp(ip, retry_at) => { + // TODO: emit a stat + // TODO: include retry_at in the error + ( + StatusCode::TOO_MANY_REQUESTS, + JsonRpcForwardedResponse::from_string( + format!("too many requests from ip {}!", ip), + Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()), + None, + ), + ) + } + // TODO: this should actually be the id of the key. multiple users might control one key + Self::RateLimitedUser(user_id, retry_at) => { + // TODO: emit a stat + // TODO: include retry_at in the error + ( + StatusCode::TOO_MANY_REQUESTS, + JsonRpcForwardedResponse::from_string( + format!("too many requests from user {}!", user_id), + Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()), + None, + ), + ) + } + Self::NotFound => { + // TODO: emit a stat? + ( + StatusCode::NOT_FOUND, + JsonRpcForwardedResponse::from_str( + "not found!", + Some(StatusCode::NOT_FOUND.as_u16().into()), + None, + ), + ) + } }; - let err = JsonRpcForwardedResponse::from_anyhow_error(err, null_id); - - let code = StatusCode::INTERNAL_SERVER_ERROR; - - // TODO: logs here are too verbose. emit a stat instead? or maybe only log internal errors? - // warn!("Responding with error: {:?}", err); - - (code, Json(err)).into_response() + (status_code, Json(response)).into_response() } } #[instrument(skip_all)] pub async fn handler_404() -> Response { - let err = anyhow::anyhow!("nothing to see here"); - - anyhow_error_into_response(Some(StatusCode::NOT_FOUND), None, err) -} - -/// TODO: generic error? -/// handle errors by converting them into something that implements `IntoResponse` -/// TODO: use this. i can't get to work -/// TODO: i think we want a custom result type instead. put the anyhow result inside. 
then `impl IntoResponse for CustomResult` -pub fn anyhow_error_into_response( - http_code: Option, - id: Option>, - err: anyhow::Error, -) -> Response { - // TODO: we might have an id. like if this is for rate limiting, we can use it - let id = id.unwrap_or_else(|| RawValue::from_string("null".to_string()).unwrap()); - - let err = JsonRpcForwardedResponse::from_anyhow_error(err, id); - - // TODO: logs here are too verbose. emit a stat - // warn!("Responding with error: {:?}", err); - - let code = http_code.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); - - (code, Json(err)).into_response() + FrontendErrorResponse::NotFound.into_response() } diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index c1ee2645..b5e2c022 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -1,118 +1,49 @@ -use super::errors::{anyhow_error_into_response, FrontendErrorResponse}; +use super::errors::FrontendErrorResponse; use crate::app::{UserCacheValue, Web3ProxyApp}; use anyhow::Context; -use axum::response::Response; -use derive_more::From; use entities::user_keys; use redis_rate_limit::ThrottleResult; -use reqwest::StatusCode; use sea_orm::{ ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, }; use std::{net::IpAddr, time::Duration}; use tokio::time::Instant; -use tracing::{debug, error}; +use tracing::{debug, error, trace}; use uuid::Uuid; +#[derive(Debug)] pub enum RateLimitResult { AllowedIp(IpAddr), AllowedUser(u64), - IpRateLimitExceeded(IpAddr), - UserRateLimitExceeded(u64), + RateLimitedIp(IpAddr, Option), + RateLimitedUser(u64, Option), UnknownKey, } -#[derive(From)] -pub enum RequestFrom { - Ip(IpAddr), - // TODO: fetch the actual user? 
- User(u64), -} - -impl TryFrom for IpAddr { - type Error = anyhow::Error; - - fn try_from(value: RequestFrom) -> Result { - match value { - RequestFrom::Ip(x) => Ok(x), - _ => Err(anyhow::anyhow!("not an ip")), - } - } -} - -impl TryFrom for u64 { - type Error = anyhow::Error; - - fn try_from(value: RequestFrom) -> Result { - match value { - RequestFrom::User(x) => Ok(x), - _ => Err(anyhow::anyhow!("not a user")), - } - } -} - pub async fn rate_limit_by_ip( app: &Web3ProxyApp, ip: IpAddr, ) -> Result { - let rate_limit_result = app.rate_limit_by_ip(ip).await?; - - match rate_limit_result { + match app.rate_limit_by_ip(ip).await? { RateLimitResult::AllowedIp(x) => Ok(x), - RateLimitResult::AllowedUser(_) => panic!("only ips or errors are expected here"), - rate_limit_result => { - let _: RequestFrom = rate_limit_result.try_into()?; - - panic!("try_into should have failed") + RateLimitResult::RateLimitedIp(x, retry_at) => { + Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)) } + x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), } } -pub async fn rate_limit_by_user_key( +pub async fn rate_limit_by_key( app: &Web3ProxyApp, // TODO: change this to a Ulid user_key: Uuid, ) -> Result { - let rate_limit_result = app.rate_limit_by_key(user_key).await?; - - match rate_limit_result { - RateLimitResult::AllowedIp(_) => panic!("only user keys or errors are expected here"), + match app.rate_limit_by_key(user_key).await? { RateLimitResult::AllowedUser(x) => Ok(x), - rate_limit_result => { - let _: RequestFrom = rate_limit_result.try_into()?; - - panic!("try_into should have failed") - } - } -} - -impl TryFrom for RequestFrom { - // TODO: return an error that has its own IntoResponse? 
- type Error = Response; - - fn try_from(value: RateLimitResult) -> Result { - match value { - RateLimitResult::AllowedIp(x) => Ok(RequestFrom::Ip(x)), - RateLimitResult::AllowedUser(x) => Ok(RequestFrom::User(x)), - RateLimitResult::IpRateLimitExceeded(ip) => Err(anyhow_error_into_response( - Some(StatusCode::TOO_MANY_REQUESTS), - None, - // TODO: how can we attach context here? maybe add a request id tracing field? - anyhow::anyhow!(format!("rate limit exceeded for {}", ip)), - )), - RateLimitResult::UserRateLimitExceeded(user) => Err(anyhow_error_into_response( - Some(StatusCode::TOO_MANY_REQUESTS), - None, - // TODO: don't expose numeric ids. show the address instead - // TODO: how can we attach context here? maybe add a request id tracing field? - anyhow::anyhow!(format!("rate limit exceeded for user {}", user)), - )), - RateLimitResult::UnknownKey => Err(anyhow_error_into_response( - Some(StatusCode::FORBIDDEN), - None, - anyhow::anyhow!("unknown key"), - )), + RateLimitResult::RateLimitedUser(x, retry_at) => { + Err(FrontendErrorResponse::RateLimitedUser(x, retry_at)) } + x => unimplemented!("rate_limit_by_key shouldn't ever see these: {:?}", x), } } @@ -120,38 +51,42 @@ impl Web3ProxyApp { pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result { // TODO: dry this up with rate_limit_by_key // TODO: have a local cache because if we hit redis too hard we get errors + // TODO: query redis in the background so that users don't have to wait on this network request if let Some(rate_limiter) = &self.frontend_rate_limiter { let rate_limiter_label = format!("ip-{}", ip); - // TODO: query redis in the background so that users don't have to wait on this network request match rate_limiter .throttle_label(&rate_limiter_label, None, 1) .await { - Ok(ThrottleResult::Allowed) => {} - Ok(ThrottleResult::RetryAt(_retry_at)) => { + Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)), + Ok(ThrottleResult::RetryAt(retry_at)) => { // TODO: set headers so 
they know when they can retry - debug!(?rate_limiter_label, "rate limit exceeded"); // this is too verbose, but a stat might be good - // TODO: use their id if possible - return Ok(RateLimitResult::IpRateLimitExceeded(ip)); + // TODO: debug or trace? + // this is too verbose, but a stat might be good + trace!( + ?rate_limiter_label, + "rate limit exceeded until {:?}", + retry_at + ); + Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at))) } Ok(ThrottleResult::RetryNever) => { - // TODO: prettier error for the user - return Err(anyhow::anyhow!("ip ({}) blocked by rate limiter", ip)); + // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely + debug!(?rate_limiter_label, "rate limit exceeded"); + Ok(RateLimitResult::RateLimitedIp(ip, None)) } Err(err) => { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. - error!(?err, "redis is unhappy. allowing ip"); - return Ok(RateLimitResult::AllowedIp(ip)); + error!(?err, "rate limiter is unhappy. allowing ip"); + Ok(RateLimitResult::AllowedIp(ip)) } } } else { // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right todo!("no rate limiter"); } - - Ok(RateLimitResult::AllowedIp(ip)) } pub(crate) async fn cache_user_data(&self, user_key: Uuid) -> anyhow::Result { @@ -234,13 +169,13 @@ impl Web3ProxyApp { } // user key is valid. now check rate limits - // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate + // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate i think its bb8's fault if false { if let Some(rate_limiter) = &self.frontend_rate_limiter { // TODO: query redis in the background so that users don't have to wait on this network request // TODO: better key? 
have a prefix so its easy to delete all of these // TODO: we should probably hash this or something - let rate_limiter_label = user_key.to_string(); + let rate_limiter_label = format!("user-{}", user_key); match rate_limiter .throttle_label( @@ -250,38 +185,41 @@ impl Web3ProxyApp { ) .await { - Ok(ThrottleResult::Allowed) => {} + Ok(ThrottleResult::Allowed) => { + Ok(RateLimitResult::AllowedUser(user_data.user_id)) + } Ok(ThrottleResult::RetryAt(retry_at)) => { - // TODO: set headers so they know when they can retry or maybe tarpit them? if they are barely over? - debug!(?rate_limiter_label, "user rate limit exceeded"); // this is too verbose, but a stat might be good - // TODO: use their id if possible - return Ok(RateLimitResult::UserRateLimitExceeded(user_data.user_id)); + // TODO: set headers so they know when they can retry + // TODO: debug or trace? + // this is too verbose, but a stat might be good + trace!( + ?rate_limiter_label, + "rate limit exceeded until {:?}", + retry_at + ); + Ok(RateLimitResult::RateLimitedUser( + user_data.user_id, + Some(retry_at), + )) } Ok(ThrottleResult::RetryNever) => { - // TODO: prettier error for the user - return Err(anyhow::anyhow!( - "user #{} blocked by rate limiter", - user_data.user_id - )); + // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely + debug!(?rate_limiter_label, "rate limit exceeded"); + Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None)) } Err(err) => { // internal error, not rate limit being hit - // rather than have downtime, i think its better to just use in-process rate limiting - // TODO: in-process rate limits that pipe into redis - error!(?err, "redis is unhappy. 
allowing ip"); - return Ok(RateLimitResult::AllowedUser(user_data.user_id)); - } // // TODO: set headers so they know when they can retry - // // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good - // // TODO: use their id if possible - // // TODO: StatusCode::TOO_MANY_REQUESTS - // return Err(anyhow::anyhow!("too many requests from this key")); + // TODO: i really want axum to do this for us in a single place. + error!(?err, "rate limiter is unhappy. allowing ip"); + Ok(RateLimitResult::AllowedUser(user_data.user_id)) + } } } else { // TODO: if no redis, rate limit with a local cache? todo!("no redis. cannot rate limit") } + } else { + Ok(RateLimitResult::AllowedUser(user_data.user_id)) } - - Ok(RateLimitResult::AllowedUser(user_data.user_id)) } } diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 6e3bb79a..3ff513e9 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,5 +1,5 @@ use super::errors::FrontendResult; -use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key}; +use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::{Host, Path}; use axum::headers::{Referer, UserAgent}; @@ -48,7 +48,7 @@ pub async fn user_proxy_web3_rpc( ) -> FrontendResult { let request_span = debug_span!("request", host, ?referer, ?user_agent); - let user_id: u64 = rate_limit_by_user_key(&app, user_key) + let user_id = rate_limit_by_key(&app, user_key) .instrument(request_span.clone()) .await?; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 3ba72252..a4c99566 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -1,5 +1,5 @@ use super::errors::FrontendResult; -use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key}; +use 
super::rate_limit::{rate_limit_by_ip, rate_limit_by_key}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::Path, @@ -55,7 +55,7 @@ pub async fn user_websocket_handler( Path(user_key): Path, ws_upgrade: Option, ) -> FrontendResult { - let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?; + let user_id: u64 = rate_limit_by_key(&app, user_key).await?; // log the id, not the address. we don't want to expose the user's address // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses @@ -162,7 +162,7 @@ async fn handle_socket_payload( Ok(x) => serde_json::to_string(&x), Err(err) => { // we have an anyhow error. turn it into - let response = JsonRpcForwardedResponse::from_anyhow_error(err, id); + let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id)); serde_json::to_string(&response) } } diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 5eda3900..82714576 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -4,7 +4,6 @@ use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::Serialize; use serde_json::value::RawValue; use std::fmt; -use tracing::warn; // this is used by serde #[allow(dead_code)] @@ -184,21 +183,32 @@ impl fmt::Debug for JsonRpcForwardedResponse { } impl JsonRpcForwardedResponse { - pub fn from_anyhow_error(err: anyhow::Error, id: Box) -> Self { + pub fn from_anyhow_error( + err: anyhow::Error, + code: Option, + id: Option>, + ) -> Self { + let message = format!("{:?}", err); + + Self::from_string(message, code, id) + } + + pub fn from_str(message: &str, code: Option, id: Option>) -> Self { + Self::from_string(message.to_string(), code, id) + } + + pub fn from_string(message: String, code: Option, id: Option>) -> Self { // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. 
no need to log that // TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton? - warn!(?err, "forwarding error"); - JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), - id, + id: id.unwrap_or_else(|| { + RawValue::from_string("null".to_string()).expect("null id should always work") + }), result: None, error: Some(JsonRpcErrorData { - // TODO: set this jsonrpc error code to match the http status code? or maybe the other way around? maybe take it as an arg - code: -32099, - // TODO: some errors should be included here. others should not. i think anyhow might not be the right choice - // message: "internal server error".to_string(), - message: format!("{:?}", err), + code: code.unwrap_or(-32099), + message, data: None, }), } diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 18644a5d..b900207c 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -717,24 +717,21 @@ impl Web3Connection { // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { - match ratelimiter.throttle().await { - Ok(ThrottleResult::Allowed) => { + match ratelimiter.throttle().await? { + ThrottleResult::Allowed => { trace!("rate limit succeeded") } - Ok(ThrottleResult::RetryAt(retry_at)) => { + ThrottleResult::RetryAt(retry_at) => { // rate limit failed // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it // TODO: use tracing better // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0? 
warn!(?retry_at, rpc=%self, "Exhausted rate limit"); - return Ok(OpenRequestResult::RetryAt(retry_at.into())); + return Ok(OpenRequestResult::RetryAt(retry_at)); } - Ok(ThrottleResult::RetryNever) => { - return Err(anyhow::anyhow!("Rate limit failed.")); - } - Err(err) => { - return Err(err); + ThrottleResult::RetryNever => { + return Ok(OpenRequestResult::RetryNever); } } }; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index ab7b9e40..6934bab2 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -8,6 +8,7 @@ use metered::ResponseTime; use metered::Throughput; use std::fmt; use std::sync::atomic; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use tokio::time::{sleep, Duration, Instant}; use tracing::warn; @@ -28,6 +29,7 @@ pub struct OpenRequestHandle { conn: Arc, // TODO: this is the same metrics on the conn. use a reference metrics: Arc, + decremented: AtomicBool, } #[metered(registry = OpenRequestHandleMetrics, visibility = pub)] @@ -45,7 +47,13 @@ impl OpenRequestHandle { let metrics = conn.open_request_handle_metrics.clone(); - Self { conn, metrics } + let decremented = false.into(); + + Self { + conn, + metrics, + decremented, + } } pub fn clone_connection(&self) -> Arc { @@ -54,7 +62,8 @@ impl OpenRequestHandle { /// Send a web3 request /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented - /// By taking self here, we ensure that this is dropped after the request is complete + /// By taking self here, we ensure that this is dropped after the request is complete. 
+ /// TODO: we no longer take self because metered doesn't like that #[instrument(skip_all)] #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])] pub async fn request( @@ -91,9 +100,19 @@ impl OpenRequestHandle { }; // TODO: i think ethers already has trace logging (and does it much more fancy) - // TODO: at least instrument this with more useful information - // trace!(rpc=%self.0, %method, ?response); - trace!(rpc=%self.conn, %method, "response"); + if let Err(err) = &response { + warn!(?err, %method, rpc=%self.conn, "response"); + } else { + // trace!(rpc=%self.0, %method, ?response); + trace!(%method, rpc=%self.conn, "response"); + } + + self.decremented.store(true, atomic::Ordering::Release); + self.conn + .active_requests + .fetch_sub(1, atomic::Ordering::AcqRel); + + // todo: do something to make sure this doesn't get called again? i miss having the function sig have self response } @@ -101,6 +120,11 @@ impl OpenRequestHandle { impl Drop for OpenRequestHandle { fn drop(&mut self) { + if self.decremented.load(atomic::Ordering::Acquire) { + // we already decremented from a successful request + return; + } + self.conn .active_requests .fetch_sub(1, atomic::Ordering::AcqRel);