Split errors (#158)

* add migration for splitting errors

* fix type from u32 to u64

* update entities to match migrations

* no migration needed. these are only in stats

* add user_error_response to influx

* only if detailed

* set error_response and user_error_response

* 0 cost error responses

* only 33 migrations now

* put macros back

* get the stat buffer sender to the TestApp helper

* fixes
This commit is contained in:
Bryan Stitt 2023-07-05 18:18:10 -07:00 committed by GitHub
parent aee220b7ad
commit eb7b98fdbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 292 additions and 109 deletions

4
Cargo.lock generated

@ -1678,7 +1678,7 @@ dependencies = [
[[package]]
name = "entities"
version = "0.32.0"
version = "0.33.0"
dependencies = [
"ethers",
"sea-orm",
@ -3354,7 +3354,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.32.0"
version = "0.33.0"
dependencies = [
"sea-orm-migration",
"tokio",

@ -133,6 +133,7 @@ cargo install sea-orm-cli
3. `sea-orm-cli generate entity -u mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy -o entities/src --with-serde both`
- Be careful when adding the `--tables THE,MODIFIED,TABLES` flag. It will delete relationships if they aren't listed
4. After running the above, you will need to manually fix some things
- Add any derives that got removed (like `Default`)
- `Vec<u8>` -> `sea_orm::prelude::Uuid` (Related: <https://github.com/SeaQL/sea-query/issues/375>)
- `i8` -> `bool` (Related: <https://github.com/SeaQL/sea-orm/issues/924>)
- add all the tables back into `mod.rs`

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.32.0"
version = "0.33.0"
edition = "2021"
[lib]

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -10,7 +10,9 @@ pub struct Model {
pub id: i32,
pub caller: u64,
pub imitating_user: Option<u64>,
#[sea_orm(column_type = "Text")]
pub endpoint: String,
#[sea_orm(column_type = "Text")]
pub payload: String,
pub timestamp: DateTimeUtc,
}

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use crate::serialization;
use sea_orm::entity::prelude::*;
@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(unique)]
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
pub bearer_token: Uuid,
pub user_id: u64,

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
pub mod prelude;

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use crate::serialization;
use sea_orm::entity::prelude::*;
@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(unique)]
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
pub nonce: Uuid,
#[sea_orm(column_type = "Text")]

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
pub use super::admin::Entity as Admin;
pub use super::admin_increase_balance_receipt::Entity as AdminIncreaseBalanceReceipt;
@ -14,5 +14,6 @@ pub use super::rpc_accounting::Entity as RpcAccounting;
pub use super::rpc_accounting_v2::Entity as RpcAccountingV2;
pub use super::rpc_key::Entity as RpcKey;
pub use super::secondary_user::Entity as SecondaryUser;
pub use super::stripe_increase_balance_receipt::Entity as StripeIncreaseBalanceReceipt;
pub use super::user::Entity as User;
pub use super::user_tier::Entity as UserTier;

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use super::sea_orm_active_enums::Method;
use crate::serialization;
@ -13,6 +13,7 @@ pub struct Model {
pub rpc_key_id: u64,
pub timestamp: DateTimeUtc,
pub method: Method,
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))")]
#[serde(serialize_with = "serialization::vec_as_address")]
pub to: Vec<u8>,
#[sea_orm(column_type = "Text", nullable)]

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -19,6 +19,7 @@ pub struct Model {
pub cache_hits: u64,
pub sum_request_bytes: u64,
pub min_request_bytes: u64,
#[sea_orm(column_type = "Double")]
pub mean_request_bytes: f64,
pub p50_request_bytes: u64,
pub p90_request_bytes: u64,
@ -26,6 +27,7 @@ pub struct Model {
pub max_request_bytes: u64,
pub sum_response_millis: u64,
pub min_response_millis: u64,
#[sea_orm(column_type = "Double")]
pub mean_response_millis: f64,
pub p50_response_millis: u64,
pub p90_response_millis: u64,
@ -33,6 +35,7 @@ pub struct Model {
pub max_response_millis: u64,
pub sum_response_bytes: u64,
pub min_response_bytes: u64,
#[sea_orm(column_type = "Double")]
pub mean_response_bytes: f64,
pub p50_response_bytes: u64,
pub p90_response_bytes: u64,

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use crate::serialization;
use sea_orm::entity::prelude::*;
@ -10,7 +10,7 @@ pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub user_id: u64,
#[sea_orm(unique)]
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
pub secret_key: Uuid,
pub description: Option<String>,
@ -24,6 +24,7 @@ pub struct Model {
pub allowed_referers: Option<String>,
#[sea_orm(column_type = "Text", nullable)]
pub allowed_user_agents: Option<String>,
#[sea_orm(column_type = "Double")]
pub log_revert_chance: f64,
}

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use super::sea_orm_active_enums::Role;
use sea_orm::entity::prelude::*;

@ -1,15 +1,15 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::Serialize;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "stripe_increase_balance_receipt")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub deposit_to_user_id: Option<u64>,
pub stripe_payment_intend_id: String,
pub deposit_to_user_id: Option<u64>,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub amount: Decimal,
pub currency: String,

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use crate::serialization;
use sea_orm::entity::prelude::*;
@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(unique)]
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))", unique)]
#[serde(serialize_with = "serialization::vec_as_address")]
pub address: Vec<u8>,
pub description: Option<String>,
@ -19,14 +19,28 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::login::Entity")]
Login,
#[sea_orm(has_many = "super::rpc_key::Entity")]
RpcKey,
#[sea_orm(has_one = "super::admin::Entity")]
Admin,
#[sea_orm(has_many = "super::admin_increase_balance_receipt::Entity")]
AdminIncreaseBalanceReceipt,
#[sea_orm(has_one = "super::balance::Entity")]
Balance,
#[sea_orm(has_many = "super::increase_on_chain_balance_receipt::Entity")]
IncreaseOnChainBalanceReceipt,
#[sea_orm(has_many = "super::login::Entity")]
Login,
#[sea_orm(has_many = "super::pending_login::Entity")]
PendingLogin,
#[sea_orm(has_one = "super::referee::Entity")]
Referee,
#[sea_orm(has_one = "super::referrer::Entity")]
Referrer,
#[sea_orm(has_many = "super::rpc_key::Entity")]
RpcKey,
#[sea_orm(has_many = "super::secondary_user::Entity")]
SecondaryUser,
#[sea_orm(has_many = "super::stripe_increase_balance_receipt::Entity")]
StripeIncreaseBalanceReceipt,
#[sea_orm(
belongs_to = "super::user_tier::Entity",
from = "Column::UserTierId",
@ -37,15 +51,21 @@ pub enum Relation {
UserTier,
}
impl Related<super::login::Entity> for Entity {
impl Related<super::admin::Entity> for Entity {
fn to() -> RelationDef {
Relation::Login.def()
Relation::Admin.def()
}
}
impl Related<super::rpc_key::Entity> for Entity {
impl Related<super::admin_increase_balance_receipt::Entity> for Entity {
fn to() -> RelationDef {
Relation::RpcKey.def()
Relation::AdminIncreaseBalanceReceipt.def()
}
}
impl Related<super::balance::Entity> for Entity {
fn to() -> RelationDef {
Relation::Balance.def()
}
}
@ -55,12 +75,48 @@ impl Related<super::increase_on_chain_balance_receipt::Entity> for Entity {
}
}
impl Related<super::login::Entity> for Entity {
fn to() -> RelationDef {
Relation::Login.def()
}
}
impl Related<super::pending_login::Entity> for Entity {
fn to() -> RelationDef {
Relation::PendingLogin.def()
}
}
impl Related<super::referee::Entity> for Entity {
fn to() -> RelationDef {
Relation::Referee.def()
}
}
impl Related<super::referrer::Entity> for Entity {
fn to() -> RelationDef {
Relation::Referrer.def()
}
}
impl Related<super::rpc_key::Entity> for Entity {
fn to() -> RelationDef {
Relation::RpcKey.def()
}
}
impl Related<super::secondary_user::Entity> for Entity {
fn to() -> RelationDef {
Relation::SecondaryUser.def()
}
}
impl Related<super::stripe_increase_balance_receipt::Entity> for Entity {
fn to() -> RelationDef {
Relation::StripeIncreaseBalanceReceipt.def()
}
}
impl Related<super::user_tier::Entity> for Entity {
fn to() -> RelationDef {
Relation::UserTier.def()

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.32.0"
version = "0.33.0"
edition = "2021"
publish = false

@ -33,6 +33,7 @@ mod m20230615_221201_handle_payment_uncles;
mod m20230618_230611_longer_payload;
mod m20230619_172237_default_tracking;
mod m20230622_104142_stripe_deposits;
mod m20230705_214013_type_fixes;
pub struct Migrator;
@ -73,6 +74,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230618_230611_longer_payload::Migration),
Box::new(m20230619_172237_default_tracking::Migration),
Box::new(m20230622_104142_stripe_deposits::Migration),
Box::new(m20230705_214013_type_fixes::Migration),
]
}
}

@ -0,0 +1,46 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.alter_table(
Table::alter()
.table(IncreaseOnChainBalanceReceipt::Table)
.modify_column(
ColumnDef::new(IncreaseOnChainBalanceReceipt::LogIndex)
.big_unsigned()
.not_null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(IncreaseOnChainBalanceReceipt::Table)
.modify_column(
ColumnDef::new(IncreaseOnChainBalanceReceipt::LogIndex)
.big_integer()
.unsigned()
.not_null(),
)
.to_owned(),
)
.await
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum IncreaseOnChainBalanceReceipt {
Table,
LogIndex,
}

@ -27,7 +27,6 @@ use anyhow::Context;
use axum::http::StatusCode;
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From;
use entities::user;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
@ -165,7 +164,6 @@ pub async fn flatten_handles<T>(
}
/// starting an app creates many tasks
#[derive(From)]
pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles
pub app: Arc<Web3ProxyApp>,
@ -187,6 +185,7 @@ impl Web3ProxyApp {
top_config: TopConfig,
num_workers: usize,
shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_receiver: broadcast::Receiver<()>
) -> anyhow::Result<Web3ProxyAppSpawn> {
let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();
@ -399,6 +398,7 @@ impl Web3ProxyApp {
Some(user_balance_cache.clone()),
stat_buffer_shutdown_receiver,
1,
flush_stat_buffer_receiver,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle);
@ -653,14 +653,13 @@ impl Web3ProxyApp {
important_background_handles.push(f);
}
Ok((
Ok(Web3ProxyAppSpawn {
app,
app_handles,
important_background_handles,
new_top_config_sender,
consensus_connections_watcher,
)
.into())
background_handles: important_background_handles,
new_top_config: new_top_config_sender,
ranked_rpcs: consensus_connections_watcher,
})
}
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> {

@ -135,8 +135,16 @@ impl ComputeUnit {
/// Compute cost per request
/// All methods cost the same
/// The number of bytes are based on input, and output bytes
pub fn cost(&self, archive_request: bool, cache_hit: bool, usd_per_cu: Decimal) -> Decimal {
// TODO: server errors are free. need to split server and user errors
pub fn cost(
&self,
archive_request: bool,
cache_hit: bool,
error_response: bool,
usd_per_cu: Decimal,
) -> Decimal {
if error_response {
return 0.into();
}
let mut cost = self.0 * usd_per_cu;

@ -14,6 +14,7 @@ use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
use core::fmt;
use deferred_rate_limiter::DeferredRateLimitResult;
use derivative::Derivative;
use derive_more::From;
use entities::{balance, login, rpc_key, user, user_tier};
use ethers::types::{Bytes, U64};
@ -309,7 +310,8 @@ impl KafkaDebugLogger {
}
}
#[derive(Debug)]
#[derive(Debug, Derivative)]
#[derivative(Default)]
pub struct RequestMetadata {
/// TODO: set archive_request during the new instead of after
/// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently
@ -329,7 +331,8 @@ pub struct RequestMetadata {
/// Instant that the request was received (or at least close to it)
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
pub start_instant: tokio::time::Instant,
#[derivative(Default(value = "Instant::now()"))]
pub start_instant: Instant,
/// if this is empty, there was a cache_hit
/// otherwise, it is populated with any rpc servers that were used by this request
pub backend_requests: BackendRequests,
@ -348,6 +351,8 @@ pub struct RequestMetadata {
/// True if the response required querying a backup RPC
/// RPC aggregators that query multiple providers to compare response may use this header to ignore our response.
pub response_from_backup_rpc: AtomicBool,
/// If the request is invalid or received a jsonrpc error response (excluding reverts)
pub user_error_response: AtomicBool,
/// ProxyMode::Debug logs requests and responses with Kafka
/// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this
@ -363,30 +368,6 @@ impl Default for Authorization {
}
}
/// this is only implemented so that we can use `mem::take`. You probably shouldn't use this.
impl Default for RequestMetadata {
fn default() -> Self {
Self {
archive_request: Default::default(),
authorization: Default::default(),
backend_requests: Default::default(),
chain_id: Default::default(),
error_response: Default::default(),
kafka_debug_logger: Default::default(),
method: Default::default(),
no_servers: Default::default(),
request_bytes: Default::default(),
request_ulid: Default::default(),
response_bytes: Default::default(),
response_from_backup_rpc: Default::default(),
response_millis: Default::default(),
response_timestamp: Default::default(),
start_instant: Instant::now(),
stat_sender: Default::default(),
}
}
}
impl RequestMetadata {
pub fn proxy_mode(&self) -> ProxyMode {
self.authorization
@ -531,6 +512,7 @@ impl RequestMetadata {
response_timestamp: 0.into(),
start_instant: Instant::now(),
stat_sender: app.stat_sender.clone(),
user_error_response: false.into(),
};
Arc::new(x)

@ -852,9 +852,11 @@ impl Web3Rpcs {
request_metadata
.response_from_backup_rpc
.store(is_backup_response, Ordering::Release);
}
if let Some(request_metadata) = request_metadata {
request_metadata
.user_error_response
.store(false, Ordering::Release);
request_metadata
.error_response
.store(false, Ordering::Release);
@ -863,21 +865,29 @@ impl Web3Rpcs {
return Ok(response);
}
Err(error) => {
// trace!(?response, "rpc error");
// TODO: separate tracking for jsonrpc error and web3 proxy error!
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
}
// TODO: if this is an error, do NOT return. continue to try on another server
let error = match JsonRpcErrorData::try_from(&error) {
Ok(x) => x,
Ok(x) => {
if let Some(request_metadata) = request_metadata {
request_metadata
.user_error_response
.store(true, Ordering::Release);
}
x
}
Err(err) => {
warn!(?err, "error from {}", rpc);
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
request_metadata
.user_error_response
.store(false, Ordering::Release);
}
last_provider_error = Some(error);
continue;
@ -1012,19 +1022,27 @@ impl Web3Rpcs {
}
}
OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
}
break;
}
}
}
// TODO: do we need this here, or do we do it somewhere else? like, the code could change and a try operator in here would skip this increment
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
}
if let Some(err) = method_not_available_response {
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(false, Ordering::Release);
request_metadata
.user_error_response
.store(true, Ordering::Release);
}
// this error response is likely the user's fault
// TODO: emit a stat for unsupported methods. then we can know what there is demand for or if we are missing a feature
return Err(err.into());

@ -263,6 +263,7 @@ pub async fn query_user_stats<'a>(
"error_response",
"method",
"rpc_secret_key_id",
"user_error_response",
]"#
}
};
@ -332,7 +333,7 @@ pub async fn query_user_stats<'a>(
// TODO: lower log level
debug!("Raw query to db is: {:#}", query);
let query = Query::new(query.to_string());
trace!("Query to db is: {:#?}", query);
trace!(?query, "influx");
// Make the query and collect all data
let raw_influx_responses: Vec<FluxRecord> = influxdb_client
@ -575,7 +576,25 @@ pub async fn query_user_stats<'a>(
);
}
_ => {
error!("error_response should always be a Long!");
error!("error_response should always be a String!");
}
}
} else if stat_response_type == StatType::Detailed && key == "user_error_response" {
match value {
influxdb2_structmap::value::Value::String(inner) => {
out.insert(
"user_error_response",
if inner == "true" {
serde_json::Value::Bool(true)
} else if inner == "false" {
serde_json::Value::Bool(false)
} else {
serde_json::Value::String("error".to_owned())
},
);
}
_ => {
error!("user_error_response should always be a String!");
}
}
}

