Merge remote-tracking branch 'origin/devel' into quick_cache_ttl

This commit is contained in:
Bryan Stitt 2023-05-16 14:47:43 -07:00
commit 40b15579ab
7 changed files with 74 additions and 51 deletions

@ -745,4 +745,4 @@ in another repo: event subscriber
- [ ] tests for config reloading
- [ ] use pin instead of arc for a bunch of things?
- https://fasterthanli.me/articles/pin-and-suffering
- [ ] calculate archive depth automatically based on block_data_limits
- [ ] calculate archive depth automatically based on block_data_limits

@ -17,18 +17,19 @@ impl Serialize for EwmaLatency {
}
impl EwmaLatency {
#[inline(always)]
#[inline]
pub fn record(&mut self, duration: Duration) {
self.record_ms(duration.as_secs_f64() * 1000.0);
}
#[inline(always)]
#[inline]
pub fn record_ms(&mut self, milliseconds: f64) {
self.ewma.add(milliseconds);
// don't let it go under 0.1ms
self.ewma.add(milliseconds.max(0.1));
}
/// Current EWMA value in milliseconds
#[inline(always)]
#[inline]
pub fn value(&self) -> f64 {
self.ewma.value()
}
@ -36,10 +37,11 @@ impl EwmaLatency {
impl Default for EwmaLatency {
fn default() -> Self {
// TODO: what should the default span be? 25 requests?
let span = 25.0;
// TODO: what should the default span be? 10 requests?
let span = 10.0;
let start = 1000.0;
// TODO: what should the defautt start be?
let start = 1.0;
Self::new(span, start)
}

@ -444,7 +444,7 @@ impl Web3Rpcs {
let total_tiers = consensus_finder.worst_tier().unwrap_or(10);
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
let num_consensus_rpcs = new_synced_connections.num_consensus_rpcs();
let num_active_rpcs = consensus_finder.len();
let total_rpcs = self.by_name.load().len();

@ -111,10 +111,31 @@ pub struct ConsensusWeb3Rpcs {
impl ConsensusWeb3Rpcs {
#[inline]
pub fn num_conns(&self) -> usize {
pub fn num_consensus_rpcs(&self) -> usize {
self.best_rpcs.len()
}
pub fn best_block_num(&self, skip_rpcs: &[Arc<Web3Rpc>]) -> Option<&U64> {
if self.best_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
// all of the consensus rpcs are skipped
// iterate the other rpc tiers to find the next best block
let mut best_block = None;
for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
if next_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
// everything in this ranking is skipped
continue;
}
best_block = best_block.max(next_ranking.head_num.as_ref());
}
best_block
} else {
// not all the best synced rpcs are skipped yet. use the best head block
Some(self.head_block.number())
}
}
pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool {
self.rpc_data
.get(rpc)
@ -266,15 +287,16 @@ impl ConsensusFinder {
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let first_seen = self
.first_seen
.get_with_by_ref(block.hash(), async move { Instant::now() })
.get_with_by_ref(block.hash(), async { Instant::now() })
.await;
// TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
// calculate elapsed time before trying to lock.
// calculate elapsed time before trying to lock
let latency = first_seen.elapsed();
// record the time behind the fastest node
rpc.head_latency.write().record(latency);
// update the local mapping of rpc -> block
self.rpc_heads.insert(rpc, block)
}

@ -37,7 +37,6 @@ use std::fmt;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom;
use tokio;
use tokio::sync::{broadcast, watch};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@ -630,7 +629,7 @@ impl Web3Rpcs {
match earliest_retry_at {
None => {
// none of the servers gave us a time to retry at
debug!("no servers on {:?} gave a retry time", self);
debug!("no servers on {:?} gave a retry time. {:?}", self, skip);
// TODO: bring this back? need to think about how to do this with `allow_backups`
// we could return an error here, but maybe waiting a second will fix the problem
@ -784,7 +783,7 @@ impl Web3Rpcs {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
let mut watch_consensus_connections = self.watch_consensus_rpcs_sender.subscribe();
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let start = Instant::now();
@ -949,8 +948,8 @@ impl Web3Rpcs {
trace!("slept!");
skip_rpcs.pop();
}
_ = watch_consensus_connections.changed() => {
watch_consensus_connections.borrow_and_update();
_ = watch_consensus_rpcs.changed() => {
watch_consensus_rpcs.borrow_and_update();
}
}
}
@ -961,14 +960,12 @@ impl Web3Rpcs {
let waiting_for = min_block_needed.max(max_block_needed);
info!("waiting for {:?}", waiting_for);
if watch_for_block(waiting_for, &mut watch_consensus_connections).await? {
if watch_for_block(waiting_for, &skip_rpcs, &mut watch_consensus_rpcs).await? {
// block found! continue so we can check for another rpc
} else {
// rate limits are likely keeping us from serving the head block
watch_consensus_connections.changed().await?;
watch_consensus_connections.borrow_and_update();
watch_consensus_rpcs.changed().await?;
watch_consensus_rpcs.borrow_and_update();
}
}
}
@ -994,7 +991,7 @@ impl Web3Rpcs {
let needed = min_block_needed.max(max_block_needed);
let head_block_num = watch_consensus_connections
let head_block_num = watch_consensus_rpcs
.borrow()
.as_ref()
.map(|x| *x.head_block.number());
@ -1536,7 +1533,6 @@ mod tests {
assert!(lagged_rpc.has_block_data(lagged_block.number.as_ref().unwrap()));
assert!(!lagged_rpc.has_block_data(head_block.number.as_ref().unwrap()));
// todo!("this doesn't work anymore. send_head_block_result doesn't do anything when rpcs isn't watching the block_receiver")
assert_eq!(rpcs.num_synced_rpcs(), 2);
// add head block to the rpcs. lagged_rpc should not be available
@ -1917,22 +1913,24 @@ mod tests {
/// returns `true` when the desired block number is available
/// TODO: max wait time? max number of blocks to wait for? time is probably best
async fn watch_for_block(
block_num: Option<&U64>,
watch_consensus_connections: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
watch_consensus_rpcs: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
) -> Web3ProxyResult<bool> {
let mut head_block_num = watch_consensus_connections
info!("waiting for {:?}", needed_block_num);
let mut best_block_num: Option<U64> = watch_consensus_rpcs
.borrow_and_update()
.as_ref()
.map(|x| *x.head_block.number());
.and_then(|x| x.best_block_num(skip_rpcs).copied());
match (block_num, head_block_num) {
(Some(x), Some(ref head)) => {
if x <= head {
// we are past this block and no servers have this block
match (needed_block_num, best_block_num.as_ref()) {
(Some(x), Some(best)) => {
if x <= best {
// the best block is past the needed block and no servers have the needed data
// this happens if the block is old and all archive servers are offline
// there is no chance we will get this block without adding an archive server to the config
// TODO: i think this can also happen if we are being rate limited!
// TODO: i think this can also happen if we are being rate limited! but then waiting might work. need skip_rpcs to be smarter
return Ok(false);
}
}
@ -1944,6 +1942,7 @@ async fn watch_for_block(
}
(Some(_), None) => {
// block requested but no servers synced. we will wait
// TODO: if the web3rpcs connected to this consensus isn't watching head blocks, exit with an erorr (waiting for blocks won't ever work)
}
(None, Some(head)) => {
// i don't think this is possible
@ -1955,13 +1954,14 @@ async fn watch_for_block(
// future block is requested
// wait for the block to arrive
while head_block_num < block_num.copied() {
watch_consensus_connections.changed().await?;
while best_block_num.as_ref() < needed_block_num {
watch_consensus_rpcs.changed().await?;
head_block_num = watch_consensus_connections
.borrow_and_update()
let consensus_rpcs = watch_consensus_rpcs.borrow_and_update();
best_block_num = consensus_rpcs
.as_ref()
.map(|x| *x.head_block.number());
.and_then(|x| x.best_block_num(skip_rpcs).copied());
}
Ok(true)

@ -70,7 +70,6 @@ pub struct Web3Rpc {
/// Track head block latency
pub(super) head_latency: RwLock<EwmaLatency>,
/// Track peak request latency
///
/// This is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>,
/// Track total requests served
@ -236,16 +235,18 @@ impl Web3Rpc {
}
pub fn peak_ewma(&self) -> OrderedFloat<f64> {
let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
peak_latency.latency().as_secs_f64()
} else {
0.0
};
// TODO: bug inside peak ewma somewhere. possible with atomics being relaxed or the conversion to pair and back
// let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
// peak_latency.latency().as_secs_f64()
// } else {
// 0.0
// };
let head_latency = self.head_latency.read().value();
// TODO: what ordering?
let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0;
OrderedFloat(peak_latency * active_requests)
OrderedFloat(head_latency * active_requests)
}
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391

@ -17,7 +17,7 @@ use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user, user_tier};
use influxdb2::models::DataPoint;
use log::{trace, warn};
use log::{error, trace, warn};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
@ -406,7 +406,7 @@ impl BufferedRpcQueryStats {
if new_available_balance < Decimal::from(10u64) && downgrade_user_role.title == "Premium" {
// TODO: we could do this outside the balance low block, but I think its fine. or better, update the cache if <$10 and downgrade if <$1
if let Some(rpc_secret_key_cache) = rpc_secret_key_cache {
todo!("expire (or probably better to update) the user cache now that the balance is low");
error!("expire (or probably better to update) the user cache now that the balance is low");
// actually i think we need to have 2 caches. otherwise users with 2 keys are going to have seperate caches
// 1. rpc_secret_key_id -> AuthorizationChecks (cuz we don't want to hit the db every time)
// 2. user_id -> Balance
@ -419,8 +419,6 @@ impl BufferedRpcQueryStats {
// active_downgrade_user.save(db_conn).await?;
}
// TODO:
// Get the referee, and the referrer
// (2) Look up the code that this user used. This is the referee table
let referee_object = match referee::Entity::find()