From c443f766235eb1b86e169a0990f204ab95b9c557 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 18 May 2023 22:43:07 -0700 Subject: [PATCH] refactor tiers again --- Cargo.lock | 122 +++---- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/frontend/users/payment.rs | 6 +- web3_proxy/src/frontend/users/referral.rs | 28 +- web3_proxy/src/referral_code.rs | 10 +- web3_proxy/src/rpcs/blockchain.rs | 10 +- web3_proxy/src/rpcs/consensus.rs | 2 +- web3_proxy/src/rpcs/many.rs | 402 +++++++++++++--------- 8 files changed, 317 insertions(+), 265 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d73a5b1..95aec00b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,7 +183,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -194,7 +194,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -328,7 +328,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -1172,7 +1172,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -1189,7 +1189,7 @@ checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -1706,7 +1706,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "syn 2.0.15", + "syn 2.0.16", "tokio", "toml 0.7.4", "url", @@ -1726,7 +1726,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -1753,7 +1753,7 @@ dependencies = [ "serde", "serde_json", "strum", - "syn 2.0.15", + "syn 2.0.16", "tempfile", "thiserror", "tiny-keccak", @@ -2156,7 +2156,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -2296,9 +2296,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.18" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21" +checksum = "d357c7ae988e7d2182f7d7871d0b963962420b0678b0997ce7de72001aeab782" dependencies = [ "bytes", "fnv", @@ -2862,9 +2862,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.62" +version = "0.3.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68c16e1bfd491478ab155fd8b4896b86f9ede344949b641e61501e07c2b8b4d5" +checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790" dependencies = [ "wasm-bindgen", ] @@ -3380,7 +3380,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -3458,7 +3458,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -3739,7 +3739,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -3923,7 +3923,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ceca8aaf45b5c46ec7ed39fff75f57290368c1846d33d24a122ca81416ab058" dependencies = [ "proc-macro2", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -4005,9 +4005,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.56" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8" dependencies = [ "unicode-ident", ] @@ -4899,9 +4899,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.8.2" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254" +checksum = "ca2855b3715770894e67cbfa3df957790aa0c9edc3bf06efa1a84d77fa0839d1" dependencies = [ "bitflags", "core-foundation", @@ -4912,9 +4912,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4" +checksum = "f51d0c0d83bec45f16480d0ce0058397a69e48fcdc52d1dc8855fb68acbd31a7" dependencies = [ "core-foundation-sys", "libc", @@ -4943,9 +4943,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "sentry" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c3d7f8bf7373e75222452fcdd9347d857452a92d0eec738f941bc4656c5b5df" +checksum = "37dd6c0cdca6b1d1ca44cde7fff289f2592a97965afec870faa7b81b9fc87745" dependencies = [ "httpdate", "reqwest", @@ -4963,9 +4963,9 @@ dependencies = [ [[package]] name = "sentry-anyhow" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef7f47c57a1146d553b4976f20e8bba370195a88858bdf6945a63c529549236" +checksum = "d9c3d7032fff178c77c107c32c6d3337b12847adf67165ccc876c898e7154b00" dependencies = [ "anyhow", "sentry-backtrace", @@ -4974,9 +4974,9 @@ dependencies = [ [[package]] name = "sentry-backtrace" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03b7cdefbdca51f1146f0f24a3cb4ecb6428951f030ff5c720cfb5c60bd174c0" +checksum = "c029fe8317cdd75cb2b52c600bab4e2ef64c552198e669ba874340447f330962" dependencies = [ "backtrace", "once_cell", @@ -4986,9 +4986,9 @@ dependencies = [ [[package]] name = "sentry-contexts" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6af4cb29066e0e8df0cc3111211eb93543ccb09e1ccbe71de6d88b4bb459a2b1" +checksum = "bc575098d73c8b942b589ab453b06e4c43527556dd8f95532220d1b54d7c6b4b" dependencies = [ "hostname", "libc", @@ -5000,9 +5000,9 @@ dependencies = [ [[package]] name = "sentry-core" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e781b55761e47a60d1ff326ae8059de22b0e6b0cee68eab1c5912e4fb199a76" +checksum = "20216140001bbf05895f013abd0dae4df58faee24e016d54cbf107f070bac56b" dependencies = [ "once_cell", "rand", @@ -5013,9 +5013,9 @@ dependencies = [ [[package]] name = "sentry-log" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee978a28d4cb15c89618dc035ab56ea18fe82d5a248ddcd771d153876ccbdf56" +checksum = "a43934e48e9c8e2c7d0dcb9c6cbcfcbe3ee109a14fc0c821e8944acd4faa2c25" dependencies = [ "log", "sentry-core", @@ -5023,9 +5023,9 @@ dependencies = [ [[package]] name = "sentry-panic" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0b877981990d9e84ae6916df61993d188fdf76afb59521f0aeaf9b8e6d26d0" +checksum = "4e45cd0a113fc06d6edba01732010518816cdc8ce3bccc70f5e41570046bf046" dependencies = [ "sentry-backtrace", "sentry-core", @@ -5033,9 +5033,9 @@ dependencies = [ [[package]] name = "sentry-types" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d642a04657cc77d8de52ae7c6d93a15cb02284eb219344a89c1e2b26bbaf578c" +checksum = "d7f6959d8cb3a77be27e588eef6ce9a2a469651a556d9de662e4d07e5ace4232" dependencies = [ "debugid", "getrandom", @@ -5065,7 +5065,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -5598,9 +5598,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.15" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" +checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" dependencies = [ "proc-macro2", "quote", @@ -5685,7 +5685,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -5847,7 +5847,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -6079,7 +6079,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", ] [[package]] @@ -6413,9 +6413,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.85" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b6cb788c4e39112fbe1822277ef6fb3c55cd86b95cb3d3c4c1c9597e4ac74b4" +checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -6423,24 +6423,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.85" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e522ed4105a9d626d885b35d62501b30d9666283a5c8be12c14a8bdafe7822" +checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.35" +version = "0.4.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "083abe15c5d88556b77bdf7aef403625be9e327ad37c62c4e4129af740168163" +checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e" dependencies = [ "cfg-if", "js-sys", @@ -6450,9 +6450,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.85" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "358a79a0cb89d21db8120cbfb91392335913e4890665b1a7981d9e956903b434" +checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -6460,22 +6460,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.85" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4783ce29f09b9d93134d41297aded3a712b7b979e9c6f28c32cb88c973a94869" +checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.16", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.85" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a901d592cafaa4d711bc324edfaff879ac700b19c3dfd60058d2b445be2691eb" +checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" [[package]] name = "wasm-streams" @@ -6492,9 +6492,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.62" +version = "0.3.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b5f940c7edfdc6d12126d98c9ef4d1b3d470011c47c76a6581df47ad9ba721" +checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index a91eddb6..c51e9c8e 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -74,7 +74,7 @@ rdkafka = { version = "0.31.0" } regex = "1.8.1" reqwest = { version = "0.11.18", default-features = false, features = ["json", "tokio-rustls"] } rmp-serde = "1.1.1" -sentry = { version = "0.31.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } +sentry = { version = "0.31.1", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.163", features = [] } serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.2.2" diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index fb2f0d91..975fb3da 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -153,7 +153,7 @@ pub async fn user_balance_post( // Just make an rpc request, idk if i need to call this super extensive code let transaction_receipt: TransactionReceipt = match app .balanced_rpcs - .best_available_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -188,7 +188,7 @@ pub async fn user_balance_post( debug!("Transaction receipt is: {:?}", transaction_receipt); let accepted_token: Address = match app .balanced_rpcs - .best_available_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -243,7 +243,7 @@ pub async fn user_balance_post( debug!("Accepted token is: {:?}", accepted_token); let decimals: u32 = match app .balanced_rpcs - .best_available_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { diff --git a/web3_proxy/src/frontend/users/referral.rs b/web3_proxy/src/frontend/users/referral.rs index c4f613ac..391fccc5 100644 --- a/web3_proxy/src/frontend/users/referral.rs +++ b/web3_proxy/src/frontend/users/referral.rs @@ -1,6 +1,6 @@ //! Handle registration, logins, and managing account data. use crate::app::Web3ProxyApp; -use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse}; +use crate::frontend::errors::Web3ProxyResponse; use crate::referral_code::ReferralCode; use anyhow::Context; use axum::{ @@ -10,10 +10,9 @@ use axum::{ Extension, Json, TypedHeader, }; use axum_macros::debug_handler; -use entities::{referrer, user_tier}; +use entities::referrer; use hashbrown::HashMap; use http::StatusCode; -use log::warn; use migration::sea_orm; use migration::sea_orm::ActiveModelTrait; use migration::sea_orm::ColumnTrait; @@ -38,20 +37,7 @@ pub async fn user_referral_link_get( .db_replica() .context("getting replica db for user's revert logs")?; - // Second, check if the user is a premium user - let user_tier = user_tier::Entity::find() - .filter(user_tier::Column::Id.eq(user.user_tier_id)) - .one(db_replica.conn()) - .await? - .ok_or(Web3ProxyError::UnknownKey)?; - - warn!("User tier is: {:?}", user_tier); - // TODO: This shouldn't be hardcoded. Also, it should be an enum, not sth like this ... - if user_tier.id != 6 { - return Err(Web3ProxyError::PaymentRequired); - } - - // Then get the referral token + // Then get the referral token. If one doesn't exist, create one let user_referrer = referrer::Entity::find() .filter(referrer::Column::UserId.eq(user.id)) .one(db_replica.conn()) @@ -60,18 +46,18 @@ pub async fn user_referral_link_get( let (referral_code, status_code) = match user_referrer { Some(x) => (x.referral_code, StatusCode::OK), None => { - // Connect to the database for mutable write + // Connect to the database for writes let db_conn = app.db_conn().context("getting db_conn")?; - let referral_code = ReferralCode::default().0; - // Log that this guy was referred by another guy - // Do not automatically create a new + let referral_code = ReferralCode::default().to_string(); + let referrer_entry = referrer::ActiveModel { user_id: sea_orm::ActiveValue::Set(user.id), referral_code: sea_orm::ActiveValue::Set(referral_code.clone()), ..Default::default() }; referrer_entry.save(&db_conn).await?; + (referral_code, StatusCode::CREATED) } }; diff --git a/web3_proxy/src/referral_code.rs b/web3_proxy/src/referral_code.rs index d5343e84..49b8b08c 100644 --- a/web3_proxy/src/referral_code.rs +++ b/web3_proxy/src/referral_code.rs @@ -1,7 +1,9 @@ +use std::fmt::Display; + use anyhow::{self, Result}; use ulid::Ulid; -pub struct ReferralCode(pub String); +pub struct ReferralCode(String); impl Default for ReferralCode { fn default() -> Self { @@ -10,6 +12,12 @@ impl Default for ReferralCode { } } +impl Display for ReferralCode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + impl TryFrom for ReferralCode { type Error = anyhow::Error; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ff2c08c4..9c181e0d 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -245,13 +245,7 @@ impl Web3Rpcs { // TODO: request_metadata? maybe we should put it in the authorization? // TODO: think more about this wait_for_sync let response = self - .try_send_best_consensus_head_connection( - authorization, - &request, - None, - None, - None, - ) + .try_send_best_connection(authorization, &request, None, None, None) .await?; let value = match response { @@ -346,7 +340,7 @@ impl Web3Rpcs { let request: JsonRpcRequest = serde_json::from_value(request)?; let response = self - .try_send_best_consensus_head_connection(authorization, &request, None, Some(num), None) + .try_send_best_connection(authorization, &request, None, Some(num), None) .await?; let value = match response { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 2ca28519..b11fb794 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -190,7 +190,7 @@ impl ConsensusWeb3Rpcs { } // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on) - fn rpc_will_work_eventually( + pub fn rpc_will_work_eventually( &self, rpc: &Arc, needed_block_num: Option<&U64>, diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5921f895..5f281bee 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -10,7 +10,6 @@ use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest}; use crate::response_cache::JsonRpcResponseData; -use crate::rpcs::consensus::{RankedRpcMap, RpcRanking}; use crate::rpcs::transactions::TxStatus; use anyhow::Context; use arc_swap::ArcSwap; @@ -32,7 +31,6 @@ use serde_json::json; use serde_json::value::RawValue; use std::borrow::Cow; use std::cmp::{min_by_key, Reverse}; -use std::collections::BTreeMap; use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -491,141 +489,241 @@ impl Web3Rpcs { unimplemented!("this shouldn't be possible") } - pub async fn best_available_rpc( + async fn _best_available_rpc( + &self, + authorization: &Arc, + potential_rpcs: &[Arc], + skip: &mut Vec>, + ) -> OpenRequestResult { + let mut earliest_retry_at = None; + + for (rpc_a, rpc_b) in potential_rpcs.iter().circular_tuple_windows() { + trace!("{} vs {}", rpc_a, rpc_b); + // TODO: cached key to save a read lock + // TODO: ties to the server with the smallest block_data_limit + let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.peak_ewma()); + trace!("winner: {}", faster_rpc); + + // add to the skip list in case this one fails + skip.push(Arc::clone(faster_rpc)); + + // just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits + match faster_rpc.try_request_handle(authorization, None).await { + Ok(OpenRequestResult::Handle(handle)) => { + trace!("opened handle: {}", faster_rpc); + return OpenRequestResult::Handle(handle); + } + Ok(OpenRequestResult::RetryAt(retry_at)) => { + trace!( + "retry on {} @ {}", + faster_rpc, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); + + if earliest_retry_at.is_none() { + earliest_retry_at = Some(retry_at); + } else { + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + } + } + Ok(OpenRequestResult::NotReady) => { + // TODO: log a warning? emit a stat? + trace!("best_rpc not ready: {}", faster_rpc); + } + Err(err) => { + trace!("No request handle for {}. err={:?}", faster_rpc, err) + } + } + } + + if let Some(retry_at) = earliest_retry_at { + OpenRequestResult::RetryAt(retry_at) + } else { + OpenRequestResult::NotReady + } + } + + pub async fn wait_for_best_rpc( &self, authorization: &Arc, request_metadata: Option<&Arc>, - skip: &mut Vec>, + skip_rpcs: &mut Vec>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, ) -> Web3ProxyResult { - // TODO: use tracing and add this so logs are easy - let request_ulid = request_metadata.map(|x| &x.request_ulid); + let mut earliest_retry_at: Option = None; - let mut usable_rpcs_by_tier_and_head_number = { - let mut m: RankedRpcMap = BTreeMap::new(); + if self.watch_consensus_head_sender.is_none() { + trace!("this Web3Rpcs is not tracking head blocks. pick any server"); - if let Some(consensus_rpcs) = self.watch_consensus_rpcs_sender.borrow().as_ref() { - // first place is the blocks that are synced close to head. if those don't work. try all the rpcs. if those don't work, keep trying for a few seconds + let by_name = self.by_name.load(); - let head_block = &consensus_rpcs.head_block; + let mut potential_rpcs: Vec<_> = by_name + .values() + .filter(|rpc| !skip_rpcs.contains(rpc)) + .filter(|rpc| { + min_block_needed + .map(|x| rpc.has_block_data(x)) + .unwrap_or(true) + }) + .filter(|rpc| { + max_block_needed + .map(|x| rpc.has_block_data(x)) + .unwrap_or(true) + }) + .cloned() + .collect(); - let head_block_num = *head_block.number(); + potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); - let best_key = RpcRanking::new( - consensus_rpcs.tier, - consensus_rpcs.backups_needed, - Some(head_block_num), - ); - - // todo: for now, build the map m here. once that works, do as much of it as possible while building ConsensusWeb3Rpcs - for x in consensus_rpcs.head_rpcs.iter().filter(|rpc| { - consensus_rpcs.rpc_will_work_now(skip, min_block_needed, max_block_needed, rpc) - }) { - m.entry(best_key).or_insert_with(Vec::new).push(x.clone()); - } - - let tier_offset = consensus_rpcs.tier + 1; - - for (k, v) in consensus_rpcs.other_rpcs.iter() { - let v: Vec<_> = v - .iter() - .filter(|rpc| { - consensus_rpcs.rpc_will_work_now( - skip, - min_block_needed, - max_block_needed, - rpc, - ) - }) - .cloned() - .collect(); - - let offset_ranking = k.add_offset(tier_offset); - - m.entry(offset_ranking).or_insert_with(Vec::new).extend(v); - } - } else if self.watch_consensus_head_sender.is_none() { - trace!("this Web3Rpcs is not tracking head blocks. pick any server"); - - for x in self.by_name.load().values() { - if skip.contains(x) { - trace!("{:?} - already skipped. {}", request_ulid, x); - continue; + match self + ._best_available_rpc(authorization, &potential_rpcs, skip_rpcs) + .await + { + OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)), + OpenRequestResult::NotReady => {} + OpenRequestResult::RetryAt(retry_at) => { + if earliest_retry_at.is_none() { + earliest_retry_at = Some(retry_at); + } else { + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - - let key = RpcRanking::default_with_backup(x.backup); - - m.entry(key).or_insert_with(Vec::new).push(x.clone()); } } + } else { + let start = Instant::now(); - m - }; + // TODO: get from config or argument + let max_wait = Duration::from_secs(10); - trace!( - "{:?} - usable_rpcs_by_tier_and_head_number: {:#?}", - request_ulid, - usable_rpcs_by_tier_and_head_number - ); + let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); - let mut earliest_retry_at = None; + let mut potential_rpcs = Vec::with_capacity(self.by_name.load().len()); - for usable_rpcs in usable_rpcs_by_tier_and_head_number.values_mut() { - // sort the tier randomly - if usable_rpcs.len() == 1 { - // TODO: include an rpc from the next tier? - } else { - // we can't get the rng outside of this loop because it is not Send - // this function should be pretty fast anyway, so it shouldn't matter too much - let mut rng = thread_fast_rng::thread_fast_rng(); - usable_rpcs.shuffle(&mut rng); - }; + while start.elapsed() < max_wait { + let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone(); - // now that the rpcs are shuffled, try to get an active request handle for one of them - // pick the first two and try the one with the lower rpc.latency.ewma - // TODO: chunks or tuple windows? - for (rpc_a, rpc_b) in usable_rpcs.iter().circular_tuple_windows() { - trace!("{:?} - {} vs {}", request_ulid, rpc_a, rpc_b); - // TODO: cached key to save a read lock - // TODO: ties to the server with the smallest block_data_limit - let best_rpc = min_by_key(rpc_a, rpc_b, |x| x.peak_ewma()); - trace!("{:?} - winner: {}", request_ulid, best_rpc); + potential_rpcs.clear(); - skip.push(best_rpc.clone()); + // first check everything that is synced + // even though we might be querying an old block that an unsynced server can handle, + // it is best to not send queries to a syncing server. that slows down sync and can bloat erigon's disk usage. + if let Some(consensus_rpcs) = consensus_rpcs { + potential_rpcs.extend( + consensus_rpcs + .head_rpcs + .iter() + .filter(|rpc| { + consensus_rpcs.rpc_will_work_now( + skip_rpcs, + min_block_needed, + max_block_needed, + rpc, + ) + }) + .cloned(), + ); - // just because it has lower latency doesn't mean we are sure to get a connection - match best_rpc.try_request_handle(authorization, None).await { - Ok(OpenRequestResult::Handle(handle)) => { - trace!("{:?} - opened handle: {}", request_ulid, best_rpc); - return Ok(OpenRequestResult::Handle(handle)); + potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); + + if potential_rpcs.len() >= self.min_head_rpcs { + // we have enough potential rpcs. try to load balance + + match self + ._best_available_rpc(authorization, &potential_rpcs, skip_rpcs) + .await + { + OpenRequestResult::Handle(x) => { + return Ok(OpenRequestResult::Handle(x)) + } + OpenRequestResult::NotReady => {} + OpenRequestResult::RetryAt(retry_at) => { + if earliest_retry_at.is_none() { + earliest_retry_at = Some(retry_at); + } else { + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + } + } + } + + // these rpcs were tried. don't try them again + potential_rpcs.clear(); } - Ok(OpenRequestResult::RetryAt(retry_at)) => { - trace!( - "{:?} - retry on {} @ {}", - request_ulid, - best_rpc, - retry_at.duration_since(Instant::now()).as_secs_f32() - ); - if earliest_retry_at.is_none() { - earliest_retry_at = Some(retry_at); - } else { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + for next_rpcs in consensus_rpcs.other_rpcs.values() { + // we have to collect in order to shuffle + let mut more_rpcs: Vec<_> = next_rpcs + .iter() + .filter(|rpc| { + consensus_rpcs.rpc_will_work_now( + skip_rpcs, + min_block_needed, + max_block_needed, + rpc, + ) + }) + .cloned() + .collect(); + + // shuffle only the new entries. that way the highest tier still gets preference + more_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); + + potential_rpcs.extend(more_rpcs.into_iter()); + + if potential_rpcs.len() >= self.min_head_rpcs { + // we have enough potential rpcs. try to load balance + match self + ._best_available_rpc(authorization, &potential_rpcs, skip_rpcs) + .await + { + OpenRequestResult::Handle(x) => { + return Ok(OpenRequestResult::Handle(x)) + } + OpenRequestResult::NotReady => {} + OpenRequestResult::RetryAt(retry_at) => { + if earliest_retry_at.is_none() { + earliest_retry_at = Some(retry_at); + } else { + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + } + } + } + + // these rpcs were tried. don't try them again + potential_rpcs.clear(); } } - Ok(OpenRequestResult::NotReady) => { - // TODO: log a warning? emit a stat? - trace!("{:?} - best_rpc not ready: {}", request_ulid, best_rpc); + + if !potential_rpcs.is_empty() { + // even after scanning all the tiers, there are not enough rpcs that can serve this request. try anyways + match self + ._best_available_rpc(authorization, &potential_rpcs, skip_rpcs) + .await + { + OpenRequestResult::Handle(x) => { + return Ok(OpenRequestResult::Handle(x)) + } + OpenRequestResult::NotReady => {} + OpenRequestResult::RetryAt(retry_at) => { + if earliest_retry_at.is_none() { + earliest_retry_at = Some(retry_at); + } else { + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + } + } + } } - Err(err) => { - trace!( - "{:?} - No request handle for {}. err={:?}", - request_ulid, - best_rpc, - err - ) + + let waiting_for = min_block_needed.max(max_block_needed); + + match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) { + ShouldWaitForBlock::NeverReady => break, + ShouldWaitForBlock::Ready => continue, + ShouldWaitForBlock::Wait { .. } => {} } + + // TODO: select on consensus_rpcs changing and on earliest_retry_at + watch_consensus_rpcs.changed().await?; } } } @@ -634,41 +732,20 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } - match earliest_retry_at { - None => { - // none of the servers gave us a time to retry at - debug!( - "{:?} - no servers in {} gave a retry time! Skipped {:?}. {:#?}", - request_ulid, self, skip, usable_rpcs_by_tier_and_head_number - ); + if let Some(retry_at) = earliest_retry_at { + // TODO: log the server that retry_at came from + warn!( + "no servers in {} ready! Skipped {:?}. Retry in {:?}s", + self, + skip_rpcs, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); - // TODO: bring this back? need to think about how to do this with `allow_backups` - // we could return an error here, but maybe waiting a second will fix the problem - // TODO: configurable max wait? the whole max request time, or just some portion? - // let handle = sorted_rpcs - // .get(0) - // .expect("at least 1 is available") - // .wait_for_request_handle(authorization, Duration::from_secs(3), false) - // .await?; - // Ok(OpenRequestResult::Handle(handle)) + Ok(OpenRequestResult::RetryAt(retry_at)) + } else { + warn!("no servers in {} ready! Skipped {:?}", self, skip_rpcs); - Ok(OpenRequestResult::NotReady) - } - Some(earliest_retry_at) => { - // TODO: log the server that retry_at came from - // TODO: `self` doesn't log well. get a pretty name for this group of servers - warn!( - "{:?} - no servers in {} ready! Skipped {:?}. Retry in {:?}s", - request_ulid, - self, - skip, - earliest_retry_at - .duration_since(Instant::now()) - .as_secs_f32() - ); - - Ok(OpenRequestResult::RetryAt(earliest_retry_at)) - } + Ok(OpenRequestResult::NotReady) } } @@ -784,7 +861,7 @@ impl Web3Rpcs { /// be sure there is a timeout on this or it might loop forever /// TODO: think more about wait_for_sync - pub async fn try_send_best_consensus_head_connection( + pub async fn try_send_best_connection( &self, authorization: &Arc, request: &JsonRpcRequest, @@ -802,9 +879,10 @@ impl Web3Rpcs { // TODO: get from config let max_wait = Duration::from_secs(10); + // TODO: the loop here feels somewhat redundant with the loop in best_available_rpc while start.elapsed() < max_wait { match self - .best_available_rpc( + .wait_for_best_rpc( authorization, request_metadata, &mut skip_rpcs, @@ -963,21 +1041,7 @@ impl Web3Rpcs { } } OpenRequestResult::NotReady => { - if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); - } - - let waiting_for = min_block_needed.max(max_block_needed); - - if let Some(consensus_rpcs) = watch_consensus_rpcs.borrow_and_update().as_ref() - { - match consensus_rpcs.should_wait_for_block(waiting_for, &skip_rpcs) { - ShouldWaitForBlock::NeverReady => break, - ShouldWaitForBlock::Ready => continue, - ShouldWaitForBlock::Wait { .. } => {} - } - } - watch_consensus_rpcs.changed().await?; + break; } } } @@ -1152,7 +1216,7 @@ impl Web3Rpcs { ) -> Web3ProxyResult { match authorization.checks.proxy_mode { ProxyMode::Debug | ProxyMode::Best => { - self.try_send_best_consensus_head_connection( + self.try_send_best_connection( authorization, request, request_metadata, @@ -1489,7 +1553,7 @@ mod tests { // best_synced_backend_connection which servers to be synced with the head block should not find any nodes let x = rpcs - .best_available_rpc( + .wait_for_best_rpc( &authorization, None, &mut vec![], @@ -1586,28 +1650,28 @@ mod tests { // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.best_available_rpc(&authorization, None, &mut vec![], None, None) + rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], None, None) .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.best_available_rpc(&authorization, None, &mut vec![], Some(&0.into()), None) + rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&0.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.best_available_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) + rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); // future block should not get a handle let future_rpc = rpcs - .best_available_rpc(&authorization, None, &mut vec![], Some(&2.into()), None) + .wait_for_best_rpc(&authorization, None, &mut vec![], Some(&2.into()), None) .await; assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady))); } @@ -1732,7 +1796,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? let best_available_server = rpcs - .best_available_rpc( + .wait_for_best_rpc( &authorization, None, &mut vec![], @@ -1749,13 +1813,13 @@ mod tests { )); let _best_available_server_from_none = rpcs - .best_available_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) .await; // assert_eq!(best_available_server, best_available_server_from_none); let best_archive_server = rpcs - .best_available_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) + .wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) .await; match best_archive_server {