From 25aa68a5bfbc58dcc91724b14fa14ac3863a15bc Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 10 Oct 2022 04:15:07 +0000 Subject: [PATCH] add per-user rpc accounting --- Cargo.lock | 28 +-- TODO.md | 18 +- config/example.toml | 5 +- docker-compose.common.yml | 5 - docker-compose.yml | 11 - entities/Cargo.toml | 4 +- entities/src/mod.rs | 1 + entities/src/prelude.rs | 1 + entities/src/rpc_accounting.rs | 40 ++++ entities/src/user_keys.rs | 10 +- migration/Cargo.toml | 4 +- migration/src/lib.rs | 2 + migration/src/m20221007_213828_accounting.rs | 111 ++++++++++ migration/src/main.rs | 2 +- web3_proxy/Cargo.toml | 4 +- web3_proxy/src/app.rs | 89 +++----- web3_proxy/src/bin/web3_proxy.rs | 6 +- .../src/bin/web3_proxy_cli/create_user.rs | 8 +- web3_proxy/src/config.rs | 5 +- web3_proxy/src/frontend/authorization.rs | 115 +++++++---- web3_proxy/src/frontend/rpc_proxy_http.rs | 4 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 7 +- web3_proxy/src/rpcs/request.rs | 2 + web3_proxy/src/stats.rs | 193 +++++++++++++----- 24 files changed, 461 insertions(+), 214 deletions(-) create mode 100644 entities/src/rpc_accounting.rs create mode 100644 migration/src/m20221007_213828_accounting.rs diff --git a/Cargo.lock b/Cargo.lock index cdb1d7d1..338a17bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1428,11 +1428,11 @@ dependencies = [ [[package]] name = "entities" -version = "0.3.0" +version = "0.4.0" dependencies = [ "sea-orm", "serde", - "uuid 1.1.2", + "uuid 1.2.1", ] [[package]] @@ -2742,10 +2742,10 @@ dependencies = [ [[package]] name = "migration" -version = "0.3.0" +version = "0.4.0" dependencies = [ - "async-std", "sea-orm-migration", + "tokio", ] [[package]] @@ -2802,7 +2802,7 @@ dependencies = [ "tagptr", "thiserror", "triomphe", - "uuid 1.1.2", + "uuid 1.2.1", ] [[package]] @@ -3430,7 +3430,7 @@ dependencies = [ "serde", "serde_json", "time 0.3.15", - "uuid 1.1.2", + "uuid 1.2.1", ] [[package]] @@ -4076,7 +4076,7 @@ dependencies = [ "time 0.3.15", "tracing", "url", - "uuid 1.1.2", + "uuid 1.2.1", ] [[package]] @@ -4138,7 +4138,7 @@ dependencies = [ "sea-query-driver", "serde_json", "time 0.3.15", - "uuid 1.1.2", + "uuid 1.2.1", ] [[package]] @@ -4271,9 +4271,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44" +checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074" dependencies = [ "itoa 1.0.2", "ryu", @@ -4605,7 +4605,7 @@ dependencies = [ "time 0.3.15", "tokio-stream", "url", - "uuid 1.1.2", + "uuid 1.2.1", "webpki-roots", ] @@ -5331,9 +5331,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.1.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" dependencies = [ "getrandom", "serde", @@ -5545,7 +5545,7 @@ dependencies = [ "tracing-subscriber", "ulid 1.0.0", "url", - "uuid 1.1.2", + "uuid 1.2.1", ] [[package]] diff --git a/TODO.md b/TODO.md index 36958af6..677da5ba 100644 --- a/TODO.md +++ b/TODO.md @@ -174,12 +174,15 @@ These are roughly in order of completition - [x] get to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet. - [x] instead of giving a rate limit error code, delay the connection's response at the start. 
reject if incoming requests is super high? - [x] did this by checking a key/ip-specific semaphore before checking rate limits -- [x] emit stat on cache hit -- [x] emit stat on cache miss -- [ ] add grafana to dev docker-compose so we can browse stats -- [ ] emit stat on retry -- [ ] emit stat on no servers synced -- [ ] emit stat on error (maybe just use sentry, but graphs are handy) +- [x] emit user stat on cache hit +- [x] emit user stat on cache miss +- [x] have migration use tokio instead of async-std +- [x] user create script should allow a description field +- [-] change stats to using the database +- [ ] emit user stat on retry +- [ ] emit global stat on retry +- [ ] emit global stat on no servers synced +- [ ] emit global stat on error (maybe just use sentry, but graphs are handy) - if we wait until the error handler to emit the stat, i don't think we have access to the authorized_request - [ ] display requests per second per api key (only with authentication!) - [ ] display concurrent requests per api key (only with authentication!) @@ -194,12 +197,13 @@ These are roughly in order of completition - [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down. - [ ] option to rotate api key - [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized -- [ ] user create script should allow a description field - [ ] user create script should allow multiple keys per user - [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens - [ ] display logged reverts on an endpoint that requires authentication - [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection - [ ] have a log all option? instead of just reverts, log all request/responses? can be very useful for debugging +- [ ] WARN http_request: web3_proxy::frontend::errors: anyhow err=UserKey was not a ULID or UUID id=01GER4VBTS0FDHEBR96D1JRDZF method=POST + - if invalid user id given, we give a 500. should be a different error code instead ## V1 diff --git a/config/example.toml b/config/example.toml index bb077923..82aed485 100644 --- a/config/example.toml +++ b/config/example.toml @@ -9,16 +9,13 @@ min_sum_soft_limit = 2000 min_synced_rpcs = 2 # TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower -persistent_redis_max_connections = 300 -persistent_redis_url = "redis://dev-predis:6379/" - volatile_redis_max_connections = 300 volatile_redis_url = "redis://dev-vredis:6379/" redirect_public_url = "https://llamanodes.com/free-rpc-stats" redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}" -public_rate_limit_per_minute = 0 +frontend_rate_limit_per_minute = 0 # 1GB of cache response_cache_max_bytes = 10000000000 diff --git a/docker-compose.common.yml b/docker-compose.common.yml index ebb07071..26e24c3b 100644 --- a/docker-compose.common.yml +++ b/docker-compose.common.yml @@ -11,11 +11,6 @@ services: #RUST_LOG: "info,web3_proxy=debug" RUST_LOG: info - persistent_redis: - image: redis:6.0-alpine - command: [ "redis-server", "--save", "", "--appendonly", "no" ] - # be sure to mount /data! 
- volatile_redis: image: redis:6.0-alpine command: [ "redis-server", "--save", "60", "1" ] diff --git a/docker-compose.yml b/docker-compose.yml index 08e8910a..fcac11c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,17 +23,6 @@ services: volumes: - ./data/dev_mysql:/var/lib/mysql - # persistent redis for storing user stats - # TODO: replace this with a real time series database - dev-predis: - extends: - file: docker-compose.common.yml - service: persistent_redis - ports: - - 16379:6379 - volumes: - - ./data/dev_predis:/data - # volatile redis for storing rate limits dev-vredis: extends: diff --git a/entities/Cargo.toml b/entities/Cargo.toml index f37d5d67..95a27cc3 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.3.0" +version = "0.4.0" edition = "2021" [lib] @@ -12,4 +12,4 @@ path = "src/mod.rs" [dependencies] sea-orm = "0.9.3" serde = "1.0.145" -uuid = "1.1.2" +uuid = "1.2.1" diff --git a/entities/src/mod.rs b/entities/src/mod.rs index 4f2429b0..2189a255 100644 --- a/entities/src/mod.rs +++ b/entities/src/mod.rs @@ -3,6 +3,7 @@ pub mod prelude; pub mod revert_logs; +pub mod rpc_accounting; pub mod sea_orm_active_enums; pub mod secondary_user; pub mod user; diff --git a/entities/src/prelude.rs b/entities/src/prelude.rs index dfc16135..081dd721 100644 --- a/entities/src/prelude.rs +++ b/entities/src/prelude.rs @@ -1,6 +1,7 @@ //! SeaORM Entity. Generated by sea-orm-codegen 0.9.1 pub use super::revert_logs::Entity as RevertLogs; +pub use super::rpc_accounting::Entity as RpcAccounting; pub use super::secondary_user::Entity as SecondaryUser; pub use super::user::Entity as User; pub use super::user_keys::Entity as UserKeys; diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs new file mode 100644 index 00000000..e39a8ac1 --- /dev/null +++ b/entities/src/rpc_accounting.rs @@ -0,0 +1,40 @@ +//! SeaORM Entity. 
Generated by sea-orm-codegen 0.9.1 + +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "rpc_accounting")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: u64, + pub user_key_id: u64, + pub chain_id: u64, + pub timestamp: DateTimeUtc, + pub method: String, + pub backend_requests: u32, + pub error_response: i8, + pub query_millis: u32, + pub request_bytes: u32, + pub response_bytes: u32, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::user_keys::Entity", + from = "Column::UserKeyId", + to = "super::user_keys::Column::Id", + on_update = "NoAction", + on_delete = "NoAction" + )] + UserKeys, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::UserKeys.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/user_keys.rs b/entities/src/user_keys.rs index 81e95552..b4765ae7 100644 --- a/entities/src/user_keys.rs +++ b/entities/src/user_keys.rs @@ -16,7 +16,6 @@ pub struct Model { pub private_txs: bool, pub active: bool, pub requests_per_minute: Option, - pub max_concurrent_requests: Option, #[sea_orm(column_type = "Decimal(Some((5, 4)))")] pub log_revert_chance: Decimal, #[sea_orm(column_type = "Text", nullable)] @@ -27,6 +26,7 @@ pub struct Model { pub allowed_referers: Option, #[sea_orm(column_type = "Text", nullable)] pub allowed_user_agents: Option, + pub max_concurrent_requests: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -41,6 +41,8 @@ pub enum Relation { User, #[sea_orm(has_many = "super::revert_logs::Entity")] RevertLogs, + #[sea_orm(has_many = "super::rpc_accounting::Entity")] + RpcAccounting, } impl Related for Entity { @@ -55,4 +57,10 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::RpcAccounting.def() + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/migration/Cargo.toml b/migration/Cargo.toml index fc3968f6..944f5447 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.3.0" +version = "0.4.0" edition = "2021" publish = false @@ -9,7 +9,7 @@ name = "migration" path = "src/lib.rs" [dependencies] -async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } +tokio = { version = "1.21.2", features = ["full", "tracing"] } [dependencies.sea-orm-migration] version = "0.9.3" diff --git a/migration/src/lib.rs b/migration/src/lib.rs index d5430fda..380bbe30 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -3,6 +3,7 @@ pub use sea_orm_migration::prelude::*; mod m20220101_000001_create_table; mod m20220921_181610_log_reverts; mod m20220928_015108_concurrency_limits; +mod m20221007_213828_accounting; pub struct Migrator; @@ -13,6 +14,7 @@ impl MigratorTrait for Migrator { Box::new(m20220101_000001_create_table::Migration), Box::new(m20220921_181610_log_reverts::Migration), Box::new(m20220928_015108_concurrency_limits::Migration), + Box::new(m20221007_213828_accounting::Migration), ] } } diff --git a/migration/src/m20221007_213828_accounting.rs b/migration/src/m20221007_213828_accounting.rs new file mode 100644 index 00000000..72ff2912 --- /dev/null +++ b/migration/src/m20221007_213828_accounting.rs @@ -0,0 +1,111 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for 
Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // create a table for rpc request accounting + manager + .create_table( + Table::create() + .table(RpcAccounting::Table) + .col( + ColumnDef::new(RpcAccounting::Id) + .big_unsigned() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(RpcAccounting::UserKeyId) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::ChainId) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::Timestamp) + .timestamp() + .not_null(), + ) + .col(ColumnDef::new(RpcAccounting::Method).string().not_null()) + .col( + ColumnDef::new(RpcAccounting::FrontendRequests) + .unsigned() + .not_null(), + ) + .col( + // 0 means cache hit + // 1 is hopefully what most require + // but there might be more if retries were necessary + ColumnDef::new(RpcAccounting::BackendRequests) + .unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::ErrorResponse) + .boolean() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::QueryMillis) + .unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::RequestBytes) + .unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::ResponseBytes) + .unsigned() + .not_null(), + ) + .index(sea_query::Index::create().col(RpcAccounting::Timestamp)) + .index(sea_query::Index::create().col(RpcAccounting::Method)) + .index(sea_query::Index::create().col(RpcAccounting::BackendRequests)) + .foreign_key( + sea_query::ForeignKey::create() + .from(RpcAccounting::Table, RpcAccounting::UserKeyId) + .to(UserKeys::Table, UserKeys::Id), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(RpcAccounting::Table).to_owned()) + .await + } +} + +/// Partial table definition +#[derive(Iden)] +pub enum UserKeys { + Table, + Id, +} + +#[derive(Iden)] +enum RpcAccounting { + Table, + Id, + Timestamp, + UserKeyId, + ChainId, + Method, + FrontendRequests, + BackendRequests, + ErrorResponse, + QueryMillis, + RequestBytes, + ResponseBytes, +} diff --git a/migration/src/main.rs b/migration/src/main.rs index c6b6e48d..f054deaf 100644 --- a/migration/src/main.rs +++ b/migration/src/main.rs @@ -1,6 +1,6 @@ use sea_orm_migration::prelude::*; -#[async_std::main] +#[tokio::main] async fn main() { cli::run_cli(migration::Migrator).await; } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index e40ea133..b13c17e7 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -55,7 +55,7 @@ rustc-hash = "1.1.0" siwe = "0.5.0" sea-orm = { version = "0.9.3", features = ["macros"] } serde = { version = "1.0.145", features = [] } -serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] } +serde_json = { version = "1.0.86", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.1.6" # TODO: make sure this time version matches siwe. 
PR to put this in their prelude time = "0.3.15" @@ -72,4 +72,4 @@ tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter", "parking_lot"] } ulid = { version = "1.0.0", features = ["serde"] } url = "2.3.1" -uuid = "1.1.2" +uuid = "1.2.1" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 37ee09c6..ea172370 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -2,7 +2,7 @@ use crate::block_number::block_needed; use crate::config::{AppConfig, TopConfig}; -use crate::frontend::authorization::AuthorizedRequest; +use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; @@ -11,7 +11,7 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId}; use crate::rpcs::connections::Web3Connections; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; -use crate::stats::{ProxyResponseStat, ProxyResponseType, StatEmitter, Web3ProxyStat}; +use crate::stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat}; use anyhow::Context; use atomic::{AtomicBool, Ordering}; use axum::extract::ws::Message; @@ -263,49 +263,16 @@ impl Web3ProxyApp { } }; - // TODO: dry this with predis - let predis_pool = match top_config.app.persistent_redis_url.as_ref() { - Some(redis_url) => { - // TODO: scrub credentials and then include the redis_url in logs - info!("Connecting to predis"); - - // TODO: what is a good default? - let redis_max_connections = top_config - .app - .persistent_redis_max_connections - .unwrap_or(num_workers * 2); - - // TODO: what are reasonable timeouts? - let redis_pool = RedisConfig::from_url(redis_url) - .builder()? - .max_size(redis_max_connections) - .runtime(DeadpoolRuntime::Tokio1) - .build()?; - - // test the redis pool - if let Err(err) = redis_pool.get().await { - error!( - ?err, - "failed to connect to vredis. some features will be disabled" - ); - }; - - Some(redis_pool) - } - None => { - warn!("no predis connection. some features will be disabled"); - None - } - }; - // setup a channel for receiving stats (generally with a high cardinality, such as per-user) // we do this in a channel so we don't slow down our response to the users - let stat_sender = if let Some(redis_pool) = predis_pool.clone() { - let redis_conn = redis_pool.get().await?; - + let stat_sender = if let Some(db_conn) = db_conn.clone() { // TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats - let (stat_sender, stat_handle) = - StatEmitter::spawn(top_config.app.chain_id, redis_conn).await?; + let (stat_sender, stat_handle) = { + // TODO: period from + let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60); + + emitter.spawn().await? 
+ }; handles.push(stat_handle); @@ -705,7 +672,7 @@ impl Web3ProxyApp { /// send the request or batch of requests to the approriate RPCs pub async fn proxy_web3_rpc( self: &Arc, - authorized_request: &Arc, + authorized_request: Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { // TODO: this should probably be trace level @@ -743,15 +710,22 @@ impl Web3ProxyApp { /// TODO: make sure this isn't a problem async fn proxy_web3_rpc_requests( self: &Arc, - authorized_request: &Arc, + authorized_request: Arc, requests: Vec, ) -> anyhow::Result> { // TODO: we should probably change ethers-rs to support this directly let num_requests = requests.len(); + let responses = join_all( requests .into_iter() - .map(|request| self.proxy_web3_rpc_request(authorized_request, request)) + .map(|request| { + let authorized_request = authorized_request.clone(); + + // TODO: spawn so the requests go in parallel + // TODO: i think we will need to flatten + self.proxy_web3_rpc_request(authorized_request, request) + }) .collect::>(), ) .await; @@ -783,11 +757,13 @@ impl Web3ProxyApp { #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] async fn proxy_web3_rpc_request( self: &Arc, - authorized_request: &Arc, + authorized_request: Arc, mut request: JsonRpcRequest, ) -> anyhow::Result { trace!("Received request: {:?}", request); + let request_metadata = RequestMetadata::new(&request); + // save the id so we can attach it to the response // TODO: instead of cloning, take the id out let request_id = request.id.clone(); @@ -917,7 +893,7 @@ impl Web3ProxyApp { let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); return rpcs - .try_send_all_upstream_servers(Some(authorized_request), request, None) + .try_send_all_upstream_servers(Some(&authorized_request), request, None) .await; } "eth_syncing" => { @@ -1010,6 +986,8 @@ impl Web3ProxyApp { let mut response = { let cache_hit = cache_hit.clone(); + let authorized_request = authorized_request.clone(); + self.response_cache .try_get_with(cache_key, async move { cache_hit.store(false, Ordering::Release); @@ -1020,7 +998,7 @@ impl Web3ProxyApp { let mut response = self .balanced_rpcs .try_send_best_upstream_server( - Some(authorized_request), + Some(&authorized_request), request, Some(&request_block_id.num), ) @@ -1040,17 +1018,14 @@ impl Web3ProxyApp { .context("caching response")? }; - if let Some(stat_sender) = &self.stat_sender { - let response_type = if cache_hit.load(Ordering::Acquire) { - ProxyResponseType::CacheHit - } else { - ProxyResponseType::CacheMiss - }; - + if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = ( + self.stat_sender.as_ref(), + Arc::try_unwrap(authorized_request), + ) { let response_stat = ProxyResponseStat::new( method.to_string(), - response_type, - authorized_request, + authorized_key, + request_metadata, ); stat_sender.send_async(response_stat.into()).await?; diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 521c06c1..28ca7377 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -78,7 +78,7 @@ fn run( let prometheus_handle = tokio::spawn(metrics_frontend::serve(app, app_prometheus_port)); // if everything is working, these should both run forever - // TODO: try_join these instead? + // TODO: join these instead and use shutdown handler properly tokio::select! 
{ x = app_handle => { match x { @@ -114,6 +114,10 @@ fn run( } }; + // TODO: wait on all the handles to stop + + info!("finished"); + Ok(()) }) } diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index 3c9f85c0..c1c43292 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -31,6 +31,10 @@ pub struct CreateUserSubCommand { /// maximum requests per minute. /// default to "None" which the code sees as "unlimited" requests. rpm: Option, + + #[argh(option)] + /// a short description of the key's purpose + description: Option, } impl CreateUserSubCommand { @@ -70,16 +74,16 @@ impl CreateUserSubCommand { ); // create a key for the new user - // TODO: requests_per_minute should be configurable let uk = user_keys::ActiveModel { user_id: u.id, api_key: sea_orm::Set(self.api_key.into()), requests_per_minute: sea_orm::Set(self.rpm), + description: sea_orm::Set(self.description), ..Default::default() }; // TODO: if this fails, rever adding the user, too - let uk = uk.save(&txn).await.context("Failed saving new user key")?; + let _uk = uk.save(&txn).await.context("Failed saving new user key")?; txn.commit().await?; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 56b8d7b8..a2ac0e70 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -81,11 +81,8 @@ pub struct AppConfig { /// This is separate from the rpc limits. #[serde(default = "default_login_rate_limit_per_minute")] pub login_rate_limit_per_minute: u64, - /// Persist user stats in a redis (or compatible backend) - /// TODO: research more time series databases - pub persistent_redis_url: Option, - pub persistent_redis_max_connections: Option, /// Track rate limits in a redis (or compatible backend) + /// It is okay if this data is lost. pub volatile_redis_url: Option, /// maximum size of the connection pool for the cache /// If none, the minimum * 2 is used diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index d9be108f..207a0bb4 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1,7 +1,9 @@ use super::errors::FrontendErrorResponse; use crate::app::{UserKeyData, Web3ProxyApp}; +use crate::jsonrpc::JsonRpcRequest; use anyhow::Context; use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent}; +use chrono::{DateTime, Utc}; use deferred_rate_limiter::DeferredRateLimitResult; use entities::user_keys; use ipnet::IpNet; @@ -10,6 +12,8 @@ use redis_rate_limiter::RedisRateLimitResult; use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use serde::Serialize; use std::fmt::Display; +use std::mem::size_of_val; +use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicUsize}; use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; @@ -24,6 +28,62 @@ pub enum UserKey { Uuid(Uuid), } +#[derive(Debug)] +pub enum RateLimitResult { + /// contains the IP of the anonymous user + /// TODO: option inside or outside the arc? 
+ AllowedIp(IpAddr, OwnedSemaphorePermit), + /// contains the user_key_id of an authenticated user + AllowedUser(UserKeyData, Option), + /// contains the IP and retry_at of the anonymous user + RateLimitedIp(IpAddr, Option), + /// contains the user_key_id and retry_at of an authenticated user key + RateLimitedUser(UserKeyData, Option), + /// This key is not in our database. Deny access! + UnknownKey, +} + +#[derive(Clone, Debug, Serialize)] +pub struct AuthorizedKey { + pub ip: IpAddr, + pub origin: Option, + pub user_key_id: u64, + // TODO: just use an f32? even an f16 is probably fine + pub log_revert_chance: Decimal, +} + +#[derive(Debug, Default, Serialize)] +pub struct RequestMetadata { + pub datetime: DateTime, + pub request_bytes: AtomicUsize, + pub backend_requests: AtomicU16, + pub error_response: AtomicBool, + pub response_bytes: AtomicUsize, + pub response_millis: AtomicU32, +} + +#[derive(Clone, Debug, Serialize)] +pub enum AuthorizedRequest { + /// Request from this app + Internal, + /// Request from an anonymous IP address + Ip(#[serde(skip)] IpAddr), + /// Request from an authenticated and authorized user + User(#[serde(skip)] Option, AuthorizedKey), +} + +impl RequestMetadata { + pub fn new(request: &JsonRpcRequest) -> Self { + let request_bytes = size_of_val(request); + + Self { + request_bytes: request_bytes.into(), + datetime: Utc::now(), + ..Default::default() + } + } +} + impl UserKey { pub fn new() -> Self { Ulid::new().into() @@ -54,6 +114,7 @@ impl FromStr for UserKey { } else if let Ok(uuid) = s.parse::() { Ok(uuid.into()) } else { + // TODO: custom error type so that this shows as a 400 Err(anyhow::anyhow!("UserKey was not a ULID or UUID")) } } @@ -89,30 +150,6 @@ impl From for Uuid { } } -#[derive(Debug)] -pub enum RateLimitResult { - /// contains the IP of the anonymous user - /// TODO: option inside or outside the arc? - AllowedIp(IpAddr, OwnedSemaphorePermit), - /// contains the user_key_id of an authenticated user - AllowedUser(UserKeyData, Option), - /// contains the IP and retry_at of the anonymous user - RateLimitedIp(IpAddr, Option), - /// contains the user_key_id and retry_at of an authenticated user key - RateLimitedUser(UserKeyData, Option), - /// This key is not in our database. Deny access! - UnknownKey, -} - -#[derive(Debug, Serialize)] -pub struct AuthorizedKey { - pub ip: IpAddr, - pub origin: Option, - pub user_key_id: u64, - pub log_revert_chance: Decimal, - // TODO: what else? -} - impl AuthorizedKey { pub fn try_new( ip: IpAddr, @@ -180,16 +217,6 @@ impl AuthorizedKey { } } -#[derive(Debug, Serialize)] -pub enum AuthorizedRequest { - /// Request from this app - Internal, - /// Request from an anonymous IP address - Ip(#[serde(skip)] IpAddr), - /// Request from an authenticated and authorized user - User(#[serde(skip)] Option, AuthorizedKey), -} - impl AuthorizedRequest { /// Only User has a database connection in case it needs to save a revert to the database. 
pub fn db_conn(&self) -> Option<&DatabaseConnection> { @@ -205,8 +232,8 @@ impl Display for &AuthorizedRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { AuthorizedRequest::Internal => f.write_str("int"), - AuthorizedRequest::Ip(x) => f.write_str(&format!("ip:{}", x)), - AuthorizedRequest::User(_, x) => f.write_str(&format!("uk:{}", x.user_key_id)), + AuthorizedRequest::Ip(x) => f.write_str(&format!("ip-{}", x)), + AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.user_key_id)), } } } @@ -324,6 +351,11 @@ impl Web3ProxyApp { }) .await; + // if semaphore.available_permits() == 0 { + // // TODO: concurrent limit hit! emit a stat? less important for anon users + // // TODO: there is probably a race here + // } + let semaphore_permit = semaphore.acquire_owned().await?; Ok(semaphore_permit) @@ -345,6 +377,10 @@ impl Web3ProxyApp { // TODO: is this the best way to handle an arc .map_err(|err| anyhow::anyhow!(err))?; + // if semaphore.available_permits() == 0 { + // // TODO: concurrent limit hit! emit a stat + // } + let semaphore_permit = semaphore.acquire_owned().await?; Ok(Some(semaphore_permit)) @@ -419,7 +455,7 @@ impl Web3ProxyApp { } } else { // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right - todo!("no rate limiter"); + Ok(RateLimitResult::AllowedIp(ip, semaphore)) } } @@ -538,12 +574,14 @@ impl Web3ProxyApp { // TODO: debug or trace? // this is too verbose, but a stat might be good // TODO: keys are secrets! use the id instead + // TODO: emit a stat trace!(?user_key, "rate limit exceeded until {:?}", retry_at); Ok(RateLimitResult::RateLimitedUser(user_data, Some(retry_at))) } Ok(DeferredRateLimitResult::RetryNever) => { // TODO: keys are secret. don't log them! trace!(?user_key, "rate limit is 0"); + // TODO: emit a stat Ok(RateLimitResult::RateLimitedUser(user_data, None)) } Err(err) => { @@ -556,8 +594,7 @@ impl Web3ProxyApp { } } else { // TODO: if no redis, rate limit with just a local cache? - // if we don't have redis, we probably don't have a db, so this probably will never happen - Err(anyhow::anyhow!("no redis. 
cannot rate limit")) + Ok(RateLimitResult::AllowedUser(user_data, semaphore)) } } } diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 25408bfa..10d0331b 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -41,7 +41,7 @@ pub async fn proxy_web3_rpc( let authorized_request = Arc::new(authorized_request); let f = tokio::spawn(async move { - app.proxy_web3_rpc(&authorized_request, payload) + app.proxy_web3_rpc(authorized_request, payload) .instrument(request_span) .await }); @@ -81,7 +81,7 @@ pub async fn proxy_web3_rpc_with_key( let authorized_request = Arc::new(authorized_request); let f = tokio::spawn(async move { - app.proxy_web3_rpc(&authorized_request, payload) + app.proxy_web3_rpc(authorized_request, payload) .instrument(request_span) .await }); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 3380dd16..7dcea27a 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -167,7 +167,7 @@ async fn handle_socket_payload( let response = app .eth_subscribe( - authorized_request.clone(), + authorized_request, payload, subscription_count, response_sender.clone(), @@ -211,10 +211,7 @@ async fn handle_socket_payload( Ok(response.into()) } - _ => { - app.proxy_web3_rpc(&authorized_request, payload.into()) - .await - } + _ => app.proxy_web3_rpc(authorized_request, payload.into()).await, }; (id, response) diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 3b66f053..0fd900ca 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -247,6 +247,8 @@ impl OpenRequestHandle { error_handler }; + // TODO: check for "execution reverted" here + match error_handler { RequestErrorHandler::DebugLevel => { debug!(?err, %method, rpc=%self.conn, "bad response!"); diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs index defdf1c0..e0f7fe11 100644 --- a/web3_proxy/src/stats.rs +++ b/web3_proxy/src/stats.rs @@ -1,86 +1,121 @@ -use anyhow::Context; +use crate::frontend::authorization::{AuthorizedKey, RequestMetadata}; +use chrono::{DateTime, Utc}; use derive_more::From; -use redis_rate_limiter::{redis, RedisConnection}; -use std::fmt::Display; +use entities::rpc_accounting; +use moka::future::{Cache, CacheBuilder}; +use parking_lot::{Mutex, RwLock}; +use sea_orm::DatabaseConnection; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::{ + sync::atomic::{AtomicU32, AtomicU64}, + time::Duration, +}; use tokio::task::JoinHandle; -use tracing::{debug, error, info}; - -use crate::frontend::authorization::AuthorizedRequest; - -#[derive(Debug)] -pub enum ProxyResponseType { - CacheHit, - CacheMiss, - Error, -} - -impl Display for ProxyResponseType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ProxyResponseType::CacheHit => f.write_str("ch"), - ProxyResponseType::CacheMiss => f.write_str("cm"), - ProxyResponseType::Error => f.write_str("err"), - } - } -} +use tracing::{error, info, trace}; /// TODO: where should this be defined? +/// TODO: can we use something inside sea_orm instead? #[derive(Debug)] -pub struct ProxyResponseStat(String); - -/// A very basic stat that we store in redis. 
-/// This probably belongs in a true time series database like influxdb, but client -impl ProxyResponseStat { - pub fn new(method: String, response_type: ProxyResponseType, who: &AuthorizedRequest) -> Self { - // TODO: what order? - // TODO: app specific prefix. need at least the chain id - let redis_key = format!("proxy_response:{}:{}:{}", method, response_type, who); - - Self(redis_key) - } +pub struct ProxyResponseStat { + user_key_id: u64, + method: String, + metadata: RequestMetadata, } +// TODO: impl From for our database model +#[derive(Default)] +pub struct ProxyResponseAggregate { + // user_key_id: u64, + // method: String, + // error_response: bool, + frontend_requests: AtomicU32, + backend_requests: AtomicU32, + first_datetime: DateTime, + // TODO: would like to not need a mutex. see how it performs before caring too much + last_timestamp: Mutex>, + first_response_millis: u32, + sum_response_millis: AtomicU32, + sum_request_bytes: AtomicU32, + sum_response_bytes: AtomicU32, +} + +/// key is the (user_key_id, method, error_response) +pub type UserProxyResponseCache = Cache< + (u64, String, bool), + Arc, + hashbrown::hash_map::DefaultHashBuilder, +>; +/// key is the "time bucket" (timestamp / period) +pub type TimeProxyResponseCache = + Cache; + +pub struct StatEmitter { + chain_id: u64, + db_conn: DatabaseConnection, + period_seconds: u64, + /// the outer cache has a TTL and a handler for expiration + aggregated_proxy_responses: TimeProxyResponseCache, +} + +/// A stat that we aggregate and then store in a database. #[derive(Debug, From)] pub enum Web3ProxyStat { ProxyResponse(ProxyResponseStat), } -impl Web3ProxyStat { - fn into_redis_key(self, chain_id: u64) -> String { - match self { - Self::ProxyResponse(x) => format!("{}:{}", x.0, chain_id), +impl ProxyResponseStat { + // TODO: should RequestMetadata be in an arc? or can we handle refs here? + pub fn new(method: String, authorized_key: AuthorizedKey, metadata: RequestMetadata) -> Self { + Self { + user_key_id: authorized_key.user_key_id, + method, + metadata, } } } -pub struct StatEmitter; - impl StatEmitter { + pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc { + let aggregated_proxy_responses = CacheBuilder::default() + .time_to_live(Duration::from_secs(period_seconds * 3 / 2)) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); + + let s = Self { + chain_id, + db_conn, + period_seconds, + aggregated_proxy_responses, + }; + + Arc::new(s) + } + pub async fn spawn( - chain_id: u64, - mut redis_conn: RedisConnection, + self: Arc, ) -> anyhow::Result<(flume::Sender, JoinHandle>)> { let (tx, rx) = flume::unbounded::(); // simple future that reads the channel and emits stats let f = async move { + // TODO: select on shutdown handle so we can be sure to save every aggregate! while let Ok(x) = rx.recv_async().await { + trace!(?x, "emitting stat"); + + // TODO: increment global stats (in redis? in local cache for prometheus?) + + let clone = self.clone(); + // TODO: batch stats? spawn this? + // TODO: where can we wait on this handle? + tokio::spawn(async move { clone.queue_user_stat(x).await }); - let x = x.into_redis_key(chain_id); - - // TODO: this is too loud. just doing it for dev - debug!(?x, "emitting stat"); - - if let Err(err) = redis::Cmd::incr(&x, 1) - .query_async::<_, ()>(&mut redis_conn) - .await - .context("incrementing stat") - { - error!(?err, "emitting stat") - } + // no need to save manually. they save on expire } + // shutting down. 
force a save + self.save_user_stats().await?; + info!("stat emitter exited"); Ok(()) @@ -90,4 +125,52 @@ impl StatEmitter { Ok((tx, handle)) } + + pub async fn queue_user_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> { + match stat { + Web3ProxyStat::ProxyResponse(x) => { + // TODO: move this into another function? + + // get the user cache for the current time bucket + let time_bucket = (x.metadata.datetime.timestamp() as u64) / self.period_seconds; + let user_cache = self + .aggregated_proxy_responses + .get_with(time_bucket, async move { + CacheBuilder::default() + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()) + }) + .await; + + let error_response = x.metadata.error_response.load(Ordering::Acquire); + + let key = (x.user_key_id, x.method.clone(), error_response); + + let user_aggregate = user_cache + .get_with(key, async move { + let last_timestamp = Mutex::new(x.metadata.datetime); + + let aggregate = ProxyResponseAggregate { + first_datetime: x.metadata.datetime, + first_response_millis: x + .metadata + .response_millis + .load(Ordering::Acquire), + last_timestamp, + ..Default::default() + }; + + Arc::new(aggregate) + }) + .await; + + todo!(); + } + } + + Ok(()) + } + + pub async fn save_user_stats(&self) -> anyhow::Result<()> { + todo!(); + } }
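
The two todo!() stubs above are where an aggregate actually reaches MySQL. A minimal sketch of saving one aggregate row, assuming the ProxyResponseAggregate fields defined earlier in stats.rs and the rpc_accounting entity added by this patch (the query_millis mapping and the choice of sums vs. firsts are guesses, and frontend_requests is not on the entity yet):

    use std::sync::atomic::Ordering;

    use entities::rpc_accounting;
    use sea_orm::{ActiveModelTrait, DatabaseConnection, Set};

    /// hypothetical helper: flush one (user_key_id, method, error_response) aggregate
    async fn save_one_aggregate(
        db_conn: &DatabaseConnection,
        chain_id: u64,
        user_key_id: u64,
        method: String,
        error_response: bool,
        aggregate: &ProxyResponseAggregate,
    ) -> anyhow::Result<()> {
        let row = rpc_accounting::ActiveModel {
            user_key_id: Set(user_key_id),
            chain_id: Set(chain_id),
            // use the first request's datetime to stand in for the whole time bucket
            timestamp: Set(aggregate.first_datetime),
            method: Set(method),
            backend_requests: Set(aggregate.backend_requests.load(Ordering::Acquire)),
            // the entity currently models this boolean column as an i8
            error_response: Set(error_response as i8),
            // guess: store the summed latency for the bucket
            query_millis: Set(aggregate.sum_response_millis.load(Ordering::Acquire)),
            request_bytes: Set(aggregate.sum_request_bytes.load(Ordering::Acquire)),
            response_bytes: Set(aggregate.sum_response_bytes.load(Ordering::Acquire)),
            ..Default::default()
        };

        row.insert(db_conn).await?;

        Ok(())
    }

save_user_stats (and the outer cache's expiration handler) would then walk aggregated_proxy_responses and call something like this for each entry before dropping the time bucket.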