improve wait for block

This commit is contained in:
Bryan Stitt 2023-05-16 13:26:39 -07:00
parent c66eb6d864
commit 978c385b3c
4 changed files with 53 additions and 34 deletions

@ -444,7 +444,7 @@ impl Web3Rpcs {
let total_tiers = consensus_finder.worst_tier().unwrap_or(10);
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
let num_consensus_rpcs = new_synced_connections.num_consensus_rpcs();
let num_active_rpcs = consensus_finder.len();
let total_rpcs = self.by_name.load().len();

@ -7,7 +7,7 @@ use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
use log::{debug, info, trace, warn};
use log::{trace, warn};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::Reverse;
@ -111,10 +111,31 @@ pub struct ConsensusWeb3Rpcs {
impl ConsensusWeb3Rpcs {
#[inline]
pub fn num_conns(&self) -> usize {
pub fn num_consensus_rpcs(&self) -> usize {
self.best_rpcs.len()
}
pub fn best_block_num(&self, skip_rpcs: &[Arc<Web3Rpc>]) -> Option<&U64> {
if self.best_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
// all of the consensus rpcs are skipped
// iterate the other rpc tiers to find the next best block
let mut best_block = None;
for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
if next_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
// everything in this ranking is skipped
continue;
}
best_block = best_block.max(next_ranking.head_num.as_ref());
}
best_block
} else {
// not all the best synced rpcs are skipped yet. use the best head block
Some(self.head_block.number())
}
}
pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool {
self.rpc_data
.get(rpc)

@ -37,7 +37,6 @@ use std::fmt;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom;
use tokio;
use tokio::sync::{broadcast, watch};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@ -630,7 +629,7 @@ impl Web3Rpcs {
match earliest_retry_at {
None => {
// none of the servers gave us a time to retry at
debug!("no servers on {:?} gave a retry time", self);
debug!("no servers on {:?} gave a retry time. {:?}", self, skip);
// TODO: bring this back? need to think about how to do this with `allow_backups`
// we could return an error here, but maybe waiting a second will fix the problem
@ -784,7 +783,7 @@ impl Web3Rpcs {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
let mut watch_consensus_connections = self.watch_consensus_rpcs_sender.subscribe();
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let start = Instant::now();
@ -949,8 +948,8 @@ impl Web3Rpcs {
trace!("slept!");
skip_rpcs.pop();
}
_ = watch_consensus_connections.changed() => {
watch_consensus_connections.borrow_and_update();
_ = watch_consensus_rpcs.changed() => {
watch_consensus_rpcs.borrow_and_update();
}
}
}
@ -961,14 +960,12 @@ impl Web3Rpcs {
let waiting_for = min_block_needed.max(max_block_needed);
info!("waiting for {:?}", waiting_for);
if watch_for_block(waiting_for, &mut watch_consensus_connections).await? {
if watch_for_block(waiting_for, &skip_rpcs, &mut watch_consensus_rpcs).await? {
// block found! continue so we can check for another rpc
} else {
// rate limits are likely keeping us from serving the head block
watch_consensus_connections.changed().await?;
watch_consensus_connections.borrow_and_update();
watch_consensus_rpcs.changed().await?;
watch_consensus_rpcs.borrow_and_update();
}
}
}
@ -994,7 +991,7 @@ impl Web3Rpcs {
let needed = min_block_needed.max(max_block_needed);
let head_block_num = watch_consensus_connections
let head_block_num = watch_consensus_rpcs
.borrow()
.as_ref()
.map(|x| *x.head_block.number());
@ -1536,7 +1533,6 @@ mod tests {
assert!(lagged_rpc.has_block_data(lagged_block.number.as_ref().unwrap()));
assert!(!lagged_rpc.has_block_data(head_block.number.as_ref().unwrap()));
// todo!("this doesn't work anymore. send_head_block_result doesn't do anything when rpcs isn't watching the block_receiver")
assert_eq!(rpcs.num_synced_rpcs(), 2);
// add head block to the rpcs. lagged_rpc should not be available
@ -1917,22 +1913,24 @@ mod tests {
/// returns `true` when the desired block number is available
/// TODO: max wait time? max number of blocks to wait for? time is probably best
async fn watch_for_block(
block_num: Option<&U64>,
watch_consensus_connections: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
watch_consensus_rpcs: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
) -> Web3ProxyResult<bool> {
let mut head_block_num = watch_consensus_connections
info!("waiting for {:?}", needed_block_num);
let mut best_block_num: Option<U64> = watch_consensus_rpcs
.borrow_and_update()
.as_ref()
.map(|x| *x.head_block.number());
.and_then(|x| x.best_block_num(skip_rpcs).copied());
match (block_num, head_block_num) {
(Some(x), Some(ref head)) => {
if x <= head {
// we are past this block and no servers have this block
match (needed_block_num, best_block_num.as_ref()) {
(Some(x), Some(best)) => {
if x <= best {
// the best block is past the needed block and no servers have the needed data
// this happens if the block is old and all archive servers are offline
// there is no chance we will get this block without adding an archive server to the config
// TODO: i think this can also happen if we are being rate limited!
// TODO: i think this can also happen if we are being rate limited! but then waiting might work. need skip_rpcs to be smarter
return Ok(false);
}
}
@ -1944,6 +1942,7 @@ async fn watch_for_block(
}
(Some(_), None) => {
// block requested but no servers synced. we will wait
// TODO: if the web3rpcs connected to this consensus isn't watching head blocks, exit with an erorr (waiting for blocks won't ever work)
}
(None, Some(head)) => {
// i don't think this is possible
@ -1955,13 +1954,14 @@ async fn watch_for_block(
// future block is requested
// wait for the block to arrive
while head_block_num < block_num.copied() {
watch_consensus_connections.changed().await?;
while best_block_num.as_ref() < needed_block_num {
watch_consensus_rpcs.changed().await?;
head_block_num = watch_consensus_connections
.borrow_and_update()
let consensus_rpcs = watch_consensus_rpcs.borrow_and_update();
best_block_num = consensus_rpcs
.as_ref()
.map(|x| *x.head_block.number());
.and_then(|x| x.best_block_num(skip_rpcs).copied());
}
Ok(true)

@ -17,7 +17,7 @@ use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user, user_tier};
use influxdb2::models::DataPoint;
use log::{trace, warn};
use log::{error, trace, warn};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
@ -406,7 +406,7 @@ impl BufferedRpcQueryStats {
if new_available_balance < Decimal::from(10u64) && downgrade_user_role.title == "Premium" {
// TODO: we could do this outside the balance low block, but I think its fine. or better, update the cache if <$10 and downgrade if <$1
if let Some(rpc_secret_key_cache) = rpc_secret_key_cache {
todo!("expire (or probably better to update) the user cache now that the balance is low");
error!("expire (or probably better to update) the user cache now that the balance is low");
// actually i think we need to have 2 caches. otherwise users with 2 keys are going to have seperate caches
// 1. rpc_secret_key_id -> AuthorizationChecks (cuz we don't want to hit the db every time)
// 2. user_id -> Balance
@ -419,8 +419,6 @@ impl BufferedRpcQueryStats {
// active_downgrade_user.save(db_conn).await?;
}
// TODO:
// Get the referee, and the referrer
// (2) Look up the code that this user used. This is the referee table
let referee_object = match referee::Entity::find()