diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index 0754f85e..7d001446 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -79,7 +79,7 @@ where .get_with(key, async move { // we do not use the try operator here because we want to be okay with redis errors let redis_count = match rrl - .throttle_label(&redis_key, Some(max_per_period), count) + .throttle_label(&redis_key, Some(max_per_period), count, true) .await { Ok(RedisRateLimitResult::Allowed(count)) => { @@ -149,7 +149,7 @@ where let rrl = self.rrl.clone(); async move { match rrl - .throttle_label(&redis_key, Some(max_per_period), count) + .throttle_label(&redis_key, Some(max_per_period), count, false) .await { Ok(RedisRateLimitResult::Allowed(count)) => { diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs index d362e65f..161f6ed3 100644 --- a/redis-rate-limiter/src/lib.rs +++ b/redis-rate-limiter/src/lib.rs @@ -1,5 +1,6 @@ //#![warn(missing_docs)] use anyhow::Context; +use deadpool_redis::redis::AsyncCommands; use std::ops::Add; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{Duration, Instant}; @@ -71,6 +72,7 @@ impl RedisRateLimiter { label: &str, max_per_period: Option, count: u64, + expire: bool, ) -> anyhow::Result { let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period); @@ -92,23 +94,32 @@ impl RedisRateLimiter { .await .context("get redis connection for rate limits")?; - // TODO: at high concurency, i think this is giving errors - // TODO: i'm starting to think that bb8 has a bug - let x: Vec = redis::pipe() - // we could get the key first, but that means an extra redis call for every check. this seems better - .incr(&throttle_key, count) - // set expiration each time we set the key. ignore the result - .expire(&throttle_key, self.period as usize) - // TODO: NX will make it only set the expiration the first time. 
works in redis, but not elasticache - // .arg("NX") - .ignore() - // do the query - .query_async(&mut *conn) - .await - .context("increment rate limit")?; + // TODO: at high concurrency, this gives "connection reset by peer" errors. at least they are off the hot path + // TODO: only set expire if this is a new key + let new_count: u64 = if expire { + trace!("redis incr+expire"); + // TODO: automatic retry + let x: Vec<_> = redis::pipe() + .atomic() + // we could get the key first, but that means an extra redis call for every check. this seems better + .incr(&throttle_key, count) + // set expiration each time we set the key. ignore the result + .expire(&throttle_key, 1 + self.period as usize) + // TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache + // .arg("NX") + .ignore() + // do the query + .query_async(&mut *conn) + .await + .context("increment rate limit and set expiration")?; - // TODO: is there a better way to do this? - let new_count = *x.first().context("check rate limit result")?; + *x.first().expect("check redis") + } else { + trace!("redis incr only"); + conn.incr(&throttle_key, count) + .await + .context("increment rate limit")? + }; if new_count > max_per_period { // TODO: this might actually be early if we are way over the count @@ -124,7 +135,7 @@ impl RedisRateLimiter { } #[inline] - pub async fn throttle(&self) -> anyhow::Result { - self.throttle_label("", None, 1).await + pub async fn throttle(&self, expire: bool) -> anyhow::Result { + self.throttle_label("", None, 1, expire).await } } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 59b6bc4d..1381e48a 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -220,7 +220,6 @@ impl Web3ProxyApp { .unwrap_or(num_workers * 2); // TODO: what are reasonable timeouts? - // TODO: set a wait timeout? maybe somehow just emit a warning if this is long let redis_pool = RedisConfig::from_url(redis_url) .builder()? 
.max_size(redis_max_connections) diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 64940265..9524739f 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -18,7 +18,7 @@ use tracing::{debug, info}; use tracing_subscriber::EnvFilter; use web3_proxy::app::{flatten_handle, Web3ProxyApp}; use web3_proxy::config::{CliConfig, TopConfig}; -use web3_proxy::{frontend, metrics}; +use web3_proxy::{frontend, metrics_frontend}; fn run( shutdown_receiver: flume::Receiver<()>, @@ -75,7 +75,7 @@ fn run( let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app.clone())); - let prometheus_handle = tokio::spawn(metrics::serve(app, app_prometheus_port)); + let prometheus_handle = tokio::spawn(metrics_frontend::serve(app, app_prometheus_port)); // if everything is working, these should both run forever // TODO: try_join these instead? use signal_shutdown here? diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index edf97d97..6c4622cf 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -83,12 +83,15 @@ impl Web3ProxyApp { } } + // check the local cache for user data, or query the database pub(crate) async fn user_data(&self, user_key: Uuid) -> anyhow::Result { let db = self.db_conn.as_ref().context("no database")?; let user_data: Result<_, Arc> = self .user_cache .try_get_with(user_key, async move { + trace!(?user_key, "user_cache miss"); + /// helper enum for querying just a few columns instead of the entire table #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] enum QueryAs { @@ -130,7 +133,6 @@ impl Web3ProxyApp { } pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result { - // check the local cache fo user data to save a database query let user_data = self.user_data(user_key).await?; if user_data.user_id == 0 { diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 
a2da1712..8a0f6a93 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -3,6 +3,7 @@ pub mod block_number; pub mod config; pub mod frontend; pub mod jsonrpc; -pub mod metrics; +pub mod metered; +pub mod metrics_frontend; pub mod rpcs; pub mod users; diff --git a/web3_proxy/src/metrics.rs b/web3_proxy/src/metrics.rs deleted file mode 100644 index fbf01786..00000000 --- a/web3_proxy/src/metrics.rs +++ /dev/null @@ -1,58 +0,0 @@ -use axum::headers::HeaderName; -use axum::http::HeaderValue; -use axum::response::{IntoResponse, Response}; -use axum::{routing::get, Extension, Router}; -use std::net::SocketAddr; -use std::sync::Arc; -use tracing::{info, instrument}; - -use crate::app::Web3ProxyApp; - -/// Run a prometheus metrics server on the given port. -#[instrument(skip_all)] -pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { - // build our application with a route - // order most to least common - // TODO: 404 any unhandled routes? - let app = Router::new().route("/", get(root)).layer(Extension(app)); - - // run our app with hyper - // TODO: allow only listening on localhost? - let addr = SocketAddr::from(([0, 0, 0, 0], port)); - info!("prometheus listening on port {}", port); - // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional? - - /* - It sequentially looks for an IP in: - - x-forwarded-for header (de-facto standard) - - x-real-ip header - - forwarded header (new standard) - - axum::extract::ConnectInfo (if not behind proxy) - - So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt - */ - let service = app.into_make_service_with_connect_info::(); - // let service = app.into_make_service(); - - // `axum::Server` is a re-export of `hyper::Server` - axum::Server::bind(&addr) - // TODO: option to use with_connect_info. 
we want it in dev, but not when running behind a proxy, but not - .serve(service) - .await - .map_err(Into::into) -} - -#[instrument(skip_all)] -async fn root(Extension(app): Extension>) -> Response { - let serialized = app.prometheus_metrics(); - - let mut r = serialized.into_response(); - - // // TODO: is there an easier way to do this? - r.headers_mut().insert( - HeaderName::from_static("content-type"), - HeaderValue::from_static("application/openmetrics-text; version=1.0.0; charset=utf-8"), - ); - - r -} diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index aa1cfc76..de3e7228 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -766,7 +766,8 @@ impl Web3Connection { // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { - match ratelimiter.throttle().await? { + // TODO: how should we know if we should set expire or not? + match ratelimiter.throttle(true).await? { RedisRateLimitResult::Allowed(_) => { trace!("rate limit succeeded") } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 8587b741..6e2c1da4 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,8 +1,8 @@ use super::connection::Web3Connection; use super::provider::Web3Provider; +use crate::metered::{JsonRpcErrorCount, ProviderErrorCount}; use ethers::providers::ProviderError; use metered::metered; -use metered::ErrorCount; use metered::HitCount; use metered::ResponseTime; use metered::Throughput; @@ -65,7 +65,7 @@ impl OpenRequestHandle { /// TODO: we no longer take self because metered doesn't like that /// TODO: ErrorCount includes too many types of errors, such as transaction reverts #[instrument(skip_all)] - #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] + #[measure([JsonRpcErrorCount, HitCount, ProviderErrorCount, ResponseTime, Throughput])] pub async fn request( &self, method: &str,