diff --git a/Cargo.lock b/Cargo.lock index 824722b3..e62cea88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3148,7 +3148,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.17.0" +version = "0.19.0" dependencies = [ "sea-orm-migration", "tokio", diff --git a/migration/Cargo.toml b/migration/Cargo.toml index a501865a..22d5c752 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.17.0" +version = "0.19.0" edition = "2021" publish = false diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 8edbb93d..b27ccb40 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -209,6 +209,8 @@ impl DatabaseReplica { pub struct Web3ProxyApp { /// Send requests to the best server available pub balanced_rpcs: Arc, + /// Send 4337 Abstraction Bundler requests to one of these servers + pub bundler_4337_rpcs: Option>, pub http_client: Option, /// application config /// TODO: this will need a large refactor to handle reloads while running. maybe use a watch::Receiver? @@ -764,6 +766,34 @@ impl Web3ProxyApp { Some(private_rpcs) }; + // prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections + // only some chains have this, so this is optional + let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() { + warn!("No bundler_4337_rpcs configured"); + None + } else { + // TODO: do something with the spawn handle + let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn( + top_config.app.chain_id, + db_conn.clone(), + http_client.clone(), + // bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag + None, + None, + 0, + 0, + pending_transactions.clone(), + None, + None, + ) + .await + .context("spawning bundler_4337_rpcs")?; + + app_handles.push(bundler_4337_rpcs_handle); + + Some(bundler_4337_rpcs) + }; + let hostname = hostname::get() .ok() .and_then(|x| x.to_str().map(|x| x.to_string())); @@ -771,6 +801,7 @@ impl Web3ProxyApp { let app = Self { config: top_config.app.clone(), balanced_rpcs, + bundler_4337_rpcs, http_client, kafka_producer, private_rpcs, @@ -850,19 +881,33 @@ impl Web3ProxyApp { // connect to the backends self.balanced_rpcs .apply_server_configs(self, new_top_config.balanced_rpcs) - .await?; + .await + .context("updating balanced rpcs")?; if let Some(private_rpc_configs) = new_top_config.private_rpcs { if let Some(private_rpcs) = self.private_rpcs.as_ref() { private_rpcs .apply_server_configs(self, private_rpc_configs) - .await?; + .await + .context("updating private_rpcs")?; } else { // TODO: maybe we should have private_rpcs just be empty instead of being None todo!("handle toggling private_rpcs") } } + if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs { + if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() { + bundler_4337_rpcs + .apply_server_configs(self, bundler_4337_rpc_configs) + .await + .context("updating bundler_4337_rpcs")?; + } else { + // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None + todo!("handle toggling bundler_4337_rpcs") + } + } + Ok(()) } @@ -1158,6 +1203,7 @@ impl Web3ProxyApp { } } + // TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved! async fn proxy_cached_request( self: &Arc, authorization: &Arc, @@ -1335,6 +1381,59 @@ impl Web3ProxyApp { vec![], )); } + method @ ("debug_bundler_sendBundleNow" + | "debug_bundler_clearState" + | "debug_bundler_dumpMempool") => { + return Ok(( + JsonRpcForwardedResponse::from_string( + // TODO: we should probably have some escaping on this. but maybe serde will protect us enough + format!("method unsupported: {}", method), + None, + Some(request_id), + ), + vec![], + )); + } + method @ ("eth_sendUserOperation" + | "eth_estimateUserOperationGas" + | "eth_getUserOperationByHash" + | "eth_getUserOperationReceipt" + | "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() { + Some(bundler_4337_rpcs) => { + let response = bundler_4337_rpcs + .try_proxy_connection( + authorization, + request, + Some(&request_metadata), + None, + None, + ) + .await?; + + // TODO: DRY + let rpcs = request_metadata.backend_requests.lock().clone(); + + if let Some(stat_sender) = self.stat_sender.as_ref() { + let response_stat = RpcQueryStats::new( + Some(method.to_string()), + authorization.clone(), + request_metadata, + response.num_bytes(), + ); + + stat_sender + .send_async(response_stat.into()) + .await + .map_err(Web3ProxyError::SendAppStatError)?; + } + + return Ok((response, rpcs)); + } + None => { + // TODO: stats! + return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into()); + } + }, // some commands can use local data or caches "eth_accounts" => { // no stats on this. its cheap @@ -1379,6 +1478,8 @@ impl Web3ProxyApp { // i think this is always an error response let rpcs = request_metadata.backend_requests.lock().clone(); + // TODO! save stats + return Ok((response, rpcs)); }; @@ -1574,7 +1675,6 @@ impl Web3ProxyApp { serde_json::Value::String(APP_USER_AGENT.to_string()) } "web3_sha3" => { - // emit stats // returns Keccak-256 (not the standardized SHA3-256) of the given data. match &request.params { Some(serde_json::Value::Array(params)) => { @@ -1641,8 +1741,6 @@ impl Web3ProxyApp { return Err(Web3ProxyError::AccessDenied); } - // emit stats - // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index 312625a3..821f90c4 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -401,6 +401,7 @@ mod tests { ), ]), private_rpcs: None, + bundler_4337_rpcs: None, extra: Default::default(), }; diff --git a/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs b/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs index 653ecb02..6b73238b 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs @@ -42,12 +42,12 @@ impl RpcAccountingSubCommand { #[derive(Serialize, FromQueryResult)] struct SelectResult { total_frontend_requests: Decimal, - // pub total_backend_retries: Decimal, - // pub total_cache_misses: Decimal, + total_backend_retries: Decimal, + // total_cache_misses: Decimal, total_cache_hits: Decimal, total_response_bytes: Decimal, total_error_responses: Decimal, - // pub total_response_millis: Decimal, + total_response_millis: Decimal, first_period_datetime: DateTimeUtc, last_period_datetime: DateTimeUtc, } @@ -58,10 +58,10 @@ impl RpcAccountingSubCommand { rpc_accounting::Column::FrontendRequests.sum(), "total_frontend_requests", ) - // .column_as( - // rpc_accounting::Column::BackendRequests.sum(), - // "total_backend_retries", - // ) + .column_as( + rpc_accounting::Column::BackendRequests.sum(), + "total_backend_retries", + ) // .column_as( // rpc_accounting::Column::CacheMisses.sum(), // "total_cache_misses", @@ -76,10 +76,10 @@ impl RpcAccountingSubCommand { rpc_accounting::Column::ErrorResponse.sum(), "total_error_responses", ) - // .column_as( - // rpc_accounting::Column::SumResponseMillis.sum(), - // "total_response_millis", - // ) + .column_as( + rpc_accounting::Column::SumResponseMillis.sum(), + "total_response_millis", + ) .column_as( rpc_accounting::Column::PeriodDatetime.min(), "first_period_datetime", @@ -131,25 +131,42 @@ impl RpcAccountingSubCommand { q = q.filter(condition); - // TODO: make this work without into_json. i think we need to make a struct - let query_response = q + let stats = q .into_model::() .one(db_conn) .await? .context("no query result")?; - info!( - "query_response for chain {:?}: {:#}", - self.chain_id, - json!(query_response) - ); + if let Some(chain_id) = self.chain_id { + info!("stats for chain {}", chain_id); + } else { + info!("stats for all chains"); + } - // let query_seconds: Decimal = query_response - // .last_period_datetime - // .signed_duration_since(query_response.first_period_datetime) - // .num_seconds() - // .into(); - // info!("query seconds: {}", query_seconds); + info!("stats: {:#}", json!(&stats)); + + let query_seconds: Decimal = stats + .last_period_datetime + .signed_duration_since(stats.first_period_datetime) + .num_seconds() + .into(); + dbg!(query_seconds); + + let avg_request_per_second = (stats.total_frontend_requests / query_seconds).round_dp(2); + dbg!(avg_request_per_second); + + let cache_hit_rate = (stats.total_cache_hits / stats.total_frontend_requests + * Decimal::from(100)) + .round_dp(2); + dbg!(cache_hit_rate); + + let avg_response_millis = + (stats.total_response_millis / stats.total_frontend_requests).round_dp(3); + dbg!(avg_response_millis); + + let avg_response_bytes = + (stats.total_response_bytes / stats.total_frontend_requests).round(); + dbg!(avg_response_bytes); Ok(()) } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 1ba1b298..1606e03d 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -42,8 +42,8 @@ pub struct CliConfig { pub struct TopConfig { pub app: AppConfig, pub balanced_rpcs: HashMap, - // TODO: instead of an option, give it a default pub private_rpcs: Option>, + pub bundler_4337_rpcs: Option>, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 74dd29aa..88388efa 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -164,6 +164,7 @@ pub async fn serve( // .route("/health", get(status::health)) .route("/status", get(status::status)) + .route("/status/backups_needed", get(status::backups_needed)) // // User stuff // diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index b64d5fcb..970ad551 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -27,6 +27,30 @@ pub async fn health( } } +/// Easy alerting if backup servers are in use. +pub async fn backups_needed(Extension(app): Extension>) -> impl IntoResponse { + let code = { + let consensus_rpcs = app.balanced_rpcs.watch_consensus_rpcs_sender.borrow(); + + if let Some(consensus_rpcs) = consensus_rpcs.as_ref() { + if consensus_rpcs.backups_needed { + StatusCode::INTERNAL_SERVER_ERROR + } else { + StatusCode::OK + } + } else { + // if no consensus, we still "need backups". we just don't have any. which is worse + StatusCode::INTERNAL_SERVER_ERROR + } + }; + + if matches!(code, StatusCode::OK) { + (code, "no backups needed. :)") + } else { + (code, "backups needed! :(") + } +} + /// Very basic status page. /// /// TODO: replace this with proper stats and monitoring diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 9ea267d9..a352c7f9 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -18,12 +18,12 @@ use tokio::time::Instant; /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Serialize)] pub struct ConsensusWeb3Rpcs { - pub(super) tier: u64, - pub(super) head_block: Web3ProxyBlock, - pub(super) best_rpcs: Vec>, + pub(crate) tier: u64, + pub(crate) head_block: Web3ProxyBlock, + pub(crate) best_rpcs: Vec>, // TODO: functions like "compare_backup_vote()" // pub(super) backups_voted: Option, - pub(super) backups_needed: bool, + pub(crate) backups_needed: bool, } impl ConsensusWeb3Rpcs { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5415d787..dbead8a8 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -53,7 +53,7 @@ pub struct Web3Rpcs { /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? /// Geth's subscriptions have the same potential for skipping blocks. - pub(super) watch_consensus_rpcs_sender: watch::Sender>>, + pub(crate) watch_consensus_rpcs_sender: watch::Sender>>, /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_sender: Option>>, pub(super) pending_transaction_cache: @@ -102,6 +102,8 @@ impl Web3Rpcs { let expected_block_time_ms = match chain_id { // ethereum 1 => 12_000, + // ethereum-goerli + 5 => 12_000, // polygon 137 => 2_000, // fantom @@ -110,7 +112,10 @@ impl Web3Rpcs { 42161 => 500, // anything else _ => { - warn!("unexpected chain_id. polling every {} seconds", 10); + warn!( + "unexpected chain_id ({}). polling every {} seconds", + chain_id, 10 + ); 10_000 } }; @@ -214,11 +219,14 @@ impl Web3Rpcs { ) -> anyhow::Result<()> { // safety checks if rpc_configs.len() < app.config.min_synced_rpcs { - return Err(anyhow::anyhow!( + // TODO: don't count disabled servers! + // TODO: include if this is balanced, private, or 4337 + warn!( "Only {}/{} rpcs! Add more rpcs or reduce min_synced_rpcs.", rpc_configs.len(), app.config.min_synced_rpcs - )); + ); + return Ok(()); } // safety check on sum soft limit @@ -502,141 +510,161 @@ impl Web3Rpcs { max_block_needed: Option<&U64>, ) -> Web3ProxyResult { let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option), Vec>> = { - let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); + if self.watch_consensus_head_sender.is_none() { + // pick any server + let mut m = BTreeMap::new(); - if synced_connections.is_none() { - return Ok(OpenRequestResult::NotReady); - } - let synced_connections = - synced_connections.expect("synced_connections can't be None here"); + let key = (0, None); - let head_block_num = synced_connections.head_block.number(); - let head_block_age = synced_connections.head_block.age(); - - // TODO: double check the logic on this. especially if only min is set - let needed_blocks_comparison = match (min_block_needed, max_block_needed) { - (None, None) => { - // no required block given. treat this like they requested the consensus head block - cmp::Ordering::Equal - } - (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num), - (Some(min_block_needed), None) => min_block_needed.cmp(head_block_num), - (Some(min_block_needed), Some(max_block_needed)) => { - match min_block_needed.cmp(max_block_needed) { - cmp::Ordering::Less | cmp::Ordering::Equal => { - min_block_needed.cmp(head_block_num) - } - cmp::Ordering::Greater => { - // TODO: force a debug log of the original request to see if our logic is wrong? - // TODO: attach the rpc_key_id so we can find the user to ask if they need help - return Err(Web3ProxyError::InvalidBlockBounds { - min: min_block_needed.as_u64(), - max: max_block_needed.as_u64(), - }); - } + for x in self.by_name.read().values() { + if skip.contains(x) { + trace!("skipping: {}", x); + continue; } + trace!("not skipped!"); + + m.entry(key).or_insert_with(Vec::new).push(x.clone()); } - }; - trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison); + m + } else { + let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); - // collect "usable_rpcs_by_head_num_and_weight" - // TODO: MAKE SURE None SORTS LAST? - let mut m = BTreeMap::new(); - - match needed_blocks_comparison { - cmp::Ordering::Less => { - // need an old block. check all the rpcs. ignore rpcs that are still syncing - trace!("old block needed"); - - let min_block_age = - self.max_block_age.map(|x| head_block_age.saturating_sub(x)); - let min_sync_num = self.max_block_lag.map(|x| head_block_num.saturating_sub(x)); - - // TODO: cache this somehow? - // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY - for x in self - .by_name - .read() - .values() - .filter(|x| { - // TODO: move a bunch of this onto a rpc.is_synced function - #[allow(clippy::if_same_then_else)] - if skip.contains(x) { - // we've already tried this server or have some other reason to skip it - false - } else if max_block_needed - .map(|max_block_needed| !x.has_block_data(max_block_needed)) - .unwrap_or(false) - { - // server does not have the max block - trace!( - "{} does not have the max block ({:?})", - x, - max_block_needed - ); - false - } else { - !min_block_needed - .map(|min_block_needed| !x.has_block_data(min_block_needed)) - .unwrap_or(false) - } - }) - .cloned() - { - let x_head_block = x.head_block.read().clone(); - - if let Some(x_head) = x_head_block { - // TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load - let x_head_num = x_head.number().min(head_block_num); - - // TODO: do we really need to check head_num and age? - if let Some(min_sync_num) = min_sync_num.as_ref() { - if x_head_num < min_sync_num { - trace!("rpc is still syncing"); - continue; - } - } - if let Some(min_block_age) = min_block_age { - if x_head.age() > min_block_age { - // rpc is still syncing - trace!("server's block is too old"); - continue; - } - } - - let key = (x.tier, Some(*x_head_num)); - - m.entry(key).or_insert_with(Vec::new).push(x); - } - } - - // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request? - } - cmp::Ordering::Equal => { - // using the consensus head block. filter the synced rpcs - - // the key doesn't matter if we are checking synced connections - // they are all at the same block and it is already sized to what we need - let key = (0, None); - - for x in synced_connections.best_rpcs.iter() { - if skip.contains(x) { - trace!("skipping: {}", x); - continue; - } - trace!("not skipped!"); - - m.entry(key).or_insert_with(Vec::new).push(x.clone()); - } - } - cmp::Ordering::Greater => { - // TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe() + if synced_connections.is_none() { return Ok(OpenRequestResult::NotReady); } - } + let synced_connections = + synced_connections.expect("synced_connections can't be None here"); - m + let head_block_num = synced_connections.head_block.number(); + let head_block_age = synced_connections.head_block.age(); + + // TODO: double check the logic on this. especially if only min is set + let needed_blocks_comparison = match (min_block_needed, max_block_needed) { + (None, None) => { + // no required block given. treat this like they requested the consensus head block + cmp::Ordering::Equal + } + (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num), + (Some(min_block_needed), None) => min_block_needed.cmp(head_block_num), + (Some(min_block_needed), Some(max_block_needed)) => { + match min_block_needed.cmp(max_block_needed) { + cmp::Ordering::Less | cmp::Ordering::Equal => { + min_block_needed.cmp(head_block_num) + } + cmp::Ordering::Greater => { + // TODO: force a debug log of the original request to see if our logic is wrong? + // TODO: attach the rpc_key_id so we can find the user to ask if they need help + return Err(Web3ProxyError::InvalidBlockBounds { + min: min_block_needed.as_u64(), + max: max_block_needed.as_u64(), + }); + } + } + } + }; + + trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison); + + // collect "usable_rpcs_by_head_num_and_weight" + // TODO: MAKE SURE None SORTS LAST? + let mut m = BTreeMap::new(); + + match needed_blocks_comparison { + cmp::Ordering::Less => { + // need an old block. check all the rpcs. ignore rpcs that are still syncing + trace!("old block needed"); + + let min_block_age = + self.max_block_age.map(|x| head_block_age.saturating_sub(x)); + let min_sync_num = + self.max_block_lag.map(|x| head_block_num.saturating_sub(x)); + + // TODO: cache this somehow? + // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY + for x in self + .by_name + .read() + .values() + .filter(|x| { + // TODO: move a bunch of this onto a rpc.is_synced function + #[allow(clippy::if_same_then_else)] + if skip.contains(x) { + // we've already tried this server or have some other reason to skip it + false + } else if max_block_needed + .map(|max_block_needed| !x.has_block_data(max_block_needed)) + .unwrap_or(false) + { + // server does not have the max block + trace!( + "{} does not have the max block ({:?})", + x, + max_block_needed + ); + false + } else { + !min_block_needed + .map(|min_block_needed| !x.has_block_data(min_block_needed)) + .unwrap_or(false) + } + }) + .cloned() + { + let x_head_block = x.head_block.read().clone(); + + if let Some(x_head) = x_head_block { + // TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load + let x_head_num = x_head.number().min(head_block_num); + + // TODO: do we really need to check head_num and age? + if let Some(min_sync_num) = min_sync_num.as_ref() { + if x_head_num < min_sync_num { + trace!("rpc is still syncing"); + continue; + } + } + if let Some(min_block_age) = min_block_age { + if x_head.age() > min_block_age { + // rpc is still syncing + trace!("server's block is too old"); + continue; + } + } + + let key = (x.tier, Some(*x_head_num)); + + m.entry(key).or_insert_with(Vec::new).push(x); + } + } + + // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request? + } + cmp::Ordering::Equal => { + // using the consensus head block. filter the synced rpcs + + // the key doesn't matter if we are checking synced connections + // they are all at the same block and it is already sized to what we need + let key = (0, None); + + for x in synced_connections.best_rpcs.iter() { + if skip.contains(x) { + trace!("skipping: {}", x); + continue; + } + trace!("not skipped!"); + + m.entry(key).or_insert_with(Vec::new).push(x.clone()); + } + } + cmp::Ordering::Greater => { + // TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe() + return Ok(OpenRequestResult::NotReady); + } + } + + m + } }; trace!( diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index e04d0a83..1b7246e2 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -187,7 +187,6 @@ impl Web3Rpc { }; let tx_id_sender = if config.subscribe_txs { - // TODO: warn if tx_id_sender is None? tx_id_sender } else { None @@ -557,7 +556,8 @@ impl Web3Rpc { // trace!("waiting on chain id for {}", self); let found_chain_id: Result = self .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) - .await? + .await + .context(format!("waiting for request handle on {}", self))? .request( "eth_chainId", &json!(Option::None::<()>), @@ -580,18 +580,20 @@ impl Web3Rpc { } } Err(e) => { - return Err(anyhow::Error::from(e)); + return Err(anyhow::Error::from(e) + .context(format!("unable to parse eth_chainId from {}", self))); } } self.check_block_data_limit(&authorization, unlocked_provider.clone()) - .await?; + .await + .context(format!("unable to check_block_data_limit of {}", self))?; drop(unlocked_provider); info!("successfully connected to {}", self); } else if self.provider.read().await.is_none() { - return Err(anyhow!("failed waiting for client")); + return Err(anyhow!("failed waiting for client {}", self)); }; Ok(()) @@ -726,7 +728,7 @@ impl Web3Rpc { // this does loop. just only when reconnect is enabled #[allow(clippy::never_loop)] loop { - debug!("subscription loop started"); + trace!("subscription loop started on {}", self); let mut futures = vec![];