atomic and less expires

Bryan Stitt 2022-09-20 06:56:24 +00:00
parent 739947792a
commit abe5001792
9 changed files with 42 additions and 86 deletions

View File

@@ -79,7 +79,7 @@ where
        .get_with(key, async move {
            // we do not use the try operator here because we want to be okay with redis errors
            let redis_count = match rrl
-               .throttle_label(&redis_key, Some(max_per_period), count)
+               .throttle_label(&redis_key, Some(max_per_period), count, true)
                .await
            {
                Ok(RedisRateLimitResult::Allowed(count)) => {
@@ -149,7 +149,7 @@ where
        let rrl = self.rrl.clone();
        async move {
            match rrl
-               .throttle_label(&redis_key, Some(max_per_period), count)
+               .throttle_label(&redis_key, Some(max_per_period), count, false)
                .await
            {
                Ok(RedisRateLimitResult::Allowed(count)) => {

View File

@@ -1,5 +1,6 @@
//#![warn(missing_docs)]
use anyhow::Context;
+use deadpool_redis::redis::AsyncCommands;
use std::ops::Add;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{Duration, Instant};
@@ -71,6 +72,7 @@ impl RedisRateLimiter {
        label: &str,
        max_per_period: Option<u64>,
        count: u64,
+       expire: bool,
    ) -> anyhow::Result<RedisRateLimitResult> {
        let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period);
@@ -92,23 +94,32 @@ impl RedisRateLimiter {
            .await
            .context("get redis connection for rate limits")?;

-       // TODO: at high concurency, i think this is giving errors
-       // TODO: i'm starting to think that bb8 has a bug
-       let x: Vec<u64> = redis::pipe()
-           // we could get the key first, but that means an extra redis call for every check. this seems better
-           .incr(&throttle_key, count)
-           // set expiration each time we set the key. ignore the result
-           .expire(&throttle_key, self.period as usize)
-           // TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache
-           // .arg("NX")
-           .ignore()
-           // do the query
-           .query_async(&mut *conn)
-           .await
-           .context("increment rate limit")?;
-
-       // TODO: is there a better way to do this?
-       let new_count = *x.first().context("check rate limit result")?;
+       // TODO: at high concurency, this gives "connection reset by peer" errors. at least they are off the hot path
+       // TODO: only set expire if this is a new key
+       let new_count: u64 = if expire {
+           trace!("redis incr+expire");
+           // TODO: automatic retry
+           let x: Vec<_> = redis::pipe()
+               .atomic()
+               // we could get the key first, but that means an extra redis call for every check. this seems better
+               .incr(&throttle_key, count)
+               // set expiration each time we set the key. ignore the result
+               .expire(&throttle_key, 1 + self.period as usize)
+               // TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache
+               // .arg("NX")
+               .ignore()
+               // do the query
+               .query_async(&mut *conn)
+               .await
+               .context("increment rate limit and set expiration")?;
+
+           *x.first().expect("check redis")
+       } else {
+           trace!("redis incr only");
+           conn.incr(&throttle_key, count)
+               .await
+               .context("increment rate limit")?
+       };

        if new_count > max_per_period {
            // TODO: this might actually be early if we are way over the count
@@ -124,7 +135,7 @@ impl RedisRateLimiter {
    }

    #[inline]
-   pub async fn throttle(&self) -> anyhow::Result<RedisRateLimitResult> {
-       self.throttle_label("", None, 1).await
+   pub async fn throttle(&self, expire: bool) -> anyhow::Result<RedisRateLimitResult> {
+       self.throttle_label("", None, 1, expire).await
    }
}
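
Pulled out of the diff, the split that the new expire flag selects between looks roughly like the sketch below. It is a minimal standalone version that talks to the plain redis crate instead of the project's deadpool_redis pool; the .atomic() pipeline, the padded TTL, and the plain INCR fallback come from the hunk above, while the function name and arguments are illustrative only.

use anyhow::Context;
use redis::AsyncCommands;

async fn throttle(
    conn: &mut redis::aio::MultiplexedConnection,
    key: &str,
    count: u64,
    period_secs: usize,
    expire: bool,
) -> anyhow::Result<u64> {
    let new_count: u64 = if expire {
        // MULTI/EXEC pipeline: the INCR and EXPIRE are applied atomically,
        // so the key is never left without a TTL between the two commands
        let x: Vec<u64> = redis::pipe()
            .atomic()
            .incr(key, count)
            // pad the TTL by one second so a request at the very end of the
            // period still counts against it
            .expire(key, 1 + period_secs)
            // we only care about the INCR result, not the EXPIRE result
            .ignore()
            .query_async(conn)
            .await
            .context("increment rate limit and set expiration")?;

        *x.first().context("empty pipeline response")?
    } else {
        // hot path: a single INCR, no TTL churn
        conn.incr(key, count).await.context("increment rate limit")?
    };

    Ok(new_count)
}

Either way the caller gets back the post-increment count, so the comparison against max_per_period stays the same.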

View File

@@ -220,7 +220,6 @@ impl Web3ProxyApp {
            .unwrap_or(num_workers * 2);

        // TODO: what are reasonable timeouts?
-       // TODO: set a wait timeout? maybe somehow just emit a warning if this is long
        let redis_pool = RedisConfig::from_url(redis_url)
            .builder()?
            .max_size(redis_max_connections)

View File

@@ -18,7 +18,7 @@ use tracing::{debug, info};
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, Web3ProxyApp};
use web3_proxy::config::{CliConfig, TopConfig};
-use web3_proxy::{frontend, metrics};
+use web3_proxy::{frontend, metrics_frontend};

fn run(
    shutdown_receiver: flume::Receiver<()>,
@@ -75,7 +75,7 @@ fn run(
        let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app.clone()));
-       let prometheus_handle = tokio::spawn(metrics::serve(app, app_prometheus_port));
+       let prometheus_handle = tokio::spawn(metrics_frontend::serve(app, app_prometheus_port));

        // if everything is working, these should both run forever
        // TODO: try_join these instead? use signal_shutdown here?

View File

@@ -83,12 +83,15 @@ impl Web3ProxyApp {
        }
    }

+   // check the local cache for user data, or query the database
    pub(crate) async fn user_data(&self, user_key: Uuid) -> anyhow::Result<UserCacheValue> {
        let db = self.db_conn.as_ref().context("no database")?;

        let user_data: Result<_, Arc<anyhow::Error>> = self
            .user_cache
            .try_get_with(user_key, async move {
+               trace!(?user_key, "user_cache miss");

                /// helper enum for querying just a few columns instead of the entire table
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryAs {
@@ -130,7 +133,6 @@ impl Web3ProxyApp {
    }

    pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result<RateLimitResult> {
-       // check the local cache fo user data to save a database query
        let user_data = self.user_data(user_key).await?;

        if user_data.user_id == 0 {

View File

@@ -3,6 +3,7 @@ pub mod block_number;
pub mod config;
pub mod frontend;
pub mod jsonrpc;
-pub mod metrics;
+pub mod metered;
+pub mod metrics_frontend;
pub mod rpcs;
pub mod users;

View File

@ -1,58 +0,0 @@
use axum::headers::HeaderName;
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::{routing::get, Extension, Router};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{info, instrument};
use crate::app::Web3ProxyApp;
/// Run a prometheus metrics server on the given port.
#[instrument(skip_all)]
pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
// build our application with a route
// order most to least common
// TODO: 404 any unhandled routes?
let app = Router::new().route("/", get(root)).layer(Extension(app));
// run our app with hyper
// TODO: allow only listening on localhost?
let addr = SocketAddr::from(([0, 0, 0, 0], port));
info!("prometheus listening on port {}", port);
// TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
/*
It sequentially looks for an IP in:
- x-forwarded-for header (de-facto standard)
- x-real-ip header
- forwarded header (new standard)
- axum::extract::ConnectInfo (if not behind proxy)
So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt
*/
let service = app.into_make_service_with_connect_info::<SocketAddr>();
// let service = app.into_make_service();
// `axum::Server` is a re-export of `hyper::Server`
axum::Server::bind(&addr)
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
.serve(service)
.await
.map_err(Into::into)
}
#[instrument(skip_all)]
async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response {
let serialized = app.prometheus_metrics();
let mut r = serialized.into_response();
// // TODO: is there an easier way to do this?
r.headers_mut().insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/openmetrics-text; version=1.0.0; charset=utf-8"),
);
r
}

View File

@@ -766,7 +766,8 @@ impl Web3Connection {
        // check rate limits
        if let Some(ratelimiter) = self.hard_limit.as_ref() {
-           match ratelimiter.throttle().await? {
+           // TODO: how should we know if we should set expire or not?
+           match ratelimiter.throttle(true).await? {
                RedisRateLimitResult::Allowed(_) => {
                    trace!("rate limit succeeded")
                }

View File

@ -1,8 +1,8 @@
use super::connection::Web3Connection; use super::connection::Web3Connection;
use super::provider::Web3Provider; use super::provider::Web3Provider;
use crate::metered::{JsonRpcErrorCount, ProviderErrorCount};
use ethers::providers::ProviderError; use ethers::providers::ProviderError;
use metered::metered; use metered::metered;
use metered::ErrorCount;
use metered::HitCount; use metered::HitCount;
use metered::ResponseTime; use metered::ResponseTime;
use metered::Throughput; use metered::Throughput;
@@ -65,7 +65,7 @@ impl OpenRequestHandle {
    /// TODO: we no longer take self because metered doesn't like that
    /// TODO: ErrorCount includes too many types of errors, such as transaction reverts
    #[instrument(skip_all)]
-   #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
+   #[measure([JsonRpcErrorCount, HitCount, ProviderErrorCount, ResponseTime, Throughput])]
    pub async fn request<T, R>(
        &self,
        method: &str,
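
The JsonRpcErrorCount and ProviderErrorCount types come from the new crate::metered module and are not shown in this diff. For readers unfamiliar with the metered crate, here is a rough sketch of how #[metered]/#[measure] are normally wired up, based on the crate's documented usage rather than this project's code; the Backend and BackendMetrics names are made up for the example.

use metered::{metered, HitCount, ResponseTime};

#[derive(Default)]
pub struct Backend {
    // the #[metered] attribute generates the BackendMetrics registry struct
    // and, by default, expects to find an instance of it at self.metrics
    metrics: BackendMetrics,
}

#[metered(registry = BackendMetrics)]
impl Backend {
    // every call increments HitCount and records its latency in ResponseTime
    #[measure([HitCount, ResponseTime])]
    pub fn handle(&self) {
        std::thread::sleep(std::time::Duration::from_millis(5));
    }
}

Swapping the stock ErrorCount for the two custom counters, as the hunk above does, only changes which metric types appear in the #[measure([...])] list.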