this might be too many columns
This commit is contained in:
parent
da2bd0b0a5
commit
cfd26940a9
5
Cargo.lock
generated
5
Cargo.lock
generated
@ -2208,9 +2208,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hdrhistogram"
|
name = "hdrhistogram"
|
||||||
version = "7.5.1"
|
version = "7.5.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6ea9fe3952d32674a14e0975009a3547af9ea364995b5ec1add2e23c2ae523ab"
|
checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.13.0",
|
"base64 0.13.0",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
@ -5513,6 +5513,7 @@ dependencies = [
|
|||||||
"futures",
|
"futures",
|
||||||
"handlebars",
|
"handlebars",
|
||||||
"hashbrown",
|
"hashbrown",
|
||||||
|
"hdrhistogram",
|
||||||
"http",
|
"http",
|
||||||
"ipnet",
|
"ipnet",
|
||||||
"metered",
|
"metered",
|
||||||
|
@ -3,20 +3,43 @@
|
|||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
|
||||||
#[sea_orm(table_name = "rpc_accounting")]
|
#[sea_orm(table_name = "rpc_accounting")]
|
||||||
|
/// TODO: make migrations match these items
|
||||||
pub struct Model {
|
pub struct Model {
|
||||||
#[sea_orm(primary_key)]
|
#[sea_orm(primary_key)]
|
||||||
pub id: u64,
|
pub id: u64,
|
||||||
pub user_key_id: u64,
|
pub user_key_id: u64,
|
||||||
pub chain_id: u64,
|
pub chain_id: u64,
|
||||||
pub timestamp: DateTimeUtc,
|
|
||||||
pub method: String,
|
pub method: String,
|
||||||
pub backend_requests: u32,
|
|
||||||
pub error_response: bool,
|
pub error_response: bool,
|
||||||
pub query_millis: u32,
|
pub period_datetime: DateTimeUtc,
|
||||||
pub request_bytes: u32,
|
pub frontend_requests: u32,
|
||||||
pub response_bytes: u32,
|
pub backend_requests: u32,
|
||||||
|
pub backend_retries: u32,
|
||||||
|
pub cache_misses: u32,
|
||||||
|
pub cache_hits: u32,
|
||||||
|
pub sum_request_bytes: u64,
|
||||||
|
pub min_request_bytes: u64,
|
||||||
|
pub mean_request_bytes: f64,
|
||||||
|
pub p50_request_bytes: u64,
|
||||||
|
pub p90_request_bytes: u64,
|
||||||
|
pub p99_request_bytes: u64,
|
||||||
|
pub max_request_bytes: u64,
|
||||||
|
pub sum_response_millis: u64,
|
||||||
|
pub min_response_millis: u64,
|
||||||
|
pub mean_response_millis: f64,
|
||||||
|
pub p50_response_millis: u64,
|
||||||
|
pub p90_response_millis: u64,
|
||||||
|
pub p99_response_millis: u64,
|
||||||
|
pub max_response_millis: u64,
|
||||||
|
pub sum_response_bytes: u64,
|
||||||
|
pub min_response_bytes: u64,
|
||||||
|
pub mean_response_bytes: f64,
|
||||||
|
pub p50_response_bytes: u64,
|
||||||
|
pub p90_response_bytes: u64,
|
||||||
|
pub p99_response_bytes: u64,
|
||||||
|
pub max_response_bytes: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
@ -28,53 +28,154 @@ impl MigrationTrait for Migration {
|
|||||||
.big_unsigned()
|
.big_unsigned()
|
||||||
.not_null(),
|
.not_null(),
|
||||||
)
|
)
|
||||||
.col(
|
|
||||||
ColumnDef::new(RpcAccounting::Timestamp)
|
|
||||||
.timestamp()
|
|
||||||
.not_null(),
|
|
||||||
)
|
|
||||||
.col(ColumnDef::new(RpcAccounting::Method).string().not_null())
|
.col(ColumnDef::new(RpcAccounting::Method).string().not_null())
|
||||||
.col(
|
|
||||||
ColumnDef::new(RpcAccounting::FrontendRequests)
|
|
||||||
.unsigned()
|
|
||||||
.not_null(),
|
|
||||||
)
|
|
||||||
.col(
|
|
||||||
// 0 means cache hit
|
|
||||||
// 1 is hopefully what most require
|
|
||||||
// but there might be more if retries were necessary
|
|
||||||
ColumnDef::new(RpcAccounting::BackendRequests)
|
|
||||||
.unsigned()
|
|
||||||
.not_null(),
|
|
||||||
)
|
|
||||||
.col(
|
.col(
|
||||||
ColumnDef::new(RpcAccounting::ErrorResponse)
|
ColumnDef::new(RpcAccounting::ErrorResponse)
|
||||||
.boolean()
|
.boolean()
|
||||||
.not_null(),
|
.not_null(),
|
||||||
)
|
)
|
||||||
.col(
|
.col(
|
||||||
ColumnDef::new(RpcAccounting::QueryMillis)
|
ColumnDef::new(RpcAccounting::PeriodDatetime)
|
||||||
.unsigned()
|
.timestamp()
|
||||||
.not_null(),
|
.not_null(),
|
||||||
)
|
)
|
||||||
.col(
|
.col(
|
||||||
ColumnDef::new(RpcAccounting::RequestBytes)
|
ColumnDef::new(RpcAccounting::FrontendRequests)
|
||||||
.unsigned()
|
.big_unsigned()
|
||||||
.not_null(),
|
.not_null(),
|
||||||
)
|
)
|
||||||
.col(
|
.col(
|
||||||
ColumnDef::new(RpcAccounting::ResponseBytes)
|
ColumnDef::new(RpcAccounting::BackendRequests)
|
||||||
.unsigned()
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::BackendRetries)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::CacheMisses)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::CacheHits)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::SumRequestBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MinRequestBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MeanRequestBytes)
|
||||||
|
.float_len(64)
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P50RequestBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P90RequestBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P99RequestBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MaxRequestBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::SumResponseMillis)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MinResponseMillis)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MeanResponseMillis)
|
||||||
|
.float_len(64)
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P50ResponseMillis)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P90ResponseMillis)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P99ResponseMillis)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MaxResponseMillis)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::SumResponseBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MinResponseBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MeanResponseBytes)
|
||||||
|
.float_len(64)
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P50ResponseBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P90ResponseBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::P99ResponseBytes)
|
||||||
|
.big_unsigned()
|
||||||
|
.not_null(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RpcAccounting::MaxResponseBytes)
|
||||||
|
.big_unsigned()
|
||||||
.not_null(),
|
.not_null(),
|
||||||
)
|
)
|
||||||
.index(sea_query::Index::create().col(RpcAccounting::Timestamp))
|
|
||||||
.index(sea_query::Index::create().col(RpcAccounting::Method))
|
|
||||||
.index(sea_query::Index::create().col(RpcAccounting::BackendRequests))
|
|
||||||
.foreign_key(
|
.foreign_key(
|
||||||
sea_query::ForeignKey::create()
|
sea_query::ForeignKey::create()
|
||||||
.from(RpcAccounting::Table, RpcAccounting::UserKeyId)
|
.from(RpcAccounting::Table, RpcAccounting::UserKeyId)
|
||||||
.to(UserKeys::Table, UserKeys::Id),
|
.to(UserKeys::Table, UserKeys::Id),
|
||||||
)
|
)
|
||||||
|
.index(sea_query::Index::create().col(RpcAccounting::PeriodDatetime))
|
||||||
|
.index(sea_query::Index::create().col(RpcAccounting::Method))
|
||||||
.to_owned(),
|
.to_owned(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@ -98,14 +199,35 @@ pub enum UserKeys {
|
|||||||
enum RpcAccounting {
|
enum RpcAccounting {
|
||||||
Table,
|
Table,
|
||||||
Id,
|
Id,
|
||||||
Timestamp,
|
|
||||||
UserKeyId,
|
UserKeyId,
|
||||||
ChainId,
|
ChainId,
|
||||||
Method,
|
Method,
|
||||||
|
ErrorResponse,
|
||||||
|
PeriodDatetime,
|
||||||
FrontendRequests,
|
FrontendRequests,
|
||||||
BackendRequests,
|
BackendRequests,
|
||||||
ErrorResponse,
|
BackendRetries,
|
||||||
QueryMillis,
|
CacheMisses,
|
||||||
RequestBytes,
|
CacheHits,
|
||||||
ResponseBytes,
|
SumRequestBytes,
|
||||||
|
MinRequestBytes,
|
||||||
|
MeanRequestBytes,
|
||||||
|
P50RequestBytes,
|
||||||
|
P90RequestBytes,
|
||||||
|
P99RequestBytes,
|
||||||
|
MaxRequestBytes,
|
||||||
|
SumResponseMillis,
|
||||||
|
MinResponseMillis,
|
||||||
|
MeanResponseMillis,
|
||||||
|
P50ResponseMillis,
|
||||||
|
P90ResponseMillis,
|
||||||
|
P99ResponseMillis,
|
||||||
|
MaxResponseMillis,
|
||||||
|
SumResponseBytes,
|
||||||
|
MinResponseBytes,
|
||||||
|
MeanResponseBytes,
|
||||||
|
P50ResponseBytes,
|
||||||
|
P90ResponseBytes,
|
||||||
|
P99ResponseBytes,
|
||||||
|
MaxResponseBytes,
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,7 @@ fdlimit = "0.2.1"
|
|||||||
flume = "0.10.14"
|
flume = "0.10.14"
|
||||||
futures = { version = "0.3.24", features = ["thread-pool"] }
|
futures = { version = "0.3.24", features = ["thread-pool"] }
|
||||||
hashbrown = { version = "0.12.3", features = ["serde"] }
|
hashbrown = { version = "0.12.3", features = ["serde"] }
|
||||||
|
hdrhistogram = "7.5.2"
|
||||||
http = "0.2.8"
|
http = "0.2.8"
|
||||||
ipnet = "2.5.0"
|
ipnet = "2.5.0"
|
||||||
metered = { version = "0.9.0", features = ["serialize"] }
|
metered = { version = "0.9.0", features = ["serialize"] }
|
||||||
|
@ -3,7 +3,7 @@ use crate::app::{UserKeyData, Web3ProxyApp};
|
|||||||
use crate::jsonrpc::JsonRpcRequest;
|
use crate::jsonrpc::JsonRpcRequest;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent};
|
use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent};
|
||||||
use chrono::{Utc};
|
use chrono::Utc;
|
||||||
use deferred_rate_limiter::DeferredRateLimitResult;
|
use deferred_rate_limiter::DeferredRateLimitResult;
|
||||||
use entities::user_keys;
|
use entities::user_keys;
|
||||||
use ipnet::IpNet;
|
use ipnet::IpNet;
|
||||||
@ -13,7 +13,7 @@ use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, Qu
|
|||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::mem::size_of_val;
|
use std::mem::size_of_val;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicUsize};
|
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64};
|
||||||
use std::{net::IpAddr, str::FromStr, sync::Arc};
|
use std::{net::IpAddr, str::FromStr, sync::Arc};
|
||||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
@ -55,11 +55,11 @@ pub struct AuthorizedKey {
|
|||||||
#[derive(Debug, Default, Serialize)]
|
#[derive(Debug, Default, Serialize)]
|
||||||
pub struct RequestMetadata {
|
pub struct RequestMetadata {
|
||||||
pub timestamp: u64,
|
pub timestamp: u64,
|
||||||
pub request_bytes: AtomicUsize,
|
pub request_bytes: AtomicU64,
|
||||||
pub backend_requests: AtomicU16,
|
pub backend_requests: AtomicU16,
|
||||||
pub error_response: AtomicBool,
|
pub error_response: AtomicBool,
|
||||||
pub response_bytes: AtomicUsize,
|
pub response_bytes: AtomicU64,
|
||||||
pub response_millis: AtomicU32,
|
pub response_millis: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
@ -74,7 +74,7 @@ pub enum AuthorizedRequest {
|
|||||||
|
|
||||||
impl RequestMetadata {
|
impl RequestMetadata {
|
||||||
pub fn new(request: &JsonRpcRequest) -> Self {
|
pub fn new(request: &JsonRpcRequest) -> Self {
|
||||||
let request_bytes = size_of_val(request);
|
let request_bytes = size_of_val(request) as u64;
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
request_bytes: request_bytes.into(),
|
request_bytes: request_bytes.into(),
|
||||||
|
@ -3,14 +3,13 @@ use anyhow::Context;
|
|||||||
use chrono::{TimeZone, Utc};
|
use chrono::{TimeZone, Utc};
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use entities::rpc_accounting;
|
use entities::rpc_accounting;
|
||||||
|
use hdrhistogram::Histogram;
|
||||||
use moka::future::{Cache, CacheBuilder};
|
use moka::future::{Cache, CacheBuilder};
|
||||||
use sea_orm::{ActiveModelTrait, DatabaseConnection};
|
use sea_orm::{ActiveModelTrait, DatabaseConnection};
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{
|
use std::{sync::atomic::AtomicU32, time::Duration};
|
||||||
sync::atomic::{AtomicU32, AtomicU64},
|
use tokio::sync::Mutex as AsyncMutex;
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tracing::{error, info, trace};
|
use tracing::{error, info, trace};
|
||||||
|
|
||||||
@ -23,31 +22,45 @@ pub struct ProxyResponseStat {
|
|||||||
metadata: RequestMetadata,
|
metadata: RequestMetadata,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type TimeBucketTimestamp = u64;
|
||||||
|
|
||||||
// TODO: impl From for our database model
|
// TODO: impl From for our database model
|
||||||
#[derive(Default)]
|
|
||||||
pub struct ProxyResponseAggregate {
|
pub struct ProxyResponseAggregate {
|
||||||
|
// these are the key
|
||||||
// user_key_id: u64,
|
// user_key_id: u64,
|
||||||
// method: String,
|
// method: String,
|
||||||
// error_response: bool,
|
// error_response: bool,
|
||||||
first_timestamp: u64,
|
// TODO: this is the grandparent key. get it from there somehow
|
||||||
|
period_timestamp: u64,
|
||||||
frontend_requests: AtomicU32,
|
frontend_requests: AtomicU32,
|
||||||
backend_requests: AtomicU32,
|
backend_requests: AtomicU32,
|
||||||
last_timestamp: AtomicU64,
|
backend_retries: AtomicU32,
|
||||||
first_response_millis: u32,
|
cache_misses: AtomicU32,
|
||||||
sum_response_millis: AtomicU32,
|
cache_hits: AtomicU32,
|
||||||
sum_request_bytes: AtomicUsize,
|
request_bytes: AsyncMutex<Histogram<u64>>,
|
||||||
sum_response_bytes: AtomicUsize,
|
sum_request_bytes: AtomicU64,
|
||||||
|
response_bytes: AsyncMutex<Histogram<u64>>,
|
||||||
|
sum_response_bytes: AtomicU64,
|
||||||
|
response_millis: AsyncMutex<Histogram<u64>>,
|
||||||
|
sum_response_millis: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct UserProxyResponseKey {
|
||||||
|
user_key_id: u64,
|
||||||
|
method: String,
|
||||||
|
error_response: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// key is the (user_key_id, method, error_response)
|
/// key is the (user_key_id, method, error_response)
|
||||||
pub type UserProxyResponseCache = Cache<
|
pub type UserProxyResponseCache = Cache<
|
||||||
(u64, String, bool),
|
UserProxyResponseKey,
|
||||||
Arc<ProxyResponseAggregate>,
|
Arc<ProxyResponseAggregate>,
|
||||||
hashbrown::hash_map::DefaultHashBuilder,
|
hashbrown::hash_map::DefaultHashBuilder,
|
||||||
>;
|
>;
|
||||||
/// key is the "time bucket" (timestamp / period)
|
/// key is the "time bucket's timestamp" (timestamp / period * period)
|
||||||
pub type TimeProxyResponseCache =
|
pub type TimeProxyResponseCache =
|
||||||
Cache<u64, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
|
Cache<TimeBucketTimestamp, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
|
||||||
|
|
||||||
pub struct StatEmitter {
|
pub struct StatEmitter {
|
||||||
chain_id: u64,
|
chain_id: u64,
|
||||||
@ -79,9 +92,14 @@ impl StatEmitter {
|
|||||||
pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
|
pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
|
||||||
let (save_tx, save_rx) = flume::unbounded();
|
let (save_tx, save_rx) = flume::unbounded();
|
||||||
|
|
||||||
|
// this needs to be long enough that there are definitely no outstanding queries
|
||||||
|
// TODO: what should the "safe" multiplier be? what if something is late?
|
||||||
|
let ttl_seconds = period_seconds * 3;
|
||||||
|
|
||||||
let aggregated_proxy_responses = CacheBuilder::default()
|
let aggregated_proxy_responses = CacheBuilder::default()
|
||||||
.time_to_live(Duration::from_secs(period_seconds * 3 / 2))
|
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||||
.eviction_listener_with_queued_delivery_mode(move |k, v, r| {
|
.eviction_listener_with_queued_delivery_mode(move |_, v, _| {
|
||||||
|
// this function must not panic!
|
||||||
if let Err(err) = save_tx.send(v) {
|
if let Err(err) = save_tx.send(v) {
|
||||||
error!(?err, "unable to save. sender closed!");
|
error!(?err, "unable to save. sender closed!");
|
||||||
}
|
}
|
||||||
@ -108,108 +126,159 @@ impl StatEmitter {
|
|||||||
)> {
|
)> {
|
||||||
let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();
|
let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();
|
||||||
|
|
||||||
// simple future that reads the channel and emits stats
|
|
||||||
let aggregate_f = {
|
|
||||||
let aggregated_proxy_responses = self.aggregated_proxy_responses.clone();
|
|
||||||
let clone = self.clone();
|
|
||||||
async move {
|
|
||||||
// TODO: select on shutdown handle so we can be sure to save every aggregate!
|
|
||||||
while let Ok(x) = aggregate_rx.recv_async().await {
|
|
||||||
trace!(?x, "aggregating stat");
|
|
||||||
|
|
||||||
// TODO: increment global stats (in redis? in local cache for prometheus?)
|
|
||||||
|
|
||||||
// TODO: batch stats? spawn this?
|
|
||||||
// TODO: where can we wait on this handle?
|
|
||||||
let clone = clone.clone();
|
|
||||||
tokio::spawn(async move { clone.aggregate_stat(x).await });
|
|
||||||
|
|
||||||
// no need to save manually. they save on expire
|
|
||||||
}
|
|
||||||
|
|
||||||
// shutting down. force a save
|
|
||||||
// we do not use invalidate_all because that is done on a background thread
|
|
||||||
for (key, _) in aggregated_proxy_responses.into_iter() {
|
|
||||||
aggregated_proxy_responses.invalidate(&key).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("stat aggregator exited");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let save_f = {
|
|
||||||
let db_conn = self.db_conn.clone();
|
|
||||||
let save_rx = self.save_rx.clone();
|
|
||||||
let chain_id = self.chain_id;
|
|
||||||
async move {
|
|
||||||
while let Ok(x) = save_rx.recv_async().await {
|
|
||||||
// TODO: batch these
|
|
||||||
for (k, v) in x.into_iter() {
|
|
||||||
// TODO: try_unwrap()?
|
|
||||||
let (user_key_id, method, error_response) = k.as_ref();
|
|
||||||
|
|
||||||
info!(?user_key_id, ?method, ?error_response, "saving");
|
|
||||||
|
|
||||||
let first_timestamp = Utc.timestamp(v.first_timestamp as i64, 0);
|
|
||||||
let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
|
|
||||||
let backend_requests = v.backend_requests.load(Ordering::Acquire);
|
|
||||||
let first_response_millis = v.first_response_millis;
|
|
||||||
let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
|
|
||||||
let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
|
|
||||||
let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
|
|
||||||
|
|
||||||
let stat = rpc_accounting::ActiveModel {
|
|
||||||
user_key_id: sea_orm::Set(*user_key_id),
|
|
||||||
chain_id: sea_orm::Set(chain_id),
|
|
||||||
method: sea_orm::Set(method.clone()),
|
|
||||||
error_response: sea_orm::Set(*error_response),
|
|
||||||
first_timestamp: sea_orm::Set(first_timestamp),
|
|
||||||
frontend_requests: sea_orm::Set(frontend_requests)
|
|
||||||
backend_requests: sea_orm::Set(backend_requests),
|
|
||||||
first_query_millis: sea_orm::Set(first_query_millis),
|
|
||||||
sum_request_bytes: sea_orm::Set(sum_request_bytes),
|
|
||||||
sum_response_millis: sea_orm::Set(sum_response_millis),
|
|
||||||
sum_response_bytes: sea_orm::Set(sum_response_bytes),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: if this fails, rever adding the user, too
|
|
||||||
if let Err(err) = stat
|
|
||||||
.save(&db_conn)
|
|
||||||
.await
|
|
||||||
.context("Saving rpc_accounting stat")
|
|
||||||
{
|
|
||||||
error!(?err, "unable to save aggregated stats");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("stat saver exited");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: join and flatten these handles
|
// TODO: join and flatten these handles
|
||||||
let aggregate_handle = tokio::spawn(aggregate_f);
|
let aggregate_handle = tokio::spawn(self.clone().aggregate_stats_loop(aggregate_rx));
|
||||||
let save_handle = tokio::spawn(save_f);
|
let save_handle = { tokio::spawn(self.save_stats_loop()) };
|
||||||
|
|
||||||
Ok((aggregate_tx, aggregate_handle, save_handle))
|
Ok((aggregate_tx, aggregate_handle, save_handle))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// simple future that reads the channel and aggregates stats in a local cache.
|
||||||
|
async fn aggregate_stats_loop(
|
||||||
|
self: Arc<Self>,
|
||||||
|
aggregate_rx: flume::Receiver<Web3ProxyStat>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// TODO: select on shutdown handle so we can be sure to save every aggregate!
|
||||||
|
while let Ok(x) = aggregate_rx.recv_async().await {
|
||||||
|
trace!(?x, "aggregating stat");
|
||||||
|
|
||||||
|
// TODO: increment global stats (in redis? in local cache for prometheus?)
|
||||||
|
|
||||||
|
// TODO: batch stats? spawn this?
|
||||||
|
// TODO: where can we wait on this handle?
|
||||||
|
let clone = self.clone();
|
||||||
|
tokio::spawn(async move { clone.aggregate_stat(x).await });
|
||||||
|
|
||||||
|
// no need to save manually. they save on expire
|
||||||
|
}
|
||||||
|
|
||||||
|
// shutting down. force a save
|
||||||
|
// we do not use invalidate_all because that is done on a background thread
|
||||||
|
for (key, _) in self.aggregated_proxy_responses.into_iter() {
|
||||||
|
self.aggregated_proxy_responses.invalidate(&key).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("stat aggregator exited");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_stats_loop(self: Arc<Self>) -> anyhow::Result<()> {
|
||||||
|
while let Ok(x) = self.save_rx.recv_async().await {
|
||||||
|
// TODO: batch these
|
||||||
|
for (k, v) in x.into_iter() {
|
||||||
|
info!(?k, "saving");
|
||||||
|
|
||||||
|
let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0);
|
||||||
|
let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
|
||||||
|
let backend_requests = v.backend_requests.load(Ordering::Acquire);
|
||||||
|
let backend_retries = v.backend_retries.load(Ordering::Acquire);
|
||||||
|
let cache_misses = v.cache_misses.load(Ordering::Acquire);
|
||||||
|
let cache_hits = v.cache_hits.load(Ordering::Acquire);
|
||||||
|
|
||||||
|
let request_bytes = v.request_bytes.lock().await;
|
||||||
|
|
||||||
|
let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
|
||||||
|
let min_request_bytes = request_bytes.min();
|
||||||
|
let mean_request_bytes = request_bytes.mean();
|
||||||
|
let p50_request_bytes = request_bytes.value_at_quantile(0.50);
|
||||||
|
let p90_request_bytes = request_bytes.value_at_quantile(0.90);
|
||||||
|
let p99_request_bytes = request_bytes.value_at_quantile(0.99);
|
||||||
|
let max_request_bytes = request_bytes.max();
|
||||||
|
|
||||||
|
drop(request_bytes);
|
||||||
|
|
||||||
|
let response_millis = v.response_millis.lock().await;
|
||||||
|
|
||||||
|
let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
|
||||||
|
let min_response_millis = response_millis.min();
|
||||||
|
let mean_response_millis = response_millis.mean();
|
||||||
|
let p50_response_millis = response_millis.value_at_quantile(0.50);
|
||||||
|
let p90_response_millis = response_millis.value_at_quantile(0.90);
|
||||||
|
let p99_response_millis = response_millis.value_at_quantile(0.99);
|
||||||
|
let max_response_millis = response_millis.max();
|
||||||
|
|
||||||
|
drop(response_millis);
|
||||||
|
|
||||||
|
let response_bytes = v.response_bytes.lock().await;
|
||||||
|
|
||||||
|
let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
|
||||||
|
let min_response_bytes = response_bytes.min();
|
||||||
|
let mean_response_bytes = response_bytes.mean();
|
||||||
|
let p50_response_bytes = response_bytes.value_at_quantile(0.50);
|
||||||
|
let p90_response_bytes = response_bytes.value_at_quantile(0.90);
|
||||||
|
let p99_response_bytes = response_bytes.value_at_quantile(0.99);
|
||||||
|
let max_response_bytes = response_bytes.max();
|
||||||
|
|
||||||
|
drop(response_bytes);
|
||||||
|
|
||||||
|
let stat = rpc_accounting::ActiveModel {
|
||||||
|
user_key_id: sea_orm::Set(k.user_key_id),
|
||||||
|
chain_id: sea_orm::Set(self.chain_id),
|
||||||
|
method: sea_orm::Set(k.method.clone()),
|
||||||
|
error_response: sea_orm::Set(k.error_response),
|
||||||
|
period_datetime: sea_orm::Set(period_datetime),
|
||||||
|
frontend_requests: sea_orm::Set(frontend_requests),
|
||||||
|
backend_requests: sea_orm::Set(backend_requests),
|
||||||
|
backend_retries: sea_orm::Set(backend_retries),
|
||||||
|
cache_misses: sea_orm::Set(cache_misses),
|
||||||
|
cache_hits: sea_orm::Set(cache_hits),
|
||||||
|
|
||||||
|
sum_request_bytes: sea_orm::Set(sum_request_bytes),
|
||||||
|
min_request_bytes: sea_orm::Set(min_request_bytes),
|
||||||
|
mean_request_bytes: sea_orm::Set(mean_request_bytes),
|
||||||
|
p50_request_bytes: sea_orm::Set(p50_request_bytes),
|
||||||
|
p90_request_bytes: sea_orm::Set(p90_request_bytes),
|
||||||
|
p99_request_bytes: sea_orm::Set(p99_request_bytes),
|
||||||
|
max_request_bytes: sea_orm::Set(max_request_bytes),
|
||||||
|
|
||||||
|
sum_response_millis: sea_orm::Set(sum_response_millis),
|
||||||
|
min_response_millis: sea_orm::Set(min_response_millis),
|
||||||
|
mean_response_millis: sea_orm::Set(mean_response_millis),
|
||||||
|
p50_response_millis: sea_orm::Set(p50_response_millis),
|
||||||
|
p90_response_millis: sea_orm::Set(p90_response_millis),
|
||||||
|
p99_response_millis: sea_orm::Set(p99_response_millis),
|
||||||
|
max_response_millis: sea_orm::Set(max_response_millis),
|
||||||
|
|
||||||
|
sum_response_bytes: sea_orm::Set(sum_response_bytes),
|
||||||
|
min_response_bytes: sea_orm::Set(min_response_bytes),
|
||||||
|
mean_response_bytes: sea_orm::Set(mean_response_bytes),
|
||||||
|
p50_response_bytes: sea_orm::Set(p50_response_bytes),
|
||||||
|
p90_response_bytes: sea_orm::Set(p90_response_bytes),
|
||||||
|
p99_response_bytes: sea_orm::Set(p99_response_bytes),
|
||||||
|
max_response_bytes: sea_orm::Set(max_response_bytes),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: if this fails, rever adding the user, too
|
||||||
|
if let Err(err) = stat
|
||||||
|
.save(&self.db_conn)
|
||||||
|
.await
|
||||||
|
.context("Saving rpc_accounting stat")
|
||||||
|
{
|
||||||
|
error!(?err, "unable to save aggregated stats");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("stat saver exited");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
|
pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
|
||||||
trace!(?stat, "aggregating");
|
trace!(?stat, "aggregating");
|
||||||
match stat {
|
match stat {
|
||||||
Web3ProxyStat::ProxyResponse(x) => {
|
Web3ProxyStat::ProxyResponse(x) => {
|
||||||
// TODO: move this into another function?
|
// TODO: move this whole closure to another function?
|
||||||
|
// TODO: move period calculation into another function?
|
||||||
|
let period_timestamp =
|
||||||
|
x.metadata.timestamp / self.period_seconds * self.period_seconds;
|
||||||
|
|
||||||
// get the user cache for the current time bucket
|
// get the user cache for the current period
|
||||||
let time_bucket = x.metadata.timestamp / self.period_seconds;
|
|
||||||
let user_cache = self
|
let user_cache = self
|
||||||
.aggregated_proxy_responses
|
.aggregated_proxy_responses
|
||||||
.get_with(time_bucket, async move {
|
.get_with(period_timestamp, async move {
|
||||||
CacheBuilder::default()
|
CacheBuilder::default()
|
||||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new())
|
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new())
|
||||||
})
|
})
|
||||||
@ -217,20 +286,32 @@ impl StatEmitter {
|
|||||||
|
|
||||||
let error_response = x.metadata.error_response.load(Ordering::Acquire);
|
let error_response = x.metadata.error_response.load(Ordering::Acquire);
|
||||||
|
|
||||||
let key = (x.user_key_id, x.method, error_response);
|
let key = (x.user_key_id, x.method, error_response).into();
|
||||||
|
|
||||||
let timestamp = x.metadata.timestamp;
|
|
||||||
let response_millis = x.metadata.response_millis.load(Ordering::Acquire);
|
|
||||||
|
|
||||||
let user_aggregate = user_cache
|
let user_aggregate = user_cache
|
||||||
.get_with(key, async move {
|
.get_with(key, async move {
|
||||||
let last_timestamp = timestamp.into();
|
|
||||||
|
|
||||||
let aggregate = ProxyResponseAggregate {
|
let aggregate = ProxyResponseAggregate {
|
||||||
first_timestamp: timestamp,
|
period_timestamp,
|
||||||
first_response_millis: response_millis,
|
// start most things at 0 because we add outside this getter
|
||||||
last_timestamp,
|
frontend_requests: 0.into(),
|
||||||
..Default::default()
|
backend_requests: 0.into(),
|
||||||
|
backend_retries: 0.into(),
|
||||||
|
cache_misses: 0.into(),
|
||||||
|
cache_hits: 0.into(),
|
||||||
|
// TODO: how many significant figures?
|
||||||
|
request_bytes: AsyncMutex::new(
|
||||||
|
Histogram::new(5).expect("creating request_bytes histogram"),
|
||||||
|
),
|
||||||
|
sum_request_bytes: 0.into(),
|
||||||
|
response_bytes: AsyncMutex::new(
|
||||||
|
Histogram::new(5).expect("creating response_bytes histogram"),
|
||||||
|
),
|
||||||
|
sum_response_bytes: 0.into(),
|
||||||
|
// TODO: new_with_max here?
|
||||||
|
response_millis: AsyncMutex::new(
|
||||||
|
Histogram::new(5).expect("creating response_millis histogram"),
|
||||||
|
),
|
||||||
|
sum_response_millis: 0.into(),
|
||||||
};
|
};
|
||||||
|
|
||||||
Arc::new(aggregate)
|
Arc::new(aggregate)
|
||||||
@ -246,22 +327,43 @@ impl StatEmitter {
|
|||||||
.fetch_add(1, Ordering::Acquire);
|
.fetch_add(1, Ordering::Acquire);
|
||||||
|
|
||||||
let request_bytes = x.metadata.request_bytes.load(Ordering::Acquire);
|
let request_bytes = x.metadata.request_bytes.load(Ordering::Acquire);
|
||||||
|
|
||||||
|
let mut request_bytes_histogram = user_aggregate.request_bytes.lock().await;
|
||||||
|
|
||||||
|
// TODO: record_correct?
|
||||||
|
request_bytes_histogram.record(request_bytes)?;
|
||||||
|
|
||||||
|
drop(request_bytes_histogram);
|
||||||
|
|
||||||
user_aggregate
|
user_aggregate
|
||||||
.sum_request_bytes
|
.sum_request_bytes
|
||||||
.fetch_add(request_bytes, Ordering::Release);
|
.fetch_add(request_bytes, Ordering::Release);
|
||||||
|
|
||||||
let response_bytes = x.metadata.response_bytes.load(Ordering::Acquire);
|
let response_bytes = x.metadata.response_bytes.load(Ordering::Acquire);
|
||||||
|
|
||||||
|
let mut response_bytes_histogram = user_aggregate.response_bytes.lock().await;
|
||||||
|
|
||||||
|
// TODO: record_correct?
|
||||||
|
response_bytes_histogram.record(response_bytes)?;
|
||||||
|
|
||||||
|
drop(response_bytes_histogram);
|
||||||
|
|
||||||
user_aggregate
|
user_aggregate
|
||||||
.sum_response_bytes
|
.sum_response_bytes
|
||||||
.fetch_add(response_bytes, Ordering::Release);
|
.fetch_add(response_bytes, Ordering::Release);
|
||||||
|
|
||||||
|
let response_millis = x.metadata.response_millis.load(Ordering::Acquire);
|
||||||
|
|
||||||
|
let mut response_millis_histogram = user_aggregate.response_millis.lock().await;
|
||||||
|
|
||||||
|
// TODO: record_correct?
|
||||||
|
response_millis_histogram.record(response_millis)?;
|
||||||
|
|
||||||
|
drop(response_millis_histogram);
|
||||||
|
|
||||||
user_aggregate
|
user_aggregate
|
||||||
.sum_response_millis
|
.sum_response_millis
|
||||||
.fetch_add(response_millis, Ordering::Release);
|
.fetch_add(response_millis, Ordering::Release);
|
||||||
|
|
||||||
user_aggregate
|
|
||||||
.last_timestamp
|
|
||||||
.fetch_max(x.metadata.timestamp, Ordering::Release);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user