make it work
This commit is contained in:
parent
3c9576c13b
commit
bc306f62d4
@ -420,7 +420,7 @@ impl Web3Rpcs {
|
||||
unimplemented!("this shouldn't be possible")
|
||||
}
|
||||
|
||||
pub async fn best_consensus_head_connection(
|
||||
pub async fn best_available_rpc(
|
||||
&self,
|
||||
authorization: &Arc<Authorization>,
|
||||
request_metadata: Option<&Arc<RequestMetadata>>,
|
||||
@ -436,8 +436,6 @@ impl Web3Rpcs {
|
||||
if let Some(head_block) = synced_connections.head_block.as_ref() {
|
||||
(head_block.number(), head_block.age())
|
||||
} else {
|
||||
// TODO: optionally wait for a head_block.number() >= min_block_needed
|
||||
// TODO: though i think that wait would actually need to be earlier in the request
|
||||
return Ok(OpenRequestResult::NotReady);
|
||||
};
|
||||
|
||||
@ -463,6 +461,8 @@ impl Web3Rpcs {
|
||||
}
|
||||
};
|
||||
|
||||
trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
|
||||
|
||||
// collect "usable_rpcs_by_head_num_and_weight"
|
||||
// TODO: MAKE SURE None SORTS LAST?
|
||||
let mut m = BTreeMap::new();
|
||||
@ -470,6 +470,7 @@ impl Web3Rpcs {
|
||||
match needed_blocks_comparison {
|
||||
cmp::Ordering::Less => {
|
||||
// need an old block. check all the rpcs. ignore rpcs that are still syncing
|
||||
trace!("old block needed");
|
||||
|
||||
let min_block_age =
|
||||
self.max_block_age.map(|x| head_block_age.saturating_sub(x));
|
||||
@ -517,12 +518,14 @@ impl Web3Rpcs {
|
||||
// TODO: do we really need to check head_num and age?
|
||||
if let Some(min_sync_num) = min_sync_num.as_ref() {
|
||||
if x_head_num < min_sync_num {
|
||||
trace!("rpc is still syncing");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Some(min_block_age) = min_block_age {
|
||||
if x_head.age() < min_block_age {
|
||||
if x_head.age() > min_block_age {
|
||||
// rpc is still syncing
|
||||
trace!("block is too old");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -536,12 +539,22 @@ impl Web3Rpcs {
|
||||
// TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
|
||||
}
|
||||
cmp::Ordering::Equal => {
|
||||
// need the consensus head block. filter the synced rpcs
|
||||
for x in synced_connections.rpcs.iter().filter(|x| !skip.contains(x)) {
|
||||
// the key doesn't matter if we are checking synced connections. its already sized to what we need
|
||||
let key = (0, None);
|
||||
// using the consensus head block. filter the synced rpcs
|
||||
|
||||
m.entry(key).or_insert_with(Vec::new).push(x.clone());
|
||||
// the key doesn't matter if we are checking synced connections
|
||||
// they are all at the same block and it is already sized to what we need
|
||||
let key = (0, None);
|
||||
|
||||
for x in synced_connections.rpcs.iter() {
|
||||
if skip.contains(x) {
|
||||
trace!("skipping: {}", x);
|
||||
continue;
|
||||
}
|
||||
trace!("not skipped!");
|
||||
|
||||
m.entry(key.clone())
|
||||
.or_insert_with(Vec::new)
|
||||
.push(x.clone());
|
||||
}
|
||||
}
|
||||
cmp::Ordering::Greater => {
|
||||
@ -553,6 +566,11 @@ impl Web3Rpcs {
|
||||
m
|
||||
};
|
||||
|
||||
trace!(
|
||||
"usable_rpcs_by_tier_and_head_number: {:#?}",
|
||||
usable_rpcs_by_tier_and_head_number
|
||||
);
|
||||
|
||||
let mut earliest_retry_at = None;
|
||||
|
||||
for mut usable_rpcs in usable_rpcs_by_tier_and_head_number.into_values() {
|
||||
@ -570,9 +588,13 @@ impl Web3Rpcs {
|
||||
// pick the first two and try the one with the lower rpc.latency.ewma
|
||||
// TODO: chunks or tuple windows?
|
||||
for (rpc_a, rpc_b) in usable_rpcs.into_iter().circular_tuple_windows() {
|
||||
trace!("{} vs {}", rpc_a, rpc_b);
|
||||
// TODO: cached key to save a read lock
|
||||
// TODO: ties to the server with the smallest block_data_limit
|
||||
let best_rpc = min_by_key(rpc_a, rpc_b, |x| {
|
||||
OrderedFloat(x.request_latency.read().ewma.value())
|
||||
});
|
||||
trace!("winner: {}", best_rpc);
|
||||
|
||||
// just because it has lower latency doesn't mean we are sure to get a connection
|
||||
match best_rpc.try_request_handle(authorization, None).await {
|
||||
@ -585,6 +607,7 @@ impl Web3Rpcs {
|
||||
}
|
||||
Ok(OpenRequestResult::NotReady) => {
|
||||
// TODO: log a warning? emit a stat?
|
||||
trace!("best_rpc not ready");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("No request handle for {}. err={:?}", best_rpc, err)
|
||||
@ -771,7 +794,7 @@ impl Web3Rpcs {
|
||||
}
|
||||
|
||||
match self
|
||||
.best_consensus_head_connection(
|
||||
.best_available_rpc(
|
||||
authorization,
|
||||
request_metadata,
|
||||
&skip_rpcs,
|
||||
@ -922,65 +945,34 @@ impl Web3Rpcs {
|
||||
request_metadata.no_servers.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
|
||||
// todo!(
|
||||
// "check if we are requesting an old block and no archive servers are synced"
|
||||
// );
|
||||
|
||||
if let Some(min_block_needed) = min_block_needed {
|
||||
let mut theres_a_chance = false;
|
||||
|
||||
for potential_conn in self.by_name.values() {
|
||||
if skip_rpcs.contains(potential_conn) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: should we instead check if has_block_data but with the current head block?
|
||||
if potential_conn.has_block_data(min_block_needed) {
|
||||
trace!("chance for {} on {}", min_block_needed, potential_conn);
|
||||
theres_a_chance = true;
|
||||
break;
|
||||
}
|
||||
|
||||
skip_rpcs.push(potential_conn.clone());
|
||||
}
|
||||
|
||||
if !theres_a_chance {
|
||||
debug!("no chance of finding data in block #{}", min_block_needed);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
debug!("No servers ready. Waiting up for change in synced servers");
|
||||
|
||||
watch_consensus_connections.changed().await?;
|
||||
watch_consensus_connections.borrow_and_update();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(r) = method_not_available_response {
|
||||
// TODO: emit a stat for unsupported methods?
|
||||
return Ok(r);
|
||||
}
|
||||
|
||||
// TODO: do we need this here, or do we do it somewhere else?
|
||||
// TODO: do we need this here, or do we do it somewhere else? like, the code could change and a try operator in here would skip this increment
|
||||
if let Some(request_metadata) = request_metadata {
|
||||
request_metadata
|
||||
.error_response
|
||||
.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
if let Some(r) = method_not_available_response {
|
||||
// TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend
|
||||
return Ok(r);
|
||||
}
|
||||
|
||||
let num_conns = self.by_name.len();
|
||||
let num_skipped = skip_rpcs.len();
|
||||
|
||||
if num_skipped == 0 {
|
||||
error!("No servers synced ({} known)", num_conns);
|
||||
error!("No servers synced ({} known). None skipped", num_conns);
|
||||
|
||||
return Ok(JsonRpcForwardedResponse::from_str(
|
||||
Ok(JsonRpcForwardedResponse::from_str(
|
||||
"No servers synced",
|
||||
Some(-32000),
|
||||
Some(request.id),
|
||||
));
|
||||
))
|
||||
} else {
|
||||
// TODO: warn? debug? trace?
|
||||
warn!(
|
||||
@ -990,11 +982,11 @@ impl Web3Rpcs {
|
||||
|
||||
// TODO: what error code?
|
||||
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
|
||||
return Ok(JsonRpcForwardedResponse::from_str(
|
||||
Ok(JsonRpcForwardedResponse::from_str(
|
||||
"Requested data is not available",
|
||||
Some(-32043),
|
||||
Some(request.id),
|
||||
));
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@ -1396,7 +1388,7 @@ mod tests {
|
||||
|
||||
// best_synced_backend_connection requires servers to be synced with the head block
|
||||
let x = rpcs
|
||||
.best_consensus_head_connection(&authorization, None, &[], None, None)
|
||||
.best_available_rpc(&authorization, None, &[], None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@ -1447,29 +1439,28 @@ mod tests {
|
||||
assert_eq!(rpcs.num_synced_rpcs(), 1);
|
||||
|
||||
assert!(matches!(
|
||||
rpcs.best_consensus_head_connection(&authorization, None, &[], None, None)
|
||||
rpcs.best_available_rpc(&authorization, None, &[], None, None)
|
||||
.await,
|
||||
Ok(OpenRequestResult::Handle(_))
|
||||
));
|
||||
|
||||
assert!(matches!(
|
||||
rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None)
|
||||
rpcs.best_available_rpc(&authorization, None, &[], Some(&0.into()), None)
|
||||
.await,
|
||||
Ok(OpenRequestResult::Handle(_))
|
||||
));
|
||||
|
||||
assert!(matches!(
|
||||
rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
|
||||
rpcs.best_available_rpc(&authorization, None, &[], Some(&1.into()), None)
|
||||
.await,
|
||||
Ok(OpenRequestResult::Handle(_))
|
||||
));
|
||||
|
||||
// future block should not get a handle
|
||||
assert!(matches!(
|
||||
rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None)
|
||||
.await,
|
||||
Ok(OpenRequestResult::NotReady)
|
||||
));
|
||||
let future_rpc = rpcs
|
||||
.best_available_rpc(&authorization, None, &[], Some(&2.into()), None)
|
||||
.await;
|
||||
assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@ -1505,6 +1496,7 @@ mod tests {
|
||||
block_data_limit: 64.into(),
|
||||
tier: 1,
|
||||
head_block: RwLock::new(Some(head_block.clone())),
|
||||
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@ -1516,6 +1508,7 @@ mod tests {
|
||||
block_data_limit: u64::MAX.into(),
|
||||
tier: 2,
|
||||
head_block: RwLock::new(Some(head_block.clone())),
|
||||
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@ -1583,25 +1576,25 @@ mod tests {
|
||||
|
||||
// best_synced_backend_connection requires servers to be synced with the head block
|
||||
// TODO: test with and without passing the head_block.number?
|
||||
let best_head_server = rpcs
|
||||
.best_consensus_head_connection(
|
||||
&authorization,
|
||||
None,
|
||||
&[],
|
||||
Some(&head_block.number()),
|
||||
None,
|
||||
)
|
||||
let best_available_server = rpcs
|
||||
.best_available_rpc(&authorization, None, &[], Some(&head_block.number()), None)
|
||||
.await;
|
||||
|
||||
debug!("best_head_server: {:#?}", best_head_server);
|
||||
debug!("best_available_server: {:#?}", best_available_server);
|
||||
|
||||
assert!(matches!(
|
||||
best_head_server.unwrap(),
|
||||
best_available_server.unwrap(),
|
||||
OpenRequestResult::Handle(_)
|
||||
));
|
||||
|
||||
let best_available_server_from_none = rpcs
|
||||
.best_available_rpc(&authorization, None, &[], None, None)
|
||||
.await;
|
||||
|
||||
// assert_eq!(best_available_server, best_available_server_from_none);
|
||||
|
||||
let best_archive_server = rpcs
|
||||
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
|
||||
.best_available_rpc(&authorization, None, &[], Some(&1.into()), None)
|
||||
.await;
|
||||
|
||||
match best_archive_server {
|
||||
|
Loading…
Reference in New Issue
Block a user