@ -29,7 +29,7 @@ use std::borrow::Cow;
use std::mem;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{self, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::trace;
@ -61,6 +61,8 @@ pub struct RpcQueryStats {
/// The cost of the query in USD
/// If the user is on a free tier, this is still calculated so we know how much we are giving away.
pub compute_unit_cost: Decimal,
/// If the request is invalid or received a jsonrpc error response (excluding reverts)
pub user_error_response: bool,
}
#[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
@ -71,11 +73,13 @@ pub struct RpcQueryKey {
response_timestamp: i64,
/// true if an archive server was needed to serve the request.
archive_needed: bool,
/// true if the response was some sort of JSONRPC error.
/// true if the response was some sort of application error.
error_response: bool,
/// true if the response was some sort of JSONRPC error.
user_error_response: bool,
/// the rpc method used.
method: Cow<'static, str>,
/// origin tracking was opt-in. Now it is "None"
/// origin tracking **was** opt-in. Now, it is always "None"
origin: Option<Origin>,
/// None if the public url was used.
rpc_secret_key_id: Option<NonZeroU64>,
@ -103,6 +107,9 @@ impl RpcQueryStats {
// we used to optionally store origin, but wallets don't set it, so its almost always None
let origin = None;
// user_error_response is always set to false because we don't bother tracking this in the database
let user_error_response = false;
// Depending on method, add some arithmetic around calculating credits_used
// I think balance should not go here, this looks more like a key thingy
RpcQueryKey {
@ -113,6 +120,7 @@ impl RpcQueryStats {
rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
origin,
user_error_response,
}
}
@ -133,6 +141,7 @@ impl RpcQueryStats {
method,
rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
user_error_response: self.user_error_response,
origin,
}
}
@ -151,6 +160,7 @@ impl RpcQueryStats {
method,
rpc_secret_key_id: self.authorization.checks.rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
user_error_response: self.user_error_response,
origin,
};
@ -732,6 +742,7 @@ impl BufferedRpcQueryStats {
builder = builder
.tag("archive_needed", key.archive_needed.to_string())
.tag("error_response", key.error_response.to_string())
.tag("user_error_response", key.user_error_response.to_string())
.field("frontend_requests", self.frontend_requests as i64)
.field("backend_requests", self.backend_requests as i64)
.field("no_servers", self.no_servers as i64)
@ -784,9 +795,11 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
let response_bytes = metadata.response_bytes.load(Ordering::Acquire);
let mut error_response = metadata.error_response.load(Ordering::Acquire);
let mut response_millis = metadata.response_millis.load(atomic::Ordering::Acquire);
let mut response_millis = metadata.response_millis.load(Ordering::Acquire);
let response_timestamp = match metadata.response_timestamp.load(atomic::Ordering::Acquire) {
let user_error_response = metadata.user_error_response.load(Ordering::Acquire);
let response_timestamp = match metadata.response_timestamp.load(Ordering::Acquire) {
0 => {
// no response timestamp!
if !error_response {
@ -820,7 +833,7 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
let cache_hit = !backend_rpcs_used.is_empty();
let compute_unit_cost = cu.cost(archive_request, cache_hit, usd_per_cu);
let compute_unit_cost = cu.cost(archive_request, cache_hit, error_response, usd_per_cu);
let method = mem::take(&mut metadata.method);
@ -836,6 +849,7 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
response_bytes,
response_millis,
response_timestamp,
user_error_response,
};
Ok(x)

@ -64,6 +64,7 @@ impl StatBuffer {
user_balance_cache: Option<UserBalanceCache>,
shutdown_receiver: broadcast::Receiver<()>,
tsdb_save_interval_seconds: u32,
flush_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if db_conn.is_none() && influxdb_client.is_none() {
return Ok(None);
@ -89,7 +90,7 @@ impl StatBuffer {
// any errors inside this task will cause the application to exit
let handle = tokio::spawn(async move {
new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver)
new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver, flush_receiver)
.await
});
@ -101,6 +102,7 @@ impl StatBuffer {
bucket: String,
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
mut flush_receiver: broadcast::Receiver<()>,
) -> Web3ProxyResult<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
@ -150,6 +152,19 @@ impl StatBuffer {
trace!("Saved {} stats to the tsdb", count);
}
}
_ = flush_receiver.recv() => {
trace!("flush");
let count = self.save_tsdb_stats(&bucket).await;
if count > 0 {
trace!("Flushed {} stats to the tsdb", count);
}
let count = self.save_relational_stats().await;
if count > 0 {
trace!("Flushed {} stats to the relational db", count);
}
}
x = shutdown_receiver.recv() => {
match x {
Ok(_) => {

@ -72,6 +72,8 @@ impl MigrateStatsToV2SubCommand {
None => None,
};
let (_flush_sender, flush_receiver) = broadcast::channel(1);
// Spawn the stat-sender
let emitter_spawn = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
@ -88,6 +90,7 @@ impl MigrateStatsToV2SubCommand {
None,
rpc_account_shutdown_recevier,
1,
flush_receiver,
)
.context("Error spawning stat buffer")?
.context("No stat buffer spawned. Maybe missing influx or db credentials?")?;
@ -203,6 +206,7 @@ impl MigrateStatsToV2SubCommand {
start_instant: Instant::now(),
stat_sender: Some(stat_sender.clone()),
request_ulid,
user_error_response: false.into(),
};
if let Some(x) = request_metadata.try_send_stat()? {

@ -42,6 +42,7 @@ impl ProxydSubCommand {
let frontend_port = Arc::new(self.port.into());
let prometheus_port = Arc::new(self.prometheus_port.into());
let (flush_stat_buffer_sender, _) = broadcast::channel(1);
Self::_main(
top_config,
@ -50,6 +51,7 @@ impl ProxydSubCommand {
prometheus_port,
num_workers,
shutdown_sender,
flush_stat_buffer_sender,
)
.await
}
@ -62,14 +64,11 @@ impl ProxydSubCommand {
prometheus_port: Arc<AtomicU16>,
num_workers: usize,
frontend_shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_sender: broadcast::Sender<()>,
) -> anyhow::Result<()> {
// tokio has code for catching ctrl+c so we use that
// this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something
// tokio has code for catching ctrl+c so we use that to shut down in most cases
// frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something
// we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()`
// TODO: should we use a watch or broadcast for these?
// Maybe this one ?
// let mut shutdown_receiver = shutdown_sender.subscribe();
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe();
@ -86,6 +85,7 @@ impl ProxydSubCommand {
top_config.clone(),
num_workers,
app_shutdown_sender.clone(),
flush_stat_buffer_sender.subscribe(),
)
.await?;

@ -54,6 +54,9 @@ pub struct TestApp {
/// connection to the proxy that is connected to anil.
pub proxy_provider: Provider<Http>,
/// tell the app to flush stats to the database
flush_stat_buffer_sender: broadcast::Sender<()>,
/// tell the app to shut down (use `self.stop()`).
shutdown_sender: broadcast::Sender<()>,
}
@ -269,6 +272,8 @@ impl TestApp {
let frontend_port_arc = Arc::new(AtomicU16::new(0));
let prometheus_port_arc = Arc::new(AtomicU16::new(0));
let (flush_stat_buffer_sender, _flush_stat_buffer_receiver) = broadcast::channel(1);
// spawn the app
// TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop
let handle = {
@ -279,6 +284,7 @@ impl TestApp {
prometheus_port_arc,
num_workers,
shutdown_sender.clone(),
flush_stat_buffer_sender.clone(),
))
};
@ -304,6 +310,7 @@ impl TestApp {
db,
proxy_handle: Mutex::new(Some(handle)),
proxy_provider,
flush_stat_buffer_sender,
shutdown_sender,
}
}
@ -313,6 +320,10 @@ impl TestApp {
self.db.as_ref().unwrap().conn.as_ref().unwrap()
}
pub fn flush_stats(&self) {
self.flush_stat_buffer_sender.send(()).unwrap();
}
pub fn stop(&self) -> Result<usize, SendError<()>> {
self.shutdown_sender.send(())
}