diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 1ec3bc2c..828eacb3 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -444,7 +444,7 @@ impl Web3Rpcs { let total_tiers = consensus_finder.worst_tier().unwrap_or(10); let backups_needed = new_synced_connections.backups_needed; let consensus_head_block = new_synced_connections.head_block.clone(); - let num_consensus_rpcs = new_synced_connections.num_conns(); + let num_consensus_rpcs = new_synced_connections.num_consensus_rpcs(); let num_active_rpcs = consensus_finder.len(); let total_rpcs = self.by_name.load().len(); diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index f2a4b576..31ffe385 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -7,7 +7,7 @@ use derive_more::Constructor; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; -use log::{debug, info, trace, warn}; +use log::{trace, warn}; use moka::future::Cache; use serde::Serialize; use std::cmp::Reverse; @@ -111,10 +111,31 @@ pub struct ConsensusWeb3Rpcs { impl ConsensusWeb3Rpcs { #[inline] - pub fn num_conns(&self) -> usize { + pub fn num_consensus_rpcs(&self) -> usize { self.best_rpcs.len() } + pub fn best_block_num(&self, skip_rpcs: &[Arc]) -> Option<&U64> { + if self.best_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) { + // all of the consensus rpcs are skipped + // iterate the other rpc tiers to find the next best block + let mut best_block = None; + for (next_ranking, next_rpcs) in self.other_rpcs.iter() { + if next_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) { + // everything in this ranking is skipped + continue; + } + + best_block = best_block.max(next_ranking.head_num.as_ref()); + } + + best_block + } else { + // not all the best synced rpcs are skipped yet. use the best head block + Some(self.head_block.number()) + } + } + pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool { self.rpc_data .get(rpc) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index e9183d86..35dd5d2e 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -37,7 +37,6 @@ use std::fmt; use std::sync::atomic::Ordering; use std::sync::Arc; use thread_fast_rng::rand::seq::SliceRandom; -use tokio; use tokio::sync::{broadcast, watch}; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; @@ -630,7 +629,7 @@ impl Web3Rpcs { match earliest_retry_at { None => { // none of the servers gave us a time to retry at - debug!("no servers on {:?} gave a retry time", self); + debug!("no servers on {:?} gave a retry time. {:?}", self, skip); // TODO: bring this back? need to think about how to do this with `allow_backups` // we could return an error here, but maybe waiting a second will fix the problem @@ -784,7 +783,7 @@ impl Web3Rpcs { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; - let mut watch_consensus_connections = self.watch_consensus_rpcs_sender.subscribe(); + let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); let start = Instant::now(); @@ -949,8 +948,8 @@ impl Web3Rpcs { trace!("slept!"); skip_rpcs.pop(); } - _ = watch_consensus_connections.changed() => { - watch_consensus_connections.borrow_and_update(); + _ = watch_consensus_rpcs.changed() => { + watch_consensus_rpcs.borrow_and_update(); } } } @@ -961,14 +960,12 @@ impl Web3Rpcs { let waiting_for = min_block_needed.max(max_block_needed); - info!("waiting for {:?}", waiting_for); - - if watch_for_block(waiting_for, &mut watch_consensus_connections).await? { + if watch_for_block(waiting_for, &skip_rpcs, &mut watch_consensus_rpcs).await? { // block found! continue so we can check for another rpc } else { // rate limits are likely keeping us from serving the head block - watch_consensus_connections.changed().await?; - watch_consensus_connections.borrow_and_update(); + watch_consensus_rpcs.changed().await?; + watch_consensus_rpcs.borrow_and_update(); } } } @@ -994,7 +991,7 @@ impl Web3Rpcs { let needed = min_block_needed.max(max_block_needed); - let head_block_num = watch_consensus_connections + let head_block_num = watch_consensus_rpcs .borrow() .as_ref() .map(|x| *x.head_block.number()); @@ -1536,7 +1533,6 @@ mod tests { assert!(lagged_rpc.has_block_data(lagged_block.number.as_ref().unwrap())); assert!(!lagged_rpc.has_block_data(head_block.number.as_ref().unwrap())); - // todo!("this doesn't work anymore. send_head_block_result doesn't do anything when rpcs isn't watching the block_receiver") assert_eq!(rpcs.num_synced_rpcs(), 2); // add head block to the rpcs. lagged_rpc should not be available @@ -1917,22 +1913,24 @@ mod tests { /// returns `true` when the desired block number is available /// TODO: max wait time? max number of blocks to wait for? time is probably best async fn watch_for_block( - block_num: Option<&U64>, - watch_consensus_connections: &mut watch::Receiver>>, + needed_block_num: Option<&U64>, + skip_rpcs: &[Arc], + watch_consensus_rpcs: &mut watch::Receiver>>, ) -> Web3ProxyResult { - let mut head_block_num = watch_consensus_connections + info!("waiting for {:?}", needed_block_num); + + let mut best_block_num: Option = watch_consensus_rpcs .borrow_and_update() .as_ref() - .map(|x| *x.head_block.number()); + .and_then(|x| x.best_block_num(skip_rpcs).copied()); - match (block_num, head_block_num) { - (Some(x), Some(ref head)) => { - if x <= head { - // we are past this block and no servers have this block + match (needed_block_num, best_block_num.as_ref()) { + (Some(x), Some(best)) => { + if x <= best { + // the best block is past the needed block and no servers have the needed data // this happens if the block is old and all archive servers are offline // there is no chance we will get this block without adding an archive server to the config - - // TODO: i think this can also happen if we are being rate limited! + // TODO: i think this can also happen if we are being rate limited! but then waiting might work. need skip_rpcs to be smarter return Ok(false); } } @@ -1944,6 +1942,7 @@ async fn watch_for_block( } (Some(_), None) => { // block requested but no servers synced. we will wait + // TODO: if the web3rpcs connected to this consensus isn't watching head blocks, exit with an erorr (waiting for blocks won't ever work) } (None, Some(head)) => { // i don't think this is possible @@ -1955,13 +1954,14 @@ async fn watch_for_block( // future block is requested // wait for the block to arrive - while head_block_num < block_num.copied() { - watch_consensus_connections.changed().await?; + while best_block_num.as_ref() < needed_block_num { + watch_consensus_rpcs.changed().await?; - head_block_num = watch_consensus_connections - .borrow_and_update() + let consensus_rpcs = watch_consensus_rpcs.borrow_and_update(); + + best_block_num = consensus_rpcs .as_ref() - .map(|x| *x.head_block.number()); + .and_then(|x| x.best_block_num(skip_rpcs).copied()); } Ok(true) diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 930f9e04..352adb58 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -17,7 +17,7 @@ use derive_more::From; use entities::sea_orm_active_enums::TrackingLevel; use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user, user_tier}; use influxdb2::models::DataPoint; -use log::{trace, warn}; +use log::{error, trace, warn}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, @@ -406,7 +406,7 @@ impl BufferedRpcQueryStats { if new_available_balance < Decimal::from(10u64) && downgrade_user_role.title == "Premium" { // TODO: we could do this outside the balance low block, but I think its fine. or better, update the cache if <$10 and downgrade if <$1 if let Some(rpc_secret_key_cache) = rpc_secret_key_cache { - todo!("expire (or probably better to update) the user cache now that the balance is low"); + error!("expire (or probably better to update) the user cache now that the balance is low"); // actually i think we need to have 2 caches. otherwise users with 2 keys are going to have seperate caches // 1. rpc_secret_key_id -> AuthorizationChecks (cuz we don't want to hit the db every time) // 2. user_id -> Balance @@ -419,8 +419,6 @@ impl BufferedRpcQueryStats { // active_downgrade_user.save(db_conn).await?; } - // TODO: - // Get the referee, and the referrer // (2) Look up the code that this user used. This is the referee table let referee_object = match referee::Entity::find()