add per-user rpc accounting

Bryan Stitt committed 2022-10-10 04:15:07 +00:00
parent ecd3dc29fc
commit 25aa68a5bf
24 changed files with 461 additions and 214 deletions

Cargo.lock (generated): 28 lines changed

@ -1428,11 +1428,11 @@ dependencies = [
[[package]]
name = "entities"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"sea-orm",
"serde",
"uuid 1.1.2",
"uuid 1.2.1",
]
[[package]]
@ -2742,10 +2742,10 @@ dependencies = [
[[package]]
name = "migration"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"async-std",
"sea-orm-migration",
"tokio",
]
[[package]]
@ -2802,7 +2802,7 @@ dependencies = [
"tagptr",
"thiserror",
"triomphe",
"uuid 1.1.2",
"uuid 1.2.1",
]
[[package]]
@ -3430,7 +3430,7 @@ dependencies = [
"serde",
"serde_json",
"time 0.3.15",
"uuid 1.1.2",
"uuid 1.2.1",
]
[[package]]
@ -4076,7 +4076,7 @@ dependencies = [
"time 0.3.15",
"tracing",
"url",
"uuid 1.1.2",
"uuid 1.2.1",
]
[[package]]
@ -4138,7 +4138,7 @@ dependencies = [
"sea-query-driver",
"serde_json",
"time 0.3.15",
"uuid 1.1.2",
"uuid 1.2.1",
]
[[package]]
@ -4271,9 +4271,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.85"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44"
checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074"
dependencies = [
"itoa 1.0.2",
"ryu",
@ -4605,7 +4605,7 @@ dependencies = [
"time 0.3.15",
"tokio-stream",
"url",
"uuid 1.1.2",
"uuid 1.2.1",
"webpki-roots",
]
@ -5331,9 +5331,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.1.2"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83"
dependencies = [
"getrandom",
"serde",
@ -5545,7 +5545,7 @@ dependencies = [
"tracing-subscriber",
"ulid 1.0.0",
"url",
"uuid 1.1.2",
"uuid 1.2.1",
]
[[package]]

TODO.md: 18 lines changed

@ -174,12 +174,15 @@ These are roughly in order of completion
- [x] get to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet.
- [x] instead of giving a rate limit error code, delay the connection's response at the start. reject if incoming requests is super high?
- [x] did this by checking a key/ip-specific semaphore before checking rate limits
- [x] emit stat on cache hit
- [x] emit stat on cache miss
- [ ] add grafana to dev docker-compose so we can browse stats
- [ ] emit stat on retry
- [ ] emit stat on no servers synced
- [ ] emit stat on error (maybe just use sentry, but graphs are handy)
- [x] emit user stat on cache hit
- [x] emit user stat on cache miss
- [x] have migration use tokio instead of async-std
- [x] user create script should allow a description field
- [-] change stats to using the database
- [ ] emit user stat on retry
- [ ] emit global stat on retry
- [ ] emit global stat on no servers synced
- [ ] emit global stat on error (maybe just use sentry, but graphs are handy)
- if we wait until the error handler to emit the stat, i don't think we have access to the authorized_request
- [ ] display requests per second per api key (only with authentication!)
- [ ] display concurrent requests per api key (only with authentication!)
@ -194,12 +197,13 @@ These are roughly in order of completition
- [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down.
- [ ] option to rotate api key
- [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized
- [ ] user create script should allow a description field
- [ ] user create script should allow multiple keys per user
- [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens
- [ ] display logged reverts on an endpoint that requires authentication
- [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection
- [ ] have a log all option? instead of just reverts, log all request/responses? can be very useful for debugging
- [ ] WARN http_request: web3_proxy::frontend::errors: anyhow err=UserKey was not a ULID or UUID id=01GER4VBTS0FDHEBR96D1JRDZF method=POST
- if invalid user id given, we give a 500. should be a different error code instead
## V1

@ -9,16 +9,13 @@ min_sum_soft_limit = 2000
min_synced_rpcs = 2
# TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower
persistent_redis_max_connections = 300
persistent_redis_url = "redis://dev-predis:6379/"
volatile_redis_max_connections = 300
volatile_redis_url = "redis://dev-vredis:6379/"
redirect_public_url = "https://llamanodes.com/free-rpc-stats"
redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
public_rate_limit_per_minute = 0
frontend_rate_limit_per_minute = 0
# 1GB of cache
response_cache_max_bytes = 10000000000

@ -11,11 +11,6 @@ services:
#RUST_LOG: "info,web3_proxy=debug"
RUST_LOG: info
persistent_redis:
image: redis:6.0-alpine
command: [ "redis-server", "--save", "", "--appendonly", "no" ]
# be sure to mount /data!
volatile_redis:
image: redis:6.0-alpine
command: [ "redis-server", "--save", "60", "1" ]

@ -23,17 +23,6 @@ services:
volumes:
- ./data/dev_mysql:/var/lib/mysql
# persistent redis for storing user stats
# TODO: replace this with a real time series database
dev-predis:
extends:
file: docker-compose.common.yml
service: persistent_redis
ports:
- 16379:6379
volumes:
- ./data/dev_predis:/data
# volatile redis for storing rate limits
dev-vredis:
extends:

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
[lib]
@ -12,4 +12,4 @@ path = "src/mod.rs"
[dependencies]
sea-orm = "0.9.3"
serde = "1.0.145"
uuid = "1.1.2"
uuid = "1.2.1"

@ -3,6 +3,7 @@
pub mod prelude;
pub mod revert_logs;
pub mod rpc_accounting;
pub mod sea_orm_active_enums;
pub mod secondary_user;
pub mod user;

@ -1,6 +1,7 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.1
pub use super::revert_logs::Entity as RevertLogs;
pub use super::rpc_accounting::Entity as RpcAccounting;
pub use super::secondary_user::Entity as SecondaryUser;
pub use super::user::Entity as User;
pub use super::user_keys::Entity as UserKeys;

@ -0,0 +1,40 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.1
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "rpc_accounting")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub user_key_id: u64,
pub chain_id: u64,
pub timestamp: DateTimeUtc,
pub method: String,
pub backend_requests: u32,
pub error_response: i8,
pub query_millis: u32,
pub request_bytes: u32,
pub response_bytes: u32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::user_keys::Entity",
from = "Column::UserKeyId",
to = "super::user_keys::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
UserKeys,
}
impl Related<super::user_keys::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserKeys.def()
}
}
impl ActiveModelBehavior for ActiveModel {}
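As a usage sketch (not part of this commit), writing a row through the generated entity follows the usual sea-orm ActiveModel pattern; db_conn and the literal values below are placeholders:

use entities::rpc_accounting;
use sea_orm::{ActiveModelTrait, DatabaseConnection, Set};

// hypothetical helper: the caller supplies the connection and the already-aggregated values
async fn save_accounting_row(db_conn: &DatabaseConnection) -> anyhow::Result<()> {
    let row = rpc_accounting::ActiveModel {
        user_key_id: Set(1),
        chain_id: Set(1),
        // chrono is assumed to be available; sea-orm's DateTimeUtc is a chrono type
        timestamp: Set(chrono::Utc::now()),
        method: Set("eth_call".to_string()),
        backend_requests: Set(1),
        error_response: Set(0),
        query_millis: Set(12),
        request_bytes: Set(256),
        response_bytes: Set(1024),
        // id is auto_increment, so leave it unset
        ..Default::default()
    };

    row.insert(db_conn).await?;

    Ok(())
}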

@ -16,7 +16,6 @@ pub struct Model {
pub private_txs: bool,
pub active: bool,
pub requests_per_minute: Option<u64>,
pub max_concurrent_requests: Option<u64>,
#[sea_orm(column_type = "Decimal(Some((5, 4)))")]
pub log_revert_chance: Decimal,
#[sea_orm(column_type = "Text", nullable)]
@ -27,6 +26,7 @@ pub struct Model {
pub allowed_referers: Option<String>,
#[sea_orm(column_type = "Text", nullable)]
pub allowed_user_agents: Option<String>,
pub max_concurrent_requests: Option<u64>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -41,6 +41,8 @@ pub enum Relation {
User,
#[sea_orm(has_many = "super::revert_logs::Entity")]
RevertLogs,
#[sea_orm(has_many = "super::rpc_accounting::Entity")]
RpcAccounting,
}
impl Related<super::user::Entity> for Entity {
@ -55,4 +57,10 @@ impl Related<super::revert_logs::Entity> for Entity {
}
}
impl Related<super::rpc_accounting::Entity> for Entity {
fn to() -> RelationDef {
Relation::RpcAccounting.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
publish = false
@ -9,7 +9,7 @@ name = "migration"
path = "src/lib.rs"
[dependencies]
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
tokio = { version = "1.21.2", features = ["full", "tracing"] }
[dependencies.sea-orm-migration]
version = "0.9.3"

@ -3,6 +3,7 @@ pub use sea_orm_migration::prelude::*;
mod m20220101_000001_create_table;
mod m20220921_181610_log_reverts;
mod m20220928_015108_concurrency_limits;
mod m20221007_213828_accounting;
pub struct Migrator;
@ -13,6 +14,7 @@ impl MigratorTrait for Migrator {
Box::new(m20220101_000001_create_table::Migration),
Box::new(m20220921_181610_log_reverts::Migration),
Box::new(m20220928_015108_concurrency_limits::Migration),
Box::new(m20221007_213828_accounting::Migration),
]
}
}

@ -0,0 +1,111 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// create a table for rpc request accounting
manager
.create_table(
Table::create()
.table(RpcAccounting::Table)
.col(
ColumnDef::new(RpcAccounting::Id)
.big_unsigned()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(RpcAccounting::UserKeyId)
.big_unsigned()
.not_null(),
)
.col(
ColumnDef::new(RpcAccounting::ChainId)
.big_unsigned()
.not_null(),
)
.col(
ColumnDef::new(RpcAccounting::Timestamp)
.timestamp()
.not_null(),
)
.col(ColumnDef::new(RpcAccounting::Method).string().not_null())
.col(
ColumnDef::new(RpcAccounting::FrontendRequests)
.unsigned()
.not_null(),
)
.col(
// 0 means cache hit
// 1 is hopefully what most require
// but there might be more if retries were necessary
ColumnDef::new(RpcAccounting::BackendRequests)
.unsigned()
.not_null(),
)
.col(
ColumnDef::new(RpcAccounting::ErrorResponse)
.boolean()
.not_null(),
)
.col(
ColumnDef::new(RpcAccounting::QueryMillis)
.unsigned()
.not_null(),
)
.col(
ColumnDef::new(RpcAccounting::RequestBytes)
.unsigned()
.not_null(),
)
.col(
ColumnDef::new(RpcAccounting::ResponseBytes)
.unsigned()
.not_null(),
)
.index(sea_query::Index::create().col(RpcAccounting::Timestamp))
.index(sea_query::Index::create().col(RpcAccounting::Method))
.index(sea_query::Index::create().col(RpcAccounting::BackendRequests))
.foreign_key(
sea_query::ForeignKey::create()
.from(RpcAccounting::Table, RpcAccounting::UserKeyId)
.to(UserKeys::Table, UserKeys::Id),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(RpcAccounting::Table).to_owned())
.await
}
}
/// Partial table definition
#[derive(Iden)]
pub enum UserKeys {
Table,
Id,
}
#[derive(Iden)]
enum RpcAccounting {
Table,
Id,
Timestamp,
UserKeyId,
ChainId,
Method,
FrontendRequests,
BackendRequests,
ErrorResponse,
QueryMillis,
RequestBytes,
ResponseBytes,
}

@ -1,6 +1,6 @@
use sea_orm_migration::prelude::*;
#[async_std::main]
#[tokio::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}
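Because the migration runner is now on tokio, the same Migrator can also be driven programmatically from another tokio context; a minimal sketch, where the connection URL is a placeholder and error handling is simplified:

use migration::{Migrator, MigratorTrait};
use sea_orm::Database;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // hypothetical URL; a real deployment reads this from config or the environment
    let db = Database::connect("mysql://user:pass@localhost:3306/web3_proxy").await?;

    // applies all pending migrations, including m20221007_213828_accounting
    Migrator::up(&db, None).await?;

    Ok(())
}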

@ -55,7 +55,7 @@ rustc-hash = "1.1.0"
siwe = "0.5.0"
sea-orm = { version = "0.9.3", features = ["macros"] }
serde = { version = "1.0.145", features = [] }
serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] }
serde_json = { version = "1.0.86", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.1.6"
# TODO: make sure this time version matches siwe. PR to put this in their prelude
time = "0.3.15"
@ -72,4 +72,4 @@ tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "parking_lot"] }
ulid = { version = "1.0.0", features = ["serde"] }
url = "2.3.1"
uuid = "1.1.2"
uuid = "1.2.1"

@ -2,7 +2,7 @@
use crate::block_number::block_needed;
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::AuthorizedRequest;
use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
@ -11,7 +11,7 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
use crate::stats::{ProxyResponseStat, ProxyResponseType, StatEmitter, Web3ProxyStat};
use crate::stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
use anyhow::Context;
use atomic::{AtomicBool, Ordering};
use axum::extract::ws::Message;
@ -263,49 +263,16 @@ impl Web3ProxyApp {
}
};
// TODO: dry this with predis
let predis_pool = match top_config.app.persistent_redis_url.as_ref() {
Some(redis_url) => {
// TODO: scrub credentials and then include the redis_url in logs
info!("Connecting to predis");
// TODO: what is a good default?
let redis_max_connections = top_config
.app
.persistent_redis_max_connections
.unwrap_or(num_workers * 2);
// TODO: what are reasonable timeouts?
let redis_pool = RedisConfig::from_url(redis_url)
.builder()?
.max_size(redis_max_connections)
.runtime(DeadpoolRuntime::Tokio1)
.build()?;
// test the redis pool
if let Err(err) = redis_pool.get().await {
error!(
?err,
"failed to connect to vredis. some features will be disabled"
);
};
Some(redis_pool)
}
None => {
warn!("no predis connection. some features will be disabled");
None
}
};
// setup a channel for receiving stats (generally with a high cardinality, such as per-user)
// we do this in a channel so we don't slow down our response to the users
let stat_sender = if let Some(redis_pool) = predis_pool.clone() {
let redis_conn = redis_pool.get().await?;
let stat_sender = if let Some(db_conn) = db_conn.clone() {
// TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats
let (stat_sender, stat_handle) =
StatEmitter::spawn(top_config.app.chain_id, redis_conn).await?;
let (stat_sender, stat_handle) = {
// TODO: period from
let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60);
emitter.spawn().await?
};
handles.push(stat_handle);
@ -705,7 +672,7 @@ impl Web3ProxyApp {
/// send the request or batch of requests to the appropriate RPCs
pub async fn proxy_web3_rpc(
self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
// TODO: this should probably be trace level
@ -743,15 +710,22 @@ impl Web3ProxyApp {
/// TODO: make sure this isn't a problem
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly
let num_requests = requests.len();
let responses = join_all(
requests
.into_iter()
.map(|request| self.proxy_web3_rpc_request(authorized_request, request))
.map(|request| {
let authorized_request = authorized_request.clone();
// TODO: spawn so the requests go in parallel
// TODO: i think we will need to flatten
self.proxy_web3_rpc_request(authorized_request, request)
})
.collect::<Vec<_>>(),
)
.await;
@ -783,11 +757,13 @@ impl Web3ProxyApp {
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
async fn proxy_web3_rpc_request(
self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
let request_metadata = RequestMetadata::new(&request);
// save the id so we can attach it to the response
// TODO: instead of cloning, take the id out
let request_id = request.id.clone();
@ -917,7 +893,7 @@ impl Web3ProxyApp {
let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
return rpcs
.try_send_all_upstream_servers(Some(authorized_request), request, None)
.try_send_all_upstream_servers(Some(&authorized_request), request, None)
.await;
}
"eth_syncing" => {
@ -1010,6 +986,8 @@ impl Web3ProxyApp {
let mut response = {
let cache_hit = cache_hit.clone();
let authorized_request = authorized_request.clone();
self.response_cache
.try_get_with(cache_key, async move {
cache_hit.store(false, Ordering::Release);
@ -1020,7 +998,7 @@ impl Web3ProxyApp {
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
Some(authorized_request),
Some(&authorized_request),
request,
Some(&request_block_id.num),
)
@ -1040,17 +1018,14 @@ impl Web3ProxyApp {
.context("caching response")?
};
if let Some(stat_sender) = &self.stat_sender {
let response_type = if cache_hit.load(Ordering::Acquire) {
ProxyResponseType::CacheHit
} else {
ProxyResponseType::CacheMiss
};
if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
self.stat_sender.as_ref(),
Arc::try_unwrap(authorized_request),
) {
let response_stat = ProxyResponseStat::new(
method.to_string(),
response_type,
authorized_request,
authorized_key,
request_metadata,
);
stat_sender.send_async(response_stat.into()).await?;
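
Taken together, the app-side wiring added above is small: build a StatEmitter against the database connection, keep the flume sender, and push one ProxyResponseStat per response so accounting never blocks the reply to the user. Condensed from the hunks above, with the surrounding variables assumed to be in scope:

// condensed sketch; top_config, db_conn, handles, method, authorized_key and
// request_metadata are assumed to exist as they do in the diff above
let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60);
let (stat_sender, stat_handle) = emitter.spawn().await?;
handles.push(stat_handle);

// later, on the response path
let response_stat =
    ProxyResponseStat::new(method.to_string(), authorized_key, request_metadata);
stat_sender.send_async(response_stat.into()).await?;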

@ -78,7 +78,7 @@ fn run(
let prometheus_handle = tokio::spawn(metrics_frontend::serve(app, app_prometheus_port));
// if everything is working, these should both run forever
// TODO: try_join these instead?
// TODO: join these instead and use shutdown handler properly
tokio::select! {
x = app_handle => {
match x {
@ -114,6 +114,10 @@ fn run(
}
};
// TODO: wait on all the handles to stop
info!("finished");
Ok(())
})
}

@ -31,6 +31,10 @@ pub struct CreateUserSubCommand {
/// maximum requests per minute.
/// default to "None" which the code sees as "unlimited" requests.
rpm: Option<u64>,
#[argh(option)]
/// a short description of the key's purpose
description: Option<String>,
}
impl CreateUserSubCommand {
@ -70,16 +74,16 @@ impl CreateUserSubCommand {
);
// create a key for the new user
// TODO: requests_per_minute should be configurable
let uk = user_keys::ActiveModel {
user_id: u.id,
api_key: sea_orm::Set(self.api_key.into()),
requests_per_minute: sea_orm::Set(self.rpm),
description: sea_orm::Set(self.description),
..Default::default()
};
// TODO: if this fails, revert adding the user, too
let uk = uk.save(&txn).await.context("Failed saving new user key")?;
let _uk = uk.save(&txn).await.context("Failed saving new user key")?;
txn.commit().await?;

@ -81,11 +81,8 @@ pub struct AppConfig {
/// This is separate from the rpc limits.
#[serde(default = "default_login_rate_limit_per_minute")]
pub login_rate_limit_per_minute: u64,
/// Persist user stats in a redis (or compatible backend)
/// TODO: research more time series databases
pub persistent_redis_url: Option<String>,
pub persistent_redis_max_connections: Option<usize>,
/// Track rate limits in a redis (or compatible backend)
/// It is okay if this data is lost.
pub volatile_redis_url: Option<String>,
/// maximum size of the connection pool for the cache
/// If none, the minimum * 2 is used

@ -1,7 +1,9 @@
use super::errors::FrontendErrorResponse;
use crate::app::{UserKeyData, Web3ProxyApp};
use crate::jsonrpc::JsonRpcRequest;
use anyhow::Context;
use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent};
use chrono::{DateTime, Utc};
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::user_keys;
use ipnet::IpNet;
@ -10,6 +12,8 @@ use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use serde::Serialize;
use std::fmt::Display;
use std::mem::size_of_val;
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicUsize};
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
@ -24,6 +28,62 @@ pub enum UserKey {
Uuid(Uuid),
}
#[derive(Debug)]
pub enum RateLimitResult {
/// contains the IP of the anonymous user
/// TODO: option inside or outside the arc?
AllowedIp(IpAddr, OwnedSemaphorePermit),
/// contains the user_key_id of an authenticated user
AllowedUser(UserKeyData, Option<OwnedSemaphorePermit>),
/// contains the IP and retry_at of the anonymous user
RateLimitedIp(IpAddr, Option<Instant>),
/// contains the user_key_id and retry_at of an authenticated user key
RateLimitedUser(UserKeyData, Option<Instant>),
/// This key is not in our database. Deny access!
UnknownKey,
}
#[derive(Clone, Debug, Serialize)]
pub struct AuthorizedKey {
pub ip: IpAddr,
pub origin: Option<String>,
pub user_key_id: u64,
// TODO: just use an f32? even an f16 is probably fine
pub log_revert_chance: Decimal,
}
#[derive(Debug, Default, Serialize)]
pub struct RequestMetadata {
pub datetime: DateTime<Utc>,
pub request_bytes: AtomicUsize,
pub backend_requests: AtomicU16,
pub error_response: AtomicBool,
pub response_bytes: AtomicUsize,
pub response_millis: AtomicU32,
}
#[derive(Clone, Debug, Serialize)]
pub enum AuthorizedRequest {
/// Request from this app
Internal,
/// Request from an anonymous IP address
Ip(#[serde(skip)] IpAddr),
/// Request from an authenticated and authorized user
User(#[serde(skip)] Option<DatabaseConnection>, AuthorizedKey),
}
impl RequestMetadata {
pub fn new(request: &JsonRpcRequest) -> Self {
let request_bytes = size_of_val(request);
Self {
request_bytes: request_bytes.into(),
datetime: Utc::now(),
..Default::default()
}
}
}
impl UserKey {
pub fn new() -> Self {
Ulid::new().into()
@ -54,6 +114,7 @@ impl FromStr for UserKey {
} else if let Ok(uuid) = s.parse::<Uuid>() {
Ok(uuid.into())
} else {
// TODO: custom error type so that this shows as a 400
Err(anyhow::anyhow!("UserKey was not a ULID or UUID"))
}
}
@ -89,30 +150,6 @@ impl From<UserKey> for Uuid {
}
}
#[derive(Debug)]
pub enum RateLimitResult {
/// contains the IP of the anonymous user
/// TODO: option inside or outside the arc?
AllowedIp(IpAddr, OwnedSemaphorePermit),
/// contains the user_key_id of an authenticated user
AllowedUser(UserKeyData, Option<OwnedSemaphorePermit>),
/// contains the IP and retry_at of the anonymous user
RateLimitedIp(IpAddr, Option<Instant>),
/// contains the user_key_id and retry_at of an authenticated user key
RateLimitedUser(UserKeyData, Option<Instant>),
/// This key is not in our database. Deny access!
UnknownKey,
}
#[derive(Debug, Serialize)]
pub struct AuthorizedKey {
pub ip: IpAddr,
pub origin: Option<String>,
pub user_key_id: u64,
pub log_revert_chance: Decimal,
// TODO: what else?
}
impl AuthorizedKey {
pub fn try_new(
ip: IpAddr,
@ -180,16 +217,6 @@ impl AuthorizedKey {
}
}
#[derive(Debug, Serialize)]
pub enum AuthorizedRequest {
/// Request from this app
Internal,
/// Request from an anonymous IP address
Ip(#[serde(skip)] IpAddr),
/// Request from an authenticated and authorized user
User(#[serde(skip)] Option<DatabaseConnection>, AuthorizedKey),
}
impl AuthorizedRequest {
/// Only User has a database connection in case it needs to save a revert to the database.
pub fn db_conn(&self) -> Option<&DatabaseConnection> {
@ -205,8 +232,8 @@ impl Display for &AuthorizedRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuthorizedRequest::Internal => f.write_str("int"),
AuthorizedRequest::Ip(x) => f.write_str(&format!("ip:{}", x)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("uk:{}", x.user_key_id)),
AuthorizedRequest::Ip(x) => f.write_str(&format!("ip-{}", x)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.user_key_id)),
}
}
}
@ -324,6 +351,11 @@ impl Web3ProxyApp {
})
.await;
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? less important for anon users
// // TODO: there is probably a race here
// }
let semaphore_permit = semaphore.acquire_owned().await?;
Ok(semaphore_permit)
@ -345,6 +377,10 @@ impl Web3ProxyApp {
// TODO: is this the best way to handle an arc
.map_err(|err| anyhow::anyhow!(err))?;
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat
// }
let semaphore_permit = semaphore.acquire_owned().await?;
Ok(Some(semaphore_permit))
@ -419,7 +455,7 @@ impl Web3ProxyApp {
}
} else {
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
todo!("no rate limiter");
Ok(RateLimitResult::AllowedIp(ip, semaphore))
}
}
@ -538,12 +574,14 @@ impl Web3ProxyApp {
// TODO: debug or trace?
// this is too verbose, but a stat might be good
// TODO: keys are secrets! use the id instead
// TODO: emit a stat
trace!(?user_key, "rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimitedUser(user_data, Some(retry_at)))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: keys are secret. don't log them!
trace!(?user_key, "rate limit is 0");
// TODO: emit a stat
Ok(RateLimitResult::RateLimitedUser(user_data, None))
}
Err(err) => {
@ -556,8 +594,7 @@ impl Web3ProxyApp {
}
} else {
// TODO: if no redis, rate limit with just a local cache?
// if we don't have redis, we probably don't have a db, so this probably will never happen
Err(anyhow::anyhow!("no redis. cannot rate limit"))
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
}
}
}
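
Because RequestMetadata is built from atomics, the response path can record accounting data through a shared reference without taking a lock. The helpers below are a sketch of how call sites might bump the counters; they are illustrative and not part of this commit:

use std::sync::atomic::Ordering;

use crate::frontend::authorization::RequestMetadata;

// hypothetical call site: one backend attempt was made for this request
fn record_backend_attempt(metadata: &RequestMetadata) {
    metadata.backend_requests.fetch_add(1, Ordering::AcqRel);
}

// hypothetical call site: the response is ready to send back to the user
fn record_response(metadata: &RequestMetadata, response_bytes: usize, millis: u32, was_error: bool) {
    metadata.response_bytes.fetch_add(response_bytes, Ordering::AcqRel);
    metadata.response_millis.fetch_add(millis, Ordering::AcqRel);
    if was_error {
        metadata.error_response.store(true, Ordering::Release);
    }
}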

@ -41,7 +41,7 @@ pub async fn proxy_web3_rpc(
let authorized_request = Arc::new(authorized_request);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorized_request, payload)
app.proxy_web3_rpc(authorized_request, payload)
.instrument(request_span)
.await
});
@ -81,7 +81,7 @@ pub async fn proxy_web3_rpc_with_key(
let authorized_request = Arc::new(authorized_request);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorized_request, payload)
app.proxy_web3_rpc(authorized_request, payload)
.instrument(request_span)
.await
});

@ -167,7 +167,7 @@ async fn handle_socket_payload(
let response = app
.eth_subscribe(
authorized_request.clone(),
authorized_request,
payload,
subscription_count,
response_sender.clone(),
@ -211,10 +211,7 @@ async fn handle_socket_payload(
Ok(response.into())
}
_ => {
app.proxy_web3_rpc(&authorized_request, payload.into())
.await
}
_ => app.proxy_web3_rpc(authorized_request, payload.into()).await,
};
(id, response)

@ -247,6 +247,8 @@ impl OpenRequestHandle {
error_handler
};
// TODO: check for "execution reverted" here
match error_handler {
RequestErrorHandler::DebugLevel => {
debug!(?err, %method, rpc=%self.conn, "bad response!");

@ -1,86 +1,121 @@
use anyhow::Context;
use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
use chrono::{DateTime, Utc};
use derive_more::From;
use redis_rate_limiter::{redis, RedisConnection};
use std::fmt::Display;
use entities::rpc_accounting;
use moka::future::{Cache, CacheBuilder};
use parking_lot::{Mutex, RwLock};
use sea_orm::DatabaseConnection;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{
sync::atomic::{AtomicU32, AtomicU64},
time::Duration,
};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use crate::frontend::authorization::AuthorizedRequest;
#[derive(Debug)]
pub enum ProxyResponseType {
CacheHit,
CacheMiss,
Error,
}
impl Display for ProxyResponseType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ProxyResponseType::CacheHit => f.write_str("ch"),
ProxyResponseType::CacheMiss => f.write_str("cm"),
ProxyResponseType::Error => f.write_str("err"),
}
}
}
use tracing::{error, info, trace};
/// TODO: where should this be defined?
/// TODO: can we use something inside sea_orm instead?
#[derive(Debug)]
pub struct ProxyResponseStat(String);
/// A very basic stat that we store in redis.
/// This probably belongs in a true time series database like influxdb, but client
impl ProxyResponseStat {
pub fn new(method: String, response_type: ProxyResponseType, who: &AuthorizedRequest) -> Self {
// TODO: what order?
// TODO: app specific prefix. need at least the chain id
let redis_key = format!("proxy_response:{}:{}:{}", method, response_type, who);
Self(redis_key)
}
pub struct ProxyResponseStat {
user_key_id: u64,
method: String,
metadata: RequestMetadata,
}
// TODO: impl From for our database model
#[derive(Default)]
pub struct ProxyResponseAggregate {
// user_key_id: u64,
// method: String,
// error_response: bool,
frontend_requests: AtomicU32,
backend_requests: AtomicU32,
first_datetime: DateTime<Utc>,
// TODO: would like to not need a mutex. see how it performs before caring too much
last_timestamp: Mutex<DateTime<Utc>>,
first_response_millis: u32,
sum_response_millis: AtomicU32,
sum_request_bytes: AtomicU32,
sum_response_bytes: AtomicU32,
}
/// key is the (user_key_id, method, error_response)
pub type UserProxyResponseCache = Cache<
(u64, String, bool),
Arc<ProxyResponseAggregate>,
hashbrown::hash_map::DefaultHashBuilder,
>;
/// key is the "time bucket" (timestamp / period)
pub type TimeProxyResponseCache =
Cache<u64, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
pub struct StatEmitter {
chain_id: u64,
db_conn: DatabaseConnection,
period_seconds: u64,
/// the outer cache has a TTL and a handler for expiration
aggregated_proxy_responses: TimeProxyResponseCache,
}
/// A stat that we aggregate and then store in a database.
#[derive(Debug, From)]
pub enum Web3ProxyStat {
ProxyResponse(ProxyResponseStat),
}
impl Web3ProxyStat {
fn into_redis_key(self, chain_id: u64) -> String {
match self {
Self::ProxyResponse(x) => format!("{}:{}", x.0, chain_id),
impl ProxyResponseStat {
// TODO: should RequestMetadata be in an arc? or can we handle refs here?
pub fn new(method: String, authorized_key: AuthorizedKey, metadata: RequestMetadata) -> Self {
Self {
user_key_id: authorized_key.user_key_id,
method,
metadata,
}
}
}
pub struct StatEmitter;
impl StatEmitter {
pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
let aggregated_proxy_responses = CacheBuilder::default()
.time_to_live(Duration::from_secs(period_seconds * 3 / 2))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let s = Self {
chain_id,
db_conn,
period_seconds,
aggregated_proxy_responses,
};
Arc::new(s)
}
pub async fn spawn(
chain_id: u64,
mut redis_conn: RedisConnection,
self: Arc<Self>,
) -> anyhow::Result<(flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>)> {
let (tx, rx) = flume::unbounded::<Web3ProxyStat>();
// simple future that reads the channel and emits stats
let f = async move {
// TODO: select on shutdown handle so we can be sure to save every aggregate!
while let Ok(x) = rx.recv_async().await {
trace!(?x, "emitting stat");
// TODO: increment global stats (in redis? in local cache for prometheus?)
let clone = self.clone();
// TODO: batch stats? spawn this?
// TODO: where can we wait on this handle?
tokio::spawn(async move { clone.queue_user_stat(x).await });
let x = x.into_redis_key(chain_id);
// TODO: this is too loud. just doing it for dev
debug!(?x, "emitting stat");
if let Err(err) = redis::Cmd::incr(&x, 1)
.query_async::<_, ()>(&mut redis_conn)
.await
.context("incrementing stat")
{
error!(?err, "emitting stat")
}
// no need to save manually. they save on expire
}
// shutting down. force a save
self.save_user_stats().await?;
info!("stat emitter exited");
Ok(())
@ -90,4 +125,52 @@ impl StatEmitter {
Ok((tx, handle))
}
pub async fn queue_user_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
match stat {
Web3ProxyStat::ProxyResponse(x) => {
// TODO: move this into another function?
// get the user cache for the current time bucket
let time_bucket = (x.metadata.datetime.timestamp() as u64) / self.period_seconds;
let user_cache = self
.aggregated_proxy_responses
.get_with(time_bucket, async move {
CacheBuilder::default()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new())
})
.await;
let error_response = x.metadata.error_response.load(Ordering::Acquire);
let key = (x.user_key_id, x.method.clone(), error_response);
let user_aggregate = user_cache
.get_with(key, async move {
let last_timestamp = Mutex::new(x.metadata.datetime);
let aggregate = ProxyResponseAggregate {
first_datetime: x.metadata.datetime,
first_response_millis: x
.metadata
.response_millis
.load(Ordering::Acquire),
last_timestamp,
..Default::default()
};
Arc::new(aggregate)
})
.await;
todo!();
}
}
Ok(())
}
pub async fn save_user_stats(&self) -> anyhow::Result<()> {
todo!();
}
}
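
queue_user_stat currently stops at a todo!() once the per-(user_key_id, method, error_response) aggregate has been fetched. A plausible continuation, sketched from the ProxyResponseAggregate fields above and assuming the request metadata is still reachable at that point (it would need to be borrowed or cloned around the get_with closures), might look like:

// hypothetical continuation inside queue_user_stat, after `user_aggregate` is fetched
user_aggregate
    .frontend_requests
    .fetch_add(1, Ordering::AcqRel);
user_aggregate.backend_requests.fetch_add(
    x.metadata.backend_requests.load(Ordering::Acquire) as u32,
    Ordering::AcqRel,
);
user_aggregate.sum_request_bytes.fetch_add(
    x.metadata.request_bytes.load(Ordering::Acquire) as u32,
    Ordering::AcqRel,
);
user_aggregate.sum_response_bytes.fetch_add(
    x.metadata.response_bytes.load(Ordering::Acquire) as u32,
    Ordering::AcqRel,
);
user_aggregate.sum_response_millis.fetch_add(
    x.metadata.response_millis.load(Ordering::Acquire),
    Ordering::AcqRel,
);
// remember the newest datetime seen for this bucket
*user_aggregate.last_timestamp.lock() = x.metadata.datetime;

Presumably save_user_stats (also a todo!()) would then drain these aggregates into rpc_accounting rows using the entity added earlier in this commit.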