bring back outer rpc checking

This commit is contained in:
Bryan Stitt 2023-10-12 23:57:12 -07:00
parent 433bf02c3c
commit 51ba539800
3 changed files with 53 additions and 40 deletions

@ -103,6 +103,7 @@ pub async fn clean_block_number<'a>(
if block_hash == *head_block.hash() {
(head_block.into(), false)
} else if let Some(app) = app {
// TODO: query for the block
let block = app
.balanced_rpcs
.blocks_by_hash
@ -161,7 +162,7 @@ pub async fn clean_block_number<'a>(
let head_block_num = head_block.number();
if block_num > head_block_num {
// TODO: option to wait for the block
// todo!(option to wait for the block here)
return Err(Web3ProxyError::UnknownBlockNumber {
known: head_block_num,
unknown: block_num,

@ -97,6 +97,7 @@ pub struct RankedRpcs {
// TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated
pub struct RpcsForRequest {
inner: Vec<Arc<Web3Rpc>>,
outer: Vec<Arc<Web3Rpc>>,
request: Arc<ValidatedRequest>,
}
@ -205,8 +206,11 @@ impl RankedRpcs {
let head_block = self.head_block.number();
let num_active = self.num_active_rpcs();
// these are bigger than we need, but how much does that matter?
let mut inner_for_request = Vec::<Arc<Web3Rpc>>::with_capacity(self.num_active_rpcs());
let mut inner_for_request = Vec::<Arc<Web3Rpc>>::with_capacity(num_active);
let mut outer_for_request = Vec::with_capacity(num_active);
// TODO: what if min is set to some future block?
// TODO: what if max is set to some future block?
@ -214,21 +218,24 @@ impl RankedRpcs {
let max_block_needed = web3_request.max_block_needed();
// TODO: max lag was already handled
for rpc in self.inner.iter() {
for rpc in self.inner.iter().cloned() {
if let Some(block_needed) = min_block_needed {
if !rpc.has_block_data(block_needed) {
outer_for_request.push(rpc);
continue;
}
}
if let Some(block_needed) = max_block_needed {
if !rpc.has_block_data(block_needed) {
outer_for_request.push(rpc);
continue;
}
}
inner_for_request.push(rpc.clone());
inner_for_request.push(rpc);
}
// TODO: use web3_request.start_instant? I think we want it to be as recent as possible
let now = Instant::now();
match self.sort_mode {
@ -239,26 +246,31 @@ impl RankedRpcs {
// we use shuffle instead of sort si that the load gets spread around more
// we will still compare weights during `RpcsForRequest::to_stream`
// TODO: use web3_request.start_instant? I think we want it to be as recent as possible
inner_for_request.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
});
outer_for_request.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
});
}
SortMethod::Sort => {
// we sort so that the best nodes are always preferred. we will compare weights during `RpcsForRequest::to_stream`
inner_for_request
.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
outer_for_request
.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
}
}
if inner_for_request.is_empty() {
warn!(?inner_for_request, %web3_request, %head_block, "no rpcs for request");
warn!(?inner_for_request, ?outer_for_request, %web3_request, %head_block, "no rpcs for request");
None
} else {
trace!(?inner_for_request, %web3_request, "for_request");
trace!(?inner_for_request, ?outer_for_request, %web3_request, "for_request");
Some(RpcsForRequest {
inner: inner_for_request,
outer: outer_for_request,
request: web3_request.clone(),
})
}
@ -870,46 +882,46 @@ impl RpcsForRequest {
loop {
if self.request.connect_timeout() {
break;
} else {
// yield_now().await;
}
let mut earliest_retry_at = None;
let mut wait_for_sync = Vec::new();
// TODO: we used to do a neat power of 2 random choices here, but it had bugs. bring that back
for best_rpc in self.inner.iter() {
match best_rpc
.try_request_handle(&self.request, error_handler, false)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
yield handle;
for rpcs in [self.inner.iter(), self.outer.iter()] {
for best_rpc in rpcs {
match best_rpc
.try_request_handle(&self.request, error_handler, false)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
earliest_retry_at = earliest_retry_at.min(Some(retry_at, ));
}
Ok(OpenRequestResult::Lagged(x)) => {
// this will probably always be the same block, right?
trace!("{} is lagged. will not work now", best_rpc);
wait_for_sync.push(x);
}
Ok(OpenRequestResult::Failed) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
}
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
earliest_retry_at = earliest_retry_at.min(Some(retry_at, ));
}
Ok(OpenRequestResult::Lagged(x)) => {
// this will probably always be the same block, right?
trace!("{} is lagged. will not work now", best_rpc);
wait_for_sync.push(x);
}
Ok(OpenRequestResult::Failed) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
}
}
yield_now().await;
yield_now().await;
}
}
// if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been ready. maybe it got rate limited

@ -462,7 +462,7 @@ impl Web3Rpcs {
// TODO: limit number of tries
let rpcs = self.try_rpcs_for_request(web3_request).await?;
let stream = rpcs.to_stream().take(3);
let stream = rpcs.to_stream();
pin!(stream);