From b2016190e1dc8ea487a506b7d7c842148ebef61b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 6 Mar 2023 04:39:35 +0000 Subject: [PATCH 01/25] change_admin_status script fixes --- ...admin_status.rs => change_admin_status.rs} | 17 +++---- web3_proxy/src/bin/web3_proxy_cli/main.rs | 46 +++++++++---------- 2 files changed, 31 insertions(+), 32 deletions(-) rename web3_proxy/src/bin/web3_proxy_cli/{change_user_admin_status.rs => change_admin_status.rs} (82%) diff --git a/web3_proxy/src/bin/web3_proxy_cli/change_user_admin_status.rs b/web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs similarity index 82% rename from web3_proxy/src/bin/web3_proxy_cli/change_user_admin_status.rs rename to web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs index 82e1495f..4c5cf683 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/change_user_admin_status.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs @@ -2,7 +2,7 @@ use anyhow::Context; use argh::FromArgs; use entities::{admin, login, user}; use ethers::types::Address; -use log::debug; +use log::{debug, info}; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, ModelTrait, QueryFilter, }; @@ -10,7 +10,7 @@ use migration::sea_orm::{ /// change a user's admin status. eiter they are an admin, or they aren't #[derive(FromArgs, PartialEq, Eq, Debug)] #[argh(subcommand, name = "change_admin_status")] -pub struct ChangeUserAdminStatusSubCommand { +pub struct ChangeAdminStatusSubCommand { /// the address of the user whose admin status you want to modify #[argh(positional)] address: String, @@ -20,7 +20,7 @@ pub struct ChangeUserAdminStatusSubCommand { should_be_admin: bool, } -impl ChangeUserAdminStatusSubCommand { +impl ChangeAdminStatusSubCommand { pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> { let address: Address = self.address.parse()?; let should_be_admin: bool = self.should_be_admin; @@ -34,17 +34,18 @@ impl ChangeUserAdminStatusSubCommand { .await? .context(format!("No user with this id found {:?}", address))?; - debug!("user: {:#?}", user); + debug!("user: {}", serde_json::to_string_pretty(&user)?); // Check if there is a record in the database match admin::Entity::find() - .filter(admin::Column::UserId.eq(address)) + .filter(admin::Column::UserId.eq(user.id)) .one(db_conn) .await? { Some(old_admin) if !should_be_admin => { // User is already an admin, but shouldn't be old_admin.delete(db_conn).await?; + info!("revoked admin status"); } None if should_be_admin => { // User is not an admin yet, but should be @@ -53,11 +54,11 @@ impl ChangeUserAdminStatusSubCommand { ..Default::default() }; new_admin.insert(db_conn).await?; + info!("granted admin status"); } _ => { - // Do nothing in this case - debug!("no change needed for: {:#?}", user); - // Early return + info!("no change needed for: {:#?}", user); + // Since no change happened, we do not want to delete active logins. Return now. 
return Ok(()); } } diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 99b8c042..14abe3cb 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -1,5 +1,5 @@ +mod change_admin_status; mod change_user_address; -mod change_user_admin_status; mod change_user_tier; mod change_user_tier_by_address; mod change_user_tier_by_key; @@ -67,8 +67,8 @@ pub struct Web3ProxyCli { #[derive(FromArgs, PartialEq, Debug)] #[argh(subcommand)] enum SubCommand { + ChangeAdminStatus(change_admin_status::ChangeAdminStatusSubCommand), ChangeUserAddress(change_user_address::ChangeUserAddressSubCommand), - ChangeUserAdminStatus(change_user_admin_status::ChangeUserAdminStatusSubCommand), ChangeUserTier(change_user_tier::ChangeUserTierSubCommand), ChangeUserTierByAddress(change_user_tier_by_address::ChangeUserTierByAddressSubCommand), ChangeUserTierByKey(change_user_tier_by_key::ChangeUserTierByKeySubCommand), @@ -278,19 +278,19 @@ fn main() -> anyhow::Result<()> { rt.block_on(async { match cli_config.sub_command { - SubCommand::ChangeUserAddress(x) => { - let db_url = cli_config - .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + SubCommand::ChangeAdminStatus(x) => { + let db_url = cli_config.db_url.expect( + "'--config' (with a db) or '--db-url' is required to run change_admin_status", + ); let db_conn = get_db(db_url, 1, 1).await?; x.main(&db_conn).await } - SubCommand::ChangeUserAdminStatus(x) => { + SubCommand::ChangeUserAddress(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run change_user_admin_status"); + .expect("'--config' (with a db) or '--db-url' is required to run change_user_addres"); let db_conn = get_db(db_url, 1, 1).await?; @@ -299,16 +299,16 @@ fn main() -> anyhow::Result<()> { SubCommand::ChangeUserTier(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run change_user_tier"); let db_conn = get_db(db_url, 1, 1).await?; x.main(&db_conn).await } SubCommand::ChangeUserTierByAddress(x) => { - let db_url = cli_config - .db_url - .expect("'--config' (with a db) or '--db-url' is required to run change_user_admin_status"); + let db_url = cli_config.db_url.expect( + "'--config' (with a db) or '--db-url' is required to run change_user_tier_by_address", + ); let db_conn = get_db(db_url, 1, 1).await?; @@ -317,7 +317,7 @@ fn main() -> anyhow::Result<()> { SubCommand::ChangeUserTierByKey(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run change_user_tier_by_key"); let db_conn = get_db(db_url, 1, 1).await?; @@ -336,7 +336,7 @@ fn main() -> anyhow::Result<()> { SubCommand::CreateUser(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run create_user"); let db_conn = get_migrated_db(db_url, 1, 1).await?; @@ -345,7 +345,7 @@ fn main() -> anyhow::Result<()> { SubCommand::CountUsers(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run count_users"); let db_conn = get_db(db_url, 1, 1).await?; @@ -361,9 +361,9 @@ fn 
main() -> anyhow::Result<()> { SubCommand::DropMigrationLock(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run drop_migration_lock"); - // very intentionally, do NOT run migrations here + // very intentionally, do NOT run migrations here. that would wait forever if the migration lock is abandoned let db_conn = get_db(db_url, 1, 1).await?; x.main(&db_conn).await @@ -376,9 +376,7 @@ fn main() -> anyhow::Result<()> { x.main(pagerduty_async, top_config).await } SubCommand::PopularityContest(x) => x.main().await, - SubCommand::SearchKafka(x) => { - x.main(top_config.unwrap()).await - }, + SubCommand::SearchKafka(x) => x.main(top_config.unwrap()).await, SubCommand::Sentryd(x) => { if cli_config.sentry_url.is_none() { warn!("sentry_url is not set! Logs will only show in this console"); @@ -389,7 +387,7 @@ fn main() -> anyhow::Result<()> { SubCommand::RpcAccounting(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run rpc_accounting"); let db_conn = get_migrated_db(db_url, 1, 1).await?; @@ -398,7 +396,7 @@ fn main() -> anyhow::Result<()> { SubCommand::TransferKey(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run transfer_key"); let db_conn = get_db(db_url, 1, 1).await?; x.main(&db_conn).await @@ -406,7 +404,7 @@ fn main() -> anyhow::Result<()> { SubCommand::UserExport(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run user_export"); let db_conn = get_migrated_db(db_url, 1, 1).await?; @@ -415,7 +413,7 @@ fn main() -> anyhow::Result<()> { SubCommand::UserImport(x) => { let db_url = cli_config .db_url - .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + .expect("'--config' (with a db) or '--db-url' is required to run user_import"); let db_conn = get_migrated_db(db_url, 1, 1).await?; From 4cf52f9de31d6eded402310194cc408dbebbe460 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 4 Mar 2023 07:29:01 -0800 Subject: [PATCH 02/25] todos --- TODO.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/TODO.md b/TODO.md index 6b2ad470..9b10ae12 100644 --- a/TODO.md +++ b/TODO.md @@ -385,12 +385,11 @@ These are not yet ordered. There might be duplicates. We might not actually need - change premium concurrency limit to be against ip+rpckey - then sites like curve.fi don't have to worry about their user count - it does mean we will have a harder time capacity planning from the number of keys -- [ ] eth_getLogs is going to unsynced nodes when synced nodes are available. always prefer synced nodes - [ ] have the healthcheck get the block over http. if it errors, or doesn't match what the websocket says, something is wrong (likely a deadlock in the websocket code) - [ ] don't use new_head_provider anywhere except new head subscription -- [ ] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly +- [x] maybe we shouldn't route eth_getLogs to syncing nodes. 
serving queries slows down sync significantly - change the send_best function to only include servers that are at least close to fully synced -- [ ] have private transactions be enabled by a url setting rather than a setting on the key +- [-] have private transactions be enabled by a url setting rather than a setting on the key - [ ] cli for adding rpc keys to an existing user - [ ] rate limiting/throttling on query_user_stats - [ ] web3rpc configs should have a max_concurrent_requests From 7b1b997c72f2008640a05bf778ade267e2fd3f38 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 7 Mar 2023 13:44:27 -0800 Subject: [PATCH 03/25] include backtrace in page --- web3_proxy/src/pagerduty.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/web3_proxy/src/pagerduty.rs b/web3_proxy/src/pagerduty.rs index 8716df90..39b0a3be 100644 --- a/web3_proxy/src/pagerduty.rs +++ b/web3_proxy/src/pagerduty.rs @@ -4,6 +4,7 @@ use log::{debug, error}; use pagerduty_rs::eventsv2sync::EventsV2 as PagerdutySyncEventsV2; use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload, Event}; use serde::Serialize; +use std::backtrace::Backtrace; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, @@ -71,7 +72,9 @@ pub fn panic_handler( ) { let summary = format!("{}", panic_info); - let details = format!("{:#?}", panic_info); + let backtrace = Backtrace::force_capture(); + + let details = format!("{:#?}\n{:#?}", panic_info, backtrace); if summary.starts_with("panicked at 'WS Server panic") { // the ethers-rs library panics when websockets disconnect. this isn't a panic we care about reporting From d25eed0bd24167b969ef384a3cd9a87e729ac7b8 Mon Sep 17 00:00:00 2001 From: David Date: Wed, 8 Mar 2023 00:44:22 +0100 Subject: [PATCH 04/25] hotfix for admin address changes (#39) --- web3_proxy/src/frontend/admin.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 4d55af2a..fe156b80 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -21,7 +21,7 @@ use entities::{admin_trail, login, pending_login, rpc_key, user}; use ethers::{abi::AbiEncode, prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; -use log::{debug, warn}; +use log::{debug, info, warn}; use migration::sea_orm::prelude::Uuid; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, @@ -130,7 +130,7 @@ pub async fn admin_login_get( // TODO: accept a login_domain from the request? domain: login_domain.parse().unwrap(), // In the case of the admin, the admin needs to sign the message, so we include this logic ... - address: admin_address.to_fixed_bytes(), // user_address.to_fixed_bytes(), + address: admin_address.clone().to_fixed_bytes(), // user_address.to_fixed_bytes(), // TODO: config for statement statement: Some("🦙🦙🦙🦙🦙".to_string()), // TODO: don't unwrap @@ -145,6 +145,10 @@ pub async fn admin_login_get( resources: vec![], }; + let admin_address: Vec = admin_address + .to_fixed_bytes() + .into(); + let db_conn = app.db_conn().context("login requires a database")?; let db_replica = app .db_replica() @@ -160,8 +164,11 @@ pub async fn admin_login_get( "Could not find user in db".to_string(), ))?; + // TODO: Gotta check if encoding messes up things maybe ... 
+ info!("Admin address is: {:?}", admin_address); + info!("Encoded admin address is: {:?}", admin_address); let admin = user::Entity::find() - .filter(user::Column::Address.eq(admin_address.encode())) + .filter(user::Column::Address.eq(admin_address)) .one(db_replica.conn()) .await? .ok_or(FrontendErrorResponse::BadRequest( From 02a7c0be1e70b13728ec04880e33bf54cb2f227e Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 7 Mar 2023 13:46:47 -0800 Subject: [PATCH 05/25] add todo --- web3_proxy/src/pagerduty.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/web3_proxy/src/pagerduty.rs b/web3_proxy/src/pagerduty.rs index 39b0a3be..f4808632 100644 --- a/web3_proxy/src/pagerduty.rs +++ b/web3_proxy/src/pagerduty.rs @@ -74,6 +74,7 @@ pub fn panic_handler( let backtrace = Backtrace::force_capture(); + // TODO: try to send to sentry and then put the sentry link into the page let details = format!("{:#?}\n{:#?}", panic_info, backtrace); if summary.starts_with("panicked at 'WS Server panic") { From 6717999ced6ba2296c33d439bc6eb9b7a7d1f6b9 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 7 Mar 2023 16:02:17 -0800 Subject: [PATCH 06/25] actually use backup rpcs --- web3_proxy/src/rpcs/blockchain.rs | 12 ++++++----- web3_proxy/src/rpcs/consensus.rs | 36 ++++++++++++++++++++----------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 3f1c79a9..be220d29 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -221,7 +221,13 @@ impl Web3Rpcs { ) .await?; - let block = response.result.context("failed fetching block")?; + if let Some(err) = response.error { + let err = anyhow::anyhow!("{:#?}", err); + + return Err(err.context("failed fetching block")); + } + + let block = response.result.context("no error, but also no block")?; let block: Option = serde_json::from_str(block.get())?; @@ -482,10 +488,6 @@ impl Web3Rpcs { ) } else { // hash changed - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); - } debug!( "unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}", diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 5c6bf79e..1f6ecb99 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -267,7 +267,6 @@ impl ConnectionsGroup { } if backup_rpcs_voted.is_none() - && backup_consensus_rpcs != primary_consensus_rpcs && backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit && backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs { @@ -303,7 +302,7 @@ impl ConnectionsGroup { ); if backup_rpcs_voted.is_some() { - warn!("{}", err_msg); + warn!("{}. using backup vote", err_msg); break; } else { return Err(anyhow::anyhow!(err_msg)); @@ -334,24 +333,35 @@ impl ConnectionsGroup { )); } - // success! this block has enough soft limit and nodes on it (or on later blocks) - let rpcs: Vec> = primary_consensus_rpcs + // success! 
a block has enough soft limit and nodes on it (or on later blocks) + let backups_needed; + let (head_block, consensus_rpcs) = if let Some(head_block) = primary_rpcs_voted { + backups_needed = false; + (head_block, primary_consensus_rpcs) + } else if let Some(head_block) = backup_rpcs_voted.clone() { + backups_needed = true; + (head_block, backup_consensus_rpcs) + } else { + return Err(anyhow::anyhow!("No head voted!")); + }; + + #[cfg(debug_assertions)] + { + let _ = head_block.hash(); + let _ = head_block.number(); + } + + let consensus_rpcs: Vec> = consensus_rpcs .into_iter() .filter_map(|conn_name| web3_rpcs.by_name.read().get(conn_name).cloned()) .collect(); - #[cfg(debug_assertions)] - { - let _ = maybe_head_block.hash(); - let _ = maybe_head_block.number(); - } - Ok(ConsensusWeb3Rpcs { tier: *tier, - head_block: maybe_head_block, - rpcs, + head_block, + rpcs: consensus_rpcs, backups_voted: backup_rpcs_voted, - backups_needed: primary_rpcs_voted.is_none(), + backups_needed, }) } } From df8cc3b95424b5f3845efb13aaff02f7f2b72997 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 7 Mar 2023 16:16:32 -0800 Subject: [PATCH 07/25] fix log order --- web3_proxy/src/rpcs/consensus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 1f6ecb99..9f0ad4e4 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -295,9 +295,9 @@ impl ConnectionsGroup { primary_consensus_rpcs.len(), num_known, web3_rpcs.min_head_rpcs, + soft_limit_percent, primary_sum_soft_limit, web3_rpcs.min_sum_soft_limit, - soft_limit_percent, err, ); From d7a843cdc50742236b08e0a5af9494de1408fd4c Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 9 Mar 2023 09:32:17 -0800 Subject: [PATCH 08/25] lint --- web3_proxy/src/frontend/admin.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index fe156b80..7af212f9 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -18,7 +18,7 @@ use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use chrono::{TimeZone, Utc}; use entities::{admin_trail, login, pending_login, rpc_key, user}; -use ethers::{abi::AbiEncode, prelude::Address, types::Bytes}; +use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; use log::{debug, info, warn}; @@ -130,7 +130,7 @@ pub async fn admin_login_get( // TODO: accept a login_domain from the request? domain: login_domain.parse().unwrap(), // In the case of the admin, the admin needs to sign the message, so we include this logic ... 
- address: admin_address.clone().to_fixed_bytes(), // user_address.to_fixed_bytes(), + address: admin_address.to_fixed_bytes(), // user_address.to_fixed_bytes(), // TODO: config for statement statement: Some("🦙🦙🦙🦙🦙".to_string()), // TODO: don't unwrap @@ -145,9 +145,7 @@ pub async fn admin_login_get( resources: vec![], }; - let admin_address: Vec = admin_address - .to_fixed_bytes() - .into(); + let admin_address: Vec = admin_address.to_fixed_bytes().into(); let db_conn = app.db_conn().context("login requires a database")?; let db_replica = app From b480e92259cf6d3b3c835617467070e4be2ab227 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 9 Mar 2023 09:32:30 -0800 Subject: [PATCH 09/25] don't panic if param is not set --- web3_proxy/src/app/mod.rs | 4 +++- web3_proxy/src/block_number.rs | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 6278b836..aa4677e7 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1493,7 +1493,9 @@ impl Web3ProxyApp { match &request.params { Some(serde_json::Value::Array(params)) => { // TODO: make a struct and use serde conversion to clean this up - if params.len() != 1 || !params[0].is_string() { + if params.len() != 1 + || !params.get(0).map(|x| x.is_string()).unwrap_or(false) + { // TODO: what error code? return Ok(( JsonRpcForwardedResponse::from_str( diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index bfb39299..82741d49 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -177,7 +177,9 @@ pub async fn block_needed( // TODO: think about this more // TODO: jsonrpc has a specific code for this // TODO: this shouldn't be a 500. this should be a 400. 500 will make haproxy retry a bunch - let obj = params[0] + let obj = params + .get_mut(0) + .ok_or_else(|| anyhow::anyhow!("invalid format. no params"))? .as_object_mut() .ok_or_else(|| anyhow::anyhow!("invalid format"))?; From 7b9bcf6881301cdb5d280cb51f2c55e183a55f95 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 9 Mar 2023 11:22:40 -0800 Subject: [PATCH 10/25] include head block in the log --- web3_proxy/src/rpcs/many.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 747b17fc..40f277bb 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1014,10 +1014,18 @@ impl Web3Rpcs { let num_skipped = skip_rpcs.len(); if num_skipped == 0 { + let consensus = watch_consensus_connections.borrow(); + + let head_block = consensus.as_ref().map(|x| &x.head_block); + error!( - "No servers synced ({:?}-{:?}) ({} known). None skipped", - min_block_needed, max_block_needed, num_conns + "No servers synced ({:?}-{:?}, {:?}) ({} known). None skipped", + min_block_needed, max_block_needed, head_block, num_conns ); + + drop(consensus); + + // TODO: remove this, or move to trace level debug!("{}", serde_json::to_string(&request).unwrap()); } else { // TODO: warn? debug? trace? 
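Patch 10 above surfaces the consensus head block in the "No servers synced" error log by borrowing the current value of a tokio::sync::watch channel before logging. The following is a minimal, self-contained sketch of that borrow-and-log pattern, assuming simplified stand-in types: `Consensus` and `head_block_num` here are illustrative only, not the proxy's actual `ConsensusWeb3Rpcs` / `Web3ProxyBlock` types, and the sketch assumes tokio's `sync` feature is enabled.

// Minimal sketch (illustrative types only) of the watch-channel read used in patch 10:
// a publisher replaces the consensus snapshot, and a request path borrows it to log
// the head block number alongside an error.
use tokio::sync::watch;

#[derive(Debug, Clone)]
struct Consensus {
    head_block_num: u64,
}

fn main() {
    // the block-tracking task publishes the latest consensus snapshot here
    let (tx, rx) = watch::channel::<Option<Consensus>>(None);

    tx.send_replace(Some(Consensus { head_block_num: 16_800_000 }));

    // a request handler borrows the current value without awaiting
    let consensus = rx.borrow();
    let head_block_num = consensus.as_ref().map(|c| c.head_block_num);

    // mirrors the error! added in the patch: report what consensus thinks the
    // head block is when complaining that no servers are synced
    println!("no servers synced; consensus head: {:?}", head_block_num);
}

The same pattern appears later in the series: the watch channel holds an `Option` so callers can distinguish "no consensus yet" from a real head block.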
From 4203c61a594f0b1db14888921737bda99ca868ae Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 9 Mar 2023 11:25:57 -0800 Subject: [PATCH 11/25] use pretty address in logs --- web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs b/web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs index 4c5cf683..a6fcd9e5 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs @@ -25,11 +25,12 @@ impl ChangeAdminStatusSubCommand { let address: Address = self.address.parse()?; let should_be_admin: bool = self.should_be_admin; - let address: Vec = address.to_fixed_bytes().into(); + // we keep "address" around for use in logs + let address_vec: Vec = address.to_fixed_bytes().into(); // Find user in database let user = user::Entity::find() - .filter(user::Column::Address.eq(address.clone())) + .filter(user::Column::Address.eq(address_vec)) .one(db_conn) .await? .context(format!("No user with this id found {:?}", address))?; From babd215e69405d67cbc45802db5aca9ed91ef8af Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 9 Mar 2023 22:51:23 -0800 Subject: [PATCH 12/25] add an endpoint for debugging headers --- web3_proxy/src/frontend/mod.rs | 1 + web3_proxy/src/frontend/status.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index bfa7256d..4975a667 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -158,6 +158,7 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // .route("/health", get(status::health)) .route("/status", get(status::status)) + .route("/status/headers", get(status::status_headers)) // // User stuff // diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 1199dc25..a6997c14 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -7,6 +7,8 @@ use super::{FrontendHealthCache, FrontendResponseCache, FrontendResponseCaches}; use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use axum_macros::debug_handler; +use hashbrown::HashMap; +use http::HeaderMap; use serde_json::json; use std::sync::Arc; @@ -27,6 +29,18 @@ pub async fn health( } } +#[debug_handler] +pub async fn status_headers(headers: HeaderMap) -> impl IntoResponse { + let headers: HashMap, String> = headers + .into_iter() + .map(|(k, v)| (k.map(|k| k.to_string()), format!("{:?}", v))) + .collect(); + + let body = json!({ "headers": headers }); + + Json(body) +} + /// Very basic status page. 
/// /// TODO: replace this with proper stats and monitoring From 89853e24e5c346ed0c85f8afe93caab7fa3ec7cf Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 9 Mar 2023 22:53:01 -0800 Subject: [PATCH 13/25] all strings for json --- web3_proxy/src/frontend/status.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index a6997c14..0bb6c90e 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -31,9 +31,14 @@ pub async fn health( #[debug_handler] pub async fn status_headers(headers: HeaderMap) -> impl IntoResponse { - let headers: HashMap, String> = headers + let headers: HashMap = headers .into_iter() - .map(|(k, v)| (k.map(|k| k.to_string()), format!("{:?}", v))) + .map(|(k, v)| { + ( + k.map(|k| k.to_string()).unwrap_or_default(), + format!("{:?}", v), + ) + }) .collect(); let body = json!({ "headers": headers }); From 684c37ef6c465327b7e00c145f9ee2f9b05da3c1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 10 Mar 2023 09:45:47 -0800 Subject: [PATCH 14/25] cargo upgrade --- Cargo.lock | 84 +++++++++++++------------------- deferred-rate-limiter/Cargo.toml | 2 +- entities/Cargo.toml | 2 +- migration/Cargo.toml | 2 +- rate-counter/Cargo.toml | 2 +- redis-rate-limiter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 10 ++-- 7 files changed, 43 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e67ec28..47350e46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,9 +282,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.8" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bd379e511536bad07447f899300aa526e9bae8e6f66dc5e5ca45d7587b7c1ec" +checksum = "8582122b8edba2af43eaf6b80dbfd33f421b5a0eb3a3113d21bc096ac5b44faf" dependencies = [ "async-trait", "axum-core", @@ -312,7 +312,7 @@ dependencies = [ "tokio", "tokio-tungstenite 0.18.0", "tower", - "tower-http 0.3.5", + "tower-http", "tower-layer", "tower-service", ] @@ -330,9 +330,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34" +checksum = "b2f958c80c248b34b9a877a643811be8dbca03ca5ba827f2b63baf3a81e5fc4e" dependencies = [ "async-trait", "bytes", @@ -347,11 +347,11 @@ dependencies = [ [[package]] name = "axum-macros" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841" +checksum = "404e816a138c27c29f7428ae9b1816ab880ba6923fa76a9f15296af79444a8dc" dependencies = [ - "heck 0.4.0", + "heck 0.4.1", "proc-macro2", "quote", "syn", @@ -810,7 +810,7 @@ version = "3.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65" dependencies = [ - "heck 0.4.0", + "heck 0.4.1", "proc-macro-error", "proc-macro2", "quote", @@ -2143,9 +2143,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" +checksum = "5be7b54589b581f624f566bf5d8eb2bab1db736c51528720b6bd36b96b55924d" dependencies = [ "bytes", "fnv", @@ -2292,9 +2292,9 @@ 
dependencies = [ [[package]] name = "heck" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" dependencies = [ "unicode-segmentation", ] @@ -2617,9 +2617,9 @@ checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" [[package]] name = "jobserver" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" dependencies = [ "libc", ] @@ -4288,7 +4288,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63f62030c60f3a691f5fe251713b4e220b306e50a71e1d6f9cce1f24bb781978" dependencies = [ - "heck 0.4.0", + "heck 0.4.1", "proc-macro2", "quote", "syn", @@ -4484,9 +4484,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.152" +version = "1.0.154" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "8cdd151213925e7f1ab45a9bbfb129316bd00799784b174b7cc7bcd16961c49e" dependencies = [ "serde_derive", ] @@ -4503,9 +4503,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.152" +version = "1.0.154" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +checksum = "4fc80d722935453bcafdc2c9a73cd6fac4dc1938f0346035d84bf99fa9e33217" dependencies = [ "proc-macro2", "quote", @@ -4514,9 +4514,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" +checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea" dependencies = [ "itoa 1.0.5", "ryu", @@ -4725,9 +4725,9 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "socket2" -version = "0.4.7" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" dependencies = [ "libc", "winapi", @@ -4865,7 +4865,7 @@ checksum = "b850fa514dc11f2ee85be9d055c512aa866746adfacd1cb42d867d68e6a5b0d9" dependencies = [ "dotenvy", "either", - "heck 0.4.0", + "heck 0.4.1", "once_cell", "proc-macro2", "quote", @@ -4943,7 +4943,7 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck 0.4.0", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", @@ -4989,9 +4989,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.107" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", @@ -5178,9 +5178,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" 
-version = "1.25.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg", "bytes", @@ -5194,7 +5194,7 @@ dependencies = [ "socket2", "tokio-macros", "tracing", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] @@ -5349,25 +5349,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower-http" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" -dependencies = [ - "bitflags", - "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-range-header", - "pin-project-lite", - "tower", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-http" version = "0.4.0" @@ -5382,6 +5363,7 @@ dependencies = [ "http-body", "http-range-header", "pin-project-lite", + "tower", "tower-layer", "tower-service", ] @@ -5863,7 +5845,7 @@ dependencies = [ "tokio-stream", "toml 0.7.2", "tower", - "tower-http 0.4.0", + "tower-http", "ulid", "url", "uuid 1.3.0", diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index 04cb8488..feca5295 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -11,4 +11,4 @@ anyhow = "1.0.69" hashbrown = "0.13.2" log = "0.4.17" moka = { version = "0.10.0", default-features = false, features = ["future"] } -tokio = "1.25.0" +tokio = "1.26.0" diff --git a/entities/Cargo.toml b/entities/Cargo.toml index a6ae80b0..0f73889f 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -11,7 +11,7 @@ path = "src/mod.rs" [dependencies] sea-orm = "0.11.0" -serde = "1.0.152" +serde = "1.0.154" uuid = "1.3.0" ethers = "1.0.2" ulid = "1.0.0" diff --git a/migration/Cargo.toml b/migration/Cargo.toml index b9da4c83..cea86b6b 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -9,7 +9,7 @@ name = "migration" path = "src/lib.rs" [dependencies] -tokio = { version = "1.25.0", features = ["full", "tracing"] } +tokio = { version = "1.26.0", features = ["full", "tracing"] } [dependencies.sea-orm-migration] version = "0.11.0" diff --git a/rate-counter/Cargo.toml b/rate-counter/Cargo.toml index 7185ecd2..0963526a 100644 --- a/rate-counter/Cargo.toml +++ b/rate-counter/Cargo.toml @@ -6,4 +6,4 @@ edition = "2021" [dependencies] flume = "0.10.14" -tokio = { version = "1.25.0", features = ["time"] } +tokio = { version = "1.26.0", features = ["time"] } diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index 9ba37ad3..a05080a8 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] anyhow = "1.0.69" deadpool-redis = { version = "0.11.1", features = ["rt_tokio_1", "serde"] } -tokio = "1.25.0" +tokio = "1.26.0" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 03f15b35..5e0d54bc 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -27,9 +27,9 @@ thread-fast-rng = { path = "../thread-fast-rng" } anyhow = { version = "1.0.69", features = ["backtrace"] } argh = "0.1.10" -axum = { version = "0.6.8", features = ["headers", "ws"] } +axum = { version = "0.6.10", features = ["headers", "ws"] } axum-client-ip = "0.4.0" -axum-macros = "0.3.4" +axum-macros = "0.3.5" chrono = "0.4.23" counter = "0.5.7" derive_more = "0.99.17" @@ -62,12 +62,12 @@ reqwest = { 
version = "0.11.14", default-features = false, features = ["json", " rmp-serde = "1.1.1" rustc-hash = "1.1.0" sentry = { version = "0.30.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } -serde = { version = "1.0.152", features = [] } -serde_json = { version = "1.0.93", default-features = false, features = ["alloc", "raw_value"] } +serde = { version = "1.0.154", features = [] } +serde_json = { version = "1.0.94", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.2.1" siwe = "0.5.0" time = "0.3.20" -tokio = { version = "1.25.0", features = ["full"] } +tokio = { version = "1.26.0", features = ["full"] } tokio-stream = { version = "0.1.12", features = ["sync"] } toml = "0.7.2" tower = "0.4.13" From b0c5d2b0eb450146d2db3cb592aecf85851d3277 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 10 Mar 2023 09:45:55 -0800 Subject: [PATCH 15/25] remove debugging headers --- web3_proxy/src/frontend/mod.rs | 1 - web3_proxy/src/frontend/rpc_proxy_http.rs | 16 ---------------- web3_proxy/src/frontend/status.rs | 17 ----------------- 3 files changed, 34 deletions(-) diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 4975a667..bfa7256d 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -158,7 +158,6 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // .route("/health", get(status::health)) .route("/status", get(status::status)) - .route("/status/headers", get(status::status_headers)) // // User stuff // diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 1ddd93f5..d27f1533 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -99,15 +99,6 @@ async fn _proxy_web3_rpc( .expect("W3P-BACKEND-RPCS should always parse"), ); - // TODO: add a header if a backend rpc was used - - headers.insert( - "X-W3P-CLIENT-IP", - ip.to_string() - .parse() - .expect("X-CLIENT-IP should always parse"), - ); - Ok(response) } @@ -272,13 +263,6 @@ async fn _proxy_web3_rpc_with_key( .expect("W3P-BACKEND-RPCS should always parse"), ); - headers.insert( - "X-W3P-CLIENT-IP", - ip.to_string() - .parse() - .expect("X-CLIENT-IP should always parse"), - ); - if let Some(rpc_secret_key_id) = rpc_secret_key_id { headers.insert( "X-W3P-KEY-ID", diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 0bb6c90e..53cf5824 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -29,23 +29,6 @@ pub async fn health( } } -#[debug_handler] -pub async fn status_headers(headers: HeaderMap) -> impl IntoResponse { - let headers: HashMap = headers - .into_iter() - .map(|(k, v)| { - ( - k.map(|k| k.to_string()).unwrap_or_default(), - format!("{:?}", v), - ) - }) - .collect(); - - let body = json!({ "headers": headers }); - - Json(body) -} - /// Very basic status page. 
/// /// TODO: replace this with proper stats and monitoring From d992bb2dc9101a6c42eb685f4b6ec5c142ff66df Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 21 Mar 2023 16:00:34 -0700 Subject: [PATCH 16/25] skip, don't error --- web3_proxy/src/rpcs/consensus.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 9f0ad4e4..6fb596bf 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -206,6 +206,7 @@ impl ConnectionsGroup { // track rpcs on this heaviest chain so we can build a new ConsensusConnections let mut primary_consensus_rpcs = HashSet::<&str>::new(); let mut backup_consensus_rpcs = HashSet::<&str>::new(); + let mut skip_rpcs = HashSet::<&str>::new(); // a running total of the soft limits covered by the rpcs that agree on the head block let mut primary_sum_soft_limit: u32 = 0; @@ -235,10 +236,14 @@ impl ConnectionsGroup { // connection is on a later block in this same chain continue; } + if skip_rpcs.contains(rpc_name) { + // connection is missing or theres some other reason to skip this rpc + continue; + } if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name) { if backup_rpcs_voted.is_some() { - // backups already voted for a head block. don't change it + // backups already voted for a head block on another tier. don't change it } else { backup_consensus_rpcs.insert(rpc_name); backup_sum_soft_limit += rpc.soft_limit; @@ -253,8 +258,9 @@ impl ConnectionsGroup { warn!("connection missing: {}", rpc_name); debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name); } else { - return Err(anyhow::anyhow!("not synced")); + debug!("connection missing: {}", rpc_name); } + skip_rpcs.insert(rpc_name); } } From 5465d927b4e48928ca7f3c2f323ed9fb618592db Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 21 Mar 2023 21:22:16 -0700 Subject: [PATCH 17/25] remove a debug log --- web3_proxy/src/rpcs/many.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 40f277bb..f6632679 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1026,7 +1026,7 @@ impl Web3Rpcs { drop(consensus); // TODO: remove this, or move to trace level - debug!("{}", serde_json::to_string(&request).unwrap()); + // debug!("{}", serde_json::to_string(&request).unwrap()); } else { // TODO: warn? debug? trace? warn!( From 9fae137e45786afcddaf70bdcbec0b1c88ac5402 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 02:06:26 -0700 Subject: [PATCH 18/25] shorten log message --- web3_proxy/src/rpcs/many.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index f6632679..24f84678 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1016,11 +1016,11 @@ impl Web3Rpcs { if num_skipped == 0 { let consensus = watch_consensus_connections.borrow(); - let head_block = consensus.as_ref().map(|x| &x.head_block); + let head_block_num = consensus.as_ref().map(|x| x.head_block.number()); error!( "No servers synced ({:?}-{:?}, {:?}) ({} known). 
None skipped", - min_block_needed, max_block_needed, head_block, num_conns + min_block_needed, max_block_needed, head_block_num, num_conns ); drop(consensus); From 8eff48611c50090f1646839dcfe115c52e436831 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 21 Mar 2023 11:16:18 -0700 Subject: [PATCH 19/25] more direct consensus finding code this hopefully has less bugs. speed isn't super important since this isn't on the host path. --- TODO.md | 2 + web3_proxy/src/frontend/mod.rs | 5 +- web3_proxy/src/rpcs/blockchain.rs | 59 ++- web3_proxy/src/rpcs/consensus.rs | 575 ++++++++++-------------------- web3_proxy/src/rpcs/many.rs | 13 +- web3_proxy/src/rpcs/request.rs | 2 +- 6 files changed, 223 insertions(+), 433 deletions(-) diff --git a/TODO.md b/TODO.md index 9b10ae12..4336e32f 100644 --- a/TODO.md +++ b/TODO.md @@ -390,6 +390,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly - change the send_best function to only include servers that are at least close to fully synced - [-] have private transactions be enabled by a url setting rather than a setting on the key +- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running - [ ] cli for adding rpc keys to an existing user - [ ] rate limiting/throttling on query_user_stats - [ ] web3rpc configs should have a max_concurrent_requests @@ -425,6 +426,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [ ] implement remaining subscriptions - would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead - [ ] tests should use `test-env-log = "0.2.8"` +- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running - [ ] weighted random choice should still prioritize non-archive servers - maybe shuffle randomly and then sort by (block_limit, random_index)? - maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough? diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index bfa7256d..0d0e5146 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -232,6 +232,7 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() axum::Server::bind(&addr) // TODO: option to use with_connect_info. 
we want it in dev, but not when running behind a proxy, but not .serve(service) - .await - .map_err(Into::into) + .await?; + + Ok(()) } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index be220d29..b90b9958 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -12,6 +12,7 @@ use log::{debug, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; use serde_json::json; +use std::hash::Hash; use std::time::{SystemTime, UNIX_EPOCH}; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::broadcast; @@ -42,6 +43,14 @@ impl PartialEq for Web3ProxyBlock { } } +impl Eq for Web3ProxyBlock {} + +impl Hash for Web3ProxyBlock { + fn hash(&self, state: &mut H) { + self.block.hash.hash(state); + } +} + impl Web3ProxyBlock { /// A new block has arrived over a subscription pub fn try_new(block: ArcBlock) -> Option { @@ -393,36 +402,28 @@ impl Web3Rpcs { return Ok(()); } - let new_synced_connections = consensus_finder - .best_consensus_connections(authorization, self) - .await - .context("no consensus head block!") - .map_err(|err| { - self.watch_consensus_rpcs_sender.send_replace(None); + let new_synced_connections = match consensus_finder + .find_consensus_connections(authorization, self) + .await { + Err(err) => { + let err = err.context("error while finding consensus head block!"); - err - })?; - - // TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait? + return Err(err); + } + Ok(None) => { + return Err(anyhow!("no consensus head block!")); + } + Ok(Some(x)) => x + }; let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap(); let consensus_tier = new_synced_connections.tier; - let total_tiers = consensus_finder.len(); + // TODO: think more about this unwrap + let total_tiers = consensus_finder.worst_tier().unwrap_or(10); let backups_needed = new_synced_connections.backups_needed; let consensus_head_block = new_synced_connections.head_block.clone(); let num_consensus_rpcs = new_synced_connections.num_conns(); - let mut num_synced_rpcs = 0; - let num_active_rpcs = consensus_finder - .all_rpcs_group() - .map(|x| { - for v in x.rpc_to_block.values() { - if *v == consensus_head_block { - num_synced_rpcs += 1; - } - } - x.len() - }) - .unwrap_or_default(); + let num_active_rpcs = consensus_finder.len(); let total_rpcs = self.by_name.read().len(); let old_consensus_head_connections = self @@ -474,12 +475,11 @@ impl Web3Rpcs { // no change in hash. 
no need to use watch_consensus_head_sender // TODO: trace level if rpc is backup debug!( - "con {}/{} {}{}/{}/{}/{} con={} rpc={}@{}", + "con {}/{} {}{}/{}/{} con={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, - num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -490,12 +490,11 @@ impl Web3Rpcs { // hash changed debug!( - "unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}", + "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, - num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -518,12 +517,11 @@ impl Web3Rpcs { // this is unlikely but possible // TODO: better log warn!( - "chain rolled back {}/{} {}{}/{}/{}/{} con={} old={} rpc={}@{}", + "chain rolled back t{}/{} {}{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, - num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -549,12 +547,11 @@ impl Web3Rpcs { } Ordering::Greater => { debug!( - "new {}/{} {}{}/{}/{}/{} con={} rpc={}@{}", + "new {}/{} {}{}/{}/{} con={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, - num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 6fb596bf..943dd051 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -6,10 +6,11 @@ use super::one::Web3Rpc; use anyhow::Context; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; -use log::{debug, trace, warn}; +use itertools::{Itertools, MinMaxResult}; +use log::{trace, warn}; use moka::future::Cache; use serde::Serialize; -use std::collections::BTreeMap; +use std::cmp::Reverse; use std::fmt; use std::sync::Arc; use tokio::time::Instant; @@ -18,23 +19,19 @@ use tokio::time::Instant; /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Serialize)] pub struct ConsensusWeb3Rpcs { - // TODO: tier should be an option, or we should have consensus be stored as an Option pub(super) tier: u64, pub(super) head_block: Web3ProxyBlock, // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] - pub(super) rpcs: Vec>, - pub(super) backups_voted: Option, + pub(super) best_rpcs: Vec>, + // TODO: functions like "compare_backup_vote()" + // pub(super) backups_voted: Option, pub(super) backups_needed: bool, } impl ConsensusWeb3Rpcs { pub fn num_conns(&self) -> usize { - self.rpcs.len() - } - - pub fn sum_soft_limit(&self) -> u32 { - self.rpcs.iter().fold(0, |sum, rpc| sum + rpc.soft_limit) + self.best_rpcs.len() } // TODO: sum_hard_limit? @@ -44,9 +41,9 @@ impl fmt::Debug for ConsensusWeb3Rpcs { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though // TODO: print the actual conns? 
- f.debug_struct("ConsensusConnections") + f.debug_struct("ConsensusWeb3Rpcs") .field("head_block", &self.head_block) - .field("num_conns", &self.rpcs.len()) + .field("num_conns", &self.best_rpcs.len()) .finish_non_exhaustive() } } @@ -73,7 +70,7 @@ impl Web3Rpcs { let consensus = self.watch_consensus_rpcs_sender.borrow(); if let Some(consensus) = consensus.as_ref() { - !consensus.rpcs.is_empty() + !consensus.best_rpcs.is_empty() } else { false } @@ -83,7 +80,7 @@ impl Web3Rpcs { let consensus = self.watch_consensus_rpcs_sender.borrow(); if let Some(consensus) = consensus.as_ref() { - consensus.rpcs.len() + consensus.best_rpcs.len() } else { 0 } @@ -92,50 +89,51 @@ impl Web3Rpcs { type FirstSeenCache = Cache; -pub struct ConnectionsGroup { - pub rpc_to_block: HashMap, Web3ProxyBlock>, - // TODO: what if there are two blocks with the same number? - pub highest_block: Option, +/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers +pub struct ConsensusFinder { + /// backups for all tiers are only used if necessary + /// tiers[0] = only tier 0. + /// tiers[1] = tier 0 and tier 1 + /// tiers[n] = tier 0..=n + /// This is a BTreeMap and not a Vec because sometimes a tier is empty + rpc_heads: HashMap, Web3ProxyBlock>, + /// never serve blocks that are too old + max_block_age: Option, + /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag + max_block_lag: Option, /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups first_seen: FirstSeenCache, } -impl ConnectionsGroup { - pub fn new(first_seen: FirstSeenCache) -> Self { +impl ConsensusFinder { + pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { + // TODO: what's a good capacity for this? it shouldn't need to be very large + // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache + let first_seen = Cache::builder() + .max_capacity(16) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + + // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading + let rpc_heads = HashMap::new(); + Self { - rpc_to_block: Default::default(), - highest_block: Default::default(), + rpc_heads, + max_block_age, + max_block_lag, first_seen, } } pub fn len(&self) -> usize { - self.rpc_to_block.len() + self.rpc_heads.len() } pub fn is_empty(&self) -> bool { - self.rpc_to_block.is_empty() + self.rpc_heads.is_empty() } fn remove(&mut self, rpc: &Arc) -> Option { - if let Some(removed_block) = self.rpc_to_block.remove(rpc) { - match self.highest_block.as_mut() { - None => {} - Some(current_highest_block) => { - if removed_block.hash() == current_highest_block.hash() { - for maybe_highest_block in self.rpc_to_block.values() { - if maybe_highest_block.number() > current_highest_block.number() { - *current_highest_block = maybe_highest_block.clone(); - }; - } - } - } - } - - Some(removed_block) - } else { - None - } + self.rpc_heads.remove(rpc) } async fn insert(&mut self, rpc: Arc, block: Web3ProxyBlock) -> Option { @@ -150,322 +148,7 @@ impl ConnectionsGroup { rpc.head_latency.write().record(latency); - // TODO: what about a reorg to the same height? - if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) { - self.highest_block = Some(block.clone()); - } - - self.rpc_to_block.insert(rpc, block) - } - - /// min_consensus_block_num keeps us from ever going backwards. 
- /// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data. - pub(self) async fn consensus_head_connections( - &self, - authorization: &Arc, - web3_rpcs: &Web3Rpcs, - min_consensus_block_num: Option, - tier: &u64, - ) -> anyhow::Result { - let mut maybe_head_block = match self.highest_block.clone() { - None => return Err(anyhow::anyhow!("no blocks known")), - Some(x) => x, - }; - - // TODO: take max_distance_consensus_to_highest as an argument? - // TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain? - let max_lag_consensus_to_highest = - if let Some(min_consensus_block_num) = min_consensus_block_num { - maybe_head_block - .number() - .saturating_add(1.into()) - .saturating_sub(min_consensus_block_num) - .as_u64() - } else { - 10 - }; - - trace!( - "max_lag_consensus_to_highest: {}", - max_lag_consensus_to_highest - ); - - let num_known = self.rpc_to_block.len(); - - if num_known < web3_rpcs.min_head_rpcs { - return Err(anyhow::anyhow!( - "not enough rpcs connected: {}/{}", - num_known, - web3_rpcs.min_head_rpcs, - )); - } - - let mut primary_rpcs_voted: Option = None; - let mut backup_rpcs_voted: Option = None; - - // track rpcs on this heaviest chain so we can build a new ConsensusConnections - let mut primary_consensus_rpcs = HashSet::<&str>::new(); - let mut backup_consensus_rpcs = HashSet::<&str>::new(); - let mut skip_rpcs = HashSet::<&str>::new(); - - // a running total of the soft limits covered by the rpcs that agree on the head block - let mut primary_sum_soft_limit: u32 = 0; - let mut backup_sum_soft_limit: u32 = 0; - - // TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit? - - // check the highest work block for a set of rpcs that can serve our request load - // if it doesn't have enough rpcs for our request load, check the parent block - // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain - // TODO: this loop is pretty long. any way to clean up this code? - for _ in 0..max_lag_consensus_to_highest { - let maybe_head_hash = maybe_head_block.hash(); - - // find all rpcs with maybe_head_hash as their current head - for (rpc, rpc_head) in self.rpc_to_block.iter() { - if rpc_head.hash() != maybe_head_hash { - // connection is not on the desired block - continue; - } - let rpc_name = rpc.name.as_str(); - if backup_consensus_rpcs.contains(rpc_name) { - // connection is on a later block in this same chain - continue; - } - if primary_consensus_rpcs.contains(rpc_name) { - // connection is on a later block in this same chain - continue; - } - if skip_rpcs.contains(rpc_name) { - // connection is missing or theres some other reason to skip this rpc - continue; - } - - if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name) { - if backup_rpcs_voted.is_some() { - // backups already voted for a head block on another tier. don't change it - } else { - backup_consensus_rpcs.insert(rpc_name); - backup_sum_soft_limit += rpc.soft_limit; - } - if !rpc.backup { - primary_consensus_rpcs.insert(rpc_name); - primary_sum_soft_limit += rpc.soft_limit; - } - } else { - // i don't think this is an error. 
i think its just if a reconnect is currently happening - if web3_rpcs.synced() { - warn!("connection missing: {}", rpc_name); - debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name); - } else { - debug!("connection missing: {}", rpc_name); - } - skip_rpcs.insert(rpc_name); - } - } - - if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit - && primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs - { - // we have enough servers with enough requests! yey! - primary_rpcs_voted = Some(maybe_head_block.clone()); - break; - } - - if backup_rpcs_voted.is_none() - && backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit - && backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs - { - // if we include backup servers, we have enough servers with high enough limits - backup_rpcs_voted = Some(maybe_head_block.clone()); - } - - // not enough rpcs on this block. check the parent block - match web3_rpcs - .block(authorization, maybe_head_block.parent_hash(), None) - .await - { - Ok(parent_block) => { - // trace!( - // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block", - // ); - maybe_head_block = parent_block; - continue; - } - Err(err) => { - let soft_limit_percent = (primary_sum_soft_limit as f32 - / web3_rpcs.min_sum_soft_limit as f32) - * 100.0; - - let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}", - primary_consensus_rpcs.len(), - num_known, - web3_rpcs.min_head_rpcs, - soft_limit_percent, - primary_sum_soft_limit, - web3_rpcs.min_sum_soft_limit, - err, - ); - - if backup_rpcs_voted.is_some() { - warn!("{}. using backup vote", err_msg); - break; - } else { - return Err(anyhow::anyhow!(err_msg)); - } - } - } - } - - // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks. - - // we've done all the searching for the heaviest block that we can - if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs - || primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit) - && backup_rpcs_voted.is_none() - { - // if we get here, not enough servers are synced. return an error - let soft_limit_percent = - (primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0; - - return Err(anyhow::anyhow!( - "Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", - primary_consensus_rpcs.len(), - num_known, - web3_rpcs.min_head_rpcs, - primary_sum_soft_limit, - web3_rpcs.min_sum_soft_limit, - soft_limit_percent, - )); - } - - // success! 
a block has enough soft limit and nodes on it (or on later blocks) - let backups_needed; - let (head_block, consensus_rpcs) = if let Some(head_block) = primary_rpcs_voted { - backups_needed = false; - (head_block, primary_consensus_rpcs) - } else if let Some(head_block) = backup_rpcs_voted.clone() { - backups_needed = true; - (head_block, backup_consensus_rpcs) - } else { - return Err(anyhow::anyhow!("No head voted!")); - }; - - #[cfg(debug_assertions)] - { - let _ = head_block.hash(); - let _ = head_block.number(); - } - - let consensus_rpcs: Vec> = consensus_rpcs - .into_iter() - .filter_map(|conn_name| web3_rpcs.by_name.read().get(conn_name).cloned()) - .collect(); - - Ok(ConsensusWeb3Rpcs { - tier: *tier, - head_block, - rpcs: consensus_rpcs, - backups_voted: backup_rpcs_voted, - backups_needed, - }) - } -} - -/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers -pub struct ConsensusFinder { - /// backups for all tiers are only used if necessary - /// tiers[0] = only tier 0. - /// tiers[1] = tier 0 and tier 1 - /// tiers[n] = tier 0..=n - /// This is a BTreeMap and not a Vec because sometimes a tier is empty - tiers: BTreeMap, - /// never serve blocks that are too old - max_block_age: Option, - /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag - max_block_lag: Option, -} - -impl ConsensusFinder { - pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { - // TODO: what's a good capacity for this? it shouldn't need to be very large - // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache - let first_seen = Cache::builder() - .max_capacity(16) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - - // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading - let tiers = (0..10) - .map(|x| (x, ConnectionsGroup::new(first_seen.clone()))) - .collect(); - - Self { - tiers, - max_block_age, - max_block_lag, - } - } - - pub fn len(&self) -> usize { - self.tiers.len() - } - - pub fn is_empty(&self) -> bool { - self.tiers.is_empty() - } - - /// get the ConnectionsGroup that contains all rpcs - /// panics if there are no tiers - pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> { - self.tiers.values().last() - } - - /// get the mutable ConnectionsGroup that contains all rpcs - pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> { - self.tiers.values_mut().last() - } - - pub fn remove(&mut self, rpc: &Arc) -> Option { - let mut removed = None; - - for (i, tier_group) in self.tiers.iter_mut().rev() { - if i < &rpc.tier { - break; - } - let x = tier_group.remove(rpc); - - if removed.is_none() && x.is_some() { - removed = x; - } - } - - removed - } - - /// returns the block that the rpc was on before updating to the new_block - pub async fn insert( - &mut self, - rpc: &Arc, - new_block: Web3ProxyBlock, - ) -> Option { - let mut old = None; - - // TODO: error if rpc.tier is not in self.tiers - - for (i, tier_group) in self.tiers.iter_mut().rev() { - if i < &rpc.tier { - break; - } - - // TODO: should new_block be a ref? 
- let x = tier_group.insert(rpc.clone(), new_block.clone()).await; - - if old.is_none() && x.is_some() { - old = x; - } - } - - old + self.rpc_heads.insert(rpc, block) } /// Update our tracking of the rpc and return true if something changed @@ -499,8 +182,8 @@ impl ConsensusFinder { } } - if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await { - // false if this block was already sent by this rpc. return early + if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await { + // false if this block was already sent by this rpc // true if new block for this rpc prev_block.hash() != rpc_head_block.hash() } else { @@ -518,47 +201,155 @@ impl ConsensusFinder { Ok(changed) } - pub async fn best_consensus_connections( + pub async fn find_consensus_connections( &mut self, authorization: &Arc, - web3_connections: &Web3Rpcs, - ) -> anyhow::Result { - // TODO: attach context to these? - let highest_known_block = self - .all_rpcs_group() - .context("no rpcs")? - .highest_block - .as_ref() - .context("no highest block")?; + web3_rpcs: &Web3Rpcs, + ) -> anyhow::Result> { + let minmax_block = self + .rpc_heads + .values().minmax_by_key(|&x| x.number()); - trace!("highest_known_block: {}", highest_known_block); + let (lowest_block, highest_block) = match minmax_block { + MinMaxResult::NoElements => return Ok(None), + MinMaxResult::OneElement(x) => (x, x), + MinMaxResult::MinMax(min, max) => (min, max), + }; - let min_block_num = self - .max_block_lag - .map(|x| highest_known_block.number().saturating_sub(x)) - // we also want to be sure we don't ever go backwards! - .max(web3_connections.head_block_num()); + let highest_block_number = highest_block.number(); - trace!("min_block_num: {:#?}", min_block_num); + trace!("highest_block_number: {}", highest_block_number); - // TODO Should this be a Vec>>? - // TODO: how should errors be handled? - // TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier - // TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary - for (tier, x) in self.tiers.iter() { - trace!("checking tier {}: {:#?}", tier, x.rpc_to_block); - if let Ok(consensus_head_connections) = x - .consensus_head_connections(authorization, web3_connections, min_block_num, tier) - .await - { - trace!("success on tier {}", tier); - // we got one! hopefully it didn't need to use any backups. - // but even if it did need backup servers, that is better than going to a worse tier - return Ok(consensus_head_connections); + trace!("lowest_block_number: {}", lowest_block.number()); + + let max_lag_block_number = highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10))); + + trace!("max_lag_block_number: {}", max_lag_block_number); + + let lowest_block_number = lowest_block.number().max(&max_lag_block_number); + + trace!("safe lowest_block_number: {}", lowest_block_number); + + let num_known = self.rpc_heads.len(); + + if num_known < web3_rpcs.min_head_rpcs { + // this keeps us from serving requests when the proxy first starts + return Ok(None); + } + + // TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit? + // TODO: struct for the value of the votes hashmap? 
+ let mut primary_votes: HashMap, u32)> = Default::default(); + let mut backup_votes: HashMap, u32)> = Default::default(); + + let mut backup_consensus = None; + + let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect(); + rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier); + + let current_tier = rpc_heads_by_tier.first().expect("rpc_heads_by_tier should never be empty").0.tier; + + // loop over all the rpc heads (grouped by tier) and their parents to find consensus + // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement + for (rpc, rpc_head) in self.rpc_heads.iter() { + if current_tier != rpc.tier { + // we finished processing a tier. check for primary results + if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { + return Ok(Some(consensus)) + } + + // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus + if backup_consensus.is_none() { + if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) { + backup_consensus =Some(consensus) + } + } + } + + let mut block_to_check = rpc_head.clone(); + + while block_to_check.number() >= lowest_block_number { + if !rpc.backup { + // backup nodes are excluded from the primary voting + let entry = primary_votes.entry(block_to_check.clone()).or_default(); + + entry.0.insert(&rpc.name); + entry.1 += rpc.soft_limit; + } + + // both primary and backup rpcs get included in the backup voting + let backup_entry = backup_votes.entry(block_to_check.clone()).or_default(); + + backup_entry.0.insert(&rpc.name); + backup_entry.1 += rpc.soft_limit; + + match web3_rpcs.block(authorization, block_to_check.parent_hash(), Some(rpc)).await { + Ok(parent_block) => block_to_check = parent_block, + Err(err) => { + warn!("Problem fetching parent block of {:#?} during consensus finding: {:#?}", block_to_check, err); + break; + } + } } } - return Err(anyhow::anyhow!("failed finding consensus on all tiers")); + // we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above) + if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { + return Ok(Some(consensus)) + } + + // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus + if let Some(consensus) = backup_consensus { + return Ok(Some(consensus)); + } + + // count votes one last time + Ok(self.count_votes(&backup_votes, web3_rpcs)) + } + + // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs + fn count_votes(&self, votes: &HashMap, u32)>, web3_rpcs: &Web3Rpcs) -> Option { + // sort the primary votes ascending by tier and descending by block num + let mut votes: Vec<_> = votes.iter().map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)).collect(); + votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| (Reverse(*block.number()), Reverse(*sum_soft_limit), Reverse(rpc_names.len()))); + + // return the first result that exceededs confgured minimums (if any) + for (maybe_head_block, sum_soft_limit, rpc_names) in votes { + if *sum_soft_limit < web3_rpcs.min_sum_soft_limit { + continue; + } + // TODO: different mins for backup vs primary + if rpc_names.len() < web3_rpcs.min_head_rpcs { + continue; + } + + // consensus likely found! 
load the rpcs to make sure they all have active connections + let consensus_rpcs: Vec<_> = rpc_names.into_iter().filter_map(|x| web3_rpcs.get(x)).collect(); + + if consensus_rpcs.len() < web3_rpcs.min_head_rpcs { + continue; + } + // consensus found! + + let tier = consensus_rpcs.iter().map(|x| x.tier).max().expect("there should always be a max"); + + let backups_needed = consensus_rpcs.iter().any(|x| x.backup); + + let consensus = ConsensusWeb3Rpcs { + tier, + head_block: maybe_head_block.clone(), + best_rpcs: consensus_rpcs, + backups_needed, + }; + + return Some(consensus); + } + + None + } + + pub fn worst_tier(&self) -> Option { + self.rpc_heads.iter().map(|(x, _)| x.tier).max() } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 24f84678..98a58391 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -27,7 +27,7 @@ use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; -use std::cmp::min_by_key; +use std::cmp::{min_by_key, Reverse}; use std::collections::BTreeMap; use std::sync::atomic::{self, Ordering}; use std::sync::Arc; @@ -611,7 +611,7 @@ impl Web3Rpcs { // they are all at the same block and it is already sized to what we need let key = (0, None); - for x in synced_connections.rpcs.iter() { + for x in synced_connections.best_rpcs.iter() { if skip.contains(x) { trace!("skipping: {}", x); continue; @@ -741,7 +741,7 @@ impl Web3Rpcs { let synced_rpcs = self.watch_consensus_rpcs_sender.borrow(); if let Some(synced_rpcs) = synced_rpcs.as_ref() { - synced_rpcs.rpcs.clone() + synced_rpcs.best_rpcs.clone() } else { vec![] } @@ -1219,9 +1219,8 @@ impl Serialize for Web3Rpcs { /// TODO: should this be moved into a `impl Web3Rpc`? /// TODO: i think we still have sorts scattered around the code that should use this /// TODO: take AsRef or something like that? We don't need an Arc here -fn rpc_sync_status_sort_key(x: &Arc) -> (U64, u64, bool, OrderedFloat) { - let reversed_head_block = U64::MAX - - x.head_block +fn rpc_sync_status_sort_key(x: &Arc) -> (Reverse, u64, bool, OrderedFloat) { + let head_block = x.head_block .read() .as_ref() .map(|x| *x.number()) @@ -1241,7 +1240,7 @@ fn rpc_sync_status_sort_key(x: &Arc) -> (U64, u64, bool, OrderedFloat unimplemented!(), + Web3Provider::Mock => return Err(ProviderError::CustomError("mock provider can't respond".to_string())), Web3Provider::Ws(p) => p.request(method, params).await, Web3Provider::Http(p) | Web3Provider::Both(p, _) => { // TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks From a69737db982edfc89215b7406c6426b743f45b41 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 14:23:14 -0700 Subject: [PATCH 20/25] serialize best_rpcs --- TODO.md | 2 ++ web3_proxy/src/rpcs/blockchain.rs | 2 +- web3_proxy/src/rpcs/consensus.rs | 6 +++--- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/TODO.md b/TODO.md index 4336e32f..468ed917 100644 --- a/TODO.md +++ b/TODO.md @@ -565,6 +565,8 @@ These are not ordered. I think some rows also accidently got deleted here. Check - look at average request time for getBlock? i'm not sure how good a proxy that will be for serving eth_call, but its a start - https://crates.io/crates/histogram-sampler - [ ] interval for http subscriptions should be based on block time. load from config is easy, but better to query. currently hard coded to 13 seconds +- [ ] check code to keep us from going backwards. 
maybe that is causing outages +- [ ] min_backup_rpcs seperate from min_synced_rpcs in another repo: event subscriber - [ ] watch for transfer events to our contract and submit them to /payment/$tx_hash diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b90b9958..543513e6 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -517,7 +517,7 @@ impl Web3Rpcs { // this is unlikely but possible // TODO: better log warn!( - "chain rolled back t{}/{} {}{}/{}/{} con={} old={} rpc={}@{}", + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 943dd051..a6f94523 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -7,7 +7,7 @@ use anyhow::Context; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; -use log::{trace, warn}; +use log::{trace, warn, debug}; use moka::future::Cache; use serde::Serialize; use std::cmp::Reverse; @@ -21,8 +21,6 @@ use tokio::time::Instant; pub struct ConsensusWeb3Rpcs { pub(super) tier: u64, pub(super) head_block: Web3ProxyBlock, - // TODO: this should be able to serialize, but it isn't - #[serde(skip_serializing)] pub(super) best_rpcs: Vec>, // TODO: functions like "compare_backup_vote()" // pub(super) backups_voted: Option, @@ -323,6 +321,8 @@ impl ConsensusFinder { continue; } + trace!("rpc_names: {:#?}", rpc_names); + // consensus likely found! load the rpcs to make sure they all have active connections let consensus_rpcs: Vec<_> = rpc_names.into_iter().filter_map(|x| web3_rpcs.get(x)).collect(); From c0afc295ca62dd1b908bd618654c3acd49d92f66 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 15:18:54 -0700 Subject: [PATCH 21/25] add hostname to status --- Cargo.lock | 1 + web3_proxy/Cargo.toml | 1 + web3_proxy/src/app/mod.rs | 5 +++++ web3_proxy/src/frontend/status.rs | 2 ++ 4 files changed, 9 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 47350e46..8bd8065e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5814,6 +5814,7 @@ dependencies = [ "handlebars", "hashbrown 0.13.2", "hdrhistogram", + "hostname", "http", "ipnet", "itertools", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 5e0d54bc..a3135764 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -45,6 +45,7 @@ handlebars = "4.3.6" hashbrown = { version = "0.13.2", features = ["serde"] } hdrhistogram = "7.5.2" http = "0.2.9" +hostname = "0.3.1" ipnet = "2.7.1" itertools = "0.10.5" log = "0.4.17" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index aa4677e7..d945fa2c 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -44,6 +44,7 @@ use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRa use serde::Serialize; use serde_json::json; use serde_json::value::to_raw_value; +use std::ffi::OsString; use std::fmt; use std::hash::{Hash, Hasher}; use std::net::IpAddr; @@ -213,6 +214,7 @@ pub struct Web3ProxyApp { pub config: AppConfig, pub db_conn: Option, pub db_replica: Option, + pub hostname: Option, /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_ip_rate_limiter: Option>, @@ -694,6 +696,8 @@ impl Web3ProxyApp { Some(private_rpcs) }; + let hostname = hostname::get().ok(); + let app = Self { config: 
top_config.app.clone(), balanced_rpcs, @@ -709,6 +713,7 @@ impl Web3ProxyApp { login_rate_limiter, db_conn, db_replica, + hostname, vredis_pool, rpc_secret_key_cache, bearer_token_semaphores, diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 53cf5824..f678e34f 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -40,11 +40,13 @@ pub async fn status( let body = response_cache .get_with(FrontendResponseCaches::Status, async { // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used + // TODO: the hostname is probably not going to change. only get once at the start? let body = json!({ "version": APP_USER_AGENT, "chain_id": app.config.chain_id, "balanced_rpcs": app.balanced_rpcs, "private_rpcs": app.private_rpcs, + "hostname": app.hostname, }); Arc::new(body) From b4ca0fb62dc63db5381588d0f65fa1d643b09a78 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 15:19:09 -0700 Subject: [PATCH 22/25] better log when servers skipped --- web3_proxy/src/rpcs/many.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 98a58391..1df26486 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -506,7 +506,7 @@ impl Web3Rpcs { // TODO: double check the logic on this. especially if only min is set let needed_blocks_comparison = match (min_block_needed, max_block_needed) { (None, None) => { - // no required block given. treat this like the requested the consensus head block + // no required block given. treat this like they requested the consensus head block cmp::Ordering::Equal } (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num), @@ -1013,28 +1013,28 @@ impl Web3Rpcs { let num_conns = self.by_name.read().len(); let num_skipped = skip_rpcs.len(); + let consensus = watch_consensus_connections.borrow(); + + let head_block_num = consensus.as_ref().map(|x| x.head_block.number()); + if num_skipped == 0 { - let consensus = watch_consensus_connections.borrow(); - - let head_block_num = consensus.as_ref().map(|x| x.head_block.number()); - error!( "No servers synced ({:?}-{:?}, {:?}) ({} known). None skipped", min_block_needed, max_block_needed, head_block_num, num_conns ); - drop(consensus); - // TODO: remove this, or move to trace level // debug!("{}", serde_json::to_string(&request).unwrap()); } else { - // TODO: warn? debug? trace? - warn!( - "Requested data was not available on {}/{} servers", - num_skipped, num_conns + // TODO: error? warn? debug? trace? + error!( + "Requested data is not available ({:?}-{:?}, {:?}) ({} skipped, {} known)", + min_block_needed, max_block_needed, head_block_num, num_skipped, num_conns ); } + drop(consensus); + // TODO: what error code? 
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1} Ok(JsonRpcForwardedResponse::from_str( From 86b4f39a75ff38816bdba68b819c6cff66863adb Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 15:36:40 -0700 Subject: [PATCH 23/25] string hostname --- web3_proxy/src/app/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index d945fa2c..5ccdd5c6 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -44,7 +44,6 @@ use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRa use serde::Serialize; use serde_json::json; use serde_json::value::to_raw_value; -use std::ffi::OsString; use std::fmt; use std::hash::{Hash, Hasher}; use std::net::IpAddr; @@ -214,7 +213,7 @@ pub struct Web3ProxyApp { pub config: AppConfig, pub db_conn: Option, pub db_replica: Option, - pub hostname: Option, + pub hostname: Option, /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_ip_rate_limiter: Option>, @@ -696,7 +695,7 @@ impl Web3ProxyApp { Some(private_rpcs) }; - let hostname = hostname::get().ok(); + let hostname = hostname::get().ok().and_then(|x| x.to_str().map(|x| x.to_string())); let app = Self { config: top_config.app.clone(), From 7b223efa4db0641799f4f3fc742f8b0fb260d505 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 16:16:15 -0700 Subject: [PATCH 24/25] improve reconnect logic --- web3_proxy/src/rpcs/blockchain.rs | 2 +- web3_proxy/src/rpcs/one.rs | 70 ++++++++++++++++++++++--------- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 543513e6..a51a9032 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -370,7 +370,7 @@ impl Web3Rpcs { ) .await { - warn!("unable to process block from rpc {}: {:#?}", rpc_name, err); + warn!("error while processing block from rpc {}: {:#?}", rpc_name, err); } } Err(err) => { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 2ac4a765..bb42574f 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -11,6 +11,7 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; use futures::StreamExt; +use futures::stream::FuturesUnordered; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use ordered_float::OrderedFloat; @@ -701,8 +702,12 @@ impl Web3Rpc { RequestRevertHandler::ErrorLevel }; + // this does loop. just only when reconnect is enabled + #[allow(clippy::never_loop)] loop { - let mut futures = vec![]; + debug!("subscription loop started"); + + let mut futures = FuturesUnordered::new(); let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); @@ -735,6 +740,7 @@ impl Web3Rpc { // health check loop loop { + // TODO: do we need this to be abortable? if rpc.should_disconnect() { break; } @@ -839,6 +845,7 @@ impl Web3Rpc { } if let Some(block_sender) = &block_sender { + // TODO: do we need this to be abortable? let f = self.clone().subscribe_new_heads( authorization.clone(), http_interval_receiver, @@ -850,6 +857,7 @@ impl Web3Rpc { } if let Some(tx_id_sender) = &tx_id_sender { + // TODO: do we need this to be abortable? 
let f = self .clone() .subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone()); @@ -857,32 +865,48 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } - match try_join_all(futures).await { - Ok(_) => { - // futures all exited without error. break instead of restarting subscriptions - break; - } - Err(err) => { - if self.reconnect.load(atomic::Ordering::Acquire) { - warn!("{} connection ended. err={:?}", self, err); + while let Some(x) = futures.next().await { + match x { + Ok(_) => { + // future exited without error + // TODO: think about this more. we never set it to false. this can't be right + info!("future on {} exited successfully", self) + } + Err(err) => { + if self.reconnect.load(atomic::Ordering::Acquire) { + warn!("{} connection ended. reconnecting. err={:?}", self, err); - self.clone() - .retrying_connect( + let disconnect_sender = self.disconnect_watch.as_ref().unwrap(); + + // TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures. + disconnect_sender.send_replace(true); + disconnect_sender.send_replace(false); + + // we call retrying_connect here with initial_delay=true. above, initial_delay=false + self.retrying_connect( block_sender.as_ref(), chain_id, authorization.db_conn.as_ref(), true, ) .await?; - } else if *disconnect_receiver.borrow() { - info!("{} is disconnecting", self); - break; - } else { - error!("{} subscription exited. err={:?}", self, err); - return Err(err); + + continue; + } + + // reconnect is not enabled. + if *disconnect_receiver.borrow() { + info!("{} is disconnecting", self); + break; + } else { + error!("{} subscription exited. err={:?}", self, err); + break; + } } } } + + break; } info!("all subscriptions on {} completed", self); @@ -1070,7 +1094,11 @@ impl Web3Rpc { self.send_head_block_result(Ok(None), &block_sender, block_map) .await?; - Ok(()) + if self.should_disconnect() { + Ok(()) + } else { + Err(anyhow!("new_heads subscription exited. reconnect needed")) + } } /// Turn on the firehose of pending transactions @@ -1124,7 +1152,11 @@ impl Web3Rpc { } } - Ok(()) + if self.should_disconnect() { + Ok(()) + } else { + Err(anyhow!("pending_transactions subscription exited. reconnect needed")) + } } /// be careful with this; it might wait forever! 
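Reviewer note (not part of the patch): the reconnect change above hinges on two pieces of state, a `reconnect` flag and a `disconnect_watch` channel that tells in-flight subscription tasks to stop before a reconnect. Below is a minimal, stand-alone sketch of that pattern under stated assumptions: `run_subscription` and `supervise` are hypothetical stand-ins for the real subscribe/retrying_connect methods, and the one-second delays are placeholders.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use futures::future::try_join_all;
use tokio::sync::watch;

// hypothetical stand-in for one subscription (new heads, pending txs, health check, ...)
async fn run_subscription(mut disconnect_rx: watch::Receiver<bool>) -> anyhow::Result<()> {
    loop {
        tokio::select! {
            changed = disconnect_rx.changed() => {
                // exit when the supervisor signals a disconnect (or the sender is gone)
                if changed.is_err() || *disconnect_rx.borrow() {
                    return Ok(());
                }
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                // real code would forward blocks/transactions here and return Err on failure
            }
        }
    }
}

// supervisor: run the subscriptions; on error, signal the watch and retry if reconnect is enabled.
// the real code spawns these as tasks, so the watch (not future-dropping) is what stops them.
async fn supervise(reconnect: Arc<AtomicBool>) -> anyhow::Result<()> {
    let (disconnect_tx, disconnect_rx) = watch::channel(false);

    loop {
        let futures = vec![
            run_subscription(disconnect_rx.clone()),
            run_subscription(disconnect_rx.clone()),
        ];

        match try_join_all(futures).await {
            Ok(_) => break,
            Err(err) if reconnect.load(Ordering::Acquire) => {
                eprintln!("subscriptions ended. reconnecting. err={:?}", err);
                // tell anything still running to stop, then reset for the next round
                disconnect_tx.send_replace(true);
                disconnect_tx.send_replace(false);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            Err(err) => {
                // reconnect disabled: make sure everything shuts down, then bubble the error up
                disconnect_tx.send_replace(true);
                return Err(err);
            }
        }
    }

    Ok(())
}

The same shape appears in the patch itself: try_join_all over the subscription futures, send_replace(true) followed by send_replace(false) on disconnect_watch, and a delay_start flag (instead of a fixed sleep) passed to retrying_connect on the next loop iteration.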
From 2205a472fd1ddb3081c4cac7dae170d80e4fc6ef Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 18:43:13 -0700 Subject: [PATCH 25/25] add optional tokio-console and improve reconnect logic --- Cargo.lock | 381 +++++++++++++++++++++- web3_proxy/Cargo.toml | 5 +- web3_proxy/src/bin/web3_proxy_cli/main.rs | 51 +-- web3_proxy/src/jsonrpc.rs | 2 +- web3_proxy/src/rpcs/many.rs | 6 +- web3_proxy/src/rpcs/one.rs | 73 ++--- web3_proxy/src/rpcs/request.rs | 3 +- 7 files changed, 452 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8bd8065e..f5592e65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -737,6 +737,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + [[package]] name = "cc" version = "1.0.79" @@ -804,6 +810,15 @@ dependencies = [ "textwrap", ] +[[package]] +name = "clap_complete" +version = "3.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f7a2e0a962c45ce25afce14220bc24f9dade0a1787f185cecf96bfba7847cd8" +dependencies = [ + "clap", +] + [[package]] name = "clap_derive" version = "3.2.18" @@ -893,6 +908,34 @@ dependencies = [ "thiserror", ] +[[package]] +name = "color-eyre" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f1885697ee8a177096d42f158922251a41973117f6d8a234cee94b9509157b7" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", + "url", +] + +[[package]] +name = "color-spantrace" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6eee477a4a8a72f4addd4de416eb56d54bc307b284d6601bafdee1f4ea462d1" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "combine" version = "4.6.6" @@ -943,6 +986,42 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "console-api" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22a3a81dfaf6b66bce5d159eddae701e3a002f194d378cbf7be5f053c281d9be" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.16", +] + [[package]] name = "const-oid" version = "0.7.1" @@ -1068,6 +1147,32 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossterm" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ebde6a9dd5e331cd6c6f48253254d117642c31653baa475e394657c59c1f7d" +dependencies = [ + "bitflags", + "crossterm_winapi", + "futures-core", + "libc", + "mio 0.7.14", + "parking_lot 0.11.2", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6966607622438301997d3dac0d2f6e9a90c68bb6bc1785ea98456ab93c0507" +dependencies = [ + "winapi", +] + 
[[package]] name = "crunchy" version = "0.2.2" @@ -1318,6 +1423,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -1328,6 +1442,17 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -2435,6 +2560,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.53" @@ -2828,6 +2965,19 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + [[package]] name = "mio" version = "0.8.6" @@ -2840,6 +2990,15 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + [[package]] name = "moka" version = "0.10.0" @@ -2905,6 +3064,25 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.0" @@ -3137,6 +3315,18 @@ dependencies = [ "syn", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "owo-colors" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2386b4ebe91c2f7f51082d4cefa145d030e33a1842a96b12e4885cc3c01f7a55" + [[package]] name = "pagerduty-rs" version = "0.1.6" @@ -3589,6 +3779,38 @@ dependencies = [ "winapi", ] +[[package]] +name = "prost" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ea9b0f8cbe5e15a8a042d030bd96668db28ecb567ec37d691971ff5731d2b1b" 
+dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -4217,7 +4439,7 @@ dependencies = [ "regex", "sea-schema", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.16", "url", ] @@ -4248,7 +4470,7 @@ dependencies = [ "sea-orm-cli", "sea-schema", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.16", ] [[package]] @@ -4652,6 +4874,27 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +dependencies = [ + "libc", + "mio 0.7.14", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -5186,7 +5429,7 @@ dependencies = [ "bytes", "libc", "memchr", - "mio", + "mio 0.8.6", "num_cpus", "parking_lot 0.12.1", "pin-project-lite", @@ -5197,6 +5440,45 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "tokio-console" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fce5f0a53cd350a00b3a37dcb85758eb3c333beeb334b40584f7747b1e01374e" +dependencies = [ + "atty", + "clap", + "clap_complete", + "color-eyre", + "console-api", + "crossterm", + "dirs", + "futures", + "h2", + "hdrhistogram", + "humantime", + "once_cell", + "prost-types", + "regex", + "serde", + "tokio", + "toml 0.5.11", + "tonic", + "tracing", + "tracing-subscriber 0.3.16", + "tui", +] + +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.8.2" @@ -5333,6 +5615,38 @@ dependencies = [ "toml_datetime 0.6.1", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tower" version = "0.4.13" @@ -5341,9 +5655,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -5411,6 +5729,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] 
+name = "tracing-error" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4d7c0b83d4a500748fa5879461652b361edf5c9d51ede2a2ac03875ca185e24" +dependencies = [ + "tracing", + "tracing-subscriber 0.2.25", ] [[package]] @@ -5423,6 +5752,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" +dependencies = [ + "sharded-slab", + "thread_local", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" @@ -5430,12 +5781,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "sharded-slab", + "smallvec", "thread_local", "tracing", "tracing-core", + "tracing-log", ] [[package]] @@ -5450,6 +5804,19 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tui" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39c8ce4e27049eed97cfa363a5048b09d995e209994634a0efc26a14ab6c0c23" +dependencies = [ + "bitflags", + "cassowary", + "crossterm", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "tungstenite" version = "0.17.3" @@ -5646,6 +6013,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -5798,6 +6171,7 @@ dependencies = [ "axum-client-ip", "axum-macros", "chrono", + "console-subscriber", "counter", "deferred-rate-limiter", "derive_more", @@ -5843,6 +6217,7 @@ dependencies = [ "thread-fast-rng", "time 0.3.20", "tokio", + "tokio-console", "tokio-stream", "toml 0.7.2", "tower", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index a3135764..55631364 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -7,8 +7,9 @@ default-run = "web3_proxy_cli" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["deadlock_detection"] +default = ["deadlock_detection", "tokio-console"] deadlock_detection = ["parking_lot/deadlock_detection"] +tokio-console = ["dep:tokio-console", "dep:console-subscriber"] # TODO: turn tokio-console on with a feature. 
console-subscriber = { version = "0.1.7" } @@ -31,6 +32,7 @@ axum = { version = "0.6.10", features = ["headers", "ws"] } axum-client-ip = "0.4.0" axum-macros = "0.3.5" chrono = "0.4.23" +console-subscriber = { version = "*", optional = true } counter = "0.5.7" derive_more = "0.99.17" dotenv = "0.15.0" @@ -69,6 +71,7 @@ serde_prometheus = "0.2.1" siwe = "0.5.0" time = "0.3.20" tokio = { version = "1.26.0", features = ["full"] } +tokio-console = { version = "*", optional = true } tokio-stream = { version = "0.1.12", features = ["sync"] } toml = "0.7.2" tower = "0.4.13" diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 14abe3cb..8dae057a 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -118,6 +118,10 @@ fn main() -> anyhow::Result<()> { // if RUST_LOG isn't set, configure a default // TODO: is there a better way to do this? + #[cfg(tokio_console)] + console_subscriber::init(); + + #[cfg(not(tokio_console))] let rust_log = match std::env::var("RUST_LOG") { Ok(x) => x, Err(_) => match std::env::var("WEB3_PROXY_TRACE").map(|x| x == "true") { @@ -190,35 +194,38 @@ fn main() -> anyhow::Result<()> { (None, None) }; - let logger = env_logger::builder().parse_filters(&rust_log).build(); + #[cfg(not(tokio_console))] + { + let logger = env_logger::builder().parse_filters(&rust_log).build(); - let max_level = logger.filter(); + let max_level = logger.filter(); - // connect to sentry for error reporting - // if no sentry, only log to stdout - let _sentry_guard = if let Some(sentry_url) = cli_config.sentry_url.clone() { - let logger = sentry::integrations::log::SentryLogger::with_dest(logger); + // connect to sentry for error reporting + // if no sentry, only log to stdout + let _sentry_guard = if let Some(sentry_url) = cli_config.sentry_url.clone() { + let logger = sentry::integrations::log::SentryLogger::with_dest(logger); - log::set_boxed_logger(Box::new(logger)).unwrap(); + log::set_boxed_logger(Box::new(logger)).unwrap(); - let guard = sentry::init(( - sentry_url, - sentry::ClientOptions { - release: sentry::release_name!(), - // TODO: Set this a to lower value (from config) in production - traces_sample_rate: 1.0, - ..Default::default() - }, - )); + let guard = sentry::init(( + sentry_url, + sentry::ClientOptions { + release: sentry::release_name!(), + // TODO: Set this a to lower value (from config) in production + traces_sample_rate: 1.0, + ..Default::default() + }, + )); - Some(guard) - } else { - log::set_boxed_logger(Box::new(logger)).unwrap(); + Some(guard) + } else { + log::set_boxed_logger(Box::new(logger)).unwrap(); - None - }; + None + }; - log::set_max_level(max_level); + log::set_max_level(max_level); + } info!("{}", APP_USER_AGENT); diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 7a601c20..209d54b7 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -279,7 +279,7 @@ impl JsonRpcForwardedResponse { } } } else { - unimplemented!(); + return Err(anyhow::anyhow!("unexpected ethers error!")); } } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1df26486..ee719932 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -881,7 +881,7 @@ impl Web3Rpcs { request.id.clone(), ) { Ok(response) => { - if let Some(error) = &response.error.as_ref() { + if let Some(error) = response.error.as_ref() { // trace!(?response, "rpc error"); if let Some(request_metadata) = request_metadata { @@ -956,8 
+956,10 @@ impl Web3Rpcs { // TODO: emit a stat. if a server is getting skipped a lot, something is not right + // TODO: if we get a TrySendError, reconnect. wait why do we see a trysenderror on a dual provider? shouldn't it be using reqwest + debug!( - "Backend server error on {}! Retrying on another. err={:?}", + "Backend server error on {}! Retrying on another. err={:#?}", rpc, err ); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index bb42574f..8b0b4394 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -9,8 +9,8 @@ use crate::rpcs::request::RequestRevertHandler; use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::{Address, Transaction, U256}; -use futures::future::try_join_all; use futures::StreamExt; +use futures::future::try_join_all; use futures::stream::FuturesUnordered; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; @@ -701,13 +701,15 @@ impl Web3Rpc { } else { RequestRevertHandler::ErrorLevel }; + + let mut delay_start = false; // this does loop. just only when reconnect is enabled #[allow(clippy::never_loop)] loop { debug!("subscription loop started"); - let mut futures = FuturesUnordered::new(); + let mut futures = vec![]; let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); @@ -723,7 +725,7 @@ impl Web3Rpc { block_sender.as_ref(), chain_id, authorization.db_conn.as_ref(), - false, + delay_start, ) .await?; @@ -865,48 +867,41 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } - while let Some(x) = futures.next().await { - match x { - Ok(_) => { - // future exited without error - // TODO: think about this more. we never set it to false. this can't be right - info!("future on {} exited successfully", self) + match try_join_all(futures).await { + Ok(_) => { + // future exited without error + // TODO: think about this more. we never set it to false. this can't be right + break; + } + Err(err) => { + let disconnect_sender = self.disconnect_watch.as_ref().unwrap(); + + if self.reconnect.load(atomic::Ordering::Acquire) { + warn!("{} connection ended. reconnecting. err={:?}", self, err); + + // TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures. + disconnect_sender.send_replace(true); + disconnect_sender.send_replace(false); + + // we call retrying_connect here with initial_delay=true. above, initial_delay=false + delay_start = true; + + continue; } - Err(err) => { - if self.reconnect.load(atomic::Ordering::Acquire) { - warn!("{} connection ended. reconnecting. err={:?}", self, err); + + // reconnect is not enabled. + if *disconnect_receiver.borrow() { + info!("{} is disconnecting", self); + break; + } else { + error!("{} subscription exited. err={:?}", self, err); - let disconnect_sender = self.disconnect_watch.as_ref().unwrap(); + disconnect_sender.send_replace(true); - // TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures. - disconnect_sender.send_replace(true); - disconnect_sender.send_replace(false); - - // we call retrying_connect here with initial_delay=true. above, initial_delay=false - self.retrying_connect( - block_sender.as_ref(), - chain_id, - authorization.db_conn.as_ref(), - true, - ) - .await?; - - continue; - } - - // reconnect is not enabled. 
- if *disconnect_receiver.borrow() { - info!("{} is disconnecting", self); - break; - } else { - error!("{} subscription exited. err={:?}", self, err); - break; - } + break; } } } - - break; } info!("all subscriptions on {} completed", self); diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 440f90ba..cadac247 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -164,8 +164,9 @@ impl OpenRequestHandle { }; let mut logged = false; - while provider.is_none() { + while provider.is_none() || provider.as_ref().map(|x| !x.ready()).unwrap() { // trace!("waiting on provider: locking..."); + // TODO: i dont like this. subscribing to a channel could be better sleep(Duration::from_millis(100)).await; if !logged {