Bryan devel 2023-05-12 (#67)

* add minor todo

* BadRequest instead of web3_context

* more bad request error codes

* use tokio-uring for the tcp listener

* clear block instead of panic

* clone earlier

* more watch channels instead of rwlocks

* drop uring for now (it's single-threaded) and combine get/post/put routes

* clean up iter vs into_iter and unnecessary collect

* arcswap instead of rwlock for Web3Rpcs.by_name

* cargo upgrade

* uuid fast-rng and alphabetize

* if protected rpcs, only use protected rpcs

* listenfd

* make connectinfo optional

* try_get_with_by_ref instead of try_get_with

* anyhow ensure. and try_get_with_as_ref isn't actually needed

* fix feature flags

* more refs and less clone

* automatic retry for eth_getTransactionReceipt and eth_getTransactionByHash

thanks for the report Lefteris @ Rotki

* ArcSwap for provider

* set archive_request to true on transaction retrying

* merge durable stats

* Revert "ArcSwap for provider"

This reverts commit 166d77f204cde9fa7722c0cefecbb27008749d47.

* comments

* less clones

* more refs

* fix test

* add optional mimalloc feature

* remove stale dependency

* sort

* cargo upgrade

* lint constants

* add todo

* another todo

* lint

* anyhow::ensure instead of panic

* allow rpc_accounting_v2 entries for requests without an rpc key
Authored by Bryan Stitt on 2023-05-12 15:15:32 -07:00; committed by GitHub
parent 34ed450fab
commit 8a097dabbe
49 changed files with 2001 additions and 1383 deletions
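
Several of the bullets above replace RwLocks with lock-free alternatives: `arc-swap` for `Web3Rpcs.by_name` and tokio watch channels elsewhere. A minimal sketch of the ArcSwap pattern, assuming arc-swap 1.6; the `Rpcs`/`ByName` types below are illustrative stand-ins, not the proxy's real structs:

```rust
// Readers call load() and never contend with writers; writers publish a whole
// new map atomically. This is the general shape of the RwLock -> ArcSwap swap.
use arc_swap::ArcSwap;
use std::collections::HashMap;
use std::sync::Arc;

type ByName = HashMap<String, u64>;

struct Rpcs {
    by_name: ArcSwap<ByName>,
}

impl Rpcs {
    fn get(&self, name: &str) -> Option<u64> {
        // load() returns a cheap guard; no lock is taken
        self.by_name.load().get(name).copied()
    }

    fn replace(&self, new_map: ByName) {
        // swap in a freshly built map; existing readers keep their old Arc
        self.by_name.store(Arc::new(new_map));
    }
}

fn main() {
    let rpcs = Rpcs {
        by_name: ArcSwap::from_pointee(HashMap::new()),
    };
    rpcs.replace(HashMap::from([("llama".to_string(), 1)]));
    assert_eq!(rpcs.get("llama"), Some(1));
}
```
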

Cargo.lock (generated, 52 changed lines)

@ -114,6 +114,12 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "argh"
version = "0.1.10"
@ -1576,7 +1582,7 @@ dependencies = [
[[package]]
name = "entities"
version = "0.17.0"
version = "0.27.0"
dependencies = [
"ethers",
"sea-orm",
@ -3041,6 +3047,16 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "libmimalloc-sys"
version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4ac0e912c8ef1b735e92369695618dc5b1819f5a7bf3f167301a3ba1cea515e"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "libz-sys"
version = "1.1.9"
@ -3068,6 +3084,17 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f"
[[package]]
name = "listenfd"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0500463acd96259d219abb05dc57e5a076ef04b2db9a2112846929b5f174c96"
dependencies = [
"libc",
"uuid 1.3.2",
"winapi",
]
[[package]]
name = "lock_api"
version = "0.4.9"
@ -3143,12 +3170,21 @@ dependencies = [
[[package]]
name = "migration"
version = "0.19.0"
version = "0.27.0"
dependencies = [
"sea-orm-migration",
"tokio",
]
[[package]]
name = "mimalloc"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e2894987a3459f3ffb755608bd82188f8ed00d0ae077f1edea29c068d639d98"
dependencies = [
"libmimalloc-sys",
]
[[package]]
name = "mime"
version = "0.3.17"
@ -4607,12 +4643,6 @@ version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hex"
version = "2.1.0"
@ -6609,9 +6639,10 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.17.0"
version = "0.27.0"
dependencies = [
"anyhow",
"arc-swap",
"argh",
"axum",
"axum-client-ip",
@ -6643,8 +6674,10 @@ dependencies = [
"ipnet",
"itertools",
"latency",
"listenfd",
"log",
"migration",
"mimalloc",
"moka",
"num",
"num-traits",
@ -6659,7 +6692,6 @@ dependencies = [
"regex",
"reqwest",
"rmp-serde",
"rustc-hash",
"sentry",
"serde",
"serde_json",

@ -32,17 +32,19 @@ RUN apt-get update && \
# copy the application
COPY . .
ENV WEB3_PROXY_FEATURES "rdkafka-src"
# test the application with cargo-nextest
RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/app/target \
cargo nextest run --features "rdkafka-src tokio-uring" --no-default-features
cargo nextest run --features "$WEB3_PROXY_FEATURES" --no-default-features
# build the application
# using a "release" profile (which install does) is **very** important
RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/app/target \
cargo install \
--features "rdkafka-src tokio-uring" \
--features "$WEB3_PROXY_FEATURES" \
--locked \
--no-default-features \
--path ./web3_proxy \

@ -79,7 +79,9 @@ web3_proxy_cli health_compass https://eth.llamarpc.com https://eth-ski.llamarpc.
### Run migrations
This is only really useful during development. The migrations run on application start.
Generally it is simplest to just run the app; it runs migrations on start.
But if you want to run them manually (generally only useful in development):
```
cd migration

@ -89,7 +89,7 @@ where
// set arc_deferred_rate_limit_result and return the count
self.local_cache
.get_with(key, async move {
.get_with_by_ref(&key, async move {
// we do not use the try operator here because we want to be okay with redis errors
let redis_count = match rrl
.throttle_label(&redis_key, Some(max_requests_per_period), count)
@ -110,7 +110,7 @@ where
count
}
Ok(RedisRateLimitResult::RetryNever) => {
panic!("RetryNever shouldn't happen")
unreachable!();
}
Err(err) => {
let _ = deferred_rate_limit_result
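
The hunk above switches the deferred rate limiter from `get_with` to moka's `get_with_by_ref`, which borrows the key instead of consuming it. A minimal sketch of that call, assuming moka 0.11 with the `future` feature and a tokio runtime; the key/value types and the stubbed init body are illustrative (the real code runs a Redis throttle inside the init future):

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<String, u64> = Cache::new(1_000);

    let key = "rate_limit:user:42".to_string();

    // get_with_by_ref only borrows `key`, so no clone is needed when the entry
    // is already cached; the init future runs only on a cache miss.
    let count = cache
        .get_with_by_ref(&key, async {
            // stand-in for the redis throttle lookup
            0u64
        })
        .await;

    println!("count = {count}");
}
```
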

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.17.0"
version = "0.27.0"
edition = "2021"
[lib]

@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
pub rpc_key_id: u64,
pub rpc_key_id: Option<u64>,
pub chain_id: u64,
pub period_datetime: DateTimeUtc,
pub archive_needed: bool,

@ -54,26 +54,27 @@ fn from_bits(bits: u64) -> [f32; 2] {
#[cfg(test)]
mod tests {
use std::f32;
use std::sync::atomic::Ordering;
use super::{from_bits, to_bits, AtomicF32Pair};
#[test]
fn test_f32_pair_bit_conversions() {
let pair = [3.14159, 2.71828];
let pair = [f32::consts::PI, f32::consts::E];
assert_eq!(pair, from_bits(to_bits(pair)));
}
#[test]
fn test_atomic_f32_pair_load() {
let pair = [3.14159, 2.71828];
let pair = [f32::consts::PI, f32::consts::E];
let atomic = AtomicF32Pair::new(pair);
assert_eq!(pair, atomic.load(Ordering::Relaxed));
}
#[test]
fn test_atomic_f32_pair_fetch_update() {
let pair = [3.14159, 2.71828];
let pair = [f32::consts::PI, f32::consts::E];
let atomic = AtomicF32Pair::new(pair);
atomic
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |[f1, f2]| {
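
The tests above exercise `to_bits`/`from_bits` for an `AtomicF32Pair`. A hypothetical sketch of that packing (two `f32`s stored in one `u64` so the pair can live in an `AtomicU64`); the signatures match the hunk header, but which float lands in the high half is an assumption, not taken from the source:

```rust
// Pack two f32s into a single u64 so they can be updated together, lock-free.
fn to_bits(pair: [f32; 2]) -> u64 {
    let lo = pair[0].to_bits() as u64;
    let hi = pair[1].to_bits() as u64;
    (hi << 32) | lo
}

// Unpack the u64 back into the original pair.
fn from_bits(bits: u64) -> [f32; 2] {
    let lo = f32::from_bits((bits & 0xFFFF_FFFF) as u32);
    let hi = f32::from_bits((bits >> 32) as u32);
    [lo, hi]
}

fn main() {
    let pair = [std::f32::consts::PI, std::f32::consts::E];
    assert_eq!(pair, from_bits(to_bits(pair)));
}
```
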

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.19.0"
version = "0.27.0"
edition = "2021"
publish = false

@ -26,6 +26,7 @@ mod m20230221_230953_track_spend;
mod m20230412_171916_modify_secondary_user_add_primary_user;
mod m20230422_172555_premium_downgrade_logic;
mod m20230511_161214_remove_columns_statsv2_origin_and_method;
mod m20230512_220213_allow_null_rpc_key_id_in_stats_v2;
pub struct Migrator;
@ -59,6 +60,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230412_171916_modify_secondary_user_add_primary_user::Migration),
Box::new(m20230422_172555_premium_downgrade_logic::Migration),
Box::new(m20230511_161214_remove_columns_statsv2_origin_and_method::Migration),
Box::new(m20230512_220213_allow_null_rpc_key_id_in_stats_v2::Migration),
]
}
}

@ -92,7 +92,6 @@ impl MigrationTrait for Migration {
)
.await?;
// rename column rpc_key to rpc_secret_key
Ok(())
}

@ -6,7 +6,6 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.create_table(
Table::create()
@ -36,7 +35,6 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.drop_table(Table::drop().table(Admin::Table).to_owned())
.await

@ -6,7 +6,6 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.create_table(
Table::create()
@ -48,7 +47,6 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.drop_table(Table::drop().table(Balance::Table).to_owned())
.await

@ -68,7 +68,6 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.drop_table(
Table::drop()

@ -49,7 +49,6 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.drop_table(Table::drop().table(AdminTrail::Table).to_owned())
.await

@ -22,7 +22,6 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.alter_table(
sea_query::Table::alter()

@ -7,8 +7,6 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
// Add a column "downgrade_tier_id"
// It is a "foreign key" that references other items in this table
manager
@ -18,7 +16,6 @@ impl MigrationTrait for Migration {
.add_column(ColumnDef::new(UserTier::DowngradeTierId).big_unsigned())
.add_foreign_key(
TableForeignKey::new()
.to_tbl(UserTier::Table)
.to_tbl(UserTier::Table)
.from_col(UserTier::DowngradeTierId)
.to_col(UserTier::Id),
@ -83,8 +80,6 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
// Remove the two tiers that you just added
// And remove the column you just added
let db_conn = manager.get_connection();

@ -44,7 +44,6 @@ impl MigrationTrait for Migration {
#[derive(Iden)]
enum RpcAccountingV2 {
Table,
Id,
Origin,
Method,
}

@ -0,0 +1,48 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
sea_query::Table::alter()
.table(RpcAccountingV2::Table)
.to_owned()
// allow rpc_key_id to be null. Needed for public rpc stat tracking
.modify_column(
ColumnDef::new(RpcAccountingV2::RpcKeyId)
.big_unsigned()
.null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
sea_query::Table::alter()
.table(RpcAccountingV2::Table)
.to_owned()
.modify_column(
ColumnDef::new(RpcAccountingV2::RpcKeyId)
.big_unsigned()
.not_null()
.default(0),
)
.to_owned(),
)
.await
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum RpcAccountingV2 {
Table,
RpcKeyId,
}

@ -1,16 +1,18 @@
[package]
name = "web3_proxy"
version = "0.17.0"
version = "0.27.0"
edition = "2021"
default-run = "web3_proxy_cli"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["deadlock_detection"]
default = ["connectinfo", "deadlock_detection"]
deadlock_detection = ["parking_lot/deadlock_detection"]
mimalloc = ["dep:mimalloc"]
tokio-console = ["dep:tokio-console", "dep:console-subscriber"]
rdkafka-src = ["rdkafka/cmake-build", "rdkafka/libz", "rdkafka/ssl", "rdkafka/zstd-pkg-config"]
connectinfo = []
[dependencies]
deferred-rate-limiter = { path = "../deferred-rate-limiter" }
@ -27,6 +29,7 @@ thread-fast-rng = { path = "../thread-fast-rng" }
# TODO: make sure this time version matches siwe. PR to put this in their prelude
anyhow = { version = "1.0.71", features = ["backtrace"] }
arc-swap = "1.6.0"
argh = "0.1.10"
axum = { version = "0.6.18", features = ["headers", "ws"] }
axum-client-ip = "0.4.1"
@ -47,15 +50,17 @@ gethostname = "0.4.2"
glob = "0.3.1"
handlebars = "4.3.7"
hashbrown = { version = "0.13.2", features = ["serde"] }
hex_fmt = "0.3.0"
hdrhistogram = "7.5.2"
hex_fmt = "0.3.0"
hostname = "0.3.1"
http = "0.2.9"
influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rustls"] }
influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"}
hostname = "0.3.1"
ipnet = "2.7.2"
itertools = "0.10.5"
listenfd = "1.0.1"
log = "0.4.17"
mimalloc = { version = "0.1.37", optional = true}
moka = { version = "0.11.0", default-features = false, features = ["future"] }
num = "0.4.0"
num-traits = "0.2.15"
@ -69,7 +74,6 @@ rdkafka = { version = "0.29.0" }
regex = "1.8.1"
reqwest = { version = "0.11.17", default-features = false, features = ["json", "tokio-rustls"] }
rmp-serde = "1.1.1"
rustc-hash = "1.1.0"
sentry = { version = "0.31.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.163", features = [] }
serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] }
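
The new `listenfd` dependency above (and the "listenfd" bullet) enables socket passing during development. The proxy's actual listener wiring is not shown in this diff; this is just the crate's usual `from_env`/`take_tcp_listener` pattern with an illustrative bind address:

```rust
// Reuse a socket handed to us by e.g. `systemfd --no-pid -s http::8080 -- cargo watch -x run`,
// otherwise bind a fresh one. Lets the server restart without dropping the listening socket.
use listenfd::ListenFd;
use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    let mut listenfd = ListenFd::from_env();

    let listener = match listenfd.take_tcp_listener(0)? {
        // a listener was passed in from the environment
        Some(listener) => listener,
        // no external socket; bind our own
        None => TcpListener::bind("0.0.0.0:8080")?,
    };

    println!("listening on {:?}", listener.local_addr()?);
    Ok(())
}
```
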

@ -3,7 +3,9 @@ mod ws;
use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
use crate::frontend::authorization::{
Authorization, RequestMetadata, RequestOrMethod, RpcSecretKey,
};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
@ -14,7 +16,7 @@ use crate::rpcs::consensus::ConsensusWeb3Rpcs;
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::transactions::TxStatus;
use crate::stats::{AppStat, RpcQueryStats, StatBuffer};
use crate::stats::{AppStat, StatBuffer};
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
@ -40,8 +42,6 @@ use migration::sea_orm::{
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::FutureRecord;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
@ -78,7 +78,6 @@ struct ResponseCacheKey {
// to_block is only set when ranges of blocks are requested (like with eth_getLogs)
to_block: Option<Web3ProxyBlock>,
method: String,
// TODO: better type for this
params: Option<serde_json::Value>,
cache_errors: bool,
}
@ -87,7 +86,7 @@ impl ResponseCacheKey {
fn weight(&self) -> usize {
let mut w = self.method.len();
if let Some(p) = self.params.as_ref() {
if let Some(ref p) = self.params {
w += p.to_string().len();
}
@ -206,6 +205,10 @@ impl DatabaseReplica {
}
}
// TODO: this should be the secret key id, not the key itself!
pub type RpcSecretKeyCache =
Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>;
/// The application
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
@ -251,8 +254,7 @@ pub struct Web3ProxyApp {
pub vredis_pool: Option<RedisPool>,
/// cache authenticated users so that we don't have to query the database on the hot path
// TODO: should the key be our RpcSecretKey class instead of Ulid?
pub rpc_secret_key_cache:
Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>,
pub rpc_secret_key_cache: RpcSecretKeyCache,
/// concurrent/parallel RPC request limits for authenticated users
pub registered_user_semaphores:
Cache<NonZeroU64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
@ -276,8 +278,7 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
}
}
/// return the first error or okay if everything worked
/// return the first error, or Ok if everything worked
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> {
@ -493,11 +494,10 @@ impl Web3ProxyApp {
db_conn.clone().map(DatabaseReplica)
};
} else {
if top_config.app.db_replica_url.is_some() {
return Err(anyhow::anyhow!(
"if there is a db_replica_url, there must be a db_url"
));
}
anyhow::ensure!(
top_config.app.db_replica_url.is_none(),
"if there is a db_replica_url, there must be a db_url"
);
warn!("no database. some features will be disabled");
};
@ -516,7 +516,10 @@ impl Web3ProxyApp {
.set("security.protocol", security_protocol)
.create()
{
Ok(k) => kafka_producer = Some(k),
Ok(k) => {
// TODO: create our topic
kafka_producer = Some(k)
}
Err(err) => error!("Failed connecting to kafka. This will not retry. {:?}", err),
}
}
@ -596,15 +599,15 @@ impl Web3ProxyApp {
let mut stat_sender = None;
if let Some(influxdb_bucket) = top_config.app.influxdb_bucket.clone() {
if let Some(spawned_stat_buffer) = StatBuffer::try_spawn(
top_config.app.chain_id,
BILLING_PERIOD_SECONDS,
influxdb_bucket,
top_config.app.chain_id,
db_conn.clone(),
60,
influxdb_client.clone(),
Some(rpc_secret_key_cache.clone()),
60,
1,
BILLING_PERIOD_SECONDS,
stat_buffer_shutdown_receiver,
1,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle);
@ -634,7 +637,7 @@ impl Web3ProxyApp {
let mut frontend_registered_user_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(redis_pool) = vredis_pool.as_ref() {
if let Some(ref redis_pool) = vredis_pool {
if let Some(public_requests_per_period) = top_config.app.public_requests_per_period {
// chain id is included in the app name so that rpc rate limits are per-chain
let rpc_rrl = RedisRateLimiter::new(
@ -669,7 +672,6 @@ impl Web3ProxyApp {
));
}
// TODO: i don't like doing Block::default here! Change this to "None"?
let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None);
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
@ -894,7 +896,7 @@ impl Web3ProxyApp {
.context("updating balanced rpcs")?;
if let Some(private_rpc_configs) = new_top_config.private_rpcs {
if let Some(private_rpcs) = self.private_rpcs.as_ref() {
if let Some(ref private_rpcs) = self.private_rpcs {
private_rpcs
.apply_server_configs(self, private_rpc_configs)
.await
@ -906,7 +908,7 @@ impl Web3ProxyApp {
}
if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs {
if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() {
if let Some(ref bundler_4337_rpcs) = self.bundler_4337_rpcs {
bundler_4337_rpcs
.apply_server_configs(self, bundler_4337_rpc_configs)
.await
@ -1106,23 +1108,29 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
) -> Web3ProxyResult<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>)> {
) -> Web3ProxyResult<(StatusCode, JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>)> {
// trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. per user max? expiration time instead of duration?
let max_time = Duration::from_secs(120);
// TODO: take this as an optional argument. check for a different max from the user_tier?
// TODO: how much time was spent on this request already?
let max_time = Duration::from_secs(240);
// TODO: use streams and buffers so we don't overwhelm our server
let response = match request {
JsonRpcRequestEnum::Single(request) => {
let (response, rpcs) = timeout(
JsonRpcRequestEnum::Single(mut request) => {
let (status_code, response, rpcs) = timeout(
max_time,
self.proxy_cached_request(&authorization, request, None),
self.proxy_cached_request(&authorization, &mut request, None),
)
.await??;
.await?;
(JsonRpcForwardedResponseEnum::Single(response), rpcs)
(
status_code,
JsonRpcForwardedResponseEnum::Single(response),
rpcs,
)
}
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = timeout(
@ -1131,7 +1139,12 @@ impl Web3ProxyApp {
)
.await??;
(JsonRpcForwardedResponseEnum::Batch(responses), rpcs)
// TODO: real status code
(
StatusCode::OK,
JsonRpcForwardedResponseEnum::Batch(responses),
rpcs,
)
}
};
@ -1143,14 +1156,11 @@ impl Web3ProxyApp {
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
mut requests: Vec<JsonRpcRequest>,
) -> Web3ProxyResult<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
// TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that
// TODO: improve flattening
// get the head block now so that any requests that need it all use the same block
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block_num = self
@ -1160,7 +1170,7 @@ impl Web3ProxyApp {
let responses = join_all(
requests
.into_iter()
.iter_mut()
.map(|request| {
self.proxy_cached_request(authorization, request, Some(head_block_num))
})
@ -1168,14 +1178,12 @@ impl Web3ProxyApp {
)
.await;
// TODO: i'm sure this could be done better with iterators
// TODO: stream the response?
let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
let mut collected_rpc_names: HashSet<String> = HashSet::new();
let mut collected_rpcs: Vec<Arc<Web3Rpc>> = vec![];
for response in responses {
// TODO: any way to attach the tried rpcs to the error? it is likely helpful
let (response, rpcs) = response?;
let (status_code, response, rpcs) = response;
collected.push(response);
collected_rpcs.extend(rpcs.into_iter().filter(|x| {
@ -1186,6 +1194,8 @@ impl Web3ProxyApp {
true
}
}));
// TODO: what should we do with the status code? check the jsonrpc spec
}
Ok((collected, collected_rpcs))
@ -1212,8 +1222,8 @@ impl Web3ProxyApp {
}
}
/// try to send transactions to the best available rpcs with private mempools
/// if no private rpcs are configured, then some public rpcs are used instead
/// try to send transactions to the best available rpcs with protected/private mempools
/// if no protected rpcs are configured, then some public rpcs are used instead
async fn try_send_protected(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
@ -1261,82 +1271,50 @@ impl Web3ProxyApp {
async fn proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
request: &mut JsonRpcRequest,
head_block_num: Option<U64>,
) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>)> {
) -> (StatusCode, JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>) {
// TODO: move this code to another module so that it's easy to turn this trace logging on in dev
trace!("Received request: {:?}", request);
let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes()));
let request_metadata = RequestMetadata::new(
self,
authorization.clone(),
RequestOrMethod::Request(request),
head_block_num.as_ref(),
)
.await;
let mut kafka_stuff = None;
let (status_code, response) = match self
._proxy_cached_request(authorization, request, head_block_num, &request_metadata)
.await
{
Ok(x) => (StatusCode::OK, x),
Err(err) => err.into_response_parts(),
};
if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) {
if let Some(kafka_producer) = self.kafka_producer.clone() {
let kafka_topic = "proxy_cached_request".to_string();
request_metadata.add_response(&response);
let rpc_secret_key_id = authorization
.checks
.rpc_secret_key_id
.map(|x| x.get())
.unwrap_or_default();
// TODO: with parallel request sending, I think there could be a race on this
let rpcs = request_metadata.backend_rpcs_used();
let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)?;
let request_bytes = rmp_serde::to_vec(&request)?;
let request_hash = Some(keccak256(&request_bytes));
let chain_id = self.config.chain_id;
// another item is added with the response, so initial_capacity is +1 what is needed here
let kafka_headers = OwnedHeaders::new_with_capacity(4)
.insert(Header {
key: "request_hash",
value: request_hash.as_ref(),
})
.insert(Header {
key: "head_block_num",
value: head_block_num.map(|x| x.to_string()).as_ref(),
})
.insert(Header {
key: "chain_id",
value: Some(&chain_id.to_le_bytes()),
});
// save the key and headers for when we log the response
kafka_stuff = Some((
kafka_topic.clone(),
kafka_key.clone(),
kafka_headers.clone(),
));
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to(&kafka_topic)
.key(&kafka_key)
.payload(&request_bytes)
.headers(kafka_headers),
Duration::from_secs(0),
);
if let Err((err, _)) = produce_future.await {
error!("produce kafka request log: {}", err);
// TODO: re-queue the msg?
}
};
tokio::spawn(f);
}
}
(status_code, response, rpcs)
}
/// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use
async fn _proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &mut JsonRpcRequest,
head_block_num: Option<U64>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
// save the id so we can attach it to the response
// TODO: instead of cloning, take the id out?
let request_id = request.id.clone();
// TODO: don't clone
let request_method = request.method.clone();
// TODO: if eth_chainId or net_version, serve those without querying the backend
// TODO: don't clone?
// TODO: serve net_version without querying the backend
let response: JsonRpcForwardedResponse = match request_method.as_ref() {
// lots of commands are blocked
method @ ("db_getHex"
@ -1449,15 +1427,15 @@ impl Web3ProxyApp {
.try_proxy_connection(
authorization,
request,
Some(&request_metadata),
Some(request_metadata),
None,
None,
)
.await?
}
None => {
// TODO: stats!
// TODO: not synced error?
// TODO: stats even when we error!
// TODO: use Web3ProxyError? dedicated error for no 4337 bundlers
return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into());
}
},
@ -1493,26 +1471,19 @@ impl Web3ProxyApp {
.try_proxy_connection(
authorization,
request,
Some(&request_metadata),
Some(request_metadata),
None,
None,
)
.await?;
let mut gas_estimate: U256 = if let Some(gas_estimate) = response.result.take() {
serde_json::from_str(gas_estimate.get())
.or(Err(Web3ProxyError::GasEstimateNotU256))?
} else {
// i think this is always an error response
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(gas_estimate) = response.result.take() {
let mut gas_estimate: U256 = serde_json::from_str(gas_estimate.get())
.or(Err(Web3ProxyError::GasEstimateNotU256))?;
// TODO! save stats
return Ok((response, rpcs));
};
let gas_increase =
if let Some(gas_increase_percent) = self.config.gas_increase_percent {
let gas_increase = if let Some(gas_increase_percent) =
self.config.gas_increase_percent
{
let gas_increase = gas_estimate * gas_increase_percent / U256::from(100);
let min_gas_increase = self.config.gas_increase_min.unwrap_or_default();
@ -1522,9 +1493,12 @@ impl Web3ProxyApp {
self.config.gas_increase_min.unwrap_or_default()
};
gas_estimate += gas_increase;
gas_estimate += gas_increase;
JsonRpcForwardedResponse::from_value(json!(gas_estimate), request_id)
JsonRpcForwardedResponse::from_value(json!(gas_estimate), request_id)
} else {
response
}
}
"eth_getTransactionReceipt" | "eth_getTransactionByHash" => {
// try to get the transaction without specifying a min_block_height
@ -1532,8 +1506,8 @@ impl Web3ProxyApp {
.balanced_rpcs
.try_proxy_connection(
authorization,
request.clone(),
Some(&request_metadata),
request,
Some(request_metadata),
None,
None,
)
@ -1551,7 +1525,7 @@ impl Web3ProxyApp {
.try_proxy_connection(
authorization,
request,
Some(&request_metadata),
Some(request_metadata),
Some(&U64::one()),
None,
)
@ -1583,7 +1557,7 @@ impl Web3ProxyApp {
let mut response = self
.try_send_protected(
authorization,
&request,
request,
request_metadata.clone(),
num_public_rpcs,
)
@ -1592,24 +1566,36 @@ impl Web3ProxyApp {
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function
if let Some(response_error) = response.error.as_ref() {
if let Some(ref response_error) = response.error {
if response_error.code == -32000
&& (response_error.message == "ALREADY_EXISTS: already known"
|| response_error.message
== "INTERNAL_ERROR: existing tx with same hash")
{
// TODO: expect instead of web3_context?
let params = request
.params
.as_mut()
.web3_context("there must be params if we got this far")?;
let params = params
.as_array()
.web3_context("there must be an array if we got this far")?
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get array from params".to_string(),
)
})?
.get(0)
.web3_context("there must be an item if we got this far")?
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get item 0 from params".to_string(),
)
})?
.as_str()
.web3_context("there must be a string if we got this far")?;
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get string from params item 0".to_string(),
)
})?;
let params = Bytes::from_str(params)
.expect("there must be Bytes if we got this far");
@ -1617,6 +1603,7 @@ impl Web3ProxyApp {
let rlp = Rlp::new(params.as_ref());
if let Ok(tx) = Transaction::decode(&rlp) {
// TODO: decode earlier and confirm that tx.chain_id (if set) matches self.config.chain_id
let tx_hash = json!(tx.hash());
trace!("tx_hash: {:#?}", tx_hash);
@ -1630,17 +1617,16 @@ impl Web3ProxyApp {
}
// emit transaction count stats
if let Some(salt) = self.config.public_recent_ips_salt.as_ref() {
if let Some(tx_hash) = response.result.clone() {
if let Some(ref salt) = self.config.public_recent_ips_salt {
if let Some(ref tx_hash) = response.result {
let now = Utc::now().timestamp();
let salt = salt.clone();
let app = self.clone();
let salted_tx_hash = format!("{}:{}", salt, tx_hash);
let f = async move {
match app.redis_conn().await {
Ok(Some(mut redis_conn)) => {
let salted_tx_hash = format!("{}:{}", salt, tx_hash);
let hashed_tx_hash =
Bytes::from(keccak256(salted_tx_hash.as_bytes()));
@ -1685,7 +1671,7 @@ impl Web3ProxyApp {
Some(request_id),
),
"net_listening" => {
// TODO: only if there are some backends on balanced_rpcs?
// TODO: only true if there are some backends on balanced_rpcs?
JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(true), request_id)
}
"net_peerCount" => JsonRpcForwardedResponse::from_value(
@ -1705,35 +1691,35 @@ impl Web3ProxyApp {
|| !params.get(0).map(|x| x.is_string()).unwrap_or(false)
{
// TODO: what error code?
return Ok((
JsonRpcForwardedResponse::from_str(
"Invalid request",
Some(-32600),
Some(request_id),
),
vec![],
));
}
let param = Bytes::from_str(
params[0]
.as_str()
.ok_or(Web3ProxyError::ParseBytesError(None))
.web3_context("parsing params 0 into str then bytes")?,
)
.map_err(|x| {
trace!("bad request: {:?}", x);
Web3ProxyError::BadRequest(
"param 0 could not be read as H256".to_string(),
// TODO: use Web3ProxyError::BadRequest
JsonRpcForwardedResponse::from_str(
"Invalid request",
Some(-32600),
Some(request_id),
)
})?;
} else {
// TODO: BadRequest instead of web3_context
let param = Bytes::from_str(
params[0]
.as_str()
.ok_or(Web3ProxyError::ParseBytesError(None))
.web3_context("parsing params 0 into str then bytes")?,
)
.map_err(|x| {
trace!("bad request: {:?}", x);
Web3ProxyError::BadRequest(
"param 0 could not be read as H256".to_string(),
)
})?;
let hash = H256::from(keccak256(param));
let hash = H256::from(keccak256(param));
JsonRpcForwardedResponse::from_value(json!(hash), request_id)
JsonRpcForwardedResponse::from_value(json!(hash), request_id)
}
}
_ => {
// TODO: this needs the correct error code in the response
// TODO: Web3ProxyError::BadRequest instead?
JsonRpcForwardedResponse::from_str(
"invalid request",
Some(StatusCode::BAD_REQUEST.as_u16().into()),
@ -1759,6 +1745,9 @@ impl Web3ProxyApp {
.or(self.balanced_rpcs.head_block_num())
.ok_or(Web3ProxyError::NoServersSynced)?;
// TODO: don't clone. this happens way too much. maybe &mut?
let mut request = request.clone();
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
// TODO: this cache key can be rather large. is that okay?
@ -1867,7 +1856,7 @@ impl Web3ProxyApp {
.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
&request,
Some(&request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
@ -1888,7 +1877,7 @@ impl Web3ProxyApp {
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
&request,
Some(&request_metadata),
None,
None,
@ -1897,7 +1886,7 @@ impl Web3ProxyApp {
}
};
// since this data came likely out of a cache, the id is not going to match
// since this data likely came out of a cache, the response.id is not going to match the request.id
// replace the id with our request's id.
response.id = request_id;
@ -1905,52 +1894,7 @@ impl Web3ProxyApp {
}
};
// save the rpcs so they can be included in a response header
let rpcs = request_metadata.backend_requests.lock().clone();
// send stats used for accounting and graphs
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some(request_method),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
stat_sender
.send_async(response_stat.into())
.await
.map_err(Web3ProxyError::SendAppStatError)?;
}
// send debug info as a kafka log
if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff {
let kafka_producer = self
.kafka_producer
.clone()
.expect("if headers are set, producer must exist");
let response_bytes =
rmp_serde::to_vec(&response).web3_context("failed msgpack serialize response")?;
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to(&kafka_topic)
.key(&kafka_key)
.payload(&response_bytes)
.headers(kafka_headers),
Duration::from_secs(0),
);
if let Err((err, _)) = produce_future.await {
error!("produce kafka response log: {}", err);
}
};
tokio::spawn(f);
}
Ok((response, rpcs))
Ok(response)
}
}
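
app/mod.rs above also leans on tokio watch channels (for example `watch_consensus_head_sender`) rather than RwLocks for shared, frequently-read state. A minimal sketch of that pattern with an illustrative payload type, assuming a tokio runtime:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // one writer publishes the latest head block; many readers borrow it cheaply
    let (head_tx, mut head_rx) = watch::channel::<Option<u64>>(None);

    tokio::spawn(async move {
        // stand-in for the block-ingest task publishing a new consensus head
        head_tx.send(Some(17_000_000)).ok();
    });

    // wait until a new value has been published, then read it without locking
    head_rx.changed().await.ok();
    let head = *head_rx.borrow();
    println!("head block: {:?}", head);
}
```
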

@ -1,56 +1,55 @@
//! Websocket-specific functions for the Web3ProxyApp
use super::Web3ProxyApp;
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMethod};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcRequest;
use crate::rpcs::transactions::TxStatus;
use crate::stats::RpcQueryStats;
use axum::extract::ws::Message;
use ethers::prelude::U64;
use futures::future::AbortHandle;
use futures::future::Abortable;
use futures::stream::StreamExt;
use log::{trace, warn};
use log::trace;
use serde_json::json;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
impl Web3ProxyApp {
// TODO: #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
authorization: Arc<Authorization>,
request_json: JsonRpcRequest,
jsonrpc_request: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but it's easier for now
response_sender: flume::Sender<Message>,
) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> {
// TODO: this is not efficient
let request_bytes = serde_json::to_string(&request_json)
.web3_context("finding request size")?
.len();
let request_metadata = Arc::new(RequestMetadata::new(request_bytes));
let request_metadata = RequestMetadata::new(
self,
authorization.clone(),
RequestOrMethod::Request(&jsonrpc_request),
None,
)
.await;
let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();
// TODO: this only needs to be unique per connection. we don't need it globally unique
// TODO: have a max number of subscriptions per key/ip. have a global max number of subscriptions? how should this be calculated?
let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst);
let subscription_id = U64::from(subscription_id);
// save the id so we can use it in the response
let id = request_json.id.clone();
let id = jsonrpc_request.id.clone();
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
match request_json.params.as_ref() {
match jsonrpc_request.params.as_ref() {
Some(x) if x == &json!(["newHeads"]) => {
let authorization = authorization.clone();
let head_block_receiver = self.watch_consensus_head_receiver.clone();
let stat_sender = self.stat_sender.clone();
let app = self.clone();
trace!("newHeads subscription {:?}", subscription_id);
tokio::spawn(async move {
@ -66,8 +65,13 @@ impl Web3ProxyApp {
continue;
};
// TODO: what should the payload for RequestMetadata be?
let request_metadata = Arc::new(RequestMetadata::new(0));
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newHeads)", 0),
Some(new_head.number()),
)
.await;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
@ -83,33 +87,20 @@ impl Web3ProxyApp {
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some("eth_subscription(newHeads)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
subscription_request_metadata.add_response(response_bytes);
}
trace!("closed newHeads subscription {:?}", subscription_id);
@ -117,8 +108,7 @@ impl Web3ProxyApp {
}
Some(x) if x == &json!(["newPendingTransactions"]) => {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let authorization = authorization.clone();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
@ -133,7 +123,13 @@ impl Web3ProxyApp {
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata = Arc::new(RequestMetadata::new(0));
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingTransactions)", 0),
None,
)
.await;
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
@ -154,9 +150,11 @@ impl Web3ProxyApp {
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
// TODO: test that this len is the same as JsonRpcForwardedResponseEnum.num_bytes()
let response_bytes = response_str.len();
subscription_request_metadata.add_response(response_bytes);
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
@ -164,23 +162,6 @@ impl Web3ProxyApp {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some("eth_subscription(newPendingTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
}
trace!(
@ -191,9 +172,8 @@ impl Web3ProxyApp {
}
Some(x) if x == &json!(["newPendingFullTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
@ -208,7 +188,13 @@ impl Web3ProxyApp {
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata = Arc::new(RequestMetadata::new(0));
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingFullTransactions)", 0),
None,
)
.await;
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
@ -227,12 +213,11 @@ impl Web3ProxyApp {
},
});
subscription_request_metadata.add_response(&response_json);
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
@ -240,23 +225,6 @@ impl Web3ProxyApp {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some("eth_subscription(newPendingFullTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingFullTransactions: {:?}",
err
);
}
}
}
trace!(
@ -267,9 +235,8 @@ impl Web3ProxyApp {
}
Some(x) if x == &json!(["newPendingRawTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
@ -284,7 +251,13 @@ impl Web3ProxyApp {
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata = Arc::new(RequestMetadata::new(0));
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
"eth_subscribe(newPendingRawTransactions)",
None,
)
.await;
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
@ -317,22 +290,7 @@ impl Web3ProxyApp {
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some("eth_subscription(newPendingRawTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingRawTransactions: {:?}",
err
);
}
}
subscription_request_metadata.add_response(response_bytes);
}
trace!(
@ -348,19 +306,7 @@ impl Web3ProxyApp {
let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id);
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some(request_json.method.clone()),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!("stat_sender failed inside websocket: {:?}", err);
}
}
request_metadata.add_response(&response);
// TODO: make a `SubscriptionHandle(AbortHandle, JoinHandle)` struct?
Ok((subscription_abort_handle, response))

@ -60,13 +60,12 @@ async fn main() -> anyhow::Result<()> {
.context("unknown chain id for check_url")?;
if let Some(chain_id) = cli_config.chain_id {
if chain_id != check_id {
return Err(anyhow::anyhow!(
"chain_id of check_url is wrong! Need {}. Found {}",
chain_id,
check_id,
));
}
anyhow::ensure!(
chain_id == check_id,
"chain_id of check_url is wrong! Need {}. Found {}",
chain_id,
check_id,
);
}
let compare_url: String = match cli_config.compare_url {
@ -93,13 +92,12 @@ async fn main() -> anyhow::Result<()> {
.await
.context("unknown chain id for compare_url")?;
if check_id != compare_id {
return Err(anyhow::anyhow!(
"chain_id does not match! Need {}. Found {}",
check_id,
compare_id,
));
}
anyhow::ensure!(
check_id == compare_id,
"chain_id does not match! Need {}. Found {}",
check_id,
compare_id,
);
// start ids at 2 because id 1 was checking the chain id
let counter = AtomicU32::new(2);
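
The two hunks above replace manual `return Err(anyhow::anyhow!(...))` blocks with `anyhow::ensure!`. A minimal sketch of that pattern; the function and values are illustrative:

```rust
use anyhow::ensure;

fn check_chain_id(expected: u64, found: u64) -> anyhow::Result<()> {
    // bails with a formatted anyhow error when the condition is false
    ensure!(
        expected == found,
        "chain_id of check_url is wrong! Need {}. Found {}",
        expected,
        found,
    );
    Ok(())
}

fn main() -> anyhow::Result<()> {
    check_chain_id(1, 1)?;
    assert!(check_chain_id(1, 10).is_err());
    Ok(())
}
```
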

@ -38,6 +38,13 @@ use web3_proxy::{
config::TopConfig,
};
#[cfg(feature = "mimalloc")]
use mimalloc::MiMalloc;
#[cfg(feature = "mimalloc")]
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
#[cfg(feature = "deadlock")]
use {parking_lot::deadlock, std::thread, tokio::time::Duration};
@ -120,10 +127,10 @@ fn main() -> anyhow::Result<()> {
// if RUST_LOG isn't set, configure a default
// TODO: is there a better way to do this?
#[cfg(tokio_console)]
#[cfg(feature = "tokio_console")]
console_subscriber::init();
#[cfg(not(tokio_console))]
#[cfg(not(feature = "tokio_console"))]
let rust_log = match std::env::var("RUST_LOG") {
Ok(x) => x,
Err(_) => match std::env::var("WEB3_PROXY_TRACE").map(|x| x == "true") {
@ -202,7 +209,6 @@ fn main() -> anyhow::Result<()> {
(None, None)
};
#[cfg(not(tokio_console))]
{
let logger = env_logger::builder().parse_filters(&rust_log).build();
@ -267,9 +273,6 @@ fn main() -> anyhow::Result<()> {
}
// set up tokio's async runtime
#[cfg(tokio_uring)]
let mut rt_builder = tokio_uring::Builder::new_multi_thread();
#[cfg(not(tokio_uring))]
let mut rt_builder = runtime::Builder::new_multi_thread();
rt_builder.enable_all();
@ -278,7 +281,7 @@ fn main() -> anyhow::Result<()> {
rt_builder.worker_threads(cli_config.workers);
}
if let Some(top_config) = top_config.as_ref() {
if let Some(ref top_config) = top_config {
let chain_id = top_config.app.chain_id;
rt_builder.thread_name_fn(move || {
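
The hunk above changes `#[cfg(tokio_console)]` to `#[cfg(feature = "tokio_console")]`: the former tests a raw `--cfg` flag (only set via RUSTFLAGS), while the latter tests a Cargo feature, which is what the crate actually declares. A tiny sketch of the difference, using a hypothetical feature name; the name has to match the `[features]` entry in Cargo.toml exactly:

```rust
// Compiled in only when the Cargo feature is enabled, e.g.
// `cargo build --features fancy-logging`.
#[cfg(feature = "fancy-logging")]
fn init_logging() {
    // feature enabled: set up the fancier subscriber here
}

// Default path when the feature is off.
#[cfg(not(feature = "fancy-logging"))]
fn init_logging() {
    // fall back to plain env_logger-style logging
}

fn main() {
    init_logging();
}
```
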

@ -1,4 +1,4 @@
use anyhow::Context;
use anyhow::{anyhow, Context};
use argh::FromArgs;
use entities::{rpc_accounting, rpc_key};
use futures::stream::FuturesUnordered;
@ -9,17 +9,17 @@ use migration::sea_orm::{
ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, UpdateResult,
};
use migration::{Expr, Value};
use std::net::{IpAddr, Ipv4Addr};
use parking_lot::Mutex;
use std::num::NonZeroU64;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::time::Instant;
use web3_proxy::app::{AuthorizationChecks, BILLING_PERIOD_SECONDS};
use ulid::Ulid;
use web3_proxy::app::BILLING_PERIOD_SECONDS;
use web3_proxy::config::TopConfig;
use web3_proxy::frontend::authorization::{
Authorization, AuthorizationType, RequestMetadata, RpcSecretKey,
};
use web3_proxy::stats::{RpcQueryStats, StatBuffer};
use web3_proxy::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
use web3_proxy::rpcs::one::Web3Rpc;
use web3_proxy::stats::StatBuffer;
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Migrate towards influxdb and rpc_accounting_v2 from rpc_accounting
@ -67,28 +67,28 @@ impl MigrateStatsToV2 {
};
// Spawn the stat-sender
let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
top_config.app.chain_id,
let emitter_spawn = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
top_config
.app
.influxdb_bucket
.clone()
.context("No influxdb bucket was provided")?,
top_config.app.chain_id,
Some(db_conn.clone()),
30,
influxdb_client.clone(),
None,
30,
1,
BILLING_PERIOD_SECONDS,
rpc_account_shutdown_recevier,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(emitter_spawn.background_handle);
1,
)
.context("Error spawning stat buffer")?
.context("No stat buffer spawned. Maybe missing influx or db credentials?")?;
Some(emitter_spawn.stat_sender)
} else {
None
};
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(emitter_spawn.background_handle);
let stat_sender = emitter_spawn.stat_sender;
let migration_timestamp = chrono::offset::Utc::now();
@ -110,7 +110,10 @@ impl MigrateStatsToV2 {
// (2) Create request metadata objects to match the old data
// Iterate through all old rows, and put them into the above objects.
for x in old_records.iter() {
let authorization_checks = match x.rpc_key_id {
let mut authorization = Authorization::internal(None)
.context("failed creating internal authorization")?;
match x.rpc_key_id {
Some(rpc_key_id) => {
let rpc_key_obj = rpc_key::Entity::find()
.filter(rpc_key::Column::Id.eq(rpc_key_id))
@ -118,34 +121,16 @@ impl MigrateStatsToV2 {
.await?
.context("Could not find rpc_key_obj for the given rpc_key_id")?;
// TODO: Create authorization
// We can probably also randomly generate this, as we don't care about the user (?)
AuthorizationChecks {
user_id: rpc_key_obj.user_id,
rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)),
rpc_secret_key_id: Some(
NonZeroU64::new(rpc_key_id)
.context("Could not use rpc_key_id to create a u64")?,
),
..Default::default()
}
authorization.checks.user_id = rpc_key_obj.user_id;
authorization.checks.rpc_secret_key =
Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key));
authorization.checks.rpc_secret_key_id =
NonZeroU64::try_from(rpc_key_id).ok();
}
None => Default::default(),
};
let authorization_type = AuthorizationType::Internal;
let authorization = Arc::new(
Authorization::try_new(
authorization_checks,
None,
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
None,
None,
None,
authorization_type,
)
.context("Initializing Authorization Struct was not successful")?,
);
let authorization = Arc::new(authorization);
// It will be like a fork basically (to simulate getting multiple single requests ...)
// Iterate through all frontend requests
@ -178,46 +163,38 @@ impl MigrateStatsToV2 {
// Add module at the last step to include for any remained that we missed ... (?)
// TODO: Create RequestMetadata
let backend_rpcs: Vec<_> = (0..int_backend_requests)
.map(|_| Arc::new(Web3Rpc::default()))
.collect();
let request_ulid = Ulid::new();
// Create RequestMetadata
let request_metadata = RequestMetadata {
start_instant: Instant::now(), // This is overwritten later on
request_bytes: int_request_bytes, // Get the mean of all the request bytes
archive_request: x.archive_request.into(),
backend_requests: Default::default(), // This is not used, instead we modify the field later
no_servers: 0.into(), // This is not relevant in the new version
authorization: Some(authorization.clone()),
backend_requests: Mutex::new(backend_rpcs),
error_response: x.error_response.into(),
// debug data is in kafka, not mysql or influx
kafka_debug_logger: None,
method: x.method.clone(),
// This is not relevant in the new version
no_servers: 0.into(),
// Get the mean of all the request bytes
request_bytes: int_request_bytes as usize,
response_bytes: int_response_bytes.into(),
// We did not initially record this data
response_from_backup_rpc: false.into(),
response_timestamp: x.period_datetime.timestamp().into(),
response_millis: int_response_millis.into(),
// We just don't have this data
response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default()
// This is overwritten later on
start_instant: Instant::now(),
stat_sender: Some(stat_sender.clone()),
request_ulid,
};
// (3) Send through a channel to a stat emitter
// Send it to the stats sender
if let Some(stat_sender_ref) = stat_sender.as_ref() {
// info!("Method is: {:?}", x.clone().method);
let mut response_stat = RpcQueryStats::new(
x.clone().method,
authorization.clone(),
Arc::new(request_metadata),
(int_response_bytes)
.try_into()
.context("sum bytes average is not calculated properly")?,
);
// Modify the timestamps ..
response_stat.modify_struct(
int_response_millis,
x.period_datetime.timestamp(),
int_backend_requests,
);
// info!("Sending stats: {:?}", response_stat);
stat_sender_ref
// .send(response_stat.into())
.send_async(response_stat.into())
.await
.context("stat_sender sending response_stat")?;
} else {
panic!("Stat sender was not spawned!");
if let Some(x) = request_metadata.try_send_stat()? {
return Err(anyhow!("failed saving stat! {:?}", x));
}
}
}

@ -195,7 +195,7 @@ async fn run(
// start the frontend port
let frontend_handle = tokio::spawn(frontend::serve(
app_frontend_port,
spawned_app.app.clone(),
spawned_app.app,
frontend_shutdown_receiver,
frontend_shutdown_complete_sender,
));
@ -417,17 +417,14 @@ mod tests {
let prometheus_port = 0;
let shutdown_sender = shutdown_sender.clone();
tokio::spawn(async move {
run(
top_config,
None,
frontend_port,
prometheus_port,
2,
shutdown_sender,
)
.await
})
tokio::spawn(run(
top_config,
None,
frontend_port,
prometheus_port,
2,
shutdown_sender,
))
};
// TODO: do something to the node. query latest block, mine another block, query again
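
The test hunk above replaces `tokio::spawn(async move { run(...).await })` with `tokio::spawn(run(...))`. A tiny sketch of why the two are equivalent, with an illustrative async function:

```rust
// When a function already returns a future, it can be handed to tokio::spawn
// directly; the extra async block (and whatever it captured) is unnecessary.
async fn run(x: u64) -> u64 {
    x + 1
}

#[tokio::main]
async fn main() {
    let wrapped = tokio::spawn(async move { run(1).await });
    let direct = tokio::spawn(run(2));

    assert_eq!(wrapped.await.unwrap(), 2);
    assert_eq!(direct.await.unwrap(), 3);
}
```
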

@ -108,11 +108,9 @@ impl RpcAccountingSubCommand {
.all(db_conn)
.await?;
if u_keys.is_empty() {
return Err(anyhow::anyhow!("no user keys"));
}
anyhow::ensure!(!u_keys.is_empty(), "no user keys");
let u_key_ids: Vec<_> = u_keys.iter().map(|x| x.id).collect();
let u_key_ids: Vec<_> = u_keys.into_iter().map(|x| x.id).collect();
condition = condition.add(rpc_accounting::Column::RpcKeyId.is_in(u_key_ids));
}
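
The hunk above (and the "clean up iter vs into_iter" bullet) swaps `u_keys.iter()` for `u_keys.into_iter()` once the Vec is no longer needed. A small sketch of the distinction with illustrative data:

```rust
fn main() {
    let u_keys = vec![("a".to_string(), 1u64), ("b".to_string(), 2)];

    // iter() borrows u_keys; fine when the Vec is still used afterwards
    let borrowed_ids: Vec<u64> = u_keys.iter().map(|x| x.1).collect();

    // into_iter() consumes u_keys; no borrow to juggle, and owned fields
    // could be moved out instead of cloned
    let owned_ids: Vec<u64> = u_keys.into_iter().map(|x| x.1).collect();

    assert_eq!(borrowed_ids, owned_ids);
}
```
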

@ -217,13 +217,12 @@ async fn check_rpc(
.await
.context(format!("awaiting response from {}", rpc))?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"bad response from {}: {}",
rpc,
response.status(),
));
}
anyhow::ensure!(
response.status().is_success(),
"bad response from {}: {}",
rpc,
response.status(),
);
let body = response
.text()

@ -141,7 +141,7 @@ impl SentrydSubCommand {
None,
);
if let Some(pagerduty_async) = pagerduty_async.as_ref() {
if let Some(ref pagerduty_async) = pagerduty_async {
info!(
"sending to pagerduty: {:#}",
serde_json::to_string_pretty(&alert)?

@ -32,12 +32,11 @@ impl UserImportSubCommand {
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
let import_dir = Path::new(&self.input_dir);
if !import_dir.exists() {
return Err(anyhow::anyhow!(
"import dir ({}) does not exist!",
import_dir.to_string_lossy()
));
}
anyhow::ensure!(
import_dir.exists(),
"import dir ({}) does not exist!",
import_dir.to_string_lossy()
);
let user_glob_path = import_dir.join(format!("{}-users-*.json", self.export_timestamp));
@ -180,10 +179,7 @@ impl UserImportSubCommand {
.await?
{
// make sure it belongs to the mapped user
if existing_rk.user_id != mapped_id {
// TODO: error or import the rest?
return Err(anyhow::anyhow!("unexpected user id"));
}
anyhow::ensure!(existing_rk.user_id == mapped_id, "unexpected user id");
// the key exists under the expected user. we are good to continue
} else {

@ -108,8 +108,8 @@ pub async fn admin_login_get(
let login_domain = app
.config
.login_domain
.clone()
.unwrap_or_else(|| "llamanodes.com".to_string());
.as_deref()
.unwrap_or("llamanodes.com");
// Also there must basically be a token, that says that one admin logins _as a user_.
// I'm not yet fully sure how to handle with that logic specifically ...
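The login_domain change above borrows the configured string and falls back to a &'static str, instead of cloning an owned String just to read it. A minimal sketch of the same pattern (Config is a stand-in):

struct Config {
    login_domain: Option<String>,
}

impl Config {
    // Option<String> -> Option<&str> -> &str, with no allocation on either path
    fn login_domain(&self) -> &str {
        self.login_domain.as_deref().unwrap_or("llamanodes.com")
    }
}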

@ -3,29 +3,38 @@
use super::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use super::rpc_proxy_ws::ProxyMode;
use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::one::Web3Rpc;
use crate::stats::{AppStat, BackendRequests, RpcQueryStats};
use crate::user_token::UserBearerToken;
use axum::headers::authorization::Bearer;
use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
use core::fmt;
use deferred_rate_limiter::DeferredRateLimitResult;
use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::{balance, login, rpc_key, user, user_tier};
use ethers::types::Bytes;
use ethers::types::{Bytes, U64};
use ethers::utils::keccak256;
use futures::TryFutureExt;
use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use log::{error, warn};
use log::{error, trace, warn};
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use parking_lot::Mutex;
use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::mem;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use ulid::Ulid;
use uuid::Uuid;
@ -70,37 +79,448 @@ pub struct Authorization {
pub authorization_type: AuthorizationType,
}
pub struct KafkaDebugLogger {
topic: String,
key: Vec<u8>,
headers: KafkaOwnedHeaders,
producer: FutureProducer,
num_requests: AtomicUsize,
num_responses: AtomicUsize,
}
impl fmt::Debug for KafkaDebugLogger {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("KafkaDebugLogger")
.field("topic", &self.topic)
.finish_non_exhaustive()
}
}
type KafkaLogResult = Result<(i32, i64), (rdkafka::error::KafkaError, OwnedMessage)>;
impl KafkaDebugLogger {
fn try_new(
app: &Web3ProxyApp,
authorization: Arc<Authorization>,
head_block_num: Option<&U64>,
kafka_topic: &str,
request_ulid: Ulid,
) -> Option<Arc<Self>> {
let kafka_producer = app.kafka_producer.clone()?;
let kafka_topic = kafka_topic.to_string();
let rpc_secret_key_id = authorization
.checks
.rpc_secret_key_id
.map(|x| x.get())
.unwrap_or_default();
let kafka_key =
rmp_serde::to_vec(&rpc_secret_key_id).expect("ids should always serialize with rmp");
let chain_id = app.config.chain_id;
let head_block_num = head_block_num
.copied()
.or_else(|| app.balanced_rpcs.head_block_num());
// TODO: would be nice to have the block hash too
// another item is added with the response, so initial_capacity is +1 what is needed here
let kafka_headers = KafkaOwnedHeaders::new_with_capacity(6)
.insert(KafkaHeader {
key: "rpc_secret_key_id",
value: authorization
.checks
.rpc_secret_key_id
.map(|x| x.to_string())
.as_ref(),
})
.insert(KafkaHeader {
key: "ip",
value: Some(&authorization.ip.to_string()),
})
.insert(KafkaHeader {
key: "request_ulid",
value: Some(&request_ulid.to_string()),
})
.insert(KafkaHeader {
key: "head_block_num",
value: head_block_num.map(|x| x.to_string()).as_ref(),
})
.insert(KafkaHeader {
key: "chain_id",
value: Some(&chain_id.to_le_bytes()),
});
// save the key and headers for when we log the response
let x = Self {
topic: kafka_topic,
key: kafka_key,
headers: kafka_headers,
producer: kafka_producer,
num_requests: 0.into(),
num_responses: 0.into(),
};
let x = Arc::new(x);
Some(x)
}
fn background_log(&self, payload: Vec<u8>) -> JoinHandle<KafkaLogResult> {
let topic = self.topic.clone();
let key = self.key.clone();
let producer = self.producer.clone();
let headers = self.headers.clone();
let f = async move {
let record = FutureRecord::to(&topic)
.key(&key)
.payload(&payload)
.headers(headers);
let produce_future =
producer.send(record, KafkaTimeout::After(Duration::from_secs(5 * 60)));
let kafka_response = produce_future.await;
if let Err((err, msg)) = kafka_response.as_ref() {
error!("produce kafka request: {} - {:?}", err, msg);
// TODO: re-queue the msg? log somewhere else like a file on disk?
// TODO: this is bad and should probably trigger an alarm
};
kafka_response
};
tokio::spawn(f)
}
/// for opt-in debug usage, log the request to kafka
/// TODO: generic type for request
pub fn log_debug_request(&self, request: &JsonRpcRequest) -> JoinHandle<KafkaLogResult> {
// TODO: is rust message pack a good choice? try rkyv instead
let payload =
rmp_serde::to_vec(&request).expect("requests should always serialize with rmp");
self.num_requests.fetch_add(1, atomic::Ordering::SeqCst);
self.background_log(payload)
}
pub fn log_debug_response<R>(&self, response: &R) -> JoinHandle<KafkaLogResult>
where
R: serde::Serialize,
{
let payload =
rmp_serde::to_vec(&response).expect("requests should always serialize with rmp");
self.num_responses.fetch_add(1, atomic::Ordering::SeqCst);
self.background_log(payload)
}
}
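background_log above follows a common shape: clone the cheap owned handles, move them into a spawned task, and hand back the JoinHandle so callers may await the delivery result or ignore it. A minimal sketch of that shape, with a fake sink standing in for the Kafka producer (not the rdkafka API):

use tokio::task::JoinHandle;

#[derive(Clone)]
struct Sink; // stand-in for the producer

impl Sink {
    async fn send(&self, _topic: &str, _payload: Vec<u8>) -> Result<(), String> {
        Ok(())
    }
}

struct Logger {
    topic: String,
    sink: Sink,
}

impl Logger {
    fn background_log(&self, payload: Vec<u8>) -> JoinHandle<Result<(), String>> {
        // clone owned copies so the spawned future is 'static
        let topic = self.topic.clone();
        let sink = self.sink.clone();
        tokio::spawn(async move { sink.send(&topic, payload).await })
    }
}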
#[derive(Debug)]
pub struct RequestMetadata {
pub start_instant: tokio::time::Instant,
pub request_bytes: u64,
// TODO: do we need atomics? seems like we should be able to pass a &mut around
// TODO: "archive" isn't really a boolean.
/// TODO: set archive_request during the new instead of after
/// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently
pub archive_request: AtomicBool,
pub authorization: Option<Arc<Authorization>>,
pub request_ulid: Ulid,
/// Size of the JSON request. Does not include headers or things like that.
pub request_bytes: usize,
/// users can opt out of method tracking for their personal dashboards
/// but we still have to store the method at least temporarily for cost calculations
pub method: Option<String>,
/// Instant that the request was received (or at least close to it)
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
pub start_instant: tokio::time::Instant,
/// if this is empty, there was a cache_hit
pub backend_requests: Mutex<Vec<Arc<Web3Rpc>>>,
/// otherwise, it is populated with any rpc servers that were used by this request
pub backend_requests: BackendRequests,
/// The number of times the request got stuck waiting because no servers were synced
pub no_servers: AtomicU64,
/// If handling the request hit an application error
/// This does not count things like a transaction reverting or a malformed request
pub error_response: AtomicBool,
/// Size in bytes of the JSON response. Does not include headers or things like that.
pub response_bytes: AtomicU64,
/// How many milliseconds it took to respond to the request
pub response_millis: AtomicU64,
/// What time the (first) response was proxied.
/// TODO: think about how to store response times for ProxyMode::Versus
pub response_timestamp: AtomicI64,
/// True if the response required querying a backup RPC
/// RPC aggregators that query multiple providers to compare responses may use this header to ignore our response.
pub response_from_backup_rpc: AtomicBool,
/// ProxyMode::Debug logs requests and responses with Kafka
/// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this
pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
/// Channel to send stats to
pub stat_sender: Option<flume::Sender<AppStat>>,
}
impl Default for RequestMetadata {
fn default() -> Self {
Self {
archive_request: Default::default(),
authorization: Default::default(),
backend_requests: Default::default(),
error_response: Default::default(),
kafka_debug_logger: Default::default(),
method: Default::default(),
no_servers: Default::default(),
request_bytes: Default::default(),
request_ulid: Default::default(),
response_bytes: Default::default(),
response_from_backup_rpc: Default::default(),
response_millis: Default::default(),
response_timestamp: Default::default(),
start_instant: Instant::now(),
stat_sender: Default::default(),
}
}
}
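Default is written by hand above because tokio's Instant has no Default impl, so #[derive(Default)] is unavailable once start_instant is in the struct. A minimal sketch of the same workaround:

use tokio::time::Instant;

struct Timed {
    start_instant: Instant,
    response_bytes: u64,
}

impl Default for Timed {
    fn default() -> Self {
        Self {
            // Instant has no sensible "zero", so "now" is the default
            start_instant: Instant::now(),
            response_bytes: 0,
        }
    }
}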
#[derive(From)]
pub enum RequestOrMethod<'a> {
Request(&'a JsonRpcRequest),
/// jsonrpc method (or similar label) and the size that the request should count as (sometimes 0)
Method(&'a str, usize),
RequestSize(usize),
}
impl<'a> RequestOrMethod<'a> {
fn method(&self) -> Option<&str> {
match self {
Self::Request(x) => Some(&x.method),
Self::Method(x, _) => Some(x),
_ => None,
}
}
fn jsonrpc_request(&self) -> Option<&JsonRpcRequest> {
match self {
Self::Request(x) => Some(x),
_ => None,
}
}
fn num_bytes(&self) -> usize {
match self {
RequestOrMethod::Method(_, num_bytes) => *num_bytes,
RequestOrMethod::Request(x) => x.num_bytes(),
RequestOrMethod::RequestSize(num_bytes) => *num_bytes,
}
}
}
impl<'a> From<&'a str> for RequestOrMethod<'a> {
fn from(value: &'a str) -> Self {
if value.is_empty() {
Self::RequestSize(0)
} else {
Self::Method(value, 0)
}
}
}
// TODO: i think a trait is actually the right thing to use here
#[derive(From)]
pub enum ResponseOrBytes<'a> {
Json(&'a serde_json::Value),
Response(&'a JsonRpcForwardedResponse),
Bytes(usize),
}
impl<'a> From<u64> for ResponseOrBytes<'a> {
fn from(value: u64) -> Self {
Self::Bytes(value as usize)
}
}
impl ResponseOrBytes<'_> {
pub fn num_bytes(&self) -> usize {
match self {
Self::Json(x) => serde_json::to_string(x)
.expect("this should always serialize")
.len(),
Self::Response(x) => x.num_bytes(),
Self::Bytes(num_bytes) => *num_bytes,
}
}
}
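num_bytes above measures a JSON value by serializing it to a String and taking its length; a minimal sketch:

// same byte counting as ResponseOrBytes::Json above
fn json_num_bytes(value: &serde_json::Value) -> usize {
    serde_json::to_string(value)
        .expect("a serde_json::Value always serializes")
        .len()
}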
impl RequestMetadata {
pub fn new(request_bytes: usize) -> Self {
// TODO: how can we do this without turning it into a string first. this is going to slow us down!
let request_bytes = request_bytes as u64;
pub async fn new<'a, R: Into<RequestOrMethod<'a>>>(
app: &Web3ProxyApp,
authorization: Arc<Authorization>,
request: R,
head_block_num: Option<&U64>,
) -> Arc<Self> {
let request = request.into();
Self {
start_instant: Instant::now(),
request_bytes,
let method = request.method().map(|x| x.to_string());
let request_bytes = request.num_bytes();
// TODO: modify the request here? I don't really like that very much. but it's a sure way to get archive_request set correctly
// TODO: add the Ulid at the haproxy or amazon load balancer level? investigate OpenTelemetry
let request_ulid = Ulid::new();
let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) {
KafkaDebugLogger::try_new(
app,
authorization.clone(),
head_block_num,
"web3_proxy:rpc",
request_ulid,
)
} else {
None
};
if let Some(ref kafka_debug_logger) = kafka_debug_logger {
if let Some(request) = request.jsonrpc_request() {
// TODO: channels might be more ergonomic than spawned futures
// spawned things run in parallel easier but generally need more Arcs
kafka_debug_logger.log_debug_request(request);
} else {
// there probably isn't a new request attached to this metadata.
// this happens with websocket subscriptions
}
}
let x = Self {
archive_request: false.into(),
backend_requests: Default::default(),
no_servers: 0.into(),
error_response: false.into(),
kafka_debug_logger,
no_servers: 0.into(),
authorization: Some(authorization),
request_bytes,
method,
response_bytes: 0.into(),
response_millis: 0.into(),
response_from_backup_rpc: false.into(),
response_millis: 0.into(),
request_ulid,
response_timestamp: 0.into(),
start_instant: Instant::now(),
stat_sender: app.stat_sender.clone(),
};
Arc::new(x)
}
pub fn backend_rpcs_used(&self) -> Vec<Arc<Web3Rpc>> {
self.backend_requests.lock().clone()
}
pub fn tracking_level(&self) -> TrackingLevel {
if let Some(authorization) = self.authorization.as_ref() {
authorization.checks.tracking_level.clone()
} else {
TrackingLevel::None
}
}
pub fn opt_in_method(&self) -> Option<String> {
match self.tracking_level() {
TrackingLevel::None | TrackingLevel::Aggregated => None,
TrackingLevel::Detailed => self.method.clone(),
}
}
pub fn take_opt_in_method(&mut self) -> Option<String> {
match self.tracking_level() {
TrackingLevel::None | TrackingLevel::Aggregated => None,
TrackingLevel::Detailed => self.method.take(),
}
}
pub fn try_send_stat(mut self) -> Web3ProxyResult<Option<Self>> {
if let Some(stat_sender) = self.stat_sender.take() {
trace!("sending stat! {:?}", self);
let stat: RpcQueryStats = self.try_into()?;
let stat: AppStat = stat.into();
if let Err(err) = stat_sender.send(stat) {
error!("failed sending stats for {:?}: {:?}", err.0, err);
// TODO: return it? that seems like it might cause an infinite loop
};
Ok(None)
} else {
Ok(Some(self))
}
}
pub fn add_response<'a, R: Into<ResponseOrBytes<'a>>>(&'a self, response: R) {
// TODO: fetch? set? should it be None in a Mutex? or a OnceCell?
let response = response.into();
let num_bytes = response.num_bytes() as u64;
self.response_bytes
.fetch_add(num_bytes, atomic::Ordering::AcqRel);
self.response_millis.fetch_add(
self.start_instant.elapsed().as_millis() as u64,
atomic::Ordering::AcqRel,
);
// TODO: record first or last timestamp? really, we need multiple
self.response_timestamp
.store(Utc::now().timestamp(), atomic::Ordering::Release);
if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() {
if let ResponseOrBytes::Response(response) = response {
kafka_debug_logger.log_debug_response(response);
}
}
}
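add_response accumulates into atomics so it can run through &self while other clones of the Arc<RequestMetadata> are still alive. A minimal sketch of that lock-free accumulation:

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct ResponseCounters {
    response_bytes: AtomicU64,
    response_millis: AtomicU64,
}

impl ResponseCounters {
    // &self is enough; no Mutex and no &mut required
    fn record(&self, bytes: u64, millis: u64) {
        self.response_bytes.fetch_add(bytes, Ordering::AcqRel);
        self.response_millis.fetch_add(millis, Ordering::AcqRel);
    }
}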
pub fn try_send_arc_stat(self: Arc<Self>) -> anyhow::Result<Option<Arc<Self>>> {
match Arc::try_unwrap(self) {
Ok(x) => {
let not_sent = x.try_send_stat()?.map(Arc::new);
Ok(not_sent)
}
Err(not_sent) => {
trace!(
"could not send stat while {} arcs are active",
Arc::strong_count(&not_sent)
);
Ok(Some(not_sent))
}
}
}
// TODO: helper function to duplicate? needs to clear request_bytes, and all the atomics tho...
}
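try_send_arc_stat only consumes the metadata when it holds the last Arc; otherwise it hands the Arc back untouched. The core of that gate, as a minimal sketch:

use std::sync::Arc;

fn consume_if_unique<T>(x: Arc<T>) -> Option<T> {
    match Arc::try_unwrap(x) {
        Ok(inner) => Some(inner), // this was the last reference; safe to consume
        Err(_shared) => None,     // other clones are still alive; do nothing yet
    }
}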
// TODO: is this where the panic comes from?
impl Drop for RequestMetadata {
fn drop(&mut self) {
if self.stat_sender.is_some() {
// turn `&mut self` into `self`
let x = mem::take(self);
// warn!("request metadata dropped without stat send! {:?}", self);
let _ = x.try_send_stat();
}
}
}
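The Drop impl needs to call a consuming method from &mut self, so it mem::takes a Default-initialized replacement and consumes the taken value; the nested drop then sees the emptied state and stops. A minimal sketch of that pattern (Pending is a stand-in type):

use std::mem;

#[derive(Default)]
struct Pending {
    payload: Option<String>,
}

impl Pending {
    // consumes self, like try_send_stat above
    fn flush(mut self) {
        if let Some(p) = self.payload.take() {
            println!("flushing on drop: {p}");
        }
    }
}

impl Drop for Pending {
    fn drop(&mut self) {
        if self.payload.is_some() {
            // swap a Default into place so we get an owned value to consume;
            // the taken value ends up with payload == None, so its own drop is a no-op
            mem::take(self).flush();
        }
    }
}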
@ -445,11 +865,11 @@ pub async fn key_is_authorized(
impl Web3ProxyApp {
/// Limit the number of concurrent requests from the given ip address.
pub async fn ip_semaphore(&self, ip: IpAddr) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
pub async fn ip_semaphore(&self, ip: &IpAddr) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
.ip_semaphores
.get_with(ip, async move {
.get_with_by_ref(ip, async move {
// TODO: set max_concurrent_requests dynamically based on load?
let s = Semaphore::new(max_concurrent_requests);
Arc::new(s)
@ -516,7 +936,7 @@ impl Web3ProxyApp {
// limit concurrent requests
let semaphore = self
.bearer_token_semaphores
.get_with(user_bearer_token.clone(), async move {
.get_with_by_ref(&user_bearer_token, async move {
let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize);
Arc::new(s)
})
@ -623,7 +1043,7 @@ impl Web3ProxyApp {
{
Ok(DeferredRateLimitResult::Allowed) => {
// rate limit allowed us. check concurrent request limits
let semaphore = self.ip_semaphore(ip).await?;
let semaphore = self.ip_semaphore(&ip).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
@ -643,14 +1063,14 @@ impl Web3ProxyApp {
error!("rate limiter is unhappy. allowing ip. err={:?}", err);
// at least we can still check the semaphore
let semaphore = self.ip_semaphore(ip).await?;
let semaphore = self.ip_semaphore(&ip).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
}
} else {
// no redis, but we can still check the ip semaphore
let semaphore = self.ip_semaphore(ip).await?;
let semaphore = self.ip_semaphore(&ip).await?;
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
Ok(RateLimitResult::Allowed(authorization, semaphore))
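get_with_by_ref above lets the cache be queried with a borrowed key, only cloning it if a new entry has to be inserted. A minimal sketch of the per-IP semaphore cache, assuming moka's future::Cache and tokio's Semaphore:

use std::net::IpAddr;
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn ip_semaphore(
    semaphores: &moka::future::Cache<IpAddr, Arc<Semaphore>>,
    ip: &IpAddr,
    max_concurrent_requests: usize,
) -> Arc<Semaphore> {
    semaphores
        // the init future only runs (and the key is only cloned) on a cache miss
        .get_with_by_ref(ip, async move { Arc::new(Semaphore::new(max_concurrent_requests)) })
        .await
}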

@ -25,6 +25,12 @@ pub type Web3ProxyResult<T> = Result<T, Web3ProxyError>;
// TODO: take "IntoResponse" instead of Response?
pub type Web3ProxyResponse = Web3ProxyResult<Response>;
impl From<Web3ProxyError> for Web3ProxyResult<()> {
fn from(value: Web3ProxyError) -> Self {
Err(value)
}
}
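The new From impl above lets call sites write `err.into()` where a Result is expected. A minimal sketch with stand-in types:

#[derive(Debug)]
enum MyError {
    BadRequest(String),
}

type MyResult<T> = Result<T, MyError>;

impl From<MyError> for MyResult<()> {
    fn from(value: MyError) -> Self {
        Err(value)
    }
}

fn reject(reason: &str) -> MyResult<()> {
    // reads as "turn this error into a failed result"
    MyError::BadRequest(reason.to_string()).into()
}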
// TODO:
#[derive(Debug, Display, Error, From)]
pub enum Web3ProxyError {
@ -35,6 +41,9 @@ pub enum Web3ProxyError {
#[error(ignore)]
#[from(ignore)]
BadRequest(String),
#[error(ignore)]
#[from(ignore)]
BadResponse(String),
BadRouting,
Database(DbErr),
#[display(fmt = "{:#?}, {:#?}", _0, _1)]
@ -168,6 +177,18 @@ impl Web3ProxyError {
),
)
}
Self::BadResponse(err) => {
// TODO: think about this one more. ankr gives us this because ethers fails to parse responses without an id
debug!("BAD_RESPONSE: {}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
&format!("bad response: {}", err),
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::BadRouting => {
error!("BadRouting");
(

@ -17,6 +17,7 @@ use axum::{
Extension, Router,
};
use http::header::AUTHORIZATION;
use listenfd::ListenFd;
use log::info;
use moka::future::Cache;
use std::net::SocketAddr;
@ -45,6 +46,7 @@ pub async fn serve(
) -> anyhow::Result<()> {
// setup caches for whatever the frontend needs
// no need for max items since it is limited by the enum key
// TODO: latest moka allows for different ttls for different entries
let json_response_cache: FrontendJsonResponseCache = Cache::builder()
.time_to_live(Duration::from_secs(2))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
@ -62,102 +64,77 @@ pub async fn serve(
//
// HTTP RPC (POST)
//
// Websocket RPC (GET)
// If not an RPC, GET will redirect to urls in the config
//
// public
.route("/", post(rpc_proxy_http::proxy_web3_rpc))
.route(
"/",
post(rpc_proxy_http::proxy_web3_rpc).get(rpc_proxy_ws::websocket_handler),
)
// authenticated with and without trailing slash
.route(
"/rpc/:rpc_key/",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
post(rpc_proxy_http::proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::websocket_handler_with_key),
)
.route(
"/rpc/:rpc_key",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
post(rpc_proxy_http::proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::websocket_handler_with_key),
)
// authenticated debug route with and without trailing slash
.route(
"/debug/:rpc_key/",
post(rpc_proxy_http::debug_proxy_web3_rpc_with_key),
post(rpc_proxy_http::debug_proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::debug_websocket_handler_with_key),
)
.route(
"/debug/:rpc_key",
post(rpc_proxy_http::debug_proxy_web3_rpc_with_key),
post(rpc_proxy_http::debug_proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::debug_websocket_handler_with_key),
)
// public fastest with and without trailing slash
.route("/fastest/", post(rpc_proxy_http::fastest_proxy_web3_rpc))
.route("/fastest", post(rpc_proxy_http::fastest_proxy_web3_rpc))
.route(
"/fastest/",
post(rpc_proxy_http::fastest_proxy_web3_rpc)
.get(rpc_proxy_ws::fastest_websocket_handler),
)
.route(
"/fastest",
post(rpc_proxy_http::fastest_proxy_web3_rpc)
.get(rpc_proxy_ws::fastest_websocket_handler),
)
// authenticated fastest with and without trailing slash
.route(
"/fastest/:rpc_key/",
post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key),
post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::fastest_websocket_handler_with_key),
)
.route(
"/fastest/:rpc_key",
post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key),
)
// public versus
.route("/versus/", post(rpc_proxy_http::versus_proxy_web3_rpc))
.route("/versus", post(rpc_proxy_http::versus_proxy_web3_rpc))
// authenticated versus with and without trailing slash
.route(
"/versus/:rpc_key/",
post(rpc_proxy_http::versus_proxy_web3_rpc_with_key),
)
.route(
"/versus/:rpc_key",
post(rpc_proxy_http::versus_proxy_web3_rpc_with_key),
)
//
// Websocket RPC (GET)
// If not an RPC, this will redirect to configurable urls
//
// public
.route("/", get(rpc_proxy_ws::websocket_handler))
// authenticated with and without trailing slash
.route(
"/rpc/:rpc_key/",
get(rpc_proxy_ws::websocket_handler_with_key),
)
.route(
"/rpc/:rpc_key",
get(rpc_proxy_ws::websocket_handler_with_key),
)
// debug with and without trailing slash
.route(
"/debug/:rpc_key/",
get(rpc_proxy_ws::websocket_handler_with_key),
)
.route(
"/debug/:rpc_key",
get(rpc_proxy_ws::websocket_handler_with_key),
) // public fastest with and without trailing slash
.route("/fastest/", get(rpc_proxy_ws::fastest_websocket_handler))
.route("/fastest", get(rpc_proxy_ws::fastest_websocket_handler))
// authenticated fastest with and without trailing slash
.route(
"/fastest/:rpc_key/",
get(rpc_proxy_ws::fastest_websocket_handler_with_key),
)
.route(
"/fastest/:rpc_key",
get(rpc_proxy_ws::fastest_websocket_handler_with_key),
post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::fastest_websocket_handler_with_key),
)
// public versus
.route(
"/versus/",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
post(rpc_proxy_http::versus_proxy_web3_rpc).get(rpc_proxy_ws::versus_websocket_handler),
)
.route(
"/versus",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
post(rpc_proxy_http::versus_proxy_web3_rpc).get(rpc_proxy_ws::versus_websocket_handler),
)
// authenticated versus with and without trailing slash
.route(
"/versus/:rpc_key/",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
post(rpc_proxy_http::versus_proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::versus_websocket_handler_with_key),
)
.route(
"/versus/:rpc_key",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
post(rpc_proxy_http::versus_proxy_web3_rpc_with_key)
.get(rpc_proxy_ws::versus_websocket_handler_with_key),
)
//
// System things
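The routing rework above registers the POST (HTTP RPC) and GET (websocket) handlers in a single .route() entry per path instead of listing the GET routes separately further down. A minimal sketch, assuming axum's MethodRouter chaining:

use axum::{routing::post, Router};

async fn proxy_http() -> &'static str {
    "POST: json-rpc over http"
}

async fn proxy_ws() -> &'static str {
    "GET: websocket upgrade (or redirect)"
}

fn router() -> Router {
    Router::new()
        // one entry serves both methods for a path
        .route("/", post(proxy_http).get(proxy_ws))
        .route("/rpc/:rpc_key", post(proxy_http).get(proxy_ws))
}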
@ -241,19 +218,29 @@ pub async fn serve(
// handle cors
.layer(CorsLayer::very_permissive())
// application state
.layer(Extension(proxy_app.clone()))
.layer(Extension(proxy_app))
// frontend caches
.layer(Extension(json_response_cache))
.layer(Extension(health_cache))
// 404 for any unknown routes
.fallback(errors::handler_404);
// run our app with hyper
// TODO: allow only listening on localhost? top_config.app.host.parse()?
let addr = SocketAddr::from(([0, 0, 0, 0], port));
info!("listening on port {}", port);
let server_builder = if let Some(listener) = ListenFd::from_env().take_tcp_listener(0)? {
// use systemd socket magic for no downtime deploys
let addr = listener.local_addr()?;
// TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
info!("listening with fd at {}", addr);
axum::Server::from_tcp(listener)?
} else {
info!("listening on port {}", port);
// TODO: allow only listening on localhost? top_config.app.host.parse()?
let addr = SocketAddr::from(([0, 0, 0, 0], port));
axum::Server::try_bind(&addr)?
};
// into_make_service is enough if we always run behind a proxy
/*
It sequentially looks for an IP in:
- x-forwarded-for header (de-facto standard)
@ -261,12 +248,21 @@ pub async fn serve(
- forwarded header (new standard)
- axum::extract::ConnectInfo (if not behind proxy)
*/
let service = app.into_make_service_with_connect_info::<SocketAddr>();
#[cfg(feature = "connectinfo")]
let make_service = {
info!("connectinfo feature enabled");
app.into_make_service_with_connect_info::<SocketAddr>()
};
// `axum::Server` is a re-export of `hyper::Server`
let server = axum::Server::bind(&addr)
#[cfg(not(feature = "connectinfo"))]
let make_service = {
info!("connectinfo feature disabled");
app.into_make_service()
};
let server = server_builder
.serve(make_service)
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy
.serve(service)
.with_graceful_shutdown(async move {
let _ = shutdown_receiver.recv().await;
})

@ -63,12 +63,12 @@ async fn _proxy_web3_rpc(
let authorization = Arc::new(authorization);
let (response, rpcs, _semaphore) = app
let (status_code, response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.await
.map(|(x, y)| (x, y, semaphore))?;
.map(|(s, x, y)| (s, x, y, semaphore))?;
let mut response = Json(&response).into_response();
let mut response = (status_code, Json(response)).into_response();
let headers = response.headers_mut();
@ -129,6 +129,8 @@ pub async fn proxy_web3_rpc_with_key(
.await
}
// TODO: if a /debug/ request gets rejected by an invalid request, there won't be any kafka log
// TODO:
#[debug_handler]
pub async fn debug_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
@ -228,12 +230,12 @@ async fn _proxy_web3_rpc_with_key(
let rpc_secret_key_id = authorization.checks.rpc_secret_key_id;
let (response, rpcs, _semaphore) = app
let (status_code, response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.await
.map(|(x, y)| (x, y, semaphore))?;
.map(|(s, x, y)| (s, x, y, semaphore))?;
let mut response = Json(&response).into_response();
let mut response = (status_code, Json(response)).into_response();
let headers = response.headers_mut();

@ -5,12 +5,12 @@
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata};
use super::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::jsonrpc::JsonRpcId;
use crate::stats::RpcQueryStats;
use crate::{
app::Web3ProxyApp,
frontend::errors::Web3ProxyResult,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
};
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
@ -20,6 +20,8 @@ use axum::{
};
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use ethers::types::Bytes;
use fstrings::{f, format_args_f};
use futures::SinkExt;
use futures::{
future::AbortHandle,
@ -28,12 +30,13 @@ use futures::{
use handlebars::Handlebars;
use hashbrown::HashMap;
use http::StatusCode;
use log::{info, trace, warn};
use log::{info, trace};
use serde_json::json;
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock};
/// How to select backend servers for a request
#[derive(Copy, Clone, Debug)]
pub enum ProxyMode {
/// send to the "best" synced server
@ -43,6 +46,7 @@ pub enum ProxyMode {
/// send to all servers for benchmarking. return the fastest non-error response
Versus,
/// send all requests and responses to kafka
/// TODO: should this be separate from best/fastest/versus?
Debug,
}
@ -314,14 +318,15 @@ async fn proxy_web3_socket(
}
/// websockets support a few more methods than http clients
/// TODO: i think this subscriptions hashmap grows unbounded
async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
authorization: &Arc<Authorization>,
payload: &str,
response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize,
subscriptions: Arc<RwLock<HashMap<String, AbortHandle>>>,
) -> (Message, Option<OwnedSemaphorePermit>) {
subscriptions: Arc<RwLock<HashMap<Bytes, AbortHandle>>>,
) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
let (authorization, semaphore) = match authorization.check_again(&app).await {
Ok((a, s)) => (a, s),
Err(err) => {
@ -329,7 +334,7 @@ async fn handle_socket_payload(
let err = serde_json::to_string(&err).expect("to_string should always work here");
return (Message::Text(err), None);
return Ok((Message::Text(err), None));
}
};
@ -338,84 +343,90 @@ async fn handle_socket_payload(
Ok(json_request) => {
let id = json_request.id.clone();
let response: Web3ProxyResult<JsonRpcForwardedResponseEnum> = match &json_request.method
[..]
{
"eth_subscribe" => {
// TODO: how can we subscribe with proxy_mode?
match app
.eth_subscribe(
authorization.clone(),
json_request,
subscription_count,
response_sender.clone(),
)
// TODO: move this to a separate function so we can use the try operator
let response: Web3ProxyResult<JsonRpcForwardedResponseEnum> =
match &json_request.method[..] {
"eth_subscribe" => {
// TODO: how can we subscribe with proxy_mode?
match app
.eth_subscribe(
authorization.clone(),
json_request,
subscription_count,
response_sender.clone(),
)
.await
{
Ok((handle, response)) => {
{
let mut x = subscriptions.write().await;
let result: &serde_json::value::RawValue = response
.result
.as_ref()
.context("there should be a result here")?;
// TODO: there must be a better way to do this
let k: Bytes = serde_json::from_str(result.get())
.context("subscription ids must be bytes")?;
x.insert(k, handle);
};
Ok(response.into())
}
Err(err) => Err(err),
}
}
"eth_unsubscribe" => {
let request_metadata =
RequestMetadata::new(&app, authorization.clone(), &json_request, None)
.await;
#[derive(serde::Deserialize)]
struct EthUnsubscribeParams([Bytes; 1]);
if let Some(params) = json_request.params {
match serde_json::from_value(params) {
Ok::<EthUnsubscribeParams, _>(params) => {
let subscription_id = &params.0[0];
// TODO: is this the right response?
let partial_response = {
let mut x = subscriptions.write().await;
match x.remove(subscription_id) {
None => false,
Some(handle) => {
handle.abort();
true
}
}
};
let response = JsonRpcForwardedResponse::from_value(
json!(partial_response),
id.clone(),
);
request_metadata.add_response(&response);
Ok(response.into())
}
Err(err) => Err(Web3ProxyError::BadRequest(f!(
"incorrect params given for eth_unsubscribe. {err:?}"
))),
}
} else {
Err(Web3ProxyError::BadRequest(
"no params given for eth_unsubscribe".to_string(),
))
}
}
_ => app
.proxy_web3_rpc(authorization.clone(), json_request.into())
.await
{
Ok((handle, response)) => {
// TODO: better key
let mut x = subscriptions.write().await;
x.insert(
response
.result
.as_ref()
// TODO: what if there is an error?
.expect("response should always have a result, not an error")
.to_string(),
handle,
);
Ok(response.into())
}
Err(err) => Err(err),
}
}
"eth_unsubscribe" => {
// TODO: move this logic into the app?
let request_bytes = json_request.num_bytes();
let request_metadata = Arc::new(RequestMetadata::new(request_bytes));
let subscription_id = json_request.params.unwrap().to_string();
let mut x = subscriptions.write().await;
// TODO: is this the right response?
let partial_response = match x.remove(&subscription_id) {
None => false,
Some(handle) => {
handle.abort();
true
}
};
drop(x);
let response =
JsonRpcForwardedResponse::from_value(json!(partial_response), id.clone());
if let Some(stat_sender) = app.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some(json_request.method.clone()),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!("stat_sender failed during eth_unsubscribe: {:?}", err);
}
}
Ok(response.into())
}
_ => app
.proxy_web3_rpc(authorization.clone(), json_request.into())
.await
.map(|(response, _)| response),
};
.map(|(status_code, response, _)| response),
};
(id, response)
}
@ -434,7 +445,7 @@ async fn handle_socket_payload(
}
};
(Message::Text(response_str), semaphore)
Ok((Message::Text(response_str), semaphore))
}
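The eth_unsubscribe arm above now deserializes params into a one-element tuple struct rather than stringifying the raw value. A minimal sketch with String standing in for ethers' Bytes:

#[derive(serde::Deserialize)]
struct UnsubscribeParams([String; 1]); // stand-in for EthUnsubscribeParams

fn subscription_id(params: serde_json::Value) -> anyhow::Result<String> {
    let UnsubscribeParams([id]) = serde_json::from_value(params)?;
    Ok(id)
}

// e.g. subscription_id(serde_json::json!(["0xdeadbeef"])) => Ok("0xdeadbeef".to_string())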
async fn read_web3_socket(
@ -443,7 +454,7 @@ async fn read_web3_socket(
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
) {
// TODO: need a concurrent hashmap
// RwLock should be fine here. a user isn't going to be opening tons of subscriptions
let subscriptions = Arc::new(RwLock::new(HashMap::new()));
let subscription_count = Arc::new(AtomicUsize::new(1));
@ -467,16 +478,17 @@ async fn read_web3_socket(
// new message from our client. forward to a backend and then send it through response_tx
let response_msg = match msg {
Message::Text(payload) => {
Message::Text(ref payload) => {
// TODO: do not unwrap!
let (msg, s) = handle_socket_payload(
app.clone(),
&authorization,
&payload,
payload,
&response_sender,
&subscription_count,
subscriptions,
)
.await;
.await.unwrap();
_semaphore = s;
@ -499,6 +511,7 @@ async fn read_web3_socket(
Message::Binary(mut payload) => {
let payload = from_utf8_mut(&mut payload).unwrap();
// TODO: do not unwrap!
let (msg, s) = handle_socket_payload(
app.clone(),
&authorization,
@ -507,7 +520,7 @@ async fn read_web3_socket(
&subscription_count,
subscriptions,
)
.await;
.await.unwrap();
_semaphore = s;

@ -30,9 +30,13 @@ pub async fn health(
/// Easy alerting if backup servers are in use.
pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
let code = {
let consensus_rpcs = app.balanced_rpcs.watch_consensus_rpcs_sender.borrow();
let consensus_rpcs = app
.balanced_rpcs
.watch_consensus_rpcs_sender
.borrow()
.clone();
if let Some(consensus_rpcs) = consensus_rpcs.as_ref() {
if let Some(ref consensus_rpcs) = consensus_rpcs {
if consensus_rpcs.backups_needed {
StatusCode::INTERNAL_SERVER_ERROR
} else {

@ -284,7 +284,7 @@ pub async fn user_login_post(
let rpc_secret_key = RpcSecretKey::new();
let user_rpc_key = rpc_key::ActiveModel {
user_id: sea_orm::Set(caller.id.clone()),
user_id: sea_orm::Set(caller.id),
secret_key: sea_orm::Set(rpc_secret_key.into()),
description: sea_orm::Set(None),
..Default::default()
@ -297,7 +297,7 @@ pub async fn user_login_post(
// We should also create the balance entry ...
let user_balance = balance::ActiveModel {
user_id: sea_orm::Set(caller.id.clone()),
user_id: sea_orm::Set(caller.id),
available_balance: sea_orm::Set(Decimal::new(0, 0)),
used_balance: sea_orm::Set(Decimal::new(0, 0)),
..Default::default()

@ -321,7 +321,7 @@ pub async fn user_balance_post(
}
// Get the topics out
let topic: H256 = H256::from(log.topics.get(0).unwrap().to_owned());
let topic: H256 = log.topics.get(0).unwrap().to_owned();
if topic != deposit_topic {
debug!(
"Out: Topic is not relevant: {:?} {:?}",
@ -489,8 +489,9 @@ pub async fn user_balance_post(
})),
)
.into_response();
// Return early if the log was added, assume there is at most one valid log per transaction
return Ok(response.into());
return Ok(response);
}
Err(Web3ProxyError::BadRequest(

@ -19,7 +19,6 @@ use migration::sea_orm::ActiveModelTrait;
use migration::sea_orm::ColumnTrait;
use migration::sea_orm::EntityTrait;
use migration::sea_orm::QueryFilter;
use migration::sea_orm::TransactionTrait;
use serde_json::json;
use std::sync::Arc;
@ -49,7 +48,7 @@ pub async fn user_referral_link_get(
warn!("User tier is: {:?}", user_tier);
// TODO: This shouldn't be hardcoded. Also, it should be an enum, not something like this ...
if user_tier.id != 6 {
return Err(Web3ProxyError::PaymentRequired.into());
return Err(Web3ProxyError::PaymentRequired);
}
// Then get the referral token

@ -49,7 +49,7 @@ pub async fn get_keys_as_subuser(
.all(db_replica.conn())
.await?
.into_iter()
.map(|x| (x.rpc_secret_key_id.clone(), x))
.map(|x| (x.rpc_secret_key_id, x))
.collect::<HashMap<u64, secondary_user::Model>>();
// Now return a list of all subusers (their wallets)
@ -147,7 +147,7 @@ pub async fn get_subusers(
.all(db_replica.conn())
.await?
.into_iter()
.map(|x| (x.user_id.clone(), x))
.map(|x| (x.user_id, x))
.collect::<HashMap<u64, secondary_user::Model>>();
// Now return a list of all subusers (their wallets)
@ -314,7 +314,7 @@ pub async fn modify_subuser(
let rpc_secret_key = RpcSecretKey::new();
let subuser_rpc_key = rpc_key::ActiveModel {
user_id: sea_orm::Set(subuser.id.clone()),
user_id: sea_orm::Set(subuser.id),
secret_key: sea_orm::Set(rpc_secret_key.into()),
description: sea_orm::Set(None),
..Default::default()
@ -327,7 +327,7 @@ pub async fn modify_subuser(
// We should also create the balance entry ...
let subuser_balance = balance::ActiveModel {
user_id: sea_orm::Set(subuser.id.clone()),
user_id: sea_orm::Set(subuser.id),
available_balance: sea_orm::Set(Decimal::new(0, 0)),
used_balance: sea_orm::Set(Decimal::new(0, 0)),
..Default::default()
@ -374,7 +374,8 @@ pub async fn modify_subuser(
let txn = db_conn.begin().await?;
let mut action = "no action";
let _ = match subuser_entry_secondary_user {
match subuser_entry_secondary_user {
Some(secondary_user) => {
// In this case, remove the subuser
let mut active_subuser_entry_secondary_user = secondary_user.into_active_model();
@ -421,6 +422,7 @@ pub async fn modify_subuser(
})),
)
.into_response();
// Return early if the log was added, assume there is at most one valid log per transaction
Ok(response.into())
Ok(response)
}

@ -290,9 +290,12 @@ impl JsonRpcForwardedResponse {
data = err.data.clone();
} else if let Some(err) = err.as_serde_error() {
// this is not an rpc error. keep it as an error
return Err(Web3ProxyError::BadRequest(format!("bad request: {}", err)));
return Err(Web3ProxyError::BadResponse(format!(
"bad response: {}",
err
)));
} else {
return Err(anyhow::anyhow!("unexpected ethers error!").into());
return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into());
}
}
e => return Err(e.into()),

@ -169,7 +169,6 @@ impl Web3Rpcs {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash();
// skip Block::default()
if block_hash.is_zero() {
debug!("Skipping block without hash!");
return Ok(block);
@ -189,7 +188,7 @@ impl Web3Rpcs {
// TODO: use their get_with
let block = self
.blocks_by_hash
.get_with(*block_hash, async move { block.clone() })
.get_with(*block_hash, async move { block })
.await;
Ok(block)
@ -236,7 +235,7 @@ impl Web3Rpcs {
None => {
// TODO: helper for method+params => JsonRpcRequest
// TODO: does this id matter?
let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: request_metadata? maybe we should put it in the authorization?
@ -244,7 +243,7 @@ impl Web3Rpcs {
let response = self
.try_send_best_consensus_head_connection(
authorization,
request,
&request,
None,
None,
None,
@ -344,7 +343,7 @@ impl Web3Rpcs {
let request: JsonRpcRequest = serde_json::from_value(request)?;
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, Some(num), None)
.try_send_best_consensus_head_connection(authorization, &request, None, Some(num), None)
.await?;
if response.error.is_some() {
@ -446,7 +445,7 @@ impl Web3Rpcs {
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
let num_active_rpcs = consensus_finder.len();
let total_rpcs = self.by_name.read().len();
let total_rpcs = self.by_name.load().len();
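`self.by_name.load()` above implies by_name is now behind an atomic pointer swap (e.g. the arc-swap crate) rather than an RwLock: readers grab a cheap snapshot, writers publish a whole new map. A minimal sketch, assuming arc-swap:

use arc_swap::ArcSwap;
use hashbrown::HashMap;
use std::sync::Arc;

fn main() {
    let by_name: ArcSwap<HashMap<String, u64>> = ArcSwap::from_pointee(HashMap::new());

    // reader: load() returns a guard that derefs to the current Arc'd map
    assert_eq!(by_name.load().len(), 0);

    // writer: build a fresh map and swap it in atomically
    let mut next = HashMap::new();
    next.insert("rpc-a".to_string(), 1u64);
    by_name.store(Arc::new(next));

    assert_eq!(by_name.load().len(), 1);
}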
let old_consensus_head_connections = self
.watch_consensus_rpcs_sender

@ -3,6 +3,7 @@ use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
@ -10,28 +11,146 @@ use log::{trace, warn};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
#[derive(Clone, Serialize)]
struct RpcData {
head_block_num: U64,
// TODO: this is too simple. erigon has 4 prune levels (hrct)
oldest_block_num: U64,
}
impl RpcData {
fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self {
let head_block_num = *head.number();
let block_data_limit = rpc.block_data_limit();
let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
Self {
head_block_num,
oldest_block_num,
}
}
// TODO: take an enum for the type of data (hrtc)
fn data_available(&self, block_num: &U64) -> bool {
*block_num >= self.oldest_block_num && *block_num <= self.head_block_num
}
}
#[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
pub struct RpcRanking {
tier: u64,
backup: bool,
head_num: Option<U64>,
}
impl RpcRanking {
pub fn add_offset(&self, offset: u64) -> Self {
Self {
tier: self.tier + offset,
backup: self.backup,
head_num: self.head_num,
}
}
pub fn default_with_backup(backup: bool) -> Self {
Self {
backup,
..Default::default()
}
}
fn sort_key(&self) -> (u64, bool, Reverse<Option<U64>>) {
// TODO: add soft_limit here? add peak_ewma here?
(self.tier, !self.backup, Reverse(self.head_num))
}
}
impl Ord for RpcRanking {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.sort_key().cmp(&other.sort_key())
}
}
impl PartialOrd for RpcRanking {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
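sort_key above relies on std ordering of tuples plus Reverse to get "highest head block first" while still sorting ascending on tier. How the Reverse(head_num) component behaves, as a minimal sketch:

use std::cmp::Reverse;

fn main() {
    // ascending sort on Reverse(_) puts the highest known head first and None last
    let mut heads = vec![Some(100_u64), None, Some(105), Some(99)];
    heads.sort_by_key(|h| Reverse(*h));
    assert_eq!(heads, vec![Some(105), Some(100), Some(99), None]);
}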
pub type RankedRpcMap = BTreeMap<RpcRanking, Vec<Arc<Web3Rpc>>>;
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(crate) tier: u64,
pub(crate) backups_needed: bool,
pub(crate) head_block: Web3ProxyBlock,
pub(crate) best_rpcs: Vec<Arc<Web3Rpc>>,
// TODO: functions like "compare_backup_vote()"
// pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(crate) backups_needed: bool,
pub(crate) other_rpcs: RankedRpcMap,
rpc_data: HashMap<Arc<Web3Rpc>, RpcData>,
}
impl ConsensusWeb3Rpcs {
#[inline(always)]
#[inline]
pub fn num_conns(&self) -> usize {
self.best_rpcs.len()
}
pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool {
self.rpc_data
.get(rpc)
.map(|x| x.data_available(block_num))
.unwrap_or(false)
}
pub fn filter(
&self,
skip: &[Arc<Web3Rpc>],
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
rpc: &Arc<Web3Rpc>,
) -> bool {
if skip.contains(rpc) {
trace!("skipping {}", rpc);
return false;
}
if let Some(min_block_needed) = min_block_needed {
if !self.has_block_data(rpc, min_block_needed) {
trace!(
"{} is missing min_block_needed ({}). skipping",
rpc,
min_block_needed,
);
return false;
}
}
if let Some(max_block_needed) = max_block_needed {
if !self.has_block_data(rpc, max_block_needed) {
trace!(
"{} is missing max_block_needed ({}). skipping",
rpc,
max_block_needed,
);
return false;
}
}
// we could check hard rate limits here, but i think it is faster to do later
true
}
// TODO: sum_hard_limit?
}
@ -46,6 +165,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
}
}
// TODO: refs for all of these. borrow on a Sender is cheap enough
impl Web3Rpcs {
// TODO: return a ref?
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
@ -93,7 +213,6 @@ pub struct ConsensusFinder {
/// `tiers[0] = only tier 0`
/// `tiers[1] = tier 0 and tier 1`
/// `tiers[n] = tier 0..=n`
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
rpc_heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
/// never serve blocks that are too old
max_block_age: Option<u64>,
@ -137,7 +256,7 @@ impl ConsensusFinder {
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let first_seen = self
.first_seen
.get_with(*block.hash(), async move { Instant::now() })
.get_with_by_ref(block.hash(), async move { Instant::now() })
.await;
// TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
@ -166,13 +285,6 @@ impl ConsensusFinder {
.await
.web3_context("failed caching block")?;
// if let Some(max_block_lag) = max_block_lag {
// if rpc_head_block.number() < ??? {
// trace!("rpc_head_block from {} is too far behind! {}", rpc, rpc_head_block);
// return Ok(self.remove(&rpc).is_some());
// }
// }
if let Some(max_age) = self.max_block_age {
if rpc_head_block.age() > max_age {
trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
@ -324,7 +436,7 @@ impl ConsensusFinder {
) -> Option<ConsensusWeb3Rpcs> {
// sort the primary votes ascending by tier and descending by block num
let mut votes: Vec<_> = votes
.iter()
.into_iter()
.map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names))
.collect();
votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| {
@ -366,11 +478,39 @@ impl ConsensusFinder {
let backups_needed = consensus_rpcs.iter().any(|x| x.backup);
let mut other_rpcs = BTreeMap::new();
for (x, x_head) in self
.rpc_heads
.iter()
.filter(|(k, _)| !consensus_rpcs.contains(k))
{
let x_head_num = *x_head.number();
let key: RpcRanking = RpcRanking::new(x.tier, x.backup, Some(x_head_num));
other_rpcs
.entry(key)
.or_insert_with(Vec::new)
.push(x.clone());
}
// TODO: how should we populate this?
let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len());
for (x, x_head) in self.rpc_heads.iter() {
let y = RpcData::new(x, x_head);
rpc_data.insert(x.clone(), y);
}
let consensus = ConsensusWeb3Rpcs {
tier,
head_block: maybe_head_block.clone(),
best_rpcs: consensus_rpcs,
other_rpcs,
backups_needed,
rpc_data,
};
return Some(consensus);

File diff suppressed because it is too large

@ -46,15 +46,15 @@ pub struct Web3Rpc {
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
/// this provider is only used for new heads subscriptions
/// TODO: watch channel instead of a lock
/// TODO: is this only used for new heads subscriptions? if so, rename
/// TODO: benchmark ArcSwapOption and a watch::Sender
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
/// keep track of hard limits. Optional because we skip this code for our own servers.
/// keep track of hard limits
/// this is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
/// We do not use the deferred rate limiter because going over limits would cause errors
pub(super) hard_limit: Option<RedisRateLimiter>,
/// used for load balancing to the least loaded server
/// used for ensuring enough requests are available before advancing the head block
pub(super) soft_limit: u32,
/// use web3 queries to find the block data limit for archive/pruned nodes
pub(super) automatic_block_limit: bool,
@ -65,7 +65,8 @@ pub struct Web3Rpc {
/// Lower tiers are higher priority when sending requests
pub(super) tier: u64,
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
/// this is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// Track head block latency
pub(super) head_latency: RwLock<EwmaLatency>,
/// Track peak request latency
@ -96,8 +97,6 @@ impl Web3Rpc {
// TODO: rename to http_new_head_interval_sender?
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
redis_pool: Option<RedisPool>,
// TODO: think more about soft limit. watching ewma of requests is probably better. but what should the random sort be on? maybe group on tier is enough
// soft_limit: u32,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
@ -139,15 +138,9 @@ impl Web3Rpc {
let automatic_block_limit =
(block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
// track hard limit until on backup servers (which might surprise us with rate limit changes)
// have a sender for tracking hard limits anywhere we might need one,
// and track them on servers that have a configured hard limit
let hard_limit_until = if backup || hard_limit.is_some() {
let (sender, _) = watch::channel(Instant::now());
Some(sender)
} else {
None
};
let (hard_limit_until, _) = watch::channel(Instant::now());
if config.ws_url.is_none() && config.http_url.is_none() {
if let Some(url) = config.url {
@ -168,6 +161,8 @@ impl Web3Rpc {
let (disconnect_sender, disconnect_receiver) = watch::channel(false);
let reconnect = reconnect.into();
let (head_block, _) = watch::channel(None);
// Spawn the task for calculating average peak latency
// TODO Should these defaults be in config
let peak_latency = PeakEwmaLatency::spawn(
@ -200,7 +195,7 @@ impl Web3Rpc {
ws_url,
http_url,
hard_limit,
hard_limit_until,
hard_limit_until: Some(hard_limit_until),
soft_limit: config.soft_limit,
automatic_block_limit,
backup,
@ -209,7 +204,7 @@ impl Web3Rpc {
tier: config.tier,
disconnect_watch: Some(disconnect_sender),
created_at: Some(created_at),
head_block: RwLock::new(Default::default()),
head_block: Some(head_block),
peak_latency: Some(peak_latency),
..Default::default()
};
@ -352,8 +347,9 @@ impl Web3Rpc {
self.block_data_limit.load(atomic::Ordering::Acquire).into()
}
/// TODO: get rid of this now that consensus rpcs does it
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
let head_block_num = match self.head_block.read().as_ref() {
let head_block_num = match self.head_block.as_ref().unwrap().borrow().as_ref() {
None => return false,
Some(x) => *x.number(),
};
@ -483,8 +479,10 @@ impl Web3Rpc {
}
// reset sync status
let mut head_block = self.head_block.write();
*head_block = None;
self.head_block
.as_ref()
.expect("head_block should always be set")
.send_replace(None);
// disconnect the current provider
// TODO: what until the block_sender's receiver finishes updating this item?
@ -587,7 +585,7 @@ impl Web3Rpc {
Ok(())
}
async fn send_head_block_result(
pub(crate) async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
block_sender: &flume::Sender<BlockAndRpc>,
@ -596,9 +594,9 @@ impl Web3Rpc {
let new_head_block = match new_head_block {
Ok(None) => {
{
let mut head_block = self.head_block.write();
let head_block_tx = self.head_block.as_ref().unwrap();
if head_block.is_none() {
if head_block_tx.borrow().is_none() {
// we previously sent a None. return early
return Ok(());
}
@ -607,7 +605,7 @@ impl Web3Rpc {
debug!("clearing head block on {} ({}ms old)!", self, age);
*head_block = None;
head_block_tx.send_replace(None);
}
None
@ -625,11 +623,10 @@ impl Web3Rpc {
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query
{
let mut head_block = self.head_block.write();
let _ = head_block.insert(new_head_block.clone());
}
self.head_block
.as_ref()
.unwrap()
.send_replace(Some(new_head_block.clone()));
if self.block_data_limit() == U64::zero() {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
@ -646,11 +643,7 @@ impl Web3Rpc {
Err(err) => {
warn!("unable to get block from {}. err={:?}", self, err);
{
let mut head_block = self.head_block.write();
*head_block = None;
}
self.head_block.as_ref().unwrap().send_replace(None);
None
}
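head_block moves from an RwLock to a tokio watch channel in this file: writers call send_replace, readers call borrow, and (unlike the lock) subscribers can now be notified on change. A minimal sketch of the read/write/clear operations used above:

use tokio::sync::watch;

fn main() {
    let (head_block_tx, head_block_rx) = watch::channel::<Option<u64>>(None);

    // was `*head_block.write() = Some(n)`
    head_block_tx.send_replace(Some(17_000_000));

    // was `head_block.read().clone()`
    assert_eq!(*head_block_rx.borrow(), Some(17_000_000));

    // was `*head_block.write() = None`
    head_block_tx.send_replace(None);
    assert!(head_block_rx.borrow().is_none());
}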
@ -750,7 +743,7 @@ impl Web3Rpc {
if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier
let head_block = rpc.head_block.read().clone();
let head_block = rpc.head_block.as_ref().unwrap().borrow().clone();
if let Some((block_number, txid)) = head_block.and_then(|x| {
let block = x.block;
@ -947,16 +940,25 @@ impl Web3Rpc {
.await?;
}
Ok(Some(block)) => {
// don't send repeat blocks
let new_hash =
block.hash.expect("blocks here should always have hashes");
if let Some(new_hash) = block.hash {
// don't send repeat blocks
if new_hash != last_hash {
// new hash!
last_hash = new_hash;
if new_hash != last_hash {
// new hash!
last_hash = new_hash;
self.send_head_block_result(
Ok(Some(block)),
&block_sender,
block_map.clone(),
)
.await?;
}
} else {
// TODO: why is this happening?
warn!("empty head block on {}", self);
self.send_head_block_result(
Ok(Some(block)),
Ok(None),
&block_sender,
block_map.clone(),
)
@ -1387,7 +1389,12 @@ impl Serialize for Web3Rpc {
state.serialize_field("soft_limit", &self.soft_limit)?;
// TODO: maybe this is too much data. serialize less?
state.serialize_field("head_block", &*self.head_block.read())?;
{
let head_block = self.head_block.as_ref().unwrap();
let head_block = head_block.borrow();
let head_block = head_block.as_ref();
state.serialize_field("head_block", &head_block)?;
}
state.serialize_field("head_latency", &self.head_latency.read().value())?;
@ -1445,6 +1452,8 @@ mod tests {
let head_block = Web3ProxyBlock::try_new(random_block).unwrap();
let block_data_limit = u64::MAX;
let (tx, _) = watch::channel(Some(head_block.clone()));
let x = Web3Rpc {
name: "name".to_string(),
ws_url: Some("ws://example.com".parse::<Url>().unwrap()),
@ -1453,7 +1462,7 @@ mod tests {
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
head_block: Some(tx),
..Default::default()
};
@ -1479,6 +1488,8 @@ mod tests {
let block_data_limit = 64;
let (tx, _rx) = watch::channel(Some(head_block.clone()));
let x = Web3Rpc {
name: "name".to_string(),
soft_limit: 1_000,
@ -1486,7 +1497,7 @@ mod tests {
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
head_block: Some(tx),
..Default::default()
};

@ -273,6 +273,7 @@ pub async fn query_user_stats<'a>(
.expect("max-age should always parse"),
);
// TODO: get this from `response` instead of json serializing twice
let cache_body = json!(response_body).to_string();
if let Err(err) = redis_conn

@ -125,7 +125,7 @@ pub async fn query_user_stats<'a>(
user_rpc_keys.append(&mut subuser_rpc_keys);
if user_rpc_keys.len() == 0 {
if user_rpc_keys.is_empty() {
return Err(Web3ProxyError::BadRequest(
"User has no secret RPC keys yet".to_string(),
));

@ -2,36 +2,36 @@
//! TODO: move some of these structs/functions into their own file?
pub mod db_queries;
pub mod influxdb_queries;
use crate::app::AuthorizationChecks;
mod stat_buffer;
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
use crate::app::RpcSecretKeyCache;
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::Web3ProxyError;
use crate::rpcs::one::Web3Rpc;
use anyhow::Context;
use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user, user_tier};
use futures::stream;
use hashbrown::HashMap;
use influxdb2::api::write::TimestampPrecision;
use influxdb2::models::DataPoint;
use log::{error, info, trace, warn};
use log::{trace, warn};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::ActiveModelTrait;
use migration::sea_orm::ColumnTrait;
use migration::sea_orm::IntoActiveModel;
use migration::sea_orm::{self, DatabaseConnection, EntityTrait, QueryFilter};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter,
};
use migration::{Expr, OnConflict};
use moka::future::Cache;
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::cmp::max;
use std::num::NonZeroU64;
use std::sync::atomic::Ordering;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::interval;
use ulid::Ulid;
use self::stat_buffer::BufferedRpcQueryStats;
#[derive(Debug, PartialEq, Eq)]
pub enum StatType {
@ -39,8 +39,9 @@ pub enum StatType {
Detailed,
}
// Pub is needed for migration ... I could also write a second constructor for this if needed
/// TODO: better name?
pub type BackendRequests = Mutex<Vec<Arc<Web3Rpc>>>;
/// TODO: better name? RpcQueryStatBuilder?
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
pub authorization: Arc<Authorization>,
@ -49,8 +50,8 @@ pub struct RpcQueryStats {
pub error_response: bool,
pub request_bytes: u64,
/// if backend_requests is 0, there was a cache_hit
// pub frontend_request: u64,
pub backend_requests: u64,
/// no need to track frontend_request on this. a RpcQueryStats always represents one frontend request
pub backend_rpcs_used: Vec<Arc<Web3Rpc>>,
pub response_bytes: u64,
pub response_millis: u64,
pub response_timestamp: i64,
@ -58,7 +59,7 @@ pub struct RpcQueryStats {
pub credits_used: Decimal,
}
#[derive(Clone, From, Hash, PartialEq, Eq)]
#[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
pub struct RpcQueryKey {
/// unix epoch time
/// for the time series db, this is (close to) the time that the response was sent
@ -181,22 +182,6 @@ impl RpcQueryStats {
}
}
#[derive(Default)]
pub struct BufferedRpcQueryStats {
pub frontend_requests: u64,
pub backend_requests: u64,
pub backend_retries: u64,
pub no_servers: u64,
pub cache_misses: u64,
pub cache_hits: u64,
pub sum_request_bytes: u64,
pub sum_response_bytes: u64,
pub sum_response_millis: u64,
pub sum_credits_used: Decimal,
/// Balance tells us the user's balance at this point in time
pub latest_balance: Decimal,
}
/// A stat that we aggregate and then store in a database.
/// For now there is just one, but I think there might be others later
#[derive(Debug, From)]
@ -204,34 +189,16 @@ pub enum AppStat {
RpcQuery(RpcQueryStats),
}
#[derive(From)]
pub struct SpawnedStatBuffer {
pub stat_sender: flume::Sender<AppStat>,
/// these handles are important and must be allowed to finish
pub background_handle: JoinHandle<anyhow::Result<()>>,
}
pub struct StatBuffer {
chain_id: u64,
db_conn: Option<DatabaseConnection>,
influxdb_client: Option<influxdb2::Client>,
tsdb_save_interval_seconds: u32,
rpc_secret_key_cache:
Option<Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>>,
db_save_interval_seconds: u32,
billing_period_seconds: i64,
global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
accounting_db_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
timestamp_precision: TimestampPrecision,
}
// TODO: move to stat_buffer.rs?
impl BufferedRpcQueryStats {
fn add(&mut self, stat: RpcQueryStats) {
// a stat always comes from just 1 frontend request
self.frontend_requests += 1;
if stat.backend_requests == 0 {
// TODO: is this always okay? is it true that each backend rpc will only be queried once per request? i think so
let num_backend_rpcs_used = stat.backend_rpcs_used.len() as u64;
if num_backend_rpcs_used == 0 {
// no backend request. cache hit!
self.cache_hits += 1;
} else {
@ -239,7 +206,7 @@ impl BufferedRpcQueryStats {
self.cache_misses += 1;
// a single frontend request might have multiple backend requests
self.backend_requests += stat.backend_requests;
self.backend_requests += num_backend_rpcs_used;
}
self.sum_request_bytes += stat.request_bytes;
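With `backend_rpcs_used` replacing the old `backend_requests` counter, an empty rpc list means a cache hit and a list of N rpcs adds N backend requests to the bucket. A minimal sketch of the expected aggregation (not part of this diff; `cache_hit_stat` and `two_rpc_stat` are assumed helper values, not code from the repo):

    let mut buffered = BufferedRpcQueryStats::default();

    // backend_rpcs_used is empty, so this counts as a cache hit
    buffered.add(cache_hit_stat);
    assert_eq!(buffered.frontend_requests, 1);
    assert_eq!(buffered.cache_hits, 1);

    // backend_rpcs_used.len() == 2, so this is a cache miss that adds 2 backend requests
    buffered.add(two_rpc_stat);
    assert_eq!(buffered.frontend_requests, 2);
    assert_eq!(buffered.cache_misses, 1);
    assert_eq!(buffered.backend_requests, 2);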
@ -261,13 +228,21 @@ impl BufferedRpcQueryStats {
chain_id: u64,
db_conn: &DatabaseConnection,
key: RpcQueryKey,
rpc_secret_key_cache: Option<&RpcSecretKeyCache>,
) -> anyhow::Result<()> {
anyhow::ensure!(
key.response_timestamp > 0,
"no response_timestamp! This is a bug! {:?} {:?}",
key,
self
);
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
// this is a lot of variables
let accounting_entry = rpc_accounting_v2::ActiveModel {
id: sea_orm::NotSet,
rpc_key_id: sea_orm::Set(key.rpc_secret_key_id.map(Into::into).unwrap_or_default()),
rpc_key_id: sea_orm::Set(key.rpc_secret_key_id.map(Into::into)),
chain_id: sea_orm::Set(chain_id),
period_datetime: sea_orm::Set(period_datetime),
archive_needed: sea_orm::Set(key.archive_needed),
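The rest of save_db (not shown in this hunk) upserts these counters with an on-conflict update so that concurrent buffers accumulate in the database rather than overwrite each other; that is what the `Expr`/`OnConflict` imports and the race-condition TODOs further down refer to. A sketch of that pattern, with the column names and exact builder chain assumed rather than taken from this diff:

    rpc_accounting_v2::Entity::insert(accounting_entry)
        .on_conflict(
            OnConflict::new()
                .values([
                    (
                        // let the database do the addition instead of read-modify-write in Rust
                        rpc_accounting_v2::Column::FrontendRequests,
                        Expr::col(rpc_accounting_v2::Column::FrontendRequests)
                            .add(self.frontend_requests),
                    ),
                    (
                        rpc_accounting_v2::Column::SumCreditsUsed,
                        Expr::col(rpc_accounting_v2::Column::SumCreditsUsed)
                            .add(self.sum_credits_used),
                    ),
                ])
                .to_owned(),
        )
        .exec(db_conn)
        .await?;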
@ -360,7 +335,7 @@ impl BufferedRpcQueryStats {
// Technically there should always be a user ... still, let's return "Ok(())" for now
let sender_user_id: u64 = match sender_rpc_key {
Some(x) => x.user_id.into(),
Some(x) => x.user_id,
// Return early if the User is not found, because then it is an anonymous user
// Let's also issue a warning because obviously the RPC key should correspond to a user
None => {
@ -390,12 +365,14 @@ impl BufferedRpcQueryStats {
// Still subtract from the user in any case,
// Modify the balance of the sender completely (in mysql, next to the stats)
// In any case, add this to "spent"
// TODO! we need to do the math in mysql (like with `Expr::col` above). if we do the addition here, there is a race condition
active_sender_balance.used_balance =
sea_orm::Set(sender_balance.used_balance + Decimal::from(self.sum_credits_used));
sea_orm::Set(sender_balance.used_balance + self.sum_credits_used);
// Also update the available balance
// TODO! this needs to be queried from the database
let new_available_balance = max(
sender_balance.available_balance - Decimal::from(self.sum_credits_used),
sender_balance.available_balance - self.sum_credits_used,
Decimal::from(0),
);
active_sender_balance.available_balance = sea_orm::Set(new_available_balance);
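As the TODOs above note, computing the new balance in Rust from a previously fetched row is racy under concurrent requests. A sketch (not part of this diff) of letting MySQL do the arithmetic atomically, using the sea-orm/sea-query builders already imported; clamping available_balance at zero (e.g. with GREATEST) is omitted:

    balance::Entity::update_many()
        .col_expr(
            balance::Column::UsedBalance,
            Expr::col(balance::Column::UsedBalance).add(self.sum_credits_used),
        )
        .col_expr(
            balance::Column::AvailableBalance,
            Expr::col(balance::Column::AvailableBalance).sub(self.sum_credits_used),
        )
        .filter(balance::Column::UserId.eq(sender_user_id))
        .exec(db_conn)
        .await?;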
@ -424,15 +401,26 @@ impl BufferedRpcQueryStats {
))?;
// Downgrade a user to premium - out of funds if there's less than $10 in the account, and if the user was premium before
// TODO: let's let them get under $1
// TODO: instead of checking for a specific title, downgrade if the downgrade id is set to anything
if new_available_balance < Decimal::from(10u64) && downgrade_user_role.title == "Premium" {
// TODO: we could do this outside the balance low block, but I think it's fine. or better, update the cache if <$10 and downgrade if <$1
if let Some(rpc_secret_key_cache) = rpc_secret_key_cache {
todo!("expire (or probably better to update) the user cache now that the balance is low");
// actually i think we need to have 2 caches. otherwise users with 2 keys are going to have separate caches
// 1. rpc_secret_key_id -> AuthorizationChecks (cuz we don't want to hit the db every time)
// 2. user_id -> Balance
}
// Only downgrade the user in local process memory, not elsewhere
// app.rpc_secret_key_cache-
// let mut active_downgrade_user = downgrade_user.into_active_model();
// active_downgrade_user.user_tier_id = sea_orm::Set(downgrade_user_role.id);
// active_downgrade_user.save(db_conn).await?;
}
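The two-cache idea in the comment above could look roughly like this (a sketch, not part of this diff; `UserBalanceCache` and `on_balance_changed` are hypothetical names). Keying the balance by user id means one entry to update when credits are spent, instead of expiring every AuthorizationChecks entry for users that own several rpc keys:

    use migration::sea_orm::prelude::Decimal;
    use moka::future::Cache;

    // hypothetical second cache, keyed by user id instead of by rpc secret key
    pub type UserBalanceCache = Cache<u64, Decimal>;

    async fn on_balance_changed(balance_cache: &UserBalanceCache, user_id: u64, new_balance: Decimal) {
        // one entry per user, shared by all of that user's rpc keys
        balance_cache.insert(user_id, new_balance).await;
    }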
// TODO:
// Get the referee, and the referrer
// (2) Look up the code that this user used. This is the referee table
let referee_object = match referee::Entity::find()
@ -459,6 +447,7 @@ impl BufferedRpcQueryStats {
{
Some(x) => x,
None => {
// TODO: warn seems too verbose for this. it should be fine for a user to not have a referral code, right?
warn!(
"No referrer with that referral code was found {:?}",
referee_object
@ -487,6 +476,7 @@ impl BufferedRpcQueryStats {
}
};
// TODO: don't clone on this. use the active_model later
let mut active_sender_balance = sender_balance.clone().into_active_model();
let referrer_balance = match balance::Entity::find()
.filter(balance::Column::UserId.eq(user_with_that_referral_code.user_id))
@ -513,6 +503,7 @@ impl BufferedRpcQueryStats {
{
// (6) If the credits have not yet been applied to the referee, apply 10M credits / $100.00 USD worth of credits.
// Make it into an active model, and add credits
// TODO! race condition here! we can't set. need to let the db do the math
active_sender_balance.available_balance =
sea_orm::Set(sender_balance.available_balance + Decimal::from(100));
// Also mark referral as "credits_applied_for_referee"
@ -528,8 +519,7 @@ impl BufferedRpcQueryStats {
let mut active_referrer_balance = referrer_balance.clone().into_active_model();
// Add 10% referral fees ...
active_referrer_balance.available_balance = sea_orm::Set(
referrer_balance.available_balance
+ Decimal::from(self.sum_credits_used / Decimal::from(10)),
referrer_balance.available_balance + self.sum_credits_used / Decimal::from(10),
);
// Also record how much the current referrer has "provided" / "gifted" away
active_referee.credits_applied_for_referrer =
@ -598,51 +588,80 @@ impl BufferedRpcQueryStats {
}
}
impl RpcQueryStats {
pub fn new(
method: Option<String>,
authorization: Arc<Authorization>,
metadata: Arc<RequestMetadata>,
response_bytes: usize,
) -> Self {
// TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
// TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
impl TryFrom<RequestMetadata> for RpcQueryStats {
type Error = Web3ProxyError;
fn try_from(mut metadata: RequestMetadata) -> Result<Self, Self::Error> {
let mut authorization = metadata.authorization.take();
if authorization.is_none() {
authorization = Some(Arc::new(Authorization::internal(None)?));
}
let authorization = authorization.expect("Authorization will always be set");
let archive_request = metadata.archive_request.load(Ordering::Acquire);
let backend_requests = metadata.backend_requests.lock().len() as u64;
let request_bytes = metadata.request_bytes;
let error_response = metadata.error_response.load(Ordering::Acquire);
let response_millis = metadata.start_instant.elapsed().as_millis() as u64;
let response_bytes = response_bytes as u64;
// TODO: Gotta make the arithmetic here
// TODO: do this without cloning. we can take their vec
let backend_rpcs_used = metadata.backend_rpcs_used();
let request_bytes = metadata.request_bytes as u64;
let response_bytes = metadata.response_bytes.load(Ordering::Acquire);
let mut error_response = metadata.error_response.load(Ordering::Acquire);
let mut response_millis = metadata.response_millis.load(atomic::Ordering::Acquire);
let response_timestamp = match metadata.response_timestamp.load(atomic::Ordering::Acquire) {
0 => {
// no response timestamp!
if !error_response {
// force error_response to true
// this can happen when a try operator escapes and metadata.add_response() isn't called
trace!(
"no response known, but no errors logged. investigate. {:?}",
metadata
);
error_response = true;
}
if response_millis == 0 {
// get something for millis even if it is a bit late
response_millis = metadata.start_instant.elapsed().as_millis() as u64
}
// no timestamp given. likely handling an error. set it to the current time
Utc::now().timestamp()
}
x => x,
};
let method = metadata.method.take();
// TODO: Depending on the method, metadata and response bytes, pick a different number of credits used
// This can be a slightly more complex function as well
// TODO: Here, let's implement the formula
let credits_used = Self::compute_cost(
request_bytes,
response_bytes,
backend_requests == 0,
&method,
backend_rpcs_used.is_empty(),
method.as_deref(),
);
let response_timestamp = Utc::now().timestamp();
Self {
let x = Self {
authorization,
archive_request,
method,
backend_requests,
backend_rpcs_used,
request_bytes,
error_response,
response_bytes,
response_millis,
response_timestamp,
credits_used,
}
}
};
Ok(x)
}
}
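With the constructor replaced by this TryFrom impl, the call site becomes a conversion plus a send. A usage sketch (not part of this diff), assuming an owned RequestMetadata (per the try_unwrap TODO above), a `stat_sender` in scope, and a surrounding function whose error type converts from Web3ProxyError:

    // convert at the end of the request; a missing response timestamp is treated
    // as an error response inside try_from, so a stat is still produced
    let stat = RpcQueryStats::try_from(metadata)?;

    // one RpcQueryStats always represents exactly one frontend request
    stat_sender
        .send(AppStat::RpcQuery(stat))
        .expect("the stat loop is still running");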
impl RpcQueryStats {
/// Compute cost per request
/// All methods cost the same
/// The number of bytes are based on input, and output bytes
@ -650,251 +669,39 @@ impl RpcQueryStats {
request_bytes: u64,
response_bytes: u64,
cache_hit: bool,
_method: &Option<String>,
method: Option<&str>,
) -> Decimal {
// TODO: Should make these lazy_static const?
// some methods should be free. there might be cases where method isn't set (though they should be uncommon)
// TODO: get this list from config (and add more to it)
if let Some(method) = method.as_ref() {
if ["eth_chainId"].contains(method) {
return 0.into();
}
}
// TODO: get cost_minimum, cost_free_bytes, cost_per_byte, cache_hit_divisor from config. each chain will be different
// pays at least $0.000018 in credits per request
let cost_minimum = Decimal::new(18, 6);
// 1kb is included on each call
let cost_free_bytes = 1024;
// after that, we add a cost per byte: $0.000000006 in credits per byte
// amazon charges $.09/GB outbound
// but we also have to cover our RAM and expensive nics on the servers (haproxy/web3-proxy/blockchains)
let cost_per_byte = Decimal::new(6, 9);
let total_bytes = request_bytes + response_bytes;
let total_chargable_bytes =
Decimal::from(max(0, total_bytes as i64 - cost_free_bytes as i64));
let out = cost_minimum + cost_per_byte * total_chargable_bytes;
let total_chargable_bytes = Decimal::from(total_bytes.saturating_sub(cost_free_bytes));
let mut cost = cost_minimum + cost_per_byte * total_chargable_bytes;
// cache hits get a 50% discount
if cache_hit {
out * Decimal::new(5, 1)
} else {
out
cost /= Decimal::from(2)
}
}
/// Only used for migration from stats_v1 to stats_v2/v3
pub fn modify_struct(
&mut self,
response_millis: u64,
response_timestamp: i64,
backend_requests: u64,
) {
self.response_millis = response_millis;
self.response_timestamp = response_timestamp;
self.backend_requests = backend_requests;
}
}
impl StatBuffer {
#[allow(clippy::too_many_arguments)]
pub fn try_spawn(
chain_id: u64,
bucket: String,
db_conn: Option<DatabaseConnection>,
influxdb_client: Option<influxdb2::Client>,
rpc_secret_key_cache: Option<
Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>,
>,
db_save_interval_seconds: u32,
tsdb_save_interval_seconds: u32,
billing_period_seconds: i64,
shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if db_conn.is_none() && influxdb_client.is_none() {
return Ok(None);
}
let (stat_sender, stat_receiver) = flume::unbounded();
let timestamp_precision = TimestampPrecision::Seconds;
let mut new = Self {
chain_id,
db_conn,
influxdb_client,
db_save_interval_seconds,
tsdb_save_interval_seconds,
rpc_secret_key_cache,
billing_period_seconds,
global_timeseries_buffer: Default::default(),
opt_in_timeseries_buffer: Default::default(),
accounting_db_buffer: Default::default(),
timestamp_precision,
};
// any errors inside this task will cause the application to exit
let handle = tokio::spawn(async move {
new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver)
.await
});
Ok(Some((stat_sender, handle).into()))
}
async fn aggregate_and_save_loop(
&mut self,
bucket: String,
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
let mut db_save_interval =
interval(Duration::from_secs(self.db_save_interval_seconds as u64));
// TODO: Somewhere here we should probably be updating the balance of the user
// And also update the credits used etc. for the referred user
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {
// info!("Received stat");
// save the stat to a buffer
match stat {
Ok(AppStat::RpcQuery(stat)) => {
if self.influxdb_client.is_some() {
// TODO: round the timestamp at all?
let global_timeseries_key = stat.global_timeseries_key();
self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone());
if let Some(opt_in_timeseries_key) = stat.opt_in_timeseries_key() {
self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone());
}
}
if self.db_conn.is_some() {
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
}
}
Err(err) => {
error!("error receiving stat: {:?}", err);
break;
}
}
}
_ = db_save_interval.tick() => {
// info!("DB save internal tick");
let count = self.save_relational_stats().await;
if count > 0 {
trace!("Saved {} stats to the relational db", count);
}
}
_ = tsdb_save_interval.tick() => {
// info!("TSDB save internal tick");
let count = self.save_tsdb_stats(&bucket).await;
if count > 0 {
trace!("Saved {} stats to the tsdb", count);
}
}
x = shutdown_receiver.recv() => {
info!("shutdown signal ---");
match x {
Ok(_) => {
info!("stat_loop shutting down");
},
Err(err) => error!("stat_loop shutdown receiver err={:?}", err),
}
break;
}
}
}
let saved_relational = self.save_relational_stats().await;
info!("saved {} pending relational stats", saved_relational);
let saved_tsdb = self.save_tsdb_stats(&bucket).await;
info!("saved {} pending tsdb stats", saved_tsdb);
info!("accounting and stat save loop complete");
Ok(())
}
async fn save_relational_stats(&mut self) -> usize {
let mut count = 0;
if let Some(db_conn) = self.db_conn.as_ref() {
count = self.accounting_db_buffer.len();
for (key, stat) in self.accounting_db_buffer.drain() {
// TODO: batch saves
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
if let Err(err) = stat.save_db(self.chain_id, db_conn, key).await {
error!("unable to save accounting entry! err={:?}", err);
};
}
}
count
}
// TODO: bucket should be an enum so that we don't risk typos
async fn save_tsdb_stats(&mut self, bucket: &str) -> usize {
let mut count = 0;
if let Some(influxdb_client) = self.influxdb_client.as_ref() {
// TODO: use stream::iter properly to avoid allocating this Vec
let mut points = vec![];
for (key, stat) in self.global_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
match stat
.build_timeseries_point("global_proxy", self.chain_id, key)
.await
{
Ok(point) => {
points.push(point);
}
Err(err) => {
error!("unable to build global stat! err={:?}", err);
}
};
}
for (key, stat) in self.opt_in_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
match stat
.build_timeseries_point("opt_in_proxy", self.chain_id, key)
.await
{
Ok(point) => {
points.push(point);
}
Err(err) => {
// TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
error!("unable to build opt-in stat! err={:?}", err);
}
};
}
count = points.len();
if count > 0 {
// TODO: put max_batch_size in config?
// TODO: i think the real limit is the byte size of the http request. so, a simple line count won't work very well
let max_batch_size = 100;
let mut num_left = count;
while num_left > 0 {
let batch_size = num_left.min(max_batch_size);
let p = points.split_off(batch_size);
num_left -= batch_size;
if let Err(err) = influxdb_client
.write_with_precision(bucket, stream::iter(p), self.timestamp_precision)
.await
{
// TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
error!("unable to save {} tsdb stats! err={:?}", batch_size, err);
}
}
}
}
count
cost
}
}
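To make the cost formula concrete, a worked example as a test in the same module (not part of this diff): 1,600 request bytes plus 2,000 response bytes is 3,600 bytes, the first 1,024 are free, so 2,576 bytes are chargeable; 0.000018 + 2,576 * 0.000000006 = 0.000033456 credits, halved to 0.000016728 on a cache hit.

    #[cfg(test)]
    mod compute_cost_tests {
        use super::*;

        #[test]
        fn worked_example() {
            // 3_600 total bytes, 1_024 free, 2_576 chargeable
            let cost = RpcQueryStats::compute_cost(1_600, 2_000, false, None);
            assert_eq!(cost, Decimal::new(33_456, 9)); // 0.000033456 credits

            // cache hits get a 50% discount
            let discounted = RpcQueryStats::compute_cost(1_600, 2_000, true, None);
            assert_eq!(discounted, Decimal::new(16_728, 9)); // 0.000016728 credits
        }
    }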

@ -0,0 +1,269 @@
use super::{AppStat, RpcQueryKey};
use crate::app::RpcSecretKeyCache;
use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
use influxdb2::api::write::TimestampPrecision;
use log::{error, info, trace};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::interval;
#[derive(Debug, Default)]
pub struct BufferedRpcQueryStats {
pub frontend_requests: u64,
pub backend_requests: u64,
pub backend_retries: u64,
pub no_servers: u64,
pub cache_misses: u64,
pub cache_hits: u64,
pub sum_request_bytes: u64,
pub sum_response_bytes: u64,
pub sum_response_millis: u64,
pub sum_credits_used: Decimal,
/// Balance tells us the user's balance at this point in time
pub latest_balance: Decimal,
}
#[derive(From)]
pub struct SpawnedStatBuffer {
pub stat_sender: flume::Sender<AppStat>,
/// these handles are important and must be allowed to finish
pub background_handle: JoinHandle<anyhow::Result<()>>,
}
pub struct StatBuffer {
accounting_db_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
billing_period_seconds: i64,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
db_save_interval_seconds: u32,
global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
influxdb_client: Option<influxdb2::Client>,
opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
rpc_secret_key_cache: Option<RpcSecretKeyCache>,
timestamp_precision: TimestampPrecision,
tsdb_save_interval_seconds: u32,
}
impl StatBuffer {
#[allow(clippy::too_many_arguments)]
pub fn try_spawn(
billing_period_seconds: i64,
bucket: String,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
db_save_interval_seconds: u32,
influxdb_client: Option<influxdb2::Client>,
rpc_secret_key_cache: Option<RpcSecretKeyCache>,
shutdown_receiver: broadcast::Receiver<()>,
tsdb_save_interval_seconds: u32,
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if db_conn.is_none() && influxdb_client.is_none() {
return Ok(None);
}
let (stat_sender, stat_receiver) = flume::unbounded();
let timestamp_precision = TimestampPrecision::Seconds;
let mut new = Self {
accounting_db_buffer: Default::default(),
billing_period_seconds,
chain_id,
db_conn,
db_save_interval_seconds,
global_timeseries_buffer: Default::default(),
influxdb_client,
opt_in_timeseries_buffer: Default::default(),
rpc_secret_key_cache,
timestamp_precision,
tsdb_save_interval_seconds,
};
// any errors inside this task will cause the application to exit
let handle = tokio::spawn(async move {
new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver)
.await
});
Ok(Some((stat_sender, handle).into()))
}
async fn aggregate_and_save_loop(
&mut self,
bucket: String,
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
let mut db_save_interval =
interval(Duration::from_secs(self.db_save_interval_seconds as u64));
// TODO: Somewhere here we should probably be updating the balance of the user
// And also update the credits used etc. for the referred user
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {
// info!("Received stat");
// save the stat to a buffer
match stat {
Ok(AppStat::RpcQuery(stat)) => {
if self.influxdb_client.is_some() {
// TODO: round the timestamp at all?
let global_timeseries_key = stat.global_timeseries_key();
self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone());
if let Some(opt_in_timeseries_key) = stat.opt_in_timeseries_key() {
self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone());
}
}
if self.db_conn.is_some() {
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
}
}
Err(err) => {
error!("error receiving stat: {:?}", err);
break;
}
}
}
_ = db_save_interval.tick() => {
// info!("DB save internal tick");
let count = self.save_relational_stats().await;
if count > 0 {
trace!("Saved {} stats to the relational db", count);
}
}
_ = tsdb_save_interval.tick() => {
// info!("TSDB save internal tick");
let count = self.save_tsdb_stats(&bucket).await;
if count > 0 {
trace!("Saved {} stats to the tsdb", count);
}
}
x = shutdown_receiver.recv() => {
match x {
Ok(_) => {
info!("stat_loop shutting down");
},
Err(err) => error!("stat_loop shutdown receiver err={:?}", err),
}
break;
}
}
}
let saved_relational = self.save_relational_stats().await;
info!("saved {} pending relational stat(s)", saved_relational);
let saved_tsdb = self.save_tsdb_stats(&bucket).await;
info!("saved {} pending tsdb stat(s)", saved_tsdb);
info!("accounting and stat save loop complete");
Ok(())
}
async fn save_relational_stats(&mut self) -> usize {
let mut count = 0;
if let Some(db_conn) = self.db_conn.as_ref() {
count = self.accounting_db_buffer.len();
for (key, stat) in self.accounting_db_buffer.drain() {
// TODO: batch saves
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
if let Err(err) = stat
.save_db(
self.chain_id,
db_conn,
key,
self.rpc_secret_key_cache.as_ref(),
)
.await
{
error!("unable to save accounting entry! err={:?}", err);
};
}
}
count
}
// TODO: bucket should be an enum so that we don't risk typos
async fn save_tsdb_stats(&mut self, bucket: &str) -> usize {
let mut count = 0;
if let Some(influxdb_client) = self.influxdb_client.as_ref() {
// TODO: use stream::iter properly to avoid allocating this Vec
let mut points = vec![];
for (key, stat) in self.global_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
match stat
.build_timeseries_point("global_proxy", self.chain_id, key)
.await
{
Ok(point) => {
points.push(point);
}
Err(err) => {
error!("unable to build global stat! err={:?}", err);
}
};
}
for (key, stat) in self.opt_in_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
match stat
.build_timeseries_point("opt_in_proxy", self.chain_id, key)
.await
{
Ok(point) => {
points.push(point);
}
Err(err) => {
// TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
error!("unable to build opt-in stat! err={:?}", err);
}
};
}
count = points.len();
if count > 0 {
// TODO: put max_batch_size in config?
// TODO: i think the real limit is the byte size of the http request. so, a simple line count won't work very well
let max_batch_size = 100;
let mut num_left = count;
while num_left > 0 {
let batch_size = num_left.min(max_batch_size);
let p = points.split_off(batch_size);
num_left -= batch_size;
if let Err(err) = influxdb_client
.write_with_precision(bucket, stream::iter(p), self.timestamp_precision)
.await
{
// TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
error!("unable to save {} tsdb stats! err={:?}", batch_size, err);
}
}
}
}
count
}
}
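A minimal wiring sketch for the relocated buffer (not part of this diff). It assumes an async function returning anyhow::Result, and the `db_conn`, `influxdb_client`, `rpc_secret_key_cache`, and `stat` bindings, the bucket name, and the interval values are all placeholders:

    let (shutdown_sender, shutdown_receiver) = tokio::sync::broadcast::channel(1);

    let spawned = StatBuffer::try_spawn(
        60,                            // billing_period_seconds
        "dev_web3_proxy".to_string(),  // bucket (placeholder name)
        1,                             // chain_id
        db_conn.clone(),               // Option<DatabaseConnection>
        10,                            // db_save_interval_seconds
        influxdb_client.clone(),       // Option<influxdb2::Client>
        rpc_secret_key_cache.clone(),  // Option<RpcSecretKeyCache>
        shutdown_receiver,
        10,                            // tsdb_save_interval_seconds
    )?
    .expect("a db or influxdb client was given, so a buffer is spawned");

    // request handlers clone stat_sender and emit one AppStat per frontend request
    spawned
        .stat_sender
        .send(AppStat::RpcQuery(stat))
        .expect("the stat loop is still running");

    // on shutdown: signal the loop, then wait for the buffered stats to flush
    shutdown_sender.send(())?;
    spawned.background_handle.await??;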