better stats aggregations
This commit is contained in:
parent
f2268dbb1b
commit
e8f2a13f5d
20
Cargo.lock
generated
20
Cargo.lock
generated
@ -1091,19 +1091,6 @@ dependencies = [
|
|||||||
"cipher 0.4.3",
|
"cipher 0.4.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "dashmap"
|
|
||||||
version = "5.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"hashbrown",
|
|
||||||
"lock_api",
|
|
||||||
"once_cell",
|
|
||||||
"parking_lot_core 0.9.3",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "deadpool"
|
name = "deadpool"
|
||||||
version = "0.9.5"
|
version = "0.9.5"
|
||||||
@ -1361,7 +1348,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "entities"
|
name = "entities"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"sea-orm",
|
"sea-orm",
|
||||||
"serde",
|
"serde",
|
||||||
@ -2670,7 +2657,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "migration"
|
name = "migration"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"sea-orm-migration",
|
"sea-orm-migration",
|
||||||
"tokio",
|
"tokio",
|
||||||
@ -5541,7 +5528,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "web3_proxy"
|
name = "web3_proxy"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
@ -5551,7 +5538,6 @@ dependencies = [
|
|||||||
"axum-macros",
|
"axum-macros",
|
||||||
"chrono",
|
"chrono",
|
||||||
"counter",
|
"counter",
|
||||||
"dashmap",
|
|
||||||
"deferred-rate-limiter",
|
"deferred-rate-limiter",
|
||||||
"derive_more",
|
"derive_more",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
|
20
TODO.md
20
TODO.md
@ -215,7 +215,8 @@ These are roughly in order of completition
|
|||||||
- [x] in code
|
- [x] in code
|
||||||
- [x] in database with a migration
|
- [x] in database with a migration
|
||||||
- [x] instead of requests_per_minute on every key, have a "user_tier" that gets joined
|
- [x] instead of requests_per_minute on every key, have a "user_tier" that gets joined
|
||||||
- [ ] document url params with a test that works for examples
|
- [x] document url params with examples
|
||||||
|
- [x] improve "docs/http routes.txt"
|
||||||
- [ ] include if archive query or not in the stats
|
- [ ] include if archive query or not in the stats
|
||||||
- this is already partially done, but we need to double check it works. preferably with tests
|
- this is already partially done, but we need to double check it works. preferably with tests
|
||||||
- [-] add configurable size limits to all the Caches
|
- [-] add configurable size limits to all the Caches
|
||||||
@ -224,6 +225,15 @@ These are roughly in order of completition
|
|||||||
- this must be opt-in or spawned since it will slow things down and will make their calls less private
|
- this must be opt-in or spawned since it will slow things down and will make their calls less private
|
||||||
- [ ] automatic pruning of old revert logs once too many are collected
|
- [ ] automatic pruning of old revert logs once too many are collected
|
||||||
- [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
|
- [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
|
||||||
|
|
||||||
|
## V1
|
||||||
|
|
||||||
|
These are not yet ordered.
|
||||||
|
|
||||||
|
- [ ] with a test that creates a user and modifies their key
|
||||||
|
- [ ] Uuid/Ulid instead of big_unsigned for database ids
|
||||||
|
- might have to use Uuid in sea-orm and then convert to Ulid on display
|
||||||
|
- https://www.kostolansky.sk/posts/how-to-migrate-to-uuid/
|
||||||
- [ ] make the "not synced" error more verbose
|
- [ ] make the "not synced" error more verbose
|
||||||
- I think there is a bug in our synced_rpcs filtering. likely in has_block_data
|
- I think there is a bug in our synced_rpcs filtering. likely in has_block_data
|
||||||
- seeing "not synced" when I load https://vfat.tools/esd/
|
- seeing "not synced" when I load https://vfat.tools/esd/
|
||||||
@ -236,15 +246,7 @@ These are roughly in order of completition
|
|||||||
- [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized
|
- [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized
|
||||||
- [ ] user create script should allow multiple keys per user
|
- [ ] user create script should allow multiple keys per user
|
||||||
- [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens
|
- [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens
|
||||||
- [ ] Uuid/Ulid instead of big_unsigned for database ids
|
|
||||||
- might have to use Uuid in sea-orm and then convert to Ulid on display
|
|
||||||
- https://www.kostolansky.sk/posts/how-to-migrate-to-uuid/
|
|
||||||
- [ ] display concurrent requests per api key (only with authentication!)
|
- [ ] display concurrent requests per api key (only with authentication!)
|
||||||
|
|
||||||
## V1
|
|
||||||
|
|
||||||
These are not yet ordered.
|
|
||||||
|
|
||||||
- [ ] change "remember me" to last until 4 weeks of no use, rather than 4 weeks since login
|
- [ ] change "remember me" to last until 4 weeks of no use, rather than 4 weeks since login
|
||||||
- [ ] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", fake a success message
|
- [ ] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", fake a success message
|
||||||
- ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None
|
- ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None
|
||||||
|
@ -90,13 +90,12 @@ GET /user/keys
|
|||||||
Checks the "AUTHORIZATION" header for a valid bearer token.
|
Checks the "AUTHORIZATION" header for a valid bearer token.
|
||||||
If valid, displays data about the user's keys as JSON.
|
If valid, displays data about the user's keys as JSON.
|
||||||
|
|
||||||
POST /user/keys
|
POST or PUT /user/keys
|
||||||
Checks the "AUTHORIZATION" header for a valid bearer token.
|
Checks the "AUTHORIZATION" header for a valid bearer token.
|
||||||
If valid, allows the user to change options on their keys.
|
If valid, allows the user to create a new key or change options on their keys.
|
||||||
|
|
||||||
The POSTed JSON can have these fields:
|
The POSTed JSON can have these fields:
|
||||||
existing_key_id: Option<u64>,
|
key_id: Option<u64>,
|
||||||
existing_key: Option<RpcApiKey>,
|
|
||||||
description: Option<String>,
|
description: Option<String>,
|
||||||
private_txs: Option<bool>,
|
private_txs: Option<bool>,
|
||||||
active: Option<bool>,
|
active: Option<bool>,
|
||||||
@ -105,7 +104,7 @@ POST /user/keys
|
|||||||
allowed_referers: Option<String>,
|
allowed_referers: Option<String>,
|
||||||
allowed_user_agents: Option<String>,
|
allowed_user_agents: Option<String>,
|
||||||
|
|
||||||
You must set **either** `existing_key_id` **or** `existing_key`.
|
The PUTed JSON has the same fields as the POSTed JSON, except that there is no `key_id`.
|
||||||
|
|
||||||
If you do not want to update a field, do not include it in the POSTed JSON.
|
If you do not want to update a field, do not include it in the POSTed JSON.
|
||||||
If you want to delete a string field, include the data's key and set the value to an empty string.
|
If you want to delete a string field, include the data's key and set the value to an empty string.
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "entities"
|
name = "entities"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
|
@ -11,12 +11,13 @@ pub struct Model {
|
|||||||
pub rpc_key_id: u64,
|
pub rpc_key_id: u64,
|
||||||
pub chain_id: u64,
|
pub chain_id: u64,
|
||||||
pub method: String,
|
pub method: String,
|
||||||
|
pub archive_request: bool,
|
||||||
pub error_response: bool,
|
pub error_response: bool,
|
||||||
pub period_datetime: DateTimeUtc,
|
pub period_datetime: DateTimeUtc,
|
||||||
pub frontend_requests: u64,
|
pub frontend_requests: u64,
|
||||||
pub backend_requests: u64,
|
pub backend_requests: u64,
|
||||||
pub backend_retries: u64,
|
// pub backend_retries: u64,
|
||||||
pub no_servers: u64,
|
// pub no_servers: u64,
|
||||||
pub cache_misses: u64,
|
pub cache_misses: u64,
|
||||||
pub cache_hits: u64,
|
pub cache_hits: u64,
|
||||||
pub sum_request_bytes: u64,
|
pub sum_request_bytes: u64,
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "migration"
|
name = "migration"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
publish = false
|
publish = false
|
||||||
|
|
||||||
|
@ -8,6 +8,7 @@ mod m20221025_210326_add_chain_id_to_reverts;
|
|||||||
mod m20221026_230819_rename_user_keys;
|
mod m20221026_230819_rename_user_keys;
|
||||||
mod m20221027_002407_user_tiers;
|
mod m20221027_002407_user_tiers;
|
||||||
mod m20221031_211916_clean_up;
|
mod m20221031_211916_clean_up;
|
||||||
|
mod m20221101_222349_archive_request;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@ -23,6 +24,7 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20221026_230819_rename_user_keys::Migration),
|
Box::new(m20221026_230819_rename_user_keys::Migration),
|
||||||
Box::new(m20221027_002407_user_tiers::Migration),
|
Box::new(m20221027_002407_user_tiers::Migration),
|
||||||
Box::new(m20221031_211916_clean_up::Migration),
|
Box::new(m20221031_211916_clean_up::Migration),
|
||||||
|
Box::new(m20221101_222349_archive_request::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
39
migration/src/m20221101_222349_archive_request.rs
Normal file
39
migration/src/m20221101_222349_archive_request.rs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Alias::new("rpc_accounting"))
|
||||||
|
.add_column(
|
||||||
|
ColumnDef::new(Alias::new("archive_request"))
|
||||||
|
.boolean()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.drop_column(Alias::new("backend_retries"))
|
||||||
|
.drop_column(Alias::new("no_servers"))
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Alias::new("rpc_accounting"))
|
||||||
|
.drop_column(Alias::new("archive_request"))
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "web3_proxy"
|
name = "web3_proxy"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
default-run = "web3_proxy"
|
default-run = "web3_proxy"
|
||||||
|
|
||||||
@ -28,7 +28,6 @@ axum-macros = "0.2.3"
|
|||||||
# TODO: import chrono from sea-orm so we always have the same version
|
# TODO: import chrono from sea-orm so we always have the same version
|
||||||
chrono = "0.4.22"
|
chrono = "0.4.22"
|
||||||
counter = "0.5.7"
|
counter = "0.5.7"
|
||||||
dashmap = "5.4.0"
|
|
||||||
derive_more = "0.99.17"
|
derive_more = "0.99.17"
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
ethers = { version = "1.0.0", features = ["rustls", "ws"] }
|
ethers = { version = "1.0.0", features = ["rustls", "ws"] }
|
||||||
|
@ -283,20 +283,14 @@ impl Web3ProxyApp {
|
|||||||
// setup a channel for receiving stats (generally with a high cardinality, such as per-user)
|
// setup a channel for receiving stats (generally with a high cardinality, such as per-user)
|
||||||
// we do this in a channel so we don't slow down our response to the users
|
// we do this in a channel so we don't slow down our response to the users
|
||||||
let stat_sender = if let Some(db_conn) = db_conn.clone() {
|
let stat_sender = if let Some(db_conn) = db_conn.clone() {
|
||||||
// TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats
|
let emitter_spawn =
|
||||||
let (stat_sender, save_handle, stat_handle) = {
|
StatEmitter::spawn(top_config.app.chain_id, db_conn, 60, shutdown_receiver)?;
|
||||||
// TODO: period from config instead of always being 60 seconds
|
|
||||||
let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60);
|
|
||||||
|
|
||||||
emitter.spawn(shutdown_receiver).await?
|
important_background_handles.push(emitter_spawn.background_handle);
|
||||||
};
|
|
||||||
|
|
||||||
cancellable_handles.push(stat_handle);
|
Some(emitter_spawn.stat_sender)
|
||||||
important_background_handles.push(save_handle);
|
|
||||||
|
|
||||||
Some(stat_sender)
|
|
||||||
} else {
|
} else {
|
||||||
warn!("cannot store stats without a redis connection");
|
warn!("cannot store stats without a database connection");
|
||||||
|
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
@ -1008,7 +1002,7 @@ impl Web3ProxyApp {
|
|||||||
method => {
|
method => {
|
||||||
// emit stats
|
// emit stats
|
||||||
|
|
||||||
// TODO: wait for them to be synced?
|
// TODO: if no servers synced, wait for them to be synced?
|
||||||
let head_block_id = self
|
let head_block_id = self
|
||||||
.balanced_rpcs
|
.balanced_rpcs
|
||||||
.head_block_id()
|
.head_block_id()
|
||||||
@ -1025,9 +1019,15 @@ impl Web3ProxyApp {
|
|||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
// TODO: maybe this should be on the app and not on balanced_rpcs
|
// TODO: maybe this should be on the app and not on balanced_rpcs
|
||||||
let request_block_hash =
|
let (request_block_hash, archive_needed) =
|
||||||
self.balanced_rpcs.block_hash(&request_block_needed).await?;
|
self.balanced_rpcs.block_hash(&request_block_needed).await?;
|
||||||
|
|
||||||
|
if archive_needed {
|
||||||
|
request_metadata
|
||||||
|
.archive_request
|
||||||
|
.store(true, atomic::Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
BlockId {
|
BlockId {
|
||||||
num: request_block_needed,
|
num: request_block_needed,
|
||||||
hash: request_block_hash,
|
hash: request_block_hash,
|
||||||
|
@ -1,20 +1,18 @@
|
|||||||
use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
|
use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
|
||||||
use crate::jsonrpc::JsonRpcForwardedResponse;
|
use crate::jsonrpc::JsonRpcForwardedResponse;
|
||||||
use anyhow::Context;
|
|
||||||
use chrono::{TimeZone, Utc};
|
use chrono::{TimeZone, Utc};
|
||||||
use dashmap::mapref::entry::Entry;
|
|
||||||
use dashmap::DashMap;
|
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use entities::rpc_accounting;
|
use entities::rpc_accounting;
|
||||||
use hdrhistogram::Histogram;
|
use hashbrown::HashMap;
|
||||||
use moka::future::{Cache, CacheBuilder};
|
use hdrhistogram::{Histogram, RecordError};
|
||||||
use sea_orm::{ActiveModelTrait, DatabaseConnection};
|
use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr};
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, SystemTime};
|
||||||
use tokio::sync::{broadcast, Mutex as AsyncMutex};
|
use tokio::sync::broadcast;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tracing::{error, info, trace};
|
use tokio::time::{interval_at, Instant};
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
/// TODO: where should this be defined?
|
/// TODO: where should this be defined?
|
||||||
/// TODO: can we use something inside sea_orm instead?
|
/// TODO: can we use something inside sea_orm instead?
|
||||||
@ -22,18 +20,15 @@ use tracing::{error, info, trace};
|
|||||||
pub struct ProxyResponseStat {
|
pub struct ProxyResponseStat {
|
||||||
rpc_key_id: u64,
|
rpc_key_id: u64,
|
||||||
method: String,
|
method: String,
|
||||||
period_seconds: u64,
|
archive_request: bool,
|
||||||
period_timestamp: u64,
|
|
||||||
request_bytes: u64,
|
request_bytes: u64,
|
||||||
/// if this is 0, there was a cache_hit
|
/// if backend_requests is 0, there was a cache_hit
|
||||||
backend_requests: u64,
|
backend_requests: u64,
|
||||||
error_response: bool,
|
error_response: bool,
|
||||||
response_bytes: u64,
|
response_bytes: u64,
|
||||||
response_millis: u64,
|
response_millis: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type TimeBucketTimestamp = u64;
|
|
||||||
|
|
||||||
pub struct ProxyResponseHistograms {
|
pub struct ProxyResponseHistograms {
|
||||||
request_bytes: Histogram<u64>,
|
request_bytes: Histogram<u64>,
|
||||||
response_bytes: Histogram<u64>,
|
response_bytes: Histogram<u64>,
|
||||||
@ -55,52 +50,165 @@ impl Default for ProxyResponseHistograms {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: impl From for our database model
|
#[derive(Clone, From, Hash, PartialEq, Eq)]
|
||||||
pub struct ProxyResponseAggregate {
|
struct ProxyResponseAggregateKey {
|
||||||
// these are the key
|
|
||||||
// rpc_key_id: u64,
|
|
||||||
// method: String,
|
|
||||||
// error_response: bool,
|
|
||||||
// TODO: this is the grandparent key. get it from there somehow
|
|
||||||
period_timestamp: u64,
|
|
||||||
frontend_requests: AtomicU64,
|
|
||||||
backend_requests: AtomicU64,
|
|
||||||
backend_retries: AtomicU64,
|
|
||||||
no_servers: AtomicU64,
|
|
||||||
cache_misses: AtomicU64,
|
|
||||||
cache_hits: AtomicU64,
|
|
||||||
sum_request_bytes: AtomicU64,
|
|
||||||
sum_response_bytes: AtomicU64,
|
|
||||||
sum_response_millis: AtomicU64,
|
|
||||||
histograms: AsyncMutex<ProxyResponseHistograms>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
|
||||||
pub struct UserProxyResponseKey {
|
|
||||||
rpc_key_id: u64,
|
rpc_key_id: u64,
|
||||||
method: String,
|
method: String,
|
||||||
error_response: bool,
|
error_response: bool,
|
||||||
|
archive_request: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: think about nested maps more. does this need an arc?
|
#[derive(Default)]
|
||||||
pub type UserProxyResponseCache = DashMap<UserProxyResponseKey, Arc<ProxyResponseAggregate>>;
|
pub struct ProxyResponseAggregate {
|
||||||
/// key is the "time bucket's timestamp" (timestamp / period * period)
|
frontend_requests: u64,
|
||||||
pub type TimeProxyResponseCache =
|
backend_requests: u64,
|
||||||
Cache<TimeBucketTimestamp, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
|
// TODO: related to backend_requests. get this level of detail out
|
||||||
|
// backend_retries: u64,
|
||||||
|
// TODO: related to backend_requests. get this level of detail out
|
||||||
|
// no_servers: u64,
|
||||||
|
cache_misses: u64,
|
||||||
|
cache_hits: u64,
|
||||||
|
sum_request_bytes: u64,
|
||||||
|
sum_response_bytes: u64,
|
||||||
|
sum_response_millis: u64,
|
||||||
|
histograms: ProxyResponseHistograms,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A stat that we aggregate and then store in a database.
|
||||||
|
/// For now there is just one, but I think there might be others later
|
||||||
|
#[derive(Debug, From)]
|
||||||
|
pub enum Web3ProxyStat {
|
||||||
|
Response(ProxyResponseStat),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(From)]
|
||||||
|
pub struct StatEmitterSpawn {
|
||||||
|
pub stat_sender: flume::Sender<Web3ProxyStat>,
|
||||||
|
/// these handles are important and must be allowed to finish
|
||||||
|
pub background_handle: JoinHandle<anyhow::Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct StatEmitter {
|
pub struct StatEmitter {
|
||||||
chain_id: u64,
|
chain_id: u64,
|
||||||
db_conn: DatabaseConnection,
|
db_conn: DatabaseConnection,
|
||||||
period_seconds: u64,
|
period_seconds: u64,
|
||||||
/// the outer cache has a TTL and a handler for expiration
|
|
||||||
aggregated_proxy_responses: TimeProxyResponseCache,
|
|
||||||
save_rx: flume::Receiver<UserProxyResponseCache>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A stat that we aggregate and then store in a database.
|
// TODO: impl `+=<ProxyResponseStat>` for ProxyResponseAggregate?
|
||||||
#[derive(Debug, From)]
|
impl ProxyResponseAggregate {
|
||||||
pub enum Web3ProxyStat {
|
fn add(&mut self, stat: ProxyResponseStat) -> Result<(), RecordError> {
|
||||||
ProxyResponse(ProxyResponseStat),
|
// a stat always comes from just 1 frontend request
|
||||||
|
self.frontend_requests += 1;
|
||||||
|
|
||||||
|
if stat.backend_requests == 0 {
|
||||||
|
// no backend request. cache hit!
|
||||||
|
self.cache_hits += 1;
|
||||||
|
} else {
|
||||||
|
// backend requests! cache miss!
|
||||||
|
self.cache_misses += 1;
|
||||||
|
|
||||||
|
// a stat might have multiple backend requests
|
||||||
|
self.backend_requests += stat.backend_requests;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.sum_request_bytes += stat.request_bytes;
|
||||||
|
self.sum_response_bytes += stat.response_bytes;
|
||||||
|
self.sum_response_millis += stat.response_millis;
|
||||||
|
|
||||||
|
// TODO: use `record_correct`?
|
||||||
|
self.histograms.request_bytes.record(stat.request_bytes)?;
|
||||||
|
self.histograms
|
||||||
|
.response_millis
|
||||||
|
.record(stat.response_millis)?;
|
||||||
|
self.histograms.response_bytes.record(stat.response_bytes)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO? help to turn this plus the key into a database model?
|
||||||
|
// TODO: take a db transaction instead so that we can batch
|
||||||
|
async fn save(
|
||||||
|
self,
|
||||||
|
chain_id: u64,
|
||||||
|
db_conn: &DatabaseConnection,
|
||||||
|
key: ProxyResponseAggregateKey,
|
||||||
|
period_timestamp: u64,
|
||||||
|
) -> Result<(), DbErr> {
|
||||||
|
// this is a lot of variables
|
||||||
|
let period_datetime = Utc.timestamp(period_timestamp as i64, 0);
|
||||||
|
|
||||||
|
let request_bytes = &self.histograms.request_bytes;
|
||||||
|
|
||||||
|
let min_request_bytes = request_bytes.min();
|
||||||
|
let mean_request_bytes = request_bytes.mean();
|
||||||
|
let p50_request_bytes = request_bytes.value_at_quantile(0.50);
|
||||||
|
let p90_request_bytes = request_bytes.value_at_quantile(0.90);
|
||||||
|
let p99_request_bytes = request_bytes.value_at_quantile(0.99);
|
||||||
|
let max_request_bytes = request_bytes.max();
|
||||||
|
|
||||||
|
let response_millis = &self.histograms.response_millis;
|
||||||
|
|
||||||
|
let min_response_millis = response_millis.min();
|
||||||
|
let mean_response_millis = response_millis.mean();
|
||||||
|
let p50_response_millis = response_millis.value_at_quantile(0.50);
|
||||||
|
let p90_response_millis = response_millis.value_at_quantile(0.90);
|
||||||
|
let p99_response_millis = response_millis.value_at_quantile(0.99);
|
||||||
|
let max_response_millis = response_millis.max();
|
||||||
|
|
||||||
|
let response_bytes = &self.histograms.response_bytes;
|
||||||
|
|
||||||
|
let min_response_bytes = response_bytes.min();
|
||||||
|
let mean_response_bytes = response_bytes.mean();
|
||||||
|
let p50_response_bytes = response_bytes.value_at_quantile(0.50);
|
||||||
|
let p90_response_bytes = response_bytes.value_at_quantile(0.90);
|
||||||
|
let p99_response_bytes = response_bytes.value_at_quantile(0.99);
|
||||||
|
let max_response_bytes = response_bytes.max();
|
||||||
|
|
||||||
|
let aggregated_stat_model = rpc_accounting::ActiveModel {
|
||||||
|
id: sea_orm::NotSet,
|
||||||
|
|
||||||
|
rpc_key_id: sea_orm::Set(key.rpc_key_id),
|
||||||
|
chain_id: sea_orm::Set(chain_id),
|
||||||
|
method: sea_orm::Set(key.method),
|
||||||
|
archive_request: sea_orm::Set(key.archive_request),
|
||||||
|
error_response: sea_orm::Set(key.error_response),
|
||||||
|
period_datetime: sea_orm::Set(period_datetime),
|
||||||
|
frontend_requests: sea_orm::Set(self.frontend_requests),
|
||||||
|
backend_requests: sea_orm::Set(self.backend_requests),
|
||||||
|
// backend_retries: sea_orm::Set(self.backend_retries),
|
||||||
|
// no_servers: sea_orm::Set(self.no_servers),
|
||||||
|
cache_misses: sea_orm::Set(self.cache_misses),
|
||||||
|
cache_hits: sea_orm::Set(self.cache_hits),
|
||||||
|
|
||||||
|
sum_request_bytes: sea_orm::Set(self.sum_request_bytes),
|
||||||
|
min_request_bytes: sea_orm::Set(min_request_bytes),
|
||||||
|
mean_request_bytes: sea_orm::Set(mean_request_bytes),
|
||||||
|
p50_request_bytes: sea_orm::Set(p50_request_bytes),
|
||||||
|
p90_request_bytes: sea_orm::Set(p90_request_bytes),
|
||||||
|
p99_request_bytes: sea_orm::Set(p99_request_bytes),
|
||||||
|
max_request_bytes: sea_orm::Set(max_request_bytes),
|
||||||
|
|
||||||
|
sum_response_millis: sea_orm::Set(self.sum_response_millis),
|
||||||
|
min_response_millis: sea_orm::Set(min_response_millis),
|
||||||
|
mean_response_millis: sea_orm::Set(mean_response_millis),
|
||||||
|
p50_response_millis: sea_orm::Set(p50_response_millis),
|
||||||
|
p90_response_millis: sea_orm::Set(p90_response_millis),
|
||||||
|
p99_response_millis: sea_orm::Set(p99_response_millis),
|
||||||
|
max_response_millis: sea_orm::Set(max_response_millis),
|
||||||
|
|
||||||
|
sum_response_bytes: sea_orm::Set(self.sum_response_bytes),
|
||||||
|
min_response_bytes: sea_orm::Set(min_response_bytes),
|
||||||
|
mean_response_bytes: sea_orm::Set(mean_response_bytes),
|
||||||
|
p50_response_bytes: sea_orm::Set(p50_response_bytes),
|
||||||
|
p90_response_bytes: sea_orm::Set(p90_response_bytes),
|
||||||
|
p99_response_bytes: sea_orm::Set(p99_response_bytes),
|
||||||
|
max_response_bytes: sea_orm::Set(max_response_bytes),
|
||||||
|
};
|
||||||
|
|
||||||
|
aggregated_stat_model.save(db_conn).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ProxyResponseStat {
|
impl ProxyResponseStat {
|
||||||
@ -116,10 +224,11 @@ impl ProxyResponseStat {
|
|||||||
.expect("serializing here should always work")
|
.expect("serializing here should always work")
|
||||||
.len() as u64;
|
.len() as u64;
|
||||||
|
|
||||||
|
let archive_request = metadata.archive_request.load(Ordering::Acquire);
|
||||||
let backend_requests = metadata.backend_requests.load(Ordering::Acquire);
|
let backend_requests = metadata.backend_requests.load(Ordering::Acquire);
|
||||||
let period_seconds = metadata.period_seconds;
|
// let period_seconds = metadata.period_seconds;
|
||||||
let period_timestamp =
|
// let period_timestamp =
|
||||||
(metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds;
|
// (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds;
|
||||||
let request_bytes = metadata.request_bytes;
|
let request_bytes = metadata.request_bytes;
|
||||||
let error_response = metadata.error_response.load(Ordering::Acquire);
|
let error_response = metadata.error_response.load(Ordering::Acquire);
|
||||||
|
|
||||||
@ -128,96 +237,116 @@ impl ProxyResponseStat {
|
|||||||
|
|
||||||
Self {
|
Self {
|
||||||
rpc_key_id: authorized_key.rpc_key_id,
|
rpc_key_id: authorized_key.rpc_key_id,
|
||||||
|
archive_request,
|
||||||
method,
|
method,
|
||||||
backend_requests,
|
backend_requests,
|
||||||
period_seconds,
|
|
||||||
period_timestamp,
|
|
||||||
request_bytes,
|
request_bytes,
|
||||||
error_response,
|
error_response,
|
||||||
response_bytes,
|
response_bytes,
|
||||||
response_millis,
|
response_millis,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn key(&self) -> ProxyResponseAggregateKey {
|
||||||
|
ProxyResponseAggregateKey {
|
||||||
|
rpc_key_id: self.rpc_key_id,
|
||||||
|
method: self.method.clone(),
|
||||||
|
error_response: self.error_response,
|
||||||
|
archive_request: self.archive_request,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StatEmitter {
|
impl StatEmitter {
|
||||||
pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
|
pub fn spawn(
|
||||||
let (save_tx, save_rx) = flume::unbounded();
|
chain_id: u64,
|
||||||
|
db_conn: DatabaseConnection,
|
||||||
|
period_seconds: u64,
|
||||||
|
shutdown_receiver: broadcast::Receiver<()>,
|
||||||
|
) -> anyhow::Result<StatEmitterSpawn> {
|
||||||
|
let (stat_sender, stat_receiver) = flume::unbounded();
|
||||||
|
|
||||||
// this needs to be long enough that there are definitely no outstanding queries
|
let mut new = Self {
|
||||||
// TODO: what should the "safe" multiplier be? what if something is late?
|
|
||||||
// TODO: in most cases this delays more than necessary. think of how to do this without dashmap which might let us proceed
|
|
||||||
let ttl_seconds = period_seconds * 3;
|
|
||||||
|
|
||||||
let aggregated_proxy_responses = CacheBuilder::default()
|
|
||||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
|
||||||
.eviction_listener_with_queued_delivery_mode(move |_, v, _| {
|
|
||||||
// this function must not panic!
|
|
||||||
if let Err(err) = save_tx.send(v) {
|
|
||||||
error!(?err, "unable to save. sender closed!");
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
|
|
||||||
|
|
||||||
let s = Self {
|
|
||||||
chain_id,
|
chain_id,
|
||||||
db_conn,
|
db_conn,
|
||||||
period_seconds,
|
period_seconds,
|
||||||
aggregated_proxy_responses,
|
|
||||||
save_rx,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Arc::new(s)
|
let handle =
|
||||||
|
tokio::spawn(async move { new.stat_loop(stat_receiver, shutdown_receiver).await });
|
||||||
|
|
||||||
|
Ok((stat_sender, handle).into())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn spawn(
|
async fn stat_loop(
|
||||||
self: Arc<Self>,
|
&mut self,
|
||||||
shutdown_receiver: broadcast::Receiver<()>,
|
stat_receiver: flume::Receiver<Web3ProxyStat>,
|
||||||
) -> anyhow::Result<(
|
|
||||||
flume::Sender<Web3ProxyStat>,
|
|
||||||
JoinHandle<anyhow::Result<()>>,
|
|
||||||
JoinHandle<anyhow::Result<()>>,
|
|
||||||
)> {
|
|
||||||
let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();
|
|
||||||
|
|
||||||
// TODO: join and flatten these handles
|
|
||||||
let aggregate_handle = tokio::spawn(
|
|
||||||
self.clone()
|
|
||||||
.aggregate_stats_loop(aggregate_rx, shutdown_receiver),
|
|
||||||
);
|
|
||||||
let save_handle = tokio::spawn(self.save_stats_loop());
|
|
||||||
|
|
||||||
Ok((aggregate_tx, aggregate_handle, save_handle))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// simple future that reads the channel and aggregates stats in a local cache.
|
|
||||||
async fn aggregate_stats_loop(
|
|
||||||
self: Arc<Self>,
|
|
||||||
aggregate_rx: flume::Receiver<Web3ProxyStat>,
|
|
||||||
mut shutdown_receiver: broadcast::Receiver<()>,
|
mut shutdown_receiver: broadcast::Receiver<()>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
let system_now = SystemTime::now();
|
||||||
|
|
||||||
|
let duration_since_epoch = system_now
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.expect("time machines don't exist");
|
||||||
|
|
||||||
|
// TODO: change period_seconds from u64 to u32
|
||||||
|
let current_period = duration_since_epoch
|
||||||
|
.checked_div(self.period_seconds as u32)
|
||||||
|
.unwrap()
|
||||||
|
* self.period_seconds as u32;
|
||||||
|
|
||||||
|
let duration_to_next_period =
|
||||||
|
Duration::from_secs(self.period_seconds) - (duration_since_epoch - current_period);
|
||||||
|
|
||||||
|
// start the interval when the next period starts
|
||||||
|
let start_instant = Instant::now() + duration_to_next_period;
|
||||||
|
let mut interval = interval_at(start_instant, Duration::from_secs(self.period_seconds));
|
||||||
|
|
||||||
|
// loop between different futures to update these mutables
|
||||||
|
let mut period_timestamp = current_period.as_secs();
|
||||||
|
let mut response_aggregate_map =
|
||||||
|
HashMap::<ProxyResponseAggregateKey, ProxyResponseAggregate>::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
x = aggregate_rx.recv_async() => {
|
stat = stat_receiver.recv_async() => {
|
||||||
match x {
|
match stat? {
|
||||||
Ok(x) => {
|
Web3ProxyStat::Response(stat) => {
|
||||||
trace!(?x, "aggregating stat");
|
let key = stat.key();
|
||||||
|
|
||||||
// TODO: increment global stats (in redis? in local cache for prometheus?)
|
// TODO: does hashmap have get_or_insert?
|
||||||
|
if ! response_aggregate_map.contains_key(&key) {
|
||||||
|
response_aggregate_map.insert(key.clone(), Default::default());
|
||||||
|
};
|
||||||
|
|
||||||
// TODO: batch stats?
|
if let Some(value) = response_aggregate_map.get_mut(&key) {
|
||||||
// TODO: where can we wait on this handle?
|
if let Err(err) = value.add(stat) {
|
||||||
let clone = self.clone();
|
error!(?err, "unable to aggregate stats!");
|
||||||
tokio::spawn(async move { clone.aggregate_stat(x).await });
|
};
|
||||||
},
|
} else {
|
||||||
Err(err) => {
|
unimplemented!();
|
||||||
error!(?err, "aggregate_rx");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
// save all the aggregated stats
|
||||||
|
// TODO: batch these saves
|
||||||
|
for (key, aggregate) in response_aggregate_map.drain() {
|
||||||
|
if let Err(err) = aggregate.save(self.chain_id, &self.db_conn, key, period_timestamp).await {
|
||||||
|
error!(?err, "Unable to save stat while shutting down!");
|
||||||
|
};
|
||||||
|
}
|
||||||
|
// advance to the next period
|
||||||
|
// TODO: is this safe? what if there is drift?
|
||||||
|
period_timestamp += self.period_seconds;
|
||||||
|
}
|
||||||
x = shutdown_receiver.recv() => {
|
x = shutdown_receiver.recv() => {
|
||||||
match x {
|
match x {
|
||||||
Ok(_) => info!("aggregate stats loop shutting down"),
|
Ok(_) => {
|
||||||
|
info!("aggregate stat_loop shutting down");
|
||||||
|
// TODO: call aggregate_stat for all the
|
||||||
|
},
|
||||||
Err(err) => error!(?err, "shutdown receiver"),
|
Err(err) => error!(?err, "shutdown receiver"),
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -225,210 +354,16 @@ impl StatEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// shutting down. force a save of any pending stats
|
for (key, aggregate) in response_aggregate_map.drain() {
|
||||||
// we do not use invalidate_all because that is done on a background thread
|
if let Err(err) = aggregate
|
||||||
for (key, _) in self.aggregated_proxy_responses.into_iter() {
|
.save(self.chain_id, &self.db_conn, key, period_timestamp)
|
||||||
self.aggregated_proxy_responses.invalidate(&key).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("aggregate stats loop finished");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn save_stats_loop(self: Arc<Self>) -> anyhow::Result<()> {
|
|
||||||
while let Ok(x) = self.save_rx.recv_async().await {
|
|
||||||
// TODO: batch these
|
|
||||||
for (k, v) in x.into_iter() {
|
|
||||||
// TODO: this is a lot of variables
|
|
||||||
let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0);
|
|
||||||
let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
|
|
||||||
let backend_requests = v.backend_requests.load(Ordering::Acquire);
|
|
||||||
let backend_retries = v.backend_retries.load(Ordering::Acquire);
|
|
||||||
let no_servers = v.no_servers.load(Ordering::Acquire);
|
|
||||||
let cache_misses = v.cache_misses.load(Ordering::Acquire);
|
|
||||||
let cache_hits = v.cache_hits.load(Ordering::Acquire);
|
|
||||||
let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
|
|
||||||
let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
|
|
||||||
let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
|
|
||||||
|
|
||||||
let histograms = v.histograms.lock().await;
|
|
||||||
|
|
||||||
let request_bytes = &histograms.request_bytes;
|
|
||||||
|
|
||||||
let min_request_bytes = request_bytes.min();
|
|
||||||
let mean_request_bytes = request_bytes.mean();
|
|
||||||
let p50_request_bytes = request_bytes.value_at_quantile(0.50);
|
|
||||||
let p90_request_bytes = request_bytes.value_at_quantile(0.90);
|
|
||||||
let p99_request_bytes = request_bytes.value_at_quantile(0.99);
|
|
||||||
let max_request_bytes = request_bytes.max();
|
|
||||||
|
|
||||||
let response_millis = &histograms.response_millis;
|
|
||||||
|
|
||||||
let min_response_millis = response_millis.min();
|
|
||||||
let mean_response_millis = response_millis.mean();
|
|
||||||
let p50_response_millis = response_millis.value_at_quantile(0.50);
|
|
||||||
let p90_response_millis = response_millis.value_at_quantile(0.90);
|
|
||||||
let p99_response_millis = response_millis.value_at_quantile(0.99);
|
|
||||||
let max_response_millis = response_millis.max();
|
|
||||||
|
|
||||||
let response_bytes = &histograms.response_bytes;
|
|
||||||
|
|
||||||
let min_response_bytes = response_bytes.min();
|
|
||||||
let mean_response_bytes = response_bytes.mean();
|
|
||||||
let p50_response_bytes = response_bytes.value_at_quantile(0.50);
|
|
||||||
let p90_response_bytes = response_bytes.value_at_quantile(0.90);
|
|
||||||
let p99_response_bytes = response_bytes.value_at_quantile(0.99);
|
|
||||||
let max_response_bytes = response_bytes.max();
|
|
||||||
|
|
||||||
drop(histograms);
|
|
||||||
|
|
||||||
let stat = rpc_accounting::ActiveModel {
|
|
||||||
id: sea_orm::NotSet,
|
|
||||||
|
|
||||||
rpc_key_id: sea_orm::Set(k.rpc_key_id),
|
|
||||||
chain_id: sea_orm::Set(self.chain_id),
|
|
||||||
method: sea_orm::Set(k.method.clone()),
|
|
||||||
error_response: sea_orm::Set(k.error_response),
|
|
||||||
period_datetime: sea_orm::Set(period_datetime),
|
|
||||||
frontend_requests: sea_orm::Set(frontend_requests),
|
|
||||||
backend_requests: sea_orm::Set(backend_requests),
|
|
||||||
backend_retries: sea_orm::Set(backend_retries),
|
|
||||||
no_servers: sea_orm::Set(no_servers),
|
|
||||||
cache_misses: sea_orm::Set(cache_misses),
|
|
||||||
cache_hits: sea_orm::Set(cache_hits),
|
|
||||||
|
|
||||||
sum_request_bytes: sea_orm::Set(sum_request_bytes),
|
|
||||||
min_request_bytes: sea_orm::Set(min_request_bytes),
|
|
||||||
mean_request_bytes: sea_orm::Set(mean_request_bytes),
|
|
||||||
p50_request_bytes: sea_orm::Set(p50_request_bytes),
|
|
||||||
p90_request_bytes: sea_orm::Set(p90_request_bytes),
|
|
||||||
p99_request_bytes: sea_orm::Set(p99_request_bytes),
|
|
||||||
max_request_bytes: sea_orm::Set(max_request_bytes),
|
|
||||||
|
|
||||||
sum_response_millis: sea_orm::Set(sum_response_millis),
|
|
||||||
min_response_millis: sea_orm::Set(min_response_millis),
|
|
||||||
mean_response_millis: sea_orm::Set(mean_response_millis),
|
|
||||||
p50_response_millis: sea_orm::Set(p50_response_millis),
|
|
||||||
p90_response_millis: sea_orm::Set(p90_response_millis),
|
|
||||||
p99_response_millis: sea_orm::Set(p99_response_millis),
|
|
||||||
max_response_millis: sea_orm::Set(max_response_millis),
|
|
||||||
|
|
||||||
sum_response_bytes: sea_orm::Set(sum_response_bytes),
|
|
||||||
min_response_bytes: sea_orm::Set(min_response_bytes),
|
|
||||||
mean_response_bytes: sea_orm::Set(mean_response_bytes),
|
|
||||||
p50_response_bytes: sea_orm::Set(p50_response_bytes),
|
|
||||||
p90_response_bytes: sea_orm::Set(p90_response_bytes),
|
|
||||||
p99_response_bytes: sea_orm::Set(p99_response_bytes),
|
|
||||||
max_response_bytes: sea_orm::Set(max_response_bytes),
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: if this fails, rever adding the user, too
|
|
||||||
if let Err(err) = stat
|
|
||||||
.save(&self.db_conn)
|
|
||||||
.await
|
.await
|
||||||
.context("Saving rpc_accounting stat")
|
|
||||||
{
|
{
|
||||||
error!(?err, "unable to save aggregated stats");
|
error!(?err, "Unable to save stat while shutting down!");
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("stat saver exited");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
|
|
||||||
trace!(?stat, "aggregating");
|
|
||||||
match stat {
|
|
||||||
Web3ProxyStat::ProxyResponse(stat) => {
|
|
||||||
// TODO: move this whole closure to another function?
|
|
||||||
|
|
||||||
debug_assert_eq!(stat.period_seconds, self.period_seconds);
|
|
||||||
|
|
||||||
// get the user cache for the current period
|
|
||||||
// TODO: i don't think this works right. maybe do DashMap entry api as the outer variable
|
|
||||||
let user_cache = self
|
|
||||||
.aggregated_proxy_responses
|
|
||||||
.get_with_by_ref(&stat.period_timestamp, async move { Default::default() })
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let key = (stat.rpc_key_id, stat.method, stat.error_response).into();
|
|
||||||
|
|
||||||
let user_aggregate = match user_cache.entry(key) {
|
|
||||||
Entry::Occupied(x) => x.get().clone(),
|
|
||||||
Entry::Vacant(y) => {
|
|
||||||
let histograms = ProxyResponseHistograms::default();
|
|
||||||
|
|
||||||
// TODO: create a counter here that we use to tell when it is safe to flush these? faster than waiting 3 periods
|
|
||||||
|
|
||||||
let aggregate = ProxyResponseAggregate {
|
|
||||||
period_timestamp: stat.period_timestamp,
|
|
||||||
// start most things at 0 because we add outside this getter
|
|
||||||
frontend_requests: 0.into(),
|
|
||||||
backend_requests: 0.into(),
|
|
||||||
backend_retries: 0.into(),
|
|
||||||
no_servers: 0.into(),
|
|
||||||
cache_misses: 0.into(),
|
|
||||||
cache_hits: 0.into(),
|
|
||||||
sum_request_bytes: 0.into(),
|
|
||||||
sum_response_bytes: 0.into(),
|
|
||||||
sum_response_millis: 0.into(),
|
|
||||||
histograms: AsyncMutex::new(histograms),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: store this arc in the map
|
|
||||||
// TODO: does this have a race condition?
|
|
||||||
|
|
||||||
let aggregate = Arc::new(aggregate);
|
|
||||||
|
|
||||||
y.insert(aggregate.clone());
|
|
||||||
|
|
||||||
aggregate
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// a stat always come from just 1 frontend request
|
|
||||||
user_aggregate
|
|
||||||
.frontend_requests
|
|
||||||
.fetch_add(1, Ordering::Acquire);
|
|
||||||
|
|
||||||
if stat.backend_requests == 0 {
|
|
||||||
// no backend request. cache hit!
|
|
||||||
user_aggregate.cache_hits.fetch_add(1, Ordering::Acquire);
|
|
||||||
} else {
|
|
||||||
// backend requests! cache miss!
|
|
||||||
user_aggregate.cache_misses.fetch_add(1, Ordering::Acquire);
|
|
||||||
|
|
||||||
// a stat might have multiple backend requests
|
|
||||||
user_aggregate
|
|
||||||
.backend_requests
|
|
||||||
.fetch_add(stat.backend_requests, Ordering::Acquire);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
user_aggregate
|
info!("aggregated stat_loop shut down");
|
||||||
.sum_request_bytes
|
|
||||||
.fetch_add(stat.request_bytes, Ordering::Release);
|
|
||||||
|
|
||||||
user_aggregate
|
|
||||||
.sum_response_bytes
|
|
||||||
.fetch_add(stat.response_bytes, Ordering::Release);
|
|
||||||
|
|
||||||
user_aggregate
|
|
||||||
.sum_response_millis
|
|
||||||
.fetch_add(stat.response_millis, Ordering::Release);
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut histograms = user_aggregate.histograms.lock().await;
|
|
||||||
|
|
||||||
// TODO: use `record_correct`?
|
|
||||||
histograms.request_bytes.record(stat.request_bytes)?;
|
|
||||||
histograms.response_millis.record(stat.response_millis)?;
|
|
||||||
histograms.response_bytes.record(stat.response_bytes)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
461
web3_proxy/src/app_stats_old.rs
Normal file
461
web3_proxy/src/app_stats_old.rs
Normal file
@ -0,0 +1,461 @@
|
|||||||
|
use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
|
||||||
|
use crate::jsonrpc::JsonRpcForwardedResponse;
|
||||||
|
use anyhow::Context;
|
||||||
|
use chrono::{TimeZone, Utc};
|
||||||
|
use dashmap::mapref::entry::Entry;
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use derive_more::From;
|
||||||
|
use entities::rpc_accounting;
|
||||||
|
use hdrhistogram::Histogram;
|
||||||
|
use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt};
|
||||||
|
use sea_orm::{ActiveModelTrait, DatabaseConnection};
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::sync::{broadcast, Mutex as AsyncMutex};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use tracing::{error, info, trace};
|
||||||
|
|
||||||
|
/// TODO: where should this be defined?
|
||||||
|
/// TODO: can we use something inside sea_orm instead?
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ProxyResponseStat {
|
||||||
|
rpc_key_id: u64,
|
||||||
|
method: String,
|
||||||
|
archive_request: bool,
|
||||||
|
period_seconds: u64,
|
||||||
|
period_timestamp: u64,
|
||||||
|
request_bytes: u64,
|
||||||
|
/// if backend_requests is 0, there was a cache_hit
|
||||||
|
backend_requests: u64,
|
||||||
|
error_response: bool,
|
||||||
|
response_bytes: u64,
|
||||||
|
response_millis: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type TimeBucketTimestamp = u64;
|
||||||
|
|
||||||
|
pub struct ProxyResponseHistograms {
|
||||||
|
request_bytes: Histogram<u64>,
|
||||||
|
response_bytes: Histogram<u64>,
|
||||||
|
response_millis: Histogram<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ProxyResponseHistograms {
|
||||||
|
fn default() -> Self {
|
||||||
|
// TODO: how many significant figures?
|
||||||
|
let request_bytes = Histogram::new(5).expect("creating request_bytes histogram");
|
||||||
|
let response_bytes = Histogram::new(5).expect("creating response_bytes histogram");
|
||||||
|
let response_millis = Histogram::new(5).expect("creating response_millis histogram");
|
||||||
|
|
||||||
|
Self {
|
||||||
|
request_bytes,
|
||||||
|
response_bytes,
|
||||||
|
response_millis,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: impl From for our database model
|
||||||
|
pub struct ProxyResponseAggregate {
|
||||||
|
// these are the key
|
||||||
|
// rpc_key_id: u64,
|
||||||
|
// method: String,
|
||||||
|
// error_response: bool,
|
||||||
|
// TODO: this is the grandparent key. get it from there somehow
|
||||||
|
period_timestamp: u64,
|
||||||
|
archive_request: bool,
|
||||||
|
frontend_requests: AtomicU64,
|
||||||
|
backend_requests: AtomicU64,
|
||||||
|
backend_retries: AtomicU64,
|
||||||
|
no_servers: AtomicU64,
|
||||||
|
cache_misses: AtomicU64,
|
||||||
|
cache_hits: AtomicU64,
|
||||||
|
sum_request_bytes: AtomicU64,
|
||||||
|
sum_response_bytes: AtomicU64,
|
||||||
|
sum_response_millis: AtomicU64,
|
||||||
|
histograms: AsyncMutex<ProxyResponseHistograms>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct UserProxyResponseKey {
|
||||||
|
rpc_key_id: u64,
|
||||||
|
method: String,
|
||||||
|
error_response: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: think about nested maps more. does this need an arc?
|
||||||
|
pub type UserProxyResponseCache = Arc<DashMap<UserProxyResponseKey, Arc<ProxyResponseAggregate>>>;
|
||||||
|
/// key is the "time bucket's timestamp" (timestamp / period * period)
|
||||||
|
pub type TimeProxyResponseCache =
|
||||||
|
Cache<TimeBucketTimestamp, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
|
||||||
|
|
||||||
|
pub struct StatEmitter {
|
||||||
|
chain_id: u64,
|
||||||
|
db_conn: DatabaseConnection,
|
||||||
|
period_seconds: u64,
|
||||||
|
/// the outer cache has a TTL and a handler for expiration
|
||||||
|
aggregated_proxy_responses: TimeProxyResponseCache,
|
||||||
|
save_rx: flume::Receiver<UserProxyResponseCache>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A stat that we aggregate and then store in a database.
|
||||||
|
#[derive(Debug, From)]
|
||||||
|
pub enum Web3ProxyStat {
|
||||||
|
Response(ProxyResponseStat),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProxyResponseStat {
|
||||||
|
// TODO: should RequestMetadata be in an arc? or can we handle refs here?
|
||||||
|
pub fn new(
|
||||||
|
method: String,
|
||||||
|
authorized_key: AuthorizedKey,
|
||||||
|
metadata: Arc<RequestMetadata>,
|
||||||
|
response: &JsonRpcForwardedResponse,
|
||||||
|
) -> Self {
|
||||||
|
// TODO: do this without serializing to a string. this is going to slow us down!
|
||||||
|
let response_bytes = serde_json::to_string(response)
|
||||||
|
.expect("serializing here should always work")
|
||||||
|
.len() as u64;
|
||||||
|
|
||||||
|
let archive_request = metadata.archive_request.load(Ordering::Acquire);
|
||||||
|
let backend_requests = metadata.backend_requests.load(Ordering::Acquire);
|
||||||
|
let period_seconds = metadata.period_seconds;
|
||||||
|
let period_timestamp =
|
||||||
|
(metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds;
|
||||||
|
let request_bytes = metadata.request_bytes;
|
||||||
|
let error_response = metadata.error_response.load(Ordering::Acquire);
|
||||||
|
|
||||||
|
// TODO: timestamps could get confused by leap seconds. need tokio time instead
|
||||||
|
let response_millis = metadata.start_instant.elapsed().as_millis() as u64;
|
||||||
|
|
||||||
|
Self {
|
||||||
|
rpc_key_id: authorized_key.rpc_key_id,
|
||||||
|
archive_request,
|
||||||
|
method,
|
||||||
|
backend_requests,
|
||||||
|
period_seconds,
|
||||||
|
period_timestamp,
|
||||||
|
request_bytes,
|
||||||
|
error_response,
|
||||||
|
response_bytes,
|
||||||
|
response_millis,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StatEmitter {
|
||||||
|
pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
|
||||||
|
let (save_tx, save_rx) = flume::unbounded();
|
||||||
|
|
||||||
|
// this needs to be long enough that there are definitely no outstanding queries
|
||||||
|
// TODO: what should the "safe" multiplier be? what if something is late?
|
||||||
|
// TODO: in most cases this delays more than necessary. think of how to do this without dashmap which might let us proceed
|
||||||
|
let ttl_seconds = period_seconds * 3;
|
||||||
|
|
||||||
|
let aggregated_proxy_responses = CacheBuilder::default()
|
||||||
|
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||||
|
.eviction_listener_with_queued_delivery_mode(move |_, v, _| {
|
||||||
|
// this function must not panic!
|
||||||
|
if let Err(err) = save_tx.send(v) {
|
||||||
|
error!(?err, "unable to save. sender closed!");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
|
||||||
|
|
||||||
|
let s = Self {
|
||||||
|
chain_id,
|
||||||
|
db_conn,
|
||||||
|
period_seconds,
|
||||||
|
aggregated_proxy_responses,
|
||||||
|
save_rx,
|
||||||
|
};
|
||||||
|
|
||||||
|
Arc::new(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn spawn(
|
||||||
|
self: Arc<Self>,
|
||||||
|
shutdown_receiver: broadcast::Receiver<()>,
|
||||||
|
) -> anyhow::Result<(
|
||||||
|
flume::Sender<Web3ProxyStat>,
|
||||||
|
JoinHandle<anyhow::Result<()>>,
|
||||||
|
JoinHandle<anyhow::Result<()>>,
|
||||||
|
)> {
|
||||||
|
let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();
|
||||||
|
|
||||||
|
let (finished_tx, finished_rx) = flume::bounded(1);
|
||||||
|
|
||||||
|
let aggregate_handle = tokio::spawn(self.clone().aggregate_stats_loop(
|
||||||
|
aggregate_rx,
|
||||||
|
shutdown_receiver,
|
||||||
|
finished_rx,
|
||||||
|
));
|
||||||
|
let save_handle = tokio::spawn(self.save_stats_loop(finished_tx));
|
||||||
|
|
||||||
|
Ok((aggregate_tx, aggregate_handle, save_handle))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// simple future that reads the channel and aggregates stats in a local cache.
|
||||||
|
async fn aggregate_stats_loop(
|
||||||
|
self: Arc<Self>,
|
||||||
|
aggregate_rx: flume::Receiver<Web3ProxyStat>,
|
||||||
|
mut shutdown_receiver: broadcast::Receiver<()>,
|
||||||
|
finished_rx: flume::Receiver<()>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
x = aggregate_rx.recv_async() => {
|
||||||
|
match x {
|
||||||
|
Ok(x) => {
|
||||||
|
trace!(?x, "aggregating stat");
|
||||||
|
|
||||||
|
// TODO: increment global stats (in redis? in local cache for prometheus?)
|
||||||
|
|
||||||
|
// TODO: batch stats?
|
||||||
|
// TODO: where can we wait on this handle?
|
||||||
|
let clone = self.clone();
|
||||||
|
tokio::spawn(async move { clone.aggregate_stat(x).await });
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
error!(?err, "aggregate_rx");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
x = shutdown_receiver.recv() => {
|
||||||
|
match x {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("aggregate stats loop shutting down");
|
||||||
|
// TODO: call aggregate_stat for all the
|
||||||
|
},
|
||||||
|
Err(err) => error!(?err, "shutdown receiver"),
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// shutting down. force a save of any pending stats
|
||||||
|
// we do not use invalidate_all because that is done on a background thread
|
||||||
|
// TODO: i don't think this works
|
||||||
|
for (key, _) in self.aggregated_proxy_responses.into_iter() {
|
||||||
|
// TODO: call drain or remove or something instead?
|
||||||
|
self.aggregated_proxy_responses.invalidate(&key).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.aggregated_proxy_responses.sync();
|
||||||
|
|
||||||
|
todo!("drop self.aggregated_proxy_responses");
|
||||||
|
|
||||||
|
// TODO: timeout on this?
|
||||||
|
finished_rx.recv_async().await?;
|
||||||
|
|
||||||
|
info!("aggregate stats loop finished");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_stats_loop(
|
||||||
|
self: Arc<Self>,
|
||||||
|
finished_tx: flume::Sender<()>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
while let Ok(x) = self.save_rx.recv_async().await {
|
||||||
|
// TODO: batch these
|
||||||
|
// TODO: i'm not seeing these on shutdown
|
||||||
|
for x in x.iter() {
|
||||||
|
let k = x.key();
|
||||||
|
let v = x.value();
|
||||||
|
|
||||||
|
// TODO: this is a lot of variables
|
||||||
|
let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0);
|
||||||
|
let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
|
||||||
|
let backend_requests = v.backend_requests.load(Ordering::Acquire);
|
||||||
|
let backend_retries = v.backend_retries.load(Ordering::Acquire);
|
||||||
|
let no_servers = v.no_servers.load(Ordering::Acquire);
|
||||||
|
let cache_misses = v.cache_misses.load(Ordering::Acquire);
|
||||||
|
let cache_hits = v.cache_hits.load(Ordering::Acquire);
|
||||||
|
let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
|
||||||
|
let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
|
||||||
|
let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
|
||||||
|
|
||||||
|
let histograms = v.histograms.lock().await;
|
||||||
|
|
||||||
|
let request_bytes = &histograms.request_bytes;
|
||||||
|
|
||||||
|
let min_request_bytes = request_bytes.min();
|
||||||
|
let mean_request_bytes = request_bytes.mean();
|
||||||
|
let p50_request_bytes = request_bytes.value_at_quantile(0.50);
|
||||||
|
let p90_request_bytes = request_bytes.value_at_quantile(0.90);
|
||||||
|
let p99_request_bytes = request_bytes.value_at_quantile(0.99);
|
||||||
|
let max_request_bytes = request_bytes.max();
|
||||||
|
|
||||||
|
let response_millis = &histograms.response_millis;
|
||||||
|
|
||||||
|
let min_response_millis = response_millis.min();
|
||||||
|
let mean_response_millis = response_millis.mean();
|
||||||
|
let p50_response_millis = response_millis.value_at_quantile(0.50);
|
||||||
|
let p90_response_millis = response_millis.value_at_quantile(0.90);
|
||||||
|
let p99_response_millis = response_millis.value_at_quantile(0.99);
|
||||||
|
let max_response_millis = response_millis.max();
|
||||||
|
|
||||||
|
let response_bytes = &histograms.response_bytes;
|
||||||
|
|
||||||
|
let min_response_bytes = response_bytes.min();
|
||||||
|
let mean_response_bytes = response_bytes.mean();
|
||||||
|
let p50_response_bytes = response_bytes.value_at_quantile(0.50);
|
||||||
|
let p90_response_bytes = response_bytes.value_at_quantile(0.90);
|
||||||
|
let p99_response_bytes = response_bytes.value_at_quantile(0.99);
|
||||||
|
let max_response_bytes = response_bytes.max();
|
||||||
|
|
||||||
|
drop(histograms);
|
||||||
|
|
||||||
|
let stat = rpc_accounting::ActiveModel {
|
||||||
|
id: sea_orm::NotSet,
|
||||||
|
|
||||||
|
rpc_key_id: sea_orm::Set(k.rpc_key_id),
|
||||||
|
chain_id: sea_orm::Set(self.chain_id),
|
||||||
|
method: sea_orm::Set(k.method.clone()),
|
||||||
|
archive_request: sea_orm::Set(v.archive_request),
|
||||||
|
error_response: sea_orm::Set(k.error_response),
|
||||||
|
period_datetime: sea_orm::Set(period_datetime),
|
||||||
|
frontend_requests: sea_orm::Set(frontend_requests),
|
||||||
|
backend_requests: sea_orm::Set(backend_requests),
|
||||||
|
backend_retries: sea_orm::Set(backend_retries),
|
||||||
|
no_servers: sea_orm::Set(no_servers),
|
||||||
|
cache_misses: sea_orm::Set(cache_misses),
|
||||||
|
cache_hits: sea_orm::Set(cache_hits),
|
||||||
|
|
||||||
|
sum_request_bytes: sea_orm::Set(sum_request_bytes),
|
||||||
|
min_request_bytes: sea_orm::Set(min_request_bytes),
|
||||||
|
mean_request_bytes: sea_orm::Set(mean_request_bytes),
|
||||||
|
p50_request_bytes: sea_orm::Set(p50_request_bytes),
|
||||||
|
p90_request_bytes: sea_orm::Set(p90_request_bytes),
|
||||||
|
p99_request_bytes: sea_orm::Set(p99_request_bytes),
|
||||||
|
max_request_bytes: sea_orm::Set(max_request_bytes),
|
||||||
|
|
||||||
|
sum_response_millis: sea_orm::Set(sum_response_millis),
|
||||||
|
min_response_millis: sea_orm::Set(min_response_millis),
|
||||||
|
mean_response_millis: sea_orm::Set(mean_response_millis),
|
||||||
|
p50_response_millis: sea_orm::Set(p50_response_millis),
|
||||||
|
p90_response_millis: sea_orm::Set(p90_response_millis),
|
||||||
|
p99_response_millis: sea_orm::Set(p99_response_millis),
|
||||||
|
max_response_millis: sea_orm::Set(max_response_millis),
|
||||||
|
|
||||||
|
sum_response_bytes: sea_orm::Set(sum_response_bytes),
|
||||||
|
min_response_bytes: sea_orm::Set(min_response_bytes),
|
||||||
|
mean_response_bytes: sea_orm::Set(mean_response_bytes),
|
||||||
|
p50_response_bytes: sea_orm::Set(p50_response_bytes),
|
||||||
|
p90_response_bytes: sea_orm::Set(p90_response_bytes),
|
||||||
|
p99_response_bytes: sea_orm::Set(p99_response_bytes),
|
||||||
|
max_response_bytes: sea_orm::Set(max_response_bytes),
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: if this fails, what should we do?
|
||||||
|
if let Err(err) = stat
|
||||||
|
.save(&self.db_conn)
|
||||||
|
.await
|
||||||
|
.context("Saving rpc_accounting stat")
|
||||||
|
{
|
||||||
|
error!(?err, "unable to save aggregated stats");
|
||||||
|
} else {
|
||||||
|
trace!("stat saved");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("stat saver exited");
|
||||||
|
|
||||||
|
finished_tx.send_async(()).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
|
||||||
|
match stat {
|
||||||
|
Web3ProxyStat::Response(stat) => {
|
||||||
|
// TODO: move this whole closure to another function?
|
||||||
|
|
||||||
|
debug_assert_eq!(stat.period_seconds, self.period_seconds);
|
||||||
|
|
||||||
|
// get the user cache for the current period
|
||||||
|
// TODO: i don't think this works right. maybe do DashMap entry api as the outer variable
|
||||||
|
let user_cache = self
|
||||||
|
.aggregated_proxy_responses
|
||||||
|
.get_with(stat.period_timestamp, async move { Default::default() })
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let key = (stat.rpc_key_id, stat.method, stat.error_response).into();
|
||||||
|
|
||||||
|
let user_aggregate = match user_cache.entry(key) {
|
||||||
|
Entry::Occupied(x) => x.get().clone(),
|
||||||
|
Entry::Vacant(y) => {
|
||||||
|
let histograms = ProxyResponseHistograms::default();
|
||||||
|
|
||||||
|
// TODO: create a counter here that we use to tell when it is safe to flush these? faster than waiting 3 periods
|
||||||
|
|
||||||
|
let aggregate = ProxyResponseAggregate {
|
||||||
|
period_timestamp: stat.period_timestamp,
|
||||||
|
archive_request: stat.archive_request,
|
||||||
|
// start most things at 0 because we add outside this getter
|
||||||
|
frontend_requests: 0.into(),
|
||||||
|
backend_requests: 0.into(),
|
||||||
|
backend_retries: 0.into(),
|
||||||
|
no_servers: 0.into(),
|
||||||
|
cache_misses: 0.into(),
|
||||||
|
cache_hits: 0.into(),
|
||||||
|
sum_request_bytes: 0.into(),
|
||||||
|
sum_response_bytes: 0.into(),
|
||||||
|
sum_response_millis: 0.into(),
|
||||||
|
histograms: AsyncMutex::new(histograms),
|
||||||
|
};
|
||||||
|
|
||||||
|
let aggregate = Arc::new(aggregate);
|
||||||
|
|
||||||
|
y.insert(aggregate).clone()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// a stat always come from just 1 frontend request
|
||||||
|
user_aggregate
|
||||||
|
.frontend_requests
|
||||||
|
.fetch_add(1, Ordering::Acquire);
|
||||||
|
|
||||||
|
if stat.backend_requests == 0 {
|
||||||
|
// no backend request. cache hit!
|
||||||
|
user_aggregate.cache_hits.fetch_add(1, Ordering::Acquire);
|
||||||
|
} else {
|
||||||
|
// backend requests! cache miss!
|
||||||
|
user_aggregate.cache_misses.fetch_add(1, Ordering::Acquire);
|
||||||
|
|
||||||
|
// a stat might have multiple backend requests
|
||||||
|
user_aggregate
|
||||||
|
.backend_requests
|
||||||
|
.fetch_add(stat.backend_requests, Ordering::Acquire);
|
||||||
|
}
|
||||||
|
|
||||||
|
user_aggregate
|
||||||
|
.sum_request_bytes
|
||||||
|
.fetch_add(stat.request_bytes, Ordering::Release);
|
||||||
|
|
||||||
|
user_aggregate
|
||||||
|
.sum_response_bytes
|
||||||
|
.fetch_add(stat.response_bytes, Ordering::Release);
|
||||||
|
|
||||||
|
user_aggregate
|
||||||
|
.sum_response_millis
|
||||||
|
.fetch_add(stat.response_millis, Ordering::Release);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut histograms = user_aggregate.histograms.lock().await;
|
||||||
|
|
||||||
|
// TODO: use `record_correct`?
|
||||||
|
histograms.request_bytes.record(stat.request_bytes)?;
|
||||||
|
histograms.response_millis.record(stat.response_millis)?;
|
||||||
|
histograms.response_bytes.record(stat.response_bytes)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -65,6 +65,8 @@ pub struct RequestMetadata {
|
|||||||
// TODO: better name for this
|
// TODO: better name for this
|
||||||
pub period_seconds: u64,
|
pub period_seconds: u64,
|
||||||
pub request_bytes: u64,
|
pub request_bytes: u64,
|
||||||
|
// TODO: "archive" isn't really a boolean.
|
||||||
|
pub archive_request: AtomicBool,
|
||||||
/// if this is 0, there was a cache_hit
|
/// if this is 0, there was a cache_hit
|
||||||
pub backend_requests: AtomicU64,
|
pub backend_requests: AtomicU64,
|
||||||
pub no_servers: AtomicU64,
|
pub no_servers: AtomicU64,
|
||||||
@ -96,6 +98,7 @@ impl RequestMetadata {
|
|||||||
start_datetime: Utc::now(),
|
start_datetime: Utc::now(),
|
||||||
period_seconds,
|
period_seconds,
|
||||||
request_bytes,
|
request_bytes,
|
||||||
|
archive_request: false.into(),
|
||||||
backend_requests: 0.into(),
|
backend_requests: 0.into(),
|
||||||
no_servers: 0.into(),
|
no_servers: 0.into(),
|
||||||
error_response: false.into(),
|
error_response: false.into(),
|
||||||
|
@ -14,7 +14,7 @@ use redis_rate_limiter::redis::RedisError;
|
|||||||
use reqwest::header::ToStrError;
|
use reqwest::header::ToStrError;
|
||||||
use sea_orm::DbErr;
|
use sea_orm::DbErr;
|
||||||
use std::{error::Error, net::IpAddr};
|
use std::{error::Error, net::IpAddr};
|
||||||
use tokio::time::Instant;
|
use tokio::{task::JoinError, time::Instant};
|
||||||
use tracing::{instrument, trace, warn};
|
use tracing::{instrument, trace, warn};
|
||||||
|
|
||||||
// TODO: take "IntoResponse" instead of Response?
|
// TODO: take "IntoResponse" instead of Response?
|
||||||
@ -30,6 +30,7 @@ pub enum FrontendErrorResponse {
|
|||||||
HeaderToString(ToStrError),
|
HeaderToString(ToStrError),
|
||||||
InvalidHeaderValue(InvalidHeaderValue),
|
InvalidHeaderValue(InvalidHeaderValue),
|
||||||
IpAddrParse(AddrParseError),
|
IpAddrParse(AddrParseError),
|
||||||
|
JoinError(JoinError),
|
||||||
NotFound,
|
NotFound,
|
||||||
RateLimitedUser(UserKeyData, Option<Instant>),
|
RateLimitedUser(UserKeyData, Option<Instant>),
|
||||||
RateLimitedIp(IpAddr, Option<Instant>),
|
RateLimitedIp(IpAddr, Option<Instant>),
|
||||||
@ -114,6 +115,17 @@ impl IntoResponse for FrontendErrorResponse {
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
Self::JoinError(err) => {
|
||||||
|
warn!(?err, "JoinError. likely shutting down");
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
JsonRpcForwardedResponse::from_str(
|
||||||
|
"Unable to complete request",
|
||||||
|
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
Self::NotFound => {
|
Self::NotFound => {
|
||||||
// TODO: emit a stat?
|
// TODO: emit a stat?
|
||||||
// TODO: instead of an error, show a normal html page for 404
|
// TODO: instead of an error, show a normal html page for 404
|
||||||
|
@ -87,7 +87,8 @@ pub async fn proxy_web3_rpc_with_key(
|
|||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
||||||
let response = f.await.expect("JoinHandle should always work")?;
|
// if this is an error, we are likely shutting down
|
||||||
|
let response = f.await??;
|
||||||
|
|
||||||
Ok(Json(&response).into_response())
|
Ok(Json(&response).into_response())
|
||||||
}
|
}
|
||||||
|
@ -139,25 +139,30 @@ impl Web3Connections {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Convenience method to get the cannonical block at a given block height.
|
/// Convenience method to get the cannonical block at a given block height.
|
||||||
pub async fn block_hash(&self, num: &U64) -> anyhow::Result<H256> {
|
pub async fn block_hash(&self, num: &U64) -> anyhow::Result<(H256, bool)> {
|
||||||
let block = self.cannonical_block(num).await?;
|
let (block, is_archive_block) = self.cannonical_block(num).await?;
|
||||||
|
|
||||||
let hash = block.hash.unwrap();
|
let hash = block.hash.expect("Saved blocks should always have hashes");
|
||||||
|
|
||||||
Ok(hash)
|
Ok((hash, is_archive_block))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the heaviest chain's block from cache or backend rpc
|
/// Get the heaviest chain's block from cache or backend rpc
|
||||||
pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<ArcBlock> {
|
pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<(ArcBlock, bool)> {
|
||||||
// we only have blocks by hash now
|
// we only have blocks by hash now
|
||||||
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
|
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
|
||||||
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
|
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
|
||||||
|
|
||||||
// be sure the requested block num exists
|
// be sure the requested block num exists
|
||||||
let head_block_num = self.head_block_num().context("no servers in sync")?;
|
let head_block_num = self.head_block_num().context("no servers in sync")?;
|
||||||
|
|
||||||
|
// TODO: not 64 on all chains? get from config?
|
||||||
|
let archive_needed = num < &(head_block_num - U64::from(64));
|
||||||
|
|
||||||
if num > &head_block_num {
|
if num > &head_block_num {
|
||||||
// TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
|
// TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
|
||||||
// TODO: instead of error, maybe just sleep and try again?
|
// TODO: instead of error, maybe just sleep and try again?
|
||||||
|
// TODO: this should be a 401, not a 500
|
||||||
return Err(anyhow::anyhow!(
|
return Err(anyhow::anyhow!(
|
||||||
"Head block is #{}, but #{} was requested",
|
"Head block is #{}, but #{} was requested",
|
||||||
head_block_num,
|
head_block_num,
|
||||||
@ -170,7 +175,9 @@ impl Web3Connections {
|
|||||||
if let Some(block_hash) = self.block_numbers.get(num) {
|
if let Some(block_hash) = self.block_numbers.get(num) {
|
||||||
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
|
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
|
||||||
// TODO: pass authorized_request through here?
|
// TODO: pass authorized_request through here?
|
||||||
return self.block(None, &block_hash, None).await;
|
let block = self.block(None, &block_hash, None).await?;
|
||||||
|
|
||||||
|
return Ok((block, archive_needed));
|
||||||
}
|
}
|
||||||
|
|
||||||
// block number not in cache. we need to ask an rpc for it
|
// block number not in cache. we need to ask an rpc for it
|
||||||
@ -193,7 +200,7 @@ impl Web3Connections {
|
|||||||
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
|
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
|
||||||
self.save_block(&block, true).await?;
|
self.save_block(&block, true).await?;
|
||||||
|
|
||||||
Ok(block)
|
Ok((block, true))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn process_incoming_blocks(
|
pub(super) async fn process_incoming_blocks(
|
||||||
|
@ -384,7 +384,7 @@ impl Web3Connection {
|
|||||||
|
|
||||||
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
|
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
|
||||||
new_head_block = block_map
|
new_head_block = block_map
|
||||||
.get_with_by_ref(&new_hash, async move { new_head_block })
|
.get_with(new_hash, async move { new_head_block })
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let new_num = new_head_block.number.unwrap_or_default();
|
let new_num = new_head_block.number.unwrap_or_default();
|
||||||
|
@ -204,15 +204,15 @@ pub async fn get_aggregate_rpc_stats_from_params(
|
|||||||
rpc_accounting::Column::FrontendRequests.sum(),
|
rpc_accounting::Column::FrontendRequests.sum(),
|
||||||
"total_requests",
|
"total_requests",
|
||||||
)
|
)
|
||||||
|
.column_as(
|
||||||
|
rpc_accounting::Column::BackendRequests.sum(),
|
||||||
|
"total_backend_retries",
|
||||||
|
)
|
||||||
.column_as(
|
.column_as(
|
||||||
rpc_accounting::Column::CacheMisses.sum(),
|
rpc_accounting::Column::CacheMisses.sum(),
|
||||||
"total_cache_misses",
|
"total_cache_misses",
|
||||||
)
|
)
|
||||||
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
|
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
|
||||||
.column_as(
|
|
||||||
rpc_accounting::Column::BackendRetries.sum(),
|
|
||||||
"total_backend_retries",
|
|
||||||
)
|
|
||||||
.column_as(
|
.column_as(
|
||||||
rpc_accounting::Column::SumResponseBytes.sum(),
|
rpc_accounting::Column::SumResponseBytes.sum(),
|
||||||
"total_response_bytes",
|
"total_response_bytes",
|
||||||
@ -355,15 +355,15 @@ pub async fn get_detailed_stats(
|
|||||||
rpc_accounting::Column::FrontendRequests.sum(),
|
rpc_accounting::Column::FrontendRequests.sum(),
|
||||||
"total_requests",
|
"total_requests",
|
||||||
)
|
)
|
||||||
|
.column_as(
|
||||||
|
rpc_accounting::Column::BackendRequests.sum(),
|
||||||
|
"total_backend_requests",
|
||||||
|
)
|
||||||
.column_as(
|
.column_as(
|
||||||
rpc_accounting::Column::CacheMisses.sum(),
|
rpc_accounting::Column::CacheMisses.sum(),
|
||||||
"total_cache_misses",
|
"total_cache_misses",
|
||||||
)
|
)
|
||||||
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
|
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
|
||||||
.column_as(
|
|
||||||
rpc_accounting::Column::BackendRetries.sum(),
|
|
||||||
"total_backend_retries",
|
|
||||||
)
|
|
||||||
.column_as(
|
.column_as(
|
||||||
rpc_accounting::Column::SumResponseBytes.sum(),
|
rpc_accounting::Column::SumResponseBytes.sum(),
|
||||||
"total_response_bytes",
|
"total_response_bytes",
|
||||||
|
Loading…
Reference in New Issue
Block a user