Merge branch 'devel' into david/devel/debug-graph

This commit is contained in:
David 2023-05-24 13:36:45 +02:00 committed by GitHub
commit 9a5e9d46a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1113 additions and 1204 deletions

78
Cargo.lock generated

@ -263,7 +263,7 @@ checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39"
dependencies = [
"async-trait",
"axum-core",
"base64 0.21.0",
"base64 0.21.1",
"bitflags",
"bytes",
"futures-util",
@ -379,9 +379,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.0"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
checksum = "3f1e31e207a6b8fb791a38ea3105e6cb541f55e4d029902d3039a4ad07cc4105"
[[package]]
name = "base64ct"
@ -532,9 +532,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.12.2"
version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c6ed94e98ecff0c12dd1b04c15ec0d7d9458ca8fe806cea6f12954efe74c63b"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "byte-slice-cast"
@ -797,7 +797,7 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b949a1c63fb7eb591eb7ba438746326aedf0ae843e51ec92ba6bec5bb382c4f"
dependencies = [
"base64 0.21.0",
"base64 0.21.1",
"bech32",
"bs58",
"digest 0.10.6",
@ -1526,7 +1526,7 @@ dependencies = [
[[package]]
name = "entities"
version = "0.27.0"
version = "0.28.0"
dependencies = [
"ethers",
"sea-orm",
@ -1812,7 +1812,7 @@ checksum = "1009041f40476b972b5d79346cc512e97c662b1a0a2f78285eabe9a122909783"
dependencies = [
"async-trait",
"auto_impl",
"base64 0.21.0",
"base64 0.21.1",
"bytes",
"enr",
"ethers-core",
@ -2941,6 +2941,7 @@ name = "latency"
version = "0.1.0"
dependencies = [
"ewma",
"flume",
"log",
"serde",
"tokio",
@ -3000,9 +3001,9 @@ dependencies = [
[[package]]
name = "linux-raw-sys"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "listenfd"
@ -3090,7 +3091,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.27.0"
version = "0.28.0"
dependencies = [
"sea-orm-migration",
"tokio",
@ -4278,13 +4279,13 @@ dependencies = [
[[package]]
name = "regex"
version = "1.8.1"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370"
checksum = "d1a59b5d8e97dee33696bf13c5ba8ab85341c002922fba050069326b9c498974"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.1",
"regex-syntax 0.7.2",
]
[[package]]
@ -4304,9 +4305,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]]
name = "rend"
@ -4323,7 +4324,7 @@ version = "0.11.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55"
dependencies = [
"base64 0.21.0",
"base64 0.21.1",
"bytes",
"encoding_rs",
"futures-core",
@ -4584,7 +4585,7 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.21.0",
"base64 0.21.1",
]
[[package]]
@ -4899,9 +4900,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.9.0"
version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2855b3715770894e67cbfa3df957790aa0c9edc3bf06efa1a84d77fa0839d1"
checksum = "1fc758eb7bffce5b308734e9b0c1468893cae9ff70ebf13e7090be8dcbcc83a8"
dependencies = [
"bitflags",
"core-foundation",
@ -4943,9 +4944,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
[[package]]
name = "sentry"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37dd6c0cdca6b1d1ca44cde7fff289f2592a97965afec870faa7b81b9fc87745"
checksum = "234f6e133d27140ad5ea3b369a7665f7fbc060fe246f81d8168665b38c08b600"
dependencies = [
"httpdate",
"reqwest",
@ -4963,9 +4964,9 @@ dependencies = [
[[package]]
name = "sentry-anyhow"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c3d7032fff178c77c107c32c6d3337b12847adf67165ccc876c898e7154b00"
checksum = "47e940be4c63b29006b4ac422cacea932c2cb5f8c209647ee86446ed27595a42"
dependencies = [
"anyhow",
"sentry-backtrace",
@ -4974,9 +4975,9 @@ dependencies = [
[[package]]
name = "sentry-backtrace"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c029fe8317cdd75cb2b52c600bab4e2ef64c552198e669ba874340447f330962"
checksum = "d89b6b53de06308dd5ac08934b597bcd72a9aae0c20bc3ab06da69cb34d468e3"
dependencies = [
"backtrace",
"once_cell",
@ -4986,9 +4987,9 @@ dependencies = [
[[package]]
name = "sentry-contexts"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc575098d73c8b942b589ab453b06e4c43527556dd8f95532220d1b54d7c6b4b"
checksum = "0769b66763e59976cd5c0fd817dcd51ccce404de8bebac0cd0e886c55b0fffa8"
dependencies = [
"hostname",
"libc",
@ -5000,9 +5001,9 @@ dependencies = [
[[package]]
name = "sentry-core"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20216140001bbf05895f013abd0dae4df58faee24e016d54cbf107f070bac56b"
checksum = "a1f954f1b89e8cd82576dc49bfab80304c9a6201343b4fe5c68c819f7a9bbed2"
dependencies = [
"once_cell",
"rand",
@ -5013,9 +5014,9 @@ dependencies = [
[[package]]
name = "sentry-log"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a43934e48e9c8e2c7d0dcb9c6cbcfcbe3ee109a14fc0c821e8944acd4faa2c25"
checksum = "958f539c85854acf7fa0fa46f07077be9050c7b28602ddfb5a651e9d11b27740"
dependencies = [
"log",
"sentry-core",
@ -5023,9 +5024,9 @@ dependencies = [
[[package]]
name = "sentry-panic"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e45cd0a113fc06d6edba01732010518816cdc8ce3bccc70f5e41570046bf046"
checksum = "94dc2ab494362ad51308c7c19f44e9ab70e426a931621e4a05f378a1e74558c2"
dependencies = [
"sentry-backtrace",
"sentry-core",
@ -5033,9 +5034,9 @@ dependencies = [
[[package]]
name = "sentry-types"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7f6959d8cb3a77be27e588eef6ce9a2a469651a556d9de662e4d07e5ace4232"
checksum = "85c53caf80cb1c6fcdf4d82b7bfff8477f50841e4caad7bf8e5e57a152b564cb"
dependencies = [
"debugid",
"getrandom",
@ -5988,7 +5989,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-trait",
"axum",
"base64 0.21.0",
"base64 0.21.1",
"bytes",
"futures-core",
"futures-util",
@ -6502,7 +6503,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.27.0"
version = "0.28.0"
dependencies = [
"anyhow",
"arc-swap",
@ -6532,6 +6533,7 @@ dependencies = [
"hex_fmt",
"hostname",
"http",
"hyper",
"influxdb2",
"influxdb2-structmap",
"ipnet",

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.27.0"
version = "0.28.0"
edition = "2021"
[lib]

@ -0,0 +1,49 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "admin_increase_balance_receipt")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub amount: Decimal,
pub admin_id: u64,
pub deposit_to_user_id: u64,
pub note: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::admin::Entity",
from = "Column::AdminId",
to = "super::admin::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
Admin,
#[sea_orm(
belongs_to = "super::user::Entity",
from = "Column::DepositToUserId",
to = "super::user::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
User,
}
impl Related<super::admin::Entity> for Entity {
fn to() -> RelationDef {
Relation::Admin.def()
}
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -3,6 +3,7 @@
pub mod prelude;
pub mod admin;
pub mod admin_increase_balance_receipt;
pub mod admin_trail;
pub mod balance;
pub mod increase_on_chain_balance_receipt;

@ -1,6 +1,7 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
pub use super::admin::Entity as Admin;
pub use super::admin_increase_balance_receipt::Entity as AdminIncreaseBalanceReceipt;
pub use super::admin_trail::Entity as AdminTrail;
pub use super::balance::Entity as Balance;
pub use super::increase_on_chain_balance_receipt::Entity as IncreaseOnChainBalanceReceipt;

@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
ewma = "0.1.1"
flume = "0.10.14"
log = "0.4.17"
serde = { version = "1.0.163", features = [] }
tokio = { version = "1.28.1", features = ["full"] }

@ -2,9 +2,7 @@ mod rtt_estimate;
use std::sync::Arc;
use log::{error, trace};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use log::{error, log_enabled, trace};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
@ -20,7 +18,7 @@ pub struct PeakEwmaLatency {
/// Join handle for the latency calculation task
pub join_handle: JoinHandle<()>,
/// Send to update with each request duration
request_tx: mpsc::Sender<Duration>,
request_tx: flume::Sender<Duration>,
/// Latency average and last update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Decay time
@ -32,9 +30,12 @@ impl PeakEwmaLatency {
///
/// Returns a handle that can also be used to read the current
/// average latency.
pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self {
pub fn spawn(decay: Duration, buf_size: usize, start_latency: Duration) -> Self {
let decay_ns = decay.as_nanos() as f64;
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
let (request_tx, request_rx) = mpsc::channel(buf_size);
let (request_tx, request_rx) = flume::bounded(buf_size);
let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency));
let task = PeakEwmaLatencyTask {
request_rx,
@ -56,15 +57,19 @@ impl PeakEwmaLatency {
let mut estimate = self.rtt_estimate.load();
let now = Instant::now();
assert!(
estimate.update_at <= now,
"update_at is {}ns in the future",
estimate.update_at.duration_since(now).as_nanos(),
);
// Update the RTT estimate to account for decay since the last update.
// TODO: having an update here means we don't actually write from just one thread!! Thats how we get partially written stuff i think
estimate.update(0.0, self.decay_ns, now)
if estimate.update_at > now {
if log_enabled!(log::Level::Trace) {
trace!(
"update_at is {}ns in the future",
estimate.update_at.duration_since(now).as_nanos()
);
}
estimate.rtt
} else {
// Update the RTT estimate to account for decay since the last update.
estimate.update(0.0, self.decay_ns, now)
}
}
/// Report latency from a single request
@ -73,15 +78,12 @@ impl PeakEwmaLatency {
pub fn report(&self, duration: Duration) {
match self.request_tx.try_send(duration) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
Err(err) => {
// We don't want to block if the channel is full, just
// report the error
error!("Latency report channel full");
error!("Latency report channel full. {}", err);
// TODO: could we spawn a new tokio task to report tthis later?
}
Err(TrySendError::Closed(_)) => {
unreachable!("Owner should keep channel open");
}
};
}
}
@ -90,7 +92,7 @@ impl PeakEwmaLatency {
#[derive(Debug)]
struct PeakEwmaLatencyTask {
/// Receive new request timings for update
request_rx: mpsc::Receiver<Duration>,
request_rx: flume::Receiver<Duration>,
/// Current estimate and update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Last update time, used for decay calculation
@ -101,8 +103,8 @@ struct PeakEwmaLatencyTask {
impl PeakEwmaLatencyTask {
/// Run the loop for updating latency
async fn run(mut self) {
while let Some(rtt) = self.request_rx.recv().await {
async fn run(self) {
while let Ok(rtt) = self.request_rx.recv_async().await {
self.update(rtt);
}
trace!("latency loop exited");
@ -128,14 +130,15 @@ impl PeakEwmaLatencyTask {
mod tests {
use tokio::time::{self, Duration};
use crate::util::nanos::NANOS_PER_MILLI;
/// The default RTT estimate decays, so that new nodes are considered if the
/// default RTT is too high.
#[tokio::test(start_paused = true)]
async fn default_decay() {
let estimate =
super::PeakEwmaLatency::spawn(NANOS_PER_MILLI * 1_000.0, 8, Duration::from_millis(10));
let estimate = super::PeakEwmaLatency::spawn(
Duration::from_millis(1_000),
8,
Duration::from_millis(10),
);
let load = estimate.latency();
assert_eq!(load, Duration::from_millis(10));

@ -44,6 +44,7 @@ impl RttEstimate {
);
Duration::from_nanos(next_estimate as u64)
};
self.rtt
}
@ -101,7 +102,7 @@ impl AtomicRttEstimate {
}
/// Fetches the value, and applies a function to it that returns an
/// new rtt. Retrns the new RttEstimate with new update_at.
/// new rtt. Returns the new RttEstimate with new update_at.
///
/// Automatically updates the update_at with Instant::now(). This
/// method omits ordering arguments, defaulting to Relaxed since

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.27.0"
version = "0.28.0"
edition = "2021"
publish = false

@ -27,6 +27,7 @@ mod m20230412_171916_modify_secondary_user_add_primary_user;
mod m20230422_172555_premium_downgrade_logic;
mod m20230511_161214_remove_columns_statsv2_origin_and_method;
mod m20230512_220213_allow_null_rpc_key_id_in_stats_v2;
mod m20230514_114803_admin_add_credits;
pub struct Migrator;
@ -51,16 +52,17 @@ impl MigratorTrait for Migrator {
Box::new(m20230125_204810_stats_v2::Migration),
Box::new(m20230130_124740_read_only_login_logic::Migration),
Box::new(m20230130_165144_prepare_admin_imitation_pre_login::Migration),
Box::new(m20230215_152254_admin_trail::Migration),
Box::new(m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2::Migration),
Box::new(m20230205_130035_create_balance::Migration),
Box::new(m20230205_133755_create_referrals::Migration),
Box::new(m20230214_134254_increase_balance_transactions::Migration),
Box::new(m20230215_152254_admin_trail::Migration),
Box::new(m20230221_230953_track_spend::Migration),
Box::new(m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2::Migration),
Box::new(m20230412_171916_modify_secondary_user_add_primary_user::Migration),
Box::new(m20230422_172555_premium_downgrade_logic::Migration),
Box::new(m20230511_161214_remove_columns_statsv2_origin_and_method::Migration),
Box::new(m20230512_220213_allow_null_rpc_key_id_in_stats_v2::Migration),
Box::new(m20230514_114803_admin_add_credits::Migration),
]
}
}

@ -0,0 +1,97 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(AdminIncreaseBalanceReceipt::Table)
.if_not_exists()
.col(
ColumnDef::new(AdminIncreaseBalanceReceipt::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(AdminIncreaseBalanceReceipt::Amount)
.decimal_len(20, 10)
.not_null(),
)
.col(
ColumnDef::new(AdminIncreaseBalanceReceipt::AdminId)
.big_unsigned()
.not_null(),
)
.foreign_key(
ForeignKey::create()
.name("fk-admin_id")
.from(
AdminIncreaseBalanceReceipt::Table,
AdminIncreaseBalanceReceipt::AdminId,
)
.to(Admin::Table, Admin::Id),
)
.col(
ColumnDef::new(AdminIncreaseBalanceReceipt::DepositToUserId)
.big_unsigned()
.not_null(),
)
.foreign_key(
ForeignKey::create()
.name("fk-admin_deposits_to_user_id")
.from(
AdminIncreaseBalanceReceipt::Table,
AdminIncreaseBalanceReceipt::DepositToUserId,
)
.to(User::Table, User::Id),
)
.col(
ColumnDef::new(AdminIncreaseBalanceReceipt::Note)
.string()
.not_null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(
Table::drop()
.table(AdminIncreaseBalanceReceipt::Table)
.to_owned(),
)
.await
}
}
#[derive(Iden)]
enum Admin {
Table,
Id,
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum User {
Table,
Id,
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum AdminIncreaseBalanceReceipt {
Table,
Id,
Amount,
AdminId,
DepositToUserId,
Note,
}

@ -0,0 +1,44 @@
# Create / Login user1
curl -X GET "http://127.0.0.1:8544/user/login/0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a"
curl -X POST http://127.0.0.1:8544/user/login \
-H 'Content-Type: application/json' \
-d '{
"address": "0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a",
"msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078654233453932384132453534424530313345463832343164344339456146344466414539344435610a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a203031483044573642334a48355a4b384a4e3947504d594e4d4b370a4973737565642041743a20323032332d30352d31345431393a33353a35352e3736323632395a0a45787069726174696f6e2054696d653a20323032332d30352d31345431393a35353a35352e3736323632395a",
"sig": "f88b42d638246f8e51637c753052cab3a13b2a138faf3107c921ce2f0027d6506b9adcd3a7b72af830cdf50d20e6e9cb3f9f456dd1be47f6543990ea050909791c",
"version": "3",
"signer": "MEW"
}'
# 01H0DW6VFCP365B9TXVQVVMHHY
# 01H0DVZNDJWQ7YG8RBHXQHJ301
# Make user1 an admin
cargo run change_admin_status 0xeB3E928A2E54BE013EF8241d4C9EaF4DfAE94D5a true
# Create/Login user2
curl -X GET "http://127.0.0.1:8544/user/login/0x762390ae7a3c4D987062a398C1eA8767029AB08E"
curl -X POST http://127.0.0.1:8544/user/login \
-H 'Content-Type: application/json' \
-d '{
"address": "0x762390ae7a3c4d987062a398c1ea8767029ab08e",
"msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078373632333930616537613363344439383730363261333938433165413837363730323941423038450a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a20303148304457384233304e534447594e484d33514d4a31434e530a4973737565642041743a20323032332d30352d31345431393a33373a30312e3238303338355a0a45787069726174696f6e2054696d653a20323032332d30352d31345431393a35373a30312e3238303338355a",
"sig": "c545235557b7952a789dffa2af153af5cf663dcc05449bcc4b651b04cda57de05bcef55c0f5cbf6aa2432369582eb6a40927d14ad0a2d15f48fa45f32fbf273f1c",
"version": "3",
"signer": "MEW"
}'
# 01H0DWPXRQA7XX2VFSNR02CG1N
# 01H0DWPXQQ951Y3R90QMF6MYGE
curl \
-H "Authorization: Bearer 01H0DWPXRQA7XX2VFSNR02CG1N" \
-X GET "127.0.0.1:8544/user/balance"
# Admin add balance
curl \
-H "Authorization: Bearer 01H0DW6VFCP365B9TXVQVVMHHY" \
-X GET "127.0.0.1:8544/admin/increase_balance?user_address=0x762390ae7a3c4D987062a398C1eA8767029AB08E&amount=100.0"

@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.27.0"
version = "0.28.0"
edition = "2021"
default-run = "web3_proxy_cli"
@ -55,6 +55,7 @@ hdrhistogram = "7.5.2"
hex_fmt = "0.3.0"
hostname = "0.3.1"
http = "0.2.9"
hyper = { version = "0.14.26", features = ["full"] }
influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rustls"] }
influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"}
ipnet = "2.7.2"
@ -71,10 +72,10 @@ parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
proctitle = "0.1.1"
rdkafka = { version = "0.31.0" }
regex = "1.8.1"
regex = "1.8.2"
reqwest = { version = "0.11.18", default-features = false, features = ["json", "tokio-rustls"] }
rmp-serde = "1.1.1"
sentry = { version = "0.31.1", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
sentry = { version = "0.31.2", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.163", features = [] }
serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.2.2"
@ -91,3 +92,6 @@ tower-http = { version = "0.4.0", features = ["cors", "sensitive-headers"] }
ulid = { version = "1.0.0", features = ["uuid", "serde"] }
url = "2.3.1"
uuid = "1.3.3"
[dev-dependencies]
tokio = { version = "1.28.1", features = ["full", "test-util"] }

@ -74,7 +74,7 @@ pub static APP_USER_AGENT: &str = concat!(
// aggregate across 1 week
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
pub type Web3ProxyJoinHandle<T> = JoinHandle<Web3ProxyResult<T>>;
/// TODO: move this
#[derive(Clone, Debug, Default, From)]
@ -176,7 +176,7 @@ pub struct Web3ProxyApp {
// TODO: should the key be our RpcSecretKey class instead of Ulid?
pub rpc_secret_key_cache: RpcSecretKeyCache,
/// concurrent/parallel RPC request limits for authenticated users
pub rpc_key_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
pub user_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
/// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
/// concurrent/parallel application request limits for authenticated users
@ -188,7 +188,7 @@ pub struct Web3ProxyApp {
/// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures.
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
pub async fn flatten_handle<T>(handle: Web3ProxyJoinHandle<T>) -> Web3ProxyResult<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
@ -198,8 +198,8 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
/// return the first error, or Ok if everything worked
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> {
mut handles: FuturesUnordered<Web3ProxyJoinHandle<T>>,
) -> Web3ProxyResult<()> {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
@ -315,9 +315,9 @@ pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles
pub app: Arc<Web3ProxyApp>,
/// handles for the balanced and private rpcs
pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
pub app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// config changes are sent here
pub new_top_config_sender: watch::Sender<TopConfig>,
/// watch this to know when to start the app
@ -359,10 +359,12 @@ impl Web3ProxyApp {
}
// these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error
let app_handles = FuturesUnordered::new();
// TODO: this is a small enough group, that a vec with try_join_all is probably fine
let app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> = FuturesUnordered::new();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
let important_background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> =
FuturesUnordered::new();
// connect to the database and make sure the latest migrations have run
let mut db_conn = None::<DatabaseConnection>;
@ -624,12 +626,10 @@ impl Web3ProxyApp {
// TODO: what should tti be for semaphores?
let bearer_token_semaphores = Cache::new(max_users);
let ip_semaphores = Cache::new(max_users);
let registered_user_semaphores = Cache::new(max_users);
let user_semaphores = Cache::new(max_users);
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
top_config.app.max_block_age,
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
@ -654,9 +654,7 @@ impl Web3ProxyApp {
// TODO: Merge
// let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn(
let (private_rpcs, private_handle, _) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
@ -688,9 +686,7 @@ impl Web3ProxyApp {
} else {
// TODO: do something with the spawn handle
let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
@ -735,7 +731,7 @@ impl Web3ProxyApp {
rpc_secret_key_cache,
bearer_token_semaphores,
ip_semaphores,
rpc_key_semaphores: registered_user_semaphores,
user_semaphores,
stat_sender,
};
@ -752,9 +748,9 @@ impl Web3ProxyApp {
loop {
let new_top_config = new_top_config_receiver.borrow_and_update().to_owned();
app.apply_top_config(new_top_config)
.await
.context("failed applying new top_config")?;
if let Err(err) = app.apply_top_config(new_top_config).await {
error!("unable to apply config! {:?}", err);
};
new_top_config_receiver
.changed()
@ -790,7 +786,7 @@ impl Web3ProxyApp {
.into())
}
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> {
// TODO: also update self.config from new_top_config.app
// connect to the backends
@ -1015,20 +1011,12 @@ impl Web3ProxyApp {
) -> Web3ProxyResult<(StatusCode, JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>)> {
// trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. check for a different max from the user_tier?
// TODO: how much time was spent on this request alredy?
let max_time = Duration::from_secs(240);
// TODO: use streams and buffers so we don't overwhelm our server
let response = match request {
JsonRpcRequestEnum::Single(mut request) => {
let (status_code, response, rpcs) = timeout(
max_time,
self.proxy_cached_request(&authorization, &mut request, None),
)
.await?;
let (status_code, response, rpcs) = self
.proxy_cached_request(&authorization, &mut request, None)
.await;
(
status_code,
@ -1037,11 +1025,9 @@ impl Web3ProxyApp {
)
}
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_requests(&authorization, requests),
)
.await??;
let (responses, rpcs) = self
.proxy_web3_rpc_requests(&authorization, requests)
.await?;
// TODO: real status code
(
@ -1335,7 +1321,8 @@ impl Web3ProxyApp {
| "eth_getUserOperationReceipt"
| "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() {
Some(bundler_4337_rpcs) => {
bundler_4337_rpcs
// TODO: timeout
bundler_4337_rpcs
.try_proxy_connection(
authorization,
request,
@ -1371,6 +1358,7 @@ impl Web3ProxyApp {
JsonRpcResponseData::from(json!(Address::zero()))
}
"eth_estimateGas" => {
// TODO: timeout
let response_data = self
.balanced_rpcs
.try_proxy_connection(
@ -1407,6 +1395,7 @@ impl Web3ProxyApp {
}
"eth_getTransactionReceipt" | "eth_getTransactionByHash" => {
// try to get the transaction without specifying a min_block_height
// TODO: timeout
let mut response_data = self
.balanced_rpcs
.try_proxy_connection(
@ -1450,12 +1439,7 @@ impl Web3ProxyApp {
// TODO: error if the chain_id is incorrect
// TODO: check the cache to see if we have sent this transaction recently
// TODO: if so, use a cached response.
// TODO: if not,
// TODO: - cache successes for up to 1 minute
// TODO: - cache failures for 1 block (we see transactions skipped because out of funds. but that can change block to block)
// TODO: timeout
let mut response_data = self
.try_send_protected(
authorization,
@ -1585,6 +1569,7 @@ impl Web3ProxyApp {
,
"web3_sha3" => {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
// TODO: timeout
match &request.params {
Some(serde_json::Value::Array(params)) => {
// TODO: make a struct and use serde conversion to clean this up
@ -1744,6 +1729,9 @@ impl Web3ProxyApp {
let authorization = authorization.clone();
// TODO: different timeouts for different user tiers
let duration = Duration::from_secs(240);
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number.unwrap());
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap());
@ -1754,15 +1742,18 @@ impl Web3ProxyApp {
{
Ok(x) => x,
Err(x) => {
let response_data = self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
let response_data = timeout(
duration,
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
)
.await?;
.await??;
// TODO: convert the Box<RawValue> to an Arc<RawValue>
x.insert(response_data.clone());
@ -1771,15 +1762,18 @@ impl Web3ProxyApp {
}
}
} else {
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
None,
None,
timeout(
duration,
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
None,
None,
)
)
.await?
.await??
}
}
};

@ -1,4 +1,4 @@
use crate::app::AnyhowJoinHandle;
use crate::app::Web3ProxyJoinHandle;
use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use crate::rpcs::one::Web3Rpc;
use argh::FromArgs;
@ -9,7 +9,7 @@ use log::warn;
use migration::sea_orm::DatabaseConnection;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
use std::time::Duration;
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
@ -283,28 +283,48 @@ impl Web3RpcConfig {
redis_pool: Option<redis_rate_limiter::RedisPool>,
chain_id: u64,
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
if !self.extra.is_empty() {
warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys());
}
// TODO: get this from config? a helper function? where does this belong?
let block_interval = match chain_id {
// ethereum
1 => Duration::from_secs(12),
// ethereum-goerli
5 => Duration::from_secs(12),
// polygon
137 => Duration::from_secs(2),
// fantom
250 => Duration::from_secs(1),
// arbitrum
42161 => Duration::from_millis(500),
// anything else
_ => {
let default = 10;
warn!(
"unexpected chain_id ({}). polling every {} seconds",
chain_id, default
);
Duration::from_secs(default)
}
};
Web3Rpc::spawn(
self,
name,
chain_id,
db_conn,
http_client,
http_interval_sender,
redis_pool,
block_interval,
blocks_by_hash_cache,
block_sender,
tx_id_sender,
reconnect,
)
.await
}

@ -7,6 +7,7 @@ use crate::app::Web3ProxyApp;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext};
use crate::user_token::UserBearerToken;
use crate::PostLogin;
use anyhow::Context;
use axum::{
extract::{Path, Query},
headers::{authorization::Bearer, Authorization},
@ -16,15 +17,19 @@ use axum::{
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use chrono::{TimeZone, Utc};
use entities::{admin_trail, login, pending_login, rpc_key, user};
use entities::{
admin, admin_increase_balance_receipt, admin_trail, balance, login, pending_login, rpc_key,
user, user_tier,
};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::StatusCode;
use log::{debug, info, warn};
use migration::sea_orm::prelude::Uuid;
use migration::sea_orm::prelude::{Decimal, Uuid};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter,
};
use migration::{Expr, OnConflict};
use serde_json::json;
use siwe::{Message, VerificationOpts};
use std::ops::Add;
@ -33,6 +38,135 @@ use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use ulid::Ulid;
/// `GET /admin/increase_balance` -- As an admin, modify a user's user-tier
///
/// - user_address that is to credited balance
/// - user_role_tier that is supposed to be adapted
#[debug_handler]
pub async fn admin_increase_balance(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
let (caller, _) = app.bearer_is_authorized(bearer).await?;
let caller_id = caller.id;
// Establish connections
let db_conn = app
.db_conn()
.context("query_admin_modify_user needs a db")?;
// Check if the caller is an admin (if not, return early)
let admin_entry: admin::Model = admin::Entity::find()
.filter(admin::Column::UserId.eq(caller_id))
.one(&db_conn)
.await?
.ok_or(Web3ProxyError::AccessDenied)?;
// Get the user from params
let user_address: Address = params
.get("user_address")
.ok_or_else(|| {
Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string())
})?
.parse::<Address>()
.map_err(|_| {
Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string())
})?;
let user_address_bytes: Vec<u8> = user_address.to_fixed_bytes().into();
let note: String = params
.get("note")
.ok_or_else(|| {
Web3ProxyError::BadRequest("Unable to find 'note' key in request".to_string())
})?
.parse::<String>()
.map_err(|_| {
Web3ProxyError::BadRequest("Unable to parse 'note' as a String".to_string())
})?;
// Get the amount from params
// Decimal::from_str
let amount: Decimal = params
.get("amount")
.ok_or_else(|| {
Web3ProxyError::BadRequest("Unable to get the amount key from the request".to_string())
})
.map(|x| Decimal::from_str(x))?
.map_err(|err| {
Web3ProxyError::BadRequest(format!("Unable to parse amount from the request {:?}", err))
})?;
let user_entry: user::Model = user::Entity::find()
.filter(user::Column::Address.eq(user_address_bytes.clone()))
.one(&db_conn)
.await?
.ok_or(Web3ProxyError::BadRequest(
"No user with this id found".to_string(),
))?;
let increase_balance_receipt = admin_increase_balance_receipt::ActiveModel {
amount: sea_orm::Set(amount),
admin_id: sea_orm::Set(admin_entry.id),
deposit_to_user_id: sea_orm::Set(user_entry.id),
note: sea_orm::Set(note),
..Default::default()
};
increase_balance_receipt.save(&db_conn).await?;
let mut out = HashMap::new();
out.insert(
"user",
serde_json::Value::String(format!("{:?}", user_address)),
);
out.insert("amount", serde_json::Value::String(amount.to_string()));
// Get the balance row
let balance_entry: balance::Model = balance::Entity::find()
.filter(balance::Column::UserId.eq(user_entry.id))
.one(&db_conn)
.await?
.context("User does not have a balance row")?;
// Finally make the user premium if balance is above 10$
let premium_user_tier = user_tier::Entity::find()
.filter(user_tier::Column::Title.eq("Premium"))
.one(&db_conn)
.await?
.context("Premium tier was not found!")?;
let balance_entry = balance_entry.into_active_model();
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()
.values([
// (
// balance::Column::Id,
// Expr::col(balance::Column::Id).add(self.frontend_requests),
// ),
(
balance::Column::AvailableBalance,
Expr::col(balance::Column::AvailableBalance).add(amount),
),
// (
// balance::Column::Used,
// Expr::col(balance::Column::UsedBalance).add(self.backend_retries),
// ),
// (
// balance::Column::UserId,
// Expr::col(balance::Column::UserId).add(self.no_servers),
// ),
])
.to_owned(),
)
.exec(&db_conn)
.await?;
// TODO: Downgrade otherwise, right now not functioning properly
// Then read and save in one transaction
let response = (StatusCode::OK, Json(out)).into_response();
Ok(response)
}
/// `GET /admin/modify_role` -- As an admin, modify a user's user-tier
///
/// - user_address that is to be modified

@ -30,7 +30,7 @@ use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::convert::Infallible;
use std::fmt::Display;
use std::hash::Hash;
use std::hash::{Hash, Hasher};
use std::mem;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration;
@ -42,22 +42,12 @@ use ulid::Ulid;
use uuid::Uuid;
/// This lets us use UUID and ULID while we transition to only ULIDs
/// TODO: include the key's description.
#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
pub enum RpcSecretKey {
Ulid(Ulid),
Uuid(Uuid),
}
impl Hash for RpcSecretKey {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Ulid(x) => state.write_u128(x.0),
Self::Uuid(x) => state.write_u128(x.as_u128()),
}
}
}
/// TODO: should this have IpAddr and Origin or AuthorizationChecks?
#[derive(Debug)]
pub enum RateLimitResult {
@ -99,6 +89,17 @@ pub struct KafkaDebugLogger {
num_responses: AtomicUsize,
}
impl Hash for RpcSecretKey {
fn hash<H: Hasher>(&self, state: &mut H) {
let x = match self {
Self::Ulid(x) => x.0,
Self::Uuid(x) => x.as_u128(),
};
x.hash(state);
}
}
impl fmt::Debug for KafkaDebugLogger {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("KafkaDebugLogger")
@ -883,17 +884,13 @@ impl Web3ProxyApp {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
.ip_semaphores
.get_or_insert_async::<Web3ProxyError>(ip, async move {
.get_or_insert_async::<Infallible>(ip, async move {
// TODO: set max_concurrent_requests dynamically based on load?
let s = Semaphore::new(max_concurrent_requests);
Ok(Arc::new(s))
})
.await?;
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? less important for anon users
// // TODO: there is probably a race here
// }
.await
.expect("infallible");
let semaphore_permit = semaphore.acquire_owned().await?;
@ -903,8 +900,8 @@ impl Web3ProxyApp {
}
}
/// Limit the number of concurrent requests from the given rpc key.
pub async fn registered_user_semaphore(
/// Limit the number of concurrent requests for a given user across all of their keys
pub async fn user_semaphore(
&self,
authorization_checks: &AuthorizationChecks,
) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
@ -915,25 +912,19 @@ impl Web3ProxyApp {
.or(Err(Web3ProxyError::UserIdZero))?;
let semaphore = self
.rpc_key_semaphores
.get_or_insert_async(&user_id, async move {
.user_semaphores
.get_or_insert_async::<Infallible>(&user_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
// trace!("new semaphore for user_id {}", user_id);
Ok::<_, Infallible>(Arc::new(s))
Ok(Arc::new(s))
})
.await
.unwrap();
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? this has a race condition though.
// // TODO: maybe have a stat on how long we wait to acquire the semaphore instead?
// }
.expect("infallible");
let semaphore_permit = semaphore.acquire_owned().await?;
Ok(Some(semaphore_permit))
} else {
// unlimited requests allowed
// unlimited concurrency
Ok(None)
}
}
@ -955,7 +946,7 @@ impl Web3ProxyApp {
Ok(Arc::new(s))
})
.await
.unwrap();
.expect("infallible");
let semaphore_permit = semaphore.acquire_owned().await?;
@ -1043,7 +1034,7 @@ impl Web3ProxyApp {
// they do check origin because we can override rate limits for some origins
let authorization = Authorization::external(
allowed_origin_requests_per_period,
self.db_conn.clone(),
self.db_conn(),
ip,
origin,
proxy_mode,
@ -1098,8 +1089,7 @@ impl Web3ProxyApp {
proxy_mode: ProxyMode,
rpc_secret_key: RpcSecretKey,
) -> Web3ProxyResult<AuthorizationChecks> {
let authorization_checks: Result<_, Web3ProxyError> = self
.rpc_secret_key_cache
self.rpc_secret_key_cache
.get_or_insert_async(&rpc_secret_key, async move {
// trace!(?rpc_secret_key, "user cache miss");
@ -1119,7 +1109,6 @@ impl Web3ProxyApp {
Some(rpc_key_model) => {
// TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us?
// TODO: don't expect. return an application error
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
.one(db_replica.conn())
.await?
@ -1129,8 +1118,8 @@ impl Web3ProxyApp {
.filter(balance::Column::UserId.eq(user_model.id))
.one(db_replica.conn())
.await?
.map(|x| x.available_balance)
.unwrap_or_default();
.expect("related balance")
.available_balance;
let user_tier_model =
user_tier::Entity::find_by_id(user_model.user_tier_id)
@ -1220,9 +1209,7 @@ impl Web3ProxyApp {
None => Ok(AuthorizationChecks::default()),
}
})
.await;
authorization_checks
.await
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
@ -1246,9 +1233,7 @@ impl Web3ProxyApp {
// only allow this rpc_key to run a limited amount of concurrent requests
// TODO: rate limit should be BEFORE the semaphore!
let semaphore = self
.registered_user_semaphore(&authorization_checks)
.await?;
let semaphore = self.user_semaphore(&authorization_checks).await?;
let authorization = Authorization::try_new(
authorization_checks,

@ -47,14 +47,15 @@ pub enum Web3ProxyError {
Database(DbErr),
#[display(fmt = "{:#?}, {:#?}", _0, _1)]
EipVerificationFailed(Box<Web3ProxyError>, Box<Web3ProxyError>),
EthersHttpClientError(ethers::prelude::HttpClientError),
EthersProviderError(ethers::prelude::ProviderError),
EthersWsClientError(ethers::prelude::WsClientError),
FlumeRecvError(flume::RecvError),
EthersHttpClient(ethers::prelude::HttpClientError),
EthersProvider(ethers::prelude::ProviderError),
EthersWsClient(ethers::prelude::WsClientError),
FlumeRecv(flume::RecvError),
GasEstimateNotU256,
Headers(headers::Error),
HeaderToString(ToStrError),
InfluxDb2RequestError(influxdb2::RequestError),
Hyper(hyper::Error),
InfluxDb2Request(influxdb2::RequestError),
#[display(fmt = "{} > {}", min, max)]
#[from(ignore)]
InvalidBlockBounds {
@ -64,6 +65,7 @@ pub enum Web3ProxyError {
InvalidHeaderValue(InvalidHeaderValue),
InvalidEip,
InvalidInviteCode,
Io(std::io::Error),
UnknownReferralCode,
InvalidReferer,
InvalidSignatureLength,
@ -88,6 +90,12 @@ pub enum Web3ProxyError {
num_known: usize,
min_head_rpcs: usize,
},
#[display(fmt = "{}/{}", available, needed)]
#[from(ignore)]
NotEnoughSoftLimit {
available: u32,
needed: u32,
},
NotFound,
NotImplemented,
OriginRequired,
@ -136,6 +144,7 @@ pub enum Web3ProxyError {
impl Web3ProxyError {
pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseData) {
// TODO: include a unique request id in the data
let (code, err): (StatusCode, JsonRpcErrorData) = match self {
Self::AccessDenied => {
// TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident
@ -223,7 +232,7 @@ impl Web3ProxyError {
},
)
}
Self::EthersHttpClientError(err) => {
Self::EthersHttpClient(err) => {
warn!("EthersHttpClientError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -234,7 +243,7 @@ impl Web3ProxyError {
},
)
}
Self::EthersProviderError(err) => {
Self::EthersProvider(err) => {
warn!("EthersProviderError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -245,7 +254,7 @@ impl Web3ProxyError {
},
)
}
Self::EthersWsClientError(err) => {
Self::EthersWsClient(err) => {
warn!("EthersWsClientError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -256,7 +265,7 @@ impl Web3ProxyError {
},
)
}
Self::FlumeRecvError(err) => {
Self::FlumeRecv(err) => {
warn!("FlumeRecvError err={:#?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -290,7 +299,19 @@ impl Web3ProxyError {
},
)
}
Self::InfluxDb2RequestError(err) => {
Self::Hyper(err) => {
warn!("hyper err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
// TODO: is it safe to expose these error strings?
message: Cow::Owned(err.to_string()),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::InfluxDb2Request(err) => {
// TODO: attach a request id to the message and to this error so that if people report problems, we can dig in sentry to find out more
error!("influxdb2 err={:?}", err);
(
@ -371,6 +392,18 @@ impl Web3ProxyError {
},
)
}
Self::Io(err) => {
warn!("std io err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
// TODO: is it safe to expose our io error strings?
message: Cow::Owned(err.to_string()),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::UnknownReferralCode => {
debug!("UnknownReferralCode");
(
@ -528,6 +561,20 @@ impl Web3ProxyError {
},
)
}
Self::NotEnoughSoftLimit { available, needed } => {
error!("NotEnoughSoftLimit {}/{}", available, needed);
(
StatusCode::BAD_GATEWAY,
JsonRpcErrorData {
message: Cow::Owned(format!(
"not enough soft limit available {}/{}",
available, needed
)),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
},
)
}
Self::NotFound => {
// TODO: emit a stat?
// TODO: instead of an error, show a normal html page for 404?

@ -28,6 +28,8 @@ use tokio::sync::broadcast;
use tower_http::cors::CorsLayer;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use self::errors::Web3ProxyResult;
/// simple keys for caching responses
#[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)]
pub enum ResponseCacheKey {
@ -49,7 +51,7 @@ pub async fn serve(
proxy_app: Arc<Web3ProxyApp>,
mut shutdown_receiver: broadcast::Receiver<()>,
shutdown_complete_sender: broadcast::Sender<()>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// setup caches for whatever the frontend needs
// no need for max items since it is limited by the enum key
// TODO: latest moka allows for different ttls for different
@ -199,6 +201,10 @@ pub async fn serve(
"/user/logout",
post(users::authentication::user_logout_post),
)
.route(
"/admin/increase_balance",
get(admin::admin_increase_balance),
)
.route("/admin/modify_role", get(admin::admin_change_user_roles))
.route(
"/admin/imitate-login/:admin_address/:user_address",

@ -36,7 +36,7 @@ pub async fn health(
Ok(_health(app).await)
})
.await
.unwrap();
.expect("this cache get is infallible");
Response::builder()
.status(code)
@ -70,7 +70,7 @@ pub async fn backups_needed(
Ok(_backups_needed(app).await)
})
.await
.unwrap();
.expect("this cache get is infallible");
Response::builder()
.status(code)
@ -120,7 +120,7 @@ pub async fn status(
Ok(_status(app).await)
})
.await
.unwrap();
.expect("this cache get is infallible");
Response::builder()
.status(code)

@ -153,7 +153,7 @@ pub async fn user_balance_post(
// Just make an rpc request, idk if i need to call this super extensive code
let transaction_receipt: TransactionReceipt = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
@ -166,7 +166,6 @@ pub async fn user_balance_post(
"eth_getTransactionReceipt",
&vec![format!("0x{}", hex::encode(tx_hash))],
Level::Trace.into(),
None,
)
.await
// TODO: What kind of error would be here
@ -188,7 +187,7 @@ pub async fn user_balance_post(
debug!("Transaction receipt is: {:?}", transaction_receipt);
let accepted_token: Address = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
@ -217,7 +216,7 @@ pub async fn user_balance_post(
]);
debug!("Params are: {:?}", &params);
let accepted_token: String = handle
.request("eth_call", &params, Level::Trace.into(), None)
.request("eth_call", &params, Level::Trace.into())
.await
// TODO: What kind of error would be here
.map_err(|err| Web3ProxyError::Anyhow(err.into()))?;
@ -243,7 +242,7 @@ pub async fn user_balance_post(
debug!("Accepted token is: {:?}", accepted_token);
let decimals: u32 = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
@ -267,7 +266,7 @@ pub async fn user_balance_post(
]);
debug!("ERC20 Decimal request params are: {:?}", &params);
let decimals: String = handle
.request("eth_call", &params, Level::Trace.into(), None)
.request("eth_call", &params, Level::Trace.into())
.await
.map_err(|err| Web3ProxyError::Anyhow(err.into()))?;
debug!("Decimals response is: {:?}", decimals);

@ -8,13 +8,14 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use crate::app::Web3ProxyApp;
use crate::frontend::errors::Web3ProxyResult;
/// Run a prometheus metrics server on the given port.
pub async fn serve(
app: Arc<Web3ProxyApp>,
port: u16,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// routes should be ordered most to least common
let app = Router::new().route("/", get(root)).layer(Extension(app));

@ -18,7 +18,6 @@ use std::convert::Infallible;
use std::hash::Hash;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::broadcast;
use tokio::time::Duration;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@ -193,7 +192,7 @@ impl Web3Rpcs {
.blocks_by_hash
.get_or_insert_async::<Infallible, _>(&block_hash, async move { Ok(block) })
.await
.unwrap();
.expect("this cache get is infallible");
Ok(block)
}
@ -219,13 +218,11 @@ impl Web3Rpcs {
// TODO: if error, retry?
let block: Web3ProxyBlock = match rpc {
Some(rpc) => rpc
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None)
.await?
.request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&json!(get_block_params),
Level::Error.into(),
None,
authorization.clone(),
)
.await?
.and_then(|x| {
@ -367,7 +364,7 @@ impl Web3Rpcs {
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
loop {

@ -374,7 +374,7 @@ impl ConsensusFinder {
.first_seen
.get_or_insert_async::<Infallible>(block.hash(), async { Ok(Instant::now()) })
.await
.unwrap();
.expect("this cache get is infallible");
// calculate elapsed time before trying to lock
let latency = first_seen.elapsed();

@ -3,7 +3,7 @@ use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock};
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp};
use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
@ -11,7 +11,6 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest};
use crate::response_cache::JsonRpcResponseData;
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
@ -35,8 +34,9 @@ use std::fmt::{self, Display};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom;
use tokio::select;
use tokio::sync::{broadcast, watch};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tokio::time::{sleep, sleep_until, Duration, Instant};
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
@ -46,8 +46,6 @@ pub struct Web3Rpcs {
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
pub(crate) by_name: ArcSwap<HashMap<String, Arc<Web3Rpc>>>,
/// notify all http providers to check their blocks at the same time
pub(crate) http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@ -78,9 +76,7 @@ impl Web3Rpcs {
/// Spawn durable connections to multiple Web3 providers.
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
chain_id: u64,
db_conn: Option<DatabaseConnection>,
http_client: Option<reqwest::Client>,
max_block_age: Option<u64>,
max_block_lag: Option<U64>,
min_head_rpcs: usize,
@ -91,82 +87,18 @@ impl Web3Rpcs {
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<(
Arc<Self>,
AnyhowJoinHandle<()>,
Web3ProxyJoinHandle<()>,
watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
// watch::Receiver<Arc<ConsensusWeb3Rpcs>>,
)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
// TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check?
let expected_block_time_ms = match chain_id {
// ethereum
1 => 12_000,
// ethereum-goerli
5 => 12_000,
// polygon
137 => 2_000,
// fantom
250 => 1_000,
// arbitrum
42161 => 500,
// anything else
_ => {
warn!(
"unexpected chain_id ({}). polling every {} seconds",
chain_id, 10
);
10_000
}
};
let http_interval_sender = if http_client.is_some() {
let (sender, _) = broadcast::channel(1);
// TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
let mut interval = interval(Duration::from_millis(expected_block_time_ms / 2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let sender = Arc::new(sender);
let f = {
let sender = sender.clone();
async move {
loop {
interval.tick().await;
// trace!("http interval ready");
if sender.send(()).is_err() {
// errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet
// TODO: i'm seeing this error a lot more than expected
trace!("no http receivers");
};
}
}
};
// TODO: do something with this handle?
tokio::spawn(f);
Some(sender)
} else {
None
};
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher this? need to know actual allocated size
// TODO: actual weighter on this
// TODO: time_to_idle instead?
// TODO: limits from config
let blocks_by_hash: BlocksByHashCache =
Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await);
// .max_capacity(1024 * 1024 * 1024)
// .weigher(|_k, v: &Web3ProxyBlock| {
// 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
// })
// .time_to_live(Duration::from_secs(30 * 60))
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// all block numbers are the same size, so no need for weigher
// TODO: limits from config
@ -185,7 +117,6 @@ impl Web3Rpcs {
blocks_by_hash,
blocks_by_number,
by_name,
http_interval_sender,
max_block_age,
max_block_lag,
min_head_rpcs,
@ -214,7 +145,7 @@ impl Web3Rpcs {
&self,
app: &Web3ProxyApp,
rpc_configs: HashMap<String, Web3RpcConfig>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// safety checks
if rpc_configs.len() < app.config.min_synced_rpcs {
// TODO: don't count disabled servers!
@ -232,15 +163,14 @@ impl Web3Rpcs {
let sum_soft_limit = rpc_configs.values().fold(0, |acc, x| acc + x.soft_limit);
// TODO: require a buffer?
anyhow::ensure!(
sum_soft_limit >= self.min_sum_soft_limit,
"Only {}/{} soft limit! Add more rpcs, increase soft limits, or reduce min_sum_soft_limit.",
sum_soft_limit,
self.min_sum_soft_limit,
);
if sum_soft_limit < self.min_sum_soft_limit {
return Err(Web3ProxyError::NotEnoughSoftLimit {
available: sum_soft_limit,
needed: self.min_sum_soft_limit,
});
}
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
let mut spawn_handles: FuturesUnordered<_> = rpc_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
@ -261,7 +191,6 @@ impl Web3Rpcs {
let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone());
let blocks_by_hash_cache = self.blocks_by_hash.clone();
let http_interval_sender = self.http_interval_sender.clone();
let chain_id = app.config.chain_id;
debug!("spawning {}", server_name);
@ -272,11 +201,9 @@ impl Web3Rpcs {
vredis_pool,
chain_id,
http_client,
http_interval_sender,
blocks_by_hash_cache,
block_sender,
pending_tx_id_sender,
true,
));
Some(handle)
@ -303,9 +230,9 @@ impl Web3Rpcs {
while new_head_receiver.borrow_and_update().is_none() {
new_head_receiver.changed().await?;
}
}
old_rpc.disconnect().await.context("disconnect old rpc")?;
// TODO: tell ethers to disconnect? is there a function for that?
}
}
// TODO: what should we do with the new handle? make sure error logs aren't dropped
@ -313,7 +240,7 @@ impl Web3Rpcs {
Ok(Err(err)) => {
// if we got an error here, the app can continue on
// TODO: include context about which connection failed
// TODO: will this retry automatically? i don't think so
// TODO: retry automatically
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
@ -323,6 +250,15 @@ impl Web3Rpcs {
}
}
let num_rpcs = self.by_name.load().len();
if num_rpcs < self.min_head_rpcs {
return Err(Web3ProxyError::NotEnoughRpcs {
num_known: num_rpcs,
min_head_rpcs: self.min_head_rpcs,
});
}
Ok(())
}
@ -350,7 +286,7 @@ impl Web3Rpcs {
authorization: Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let mut futures = vec![];
// setup the transaction funnel
@ -435,7 +371,7 @@ impl Web3Rpcs {
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> = active_request_handle
.request(method, &json!(&params), error_level.into(), None)
.request(method, &json!(&params), error_level.into())
.await;
result
})
@ -508,7 +444,7 @@ impl Web3Rpcs {
skip.push(Arc::clone(faster_rpc));
// just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits
match faster_rpc.try_request_handle(authorization, None).await {
match faster_rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", faster_rpc);
return OpenRequestResult::Handle(handle);
@ -550,6 +486,7 @@ impl Web3Rpcs {
skip_rpcs: &mut Vec<Arc<Web3Rpc>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<OpenRequestResult> {
let mut earliest_retry_at: Option<Instant> = None;
@ -591,16 +528,14 @@ impl Web3Rpcs {
}
}
} else {
let start = Instant::now();
// TODO: get from config or argument
let max_wait = Duration::from_secs(10);
let stop_trying_at =
Instant::now() + max_wait.unwrap_or_else(|| Duration::from_secs(10));
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let mut potential_rpcs = Vec::with_capacity(self.by_name.load().len());
while start.elapsed() < max_wait {
loop {
let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone();
potential_rpcs.clear();
@ -718,12 +653,16 @@ impl Web3Rpcs {
match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => continue,
ShouldWaitForBlock::Wait { .. } => {}
ShouldWaitForBlock::Ready => {}
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_consensus_rpcs.changed() => {},
_ = sleep_until(stop_trying_at) => {},
},
}
}
// TODO: select on consensus_rpcs changing and on earliest_retry_at
watch_consensus_rpcs.changed().await?;
if Instant::now() > stop_trying_at {
break;
}
}
}
@ -831,7 +770,7 @@ impl Web3Rpcs {
}
// check rate limits and increment our connection counter
match rpc.try_request_handle(authorization, None).await {
match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
trace!("{} is rate limited. skipping", rpc);
@ -888,6 +827,7 @@ impl Web3Rpcs {
&mut skip_rpcs,
min_block_needed,
max_block_needed,
None,
)
.await?
{
@ -908,7 +848,6 @@ impl Web3Rpcs {
&request.method,
&json!(request.params),
RequestErrorHandler::Save,
None,
)
.await;
@ -943,7 +882,11 @@ impl Web3Rpcs {
let rate_limit_substrings = ["limit", "exceeded", "quota usage"];
for rate_limit_substr in rate_limit_substrings {
if error_msg.contains(rate_limit_substr) {
warn!("rate limited by {}", skip_rpcs.last().unwrap());
warn!(
"rate limited ({}) by {}",
error_msg,
skip_rpcs.last().unwrap()
);
continue;
}
}
@ -1307,8 +1250,8 @@ mod tests {
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use arc_swap::ArcSwap;
use ethers::types::H256;
use ethers::types::{Block, U256};
@ -1319,8 +1262,7 @@ mod tests {
#[cfg(test)]
fn new_peak_latency() -> PeakEwmaLatency {
const NANOS_PER_MILLI: f64 = 1_000_000.0;
PeakEwmaLatency::spawn(1_000.0 * NANOS_PER_MILLI, 4, Duration::from_secs(1))
PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1))
}
#[tokio::test]
@ -1451,7 +1393,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: Some(tx_synced),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@ -1466,7 +1407,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: Some(tx_lagged),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@ -1494,7 +1434,6 @@ mod tests {
let rpcs = Web3Rpcs {
block_sender: block_sender.clone(),
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1559,6 +1498,7 @@ mod tests {
&mut vec![],
Some(head_block.number.as_ref().unwrap()),
None,
Some(Duration::from_secs(0)),
)
.await
.unwrap();
@ -1572,7 +1512,7 @@ mod tests {
.send_head_block_result(
Ok(Some(lagged_block.clone())),
&block_sender,
rpcs.blocks_by_hash.clone(),
&rpcs.blocks_by_hash,
)
.await
.unwrap();
@ -1592,7 +1532,7 @@ mod tests {
.send_head_block_result(
Ok(Some(lagged_block.clone())),
&block_sender,
rpcs.blocks_by_hash.clone(),
&rpcs.blocks_by_hash,
)
.await
.unwrap();
@ -1624,7 +1564,7 @@ mod tests {
.send_head_block_result(
Ok(Some(head_block.clone())),
&block_sender,
rpcs.blocks_by_hash.clone(),
&rpcs.blocks_by_hash,
)
.await
.unwrap();
@ -1650,28 +1590,56 @@ mod tests {
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.await,
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
None,
None,
Some(Duration::from_secs(0))
)
.await,
Ok(OpenRequestResult::Handle(_))
));
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&0.into()), None)
.await,
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&0.into()),
None,
Some(Duration::from_secs(0)),
)
.await,
Ok(OpenRequestResult::Handle(_))
));
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None)
.await,
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&1.into()),
None,
Some(Duration::from_secs(0)),
)
.await,
Ok(OpenRequestResult::Handle(_))
));
// future block should not get a handle
let future_rpc = rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&2.into()), None)
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&2.into()),
None,
Some(Duration::from_secs(0)),
)
.await;
assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady)));
}
@ -1707,7 +1675,6 @@ mod tests {
block_data_limit: 64.into(),
tier: 1,
head_block: Some(tx_pruned),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
..Default::default()
};
@ -1721,7 +1688,6 @@ mod tests {
block_data_limit: u64::MAX.into(),
tier: 2,
head_block: Some(tx_archive),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
..Default::default()
};
@ -1743,11 +1709,9 @@ mod tests {
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1802,6 +1766,7 @@ mod tests {
&mut vec![],
Some(head_block.number()),
None,
Some(Duration::from_secs(0)),
)
.await;
@ -1813,13 +1778,27 @@ mod tests {
));
let _best_available_server_from_none = rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
None,
None,
Some(Duration::from_secs(0)),
)
.await;
// assert_eq!(best_available_server, best_available_server_from_none);
let best_archive_server = rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None)
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&1.into()),
None,
Some(Duration::from_secs(0)),
)
.await;
match best_archive_server {
@ -1876,7 +1855,6 @@ mod tests {
block_data_limit: 64.into(),
tier: 0,
head_block: Some(tx_mock_geth),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@ -1889,7 +1867,6 @@ mod tests {
block_data_limit: u64::MAX.into(),
tier: 1,
head_block: Some(tx_mock_erigon_archive),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@ -1919,7 +1896,6 @@ mod tests {
let rpcs = Web3Rpcs {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,

File diff suppressed because it is too large Load Diff

@ -1,96 +1,80 @@
use anyhow::anyhow;
use derive_more::From;
use ethers::providers::{Authorization, ConnectionDetails};
use std::{borrow::Cow, time::Duration};
use std::time::Duration;
use url::Url;
// TODO: our own structs for these that handle streaming large responses
type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
pub type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
pub type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
/// Use HTTP and WS providers.
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
// TODO: custom types that let us stream JSON responses
#[derive(From)]
pub enum Web3Provider {
Both(EthersHttpProvider, EthersWsProvider),
Http(EthersHttpProvider),
// TODO: deadpool? custom tokio-tungstenite
Ws(EthersWsProvider),
#[cfg(test)]
Mock,
}
pub fn extract_auth(url: &mut Url) -> Option<Authorization> {
if let Some(pass) = url.password().map(|x| x.to_string()) {
// to_string is needed because we are going to remove these items from the url
let user = url.username().to_string();
impl Web3Provider {
pub fn http(&self) -> Option<&EthersHttpProvider> {
match self {
Self::Http(x) => Some(x),
_ => None,
}
}
// clear username and password from the url
url.set_username("")
.expect("unable to clear username on websocket");
url.set_password(None)
.expect("unable to clear password on websocket");
pub fn ws(&self) -> Option<&EthersWsProvider> {
match self {
Self::Both(_, x) | Self::Ws(x) => Some(x),
_ => None,
}
}
/// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used
/// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set
pub async fn new(
mut url: Cow<'_, Url>,
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Self> {
let auth = if let Some(pass) = url.password().map(|x| x.to_string()) {
// to_string is needed because we are going to remove these items from the url
let user = url.username().to_string();
// clear username and password from the url
let mut_url = url.to_mut();
mut_url
.set_username("")
.map_err(|_| anyhow!("unable to clear username on websocket"))?;
mut_url
.set_password(None)
.map_err(|_| anyhow!("unable to clear password on websocket"))?;
// keep them
Some(Authorization::basic(user, pass))
} else {
None
};
let provider = if url.scheme().starts_with("http") {
let provider = if let Some(auth) = auth {
ethers::providers::Http::new_with_auth(url.into_owned(), auth)?
} else if let Some(http_client) = http_client {
ethers::providers::Http::new_with_client(url.into_owned(), http_client)
} else {
ethers::providers::Http::new(url.into_owned())
};
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(12))
.into()
} else if url.scheme().starts_with("ws") {
let provider = if auth.is_some() {
let connection_details = ConnectionDetails::new(url.as_str(), auth);
ethers::providers::Ws::connect(connection_details).await?
} else {
ethers::providers::Ws::connect(url.as_str()).await?
};
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
// TODO: i don't think this interval matters
ethers::providers::Provider::new(provider).into()
} else {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
Ok(provider)
// keep them
Some(Authorization::basic(user, pass))
} else {
None
}
}
/// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used
/// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set
pub fn connect_http(
mut url: Url,
http_client: Option<reqwest::Client>,
interval: Duration,
) -> anyhow::Result<EthersHttpProvider> {
let auth = extract_auth(&mut url);
let mut provider = if url.scheme().starts_with("http") {
let provider = if let Some(auth) = auth {
ethers::providers::Http::new_with_auth(url, auth)?
} else if let Some(http_client) = http_client {
ethers::providers::Http::new_with_client(url, http_client)
} else {
ethers::providers::Http::new(url)
};
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
ethers::providers::Provider::new(provider).interval(Duration::from_secs(2))
} else {
return Err(anyhow::anyhow!(
"only http servers are supported. cannot use {}",
url
));
};
provider.set_interval(interval);
Ok(provider)
}
pub async fn connect_ws(mut url: Url, reconnects: usize) -> anyhow::Result<EthersWsProvider> {
let auth = extract_auth(&mut url);
let provider = if url.scheme().starts_with("ws") {
let provider = if auth.is_some() {
let connection_details = ConnectionDetails::new(url.as_str(), auth);
// if they error, we do our own reconnection with backoff
ethers::providers::Ws::connect_with_reconnects(connection_details, reconnects).await?
} else {
ethers::providers::Ws::connect_with_reconnects(url.as_str(), reconnects).await?
};
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
// TODO: i don't think this interval matters
ethers::providers::Provider::new(provider)
} else {
return Err(anyhow::anyhow!("ws servers are supported"));
};
Ok(provider)
}

@ -1,6 +1,6 @@
use super::one::Web3Rpc;
use super::provider::Web3Provider;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::Web3ProxyResult;
use anyhow::Context;
use chrono::Utc;
use entities::revert_log;
@ -14,7 +14,7 @@ use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use thread_fast_rng::rand::Rng;
use tokio::time::{sleep, Duration, Instant};
use tokio::time::{Duration, Instant};
#[derive(Debug)]
pub enum OpenRequestResult {
@ -76,7 +76,7 @@ impl Authorization {
self: Arc<Self>,
method: Method,
params: EthCallFirstParams,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let rpc_key_id = match self.checks.rpc_secret_key_id {
Some(rpc_key_id) => rpc_key_id.into(),
None => {
@ -158,7 +158,6 @@ impl OpenRequestHandle {
method: &str,
params: &P,
mut error_handler: RequestErrorHandler,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> Result<R, ProviderError>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
@ -170,29 +169,6 @@ impl OpenRequestHandle {
// trace!(rpc=%self.rpc, %method, "request");
trace!("requesting from {}", self.rpc);
let mut provider = if unlocked_provider.is_some() {
unlocked_provider
} else {
self.rpc.provider.read().await.clone()
};
let mut logged = false;
// TODO: instead of a lock, i guess it should be a watch?
while provider.is_none() {
// trace!("waiting on provider: locking...");
// TODO: i dont like this. subscribing to a channel could be better
sleep(Duration::from_millis(100)).await;
if !logged {
debug!("no provider for open handle on {}", self.rpc);
logged = true;
}
provider = self.rpc.provider.read().await.clone();
}
let provider = provider.expect("provider was checked already");
self.rpc
.total_requests
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
@ -202,21 +178,18 @@ impl OpenRequestHandle {
let start = Instant::now();
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match provider.as_ref() {
#[cfg(test)]
Web3Provider::Mock => {
return Err(ProviderError::CustomError(
"mock provider can't respond".to_string(),
))
}
Web3Provider::Ws(p) => p.request(method, params).await,
Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
p.request(method, params).await
}
// TODO: replace ethers-rs providers with our own that handles "id" being null
let response: Result<R, _> = if let Some(ref p) = self.rpc.http_provider {
p.request(method, params).await
} else if let Some(ref p) = self.rpc.ws_provider {
p.request(method, params).await
} else {
return Err(ProviderError::CustomError(
"no provider configured!".to_string(),
));
};
// note. we intentionally do not record this latency now. we do NOT want to measure errors
// we do NOT want to measure errors, so we intentionally do not record this latency now.
let latency = start.elapsed();
// we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called!
@ -277,11 +250,7 @@ impl OpenRequestHandle {
// TODO: move this info a function on ResponseErrorType
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
let msg = match &*provider {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
_ => err.as_error_response().map(|x| x.message.clone()),
};
let msg = err.as_error_response().map(|x| x.message.clone());
trace!("error message: {:?}", msg);
@ -390,9 +359,7 @@ impl OpenRequestHandle {
}
}
} else if let Some(peak_latency) = &self.rpc.peak_latency {
// trace!("updating peak_latency: {}", latency.as_secs_f64());
// peak_latency.report(latency);
trace!("peak latency disabled for now");
peak_latency.report(latency);
} else {
unreachable!("peak_latency not initialized");
}

@ -1,4 +1,4 @@
use crate::frontend::authorization::Authorization;
use crate::frontend::{authorization::Authorization, errors::Web3ProxyResult};
use super::many::Web3Rpcs;
///! Load balanced communication with a group of web3 providers
@ -29,14 +29,13 @@ impl Web3Rpcs {
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
// TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
let tx: Transaction = match rpc.try_request_handle(authorization, None).await {
let tx: Transaction = match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request(
"eth_getTransactionByHash",
&(pending_tx_id,),
Level::Error.into(),
None,
)
.await?
}
@ -71,7 +70,7 @@ impl Web3Rpcs {
rpc: Arc<Web3Rpc>,
pending_tx_id: TxHash,
pending_tx_sender: broadcast::Sender<TxStatus>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
// TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?

@ -8,9 +8,9 @@ pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
use crate::app::RpcSecretKeyCache;
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::Web3ProxyError;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::rpcs::one::Web3Rpc;
use anyhow::Context;
use anyhow::{anyhow, Context};
use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From;
@ -229,13 +229,14 @@ impl BufferedRpcQueryStats {
db_conn: &DatabaseConnection,
key: RpcQueryKey,
rpc_secret_key_cache: Option<&RpcSecretKeyCache>,
) -> anyhow::Result<()> {
anyhow::ensure!(
key.response_timestamp > 0,
"no response_timestamp! This is a bug! {:?} {:?}",
key,
self
);
) -> Web3ProxyResult<()> {
if key.response_timestamp == 0 {
return Err(Web3ProxyError::Anyhow(anyhow!(
"no response_timestamp! This is a bug! {:?} {:?}",
key,
self
)));
}
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
@ -670,8 +671,9 @@ impl RpcQueryStats {
method: Option<&str>,
) -> Decimal {
// for now, always return 0 for cost
return 0.into();
0.into()
/*
// some methods should be free. there might be cases where method isn't set (though they should be uncommon)
// TODO: get this list from config (and add more to it)
if let Some(method) = method.as_ref() {
@ -704,5 +706,6 @@ impl RpcQueryStats {
}
cost
*/
}
}

@ -1,5 +1,6 @@
use super::{AppStat, RpcQueryKey};
use crate::app::RpcSecretKeyCache;
use crate::app::{RpcSecretKeyCache, Web3ProxyJoinHandle};
use crate::frontend::errors::Web3ProxyResult;
use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
@ -9,7 +10,6 @@ use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::interval;
#[derive(Debug, Default)]
@ -32,7 +32,7 @@ pub struct BufferedRpcQueryStats {
pub struct SpawnedStatBuffer {
pub stat_sender: flume::Sender<AppStat>,
/// these handles are important and must be allowed to finish
pub background_handle: JoinHandle<anyhow::Result<()>>,
pub background_handle: Web3ProxyJoinHandle<()>,
}
pub struct StatBuffer {
accounting_db_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
@ -96,7 +96,7 @@ impl StatBuffer {
bucket: String,
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
let mut db_save_interval =