diff --git a/Cargo.lock b/Cargo.lock index 95aec00b..0c211658 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,7 +263,7 @@ checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39" dependencies = [ "async-trait", "axum-core", - "base64 0.21.0", + "base64 0.21.1", "bitflags", "bytes", "futures-util", @@ -379,9 +379,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +checksum = "3f1e31e207a6b8fb791a38ea3105e6cb541f55e4d029902d3039a4ad07cc4105" [[package]] name = "base64ct" @@ -532,9 +532,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.12.2" +version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c6ed94e98ecff0c12dd1b04c15ec0d7d9458ca8fe806cea6f12954efe74c63b" +checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" [[package]] name = "byte-slice-cast" @@ -797,7 +797,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b949a1c63fb7eb591eb7ba438746326aedf0ae843e51ec92ba6bec5bb382c4f" dependencies = [ - "base64 0.21.0", + "base64 0.21.1", "bech32", "bs58", "digest 0.10.6", @@ -1526,7 +1526,7 @@ dependencies = [ [[package]] name = "entities" -version = "0.27.0" +version = "0.28.0" dependencies = [ "ethers", "sea-orm", @@ -1812,7 +1812,7 @@ checksum = "1009041f40476b972b5d79346cc512e97c662b1a0a2f78285eabe9a122909783" dependencies = [ "async-trait", "auto_impl", - "base64 0.21.0", + "base64 0.21.1", "bytes", "enr", "ethers-core", @@ -2941,6 +2941,7 @@ name = "latency" version = "0.1.0" dependencies = [ "ewma", + "flume", "log", "serde", "tokio", @@ -3000,9 +3001,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "listenfd" @@ -3090,7 +3091,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.27.0" +version = "0.28.0" dependencies = [ "sea-orm-migration", "tokio", @@ -4278,13 +4279,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370" +checksum = "d1a59b5d8e97dee33696bf13c5ba8ab85341c002922fba050069326b9c498974" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.1", + "regex-syntax 0.7.2", ] [[package]] @@ -4304,9 +4305,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" +checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" [[package]] name = "rend" @@ -4323,7 +4324,7 @@ version = "0.11.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ - "base64 0.21.0", + "base64 0.21.1", "bytes", "encoding_rs", "futures-core", @@ -4584,7 +4585,7 @@ version = "1.0.2" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64 0.21.0", + "base64 0.21.1", ] [[package]] @@ -4899,9 +4900,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.9.0" +version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca2855b3715770894e67cbfa3df957790aa0c9edc3bf06efa1a84d77fa0839d1" +checksum = "1fc758eb7bffce5b308734e9b0c1468893cae9ff70ebf13e7090be8dcbcc83a8" dependencies = [ "bitflags", "core-foundation", @@ -4943,9 +4944,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "sentry" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37dd6c0cdca6b1d1ca44cde7fff289f2592a97965afec870faa7b81b9fc87745" +checksum = "234f6e133d27140ad5ea3b369a7665f7fbc060fe246f81d8168665b38c08b600" dependencies = [ "httpdate", "reqwest", @@ -4963,9 +4964,9 @@ dependencies = [ [[package]] name = "sentry-anyhow" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c3d7032fff178c77c107c32c6d3337b12847adf67165ccc876c898e7154b00" +checksum = "47e940be4c63b29006b4ac422cacea932c2cb5f8c209647ee86446ed27595a42" dependencies = [ "anyhow", "sentry-backtrace", @@ -4974,9 +4975,9 @@ dependencies = [ [[package]] name = "sentry-backtrace" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c029fe8317cdd75cb2b52c600bab4e2ef64c552198e669ba874340447f330962" +checksum = "d89b6b53de06308dd5ac08934b597bcd72a9aae0c20bc3ab06da69cb34d468e3" dependencies = [ "backtrace", "once_cell", @@ -4986,9 +4987,9 @@ dependencies = [ [[package]] name = "sentry-contexts" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc575098d73c8b942b589ab453b06e4c43527556dd8f95532220d1b54d7c6b4b" +checksum = "0769b66763e59976cd5c0fd817dcd51ccce404de8bebac0cd0e886c55b0fffa8" dependencies = [ "hostname", "libc", @@ -5000,9 +5001,9 @@ dependencies = [ [[package]] name = "sentry-core" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20216140001bbf05895f013abd0dae4df58faee24e016d54cbf107f070bac56b" +checksum = "a1f954f1b89e8cd82576dc49bfab80304c9a6201343b4fe5c68c819f7a9bbed2" dependencies = [ "once_cell", "rand", @@ -5013,9 +5014,9 @@ dependencies = [ [[package]] name = "sentry-log" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a43934e48e9c8e2c7d0dcb9c6cbcfcbe3ee109a14fc0c821e8944acd4faa2c25" +checksum = "958f539c85854acf7fa0fa46f07077be9050c7b28602ddfb5a651e9d11b27740" dependencies = [ "log", "sentry-core", @@ -5023,9 +5024,9 @@ dependencies = [ [[package]] name = "sentry-panic" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e45cd0a113fc06d6edba01732010518816cdc8ce3bccc70f5e41570046bf046" +checksum = "94dc2ab494362ad51308c7c19f44e9ab70e426a931621e4a05f378a1e74558c2" dependencies = [ "sentry-backtrace", "sentry-core", @@ -5033,9 +5034,9 @@ dependencies = [ [[package]] name = "sentry-types" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"d7f6959d8cb3a77be27e588eef6ce9a2a469651a556d9de662e4d07e5ace4232" +checksum = "85c53caf80cb1c6fcdf4d82b7bfff8477f50841e4caad7bf8e5e57a152b564cb" dependencies = [ "debugid", "getrandom", @@ -5988,7 +5989,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-trait", "axum", - "base64 0.21.0", + "base64 0.21.1", "bytes", "futures-core", "futures-util", @@ -6502,7 +6503,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "0.27.0" +version = "0.28.0" dependencies = [ "anyhow", "arc-swap", @@ -6532,6 +6533,7 @@ dependencies = [ "hex_fmt", "hostname", "http", + "hyper", "influxdb2", "influxdb2-structmap", "ipnet", diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 0c6b4df8..b0cf89e7 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.27.0" +version = "0.28.0" edition = "2021" [lib] diff --git a/entities/src/admin_increase_balance_receipt.rs b/entities/src/admin_increase_balance_receipt.rs new file mode 100644 index 00000000..d3d3cf27 --- /dev/null +++ b/entities/src/admin_increase_balance_receipt.rs @@ -0,0 +1,49 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "admin_increase_balance_receipt")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + #[sea_orm(column_type = "Decimal(Some((20, 10)))")] + pub amount: Decimal, + pub admin_id: u64, + pub deposit_to_user_id: u64, + pub note: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::admin::Entity", + from = "Column::AdminId", + to = "super::admin::Column::Id", + on_update = "NoAction", + on_delete = "NoAction" + )] + Admin, + #[sea_orm( + belongs_to = "super::user::Entity", + from = "Column::DepositToUserId", + to = "super::user::Column::Id", + on_update = "NoAction", + on_delete = "NoAction" + )] + User, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Admin.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::User.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/mod.rs b/entities/src/mod.rs index 91a8a460..00510fae 100644 --- a/entities/src/mod.rs +++ b/entities/src/mod.rs @@ -3,6 +3,7 @@ pub mod prelude; pub mod admin; +pub mod admin_increase_balance_receipt; pub mod admin_trail; pub mod balance; pub mod increase_on_chain_balance_receipt; diff --git a/entities/src/prelude.rs b/entities/src/prelude.rs index 9d5f4cc0..20b764f3 100644 --- a/entities/src/prelude.rs +++ b/entities/src/prelude.rs @@ -1,6 +1,7 @@ //! `SeaORM` Entity. 
Generated by sea-orm-codegen 0.10.7 pub use super::admin::Entity as Admin; +pub use super::admin_increase_balance_receipt::Entity as AdminIncreaseBalanceReceipt; pub use super::admin_trail::Entity as AdminTrail; pub use super::balance::Entity as Balance; pub use super::increase_on_chain_balance_receipt::Entity as IncreaseOnChainBalanceReceipt; diff --git a/latency/Cargo.toml b/latency/Cargo.toml index eb51eba9..7cea2c16 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] ewma = "0.1.1" +flume = "0.10.14" log = "0.4.17" serde = { version = "1.0.163", features = [] } tokio = { version = "1.28.1", features = ["full"] } diff --git a/latency/src/peak_ewma/mod.rs b/latency/src/peak_ewma/mod.rs index a795c383..0d4dd25b 100644 --- a/latency/src/peak_ewma/mod.rs +++ b/latency/src/peak_ewma/mod.rs @@ -2,9 +2,7 @@ mod rtt_estimate; use std::sync::Arc; -use log::{error, trace}; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; +use log::{error, log_enabled, trace}; use tokio::task::JoinHandle; use tokio::time::{Duration, Instant}; @@ -20,7 +18,7 @@ pub struct PeakEwmaLatency { /// Join handle for the latency calculation task pub join_handle: JoinHandle<()>, /// Send to update with each request duration - request_tx: mpsc::Sender, + request_tx: flume::Sender, /// Latency average and last update time rtt_estimate: Arc, /// Decay time @@ -32,9 +30,12 @@ impl PeakEwmaLatency { /// /// Returns a handle that can also be used to read the current /// average latency. - pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self { + pub fn spawn(decay: Duration, buf_size: usize, start_latency: Duration) -> Self { + let decay_ns = decay.as_nanos() as f64; + debug_assert!(decay_ns > 0.0, "decay_ns must be positive"); - let (request_tx, request_rx) = mpsc::channel(buf_size); + + let (request_tx, request_rx) = flume::bounded(buf_size); let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency)); let task = PeakEwmaLatencyTask { request_rx, @@ -56,15 +57,19 @@ impl PeakEwmaLatency { let mut estimate = self.rtt_estimate.load(); let now = Instant::now(); - assert!( - estimate.update_at <= now, - "update_at is {}ns in the future", - estimate.update_at.duration_since(now).as_nanos(), - ); - // Update the RTT estimate to account for decay since the last update. - // TODO: having an update here means we don't actually write from just one thread!! Thats how we get partially written stuff i think - estimate.update(0.0, self.decay_ns, now) + if estimate.update_at > now { + if log_enabled!(log::Level::Trace) { + trace!( + "update_at is {}ns in the future", + estimate.update_at.duration_since(now).as_nanos() + ); + } + estimate.rtt + } else { + // Update the RTT estimate to account for decay since the last update. + estimate.update(0.0, self.decay_ns, now) + } } /// Report latency from a single request @@ -73,15 +78,12 @@ impl PeakEwmaLatency { pub fn report(&self, duration: Duration) { match self.request_tx.try_send(duration) { Ok(()) => {} - Err(TrySendError::Full(_)) => { + Err(err) => { // We don't want to block if the channel is full, just // report the error - error!("Latency report channel full"); + error!("Latency report channel full. {}", err); // TODO: could we spawn a new tokio task to report tthis later? 
} - Err(TrySendError::Closed(_)) => { - unreachable!("Owner should keep channel open"); - } }; } } @@ -90,7 +92,7 @@ impl PeakEwmaLatency { #[derive(Debug)] struct PeakEwmaLatencyTask { /// Receive new request timings for update - request_rx: mpsc::Receiver, + request_rx: flume::Receiver, /// Current estimate and update time rtt_estimate: Arc, /// Last update time, used for decay calculation @@ -101,8 +103,8 @@ struct PeakEwmaLatencyTask { impl PeakEwmaLatencyTask { /// Run the loop for updating latency - async fn run(mut self) { - while let Some(rtt) = self.request_rx.recv().await { + async fn run(self) { + while let Ok(rtt) = self.request_rx.recv_async().await { self.update(rtt); } trace!("latency loop exited"); @@ -128,14 +130,15 @@ impl PeakEwmaLatencyTask { mod tests { use tokio::time::{self, Duration}; - use crate::util::nanos::NANOS_PER_MILLI; - /// The default RTT estimate decays, so that new nodes are considered if the /// default RTT is too high. #[tokio::test(start_paused = true)] async fn default_decay() { - let estimate = - super::PeakEwmaLatency::spawn(NANOS_PER_MILLI * 1_000.0, 8, Duration::from_millis(10)); + let estimate = super::PeakEwmaLatency::spawn( + Duration::from_millis(1_000), + 8, + Duration::from_millis(10), + ); let load = estimate.latency(); assert_eq!(load, Duration::from_millis(10)); diff --git a/latency/src/peak_ewma/rtt_estimate.rs b/latency/src/peak_ewma/rtt_estimate.rs index be56fe9c..1850d9e4 100644 --- a/latency/src/peak_ewma/rtt_estimate.rs +++ b/latency/src/peak_ewma/rtt_estimate.rs @@ -44,6 +44,7 @@ impl RttEstimate { ); Duration::from_nanos(next_estimate as u64) }; + self.rtt } @@ -101,7 +102,7 @@ impl AtomicRttEstimate { } /// Fetches the value, and applies a function to it that returns an - /// new rtt. Retrns the new RttEstimate with new update_at. + /// new rtt. Returns the new RttEstimate with new update_at. /// /// Automatically updates the update_at with Instant::now(). 
This /// method omits ordering arguments, defaulting to Relaxed since diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 97f07b5e..bd2e3b11 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.27.0" +version = "0.28.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index ae9adaf7..91c45391 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -27,6 +27,7 @@ mod m20230412_171916_modify_secondary_user_add_primary_user; mod m20230422_172555_premium_downgrade_logic; mod m20230511_161214_remove_columns_statsv2_origin_and_method; mod m20230512_220213_allow_null_rpc_key_id_in_stats_v2; +mod m20230514_114803_admin_add_credits; pub struct Migrator; @@ -51,16 +52,17 @@ impl MigratorTrait for Migrator { Box::new(m20230125_204810_stats_v2::Migration), Box::new(m20230130_124740_read_only_login_logic::Migration), Box::new(m20230130_165144_prepare_admin_imitation_pre_login::Migration), - Box::new(m20230215_152254_admin_trail::Migration), - Box::new(m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2::Migration), Box::new(m20230205_130035_create_balance::Migration), Box::new(m20230205_133755_create_referrals::Migration), Box::new(m20230214_134254_increase_balance_transactions::Migration), + Box::new(m20230215_152254_admin_trail::Migration), Box::new(m20230221_230953_track_spend::Migration), + Box::new(m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2::Migration), Box::new(m20230412_171916_modify_secondary_user_add_primary_user::Migration), Box::new(m20230422_172555_premium_downgrade_logic::Migration), Box::new(m20230511_161214_remove_columns_statsv2_origin_and_method::Migration), Box::new(m20230512_220213_allow_null_rpc_key_id_in_stats_v2::Migration), + Box::new(m20230514_114803_admin_add_credits::Migration), ] } } diff --git a/migration/src/m20230514_114803_admin_add_credits.rs b/migration/src/m20230514_114803_admin_add_credits.rs new file mode 100644 index 00000000..0344d5d6 --- /dev/null +++ b/migration/src/m20230514_114803_admin_add_credits.rs @@ -0,0 +1,97 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(AdminIncreaseBalanceReceipt::Table) + .if_not_exists() + .col( + ColumnDef::new(AdminIncreaseBalanceReceipt::Id) + .integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(AdminIncreaseBalanceReceipt::Amount) + .decimal_len(20, 10) + .not_null(), + ) + .col( + ColumnDef::new(AdminIncreaseBalanceReceipt::AdminId) + .big_unsigned() + .not_null(), + ) + .foreign_key( + ForeignKey::create() + .name("fk-admin_id") + .from( + AdminIncreaseBalanceReceipt::Table, + AdminIncreaseBalanceReceipt::AdminId, + ) + .to(Admin::Table, Admin::Id), + ) + .col( + ColumnDef::new(AdminIncreaseBalanceReceipt::DepositToUserId) + .big_unsigned() + .not_null(), + ) + .foreign_key( + ForeignKey::create() + .name("fk-admin_deposits_to_user_id") + .from( + AdminIncreaseBalanceReceipt::Table, + AdminIncreaseBalanceReceipt::DepositToUserId, + ) + .to(User::Table, User::Id), + ) + .col( + ColumnDef::new(AdminIncreaseBalanceReceipt::Note) + .string() + .not_null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table( + Table::drop() + 
.table(AdminIncreaseBalanceReceipt::Table) + .to_owned(), + ) + .await + } +} + +#[derive(Iden)] +enum Admin { + Table, + Id, +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum User { + Table, + Id, +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum AdminIncreaseBalanceReceipt { + Table, + Id, + Amount, + AdminId, + DepositToUserId, + Note, +} diff --git a/scripts/manual-tests/45-admin-add-balance.sh b/scripts/manual-tests/45-admin-add-balance.sh new file mode 100644 index 00000000..61b194f8 --- /dev/null +++ b/scripts/manual-tests/45-admin-add-balance.sh @@ -0,0 +1,44 @@ + +# Create / Login user1 +curl -X GET "http://127.0.0.1:8544/user/login/0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a" +curl -X POST http://127.0.0.1:8544/user/login \ + -H 'Content-Type: application/json' \ + -d '{ + "address": "0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a", + "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078654233453932384132453534424530313345463832343164344339456146344466414539344435610a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a203031483044573642334a48355a4b384a4e3947504d594e4d4b370a4973737565642041743a20323032332d30352d31345431393a33353a35352e3736323632395a0a45787069726174696f6e2054696d653a20323032332d30352d31345431393a35353a35352e3736323632395a", + "sig": "f88b42d638246f8e51637c753052cab3a13b2a138faf3107c921ce2f0027d6506b9adcd3a7b72af830cdf50d20e6e9cb3f9f456dd1be47f6543990ea050909791c", + "version": "3", + "signer": "MEW" + }' + +# 01H0DW6VFCP365B9TXVQVVMHHY +# 01H0DVZNDJWQ7YG8RBHXQHJ301 + +# Make user1 an admin +cargo run change_admin_status 0xeB3E928A2E54BE013EF8241d4C9EaF4DfAE94D5a true + +# Create/Login user2 +curl -X GET "http://127.0.0.1:8544/user/login/0x762390ae7a3c4D987062a398C1eA8767029AB08E" + +curl -X POST http://127.0.0.1:8544/user/login \ + -H 'Content-Type: application/json' \ + -d '{ + "address": "0x762390ae7a3c4d987062a398c1ea8767029ab08e", + "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078373632333930616537613363344439383730363261333938433165413837363730323941423038450a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a20303148304457384233304e534447594e484d33514d4a31434e530a4973737565642041743a20323032332d30352d31345431393a33373a30312e3238303338355a0a45787069726174696f6e2054696d653a20323032332d30352d31345431393a35373a30312e3238303338355a", + "sig": "c545235557b7952a789dffa2af153af5cf663dcc05449bcc4b651b04cda57de05bcef55c0f5cbf6aa2432369582eb6a40927d14ad0a2d15f48fa45f32fbf273f1c", + "version": "3", + "signer": "MEW" + }' + +# 01H0DWPXRQA7XX2VFSNR02CG1N +# 01H0DWPXQQ951Y3R90QMF6MYGE + +curl \ +-H "Authorization: Bearer 01H0DWPXRQA7XX2VFSNR02CG1N" \ +-X GET "127.0.0.1:8544/user/balance" + + +# Admin add balance +curl \ +-H "Authorization: Bearer 01H0DW6VFCP365B9TXVQVVMHHY" \ +-X GET "127.0.0.1:8544/admin/increase_balance?user_address=0x762390ae7a3c4D987062a398C1eA8767029AB08E&amount=100.0" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index c51e9c8e..0f4c2af2 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "0.27.0" +version = "0.28.0" edition 
= "2021" default-run = "web3_proxy_cli" @@ -55,6 +55,7 @@ hdrhistogram = "7.5.2" hex_fmt = "0.3.0" hostname = "0.3.1" http = "0.2.9" +hyper = { version = "0.14.26", features = ["full"] } influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rustls"] } influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"} ipnet = "2.7.2" @@ -71,10 +72,10 @@ parking_lot = { version = "0.12.1", features = ["arc_lock"] } prettytable = "*" proctitle = "0.1.1" rdkafka = { version = "0.31.0" } -regex = "1.8.1" +regex = "1.8.2" reqwest = { version = "0.11.18", default-features = false, features = ["json", "tokio-rustls"] } rmp-serde = "1.1.1" -sentry = { version = "0.31.1", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } +sentry = { version = "0.31.2", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.163", features = [] } serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.2.2" @@ -91,3 +92,6 @@ tower-http = { version = "0.4.0", features = ["cors", "sensitive-headers"] } ulid = { version = "1.0.0", features = ["uuid", "serde"] } url = "2.3.1" uuid = "1.3.3" + +[dev-dependencies] +tokio = { version = "1.28.1", features = ["full", "test-util"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index bdc70c90..a6ade181 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -74,7 +74,7 @@ pub static APP_USER_AGENT: &str = concat!( // aggregate across 1 week pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; -pub type AnyhowJoinHandle = JoinHandle>; +pub type Web3ProxyJoinHandle = JoinHandle>; /// TODO: move this #[derive(Clone, Debug, Default, From)] @@ -176,7 +176,7 @@ pub struct Web3ProxyApp { // TODO: should the key be our RpcSecretKey class instead of Ulid? pub rpc_secret_key_cache: RpcSecretKeyCache, /// concurrent/parallel RPC request limits for authenticated users - pub rpc_key_semaphores: Cache>, + pub user_semaphores: Cache>, /// concurrent/parallel request limits for anonymous users pub ip_semaphores: Cache>, /// concurrent/parallel application request limits for authenticated users @@ -188,7 +188,7 @@ pub struct Web3ProxyApp { /// flatten a JoinError into an anyhow error /// Useful when joining multiple futures. -pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result { +pub async fn flatten_handle(handle: Web3ProxyJoinHandle) -> Web3ProxyResult { match handle.await { Ok(Ok(result)) => Ok(result), Ok(Err(err)) => Err(err), @@ -198,8 +198,8 @@ pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result /// return the first error, or Ok if everything worked pub async fn flatten_handles( - mut handles: FuturesUnordered>, -) -> anyhow::Result<()> { + mut handles: FuturesUnordered>, +) -> Web3ProxyResult<()> { while let Some(x) = handles.next().await { match x { Err(e) => return Err(e.into()), @@ -315,9 +315,9 @@ pub struct Web3ProxyAppSpawn { /// the app. 
probably clone this to use in other groups of handles pub app: Arc, /// handles for the balanced and private rpcs - pub app_handles: FuturesUnordered>, + pub app_handles: FuturesUnordered>, /// these are important and must be allowed to finish - pub background_handles: FuturesUnordered>, + pub background_handles: FuturesUnordered>, /// config changes are sent here pub new_top_config_sender: watch::Sender, /// watch this to know when to start the app @@ -359,10 +359,12 @@ impl Web3ProxyApp { } // these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error - let app_handles = FuturesUnordered::new(); + // TODO: this is a small enough group, that a vec with try_join_all is probably fine + let app_handles: FuturesUnordered> = FuturesUnordered::new(); // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) - let important_background_handles = FuturesUnordered::new(); + let important_background_handles: FuturesUnordered> = + FuturesUnordered::new(); // connect to the database and make sure the latest migrations have run let mut db_conn = None::; @@ -624,12 +626,10 @@ impl Web3ProxyApp { // TODO: what should tti be for semaphores? let bearer_token_semaphores = Cache::new(max_users); let ip_semaphores = Cache::new(max_users); - let registered_user_semaphores = Cache::new(max_users); + let user_semaphores = Cache::new(max_users); let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( - top_config.app.chain_id, db_conn.clone(), - http_client.clone(), top_config.app.max_block_age, top_config.app.max_block_lag, top_config.app.min_synced_rpcs, @@ -654,9 +654,7 @@ impl Web3ProxyApp { // TODO: Merge // let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn( let (private_rpcs, private_handle, _) = Web3Rpcs::spawn( - top_config.app.chain_id, db_conn.clone(), - http_client.clone(), // private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag None, None, @@ -688,9 +686,7 @@ impl Web3ProxyApp { } else { // TODO: do something with the spawn handle let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn( - top_config.app.chain_id, db_conn.clone(), - http_client.clone(), // bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag None, None, @@ -735,7 +731,7 @@ impl Web3ProxyApp { rpc_secret_key_cache, bearer_token_semaphores, ip_semaphores, - rpc_key_semaphores: registered_user_semaphores, + user_semaphores, stat_sender, }; @@ -752,9 +748,9 @@ impl Web3ProxyApp { loop { let new_top_config = new_top_config_receiver.borrow_and_update().to_owned(); - app.apply_top_config(new_top_config) - .await - .context("failed applying new top_config")?; + if let Err(err) = app.apply_top_config(new_top_config).await { + error!("unable to apply config! 
{:?}", err); + }; new_top_config_receiver .changed() @@ -790,7 +786,7 @@ impl Web3ProxyApp { .into()) } - pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { + pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> { // TODO: also update self.config from new_top_config.app // connect to the backends @@ -1015,20 +1011,12 @@ impl Web3ProxyApp { ) -> Web3ProxyResult<(StatusCode, JsonRpcForwardedResponseEnum, Vec>)> { // trace!(?request, "proxy_web3_rpc"); - // even though we have timeouts on the requests to our backend providers, - // we need a timeout for the incoming request so that retries don't run forever - // TODO: take this as an optional argument. check for a different max from the user_tier? - // TODO: how much time was spent on this request alredy? - let max_time = Duration::from_secs(240); - // TODO: use streams and buffers so we don't overwhelm our server let response = match request { JsonRpcRequestEnum::Single(mut request) => { - let (status_code, response, rpcs) = timeout( - max_time, - self.proxy_cached_request(&authorization, &mut request, None), - ) - .await?; + let (status_code, response, rpcs) = self + .proxy_cached_request(&authorization, &mut request, None) + .await; ( status_code, @@ -1037,11 +1025,9 @@ impl Web3ProxyApp { ) } JsonRpcRequestEnum::Batch(requests) => { - let (responses, rpcs) = timeout( - max_time, - self.proxy_web3_rpc_requests(&authorization, requests), - ) - .await??; + let (responses, rpcs) = self + .proxy_web3_rpc_requests(&authorization, requests) + .await?; // TODO: real status code ( @@ -1335,7 +1321,8 @@ impl Web3ProxyApp { | "eth_getUserOperationReceipt" | "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() { Some(bundler_4337_rpcs) => { - bundler_4337_rpcs + // TODO: timeout + bundler_4337_rpcs .try_proxy_connection( authorization, request, @@ -1371,6 +1358,7 @@ impl Web3ProxyApp { JsonRpcResponseData::from(json!(Address::zero())) } "eth_estimateGas" => { + // TODO: timeout let response_data = self .balanced_rpcs .try_proxy_connection( @@ -1407,6 +1395,7 @@ impl Web3ProxyApp { } "eth_getTransactionReceipt" | "eth_getTransactionByHash" => { // try to get the transaction without specifying a min_block_height + // TODO: timeout let mut response_data = self .balanced_rpcs .try_proxy_connection( @@ -1450,12 +1439,7 @@ impl Web3ProxyApp { // TODO: error if the chain_id is incorrect - // TODO: check the cache to see if we have sent this transaction recently - // TODO: if so, use a cached response. - // TODO: if not, - // TODO: - cache successes for up to 1 minute - // TODO: - cache failures for 1 block (we see transactions skipped because out of funds. but that can change block to block) - + // TODO: timeout let mut response_data = self .try_send_protected( authorization, @@ -1585,6 +1569,7 @@ impl Web3ProxyApp { , "web3_sha3" => { // returns Keccak-256 (not the standardized SHA3-256) of the given data. 
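On the timeout changes in this hunk: the single 240-second budget wrapped around the whole proxy entry point is being pushed down to individual backend calls via `tokio::time::timeout` (see the `timeout(duration, ...).await??` calls further below). A hedged sketch of that wrapping pattern, assuming an anyhow-style result and a hypothetical `backend_call()` in place of `try_proxy_connection`:

```rust
use std::time::Duration;
use tokio::time::timeout;

// stands in for balanced_rpcs.try_proxy_connection(..) in the surrounding hunks (hypothetical)
async fn backend_call() -> anyhow::Result<String> {
    Ok("0x1".into())
}

async fn proxy_with_timeout() -> anyhow::Result<String> {
    // per-call budget; the diff uses 240s and notes it may later vary by user tier
    let duration = Duration::from_secs(240);

    // timeout() yields Err(Elapsed) if the inner future is too slow, hence `.await??`:
    // the first `?` handles the elapsed timeout, the second the call's own error
    let response = timeout(duration, backend_call()).await??;

    Ok(response)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("{}", proxy_with_timeout().await?);
    Ok(())
}
```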
+ // TODO: timeout match &request.params { Some(serde_json::Value::Array(params)) => { // TODO: make a struct and use serde conversion to clean this up @@ -1744,6 +1729,9 @@ impl Web3ProxyApp { let authorization = authorization.clone(); + // TODO: different timeouts for different user tiers + let duration = Duration::from_secs(240); + if let Some(cache_key) = cache_key { let from_block_num = cache_key.from_block.as_ref().map(|x| x.number.unwrap()); let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap()); @@ -1754,15 +1742,18 @@ impl Web3ProxyApp { { Ok(x) => x, Err(x) => { - let response_data = self.balanced_rpcs - .try_proxy_connection( - &authorization, - request, - Some(request_metadata), - from_block_num.as_ref(), - to_block_num.as_ref(), + let response_data = timeout( + duration, + self.balanced_rpcs + .try_proxy_connection( + &authorization, + request, + Some(request_metadata), + from_block_num.as_ref(), + to_block_num.as_ref(), + ) ) - .await?; + .await??; // TODO: convert the Box to an Arc x.insert(response_data.clone()); @@ -1771,15 +1762,18 @@ impl Web3ProxyApp { } } } else { - self.balanced_rpcs - .try_proxy_connection( - &authorization, - request, - Some(request_metadata), - None, - None, + timeout( + duration, + self.balanced_rpcs + .try_proxy_connection( + &authorization, + request, + Some(request_metadata), + None, + None, + ) ) - .await? + .await?? } } }; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 05a947d5..df3ad199 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,4 +1,4 @@ -use crate::app::AnyhowJoinHandle; +use crate::app::Web3ProxyJoinHandle; use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use crate::rpcs::one::Web3Rpc; use argh::FromArgs; @@ -9,7 +9,7 @@ use log::warn; use migration::sea_orm::DatabaseConnection; use serde::Deserialize; use std::sync::Arc; -use tokio::sync::broadcast; +use std::time::Duration; pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); @@ -283,28 +283,48 @@ impl Web3RpcConfig { redis_pool: Option, chain_id: u64, http_client: Option, - http_interval_sender: Option>>, blocks_by_hash_cache: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option>, - reconnect: bool, - ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys()); } + // TODO: get this from config? a helper function? where does this belong? + let block_interval = match chain_id { + // ethereum + 1 => Duration::from_secs(12), + // ethereum-goerli + 5 => Duration::from_secs(12), + // polygon + 137 => Duration::from_secs(2), + // fantom + 250 => Duration::from_secs(1), + // arbitrum + 42161 => Duration::from_millis(500), + // anything else + _ => { + let default = 10; + warn!( + "unexpected chain_id ({}). 
polling every {} seconds", + chain_id, default + ); + Duration::from_secs(default) + } + }; + Web3Rpc::spawn( self, name, chain_id, db_conn, http_client, - http_interval_sender, redis_pool, + block_interval, blocks_by_hash_cache, block_sender, tx_id_sender, - reconnect, ) .await } diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index c3ddf453..ba4d4eaa 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -7,6 +7,7 @@ use crate::app::Web3ProxyApp; use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext}; use crate::user_token::UserBearerToken; use crate::PostLogin; +use anyhow::Context; use axum::{ extract::{Path, Query}, headers::{authorization::Bearer, Authorization}, @@ -16,15 +17,19 @@ use axum::{ use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use chrono::{TimeZone, Utc}; -use entities::{admin_trail, login, pending_login, rpc_key, user}; +use entities::{ + admin, admin_increase_balance_receipt, admin_trail, balance, login, pending_login, rpc_key, + user, user_tier, +}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; use log::{debug, info, warn}; -use migration::sea_orm::prelude::Uuid; +use migration::sea_orm::prelude::{Decimal, Uuid}; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, }; +use migration::{Expr, OnConflict}; use serde_json::json; use siwe::{Message, VerificationOpts}; use std::ops::Add; @@ -33,6 +38,135 @@ use std::sync::Arc; use time::{Duration, OffsetDateTime}; use ulid::Ulid; +/// `GET /admin/increase_balance` -- As an admin, modify a user's user-tier +/// +/// - user_address that is to credited balance +/// - user_role_tier that is supposed to be adapted +#[debug_handler] +pub async fn admin_increase_balance( + Extension(app): Extension>, + TypedHeader(Authorization(bearer)): TypedHeader>, + Query(params): Query>, +) -> Web3ProxyResponse { + let (caller, _) = app.bearer_is_authorized(bearer).await?; + let caller_id = caller.id; + + // Establish connections + let db_conn = app + .db_conn() + .context("query_admin_modify_user needs a db")?; + + // Check if the caller is an admin (if not, return early) + let admin_entry: admin::Model = admin::Entity::find() + .filter(admin::Column::UserId.eq(caller_id)) + .one(&db_conn) + .await? + .ok_or(Web3ProxyError::AccessDenied)?; + + // Get the user from params + let user_address: Address = params + .get("user_address") + .ok_or_else(|| { + Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string()) + })? + .parse::
() + .map_err(|_| { + Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string()) + })?; + let user_address_bytes: Vec = user_address.to_fixed_bytes().into(); + let note: String = params + .get("note") + .ok_or_else(|| { + Web3ProxyError::BadRequest("Unable to find 'note' key in request".to_string()) + })? + .parse::() + .map_err(|_| { + Web3ProxyError::BadRequest("Unable to parse 'note' as a String".to_string()) + })?; + // Get the amount from params + // Decimal::from_str + let amount: Decimal = params + .get("amount") + .ok_or_else(|| { + Web3ProxyError::BadRequest("Unable to get the amount key from the request".to_string()) + }) + .map(|x| Decimal::from_str(x))? + .map_err(|err| { + Web3ProxyError::BadRequest(format!("Unable to parse amount from the request {:?}", err)) + })?; + + let user_entry: user::Model = user::Entity::find() + .filter(user::Column::Address.eq(user_address_bytes.clone())) + .one(&db_conn) + .await? + .ok_or(Web3ProxyError::BadRequest( + "No user with this id found".to_string(), + ))?; + + let increase_balance_receipt = admin_increase_balance_receipt::ActiveModel { + amount: sea_orm::Set(amount), + admin_id: sea_orm::Set(admin_entry.id), + deposit_to_user_id: sea_orm::Set(user_entry.id), + note: sea_orm::Set(note), + ..Default::default() + }; + increase_balance_receipt.save(&db_conn).await?; + + let mut out = HashMap::new(); + out.insert( + "user", + serde_json::Value::String(format!("{:?}", user_address)), + ); + out.insert("amount", serde_json::Value::String(amount.to_string())); + + // Get the balance row + let balance_entry: balance::Model = balance::Entity::find() + .filter(balance::Column::UserId.eq(user_entry.id)) + .one(&db_conn) + .await? + .context("User does not have a balance row")?; + + // Finally make the user premium if balance is above 10$ + let premium_user_tier = user_tier::Entity::find() + .filter(user_tier::Column::Title.eq("Premium")) + .one(&db_conn) + .await? 
+ .context("Premium tier was not found!")?; + + let balance_entry = balance_entry.into_active_model(); + balance::Entity::insert(balance_entry) + .on_conflict( + OnConflict::new() + .values([ + // ( + // balance::Column::Id, + // Expr::col(balance::Column::Id).add(self.frontend_requests), + // ), + ( + balance::Column::AvailableBalance, + Expr::col(balance::Column::AvailableBalance).add(amount), + ), + // ( + // balance::Column::Used, + // Expr::col(balance::Column::UsedBalance).add(self.backend_retries), + // ), + // ( + // balance::Column::UserId, + // Expr::col(balance::Column::UserId).add(self.no_servers), + // ), + ]) + .to_owned(), + ) + .exec(&db_conn) + .await?; + // TODO: Downgrade otherwise, right now not functioning properly + + // Then read and save in one transaction + let response = (StatusCode::OK, Json(out)).into_response(); + + Ok(response) +} + /// `GET /admin/modify_role` -- As an admin, modify a user's user-tier /// /// - user_address that is to be modified diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 51ad4834..dcc3b614 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -30,7 +30,7 @@ use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use std::convert::Infallible; use std::fmt::Display; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; use std::mem; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; @@ -42,22 +42,12 @@ use ulid::Ulid; use uuid::Uuid; /// This lets us use UUID and ULID while we transition to only ULIDs -/// TODO: include the key's description. #[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] pub enum RpcSecretKey { Ulid(Ulid), Uuid(Uuid), } -impl Hash for RpcSecretKey { - fn hash(&self, state: &mut H) { - match self { - Self::Ulid(x) => state.write_u128(x.0), - Self::Uuid(x) => state.write_u128(x.as_u128()), - } - } -} - /// TODO: should this have IpAddr and Origin or AuthorizationChecks? #[derive(Debug)] pub enum RateLimitResult { @@ -99,6 +89,17 @@ pub struct KafkaDebugLogger { num_responses: AtomicUsize, } +impl Hash for RpcSecretKey { + fn hash(&self, state: &mut H) { + let x = match self { + Self::Ulid(x) => x.0, + Self::Uuid(x) => x.as_u128(), + }; + + x.hash(state); + } +} + impl fmt::Debug for KafkaDebugLogger { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("KafkaDebugLogger") @@ -883,17 +884,13 @@ impl Web3ProxyApp { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores - .get_or_insert_async::(ip, async move { + .get_or_insert_async::(ip, async move { // TODO: set max_concurrent_requests dynamically based on load? let s = Semaphore::new(max_concurrent_requests); Ok(Arc::new(s)) }) - .await?; - - // if semaphore.available_permits() == 0 { - // // TODO: concurrent limit hit! emit a stat? less important for anon users - // // TODO: there is probably a race here - // } + .await + .expect("infallible"); let semaphore_permit = semaphore.acquire_owned().await?; @@ -903,8 +900,8 @@ impl Web3ProxyApp { } } - /// Limit the number of concurrent requests from the given rpc key. 
- pub async fn registered_user_semaphore( + /// Limit the number of concurrent requests for a given user across all of their keys + pub async fn user_semaphore( &self, authorization_checks: &AuthorizationChecks, ) -> Web3ProxyResult> { @@ -915,25 +912,19 @@ impl Web3ProxyApp { .or(Err(Web3ProxyError::UserIdZero))?; let semaphore = self - .rpc_key_semaphores - .get_or_insert_async(&user_id, async move { + .user_semaphores + .get_or_insert_async::(&user_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - // trace!("new semaphore for user_id {}", user_id); - Ok::<_, Infallible>(Arc::new(s)) + Ok(Arc::new(s)) }) .await - .unwrap(); - - // if semaphore.available_permits() == 0 { - // // TODO: concurrent limit hit! emit a stat? this has a race condition though. - // // TODO: maybe have a stat on how long we wait to acquire the semaphore instead? - // } + .expect("infallible"); let semaphore_permit = semaphore.acquire_owned().await?; Ok(Some(semaphore_permit)) } else { - // unlimited requests allowed + // unlimited concurrency Ok(None) } } @@ -955,7 +946,7 @@ impl Web3ProxyApp { Ok(Arc::new(s)) }) .await - .unwrap(); + .expect("infallible"); let semaphore_permit = semaphore.acquire_owned().await?; @@ -1043,7 +1034,7 @@ impl Web3ProxyApp { // they do check origin because we can override rate limits for some origins let authorization = Authorization::external( allowed_origin_requests_per_period, - self.db_conn.clone(), + self.db_conn(), ip, origin, proxy_mode, @@ -1098,8 +1089,7 @@ impl Web3ProxyApp { proxy_mode: ProxyMode, rpc_secret_key: RpcSecretKey, ) -> Web3ProxyResult { - let authorization_checks: Result<_, Web3ProxyError> = self - .rpc_secret_key_cache + self.rpc_secret_key_cache .get_or_insert_async(&rpc_secret_key, async move { // trace!(?rpc_secret_key, "user cache miss"); @@ -1119,7 +1109,6 @@ impl Web3ProxyApp { Some(rpc_key_model) => { // TODO: move these splits into helper functions // TODO: can we have sea orm handle this for us? - // TODO: don't expect. return an application error let user_model = user::Entity::find_by_id(rpc_key_model.user_id) .one(db_replica.conn()) .await? @@ -1129,8 +1118,8 @@ impl Web3ProxyApp { .filter(balance::Column::UserId.eq(user_model.id)) .one(db_replica.conn()) .await? - .map(|x| x.available_balance) - .unwrap_or_default(); + .expect("related balance") + .available_balance; let user_tier_model = user_tier::Entity::find_by_id(user_model.user_tier_id) @@ -1220,9 +1209,7 @@ impl Web3ProxyApp { None => Ok(AuthorizationChecks::default()), } }) - .await; - - authorization_checks + .await } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency @@ -1246,9 +1233,7 @@ impl Web3ProxyApp { // only allow this rpc_key to run a limited amount of concurrent requests // TODO: rate limit should be BEFORE the semaphore! 
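Regarding the rename to `user_semaphore` above: concurrency is limited per user id, shared across all of that user's keys, by caching one `Semaphore` per user and acquiring an owned permit for each request. The sketch below substitutes a `Mutex<HashMap<..>>` for the app's async cache (`get_or_insert_async`) so it stays self-contained; the `UserSemaphores` name and its API are illustrative only.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};

/// Simplified stand-in for the app's per-user semaphore cache.
#[derive(Default)]
struct UserSemaphores {
    inner: Mutex<HashMap<u64, Arc<Semaphore>>>,
}

impl UserSemaphores {
    /// One semaphore per user id, shared across all of that user's keys.
    async fn acquire(
        &self,
        user_id: u64,
        max_concurrent_requests: usize,
    ) -> Result<OwnedSemaphorePermit, tokio::sync::AcquireError> {
        let semaphore = {
            let mut map = self.inner.lock().await;
            map.entry(user_id)
                .or_insert_with(|| Arc::new(Semaphore::new(max_concurrent_requests)))
                .clone()
        };

        // waits here if the user already has max_concurrent_requests in flight
        semaphore.acquire_owned().await
    }
}

#[tokio::main]
async fn main() {
    let semaphores = Arc::new(UserSemaphores::default());

    // first request for user 7 gets a permit immediately; excess requests queue behind it
    let _permit = semaphores
        .acquire(7, 2)
        .await
        .expect("semaphore is never closed");
}
```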
- let semaphore = self - .registered_user_semaphore(&authorization_checks) - .await?; + let semaphore = self.user_semaphore(&authorization_checks).await?; let authorization = Authorization::try_new( authorization_checks, diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 9126526e..c970c849 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -47,14 +47,15 @@ pub enum Web3ProxyError { Database(DbErr), #[display(fmt = "{:#?}, {:#?}", _0, _1)] EipVerificationFailed(Box, Box), - EthersHttpClientError(ethers::prelude::HttpClientError), - EthersProviderError(ethers::prelude::ProviderError), - EthersWsClientError(ethers::prelude::WsClientError), - FlumeRecvError(flume::RecvError), + EthersHttpClient(ethers::prelude::HttpClientError), + EthersProvider(ethers::prelude::ProviderError), + EthersWsClient(ethers::prelude::WsClientError), + FlumeRecv(flume::RecvError), GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), - InfluxDb2RequestError(influxdb2::RequestError), + Hyper(hyper::Error), + InfluxDb2Request(influxdb2::RequestError), #[display(fmt = "{} > {}", min, max)] #[from(ignore)] InvalidBlockBounds { @@ -64,6 +65,7 @@ pub enum Web3ProxyError { InvalidHeaderValue(InvalidHeaderValue), InvalidEip, InvalidInviteCode, + Io(std::io::Error), UnknownReferralCode, InvalidReferer, InvalidSignatureLength, @@ -88,6 +90,12 @@ pub enum Web3ProxyError { num_known: usize, min_head_rpcs: usize, }, + #[display(fmt = "{}/{}", available, needed)] + #[from(ignore)] + NotEnoughSoftLimit { + available: u32, + needed: u32, + }, NotFound, NotImplemented, OriginRequired, @@ -136,6 +144,7 @@ pub enum Web3ProxyError { impl Web3ProxyError { pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseData) { + // TODO: include a unique request id in the data let (code, err): (StatusCode, JsonRpcErrorData) = match self { Self::AccessDenied => { // TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident @@ -223,7 +232,7 @@ impl Web3ProxyError { }, ) } - Self::EthersHttpClientError(err) => { + Self::EthersHttpClient(err) => { warn!("EthersHttpClientError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -234,7 +243,7 @@ impl Web3ProxyError { }, ) } - Self::EthersProviderError(err) => { + Self::EthersProvider(err) => { warn!("EthersProviderError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -245,7 +254,7 @@ impl Web3ProxyError { }, ) } - Self::EthersWsClientError(err) => { + Self::EthersWsClient(err) => { warn!("EthersWsClientError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -256,7 +265,7 @@ impl Web3ProxyError { }, ) } - Self::FlumeRecvError(err) => { + Self::FlumeRecv(err) => { warn!("FlumeRecvError err={:#?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -290,7 +299,19 @@ impl Web3ProxyError { }, ) } - Self::InfluxDb2RequestError(err) => { + Self::Hyper(err) => { + warn!("hyper err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + // TODO: is it safe to expose these error strings? 
+ message: Cow::Owned(err.to_string()), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } + Self::InfluxDb2Request(err) => { // TODO: attach a request id to the message and to this error so that if people report problems, we can dig in sentry to find out more error!("influxdb2 err={:?}", err); ( @@ -371,6 +392,18 @@ impl Web3ProxyError { }, ) } + Self::Io(err) => { + warn!("std io err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + // TODO: is it safe to expose our io error strings? + message: Cow::Owned(err.to_string()), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } Self::UnknownReferralCode => { debug!("UnknownReferralCode"); ( @@ -528,6 +561,20 @@ impl Web3ProxyError { }, ) } + Self::NotEnoughSoftLimit { available, needed } => { + error!("NotEnoughSoftLimit {}/{}", available, needed); + ( + StatusCode::BAD_GATEWAY, + JsonRpcErrorData { + message: Cow::Owned(format!( + "not enough soft limit available {}/{}", + available, needed + )), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404? diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index e1496960..23cd51d2 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -28,6 +28,8 @@ use tokio::sync::broadcast; use tower_http::cors::CorsLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; +use self::errors::Web3ProxyResult; + /// simple keys for caching responses #[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)] pub enum ResponseCacheKey { @@ -49,7 +51,7 @@ pub async fn serve( proxy_app: Arc, mut shutdown_receiver: broadcast::Receiver<()>, shutdown_complete_sender: broadcast::Sender<()>, -) -> anyhow::Result<()> { +) -> Web3ProxyResult<()> { // setup caches for whatever the frontend needs // no need for max items since it is limited by the enum key // TODO: latest moka allows for different ttls for different @@ -199,6 +201,10 @@ pub async fn serve( "/user/logout", post(users::authentication::user_logout_post), ) + .route( + "/admin/increase_balance", + get(admin::admin_increase_balance), + ) .route("/admin/modify_role", get(admin::admin_change_user_roles)) .route( "/admin/imitate-login/:admin_address/:user_address", diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 1f1cb94b..632f2839 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -36,7 +36,7 @@ pub async fn health( Ok(_health(app).await) }) .await - .unwrap(); + .expect("this cache get is infallible"); Response::builder() .status(code) @@ -70,7 +70,7 @@ pub async fn backups_needed( Ok(_backups_needed(app).await) }) .await - .unwrap(); + .expect("this cache get is infallible"); Response::builder() .status(code) @@ -120,7 +120,7 @@ pub async fn status( Ok(_status(app).await) }) .await - .unwrap(); + .expect("this cache get is infallible"); Response::builder() .status(code) diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 975fb3da..1ceb8532 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -153,7 +153,7 @@ pub async fn user_balance_post( // Just make an rpc request, idk if i need to call this super extensive code let transaction_receipt: TransactionReceipt = match app .balanced_rpcs - 
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -166,7 +166,6 @@ pub async fn user_balance_post( "eth_getTransactionReceipt", &vec![format!("0x{}", hex::encode(tx_hash))], Level::Trace.into(), - None, ) .await // TODO: What kind of error would be here @@ -188,7 +187,7 @@ pub async fn user_balance_post( debug!("Transaction receipt is: {:?}", transaction_receipt); let accepted_token: Address = match app .balanced_rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -217,7 +216,7 @@ pub async fn user_balance_post( ]); debug!("Params are: {:?}", ¶ms); let accepted_token: String = handle - .request("eth_call", ¶ms, Level::Trace.into(), None) + .request("eth_call", ¶ms, Level::Trace.into()) .await // TODO: What kind of error would be here .map_err(|err| Web3ProxyError::Anyhow(err.into()))?; @@ -243,7 +242,7 @@ pub async fn user_balance_post( debug!("Accepted token is: {:?}", accepted_token); let decimals: u32 = match app .balanced_rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -267,7 +266,7 @@ pub async fn user_balance_post( ]); debug!("ERC20 Decimal request params are: {:?}", ¶ms); let decimals: String = handle - .request("eth_call", ¶ms, Level::Trace.into(), None) + .request("eth_call", ¶ms, Level::Trace.into()) .await .map_err(|err| Web3ProxyError::Anyhow(err.into()))?; debug!("Decimals response is: {:?}", decimals); diff --git a/web3_proxy/src/prometheus.rs b/web3_proxy/src/prometheus.rs index 2c582c24..c17b8ecd 100644 --- a/web3_proxy/src/prometheus.rs +++ b/web3_proxy/src/prometheus.rs @@ -8,13 +8,14 @@ use std::sync::Arc; use tokio::sync::broadcast; use crate::app::Web3ProxyApp; +use crate::frontend::errors::Web3ProxyResult; /// Run a prometheus metrics server on the given port. pub async fn serve( app: Arc, port: u16, mut shutdown_receiver: broadcast::Receiver<()>, -) -> anyhow::Result<()> { +) -> Web3ProxyResult<()> { // routes should be ordered most to least common let app = Router::new().route("/", get(root)).layer(Extension(app)); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 9c181e0d..df79ac68 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -18,7 +18,6 @@ use std::convert::Infallible; use std::hash::Hash; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::broadcast; -use tokio::time::Duration; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; @@ -193,7 +192,7 @@ impl Web3Rpcs { .blocks_by_hash .get_or_insert_async::(&block_hash, async move { Ok(block) }) .await - .unwrap(); + .expect("this cache get is infallible"); Ok(block) } @@ -219,13 +218,11 @@ impl Web3Rpcs { // TODO: if error, retry? let block: Web3ProxyBlock = match rpc { Some(rpc) => rpc - .wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None) - .await? .request::<_, Option>( "eth_getBlockByHash", &json!(get_block_params), Level::Error.into(), - None, + authorization.clone(), ) .await? .and_then(|x| { @@ -367,7 +364,7 @@ impl Web3Rpcs { // TODO: document that this is a watch sender and not a broadcast! 
if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. pending_tx_sender: Option>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag); loop { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index b11fb794..b7237ccb 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -374,7 +374,7 @@ impl ConsensusFinder { .first_seen .get_or_insert_async::(block.hash(), async { Ok(Instant::now()) }) .await - .unwrap(); + .expect("this cache get is infallible"); // calculate elapsed time before trying to lock let latency = first_seen.elapsed(); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5f281bee..4bf536cf 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -3,7 +3,7 @@ use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock}; use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; -use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp}; +use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; @@ -11,7 +11,6 @@ use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest}; use crate::response_cache::JsonRpcResponseData; use crate::rpcs::transactions::TxStatus; -use anyhow::Context; use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; @@ -35,8 +34,9 @@ use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; use thread_fast_rng::rand::seq::SliceRandom; +use tokio::select; use tokio::sync::{broadcast, watch}; -use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; +use tokio::time::{sleep, sleep_until, Duration, Instant}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] @@ -46,8 +46,6 @@ pub struct Web3Rpcs { pub(crate) block_sender: flume::Sender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections pub(crate) by_name: ArcSwap>>, - /// notify all http providers to check their blocks at the same time - pub(crate) http_interval_sender: Option>>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? @@ -78,9 +76,7 @@ impl Web3Rpcs { /// Spawn durable connections to multiple Web3 providers. 
#[allow(clippy::too_many_arguments)] pub async fn spawn( - chain_id: u64, db_conn: Option, - http_client: Option, max_block_age: Option, max_block_lag: Option, min_head_rpcs: usize, @@ -91,82 +87,18 @@ impl Web3Rpcs { watch_consensus_head_sender: Option>>, ) -> anyhow::Result<( Arc, - AnyhowJoinHandle<()>, + Web3ProxyJoinHandle<()>, watch::Receiver>>, // watch::Receiver>, )> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); - // TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check? - let expected_block_time_ms = match chain_id { - // ethereum - 1 => 12_000, - // ethereum-goerli - 5 => 12_000, - // polygon - 137 => 2_000, - // fantom - 250 => 1_000, - // arbitrum - 42161 => 500, - // anything else - _ => { - warn!( - "unexpected chain_id ({}). polling every {} seconds", - chain_id, 10 - ); - 10_000 - } - }; - - let http_interval_sender = if http_client.is_some() { - let (sender, _) = broadcast::channel(1); - - // TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce - let mut interval = interval(Duration::from_millis(expected_block_time_ms / 2)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - let sender = Arc::new(sender); - - let f = { - let sender = sender.clone(); - - async move { - loop { - interval.tick().await; - - // trace!("http interval ready"); - - if sender.send(()).is_err() { - // errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet - // TODO: i'm seeing this error a lot more than expected - trace!("no http receivers"); - }; - } - } - }; - - // TODO: do something with this handle? - tokio::spawn(f); - - Some(sender) - } else { - None - }; - // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes - // TODO: how can we do the weigher this? need to know actual allocated size + // TODO: actual weighter on this // TODO: time_to_idle instead? - // TODO: limits from config let blocks_by_hash: BlocksByHashCache = Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await); - // .max_capacity(1024 * 1024 * 1024) - // .weigher(|_k, v: &Web3ProxyBlock| { - // 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) - // }) - // .time_to_live(Duration::from_secs(30 * 60)) - // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); // all block numbers are the same size, so no need for weigher // TODO: limits from config @@ -185,7 +117,6 @@ impl Web3Rpcs { blocks_by_hash, blocks_by_number, by_name, - http_interval_sender, max_block_age, max_block_lag, min_head_rpcs, @@ -214,7 +145,7 @@ impl Web3Rpcs { &self, app: &Web3ProxyApp, rpc_configs: HashMap, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { // safety checks if rpc_configs.len() < app.config.min_synced_rpcs { // TODO: don't count disabled servers! @@ -232,15 +163,14 @@ impl Web3Rpcs { let sum_soft_limit = rpc_configs.values().fold(0, |acc, x| acc + x.soft_limit); // TODO: require a buffer? - anyhow::ensure!( - sum_soft_limit >= self.min_sum_soft_limit, - "Only {}/{} soft limit! 
Add more rpcs, increase soft limits, or reduce min_sum_soft_limit.", - sum_soft_limit, - self.min_sum_soft_limit, - ); + if sum_soft_limit < self.min_sum_soft_limit { + return Err(Web3ProxyError::NotEnoughSoftLimit { + available: sum_soft_limit, + needed: self.min_sum_soft_limit, + }); + } // turn configs into connections (in parallel) - // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) let mut spawn_handles: FuturesUnordered<_> = rpc_configs .into_iter() .filter_map(|(server_name, server_config)| { @@ -261,7 +191,6 @@ impl Web3Rpcs { let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone()); let blocks_by_hash_cache = self.blocks_by_hash.clone(); - let http_interval_sender = self.http_interval_sender.clone(); let chain_id = app.config.chain_id; debug!("spawning {}", server_name); @@ -272,11 +201,9 @@ impl Web3Rpcs { vredis_pool, chain_id, http_client, - http_interval_sender, blocks_by_hash_cache, block_sender, pending_tx_id_sender, - true, )); Some(handle) @@ -303,9 +230,9 @@ impl Web3Rpcs { while new_head_receiver.borrow_and_update().is_none() { new_head_receiver.changed().await?; } - } - old_rpc.disconnect().await.context("disconnect old rpc")?; + // TODO: tell ethers to disconnect? is there a function for that? + } } // TODO: what should we do with the new handle? make sure error logs aren't dropped @@ -313,7 +240,7 @@ impl Web3Rpcs { Ok(Err(err)) => { // if we got an error here, the app can continue on // TODO: include context about which connection failed - // TODO: will this retry automatically? i don't think so + // TODO: retry automatically error!("Unable to create connection. err={:?}", err); } Err(err) => { @@ -323,6 +250,15 @@ impl Web3Rpcs { } } + let num_rpcs = self.by_name.load().len(); + + if num_rpcs < self.min_head_rpcs { + return Err(Web3ProxyError::NotEnoughRpcs { + num_known: num_rpcs, + min_head_rpcs: self.min_head_rpcs, + }); + } + Ok(()) } @@ -350,7 +286,7 @@ impl Web3Rpcs { authorization: Arc, block_receiver: flume::Receiver, pending_tx_sender: Option>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let mut futures = vec![]; // setup the transaction funnel @@ -435,7 +371,7 @@ impl Web3Rpcs { .into_iter() .map(|active_request_handle| async move { let result: Result, _> = active_request_handle - .request(method, &json!(¶ms), error_level.into(), None) + .request(method, &json!(¶ms), error_level.into()) .await; result }) @@ -508,7 +444,7 @@ impl Web3Rpcs { skip.push(Arc::clone(faster_rpc)); // just because it has lower latency doesn't mean we are sure to get a connection. 
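// A minimal, hedged sketch of the waiting shape the wait_for_best_rpc hunks below
// introduce: an optional max_wait becomes a fixed deadline, and the loop races the
// consensus watch channel against sleep_until(deadline). `wait_with_deadline` is an
// invented, standalone stand-in, not the real function.
use tokio::select;
use tokio::sync::watch;
use tokio::time::{sleep_until, Duration, Instant};

async fn wait_with_deadline<T: Clone>(
    mut rx: watch::Receiver<Option<T>>,
    max_wait: Option<Duration>,
) -> Option<T> {
    let stop_trying_at = Instant::now() + max_wait.unwrap_or_else(|| Duration::from_secs(10));

    loop {
        if let Some(value) = rx.borrow_and_update().clone() {
            return Some(value);
        }

        // wake when the watched value changes or when time runs out, whichever is first
        select! {
            _ = rx.changed() => {}
            _ = sleep_until(stop_trying_at) => {}
        }

        if Instant::now() > stop_trying_at {
            return None;
        }
    }
}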
there might be rate limits - match faster_rpc.try_request_handle(authorization, None).await { + match faster_rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => { trace!("opened handle: {}", faster_rpc); return OpenRequestResult::Handle(handle); @@ -550,6 +486,7 @@ impl Web3Rpcs { skip_rpcs: &mut Vec>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, + max_wait: Option, ) -> Web3ProxyResult { let mut earliest_retry_at: Option = None; @@ -591,16 +528,14 @@ impl Web3Rpcs { } } } else { - let start = Instant::now(); - - // TODO: get from config or argument - let max_wait = Duration::from_secs(10); + let stop_trying_at = + Instant::now() + max_wait.unwrap_or_else(|| Duration::from_secs(10)); let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); let mut potential_rpcs = Vec::with_capacity(self.by_name.load().len()); - while start.elapsed() < max_wait { + loop { let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone(); potential_rpcs.clear(); @@ -718,12 +653,16 @@ impl Web3Rpcs { match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) { ShouldWaitForBlock::NeverReady => break, - ShouldWaitForBlock::Ready => continue, - ShouldWaitForBlock::Wait { .. } => {} + ShouldWaitForBlock::Ready => {} + ShouldWaitForBlock::Wait { .. } => select! { + _ = watch_consensus_rpcs.changed() => {}, + _ = sleep_until(stop_trying_at) => {}, + }, } + } - // TODO: select on consensus_rpcs changing and on earliest_retry_at - watch_consensus_rpcs.changed().await?; + if Instant::now() > stop_trying_at { + break; } } } @@ -831,7 +770,7 @@ impl Web3Rpcs { } // check rate limits and increment our connection counter - match rpc.try_request_handle(authorization, None).await { + match rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it trace!("{} is rate limited. skipping", rpc); @@ -888,6 +827,7 @@ impl Web3Rpcs { &mut skip_rpcs, min_block_needed, max_block_needed, + None, ) .await? 
{ @@ -908,7 +848,6 @@ impl Web3Rpcs { &request.method, &json!(request.params), RequestErrorHandler::Save, - None, ) .await; @@ -943,7 +882,11 @@ impl Web3Rpcs { let rate_limit_substrings = ["limit", "exceeded", "quota usage"]; for rate_limit_substr in rate_limit_substrings { if error_msg.contains(rate_limit_substr) { - warn!("rate limited by {}", skip_rpcs.last().unwrap()); + warn!( + "rate limited ({}) by {}", + error_msg, + skip_rpcs.last().unwrap() + ); continue; } } @@ -1307,8 +1250,8 @@ mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use super::*; + use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusFinder; - use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use arc_swap::ArcSwap; use ethers::types::H256; use ethers::types::{Block, U256}; @@ -1319,8 +1262,7 @@ mod tests { #[cfg(test)] fn new_peak_latency() -> PeakEwmaLatency { - const NANOS_PER_MILLI: f64 = 1_000_000.0; - PeakEwmaLatency::spawn(1_000.0 * NANOS_PER_MILLI, 4, Duration::from_secs(1)) + PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1)) } #[tokio::test] @@ -1451,7 +1393,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: Some(tx_synced), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1466,7 +1407,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: Some(tx_lagged), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1494,7 +1434,6 @@ mod tests { let rpcs = Web3Rpcs { block_sender: block_sender.clone(), by_name: ArcSwap::from_pointee(rpcs_by_name), - http_interval_sender: None, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1559,6 +1498,7 @@ mod tests { &mut vec![], Some(head_block.number.as_ref().unwrap()), None, + Some(Duration::from_secs(0)), ) .await .unwrap(); @@ -1572,7 +1512,7 @@ mod tests { .send_head_block_result( Ok(Some(lagged_block.clone())), &block_sender, - rpcs.blocks_by_hash.clone(), + &rpcs.blocks_by_hash, ) .await .unwrap(); @@ -1592,7 +1532,7 @@ mod tests { .send_head_block_result( Ok(Some(lagged_block.clone())), &block_sender, - rpcs.blocks_by_hash.clone(), + &rpcs.blocks_by_hash, ) .await .unwrap(); @@ -1624,7 +1564,7 @@ mod tests { .send_head_block_result( Ok(Some(head_block.clone())), &block_sender, - rpcs.blocks_by_hash.clone(), + &rpcs.blocks_by_hash, ) .await .unwrap(); @@ -1650,28 +1590,56 @@ mod tests { // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], None, None) - .await, + rpcs.wait_for_best_rpc( + &authorization, + None, + &mut vec![], + None, + None, + Some(Duration::from_secs(0)) + ) + .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&0.into()), None) - .await, + rpcs.wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&0.into()), + None, + Some(Duration::from_secs(0)), + ) + .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) - .await, + rpcs.wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&1.into()), + None, + 
Some(Duration::from_secs(0)), + ) + .await, Ok(OpenRequestResult::Handle(_)) )); // future block should not get a handle let future_rpc = rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], Some(&2.into()), None) + .wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&2.into()), + None, + Some(Duration::from_secs(0)), + ) .await; assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady))); } @@ -1707,7 +1675,6 @@ mod tests { block_data_limit: 64.into(), tier: 1, head_block: Some(tx_pruned), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1721,7 +1688,6 @@ mod tests { block_data_limit: u64::MAX.into(), tier: 2, head_block: Some(tx_archive), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1743,11 +1709,9 @@ mod tests { let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); - // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { block_sender, by_name: ArcSwap::from_pointee(rpcs_by_name), - http_interval_sender: None, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1802,6 +1766,7 @@ mod tests { &mut vec![], Some(head_block.number()), None, + Some(Duration::from_secs(0)), ) .await; @@ -1813,13 +1778,27 @@ mod tests { )); let _best_available_server_from_none = rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc( + &authorization, + None, + &mut vec![], + None, + None, + Some(Duration::from_secs(0)), + ) .await; // assert_eq!(best_available_server, best_available_server_from_none); let best_archive_server = rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) + .wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&1.into()), + None, + Some(Duration::from_secs(0)), + ) .await; match best_archive_server { @@ -1876,7 +1855,6 @@ mod tests { block_data_limit: 64.into(), tier: 0, head_block: Some(tx_mock_geth), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1889,7 +1867,6 @@ mod tests { block_data_limit: u64::MAX.into(), tier: 1, head_block: Some(tx_mock_erigon_archive), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1919,7 +1896,6 @@ mod tests { let rpcs = Web3Rpcs { block_sender, by_name: ArcSwap::from_pointee(rpcs_by_name), - http_interval_sender: None, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index c3d8b34f..c74a156e 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1,19 +1,19 @@ ///! Rate-limited communication with a web3 provider. 
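// A minimal, hedged sketch of the provider layout the hunks below move to: the locked
// optional provider field is replaced by two plain optional ethers providers, with
// requests preferring http and the websocket reserved for subscriptions.
// `SplitProviders` is an invented name for illustration, not the real struct.
use ethers::providers::{Http, Middleware, Provider, Ws};
use ethers::types::U64;

struct SplitProviders {
    // used for almost all requests
    http: Option<Provider<Http>>,
    // only used for subscriptions
    ws: Option<Provider<Ws>>,
}

impl SplitProviders {
    async fn block_number(&self) -> anyhow::Result<U64> {
        // prefer http, fall back to ws, error if neither is configured
        if let Some(http) = &self.http {
            Ok(http.get_block_number().await?)
        } else if let Some(ws) = &self.ws {
            Ok(ws.get_block_number().await?)
        } else {
            Err(anyhow::anyhow!("no provider configured"))
        }
    }
}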
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock}; -use super::provider::Web3Provider; +use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider}; use super::request::{OpenRequestHandle, OpenRequestResult}; -use crate::app::{flatten_handle, AnyhowJoinHandle}; +use crate::app::{flatten_handle, Web3ProxyJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::frontend::authorization::Authorization; use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; -use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; +use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency}; -use log::{debug, error, info, trace, warn, Level}; +use log::{debug, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use ordered_float::OrderedFloat; use parking_lot::RwLock; @@ -21,16 +21,12 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; -use std::borrow::Cow; -use std::cmp::min; use std::convert::Infallible; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize}; +use std::sync::atomic::{self, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; -use thread_fast_rng::rand::Rng; -use thread_fast_rng::thread_fast_rng; -use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; +use tokio::sync::watch; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use url::Url; @@ -40,17 +36,10 @@ pub struct Web3Rpc { pub name: String, pub display_name: Option, pub db_conn: Option, - pub(super) ws_url: Option, - pub(super) http_url: Option, - /// Some connections use an http_client. we keep a clone for reconnecting - pub(super) http_client: Option, - /// provider is in a RwLock so that we can replace it if re-connecting - /// it is an async lock because we hold it open across awaits - /// this provider is only used for new heads subscriptions - /// TODO: benchmark ArcSwapOption and a watch::Sender - /// TODO: only the websocket provider needs to be behind an asyncrwlock! - /// TODO: the http provider is just an http_client - pub(super) provider: AsyncRwLock>>, + /// most all requests prefer use the http_provider + pub(super) http_provider: Option, + /// the websocket provider is only used for subscriptions + pub(super) ws_provider: Option, /// keep track of hard limits /// this is only inside an Option so that the "Default" derive works. it will always be set. pub(super) hard_limit_until: Option>, @@ -79,7 +68,6 @@ pub struct Web3Rpc { /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, pub(super) active_requests: AtomicUsize, - pub(super) reconnect: AtomicBool, /// this is only inside an Option so that the "Default" derive works. it will always be set. pub(super) disconnect_watch: Option>, pub(super) created_at: Option, @@ -96,14 +84,12 @@ impl Web3Rpc { db_conn: Option, // optional because this is only used for http providers. websocket providers don't use it http_client: Option, - // TODO: rename to http_new_head_interval_sender? 
- http_interval_sender: Option>>, redis_pool: Option, + block_interval: Duration, block_map: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option)>>, - reconnect: bool, - ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); let hard_limit = match (config.hard_limit, redis_pool) { @@ -160,16 +146,13 @@ impl Web3Rpc { } } - let (disconnect_sender, disconnect_receiver) = watch::channel(false); - let reconnect = reconnect.into(); - let (head_block, _) = watch::channel(None); // Spawn the task for calculting average peak latency // TODO Should these defaults be in config let peak_latency = PeakEwmaLatency::spawn( // Decay over 15s - Duration::from_secs(15).as_millis() as f64, + Duration::from_secs(15), // Peak requests so far around 5k, we will use an order of magnitude // more to be safe. Should only use about 50mb RAM 50_000, @@ -177,41 +160,49 @@ impl Web3Rpc { Duration::from_secs(1), ); - let http_url = if let Some(http_url) = config.http_url { - Some(http_url.parse()?) + let http_provider = if let Some(http_url) = config.http_url { + let http_url = http_url.parse::()?; + + Some(connect_http(http_url, http_client, block_interval)?) + + // TODO: check the provider is on the right chain } else { None }; - let ws_url = if let Some(ws_url) = config.ws_url { - Some(ws_url.parse()?) + let ws_provider = if let Some(ws_url) = config.ws_url { + let ws_url = ws_url.parse::()?; + + Some(connect_ws(ws_url, usize::MAX).await?) + + // TODO: check the provider is on the right chain } else { None }; - let new_connection = Self { - name, - db_conn: db_conn.clone(), - display_name: config.display_name, - http_client, - ws_url, - http_url, - hard_limit, - hard_limit_until: Some(hard_limit_until), - soft_limit: config.soft_limit, + let (disconnect_watch, _) = watch::channel(false); + + let new_rpc = Self { automatic_block_limit, backup, block_data_limit, - reconnect, - tier: config.tier, - disconnect_watch: Some(disconnect_sender), created_at: Some(created_at), + db_conn: db_conn.clone(), + display_name: config.display_name, + hard_limit, + hard_limit_until: Some(hard_limit_until), head_block: Some(head_block), + http_provider, + name, peak_latency: Some(peak_latency), + soft_limit: config.soft_limit, + tier: config.tier, + ws_provider, + disconnect_watch: Some(disconnect_watch), ..Default::default() }; - let new_connection = Arc::new(new_connection); + let new_connection = Arc::new(new_rpc); // subscribe to new blocks and new transactions // subscribing starts the connection (with retries) @@ -226,8 +217,6 @@ impl Web3Rpc { block_map, block_sender, chain_id, - disconnect_receiver, - http_interval_sender, tx_id_sender, ) .await @@ -238,25 +227,22 @@ impl Web3Rpc { } pub fn peak_ewma(&self) -> OrderedFloat { - // TODO: bug inside peak ewma somewhere. possible with atomics being relaxed or the conversion to pair and back - // let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { - // peak_latency.latency().as_secs_f64() - // } else { - // 0.0 - // }; - let head_latency = self.head_latency.read().value(); + let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { + peak_latency.latency().as_secs_f64() + } else { + 1.0 + }; // TODO: what ordering? let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0; - OrderedFloat(head_latency * active_requests) + OrderedFloat(peak_latency * active_requests) } // TODO: would be great if rpcs exposed this. 
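// A hedged, standalone illustration of the scoring change in peak_ewma() above: a
// server is now ranked by its peak-EWMA latency multiplied by (active requests + 1)
// instead of by head latency. The plain-f64 helpers here are invented for the example.
fn peak_ewma_score(peak_latency_secs: f64, active_requests: usize) -> f64 {
    // +1 so an idle server still pays for its latency instead of scoring zero
    peak_latency_secs * (active_requests as f64 + 1.0)
}

fn pick_fastest<'a>(servers: &'a [(&'a str, f64, usize)]) -> Option<&'a str> {
    servers
        .iter()
        .min_by(|a, b| {
            let score_a = peak_ewma_score(a.1, a.2);
            let score_b = peak_ewma_score(b.1, b.2);
            // latencies are finite, so partial_cmp cannot fail here
            score_a.partial_cmp(&score_b).expect("finite scores")
        })
        .map(|(name, _, _)| *name)
}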
see https://github.com/ledgerwatch/erigon/issues/6391 async fn check_block_data_limit( self: &Arc, authorization: &Arc, - unlocked_provider: Option>, ) -> anyhow::Result> { if !self.automatic_block_limit { // TODO: is this a good thing to return? @@ -270,16 +256,12 @@ impl Web3Rpc { // TODO: binary search between 90k and max? // TODO: start at 0 or 1? for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { - let handle = self - .wait_for_request_handle(authorization, None, unlocked_provider.clone()) - .await?; - - let head_block_num_future = handle.request::, U256>( + let head_block_num_future = self.request::, U256>( "eth_blockNumber", &None, // error here are expected, so keep the level low Level::Debug.into(), - unlocked_provider.clone(), + authorization.clone(), ); let head_block_num = timeout(Duration::from_secs(5), head_block_num_future) @@ -297,11 +279,7 @@ impl Web3Rpc { // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? - let handle = self - .wait_for_request_handle(authorization, None, unlocked_provider.clone()) - .await?; - - let archive_result: Result = handle + let archive_result: Result = self .request( "eth_getCode", &json!(( @@ -310,7 +288,7 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Level::Trace.into(), - unlocked_provider.clone(), + authorization.clone(), ) .await; @@ -388,231 +366,74 @@ impl Web3Rpc { true } - /// reconnect to the provider. errors are retried forever with exponential backoff with jitter. - /// We use the "Decorrelated" jitter from - /// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time. - pub async fn retrying_connect( - self: &Arc, - block_sender: Option<&flume::Sender>, - chain_id: u64, - db_conn: Option<&DatabaseConnection>, - delay_start: bool, - ) -> anyhow::Result<()> { - // there are several crates that have retry helpers, but they all seem more complex than necessary - // TODO: move this backoff logic into a helper function so we can use it when doing database locking - let base_ms = 500; - let cap_ms = 30_000; - let range_multiplier = 3; + /// query the web3 provider to confirm it is on the expected chain with the expected data available + async fn check_provider(self: &Arc, chain_id: u64) -> Web3ProxyResult<()> { + let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?); - // sleep once before the initial retry attempt - // TODO: now that we use this method for our initial connection, do we still want this sleep? - let mut sleep_ms = if delay_start { - let first_sleep_ms = min( - cap_ms, - thread_fast_rng().gen_range(base_ms..(base_ms * range_multiplier)), - ); - let reconnect_in = Duration::from_millis(first_sleep_ms); + // check the server's chain_id here + // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error + // TODO: what should the timeout be? should there be a request timeout? 
+ // trace!("waiting on chain id for {}", self); + let found_chain_id: Result = self + .request( + "eth_chainId", + &json!(Vec::<()>::new()), + Level::Trace.into(), + authorization.clone(), + ) + .await; + trace!("found_chain_id: {:#?}", found_chain_id); - info!("Reconnect to {} in {}ms", self, reconnect_in.as_millis()); - - sleep(reconnect_in).await; - - first_sleep_ms - } else { - base_ms - }; - - // retry until we succeed - while let Err(err) = self.connect(block_sender, chain_id, db_conn).await { - // thread_rng is crytographically secure. we don't need that here. use thread_fast_rng instead - // TODO: min of 1 second? sleep longer if rate limited? - sleep_ms = min( - cap_ms, - thread_fast_rng().gen_range(base_ms..(sleep_ms * range_multiplier)), - ); - - let retry_in = Duration::from_millis(sleep_ms); - - let error_level = if self.backup { - log::Level::Debug - } else { - log::Level::Info - }; - - log::log!( - error_level, - "Failed (re)connect to {}! Retry in {}ms. err={:?}", - self, - retry_in.as_millis(), - err, - ); - - sleep(retry_in).await; + match found_chain_id { + Ok(found_chain_id) => { + // TODO: there has to be a cleaner way to do this + if chain_id != found_chain_id.as_u64() { + return Err(anyhow::anyhow!( + "incorrect chain id! Config has {}, but RPC has {}", + chain_id, + found_chain_id + ) + .context(format!("failed @ {}", self)) + .into()); + } + } + Err(e) => { + return Err(anyhow::Error::from(e) + .context(format!("unable to parse eth_chainId from {}", self)) + .into()); + } } - Ok(()) - } + self.check_block_data_limit(&authorization) + .await + .context(format!("unable to check_block_data_limit of {}", self))?; - /// connect to the web3 provider - async fn connect( - self: &Arc, - block_sender: Option<&flume::Sender>, - chain_id: u64, - db_conn: Option<&DatabaseConnection>, - ) -> anyhow::Result<()> { - if let Ok(mut unlocked_provider) = self.provider.try_write() { - #[cfg(test)] - if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() { - return Ok(()); - } - - *unlocked_provider = if let Some(ws_url) = self.ws_url.as_ref() { - // set up ws client - match &*unlocked_provider { - None => { - info!("connecting to {}", self); - } - Some(_) => { - debug!("reconnecting to {}", self); - - // tell the block subscriber that this rpc doesn't have any blocks - if let Some(block_sender) = block_sender { - block_sender - .send_async((None, self.clone())) - .await - .context("block_sender during connect")?; - } - - // reset sync status - self.head_block - .as_ref() - .expect("head_block should always be set") - .send_replace(None); - - // disconnect the current provider - // TODO: what until the block_sender's receiver finishes updating this item? - *unlocked_provider = None; - } - } - - let p = Web3Provider::new(Cow::Borrowed(ws_url), None) - .await - .context(format!("failed connecting to {}", ws_url))?; - - assert!(p.ws().is_some()); - - Some(Arc::new(p)) - } else { - // http client - if let Some(url) = &self.http_url { - let p = Web3Provider::new(Cow::Borrowed(url), self.http_client.clone()) - .await - .context(format!("failed connecting to {}", url))?; - - assert!(p.http().is_some()); - - Some(Arc::new(p)) - } else { - None - } - }; - - let authorization = Arc::new(Authorization::internal(db_conn.cloned())?); - - // check the server's chain_id here - // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error - // TODO: what should the timeout be? should there be a request timeout? 
- // trace!("waiting on chain id for {}", self); - let found_chain_id: Result = self - .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) - .await - .context(format!("waiting for request handle on {}", self))? - .request( - "eth_chainId", - &json!(Vec::<()>::new()), - Level::Trace.into(), - unlocked_provider.clone(), - ) - .await; - trace!("found_chain_id: {:#?}", found_chain_id); - - match found_chain_id { - Ok(found_chain_id) => { - // TODO: there has to be a cleaner way to do this - if chain_id != found_chain_id.as_u64() { - return Err(anyhow::anyhow!( - "incorrect chain id! Config has {}, but RPC has {}", - chain_id, - found_chain_id - ) - .context(format!("failed @ {}", self))); - } - } - Err(e) => { - return Err(anyhow::Error::from(e) - .context(format!("unable to parse eth_chainId from {}", self))); - } - } - - self.check_block_data_limit(&authorization, unlocked_provider.clone()) - .await - .context(format!("unable to check_block_data_limit of {}", self))?; - - drop(unlocked_provider); - - info!("successfully connected to {}", self); - } else if self.provider.read().await.is_none() { - return Err(anyhow!("failed waiting for client {}", self)); - }; - - Ok(()) - } - - pub async fn disconnect(&self) -> anyhow::Result<()> { - let age = self.created_at.unwrap().elapsed().as_secs(); - - info!("disconnecting {} ({}s old)", self, age); - - self.reconnect.store(false, atomic::Ordering::Release); - - if let Err(err) = self.disconnect_watch.as_ref().unwrap().send(true) { - warn!("failed sending disconnect watch: {:?}", err); - }; - - trace!("disconnecting (locking) {} ({}s old)", self, age); - - let mut provider = self.provider.write().await; - - trace!("disconnecting (clearing provider) {} ({}s old)", self, age); - - *provider = None; + info!("successfully connected to {}", self); Ok(()) } pub(crate) async fn send_head_block_result( self: &Arc, - new_head_block: Result, ProviderError>, + new_head_block: Web3ProxyResult>, block_sender: &flume::Sender, - block_map: BlocksByHashCache, - ) -> anyhow::Result<()> { + block_map: &BlocksByHashCache, + ) -> Web3ProxyResult<()> { let new_head_block = match new_head_block { Ok(None) => { - { - let head_block_tx = self.head_block.as_ref().unwrap(); + let head_block_tx = self.head_block.as_ref().unwrap(); - if head_block_tx.borrow().is_none() { - // we previously sent a None. return early - return Ok(()); - } - - let age = self.created_at.unwrap().elapsed().as_millis(); - - debug!("clearing head block on {} ({}ms old)!", self, age); - - head_block_tx.send_replace(None); + if head_block_tx.borrow().is_none() { + // we previously sent a None. return early + return Ok(()); } + let age = self.created_at.unwrap().elapsed().as_millis(); + + debug!("clearing head block on {} ({}ms old)!", self, age); + + head_block_tx.send_replace(None); + None } Ok(Some(new_head_block)) => { @@ -627,7 +448,8 @@ impl Web3Rpc { &new_hash, async move { Ok(new_head_block) }, ) - .await?; + .await + .expect("this cache get is infallible"); // save the block so we don't send the same one multiple times // also save so that archive checks can know how far back to query @@ -638,7 +460,7 @@ impl Web3Rpc { if self.block_data_limit() == U64::zero() { let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?); - if let Err(err) = self.check_block_data_limit(&authorization, None).await { + if let Err(err) = self.check_block_data_limit(&authorization).await { warn!( "failed checking block limit after {} finished syncing. 
{:?}", self, err @@ -670,9 +492,61 @@ impl Web3Rpc { *self.disconnect_watch.as_ref().unwrap().borrow() } - /// subscribe to blocks and transactions with automatic reconnects + async fn healthcheck( + self: &Arc, + authorization: &Arc, + error_handler: RequestErrorHandler, + ) -> Web3ProxyResult<()> { + let head_block = self.head_block.as_ref().unwrap().borrow().clone(); + + if let Some(head_block) = head_block { + let head_block = head_block.block; + + // TODO: if head block is very old and not expected to be syncing, emit warning + + let block_number = head_block.number.context("no block number")?; + + let to = if let Some(txid) = head_block.transactions.last().cloned() { + let tx = self + .request::<_, Option>( + "eth_getTransactionByHash", + &(txid,), + error_handler, + authorization.clone(), + ) + .await? + .context("no transaction")?; + + // TODO: what default? something real? + tx.to.unwrap_or_else(|| { + "0xdead00000000000000000000000000000000beef" + .parse::
<Address>() + .expect("deafbeef") + }) + } else { + "0xdead00000000000000000000000000000000beef" + .parse::<Address>
() + .expect("deafbeef") + }; + + let _code = self + .request::<_, Option>( + "eth_getCode", + &(to, block_number), + error_handler, + authorization.clone(), + ) + .await?; + } else { + // TODO: if head block is none for too long, give an error + } + + Ok(()) + } + + /// subscribe to blocks and transactions /// This should only exit when the program is exiting. - /// TODO: should more of these args be on self? + /// TODO: should more of these args be on self? chain_id for sure #[allow(clippy::too_many_arguments)] async fn subscribe( self: Arc, @@ -680,220 +554,97 @@ impl Web3Rpc { block_map: BlocksByHashCache, block_sender: Option>, chain_id: u64, - disconnect_receiver: watch::Receiver, - http_interval_sender: Option>>, tx_id_sender: Option)>>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let error_handler = if self.backup { RequestErrorHandler::DebugLevel } else { RequestErrorHandler::ErrorLevel }; - let mut delay_start = false; + debug!("starting subscriptions on {}", self); - // this does loop. just only when reconnect is enabled - #[allow(clippy::never_loop)] - loop { - trace!("subscription loop started on {}", self); + self.check_provider(chain_id).await?; - let mut futures = vec![]; + let mut futures = vec![]; - let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); + // health check that runs if there haven't been any recent requests + { + // TODO: move this into a proper function + let authorization = authorization.clone(); + let rpc = self.clone(); - { - // TODO: move this into a proper function - let authorization = authorization.clone(); - let block_sender = block_sender.clone(); - let rpc = self.clone(); - let (ready_tx, ready_rx) = oneshot::channel(); - let f = async move { - // initial sleep to allow for the initial connection - rpc.retrying_connect( - block_sender.as_ref(), - chain_id, - authorization.db_conn.as_ref(), - delay_start, - ) - .await?; + // TODO: how often? different depending on the chain? + // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though + let health_sleep_seconds = 10; - // provider is ready - ready_tx.send(()).unwrap(); + // health check loop + let f = async move { + // TODO: benchmark this and lock contention + let mut old_total_requests = 0; + let mut new_total_requests; - // TODO: how often? different depending on the chain? - // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though - let health_sleep_seconds = 10; + // TODO: errors here should not cause the loop to exit! + while !rpc.should_disconnect() { + new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); - // TODO: benchmark this and lock contention - let mut old_total_requests = 0; - let mut new_total_requests; - - // health check loop - loop { - // TODO: do we need this to be abortable? - if rpc.should_disconnect() { - break; - } - - sleep(Duration::from_secs(health_sleep_seconds)).await; - - trace!("health check on {}", rpc); - - // TODO: what if we just happened to have this check line up with another restart? - // TODO: think more about this - if let Some(client) = rpc.provider.read().await.clone() { - // health check as a way of keeping this rpc's request_ewma accurate - // TODO: do something different if this is a backup server? - - new_total_requests = rpc.total_requests.load(atomic::Ordering::Acquire); - - // TODO: how many requests should we require in order to skip a health check? 
- if new_total_requests - old_total_requests < 10 { - // TODO: if this fails too many times, reset the connection - // TODO: move this into a function and the chaining should be easier - let head_block = rpc.head_block.as_ref().unwrap().borrow().clone(); - - if let Some((block_number, txid)) = head_block.and_then(|x| { - let block = x.block; - - let block_number = block.number?; - let txid = block.transactions.last().cloned()?; - - Some((block_number, txid)) - }) { - let to = rpc - .wait_for_query::<_, Option>( - "eth_getTransactionByHash", - &(txid,), - error_handler, - authorization.clone(), - Some(client.clone()), - ) - .await - .and_then(|tx| { - let tx = tx.context("no transaction found")?; - - // TODO: what default? something real? - let to = tx.to.unwrap_or_else(|| { - "0xdead00000000000000000000000000000000beef" - .parse::
() - .expect("deafbeef") - }); - - Ok(to) - }); - - let code = match to { - Err(err) => { - if rpc.backup { - debug!( - "{} failed health check query! {:#?}", - rpc, err - ); - } else { - warn!( - "{} failed health check query! {:#?}", - rpc, err - ); - } - continue; - } - Ok(to) => { - rpc.wait_for_query::<_, Option>( - "eth_getCode", - &(to, block_number), - error_handler, - authorization.clone(), - Some(client), - ) - .await - } - }; - - if let Err(err) = code { - if rpc.backup { - debug!("{} failed health check query! {:#?}", rpc, err); - } else { - warn!("{} failed health check query! {:#?}", rpc, err); - } - continue; - } - } - } - - old_total_requests = new_total_requests; + if new_total_requests - old_total_requests < 10 { + // TODO: if this fails too many times, reset the connection + // TODO: move this into a function and the chaining should be easier + if let Err(err) = rpc.healthcheck(&authorization, error_handler).await { + // TODO: different level depending on the error handler + warn!("health checking {} failed: {:?}", rpc, err); } } - debug!("health checks for {} exited", rpc); - Ok(()) - }; + // TODO: should we count the requests done inside this health check + old_total_requests = new_total_requests; - futures.push(flatten_handle(tokio::spawn(f))); - - // wait on the initial connection - ready_rx.await?; - } - - if let Some(block_sender) = &block_sender { - // TODO: do we need this to be abortable? - let f = self.clone().subscribe_new_heads( - authorization.clone(), - http_interval_receiver, - block_sender.clone(), - block_map.clone(), - ); - - futures.push(flatten_handle(tokio::spawn(f))); - } - - if let Some(tx_id_sender) = &tx_id_sender { - // TODO: do we need this to be abortable? - let f = self - .clone() - .subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone()); - - futures.push(flatten_handle(tokio::spawn(f))); - } - - match try_join_all(futures).await { - Ok(_) => { - // future exited without error - // TODO: think about this more. we never set it to false. this can't be right - break; + sleep(Duration::from_secs(health_sleep_seconds)).await; } - Err(err) => { - let disconnect_sender = self.disconnect_watch.as_ref().unwrap(); - if self.reconnect.load(atomic::Ordering::Acquire) { - warn!("{} connection ended. reconnecting. err={:?}", self, err); + debug!("healthcheck loop on {} exited", rpc); - // TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures. - disconnect_sender.send_replace(true); - disconnect_sender.send_replace(false); + Ok(()) + }; - // we call retrying_connect here with initial_delay=true. above, initial_delay=false - delay_start = true; - - continue; - } - - // reconnect is not enabled. - if *disconnect_receiver.borrow() { - info!("{} is disconnecting", self); - break; - } else { - error!("{} subscription exited. err={:?}", self, err); - - disconnect_sender.send_replace(true); - - break; - } - } - } + futures.push(flatten_handle(tokio::spawn(f))); } - info!("all subscriptions on {} completed", self); + // subscribe to new heads + if let Some(block_sender) = &block_sender { + // TODO: do we need this to be abortable? + let f = self.clone().subscribe_new_heads( + authorization.clone(), + block_sender.clone(), + block_map.clone(), + ); + + futures.push(flatten_handle(tokio::spawn(f))); + } + + // subscribe pending transactions + // TODO: make this opt-in. 
its a lot of bandwidth + if let Some(tx_id_sender) = tx_id_sender { + // TODO: do we need this to be abortable? + let f = self + .clone() + .subscribe_pending_transactions(authorization.clone(), tx_id_sender); + + futures.push(flatten_handle(tokio::spawn(f))); + } + + // try_join on the futures + if let Err(err) = try_join_all(futures).await { + warn!("subscription erred: {:?}", err); + } + + debug!("subscriptions on {} exited", self); + + self.disconnect_watch + .as_ref() + .expect("disconnect_watch should always be set") + .send_replace(true); Ok(()) } @@ -902,195 +653,76 @@ impl Web3Rpc { async fn subscribe_new_heads( self: Arc, authorization: Arc, - http_interval_receiver: Option>, block_sender: flume::Sender, block_map: BlocksByHashCache, - ) -> anyhow::Result<()> { - trace!("watching new heads on {}", self); + ) -> Web3ProxyResult<()> { + debug!("subscribing to new heads on {}", self); - let provider = self.wait_for_provider().await; + if let Some(ws_provider) = self.ws_provider.as_ref() { + // todo: move subscribe_blocks onto the request handle + let active_request_handle = self.wait_for_request_handle(&authorization, None).await; + let mut blocks = ws_provider.subscribe_blocks().await?; + drop(active_request_handle); - match provider.as_ref() { - Web3Provider::Http(_client) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: try watch_blocks and fall back to this? + // query the block once since the subscription doesn't send the current block + // there is a very small race condition here where the stream could send us a new block right now + // but all seeing the same block twice won't break anything + // TODO: how does this get wrapped in an arc? does ethers handle that? + // TODO: can we force this to use the websocket? + let latest_block: Result, _> = self + .request( + "eth_getBlockByNumber", + &json!(("latest", false)), + Level::Warn.into(), + authorization, + ) + .await; - let mut http_interval_receiver = http_interval_receiver.unwrap(); + self.send_head_block_result(latest_block, &block_sender, &block_map) + .await?; - let mut last_hash = H256::zero(); - - while !self.should_disconnect() { - // TODO: what should the max_wait be? - // we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one - match self - .wait_for_request_handle(&authorization, None, None) - .await - { - Ok(active_request_handle) => { - let block: Result, _> = active_request_handle - .request( - "eth_getBlockByNumber", - &json!(("latest", false)), - Level::Warn.into(), - None, - ) - .await; - - match block { - Ok(None) => { - warn!("no head block on {}", self); - - self.send_head_block_result( - Ok(None), - &block_sender, - block_map.clone(), - ) - .await?; - } - Ok(Some(block)) => { - if let Some(new_hash) = block.hash { - // don't send repeat blocks - if new_hash != last_hash { - // new hash! - last_hash = new_hash; - - self.send_head_block_result( - Ok(Some(block)), - &block_sender, - block_map.clone(), - ) - .await?; - } - } else { - // TODO: why is this happening? - warn!("empty head block on {}", self); - - self.send_head_block_result( - Ok(None), - &block_sender, - block_map.clone(), - ) - .await?; - } - } - Err(err) => { - // we did not get a block back. something is up with the server. 
take it out of rotation - self.send_head_block_result( - Err(err), - &block_sender, - block_map.clone(), - ) - .await?; - } - } - } - Err(err) => { - warn!("Internal error on latest block from {}. {:?}", self, err); - - self.send_head_block_result(Ok(None), &block_sender, block_map.clone()) - .await?; - - // TODO: what should we do? sleep? extra time? - } - } - - // wait for the next interval - // TODO: if error or rate limit, increase interval? - while let Err(err) = http_interval_receiver.recv().await { - match err { - broadcast::error::RecvError::Closed => { - // channel is closed! that's not good. bubble the error up - return Err(err.into()); - } - broadcast::error::RecvError::Lagged(lagged) => { - // querying the block was delayed - // this can happen if tokio is very busy or waiting for requests limits took too long - if self.backup { - debug!("http interval on {} lagging by {}!", self, lagged); - } else { - warn!("http interval on {} lagging by {}!", self, lagged); - } - } - } - } + while let Some(block) = blocks.next().await { + if self.should_disconnect() { + break; } + + let block = Arc::new(block); + + self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map) + .await?; } - Web3Provider::Both(_, client) | Web3Provider::Ws(client) => { - // todo: move subscribe_blocks onto the request handle? - let active_request_handle = self - .wait_for_request_handle(&authorization, None, Some(provider.clone())) - .await; - let mut stream = client.subscribe_blocks().await?; - drop(active_request_handle); + } else if let Some(http_provider) = self.http_provider.as_ref() { + // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + let mut blocks = http_provider.watch_blocks().await?; - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // but all that does is print "new block" for the same block as current block - // TODO: how does this get wrapped in an arc? does ethers handle that? - // TODO: do this part over http? - let block: Result, _> = self - .wait_for_request_handle(&authorization, None, Some(provider.clone())) - .await? - .request( - "eth_getBlockByNumber", - &json!(("latest", false)), - Level::Warn.into(), - Some(provider.clone()), - ) - .await; + while let Some(block_hash) = blocks.next().await { + if self.should_disconnect() { + break; + } - let mut last_hash = match &block { - Ok(Some(new_block)) => new_block - .hash - .expect("blocks should always have a hash here"), - _ => H256::zero(), + let block = if let Some(block) = block_map.get(&block_hash) { + block.block + } else if let Some(block) = http_provider.get_block(block_hash).await? { + Arc::new(block) + } else { + continue; }; - self.send_head_block_result(block, &block_sender, block_map.clone()) + self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map) .await?; - - while let Some(new_block) = stream.next().await { - // TODO: select on disconnect_watch instead of waiting for a block to arrive - if self.should_disconnect() { - break; - } - - // TODO: check the new block's hash to be sure we don't send dupes - let new_hash = new_block - .hash - .expect("blocks should always have a hash here"); - - if new_hash == last_hash { - // some rpcs like to give us duplicates. 
don't waste our time on them - continue; - } else { - last_hash = new_hash; - } - - self.send_head_block_result( - Ok(Some(Arc::new(new_block))), - &block_sender, - block_map.clone(), - ) - .await?; - } - - // TODO: is this always an error? - // TODO: we probably don't want a warn and to return error - debug!("new_heads subscription to {} ended", self); } - #[cfg(test)] - Web3Provider::Mock => unimplemented!(), + } else { + unimplemented!("no ws or http provider!") } // clear the head block. this might not be needed, but it won't hurt - self.send_head_block_result(Ok(None), &block_sender, block_map) + self.send_head_block_result(Ok(None), &block_sender, &block_map) .await?; if self.should_disconnect() { Ok(()) } else { - Err(anyhow!("new_heads subscription exited. reconnect needed")) + Err(anyhow!("new_heads subscription exited. reconnect needed").into()) } } @@ -1099,11 +731,11 @@ impl Web3Rpc { self: Arc, authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, - ) -> anyhow::Result<()> { - // TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big - // TODO: timeout - let provider = self.wait_for_provider().await; + ) -> Web3ProxyResult<()> { + // TODO: make this subscription optional + self.wait_for_disconnect().await?; + /* trace!("watching pending transactions on {}", self); // TODO: does this keep the lock open for too long? match provider.as_ref() { @@ -1144,32 +776,25 @@ impl Web3Rpc { self.wait_for_disconnect().await?; } } + */ if self.should_disconnect() { Ok(()) } else { - Err(anyhow!( - "pending_transactions subscription exited. reconnect needed" - )) + Err(anyhow!("pending_transactions subscription exited. reconnect needed").into()) } } /// be careful with this; it might wait forever! - /// `allow_not_ready` is only for use by health checks while starting the provider - /// TODO: don't use anyhow. use specific error type pub async fn wait_for_request_handle<'a>( self: &'a Arc, authorization: &'a Arc, max_wait: Option, - unlocked_provider: Option>, ) -> Web3ProxyResult { let max_wait = max_wait.map(|x| Instant::now() + x); loop { - match self - .try_request_handle(authorization, unlocked_provider.clone()) - .await - { + match self.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? @@ -1203,7 +828,7 @@ impl Web3Rpc { } // TODO: sleep how long? maybe just error? - // TODO: instead of an arbitrary sleep, subscribe to the head block on this + // TODO: instead of an arbitrary sleep, subscribe to the head block on this? sleep(Duration::from_millis(10)).await; } Err(err) => return Err(err), @@ -1214,18 +839,8 @@ impl Web3Rpc { pub async fn try_request_handle( self: &Arc, authorization: &Arc, - // TODO: borrow on this instead of needing to clone the Arc? - unlocked_provider: Option>, ) -> Web3ProxyResult { - // TODO: think more about this read block - // TODO: this should *not* be new_head_client. this should be a separate object - if unlocked_provider.is_some() || self.provider.read().await.is_some() { - // we already have an unlocked provider. no need to lock - } else { - warn!("no provider on {}", self); - // TODO: wait for provider? that will probably slow us down more than we want - return Ok(OpenRequestResult::NotReady); - } + // TODO: if websocket is reconnecting, return an error? 
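// A hedged, standalone sketch of the two head-subscription paths used above: a
// websocket provider pushes blocks via subscribe_blocks, while an http provider polls
// a block filter with watch_blocks and fetches any hash it has not seen. The localhost
// urls are placeholders.
use ethers::providers::{Http, Middleware, Provider, Ws};
use futures::StreamExt;

async fn stream_heads() -> anyhow::Result<()> {
    // websocket: the node pushes every new head to us
    let ws = Provider::<Ws>::connect("ws://localhost:8546").await?;
    let mut pushed = ws.subscribe_blocks().await?;
    if let Some(block) = pushed.next().await {
        println!("pushed head: {:?}", block.number);
    }

    // http: poll for new block hashes (requires eth_newBlockFilter support on the node)
    let http = Provider::<Http>::try_from("http://localhost:8545")?;
    let mut polled = http.watch_blocks().await?;
    if let Some(hash) = polled.next().await {
        let block = http.get_block(hash).await?;
        println!("polled head: {:?}", block.and_then(|b| b.number));
    }

    Ok(())
}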
// check cached rate limits if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { @@ -1278,63 +893,39 @@ impl Web3Rpc { } async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> { - let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe(); + let mut disconnect_subscription = self.disconnect_watch.as_ref().unwrap().subscribe(); loop { - if *disconnect_watch.borrow_and_update() { + if *disconnect_subscription.borrow_and_update() { // disconnect watch is set to "true" return Ok(()); } - // wait for disconnect_watch to change - disconnect_watch.changed().await?; + // wait for disconnect_subscription to change + disconnect_subscription.changed().await?; } } - async fn wait_for_provider(&self) -> Arc { - let mut provider = self.provider.read().await.clone(); - - let mut logged = false; - while provider.is_none() { - // trace!("waiting on unlocked_provider: locking..."); - sleep(Duration::from_millis(100)).await; - - if !logged { - debug!("waiting for provider on {}", self); - logged = true; - } - - provider = self.provider.read().await.clone(); - } - - provider.unwrap() - } - - pub async fn wait_for_query( + pub async fn request( self: &Arc, method: &str, params: &P, revert_handler: RequestErrorHandler, authorization: Arc, - unlocked_provider: Option>, - ) -> anyhow::Result + ) -> Web3ProxyResult where // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static, R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send, { - self.wait_for_request_handle(&authorization, None, None) + // TODO: take max_wait as a function argument? + let x = self + .wait_for_request_handle(&authorization, None) .await? - .request::(method, params, revert_handler, unlocked_provider) - .await - .context("ProviderError from the backend") - } -} + .request::(method, params, revert_handler) + .await?; -impl fmt::Debug for Web3Provider { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url - f.debug_struct("Web3Provider").finish_non_exhaustive() + Ok(x) } } @@ -1342,8 +933,10 @@ impl Hash for Web3Rpc { fn hash(&self, state: &mut H) { self.name.hash(state); self.display_name.hash(state); - self.http_url.hash(state); - self.ws_url.hash(state); + self.http_provider.as_ref().map(|x| x.url()).hash(state); + // TODO: figure out how to get the url for the provider + // TODO: url does NOT include the authorization data. 
i think created_at should protect us if auth changes without anything else + // self.ws_provider.map(|x| x.url()).hash(state); self.automatic_block_limit.hash(state); self.backup.hash(state); // TODO: don't include soft_limit if we change them to be dynamic @@ -1419,14 +1012,14 @@ impl Serialize for Web3Rpc { &self.active_requests.load(atomic::Ordering::Relaxed), )?; - state.serialize_field("head_latency", &self.head_latency.read().value())?; + state.serialize_field("head_latency_ms", &self.head_latency.read().value())?; state.serialize_field( - "peak_latency", - &self.peak_latency.as_ref().unwrap().latency(), + "peak_latency_ms", + &self.peak_latency.as_ref().unwrap().latency().as_millis(), )?; - state.serialize_field("peak_ewma", self.peak_ewma().as_ref())?; + state.serialize_field("peak_ewma_s", self.peak_ewma().as_ref())?; state.end() } @@ -1459,7 +1052,7 @@ impl fmt::Display for Web3Rpc { mod tests { #![allow(unused_imports)] use super::*; - use ethers::types::{Block, U256}; + use ethers::types::{Block, H256, U256}; #[test] fn test_archive_node_has_block_data() { @@ -1481,7 +1074,6 @@ mod tests { let x = Web3Rpc { name: "name".to_string(), - ws_url: Some("ws://example.com".parse::().unwrap()), soft_limit: 1_000, automatic_block_limit: false, backup: false, diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index a70bc88f..45c82147 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,96 +1,80 @@ -use anyhow::anyhow; -use derive_more::From; use ethers::providers::{Authorization, ConnectionDetails}; -use std::{borrow::Cow, time::Duration}; +use std::time::Duration; use url::Url; // TODO: our own structs for these that handle streaming large responses -type EthersHttpProvider = ethers::providers::Provider; -type EthersWsProvider = ethers::providers::Provider; +pub type EthersHttpProvider = ethers::providers::Provider; +pub type EthersWsProvider = ethers::providers::Provider; -/// Use HTTP and WS providers. -// TODO: instead of an enum, I tried to use Box, but hit -// TODO: custom types that let us stream JSON responses -#[derive(From)] -pub enum Web3Provider { - Both(EthersHttpProvider, EthersWsProvider), - Http(EthersHttpProvider), - // TODO: deadpool? custom tokio-tungstenite - Ws(EthersWsProvider), - #[cfg(test)] - Mock, -} +pub fn extract_auth(url: &mut Url) -> Option { + if let Some(pass) = url.password().map(|x| x.to_string()) { + // to_string is needed because we are going to remove these items from the url + let user = url.username().to_string(); -impl Web3Provider { - pub fn http(&self) -> Option<&EthersHttpProvider> { - match self { - Self::Http(x) => Some(x), - _ => None, - } - } + // clear username and password from the url + url.set_username("") + .expect("unable to clear username on websocket"); + url.set_password(None) + .expect("unable to clear password on websocket"); - pub fn ws(&self) -> Option<&EthersWsProvider> { - match self { - Self::Both(_, x) | Self::Ws(x) => Some(x), - _ => None, - } - } - - /// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used - /// TODO: take a reqwest::Client or a reqwest::ClientBuilder. 
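// A minimal, hedged sketch of what extract_auth above does: pull basic-auth
// credentials out of the connection url so they can be handed to ethers' Authorization
// instead of living in the url. `strip_auth` and the example url are invented for
// illustration.
use ethers::providers::Authorization;
use url::Url;

fn strip_auth(url: &mut Url) -> Option<Authorization> {
    let pass = url.password()?.to_string();
    let user = url.username().to_string();

    // remove the credentials from the url itself
    url.set_username("").expect("unable to clear username");
    url.set_password(None).expect("unable to clear password");

    Some(Authorization::basic(user, pass))
}

fn main() {
    let mut url = Url::parse("wss://user:secret@rpc.example.com").expect("static url is valid");
    let auth = strip_auth(&mut url);

    assert!(auth.is_some());
    assert_eq!(url.username(), "");
    assert!(url.password().is_none());
}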
that way we can do things like set compression even when auth is set - pub async fn new( - mut url: Cow<'_, Url>, - http_client: Option, - ) -> anyhow::Result { - let auth = if let Some(pass) = url.password().map(|x| x.to_string()) { - // to_string is needed because we are going to remove these items from the url - let user = url.username().to_string(); - - // clear username and password from the url - let mut_url = url.to_mut(); - - mut_url - .set_username("") - .map_err(|_| anyhow!("unable to clear username on websocket"))?; - mut_url - .set_password(None) - .map_err(|_| anyhow!("unable to clear password on websocket"))?; - - // keep them - Some(Authorization::basic(user, pass)) - } else { - None - }; - - let provider = if url.scheme().starts_with("http") { - let provider = if let Some(auth) = auth { - ethers::providers::Http::new_with_auth(url.into_owned(), auth)? - } else if let Some(http_client) = http_client { - ethers::providers::Http::new_with_client(url.into_owned(), http_client) - } else { - ethers::providers::Http::new(url.into_owned()) - }; - - // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(12)) - .into() - } else if url.scheme().starts_with("ws") { - let provider = if auth.is_some() { - let connection_details = ConnectionDetails::new(url.as_str(), auth); - - ethers::providers::Ws::connect(connection_details).await? - } else { - ethers::providers::Ws::connect(url.as_str()).await? - }; - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - // TODO: i don't think this interval matters - ethers::providers::Provider::new(provider).into() - } else { - return Err(anyhow::anyhow!("only http and ws servers are supported")); - }; - - Ok(provider) + // keep them + Some(Authorization::basic(user, pass)) + } else { + None } } + +/// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used +/// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set +pub fn connect_http( + mut url: Url, + http_client: Option, + interval: Duration, +) -> anyhow::Result { + let auth = extract_auth(&mut url); + + let mut provider = if url.scheme().starts_with("http") { + let provider = if let Some(auth) = auth { + ethers::providers::Http::new_with_auth(url, auth)? + } else if let Some(http_client) = http_client { + ethers::providers::Http::new_with_client(url, http_client) + } else { + ethers::providers::Http::new(url) + }; + + // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` + ethers::providers::Provider::new(provider).interval(Duration::from_secs(2)) + } else { + return Err(anyhow::anyhow!( + "only http servers are supported. cannot use {}", + url + )); + }; + + provider.set_interval(interval); + + Ok(provider) +} + +pub async fn connect_ws(mut url: Url, reconnects: usize) -> anyhow::Result { + let auth = extract_auth(&mut url); + + let provider = if url.scheme().starts_with("ws") { + let provider = if auth.is_some() { + let connection_details = ConnectionDetails::new(url.as_str(), auth); + + // if they error, we do our own reconnection with backoff + ethers::providers::Ws::connect_with_reconnects(connection_details, reconnects).await? + } else { + ethers::providers::Ws::connect_with_reconnects(url.as_str(), reconnects).await? 
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index c50dd202..7a8a0003 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -1,6 +1,6 @@
 use super::one::Web3Rpc;
-use super::provider::Web3Provider;
 use crate::frontend::authorization::Authorization;
+use crate::frontend::errors::Web3ProxyResult;
 use anyhow::Context;
 use chrono::Utc;
 use entities::revert_log;
@@ -14,7 +14,7 @@ use std::fmt;
 use std::sync::atomic;
 use std::sync::Arc;
 use thread_fast_rng::rand::Rng;
-use tokio::time::{sleep, Duration, Instant};
+use tokio::time::{Duration, Instant};

 #[derive(Debug)]
 pub enum OpenRequestResult {
@@ -76,7 +76,7 @@ impl Authorization {
         self: Arc<Self>,
         method: Method,
         params: EthCallFirstParams,
-    ) -> anyhow::Result<()> {
+    ) -> Web3ProxyResult<()> {
         let rpc_key_id = match self.checks.rpc_secret_key_id {
             Some(rpc_key_id) => rpc_key_id.into(),
             None => {
@@ -158,7 +158,6 @@ impl OpenRequestHandle {
         method: &str,
         params: &P,
         mut error_handler: RequestErrorHandler,
-        unlocked_provider: Option<Arc<Web3Provider>>,
     ) -> Result<R, ProviderError>
     where
         // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
@@ -170,29 +169,6 @@ impl OpenRequestHandle {
         // trace!(rpc=%self.rpc, %method, "request");
         trace!("requesting from {}", self.rpc);

-        let mut provider = if unlocked_provider.is_some() {
-            unlocked_provider
-        } else {
-            self.rpc.provider.read().await.clone()
-        };
-
-        let mut logged = false;
-        // TODO: instead of a lock, i guess it should be a watch?
-        while provider.is_none() {
-            // trace!("waiting on provider: locking...");
-            // TODO: i dont like this. subscribing to a channel could be better
-            sleep(Duration::from_millis(100)).await;
-
-            if !logged {
-                debug!("no provider for open handle on {}", self.rpc);
-                logged = true;
-            }
-
-            provider = self.rpc.provider.read().await.clone();
-        }
-
-        let provider = provider.expect("provider was checked already");
-
         self.rpc
             .total_requests
             .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
@@ -202,21 +178,18 @@ impl OpenRequestHandle {
         let start = Instant::now();

         // TODO: replace ethers-rs providers with our own that supports streaming the responses
-        let response = match provider.as_ref() {
-            #[cfg(test)]
-            Web3Provider::Mock => {
-                return Err(ProviderError::CustomError(
-                    "mock provider can't respond".to_string(),
-                ))
-            }
-            Web3Provider::Ws(p) => p.request(method, params).await,
-            Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
-                // TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
-                p.request(method, params).await
-            }
+        // TODO: replace ethers-rs providers with our own that handles "id" being null
+        let response: Result<R, _> = if let Some(ref p) = self.rpc.http_provider {
+            p.request(method, params).await
+        } else if let Some(ref p) = self.rpc.ws_provider {
+            p.request(method, params).await
+        } else {
+            return Err(ProviderError::CustomError(
+                "no provider configured!".to_string(),
+            ));
         };

-        // note. we intentionally do not record this latency now. we do NOT want to measure errors
+        // we do NOT want to measure errors, so we intentionally do not record this latency now.
         let latency = start.elapsed();

         // we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called!
@@ -277,11 +250,7 @@ impl OpenRequestHandle {
             // TODO: move this info a function on ResponseErrorType
             let response_type = if let ProviderError::JsonRpcClientError(err) = err {
                 // Http and Ws errors are very similar, but different types
-                let msg = match &*provider {
-                    #[cfg(test)]
-                    Web3Provider::Mock => unimplemented!(),
-                    _ => err.as_error_response().map(|x| x.message.clone()),
-                };
+                let msg = err.as_error_response().map(|x| x.message.clone());

                 trace!("error message: {:?}", msg);
@@ -390,9 +359,7 @@ impl OpenRequestHandle {
                 }
             }
         } else if let Some(peak_latency) = &self.rpc.peak_latency {
-            // trace!("updating peak_latency: {}", latency.as_secs_f64());
-            // peak_latency.report(latency);
-            trace!("peak latency disabled for now");
+            peak_latency.report(latency);
         } else {
             unreachable!("peak_latency not initialized");
         }
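Editor's note: a minimal sketch, not part of the patch, of the rule stated in the comment above and followed by the peak_latency change: latency is measured around the call, but a sample is only ever reported on the success path.

use std::time::{Duration, Instant};

fn timed_call<T, E>(f: impl FnOnce() -> Result<T, E>) -> Result<(T, Duration), E> {
    let start = Instant::now();
    // on an error we return early, so no latency sample is recorded for failures
    let out = f()?;
    Ok((out, start.elapsed()))
}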
diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs
index d8c007ee..c9602e6c 100644
--- a/web3_proxy/src/rpcs/transactions.rs
+++ b/web3_proxy/src/rpcs/transactions.rs
@@ -1,4 +1,4 @@
-use crate::frontend::authorization::Authorization;
+use crate::frontend::{authorization::Authorization, errors::Web3ProxyResult};

 use super::many::Web3Rpcs;
 ///! Load balanced communication with a group of web3 providers
@@ -29,14 +29,13 @@ impl Web3Rpcs {
         // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
         // TODO: if one rpc fails, try another?
         // TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
-        let tx: Transaction = match rpc.try_request_handle(authorization, None).await {
+        let tx: Transaction = match rpc.try_request_handle(authorization).await {
             Ok(OpenRequestResult::Handle(handle)) => {
                 handle
                     .request(
                         "eth_getTransactionByHash",
                         &(pending_tx_id,),
                         Level::Error.into(),
-                        None,
                     )
                     .await?
             }
@@ -71,7 +70,7 @@ impl Web3Rpcs {
         rpc: Arc<Web3Rpc>,
         pending_tx_id: TxHash,
         pending_tx_sender: broadcast::Sender<TxState>,
-    ) -> anyhow::Result<()> {
+    ) -> Web3ProxyResult<()> {
         // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
         // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
         // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
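Editor's note: several signatures in this patch (transactions.rs above, request.rs, and the stats code below) move from anyhow::Result to Web3ProxyResult. A hedged sketch of the pattern with stand-in names, since the real Web3ProxyError in frontend::errors has many more variants:

#[derive(Debug)]
enum ProxyError {
    Anyhow(anyhow::Error),
}

impl From<anyhow::Error> for ProxyError {
    fn from(err: anyhow::Error) -> Self {
        Self::Anyhow(err)
    }
}

type ProxyResult<T> = Result<T, ProxyError>;

fn check(response_timestamp: i64) -> ProxyResult<()> {
    // the From impl lets ? and .into() convert anyhow errors into the shared error type
    if response_timestamp == 0 {
        return Err(anyhow::anyhow!("no response_timestamp! This is a bug!").into());
    }
    Ok(())
}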
{:?} {:?}", - key, - self - ); + ) -> Web3ProxyResult<()> { + if key.response_timestamp == 0 { + return Err(Web3ProxyError::Anyhow(anyhow!( + "no response_timestamp! This is a bug! {:?} {:?}", + key, + self + ))); + } let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); @@ -670,8 +671,9 @@ impl RpcQueryStats { method: Option<&str>, ) -> Decimal { // for now, always return 0 for cost - return 0.into(); + 0.into() + /* // some methods should be free. there might be cases where method isn't set (though they should be uncommon) // TODO: get this list from config (and add more to it) if let Some(method) = method.as_ref() { @@ -704,5 +706,6 @@ impl RpcQueryStats { } cost + */ } } diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index f8fd2db8..c7028204 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -1,5 +1,6 @@ use super::{AppStat, RpcQueryKey}; -use crate::app::RpcSecretKeyCache; +use crate::app::{RpcSecretKeyCache, Web3ProxyJoinHandle}; +use crate::frontend::errors::Web3ProxyResult; use derive_more::From; use futures::stream; use hashbrown::HashMap; @@ -9,7 +10,6 @@ use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use std::time::Duration; use tokio::sync::broadcast; -use tokio::task::JoinHandle; use tokio::time::interval; #[derive(Debug, Default)] @@ -32,7 +32,7 @@ pub struct BufferedRpcQueryStats { pub struct SpawnedStatBuffer { pub stat_sender: flume::Sender, /// these handles are important and must be allowed to finish - pub background_handle: JoinHandle>, + pub background_handle: Web3ProxyJoinHandle<()>, } pub struct StatBuffer { accounting_db_buffer: HashMap, @@ -96,7 +96,7 @@ impl StatBuffer { bucket: String, stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); let mut db_save_interval =