DRY errors so that rate limits don't log so much
parent c8da98d12e
commit 879c6e49f2
Cargo.lock (generated): 1 line changed
@@ -3745,6 +3745,7 @@ version = "0.2.0"
dependencies = [
 "anyhow",
 "bb8-redis",
 "tokio",
 "tracing",
]
TODO.md: 10 lines changed
@@ -132,7 +132,7 @@
- [-] use siwe messages and signatures for sign up and login
- [ ] quick script that calls all the curve-api endpoints once and checks for success, then calls wrk to hammer it
- [ ] https://github.com/curvefi/curve-api
- [ ] test /api/getGauges
- [ ] test /api/getGaugesmethod
  - usually times out after vercel's 60 second timeout
  - one time got: Error invalid Json response ""
- [-] basic request method stats (using the user_id and other fields that are in the tracing frame)
@@ -148,7 +148,7 @@

These are not yet ordered.

- [ ] favicon.
- [ ] favicon
  - eth_1 | 2022-09-07T17:10:48.431536Z WARN web3_proxy::jsonrpc: forwarding error err=nothing to see here
  - use the one on https://staging.llamanodes.com/
- [ ] page that prints a graphviz dotfile of the blockchain
@@ -241,9 +241,11 @@ new endpoints for users:

## V2

These are not

These are not ordered. I think some rows also accidently got deleted here. Check git history.

- [ ] opt-in debug mode that inspects responses for reverts and gives more logs about the call
  - this must be opt-in since it will slow things down and will make their calls less private
  - erigon just gives `method=eth_call reqid=986147 t=1.151551ms err="execution reverted"`
- [ ] jwt auth so people can easily switch from infura
- [ ] most things that are cached locally should probably be in shared redis caches
- [ ] automated soft limit
@@ -8,3 +8,6 @@ edition = "2021"
anyhow = "1.0.64"
bb8-redis = "0.11.0"
tracing = "0.1.36"

# TODO: i'd prefer not to require tokio here, but we use tokio::time
tokio = "1.21.0"
@@ -4,7 +4,8 @@ mod errors;
use anyhow::Context;
use bb8_redis::redis::pipe;
use std::ops::Add;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{Duration, Instant};
use tracing::{debug, trace};

pub use crate::errors::{RedisError, RedisErrorSink};
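The import swap above (std::time's Duration and Instant replaced by the tokio versions) matters because a retry deadline stored as a tokio::time::Instant can be awaited directly. A minimal sketch of that idea, not taken from this repository, assuming a tokio dependency with the rt, macros, and time features enabled:

// Illustrative sketch, not part of this commit: a deadline held as a
// tokio::time::Instant can be passed straight to tokio::time::sleep_until.
use tokio::time::{sleep_until, Duration, Instant};

async fn wait_for_retry(retry_at: Instant) {
    // no conversion needed because the deadline is already tokio's Instant type
    sleep_until(retry_at).await;
}

#[tokio::main]
async fn main() {
    let retry_at = Instant::now() + Duration::from_millis(50);
    wait_for_retry(retry_at).await;
}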
@@ -7,9 +7,9 @@ use axum::{
use derive_more::From;
use redis_rate_limit::{bb8::RunError, RedisError};
use sea_orm::DbErr;
use serde_json::value::RawValue;
use std::error::Error;
use tracing::instrument;
use std::{error::Error, net::IpAddr};
use tokio::time::Instant;
use tracing::{instrument, warn};

// TODO: take "IntoResult" instead?
pub type FrontendResult = Result<Response, FrontendErrorResponse>;
@@ -18,65 +18,122 @@ pub type FrontendResult = Result<Response, FrontendErrorResponse>;
pub enum FrontendErrorResponse {
    Anyhow(anyhow::Error),
    Box(Box<dyn Error>),
    // TODO: should we box these instead?
    Redis(RedisError),
    RedisRun(RunError<RedisError>),
    Response(Response),
    Database(DbErr),
    RateLimitedUser(u64, Option<Instant>),
    RateLimitedIp(IpAddr, Option<Instant>),
    NotFound,
}

impl IntoResponse for FrontendErrorResponse {
    fn into_response(self) -> Response {
        let null_id = RawValue::from_string("null".to_string()).unwrap();

        // TODO: think more about this. this match should probably give us http and jsonrpc codes
        let err = match self {
            Self::Anyhow(err) => err,
            Self::Box(err) => anyhow::anyhow!("Boxed error: {:?}", err),
            Self::Redis(err) => err.into(),
            Self::RedisRun(err) => err.into(),
        // TODO: include the request id in these so that users can give us something that will point to logs
        let (status_code, response) = match self {
            Self::Anyhow(err) => {
                warn!(?err, "anyhow");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    JsonRpcForwardedResponse::from_str(
                        "anyhow error!",
                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
                        None,
                    ),
                )
            }
            // TODO: make this better
            Self::Box(err) => {
                warn!(?err, "boxed");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    JsonRpcForwardedResponse::from_str(
                        "boxed error!",
                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
                        None,
                    ),
                )
            }
            Self::Redis(err) => {
                warn!(?err, "redis");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    JsonRpcForwardedResponse::from_str(
                        "redis error!",
                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
                        None,
                    ),
                )
            }
            Self::RedisRun(err) => {
                warn!(?err, "redis run");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    JsonRpcForwardedResponse::from_str(
                        "redis run error!",
                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
                        None,
                    ),
                )
            }
            Self::Response(r) => {
                debug_assert_ne!(r.status(), StatusCode::OK);
                return r;
            }
            Self::Database(err) => err.into(),
            Self::Database(err) => {
                warn!(?err, "database");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    JsonRpcForwardedResponse::from_str(
                        "database error!",
                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
                        None,
                    ),
                )
            }
            Self::RateLimitedIp(ip, retry_at) => {
                // TODO: emit a stat
                // TODO: include retry_at in the error
                (
                    StatusCode::TOO_MANY_REQUESTS,
                    JsonRpcForwardedResponse::from_string(
                        format!("too many requests from ip {}!", ip),
                        Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()),
                        None,
                    ),
                )
            }
            // TODO: this should actually by the id of the key. multiple users might control one key
            Self::RateLimitedUser(user_id, retry_at) => {
                // TODO: emit a stat
                // TODO: include retry_at in the error
                (
                    StatusCode::TOO_MANY_REQUESTS,
                    JsonRpcForwardedResponse::from_string(
                        format!("too many requests from user {}!", user_id),
                        Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()),
                        None,
                    ),
                )
            }
            Self::NotFound => {
                // TODO: emit a stat?
                (
                    StatusCode::NOT_FOUND,
                    JsonRpcForwardedResponse::from_str(
                        "not found!",
                        Some(StatusCode::NOT_FOUND.as_u16().into()),
                        None,
                    ),
                )
            }
        };

        let err = JsonRpcForwardedResponse::from_anyhow_error(err, null_id);

        let code = StatusCode::INTERNAL_SERVER_ERROR;

        // TODO: logs here are too verbose. emit a stat instead? or maybe only log internal errors?
        // warn!("Responding with error: {:?}", err);

        (code, Json(err)).into_response()
        (status_code, Json(response)).into_response()
    }
}

#[instrument(skip_all)]
pub async fn handler_404() -> Response {
    let err = anyhow::anyhow!("nothing to see here");

    anyhow_error_into_response(Some(StatusCode::NOT_FOUND), None, err)
}

/// TODO: generic error?
/// handle errors by converting them into something that implements `IntoResponse`
/// TODO: use this. i can't get <https://docs.rs/axum/latest/axum/error_handling/index.html> to work
/// TODO: i think we want a custom result type instead. put the anyhow result inside. then `impl IntoResponse for CustomResult`
pub fn anyhow_error_into_response(
    http_code: Option<StatusCode>,
    id: Option<Box<RawValue>>,
    err: anyhow::Error,
) -> Response {
    // TODO: we might have an id. like if this is for rate limiting, we can use it
    let id = id.unwrap_or_else(|| RawValue::from_string("null".to_string()).unwrap());

    let err = JsonRpcForwardedResponse::from_anyhow_error(err, id);

    // TODO: logs here are too verbose. emit a stat
    // warn!("Responding with error: {:?}", err);

    let code = http_code.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);

    (code, Json(err)).into_response()
    FrontendErrorResponse::NotFound.into_response()
}
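The rewritten IntoResponse impl above maps each error variant to its own HTTP status code and JSON-RPC error body, and only the unexpected variants get a warn!, which is how rate-limit hits stop flooding the logs. A stripped-down sketch of that pattern with hypothetical type names (ApiError, JsonRpcError), assuming axum, serde, tracing, and anyhow as dependencies; this is illustrative only and not the project's actual error type:

use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
    Json,
};
use serde::Serialize;
use std::net::IpAddr;

#[derive(Serialize)]
struct JsonRpcError {
    code: i64,
    message: String,
}

enum ApiError {
    Internal(anyhow::Error),
    RateLimitedIp(IpAddr),
    NotFound,
}

impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        // each variant picks its own status code and message; only internal
        // errors are worth logging loudly
        let (status, message) = match self {
            ApiError::Internal(err) => {
                tracing::warn!(?err, "internal error");
                (StatusCode::INTERNAL_SERVER_ERROR, "internal error".to_string())
            }
            ApiError::RateLimitedIp(ip) => (
                StatusCode::TOO_MANY_REQUESTS,
                format!("too many requests from ip {}!", ip),
            ),
            ApiError::NotFound => (StatusCode::NOT_FOUND, "not found!".to_string()),
        };

        let body = JsonRpcError {
            code: status.as_u16().into(),
            message,
        };

        (status, Json(body)).into_response()
    }
}

A handler returning Result<Response, ApiError> then gets this conversion automatically, since axum implements IntoResponse for Result when both the Ok and Err types implement it.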
@@ -1,118 +1,49 @@
use super::errors::{anyhow_error_into_response, FrontendErrorResponse};
use super::errors::FrontendErrorResponse;
use crate::app::{UserCacheValue, Web3ProxyApp};
use anyhow::Context;
use axum::response::Response;
use derive_more::From;
use entities::user_keys;
use redis_rate_limit::ThrottleResult;
use reqwest::StatusCode;
use sea_orm::{
    ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect,
};
use std::{net::IpAddr, time::Duration};
use tokio::time::Instant;
use tracing::{debug, error};
use tracing::{debug, error, trace};
use uuid::Uuid;

#[derive(Debug)]
pub enum RateLimitResult {
    AllowedIp(IpAddr),
    AllowedUser(u64),
    IpRateLimitExceeded(IpAddr),
    UserRateLimitExceeded(u64),
    RateLimitedIp(IpAddr, Option<Instant>),
    RateLimitedUser(u64, Option<Instant>),
    UnknownKey,
}

#[derive(From)]
pub enum RequestFrom {
    Ip(IpAddr),
    // TODO: fetch the actual user?
    User(u64),
}

impl TryFrom<RequestFrom> for IpAddr {
    type Error = anyhow::Error;

    fn try_from(value: RequestFrom) -> Result<Self, Self::Error> {
        match value {
            RequestFrom::Ip(x) => Ok(x),
            _ => Err(anyhow::anyhow!("not an ip")),
        }
    }
}

impl TryFrom<RequestFrom> for u64 {
    type Error = anyhow::Error;

    fn try_from(value: RequestFrom) -> Result<Self, Self::Error> {
        match value {
            RequestFrom::User(x) => Ok(x),
            _ => Err(anyhow::anyhow!("not a user")),
        }
    }
}

pub async fn rate_limit_by_ip(
    app: &Web3ProxyApp,
    ip: IpAddr,
) -> Result<IpAddr, FrontendErrorResponse> {
    let rate_limit_result = app.rate_limit_by_ip(ip).await?;

    match rate_limit_result {
    match app.rate_limit_by_ip(ip).await? {
        RateLimitResult::AllowedIp(x) => Ok(x),
        RateLimitResult::AllowedUser(_) => panic!("only ips or errors are expected here"),
        rate_limit_result => {
            let _: RequestFrom = rate_limit_result.try_into()?;

            panic!("try_into should have failed")
        RateLimitResult::RateLimitedIp(x, retry_at) => {
            Err(FrontendErrorResponse::RateLimitedIp(x, retry_at))
        }
        x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
    }
}

pub async fn rate_limit_by_user_key(
pub async fn rate_limit_by_key(
    app: &Web3ProxyApp,
    // TODO: change this to a Ulid
    user_key: Uuid,
) -> Result<u64, FrontendErrorResponse> {
    let rate_limit_result = app.rate_limit_by_key(user_key).await?;

    match rate_limit_result {
        RateLimitResult::AllowedIp(_) => panic!("only user keys or errors are expected here"),
    match app.rate_limit_by_key(user_key).await? {
        RateLimitResult::AllowedUser(x) => Ok(x),
        rate_limit_result => {
            let _: RequestFrom = rate_limit_result.try_into()?;

            panic!("try_into should have failed")
        }
    }
}

impl TryFrom<RateLimitResult> for RequestFrom {
    // TODO: return an error that has its own IntoResponse?
    type Error = Response;

    fn try_from(value: RateLimitResult) -> Result<Self, Self::Error> {
        match value {
            RateLimitResult::AllowedIp(x) => Ok(RequestFrom::Ip(x)),
            RateLimitResult::AllowedUser(x) => Ok(RequestFrom::User(x)),
            RateLimitResult::IpRateLimitExceeded(ip) => Err(anyhow_error_into_response(
                Some(StatusCode::TOO_MANY_REQUESTS),
                None,
                // TODO: how can we attach context here? maybe add a request id tracing field?
                anyhow::anyhow!(format!("rate limit exceeded for {}", ip)),
            )),
            RateLimitResult::UserRateLimitExceeded(user) => Err(anyhow_error_into_response(
                Some(StatusCode::TOO_MANY_REQUESTS),
                None,
                // TODO: don't expose numeric ids. show the address instead
                // TODO: how can we attach context here? maybe add a request id tracing field?
                anyhow::anyhow!(format!("rate limit exceeded for user {}", user)),
            )),
            RateLimitResult::UnknownKey => Err(anyhow_error_into_response(
                Some(StatusCode::FORBIDDEN),
                None,
                anyhow::anyhow!("unknown key"),
            )),
        RateLimitResult::RateLimitedUser(x, retry_at) => {
            Err(FrontendErrorResponse::RateLimitedUser(x, retry_at))
        }
        x => unimplemented!("rate_limit_by_key shouldn't ever see these: {:?}", x),
    }
}
@@ -120,38 +51,42 @@ impl Web3ProxyApp {
    pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
        // TODO: dry this up with rate_limit_by_key
        // TODO: have a local cache because if we hit redis too hard we get errors
        // TODO: query redis in the background so that users don't have to wait on this network request
        if let Some(rate_limiter) = &self.frontend_rate_limiter {
            let rate_limiter_label = format!("ip-{}", ip);

            // TODO: query redis in the background so that users don't have to wait on this network request
            match rate_limiter
                .throttle_label(&rate_limiter_label, None, 1)
                .await
            {
                Ok(ThrottleResult::Allowed) => {}
                Ok(ThrottleResult::RetryAt(_retry_at)) => {
                Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)),
                Ok(ThrottleResult::RetryAt(retry_at)) => {
                    // TODO: set headers so they know when they can retry
                    debug!(?rate_limiter_label, "rate limit exceeded"); // this is too verbose, but a stat might be good
                    // TODO: use their id if possible
                    return Ok(RateLimitResult::IpRateLimitExceeded(ip));
                    // TODO: debug or trace?
                    // this is too verbose, but a stat might be good
                    trace!(
                        ?rate_limiter_label,
                        "rate limit exceeded until {:?}",
                        retry_at
                    );
                    Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at)))
                }
                Ok(ThrottleResult::RetryNever) => {
                    // TODO: prettier error for the user
                    return Err(anyhow::anyhow!("ip ({}) blocked by rate limiter", ip));
                    // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
                    debug!(?rate_limiter_label, "rate limit exceeded");
                    Ok(RateLimitResult::RateLimitedIp(ip, None))
                }
                Err(err) => {
                    // internal error, not rate limit being hit
                    // TODO: i really want axum to do this for us in a single place.
                    error!(?err, "redis is unhappy. allowing ip");
                    return Ok(RateLimitResult::AllowedIp(ip));
                    error!(?err, "rate limiter is unhappy. allowing ip");
                    Ok(RateLimitResult::AllowedIp(ip))
                }
            }
        } else {
            // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
            todo!("no rate limiter");
        }

        Ok(RateLimitResult::AllowedIp(ip))
    }

    pub(crate) async fn cache_user_data(&self, user_key: Uuid) -> anyhow::Result<UserCacheValue> {
@@ -234,13 +169,13 @@ impl Web3ProxyApp {
        }

        // user key is valid. now check rate limits
        // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate
        // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate i think its bb8's fault
        if false {
        if let Some(rate_limiter) = &self.frontend_rate_limiter {
            // TODO: query redis in the background so that users don't have to wait on this network request
            // TODO: better key? have a prefix so its easy to delete all of these
            // TODO: we should probably hash this or something
            let rate_limiter_label = user_key.to_string();
            let rate_limiter_label = format!("user-{}", user_key);

            match rate_limiter
                .throttle_label(
@@ -250,38 +185,41 @@ impl Web3ProxyApp {
                )
                .await
            {
                Ok(ThrottleResult::Allowed) => {}
                Ok(ThrottleResult::Allowed) => {
                    Ok(RateLimitResult::AllowedUser(user_data.user_id))
                }
                Ok(ThrottleResult::RetryAt(retry_at)) => {
                    // TODO: set headers so they know when they can retry or maybe tarpit them? if they are barely over?
                    debug!(?rate_limiter_label, "user rate limit exceeded"); // this is too verbose, but a stat might be good
                    // TODO: use their id if possible
                    return Ok(RateLimitResult::UserRateLimitExceeded(user_data.user_id));
                    // TODO: set headers so they know when they can retry
                    // TODO: debug or trace?
                    // this is too verbose, but a stat might be good
                    trace!(
                        ?rate_limiter_label,
                        "rate limit exceeded until {:?}",
                        retry_at
                    );
                    Ok(RateLimitResult::RateLimitedUser(
                        user_data.user_id,
                        Some(retry_at),
                    ))
                }
                Ok(ThrottleResult::RetryNever) => {
                    // TODO: prettier error for the user
                    return Err(anyhow::anyhow!(
                        "user #{} blocked by rate limiter",
                        user_data.user_id
                    ));
                    // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
                    debug!(?rate_limiter_label, "rate limit exceeded");
                    Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None))
                }
                Err(err) => {
                    // internal error, not rate limit being hit
                    // rather than have downtime, i think its better to just use in-process rate limiting
                    // TODO: in-process rate limits that pipe into redis
                    error!(?err, "redis is unhappy. allowing ip");
                    return Ok(RateLimitResult::AllowedUser(user_data.user_id));
                } // // TODO: set headers so they know when they can retry
                // // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
                // // TODO: use their id if possible
                // // TODO: StatusCode::TOO_MANY_REQUESTS
                // return Err(anyhow::anyhow!("too many requests from this key"));
                    // TODO: i really want axum to do this for us in a single place.
                    error!(?err, "rate limiter is unhappy. allowing ip");
                    Ok(RateLimitResult::AllowedUser(user_data.user_id))
                }
            }
        } else {
            // TODO: if no redis, rate limit with a local cache?
            todo!("no redis. cannot rate limit")
        }
        } else {
            Ok(RateLimitResult::AllowedUser(user_data.user_id))
        }

        Ok(RateLimitResult::AllowedUser(user_data.user_id))
    }
}
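In the rate-limit code above, a throttle hit is now returned as a RateLimitResult value and logged at trace level, while only a failing rate limiter is logged at error level before failing open. A small self-contained sketch of that shape, using simplified stand-ins for the ThrottleResult and RateLimitResult types and assuming anyhow, tokio, and tracing as dependencies:

use std::net::IpAddr;
use tokio::time::Instant;
use tracing::{error, trace};

enum ThrottleResult {
    Allowed,
    RetryAt(Instant),
    RetryNever,
}

#[derive(Debug)]
enum RateLimitResult {
    AllowedIp(IpAddr),
    RateLimitedIp(IpAddr, Option<Instant>),
}

fn map_throttle(ip: IpAddr, throttled: anyhow::Result<ThrottleResult>) -> RateLimitResult {
    match throttled {
        Ok(ThrottleResult::Allowed) => RateLimitResult::AllowedIp(ip),
        // a rate-limit hit is expected traffic, so log it quietly at trace level
        Ok(ThrottleResult::RetryAt(retry_at)) => {
            trace!(%ip, ?retry_at, "rate limit exceeded");
            RateLimitResult::RateLimitedIp(ip, Some(retry_at))
        }
        Ok(ThrottleResult::RetryNever) => RateLimitResult::RateLimitedIp(ip, None),
        // a broken rate limiter is an operational problem: log loudly and fail open
        Err(err) => {
            error!(?err, "rate limiter is unhappy. allowing ip");
            RateLimitResult::AllowedIp(ip)
        }
    }
}

fn main() {
    let ip: IpAddr = "127.0.0.1".parse().unwrap();
    let allowed = map_throttle(ip, Ok(ThrottleResult::Allowed));
    println!("{:?}", allowed);
}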
@@ -1,5 +1,5 @@
use super::errors::FrontendResult;
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key};
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key};
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use axum::extract::{Host, Path};
use axum::headers::{Referer, UserAgent};

@@ -48,7 +48,7 @@ pub async fn user_proxy_web3_rpc(
) -> FrontendResult {
    let request_span = debug_span!("request", host, ?referer, ?user_agent);

    let user_id: u64 = rate_limit_by_user_key(&app, user_key)
    let user_id = rate_limit_by_key(&app, user_key)
        .instrument(request_span.clone())
        .await?;
@@ -1,5 +1,5 @@
use super::errors::FrontendResult;
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key};
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_key};
use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    extract::Path,

@@ -55,7 +55,7 @@ pub async fn user_websocket_handler(
    Path(user_key): Path<Uuid>,
    ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
    let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?;
    let user_id: u64 = rate_limit_by_key(&app, user_key).await?;

    // log the id, not the address. we don't want to expose the user's address
    // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses

@@ -162,7 +162,7 @@ async fn handle_socket_payload(
        Ok(x) => serde_json::to_string(&x),
        Err(err) => {
            // we have an anyhow error. turn it into
            let response = JsonRpcForwardedResponse::from_anyhow_error(err, id);
            let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id));
            serde_json::to_string(&response)
        }
    }
@@ -4,7 +4,6 @@ use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::Serialize;
use serde_json::value::RawValue;
use std::fmt;
use tracing::warn;

// this is used by serde
#[allow(dead_code)]
@@ -184,21 +183,32 @@ impl fmt::Debug for JsonRpcForwardedResponse {
}

impl JsonRpcForwardedResponse {
    pub fn from_anyhow_error(err: anyhow::Error, id: Box<RawValue>) -> Self {
    pub fn from_anyhow_error(
        err: anyhow::Error,
        code: Option<i64>,
        id: Option<Box<RawValue>>,
    ) -> Self {
        let message = format!("{:?}", err);

        Self::from_string(message, code, id)
    }

    pub fn from_str(message: &str, code: Option<i64>, id: Option<Box<RawValue>>) -> Self {
        Self::from_string(message.to_string(), code, id)
    }

    pub fn from_string(message: String, code: Option<i64>, id: Option<Box<RawValue>>) -> Self {
        // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
        // TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton?
        warn!(?err, "forwarding error");

        JsonRpcForwardedResponse {
            jsonrpc: "2.0".to_string(),
            id,
            id: id.unwrap_or_else(|| {
                RawValue::from_string("null".to_string()).expect("null id should always work")
            }),
            result: None,
            error: Some(JsonRpcErrorData {
                // TODO: set this jsonrpc error code to match the http status code? or maybe the other way around? maybe take it as an arg
                code: -32099,
                // TODO: some errors should be included here. others should not. i think anyhow might not be the right choice
                // message: "internal server error".to_string(),
                message: format!("{:?}", err),
                code: code.unwrap_or(-32099),
                message,
                data: None,
            }),
        }
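The constructor changes above let callers pass an optional JSON-RPC error code and an optional request id, with the id defaulting to null. A compact sketch of that constructor shape with simplified stand-in types, assuming serde and serde_json (with its raw_value feature) as dependencies:

use serde::Serialize;
use serde_json::value::RawValue;

#[derive(Serialize)]
struct ErrorData {
    code: i64,
    message: String,
}

#[derive(Serialize)]
struct ForwardedResponse {
    jsonrpc: String,
    id: Box<RawValue>,
    error: ErrorData,
}

fn from_string(message: String, code: Option<i64>, id: Option<Box<RawValue>>) -> ForwardedResponse {
    ForwardedResponse {
        jsonrpc: "2.0".to_string(),
        // a JSON-RPC response must carry an id; fall back to null when the
        // caller does not have one (for example, errors raised before parsing)
        id: id.unwrap_or_else(|| {
            RawValue::from_string("null".to_string()).expect("null is valid JSON")
        }),
        // use a catch-all server error code when the caller does not pick one
        error: ErrorData {
            code: code.unwrap_or(-32099),
            message,
        },
    }
}

fn main() {
    let resp = from_string("too many requests".to_string(), Some(429), None);
    println!("{}", serde_json::to_string(&resp).unwrap());
}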
@@ -717,24 +717,21 @@ impl Web3Connection {

        // check rate limits
        if let Some(ratelimiter) = self.hard_limit.as_ref() {
            match ratelimiter.throttle().await {
                Ok(ThrottleResult::Allowed) => {
            match ratelimiter.throttle().await? {
                ThrottleResult::Allowed => {
                    trace!("rate limit succeeded")
                }
                Ok(ThrottleResult::RetryAt(retry_at)) => {
                ThrottleResult::RetryAt(retry_at) => {
                    // rate limit failed
                    // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
                    // TODO: use tracing better
                    // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
                    warn!(?retry_at, rpc=%self, "Exhausted rate limit");

                    return Ok(OpenRequestResult::RetryAt(retry_at.into()));
                    return Ok(OpenRequestResult::RetryAt(retry_at));
                }
                Ok(ThrottleResult::RetryNever) => {
                    return Err(anyhow::anyhow!("Rate limit failed."));
                }
                Err(err) => {
                    return Err(err);
                ThrottleResult::RetryNever => {
                    return Ok(OpenRequestResult::RetryNever);
                }
            }
        };
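The comment above about saving the smallest retry_after suggests how callers can use OpenRequestResult::RetryAt: collect the deadlines from every rate-limited upstream and wait until the earliest one instead of treating the rate limit as an error. A hypothetical sketch of that selection step with stand-in types, assuming tokio as a dependency:

use tokio::time::{Duration, Instant};

enum OpenRequestResult {
    Handle(&'static str), // stand-in for a live request handle
    RetryAt(Instant),
    RetryNever,
}

// pick the soonest deadline among the rate-limited upstreams
fn earliest_retry(results: Vec<OpenRequestResult>) -> Option<Instant> {
    results
        .into_iter()
        .filter_map(|r| match r {
            OpenRequestResult::RetryAt(at) => Some(at),
            _ => None,
        })
        .min()
}

fn main() {
    let now = Instant::now();
    let results = vec![
        OpenRequestResult::Handle("conn-a"),
        OpenRequestResult::RetryNever,
        OpenRequestResult::RetryAt(now + Duration::from_secs(2)),
        OpenRequestResult::RetryAt(now + Duration::from_secs(1)),
    ];
    // the caller can sleep until this instant instead of returning an error
    assert_eq!(earliest_retry(results), Some(now + Duration::from_secs(1)));
}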
@@ -8,6 +8,7 @@ use metered::ResponseTime;
use metered::Throughput;
use std::fmt;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::time::{sleep, Duration, Instant};
use tracing::warn;

@@ -28,6 +29,7 @@ pub struct OpenRequestHandle {
    conn: Arc<Web3Connection>,
    // TODO: this is the same metrics on the conn. use a reference
    metrics: Arc<OpenRequestHandleMetrics>,
    decremented: AtomicBool,
}

#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]

@@ -45,7 +47,13 @@ impl OpenRequestHandle {

        let metrics = conn.open_request_handle_metrics.clone();

        Self { conn, metrics }
        let decremented = false.into();

        Self {
            conn,
            metrics,
            decremented,
        }
    }

    pub fn clone_connection(&self) -> Arc<Web3Connection> {

@@ -54,7 +62,8 @@ impl OpenRequestHandle {

    /// Send a web3 request
    /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
    /// By taking self here, we ensure that this is dropped after the request is complete
    /// By taking self here, we ensure that this is dropped after the request is complete.
    /// TODO: we no longer take self because metered doesn't like that
    #[instrument(skip_all)]
    #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
    pub async fn request<T, R>(

@@ -91,9 +100,19 @@ impl OpenRequestHandle {
        };

        // TODO: i think ethers already has trace logging (and does it much more fancy)
        // TODO: at least instrument this with more useful information
        // trace!(rpc=%self.0, %method, ?response);
        trace!(rpc=%self.conn, %method, "response");
        if let Err(err) = &response {
            warn!(?err, %method, rpc=%self.conn, "response");
        } else {
            // trace!(rpc=%self.0, %method, ?response);
            trace!(%method, rpc=%self.conn, "response");
        }

        self.decremented.store(true, atomic::Ordering::Release);
        self.conn
            .active_requests
            .fetch_sub(1, atomic::Ordering::AcqRel);

        // todo: do something to make sure this doesn't get called again? i miss having the function sig have self

        response
    }

@@ -101,6 +120,11 @@

impl Drop for OpenRequestHandle {
    fn drop(&mut self) {
        if self.decremented.load(atomic::Ordering::Acquire) {
            // we already decremented from a successful request
            return;
        }

        self.conn
            .active_requests
            .fetch_sub(1, atomic::Ordering::AcqRel);
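The decremented flag and the Drop impl above form a decrement-exactly-once guard: the handle decrements the in-flight counter right after the response, and Drop only decrements if that never happened. A self-contained sketch of the same pattern with simplified stand-in names, not the project's actual types:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

struct RequestHandle {
    active_requests: Arc<AtomicUsize>,
    decremented: AtomicBool,
}

impl RequestHandle {
    fn new(active_requests: Arc<AtomicUsize>) -> Self {
        active_requests.fetch_add(1, Ordering::AcqRel);
        Self {
            active_requests,
            decremented: AtomicBool::new(false),
        }
    }

    fn finish(&self) {
        // mark that we already decremented so Drop does not do it again
        self.decremented.store(true, Ordering::Release);
        self.active_requests.fetch_sub(1, Ordering::AcqRel);
    }
}

impl Drop for RequestHandle {
    fn drop(&mut self) {
        if self.decremented.load(Ordering::Acquire) {
            // already decremented by finish()
            return;
        }
        self.active_requests.fetch_sub(1, Ordering::AcqRel);
    }
}

fn main() {
    let active = Arc::new(AtomicUsize::new(0));

    let handle = RequestHandle::new(active.clone());
    handle.finish();
    drop(handle); // Drop sees the flag and does not decrement twice
    assert_eq!(active.load(Ordering::Acquire), 0);

    let abandoned = RequestHandle::new(active.clone());
    drop(abandoned); // no finish() call, so Drop decrements
    assert_eq!(active.load(Ordering::Acquire), 0);
}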