move rpc filtering to later

Bryan Stitt 2023-10-10 23:05:14 -07:00
parent 9b4f14e779
commit ceaed4f239
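
In short, this change drops the per-consensus `ConsensusRpcData` map (and the `has_block_data` / `rpc_will_work_now` / `rpc_will_work_eventually` helpers built on it) and instead filters rpcs against `min_block_needed` / `max_block_needed` when `for_request` builds the candidate list, collapsing the old `inner` / `outer` split into a single vec. A rough sketch of the new per-request filtering, with `SketchRpc` as a simplified, hypothetical stand-in for `Web3Rpc` (the real code clones `Arc<Web3Rpc>`s and also sorts or shuffles the survivors afterwards):

// illustrative only: `SketchRpc` stands in for `Web3Rpc`; the real filter lives in `RankedRpcs::for_request`
#[derive(Clone, Debug)]
struct SketchRpc {
    name: &'static str,
    head_block_num: u64,
    block_data_limit: u64, // how far behind head this rpc still has data
}

impl SketchRpc {
    // same shape as the old ConsensusRpcData::data_available check
    fn has_block_data(&self, block_num: u64) -> bool {
        let oldest_block_num = self.head_block_num.saturating_sub(self.block_data_limit);
        block_num >= oldest_block_num && block_num <= self.head_block_num
    }
}

// keep only the rpcs that can serve the requested block range right now
fn rpcs_for_request(
    ranked: &[SketchRpc],
    min_block_needed: Option<u64>,
    max_block_needed: Option<u64>,
) -> Vec<SketchRpc> {
    ranked
        .iter()
        .filter(|rpc| min_block_needed.map_or(true, |b| rpc.has_block_data(b)))
        .filter(|rpc| max_block_needed.map_or(true, |b| rpc.has_block_data(b)))
        .cloned()
        .collect()
}

fn main() {
    let ranked = [
        SketchRpc { name: "archive", head_block_num: 1_000, block_data_limit: u64::MAX },
        SketchRpc { name: "pruned", head_block_num: 1_000, block_data_limit: 64 },
    ];

    // a request that needs an old block only matches the archive node
    let candidates = rpcs_for_request(&ranked, Some(100), Some(1_000));
    assert_eq!(candidates.len(), 1);
    assert_eq!(candidates[0].name, "archive");
}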


@@ -21,36 +21,10 @@ use std::cmp::{min_by_key, Ordering, Reverse};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::select;
use tokio::task::yield_now;
use tokio::time::{sleep_until, Instant};
use tracing::{debug, enabled, error, info, trace, warn, Level};
#[derive(Clone, Debug, Serialize)]
struct ConsensusRpcData {
head_block_num: U64,
// TODO: this is too simple. erigon has 4 prune levels (hrct)
oldest_block_num: U64,
}
impl ConsensusRpcData {
fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self {
let head_block_num = head.number();
let block_data_limit = rpc.block_data_limit();
let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
Self {
head_block_num,
oldest_block_num,
}
}
// TODO: take an enum for the type of data (hrtc)
fn data_available(&self, block_num: U64) -> bool {
block_num >= self.oldest_block_num && block_num <= self.head_block_num
}
}
#[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
pub struct RpcRanking {
backup: bool,
@@ -117,17 +91,12 @@ pub struct RankedRpcs {
inner: Vec<Arc<Web3Rpc>>,
// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
rpc_data: HashMap<Arc<Web3Rpc>, ConsensusRpcData>,
sort_mode: SortMethod,
}
// TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated
pub struct RpcsForRequest {
inner: Vec<Arc<Web3Rpc>>,
outer: Vec<Arc<Web3Rpc>>,
request: Arc<Web3Request>,
}
@@ -142,14 +111,6 @@ impl RankedRpcs {
let num_synced = rpcs.len();
let rpc_data = Default::default();
// TODO: do we need real data in rpc_data? if we are calling from_rpcs, we probably don't even track their block
// let mut rpc_data = HashMap::<Arc<Web3Rpc>, ConsensusRpcData>::with_capacity(num_synced);
// for rpc in rpcs.iter().cloned() {
// let data = ConsensusRpcData::new(&rpc, &head_block);
// rpc_data.insert(rpc, data);
// }
let sort_mode = SortMethod::Shuffle;
let ranked_rpcs = RankedRpcs {
@@ -157,7 +118,6 @@ impl RankedRpcs {
head_block,
inner: rpcs,
num_synced,
rpc_data,
sort_mode,
};
@@ -199,17 +159,12 @@ impl RankedRpcs {
// return the first result that exceeds configured minimums (if any)
if let Some((best_block, _, best_rpcs)) = votes.into_iter().next() {
let mut ranked_rpcs: Vec<_> = best_rpcs.into_iter().map(Arc::clone).collect();
let mut rpc_data = HashMap::new();
let backups_needed = ranked_rpcs.iter().any(|x| x.backup);
let num_synced = ranked_rpcs.len();
// TODO: add all the unsynced rpcs
for (x, x_head) in heads.iter() {
let data = ConsensusRpcData::new(x, x_head);
rpc_data.insert(x.clone(), data);
if ranked_rpcs.contains(x) {
continue;
}
@@ -219,6 +174,8 @@ impl RankedRpcs {
continue;
}
// TODO: max age here too?
ranked_rpcs.push(x.clone());
}
@@ -230,7 +187,6 @@ impl RankedRpcs {
let consensus = RankedRpcs {
backups_needed,
head_block: best_block,
rpc_data,
sort_mode,
inner: ranked_rpcs,
num_synced,
@@ -250,64 +206,59 @@ impl RankedRpcs {
let head_block = self.head_block.number();
// these are bigger than we need, but how much does that matter?
let mut inner = Vec::with_capacity(self.num_active_rpcs());
let mut outer = Vec::with_capacity(self.num_active_rpcs());
let mut inner_for_request = Vec::<Arc<Web3Rpc>>::with_capacity(self.num_active_rpcs());
// TODO: what if min is set to some future block?
// TODO: what if max is set to some future block?
let min_block_needed = web3_request.min_block_needed();
let max_block_needed = web3_request.max_block_needed();
// TODO: max lag was already handled
for rpc in self.inner.iter() {
if let Some(block_needed) = min_block_needed {
if !rpc.has_block_data(block_needed) {
continue;
}
}
if let Some(block_needed) = max_block_needed {
if !rpc.has_block_data(block_needed) {
continue;
}
}
inner_for_request.push(rpc.clone());
}
let now = Instant::now();
match self.sort_mode {
SortMethod::Shuffle => {
// if we are shuffling, it is because we don't watch the head_blocks of the rpcs
// clone all of the rpcs
self.inner.clone_into(&mut inner);
let mut rng = nanorand::tls_rng();
// we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream`
// we use shuffle instead of sort so that the load gets spread around more
// we will still compare weights during `RpcsForRequest::to_stream`
// TODO: use web3_request.start_instant? I think we want it to be as recent as possible
let now = Instant::now();
inner.sort_by_cached_key(|x| {
inner_for_request.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
});
}
SortMethod::Sort => {
// TODO: what if min is set to some future block?
let min_block_needed = web3_request.min_block_needed();
// TODO: max lag from config
let recent_block_needed = head_block.saturating_sub(U64::from(5));
for rpc in &self.inner {
if self.has_block_data(rpc, recent_block_needed) {
match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed)
{
ShouldWaitForBlock::NeverReady => continue,
ShouldWaitForBlock::Ready => inner.push(rpc.clone()),
ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()),
}
}
}
let now = Instant::now();
// we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream`
inner.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
outer.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
// we sort so that the best nodes are always preferred. we will compare weights during `RpcsForRequest::to_stream`
inner_for_request
.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
}
}
// TODO: turn these back on
outer.clear();
if inner.is_empty() && outer.is_empty() {
warn!(?inner, ?outer, %web3_request, %head_block, "no rpcs for request");
if inner_for_request.is_empty() {
warn!(?inner_for_request, %web3_request, %head_block, "no rpcs for request");
None
} else {
trace!(?inner, ?outer, %web3_request, "for_request");
trace!(?inner_for_request, %web3_request, "for_request");
Some(RpcsForRequest {
inner,
outer,
inner: inner_for_request,
request: web3_request.clone(),
})
}
@@ -327,109 +278,6 @@ impl RankedRpcs {
self.inner.len()
}
pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: U64) -> bool {
self.rpc_data
.get(rpc)
.map(|x| x.data_available(block_num))
.unwrap_or(false)
}
// TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on error mark them as not supporting it)
pub fn rpc_will_work_eventually(
&self,
rpc: &Arc<Web3Rpc>,
min_block_num: Option<U64>,
max_block_num: Option<U64>,
) -> ShouldWaitForBlock {
if !rpc.healthy.load(atomic::Ordering::Relaxed) {
return ShouldWaitForBlock::NeverReady;
}
if let Some(min_block_num) = min_block_num {
if !self.has_block_data(rpc, min_block_num) {
trace!(
"{} is missing min_block_num ({}). will not work eventually",
rpc,
min_block_num,
);
return ShouldWaitForBlock::NeverReady;
}
}
if let Some(needed_block_num) = max_block_num {
if let Some(rpc_data) = self.rpc_data.get(rpc) {
match rpc_data.head_block_num.cmp(&needed_block_num) {
Ordering::Less => {
trace!("{} is behind. let it catch up", rpc);
// TODO: what if this is a pruned rpc that is behind by a lot, and the block is old, too?
return ShouldWaitForBlock::Wait(
needed_block_num - rpc_data.head_block_num,
);
}
Ordering::Greater | Ordering::Equal => {
// rpc is synced past the needed block. make sure the block isn't too old for it
if self.has_block_data(rpc, needed_block_num) {
trace!("{} has {}", rpc, needed_block_num);
return ShouldWaitForBlock::Ready;
} else {
trace!("{} does not have {}", rpc, needed_block_num);
return ShouldWaitForBlock::NeverReady;
}
}
}
}
warn!("no rpc data for this {}. thats not promising", rpc);
ShouldWaitForBlock::NeverReady
} else {
// if no needed_block_num was specified, then this should work
ShouldWaitForBlock::Ready
}
}
// TODO: this should probably be on the rpcs as "can_serve_request"
// TODO: and it should take the method into account, too
pub fn rpc_will_work_now(
&self,
min_block_needed: Option<U64>,
max_block_needed: Option<U64>,
rpc: &Arc<Web3Rpc>,
) -> bool {
if rpc.backup && !self.backups_needed {
// skip backups unless backups are needed for ranked_rpcs to exist
return false;
}
if !rpc.healthy.load(atomic::Ordering::Relaxed) {
return false;
}
if let Some(min_block_needed) = min_block_needed {
if !self.has_block_data(rpc, min_block_needed) {
trace!(
"{} is missing min_block_needed ({}). will not work now",
rpc,
min_block_needed,
);
return false;
}
}
if let Some(max_block_needed) = max_block_needed {
if !self.has_block_data(rpc, max_block_needed) {
trace!(
"{} is missing max_block_needed ({}). will not work now",
rpc,
max_block_needed,
);
return false;
}
}
// rate limits are handled by sort order
true
}
// TODO: sum_hard_limit?
}
@@ -1018,7 +866,7 @@ impl RpcsForRequest {
// let error_handler = web3_request.authorization.error_handler;
let error_handler = None;
let max_len = self.inner.len() + self.outer.len();
let max_len = self.inner.len();
// TODO: do this without having 3 Vecs
let mut filtered = Vec::with_capacity(max_len);
@@ -1029,73 +877,74 @@ impl RpcsForRequest {
loop {
if self.request.connect_timeout() {
break;
} else {
yield_now().await;
}
let mut earliest_retry_at = None;
let mut wait_for_sync = FuturesUnordered::new();
// first check the inners, then the outers
for rpcs in [&self.inner, &self.outer] {
attempted.clear();
attempted.clear();
while attempted.len() + completed.len() < rpcs.len() {
filtered.clear();
while attempted.len() + completed.len() < self.inner.len() {
filtered.clear();
// TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues
filtered.extend(rpcs.iter().filter(|x| !(attempted.contains(x) || completed.contains(x))));
// TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues
filtered.extend(self.inner.iter().filter(|x| !(attempted.contains(x) || completed.contains(x))));
// tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself
if filtered.len() == 1 {
filtered.push(filtered[0]);
}
// tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself
if filtered.len() == 1 {
filtered.push(filtered[0]);
}
for (rpc_a, rpc_b) in filtered.iter().tuple_windows() {
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move this to a helper function just so we can test it
// TODO: should x.next_available be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
for (rpc_a, rpc_b) in filtered.iter().tuple_windows() {
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move this to a helper function just so we can test it
// TODO: should x.next_available be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
match best_rpc
.try_request_handle(&self.request, error_handler, false)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
completed.insert(best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
attempted.insert(best_rpc);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Lagged(x)) => {
// this will probably always be the same block, right?
trace!("{} is lagged. will not work now", best_rpc);
attempted.insert(best_rpc);
wait_for_sync.push(x);
}
Ok(OpenRequestResult::Failed) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
completed.insert(best_rpc);
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
completed.insert(best_rpc);
}
match best_rpc
.try_request_handle(&self.request, error_handler, false)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
completed.insert(best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
attempted.insert(best_rpc);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Lagged(x)) => {
// this will probably always be the same block, right?
trace!("{} is lagged. will not work now", best_rpc);
attempted.insert(best_rpc);
wait_for_sync.push(x);
}
Ok(OpenRequestResult::Failed) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
completed.insert(best_rpc);
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
completed.insert(best_rpc);
}
}
debug_assert!(!(attempted.is_empty() && completed.is_empty()));
}
debug_assert!(!(attempted.is_empty() && completed.is_empty()));
}
// if we got this far, no inner or outer rpcs are ready. that's surprising since an inner should have been ready. maybe it got rate limited
@@ -1105,11 +954,15 @@ impl RpcsForRequest {
// clear earliest_retry_at if it is too far in the future to help us
if let Some(retry_at) = earliest_retry_at {
let corrected = retry_at.max(min_wait_until).min(self.request.connect_timeout_at());
// set a minimum of 100ms. this is probably actually a bug we should figure out.
earliest_retry_at = Some(retry_at.max(min_wait_until));
earliest_retry_at = Some(corrected);
} else {
earliest_retry_at = Some(self.request.connect_timeout_at());
}
let retry_at = earliest_retry_at.min(Some(self.request.connect_timeout_at())).expect("retry_at always set");
let retry_at = earliest_retry_at.expect("retry_at should always be set by now");
if wait_for_sync.is_empty() {
sleep_until(retry_at).await;