From e8f2a13f5dfcad212d9a2a039184770de6763822 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 2 Nov 2022 23:14:16 +0000 Subject: [PATCH] better stats aggregations --- Cargo.lock | 20 +- TODO.md | 20 +- docs/http routes.txt | 9 +- entities/Cargo.toml | 2 +- entities/src/rpc_accounting.rs | 5 +- migration/Cargo.toml | 2 +- migration/src/lib.rs | 2 + .../src/m20221101_222349_archive_request.rs | 39 ++ web3_proxy/Cargo.toml | 3 +- web3_proxy/src/app.rs | 26 +- web3_proxy/src/app_stats.rs | 563 ++++++++---------- web3_proxy/src/app_stats_old.rs | 461 ++++++++++++++ web3_proxy/src/frontend/authorization.rs | 3 + web3_proxy/src/frontend/errors.rs | 14 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 3 +- web3_proxy/src/rpcs/blockchain.rs | 21 +- web3_proxy/src/rpcs/connection.rs | 2 +- web3_proxy/src/user_queries.rs | 16 +- 18 files changed, 829 insertions(+), 382 deletions(-) create mode 100644 migration/src/m20221101_222349_archive_request.rs create mode 100644 web3_proxy/src/app_stats_old.rs diff --git a/Cargo.lock b/Cargo.lock index cc24f712..8f03e629 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1091,19 +1091,6 @@ dependencies = [ "cipher 0.4.3", ] -[[package]] -name = "dashmap" -version = "5.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" -dependencies = [ - "cfg-if", - "hashbrown", - "lock_api", - "once_cell", - "parking_lot_core 0.9.3", -] - [[package]] name = "deadpool" version = "0.9.5" @@ -1361,7 +1348,7 @@ dependencies = [ [[package]] name = "entities" -version = "0.8.0" +version = "0.9.0" dependencies = [ "sea-orm", "serde", @@ -2670,7 +2657,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.8.0" +version = "0.9.0" dependencies = [ "sea-orm-migration", "tokio", @@ -5541,7 +5528,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "0.8.0" +version = "0.9.0" dependencies = [ "anyhow", "arc-swap", @@ -5551,7 +5538,6 @@ dependencies = [ "axum-macros", "chrono", "counter", - "dashmap", "deferred-rate-limiter", "derive_more", "dotenv", diff --git a/TODO.md b/TODO.md index d536e109..2a253476 100644 --- a/TODO.md +++ b/TODO.md @@ -215,7 +215,8 @@ These are roughly in order of completition - [x] in code - [x] in database with a migration - [x] instead of requests_per_minute on every key, have a "user_tier" that gets joined -- [ ] document url params with a test that works for examples +- [x] document url params with examples +- [x] improve "docs/http routes.txt" - [ ] include if archive query or not in the stats - this is already partially done, but we need to double check it works. preferrably with tests - [-] add configurable size limits to all the Caches @@ -224,6 +225,15 @@ These are roughly in order of completition - this must be opt-in or spawned since it will slow things down and will make their calls less private - [ ] automatic pruning of old revert logs once too many are collected - [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it + +## V1 + +These are not yet ordered. + +- [ ] with a test that creates a user and modifies their key +- [ ] Uuid/Ulid instead of big_unsigned for database ids + - might have to use Uuid in sea-orm and then convert to Ulid on display + - https://www.kostolansky.sk/posts/how-to-migrate-to-uuid/ - [ ] make the "not synced" error more verbose - I think there is a bug in our synced_rpcs filtering. 
likely in has_block_data - seeing "not synced" when I load https://vfat.tools/esd/ @@ -236,15 +246,7 @@ These are roughly in order of completition - [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized - [ ] user create script should allow multiple keys per user - [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens -- [ ] Uuid/Ulid instead of big_unsigned for database ids - - might have to use Uuid in sea-orm and then convert to Ulid on display - - https://www.kostolansky.sk/posts/how-to-migrate-to-uuid/ - [ ] display concurrent requests per api key (only with authentication!) - -## V1 - -These are not yet ordered. - - [ ] change "remember me" to last until 4 weeks of no use, rather than 4 weeks since login - [ ] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", fake a success message - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None diff --git a/docs/http routes.txt b/docs/http routes.txt index 4788fc23..b8798224 100644 --- a/docs/http routes.txt +++ b/docs/http routes.txt @@ -90,13 +90,12 @@ GET /user/keys Checks the "AUTHORIZATION" header for a valid bearer token. If valid, displays data about the user's keys as JSON. -POST /user/keys +POST or PUT /user/keys Checks the "AUTHORIZATION" header for a valid bearer token. - If valid, allows the user to change options on their keys. + If valid, allows the user to create a new key or change options on their keys. The POSTed JSON can have these fields: - existing_key_id: Option, - existing_key: Option, + key_id: Option, description: Option, private_txs: Option, active: Option, @@ -105,7 +104,7 @@ POST /user/keys allowed_referers: Option, allowed_user_agents: Option, - You must set **either** `existing_key_id` **or** `existing_key`. + The PUTed JSON has the same fields as the POSTed JSON, except for there is no `key_id` If you do not want to update a field, do not include it in the POSTed JSON. If you want to delete a string field, include the data's key and set the value to an empty string. 
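
As a quick illustration of the key-management route documented above, here is a hedged, standalone sketch of a POST to /user/keys. The proxy URL, the bearer token, and the reqwest/tokio/serde_json/anyhow dependencies are assumptions made for the example; only the route, the "AUTHORIZATION" bearer header, and the JSON field names come from "docs/http routes.txt" above.

    // Illustrative client for the /user/keys route described in docs/http routes.txt.
    // Assumes reqwest (with its "json" feature), tokio, serde_json, and anyhow;
    // the URL and token are placeholders.
    use serde_json::json;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let proxy_url = "http://localhost:8544"; // placeholder base URL
        let bearer_token = "REPLACE_ME"; // placeholder bearer token

        // POST modifies an existing key (identified by `key_id`);
        // a PUT with the same fields but no `key_id` creates a new key.
        let body = json!({
            "key_id": 1,
            "description": "my updated key",
            "private_txs": true,
        });

        let response = reqwest::Client::new()
            .post(format!("{}/user/keys", proxy_url))
            .bearer_auth(bearer_token)
            .json(&body)
            .send()
            .await?;

        println!("{}", response.text().await?);

        Ok(())
    }
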
diff --git a/entities/Cargo.toml b/entities/Cargo.toml index e65d8ee8..cfde5d47 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.8.0" +version = "0.9.0" edition = "2021" [lib] diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index c1eab9c5..94a128eb 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -11,12 +11,13 @@ pub struct Model { pub rpc_key_id: u64, pub chain_id: u64, pub method: String, + pub archive_request: bool, pub error_response: bool, pub period_datetime: DateTimeUtc, pub frontend_requests: u64, pub backend_requests: u64, - pub backend_retries: u64, - pub no_servers: u64, + // pub backend_retries: u64, + // pub no_servers: u64, pub cache_misses: u64, pub cache_hits: u64, pub sum_request_bytes: u64, diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 36f70ee1..06b0d959 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.8.0" +version = "0.9.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 03b5a127..51000b56 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -8,6 +8,7 @@ mod m20221025_210326_add_chain_id_to_reverts; mod m20221026_230819_rename_user_keys; mod m20221027_002407_user_tiers; mod m20221031_211916_clean_up; +mod m20221101_222349_archive_request; pub struct Migrator; @@ -23,6 +24,7 @@ impl MigratorTrait for Migrator { Box::new(m20221026_230819_rename_user_keys::Migration), Box::new(m20221027_002407_user_tiers::Migration), Box::new(m20221031_211916_clean_up::Migration), + Box::new(m20221101_222349_archive_request::Migration), ] } } diff --git a/migration/src/m20221101_222349_archive_request.rs b/migration/src/m20221101_222349_archive_request.rs new file mode 100644 index 00000000..bac6ff11 --- /dev/null +++ b/migration/src/m20221101_222349_archive_request.rs @@ -0,0 +1,39 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("rpc_accounting")) + .add_column( + ColumnDef::new(Alias::new("archive_request")) + .boolean() + .not_null(), + ) + .drop_column(Alias::new("backend_retries")) + .drop_column(Alias::new("no_servers")) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("rpc_accounting")) + .drop_column(Alias::new("archive_request")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 44ef8df3..2a745258 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "0.8.0" +version = "0.9.0" edition = "2021" default-run = "web3_proxy" @@ -28,7 +28,6 @@ axum-macros = "0.2.3" # TODO: import chrono from sea-orm so we always have the same version chrono = "0.4.22" counter = "0.5.7" -dashmap = "5.4.0" derive_more = "0.99.17" dotenv = "0.15.0" ethers = { version = "1.0.0", features = ["rustls", "ws"] } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index fe910874..d1501b1f 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -283,20 +283,14 @@ impl Web3ProxyApp { // setup a channel for receiving stats 
(generally with a high cardinality, such as per-user) // we do this in a channel so we don't slow down our response to the users let stat_sender = if let Some(db_conn) = db_conn.clone() { - // TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats - let (stat_sender, save_handle, stat_handle) = { - // TODO: period from config instead of always being 60 seconds - let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60); + let emitter_spawn = + StatEmitter::spawn(top_config.app.chain_id, db_conn, 60, shutdown_receiver)?; - emitter.spawn(shutdown_receiver).await? - }; + important_background_handles.push(emitter_spawn.background_handle); - cancellable_handles.push(stat_handle); - important_background_handles.push(save_handle); - - Some(stat_sender) + Some(emitter_spawn.stat_sender) } else { - warn!("cannot store stats without a redis connection"); + warn!("cannot store stats without a database connection"); None }; @@ -1008,7 +1002,7 @@ impl Web3ProxyApp { method => { // emit stats - // TODO: wait for them to be synced? + // TODO: if no servers synced, wait for them to be synced? let head_block_id = self .balanced_rpcs .head_block_id() @@ -1025,9 +1019,15 @@ impl Web3ProxyApp { .await? { // TODO: maybe this should be on the app and not on balanced_rpcs - let request_block_hash = + let (request_block_hash, archive_needed) = self.balanced_rpcs.block_hash(&request_block_needed).await?; + if archive_needed { + request_metadata + .archive_request + .store(true, atomic::Ordering::Relaxed); + } + BlockId { num: request_block_needed, hash: request_block_hash, diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index 9ee3cafa..399336a6 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -1,20 +1,18 @@ use crate::frontend::authorization::{AuthorizedKey, RequestMetadata}; use crate::jsonrpc::JsonRpcForwardedResponse; -use anyhow::Context; use chrono::{TimeZone, Utc}; -use dashmap::mapref::entry::Entry; -use dashmap::DashMap; use derive_more::From; use entities::rpc_accounting; -use hdrhistogram::Histogram; -use moka::future::{Cache, CacheBuilder}; -use sea_orm::{ActiveModelTrait, DatabaseConnection}; -use std::sync::atomic::{AtomicU64, Ordering}; +use hashbrown::HashMap; +use hdrhistogram::{Histogram, RecordError}; +use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr}; +use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{broadcast, Mutex as AsyncMutex}; +use std::time::{Duration, SystemTime}; +use tokio::sync::broadcast; use tokio::task::JoinHandle; -use tracing::{error, info, trace}; +use tokio::time::{interval_at, Instant}; +use tracing::{error, info}; /// TODO: where should this be defined? /// TODO: can we use something inside sea_orm instead? 
@@ -22,18 +20,15 @@ use tracing::{error, info, trace}; pub struct ProxyResponseStat { rpc_key_id: u64, method: String, - period_seconds: u64, - period_timestamp: u64, + archive_request: bool, request_bytes: u64, - /// if this is 0, there was a cache_hit + /// if backend_requests is 0, there was a cache_hit backend_requests: u64, error_response: bool, response_bytes: u64, response_millis: u64, } -pub type TimeBucketTimestamp = u64; - pub struct ProxyResponseHistograms { request_bytes: Histogram, response_bytes: Histogram, @@ -55,52 +50,165 @@ impl Default for ProxyResponseHistograms { } } -// TODO: impl From for our database model -pub struct ProxyResponseAggregate { - // these are the key - // rpc_key_id: u64, - // method: String, - // error_response: bool, - // TODO: this is the grandparent key. get it from there somehow - period_timestamp: u64, - frontend_requests: AtomicU64, - backend_requests: AtomicU64, - backend_retries: AtomicU64, - no_servers: AtomicU64, - cache_misses: AtomicU64, - cache_hits: AtomicU64, - sum_request_bytes: AtomicU64, - sum_response_bytes: AtomicU64, - sum_response_millis: AtomicU64, - histograms: AsyncMutex, -} - -#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct UserProxyResponseKey { +#[derive(Clone, From, Hash, PartialEq, Eq)] +struct ProxyResponseAggregateKey { rpc_key_id: u64, method: String, error_response: bool, + archive_request: bool, } -// TODO: think about nested maps more. does this need an arc? -pub type UserProxyResponseCache = DashMap>; -/// key is the "time bucket's timestamp" (timestamp / period * period) -pub type TimeProxyResponseCache = - Cache; +#[derive(Default)] +pub struct ProxyResponseAggregate { + frontend_requests: u64, + backend_requests: u64, + // TODO: related to backend_requests. get this level of detail out + // backend_retries: u64, + // TODO: related to backend_requests. get this level of detail out + // no_servers: u64, + cache_misses: u64, + cache_hits: u64, + sum_request_bytes: u64, + sum_response_bytes: u64, + sum_response_millis: u64, + histograms: ProxyResponseHistograms, +} + +/// A stat that we aggregate and then store in a database. +/// For now there is just one, but I think there might be others later +#[derive(Debug, From)] +pub enum Web3ProxyStat { + Response(ProxyResponseStat), +} + +#[derive(From)] +pub struct StatEmitterSpawn { + pub stat_sender: flume::Sender, + /// these handles are important and must be allowed to finish + pub background_handle: JoinHandle>, +} pub struct StatEmitter { chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64, - /// the outer cache has a TTL and a handler for expiration - aggregated_proxy_responses: TimeProxyResponseCache, - save_rx: flume::Receiver, } -/// A stat that we aggregate and then store in a database. -#[derive(Debug, From)] -pub enum Web3ProxyStat { - ProxyResponse(ProxyResponseStat), +// TODO: impl `+=` for ProxyResponseAggregate? +impl ProxyResponseAggregate { + fn add(&mut self, stat: ProxyResponseStat) -> Result<(), RecordError> { + // a stat always come from just 1 frontend request + self.frontend_requests += 1; + + if stat.backend_requests == 0 { + // no backend request. cache hit! + self.cache_hits += 1; + } else { + // backend requests! cache miss! 
+ self.cache_misses += 1; + + // a stat might have multiple backend requests + self.backend_requests += stat.backend_requests; + } + + self.sum_request_bytes += stat.request_bytes; + self.sum_response_bytes += stat.response_bytes; + self.sum_response_millis += stat.response_millis; + + // TODO: use `record_correct`? + self.histograms.request_bytes.record(stat.request_bytes)?; + self.histograms + .response_millis + .record(stat.response_millis)?; + self.histograms.response_bytes.record(stat.response_bytes)?; + + Ok(()) + } + + // TODO? help to turn this plus the key into a database model? + // TODO: take a db transaction instead so that we can batch + async fn save( + self, + chain_id: u64, + db_conn: &DatabaseConnection, + key: ProxyResponseAggregateKey, + period_timestamp: u64, + ) -> Result<(), DbErr> { + // this is a lot of variables + let period_datetime = Utc.timestamp(period_timestamp as i64, 0); + + let request_bytes = &self.histograms.request_bytes; + + let min_request_bytes = request_bytes.min(); + let mean_request_bytes = request_bytes.mean(); + let p50_request_bytes = request_bytes.value_at_quantile(0.50); + let p90_request_bytes = request_bytes.value_at_quantile(0.90); + let p99_request_bytes = request_bytes.value_at_quantile(0.99); + let max_request_bytes = request_bytes.max(); + + let response_millis = &self.histograms.response_millis; + + let min_response_millis = response_millis.min(); + let mean_response_millis = response_millis.mean(); + let p50_response_millis = response_millis.value_at_quantile(0.50); + let p90_response_millis = response_millis.value_at_quantile(0.90); + let p99_response_millis = response_millis.value_at_quantile(0.99); + let max_response_millis = response_millis.max(); + + let response_bytes = &self.histograms.response_bytes; + + let min_response_bytes = response_bytes.min(); + let mean_response_bytes = response_bytes.mean(); + let p50_response_bytes = response_bytes.value_at_quantile(0.50); + let p90_response_bytes = response_bytes.value_at_quantile(0.90); + let p99_response_bytes = response_bytes.value_at_quantile(0.99); + let max_response_bytes = response_bytes.max(); + + let aggregated_stat_model = rpc_accounting::ActiveModel { + id: sea_orm::NotSet, + + rpc_key_id: sea_orm::Set(key.rpc_key_id), + chain_id: sea_orm::Set(chain_id), + method: sea_orm::Set(key.method), + archive_request: sea_orm::Set(key.archive_request), + error_response: sea_orm::Set(key.error_response), + period_datetime: sea_orm::Set(period_datetime), + frontend_requests: sea_orm::Set(self.frontend_requests), + backend_requests: sea_orm::Set(self.backend_requests), + // backend_retries: sea_orm::Set(self.backend_retries), + // no_servers: sea_orm::Set(self.no_servers), + cache_misses: sea_orm::Set(self.cache_misses), + cache_hits: sea_orm::Set(self.cache_hits), + + sum_request_bytes: sea_orm::Set(self.sum_request_bytes), + min_request_bytes: sea_orm::Set(min_request_bytes), + mean_request_bytes: sea_orm::Set(mean_request_bytes), + p50_request_bytes: sea_orm::Set(p50_request_bytes), + p90_request_bytes: sea_orm::Set(p90_request_bytes), + p99_request_bytes: sea_orm::Set(p99_request_bytes), + max_request_bytes: sea_orm::Set(max_request_bytes), + + sum_response_millis: sea_orm::Set(self.sum_response_millis), + min_response_millis: sea_orm::Set(min_response_millis), + mean_response_millis: sea_orm::Set(mean_response_millis), + p50_response_millis: sea_orm::Set(p50_response_millis), + p90_response_millis: sea_orm::Set(p90_response_millis), + p99_response_millis: 
sea_orm::Set(p99_response_millis), + max_response_millis: sea_orm::Set(max_response_millis), + + sum_response_bytes: sea_orm::Set(self.sum_response_bytes), + min_response_bytes: sea_orm::Set(min_response_bytes), + mean_response_bytes: sea_orm::Set(mean_response_bytes), + p50_response_bytes: sea_orm::Set(p50_response_bytes), + p90_response_bytes: sea_orm::Set(p90_response_bytes), + p99_response_bytes: sea_orm::Set(p99_response_bytes), + max_response_bytes: sea_orm::Set(max_response_bytes), + }; + + aggregated_stat_model.save(db_conn).await?; + + Ok(()) + } } impl ProxyResponseStat { @@ -116,10 +224,11 @@ impl ProxyResponseStat { .expect("serializing here should always work") .len() as u64; + let archive_request = metadata.archive_request.load(Ordering::Acquire); let backend_requests = metadata.backend_requests.load(Ordering::Acquire); - let period_seconds = metadata.period_seconds; - let period_timestamp = - (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds; + // let period_seconds = metadata.period_seconds; + // let period_timestamp = + // (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds; let request_bytes = metadata.request_bytes; let error_response = metadata.error_response.load(Ordering::Acquire); @@ -128,96 +237,116 @@ impl ProxyResponseStat { Self { rpc_key_id: authorized_key.rpc_key_id, + archive_request, method, backend_requests, - period_seconds, - period_timestamp, request_bytes, error_response, response_bytes, response_millis, } } + + fn key(&self) -> ProxyResponseAggregateKey { + ProxyResponseAggregateKey { + rpc_key_id: self.rpc_key_id, + method: self.method.clone(), + error_response: self.error_response, + archive_request: self.archive_request, + } + } } impl StatEmitter { - pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc { - let (save_tx, save_rx) = flume::unbounded(); + pub fn spawn( + chain_id: u64, + db_conn: DatabaseConnection, + period_seconds: u64, + shutdown_receiver: broadcast::Receiver<()>, + ) -> anyhow::Result { + let (stat_sender, stat_receiver) = flume::unbounded(); - // this needs to be long enough that there are definitely no outstanding queries - // TODO: what should the "safe" multiplier be? what if something is late? - // TODO: in most cases this delays more than necessary. think of how to do this without dashmap which might let us proceed - let ttl_seconds = period_seconds * 3; - - let aggregated_proxy_responses = CacheBuilder::default() - .time_to_live(Duration::from_secs(ttl_seconds)) - .eviction_listener_with_queued_delivery_mode(move |_, v, _| { - // this function must not panic! - if let Err(err) = save_tx.send(v) { - error!(?err, "unable to save. 
sender closed!"); - } - }) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); - - let s = Self { + let mut new = Self { chain_id, db_conn, period_seconds, - aggregated_proxy_responses, - save_rx, }; - Arc::new(s) + let handle = + tokio::spawn(async move { new.stat_loop(stat_receiver, shutdown_receiver).await }); + + Ok((stat_sender, handle).into()) } - pub async fn spawn( - self: Arc, - shutdown_receiver: broadcast::Receiver<()>, - ) -> anyhow::Result<( - flume::Sender, - JoinHandle>, - JoinHandle>, - )> { - let (aggregate_tx, aggregate_rx) = flume::unbounded::(); - - // TODO: join and flatten these handles - let aggregate_handle = tokio::spawn( - self.clone() - .aggregate_stats_loop(aggregate_rx, shutdown_receiver), - ); - let save_handle = tokio::spawn(self.save_stats_loop()); - - Ok((aggregate_tx, aggregate_handle, save_handle)) - } - - /// simple future that reads the channel and aggregates stats in a local cache. - async fn aggregate_stats_loop( - self: Arc, - aggregate_rx: flume::Receiver, + async fn stat_loop( + &mut self, + stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { + let system_now = SystemTime::now(); + + let duration_since_epoch = system_now + .duration_since(SystemTime::UNIX_EPOCH) + .expect("time machines don't exist"); + + // TODO: change period_seconds from u64 to u32 + let current_period = duration_since_epoch + .checked_div(self.period_seconds as u32) + .unwrap() + * self.period_seconds as u32; + + let duration_to_next_period = + Duration::from_secs(self.period_seconds) - (duration_since_epoch - current_period); + + // start the interval when the next period starts + let start_instant = Instant::now() + duration_to_next_period; + let mut interval = interval_at(start_instant, Duration::from_secs(self.period_seconds)); + + // loop between different futures to update these mutables + let mut period_timestamp = current_period.as_secs(); + let mut response_aggregate_map = + HashMap::::new(); + loop { tokio::select! { - x = aggregate_rx.recv_async() => { - match x { - Ok(x) => { - trace!(?x, "aggregating stat"); + stat = stat_receiver.recv_async() => { + match stat? { + Web3ProxyStat::Response(stat) => { + let key = stat.key(); - // TODO: increment global stats (in redis? in local cache for prometheus?) + // TODO: does hashmap have get_or_insert? + if ! response_aggregate_map.contains_key(&key) { + response_aggregate_map.insert(key.clone(), Default::default()); + }; - // TODO: batch stats? - // TODO: where can we wait on this handle? - let clone = self.clone(); - tokio::spawn(async move { clone.aggregate_stat(x).await }); - }, - Err(err) => { - error!(?err, "aggregate_rx"); + if let Some(value) = response_aggregate_map.get_mut(&key) { + if let Err(err) = value.add(stat) { + error!(?err, "unable to aggregate stats!"); + }; + } else { + unimplemented!(); + } } } } + _ = interval.tick() => { + // save all the aggregated stats + // TODO: batch these saves + for (key, aggregate) in response_aggregate_map.drain() { + if let Err(err) = aggregate.save(self.chain_id, &self.db_conn, key, period_timestamp).await { + error!(?err, "Unable to save stat while shutting down!"); + }; + } + // advance to the next period + // TODO: is this safe? what if there is drift? 
+ period_timestamp += self.period_seconds; + } x = shutdown_receiver.recv() => { match x { - Ok(_) => info!("aggregate stats loop shutting down"), + Ok(_) => { + info!("aggregate stat_loop shutting down"); + // TODO: call aggregate_stat for all the + }, Err(err) => error!(?err, "shutdown receiver"), } break; @@ -225,210 +354,16 @@ impl StatEmitter { } } - // shutting down. force a save of any pending stats - // we do not use invalidate_all because that is done on a background thread - for (key, _) in self.aggregated_proxy_responses.into_iter() { - self.aggregated_proxy_responses.invalidate(&key).await; + for (key, aggregate) in response_aggregate_map.drain() { + if let Err(err) = aggregate + .save(self.chain_id, &self.db_conn, key, period_timestamp) + .await + { + error!(?err, "Unable to save stat while shutting down!"); + }; } - info!("aggregate stats loop finished"); - - Ok(()) - } - - async fn save_stats_loop(self: Arc) -> anyhow::Result<()> { - while let Ok(x) = self.save_rx.recv_async().await { - // TODO: batch these - for (k, v) in x.into_iter() { - // TODO: this is a lot of variables - let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0); - let frontend_requests = v.frontend_requests.load(Ordering::Acquire); - let backend_requests = v.backend_requests.load(Ordering::Acquire); - let backend_retries = v.backend_retries.load(Ordering::Acquire); - let no_servers = v.no_servers.load(Ordering::Acquire); - let cache_misses = v.cache_misses.load(Ordering::Acquire); - let cache_hits = v.cache_hits.load(Ordering::Acquire); - let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire); - let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire); - let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire); - - let histograms = v.histograms.lock().await; - - let request_bytes = &histograms.request_bytes; - - let min_request_bytes = request_bytes.min(); - let mean_request_bytes = request_bytes.mean(); - let p50_request_bytes = request_bytes.value_at_quantile(0.50); - let p90_request_bytes = request_bytes.value_at_quantile(0.90); - let p99_request_bytes = request_bytes.value_at_quantile(0.99); - let max_request_bytes = request_bytes.max(); - - let response_millis = &histograms.response_millis; - - let min_response_millis = response_millis.min(); - let mean_response_millis = response_millis.mean(); - let p50_response_millis = response_millis.value_at_quantile(0.50); - let p90_response_millis = response_millis.value_at_quantile(0.90); - let p99_response_millis = response_millis.value_at_quantile(0.99); - let max_response_millis = response_millis.max(); - - let response_bytes = &histograms.response_bytes; - - let min_response_bytes = response_bytes.min(); - let mean_response_bytes = response_bytes.mean(); - let p50_response_bytes = response_bytes.value_at_quantile(0.50); - let p90_response_bytes = response_bytes.value_at_quantile(0.90); - let p99_response_bytes = response_bytes.value_at_quantile(0.99); - let max_response_bytes = response_bytes.max(); - - drop(histograms); - - let stat = rpc_accounting::ActiveModel { - id: sea_orm::NotSet, - - rpc_key_id: sea_orm::Set(k.rpc_key_id), - chain_id: sea_orm::Set(self.chain_id), - method: sea_orm::Set(k.method.clone()), - error_response: sea_orm::Set(k.error_response), - period_datetime: sea_orm::Set(period_datetime), - frontend_requests: sea_orm::Set(frontend_requests), - backend_requests: sea_orm::Set(backend_requests), - backend_retries: sea_orm::Set(backend_retries), - no_servers: sea_orm::Set(no_servers), 
- cache_misses: sea_orm::Set(cache_misses), - cache_hits: sea_orm::Set(cache_hits), - - sum_request_bytes: sea_orm::Set(sum_request_bytes), - min_request_bytes: sea_orm::Set(min_request_bytes), - mean_request_bytes: sea_orm::Set(mean_request_bytes), - p50_request_bytes: sea_orm::Set(p50_request_bytes), - p90_request_bytes: sea_orm::Set(p90_request_bytes), - p99_request_bytes: sea_orm::Set(p99_request_bytes), - max_request_bytes: sea_orm::Set(max_request_bytes), - - sum_response_millis: sea_orm::Set(sum_response_millis), - min_response_millis: sea_orm::Set(min_response_millis), - mean_response_millis: sea_orm::Set(mean_response_millis), - p50_response_millis: sea_orm::Set(p50_response_millis), - p90_response_millis: sea_orm::Set(p90_response_millis), - p99_response_millis: sea_orm::Set(p99_response_millis), - max_response_millis: sea_orm::Set(max_response_millis), - - sum_response_bytes: sea_orm::Set(sum_response_bytes), - min_response_bytes: sea_orm::Set(min_response_bytes), - mean_response_bytes: sea_orm::Set(mean_response_bytes), - p50_response_bytes: sea_orm::Set(p50_response_bytes), - p90_response_bytes: sea_orm::Set(p90_response_bytes), - p99_response_bytes: sea_orm::Set(p99_response_bytes), - max_response_bytes: sea_orm::Set(max_response_bytes), - }; - - // TODO: if this fails, rever adding the user, too - if let Err(err) = stat - .save(&self.db_conn) - .await - .context("Saving rpc_accounting stat") - { - error!(?err, "unable to save aggregated stats"); - } - } - } - - info!("stat saver exited"); - - Ok(()) - } - - pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> { - trace!(?stat, "aggregating"); - match stat { - Web3ProxyStat::ProxyResponse(stat) => { - // TODO: move this whole closure to another function? - - debug_assert_eq!(stat.period_seconds, self.period_seconds); - - // get the user cache for the current period - // TODO: i don't think this works right. maybe do DashMap entry api as the outer variable - let user_cache = self - .aggregated_proxy_responses - .get_with_by_ref(&stat.period_timestamp, async move { Default::default() }) - .await; - - let key = (stat.rpc_key_id, stat.method, stat.error_response).into(); - - let user_aggregate = match user_cache.entry(key) { - Entry::Occupied(x) => x.get().clone(), - Entry::Vacant(y) => { - let histograms = ProxyResponseHistograms::default(); - - // TODO: create a counter here that we use to tell when it is safe to flush these? faster than waiting 3 periods - - let aggregate = ProxyResponseAggregate { - period_timestamp: stat.period_timestamp, - // start most things at 0 because we add outside this getter - frontend_requests: 0.into(), - backend_requests: 0.into(), - backend_retries: 0.into(), - no_servers: 0.into(), - cache_misses: 0.into(), - cache_hits: 0.into(), - sum_request_bytes: 0.into(), - sum_response_bytes: 0.into(), - sum_response_millis: 0.into(), - histograms: AsyncMutex::new(histograms), - }; - - // TODO: store this arc in the map - // TODO: does this have a race condition? - - let aggregate = Arc::new(aggregate); - - y.insert(aggregate.clone()); - - aggregate - } - }; - - // a stat always come from just 1 frontend request - user_aggregate - .frontend_requests - .fetch_add(1, Ordering::Acquire); - - if stat.backend_requests == 0 { - // no backend request. cache hit! - user_aggregate.cache_hits.fetch_add(1, Ordering::Acquire); - } else { - // backend requests! cache miss! 
- user_aggregate.cache_misses.fetch_add(1, Ordering::Acquire); - - // a stat might have multiple backend requests - user_aggregate - .backend_requests - .fetch_add(stat.backend_requests, Ordering::Acquire); - } - - user_aggregate - .sum_request_bytes - .fetch_add(stat.request_bytes, Ordering::Release); - - user_aggregate - .sum_response_bytes - .fetch_add(stat.response_bytes, Ordering::Release); - - user_aggregate - .sum_response_millis - .fetch_add(stat.response_millis, Ordering::Release); - - { - let mut histograms = user_aggregate.histograms.lock().await; - - // TODO: use `record_correct`? - histograms.request_bytes.record(stat.request_bytes)?; - histograms.response_millis.record(stat.response_millis)?; - histograms.response_bytes.record(stat.response_bytes)?; - } - } - } + info!("aggregated stat_loop shut down"); Ok(()) } diff --git a/web3_proxy/src/app_stats_old.rs b/web3_proxy/src/app_stats_old.rs new file mode 100644 index 00000000..93f5cd55 --- /dev/null +++ b/web3_proxy/src/app_stats_old.rs @@ -0,0 +1,461 @@ +use crate::frontend::authorization::{AuthorizedKey, RequestMetadata}; +use crate::jsonrpc::JsonRpcForwardedResponse; +use anyhow::Context; +use chrono::{TimeZone, Utc}; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; +use derive_more::From; +use entities::rpc_accounting; +use hdrhistogram::Histogram; +use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt}; +use sea_orm::{ActiveModelTrait, DatabaseConnection}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{broadcast, Mutex as AsyncMutex}; +use tokio::task::JoinHandle; +use tracing::{error, info, trace}; + +/// TODO: where should this be defined? +/// TODO: can we use something inside sea_orm instead? +#[derive(Debug)] +pub struct ProxyResponseStat { + rpc_key_id: u64, + method: String, + archive_request: bool, + period_seconds: u64, + period_timestamp: u64, + request_bytes: u64, + /// if backend_requests is 0, there was a cache_hit + backend_requests: u64, + error_response: bool, + response_bytes: u64, + response_millis: u64, +} + +pub type TimeBucketTimestamp = u64; + +pub struct ProxyResponseHistograms { + request_bytes: Histogram, + response_bytes: Histogram, + response_millis: Histogram, +} + +impl Default for ProxyResponseHistograms { + fn default() -> Self { + // TODO: how many significant figures? + let request_bytes = Histogram::new(5).expect("creating request_bytes histogram"); + let response_bytes = Histogram::new(5).expect("creating response_bytes histogram"); + let response_millis = Histogram::new(5).expect("creating response_millis histogram"); + + Self { + request_bytes, + response_bytes, + response_millis, + } + } +} + +// TODO: impl From for our database model +pub struct ProxyResponseAggregate { + // these are the key + // rpc_key_id: u64, + // method: String, + // error_response: bool, + // TODO: this is the grandparent key. get it from there somehow + period_timestamp: u64, + archive_request: bool, + frontend_requests: AtomicU64, + backend_requests: AtomicU64, + backend_retries: AtomicU64, + no_servers: AtomicU64, + cache_misses: AtomicU64, + cache_hits: AtomicU64, + sum_request_bytes: AtomicU64, + sum_response_bytes: AtomicU64, + sum_response_millis: AtomicU64, + histograms: AsyncMutex, +} + +#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct UserProxyResponseKey { + rpc_key_id: u64, + method: String, + error_response: bool, +} + +// TODO: think about nested maps more. does this need an arc? 
+pub type UserProxyResponseCache = Arc>>; +/// key is the "time bucket's timestamp" (timestamp / period * period) +pub type TimeProxyResponseCache = + Cache; + +pub struct StatEmitter { + chain_id: u64, + db_conn: DatabaseConnection, + period_seconds: u64, + /// the outer cache has a TTL and a handler for expiration + aggregated_proxy_responses: TimeProxyResponseCache, + save_rx: flume::Receiver, +} + +/// A stat that we aggregate and then store in a database. +#[derive(Debug, From)] +pub enum Web3ProxyStat { + Response(ProxyResponseStat), +} + +impl ProxyResponseStat { + // TODO: should RequestMetadata be in an arc? or can we handle refs here? + pub fn new( + method: String, + authorized_key: AuthorizedKey, + metadata: Arc, + response: &JsonRpcForwardedResponse, + ) -> Self { + // TODO: do this without serializing to a string. this is going to slow us down! + let response_bytes = serde_json::to_string(response) + .expect("serializing here should always work") + .len() as u64; + + let archive_request = metadata.archive_request.load(Ordering::Acquire); + let backend_requests = metadata.backend_requests.load(Ordering::Acquire); + let period_seconds = metadata.period_seconds; + let period_timestamp = + (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds; + let request_bytes = metadata.request_bytes; + let error_response = metadata.error_response.load(Ordering::Acquire); + + // TODO: timestamps could get confused by leap seconds. need tokio time instead + let response_millis = metadata.start_instant.elapsed().as_millis() as u64; + + Self { + rpc_key_id: authorized_key.rpc_key_id, + archive_request, + method, + backend_requests, + period_seconds, + period_timestamp, + request_bytes, + error_response, + response_bytes, + response_millis, + } + } +} + +impl StatEmitter { + pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc { + let (save_tx, save_rx) = flume::unbounded(); + + // this needs to be long enough that there are definitely no outstanding queries + // TODO: what should the "safe" multiplier be? what if something is late? + // TODO: in most cases this delays more than necessary. think of how to do this without dashmap which might let us proceed + let ttl_seconds = period_seconds * 3; + + let aggregated_proxy_responses = CacheBuilder::default() + .time_to_live(Duration::from_secs(ttl_seconds)) + .eviction_listener_with_queued_delivery_mode(move |_, v, _| { + // this function must not panic! + if let Err(err) = save_tx.send(v) { + error!(?err, "unable to save. sender closed!"); + } + }) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); + + let s = Self { + chain_id, + db_conn, + period_seconds, + aggregated_proxy_responses, + save_rx, + }; + + Arc::new(s) + } + + pub async fn spawn( + self: Arc, + shutdown_receiver: broadcast::Receiver<()>, + ) -> anyhow::Result<( + flume::Sender, + JoinHandle>, + JoinHandle>, + )> { + let (aggregate_tx, aggregate_rx) = flume::unbounded::(); + + let (finished_tx, finished_rx) = flume::bounded(1); + + let aggregate_handle = tokio::spawn(self.clone().aggregate_stats_loop( + aggregate_rx, + shutdown_receiver, + finished_rx, + )); + let save_handle = tokio::spawn(self.save_stats_loop(finished_tx)); + + Ok((aggregate_tx, aggregate_handle, save_handle)) + } + + /// simple future that reads the channel and aggregates stats in a local cache. 
+ async fn aggregate_stats_loop( + self: Arc, + aggregate_rx: flume::Receiver, + mut shutdown_receiver: broadcast::Receiver<()>, + finished_rx: flume::Receiver<()>, + ) -> anyhow::Result<()> { + loop { + tokio::select! { + x = aggregate_rx.recv_async() => { + match x { + Ok(x) => { + trace!(?x, "aggregating stat"); + + // TODO: increment global stats (in redis? in local cache for prometheus?) + + // TODO: batch stats? + // TODO: where can we wait on this handle? + let clone = self.clone(); + tokio::spawn(async move { clone.aggregate_stat(x).await }); + }, + Err(err) => { + error!(?err, "aggregate_rx"); + } + } + } + x = shutdown_receiver.recv() => { + match x { + Ok(_) => { + info!("aggregate stats loop shutting down"); + // TODO: call aggregate_stat for all the + }, + Err(err) => error!(?err, "shutdown receiver"), + } + break; + } + } + } + + // shutting down. force a save of any pending stats + // we do not use invalidate_all because that is done on a background thread + // TODO: i don't think this works + for (key, _) in self.aggregated_proxy_responses.into_iter() { + // TODO: call drain or remove or something instead? + self.aggregated_proxy_responses.invalidate(&key).await; + } + + self.aggregated_proxy_responses.sync(); + + todo!("drop self.aggregated_proxy_responses"); + + // TODO: timeout on this? + finished_rx.recv_async().await?; + + info!("aggregate stats loop finished"); + + Ok(()) + } + + async fn save_stats_loop( + self: Arc, + finished_tx: flume::Sender<()>, + ) -> anyhow::Result<()> { + while let Ok(x) = self.save_rx.recv_async().await { + // TODO: batch these + // TODO: i'm not seeing these on shutdown + for x in x.iter() { + let k = x.key(); + let v = x.value(); + + // TODO: this is a lot of variables + let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0); + let frontend_requests = v.frontend_requests.load(Ordering::Acquire); + let backend_requests = v.backend_requests.load(Ordering::Acquire); + let backend_retries = v.backend_retries.load(Ordering::Acquire); + let no_servers = v.no_servers.load(Ordering::Acquire); + let cache_misses = v.cache_misses.load(Ordering::Acquire); + let cache_hits = v.cache_hits.load(Ordering::Acquire); + let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire); + let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire); + let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire); + + let histograms = v.histograms.lock().await; + + let request_bytes = &histograms.request_bytes; + + let min_request_bytes = request_bytes.min(); + let mean_request_bytes = request_bytes.mean(); + let p50_request_bytes = request_bytes.value_at_quantile(0.50); + let p90_request_bytes = request_bytes.value_at_quantile(0.90); + let p99_request_bytes = request_bytes.value_at_quantile(0.99); + let max_request_bytes = request_bytes.max(); + + let response_millis = &histograms.response_millis; + + let min_response_millis = response_millis.min(); + let mean_response_millis = response_millis.mean(); + let p50_response_millis = response_millis.value_at_quantile(0.50); + let p90_response_millis = response_millis.value_at_quantile(0.90); + let p99_response_millis = response_millis.value_at_quantile(0.99); + let max_response_millis = response_millis.max(); + + let response_bytes = &histograms.response_bytes; + + let min_response_bytes = response_bytes.min(); + let mean_response_bytes = response_bytes.mean(); + let p50_response_bytes = response_bytes.value_at_quantile(0.50); + let p90_response_bytes = 
response_bytes.value_at_quantile(0.90); + let p99_response_bytes = response_bytes.value_at_quantile(0.99); + let max_response_bytes = response_bytes.max(); + + drop(histograms); + + let stat = rpc_accounting::ActiveModel { + id: sea_orm::NotSet, + + rpc_key_id: sea_orm::Set(k.rpc_key_id), + chain_id: sea_orm::Set(self.chain_id), + method: sea_orm::Set(k.method.clone()), + archive_request: sea_orm::Set(v.archive_request), + error_response: sea_orm::Set(k.error_response), + period_datetime: sea_orm::Set(period_datetime), + frontend_requests: sea_orm::Set(frontend_requests), + backend_requests: sea_orm::Set(backend_requests), + backend_retries: sea_orm::Set(backend_retries), + no_servers: sea_orm::Set(no_servers), + cache_misses: sea_orm::Set(cache_misses), + cache_hits: sea_orm::Set(cache_hits), + + sum_request_bytes: sea_orm::Set(sum_request_bytes), + min_request_bytes: sea_orm::Set(min_request_bytes), + mean_request_bytes: sea_orm::Set(mean_request_bytes), + p50_request_bytes: sea_orm::Set(p50_request_bytes), + p90_request_bytes: sea_orm::Set(p90_request_bytes), + p99_request_bytes: sea_orm::Set(p99_request_bytes), + max_request_bytes: sea_orm::Set(max_request_bytes), + + sum_response_millis: sea_orm::Set(sum_response_millis), + min_response_millis: sea_orm::Set(min_response_millis), + mean_response_millis: sea_orm::Set(mean_response_millis), + p50_response_millis: sea_orm::Set(p50_response_millis), + p90_response_millis: sea_orm::Set(p90_response_millis), + p99_response_millis: sea_orm::Set(p99_response_millis), + max_response_millis: sea_orm::Set(max_response_millis), + + sum_response_bytes: sea_orm::Set(sum_response_bytes), + min_response_bytes: sea_orm::Set(min_response_bytes), + mean_response_bytes: sea_orm::Set(mean_response_bytes), + p50_response_bytes: sea_orm::Set(p50_response_bytes), + p90_response_bytes: sea_orm::Set(p90_response_bytes), + p99_response_bytes: sea_orm::Set(p99_response_bytes), + max_response_bytes: sea_orm::Set(max_response_bytes), + }; + + // TODO: if this fails, what should we do? + if let Err(err) = stat + .save(&self.db_conn) + .await + .context("Saving rpc_accounting stat") + { + error!(?err, "unable to save aggregated stats"); + } else { + trace!("stat saved"); + } + } + } + + info!("stat saver exited"); + + finished_tx.send_async(()).await?; + + Ok(()) + } + + pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> { + match stat { + Web3ProxyStat::Response(stat) => { + // TODO: move this whole closure to another function? + + debug_assert_eq!(stat.period_seconds, self.period_seconds); + + // get the user cache for the current period + // TODO: i don't think this works right. maybe do DashMap entry api as the outer variable + let user_cache = self + .aggregated_proxy_responses + .get_with(stat.period_timestamp, async move { Default::default() }) + .await; + + let key = (stat.rpc_key_id, stat.method, stat.error_response).into(); + + let user_aggregate = match user_cache.entry(key) { + Entry::Occupied(x) => x.get().clone(), + Entry::Vacant(y) => { + let histograms = ProxyResponseHistograms::default(); + + // TODO: create a counter here that we use to tell when it is safe to flush these? 
faster than waiting 3 periods + + let aggregate = ProxyResponseAggregate { + period_timestamp: stat.period_timestamp, + archive_request: stat.archive_request, + // start most things at 0 because we add outside this getter + frontend_requests: 0.into(), + backend_requests: 0.into(), + backend_retries: 0.into(), + no_servers: 0.into(), + cache_misses: 0.into(), + cache_hits: 0.into(), + sum_request_bytes: 0.into(), + sum_response_bytes: 0.into(), + sum_response_millis: 0.into(), + histograms: AsyncMutex::new(histograms), + }; + + let aggregate = Arc::new(aggregate); + + y.insert(aggregate).clone() + } + }; + + // a stat always come from just 1 frontend request + user_aggregate + .frontend_requests + .fetch_add(1, Ordering::Acquire); + + if stat.backend_requests == 0 { + // no backend request. cache hit! + user_aggregate.cache_hits.fetch_add(1, Ordering::Acquire); + } else { + // backend requests! cache miss! + user_aggregate.cache_misses.fetch_add(1, Ordering::Acquire); + + // a stat might have multiple backend requests + user_aggregate + .backend_requests + .fetch_add(stat.backend_requests, Ordering::Acquire); + } + + user_aggregate + .sum_request_bytes + .fetch_add(stat.request_bytes, Ordering::Release); + + user_aggregate + .sum_response_bytes + .fetch_add(stat.response_bytes, Ordering::Release); + + user_aggregate + .sum_response_millis + .fetch_add(stat.response_millis, Ordering::Release); + + { + let mut histograms = user_aggregate.histograms.lock().await; + + // TODO: use `record_correct`? + histograms.request_bytes.record(stat.request_bytes)?; + histograms.response_millis.record(stat.response_millis)?; + histograms.response_bytes.record(stat.response_bytes)?; + } + } + } + + Ok(()) + } +} diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index a65a8d74..619ef4be 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -65,6 +65,8 @@ pub struct RequestMetadata { // TODO: better name for this pub period_seconds: u64, pub request_bytes: u64, + // TODO: "archive" isn't really a boolean. + pub archive_request: AtomicBool, /// if this is 0, there was a cache_hit pub backend_requests: AtomicU64, pub no_servers: AtomicU64, @@ -96,6 +98,7 @@ impl RequestMetadata { start_datetime: Utc::now(), period_seconds, request_bytes, + archive_request: false.into(), backend_requests: 0.into(), no_servers: 0.into(), error_response: false.into(), diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 75ac889b..9912aee3 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -14,7 +14,7 @@ use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; use sea_orm::DbErr; use std::{error::Error, net::IpAddr}; -use tokio::time::Instant; +use tokio::{task::JoinError, time::Instant}; use tracing::{instrument, trace, warn}; // TODO: take "IntoResponse" instead of Response? @@ -30,6 +30,7 @@ pub enum FrontendErrorResponse { HeaderToString(ToStrError), InvalidHeaderValue(InvalidHeaderValue), IpAddrParse(AddrParseError), + JoinError(JoinError), NotFound, RateLimitedUser(UserKeyData, Option), RateLimitedIp(IpAddr, Option), @@ -114,6 +115,17 @@ impl IntoResponse for FrontendErrorResponse { ), ) } + Self::JoinError(err) => { + warn!(?err, "JoinError. 
likely shutting down"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "Unable to complete request", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404 diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index ae8d9464..877ccfbc 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -87,7 +87,8 @@ pub async fn proxy_web3_rpc_with_key( .await }); - let response = f.await.expect("JoinHandle should always work")?; + // if this is an error, we are likely shutting down + let response = f.await??; Ok(Json(&response).into_response()) } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index a858a382..c88cc19f 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -139,25 +139,30 @@ impl Web3Connections { } /// Convenience method to get the cannonical block at a given block height. - pub async fn block_hash(&self, num: &U64) -> anyhow::Result { - let block = self.cannonical_block(num).await?; + pub async fn block_hash(&self, num: &U64) -> anyhow::Result<(H256, bool)> { + let (block, is_archive_block) = self.cannonical_block(num).await?; - let hash = block.hash.unwrap(); + let hash = block.hash.expect("Saved blocks should always have hashes"); - Ok(hash) + Ok((hash, is_archive_block)) } /// Get the heaviest chain's block from cache or backend rpc - pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result { + pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<(ArcBlock, bool)> { // we only have blocks by hash now // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) // be sure the requested block num exists let head_block_num = self.head_block_num().context("no servers in sync")?; + + // TODO: not 64 on all chains? get from config? + let archive_needed = num < &(head_block_num - U64::from(64)); + if num > &head_block_num { // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing // TODO: instead of error, maybe just sleep and try again? + // TODO: this should be a 401, not a 500 return Err(anyhow::anyhow!( "Head block is #{}, but #{} was requested", head_block_num, @@ -170,7 +175,9 @@ impl Web3Connections { if let Some(block_hash) = self.block_numbers.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: pass authorized_request through here? - return self.block(None, &block_hash, None).await; + let block = self.block(None, &block_hash, None).await?; + + return Ok((block, archive_needed)); } // block number not in cache. 
we need to ask an rpc for it @@ -193,7 +200,7 @@ impl Web3Connections { // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain self.save_block(&block, true).await?; - Ok(block) + Ok((block, true)) } pub(super) async fn process_incoming_blocks( diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index a0433e0b..5761aba0 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -384,7 +384,7 @@ impl Web3Connection { // if we already have this block saved, set new_head_block to that arc. otherwise store this copy new_head_block = block_map - .get_with_by_ref(&new_hash, async move { new_head_block }) + .get_with(new_hash, async move { new_head_block }) .await; let new_num = new_head_block.number.unwrap_or_default(); diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index 1301c580..16770f3c 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -204,15 +204,15 @@ pub async fn get_aggregate_rpc_stats_from_params( rpc_accounting::Column::FrontendRequests.sum(), "total_requests", ) + .column_as( + rpc_accounting::Column::BackendRequests.sum(), + "total_backend_retries", + ) .column_as( rpc_accounting::Column::CacheMisses.sum(), "total_cache_misses", ) .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") - .column_as( - rpc_accounting::Column::BackendRetries.sum(), - "total_backend_retries", - ) .column_as( rpc_accounting::Column::SumResponseBytes.sum(), "total_response_bytes", @@ -355,15 +355,15 @@ pub async fn get_detailed_stats( rpc_accounting::Column::FrontendRequests.sum(), "total_requests", ) + .column_as( + rpc_accounting::Column::BackendRequests.sum(), + "total_backend_requests", + ) .column_as( rpc_accounting::Column::CacheMisses.sum(), "total_cache_misses", ) .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") - .column_as( - rpc_accounting::Column::BackendRetries.sum(), - "total_backend_retries", - ) .column_as( rpc_accounting::Column::SumResponseBytes.sum(), "total_response_bytes",
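
The heart of this patch is the rewritten stat loop in web3_proxy/src/app_stats.rs: ProxyResponseStats arrive on a flume channel, are folded into a plain HashMap keyed by (rpc_key_id, method, error_response, archive_request), and are drained to the database whenever a tokio interval fires; the interval is started at the next period boundary so rows land on round period timestamps. Below is a minimal, self-contained sketch of that aggregate-then-flush pattern. The String key, the u64 counter, the 10-second period, the fake producer task, and the println! standing in for the SeaORM save are all illustrative; only tokio, flume, and hashbrown mirror the crate's real dependencies, and the boundary arithmetic here is done on whole seconds.

    // A sketch of the aggregate-then-flush loop from StatEmitter::stat_loop.
    // Assumes the tokio, flume, and hashbrown crates; the key and value types
    // are simplified stand-ins for ProxyResponseAggregateKey / ProxyResponseAggregate.
    use hashbrown::HashMap;
    use std::time::{Duration, SystemTime};
    use tokio::time::{interval_at, Instant};

    #[tokio::main]
    async fn main() {
        let period_seconds = 10u64; // the real emitter uses 60
        let (stat_tx, stat_rx) = flume::unbounded::<(String, u64)>();

        // a fake producer so the example runs standalone; in the proxy these
        // stats are sent from the request handlers
        tokio::spawn(async move {
            for i in 0..40u64 {
                stat_tx.send_async(("eth_call".to_string(), i)).await.ok();
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
            // dropping stat_tx closes the channel and ends the loop below
        });

        let now_secs = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time machines don't exist")
            .as_secs();

        // round down to the current period, then start the interval at the
        // next period boundary so flushes line up on round timestamps
        let current_period = (now_secs / period_seconds) * period_seconds;
        let to_next_period = Duration::from_secs(current_period + period_seconds - now_secs);
        let mut interval = interval_at(
            Instant::now() + to_next_period,
            Duration::from_secs(period_seconds),
        );

        let mut period_timestamp = current_period;
        let mut aggregates: HashMap<String, u64> = HashMap::new();

        loop {
            tokio::select! {
                stat = stat_rx.recv_async() => {
                    match stat {
                        // entry() does the get-or-insert that stat_loop spells out by hand
                        Ok((key, value)) => *aggregates.entry(key).or_default() += value,
                        Err(_) => break, // all senders dropped; shut down
                    }
                }
                _ = interval.tick() => {
                    // in the real loop this is ProxyResponseAggregate::save into the database
                    for (key, sum) in aggregates.drain() {
                        println!("{period_timestamp}: {key} => {sum}");
                    }
                    period_timestamp += period_seconds;
                }
            }
        }

        // flush anything still buffered, as stat_loop does on shutdown
        for (key, sum) in aggregates.drain() {
            println!("{period_timestamp}: {key} => {sum}");
        }
    }
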
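
Finally, the archive_request flag threaded through authorization.rs, app.rs, and app_stats.rs comes down to the block-depth comparison added in cannonical_block. Here is a standalone version of just that check, using ethers' U64 as the real code does; the 64-block cutoff matches the hard-coded value in the patch (which the inline TODO says should eventually come from config), and the sample block numbers are made up.

    // Standalone sketch of the archive-depth check from Web3Connections::cannonical_block.
    // Assumes the ethers crate; like the original, it does a plain subtraction and so
    // expects head_block_num to be at least 64.
    use ethers::types::U64;

    /// true if serving a request for `num` likely needs an archive node
    fn archive_needed(num: &U64, head_block_num: &U64) -> bool {
        // TODO (as in the patch): the depth is not 64 on all chains; read it from config
        num < &(*head_block_num - U64::from(64))
    }

    fn main() {
        let head = U64::from(15_900_000u64);

        // a deep historical block needs an archive node...
        assert!(archive_needed(&U64::from(1_000_000u64), &head));
        // ...a block near the head does not
        assert!(!archive_needed(&U64::from(15_899_990u64), &head));

        println!("archive checks passed");
    }
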