make it work

Bryan Stitt 2023-02-15 21:05:41 -08:00 committed by yenicelik
parent 1b7050d294
commit e1f803e91a


@@ -420,7 +420,7 @@ impl Web3Rpcs {
         unimplemented!("this shouldn't be possible")
     }

-    pub async fn best_consensus_head_connection(
+    pub async fn best_available_rpc(
         &self,
         authorization: &Arc<Authorization>,
         request_metadata: Option<&Arc<RequestMetadata>>,
@@ -436,8 +436,6 @@ impl Web3Rpcs {
             if let Some(head_block) = synced_connections.head_block.as_ref() {
                 (head_block.number(), head_block.age())
             } else {
-                // TODO: optionally wait for a head_block.number() >= min_block_needed
-                // TODO: though i think that wait would actually need to be earlier in the request
                 return Ok(OpenRequestResult::NotReady);
             };

@@ -463,6 +461,8 @@ impl Web3Rpcs {
            }
        };

+        trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
+
            // collect "usable_rpcs_by_head_num_and_weight"
            // TODO: MAKE SURE None SORTS LAST?
            let mut m = BTreeMap::new();
@@ -470,6 +470,7 @@ impl Web3Rpcs {
            match needed_blocks_comparison {
                cmp::Ordering::Less => {
                    // need an old block. check all the rpcs. ignore rpcs that are still syncing
+                    trace!("old block needed");

                    let min_block_age =
                        self.max_block_age.map(|x| head_block_age.saturating_sub(x));
@@ -517,12 +518,14 @@ impl Web3Rpcs {
                        // TODO: do we really need to check head_num and age?
                        if let Some(min_sync_num) = min_sync_num.as_ref() {
                            if x_head_num < min_sync_num {
+                                trace!("rpc is still syncing");
                                continue;
                            }
                        }
                        if let Some(min_block_age) = min_block_age {
-                            if x_head.age() < min_block_age {
+                            if x_head.age() > min_block_age {
                                // rpc is still syncing
+                                trace!("block is too old");
                                continue;
                            }
                        }
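The flipped comparison above is the core fix in this hunk: an rpc is skipped when its head block's age exceeds the allowed threshold, not when it is below it. A rough standalone sketch of that filter direction, using hypothetical stand-in types rather than the real Web3Rpc structs:

    use std::time::Duration;

    // Hypothetical stand-in for an rpc and the age of its head block.
    struct RpcHead {
        name: &'static str,
        head_block_age: Duration,
    }

    // Keep rpcs whose head block is no older than `max_age`; skipping when
    // `age > max_age` mirrors the corrected `>` check in the diff.
    fn usable(rpcs: &[RpcHead], max_age: Duration) -> Vec<&RpcHead> {
        rpcs.iter().filter(|r| r.head_block_age <= max_age).collect()
    }

    fn main() {
        let rpcs = [
            RpcHead { name: "fresh", head_block_age: Duration::from_secs(6) },
            RpcHead { name: "stale", head_block_age: Duration::from_secs(300) },
        ];
        for rpc in usable(&rpcs, Duration::from_secs(60)) {
            println!("usable: {}", rpc.name); // prints only "fresh"
        }
    }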
@@ -536,12 +539,22 @@ impl Web3Rpcs {
                    // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
                }
                cmp::Ordering::Equal => {
-                    // need the consensus head block. filter the synced rpcs
-                    for x in synced_connections.rpcs.iter().filter(|x| !skip.contains(x)) {
-                        // the key doesn't matter if we are checking synced connections. its already sized to what we need
-                        let key = (0, None);
-
-                        m.entry(key).or_insert_with(Vec::new).push(x.clone());
+                    // using the consensus head block. filter the synced rpcs
+
+                    // the key doesn't matter if we are checking synced connections
+                    // they are all at the same block and it is already sized to what we need
+                    let key = (0, None);
+
+                    for x in synced_connections.rpcs.iter() {
+                        if skip.contains(x) {
+                            trace!("skipping: {}", x);
+                            continue;
+                        }
+                        trace!("not skipped!");
+
+                        m.entry(key.clone())
+                            .or_insert_with(Vec::new)
+                            .push(x.clone());
                    }
                }
                cmp::Ordering::Greater => {
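The rebuilt Equal arm above feeds the same BTreeMap the other arms use, so the map's iteration order decides which group of rpcs gets tried first. A minimal sketch of that grouping idea, with an illustrative (tier, head-number) key rather than the project's real key type:

    use std::collections::BTreeMap;

    fn main() {
        // BTreeMap iterates keys in ascending order, so lower keys come out first.
        // Note that for Option keys, None sorts *before* Some(_) in Rust's Ord,
        // which is why the earlier TODO asks whether None needs to sort last.
        let mut m: BTreeMap<(u64, Option<u64>), Vec<&str>> = BTreeMap::new();

        m.entry((0, None)).or_insert_with(Vec::new).push("rpc-a");
        m.entry((0, None)).or_insert_with(Vec::new).push("rpc-b");
        m.entry((1, Some(100))).or_insert_with(Vec::new).push("rpc-c");

        for (key, rpcs) in &m {
            println!("{:?} -> {:?}", key, rpcs); // the (0, None) group prints first
        }
    }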
@@ -553,6 +566,11 @@ impl Web3Rpcs {
            m
        };

+        trace!(
+            "usable_rpcs_by_tier_and_head_number: {:#?}",
+            usable_rpcs_by_tier_and_head_number
+        );
+
        let mut earliest_retry_at = None;

        for mut usable_rpcs in usable_rpcs_by_tier_and_head_number.into_values() {
@@ -570,9 +588,13 @@ impl Web3Rpcs {
            // pick the first two and try the one with the lower rpc.latency.ewma
            // TODO: chunks or tuple windows?
            for (rpc_a, rpc_b) in usable_rpcs.into_iter().circular_tuple_windows() {
+                trace!("{} vs {}", rpc_a, rpc_b);
+                // TODO: cached key to save a read lock
+                // TODO: ties to the server with the smallest block_data_limit
                let best_rpc = min_by_key(rpc_a, rpc_b, |x| {
                    OrderedFloat(x.request_latency.read().ewma.value())
                });
+                trace!("winner: {}", best_rpc);

                // just because it has lower latency doesn't mean we are sure to get a connection
                match best_rpc.try_request_handle(authorization, None).await {
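To make the selection loop above concrete: circular_tuple_windows (from the itertools crate) yields overlapping pairs plus a wrap-around pair, and min_by_key keeps the lower-latency rpc of each pair. A runnable sketch with plain (name, latency) tuples standing in for the real rpc handles and their EWMA trackers:

    use itertools::Itertools;
    use ordered_float::OrderedFloat;
    use std::cmp::min_by_key;

    fn main() {
        // (name, ewma latency in milliseconds) stand-ins for real rpc handles.
        let rpcs = vec![("rpc-a", 80.0_f64), ("rpc-b", 35.0), ("rpc-c", 50.0)];

        // Pairs come out as (a, b), (b, c), (c, a), so every rpc shows up in the
        // comparisons. OrderedFloat makes the f64 latency usable as an Ord key.
        for (rpc_a, rpc_b) in rpcs.into_iter().circular_tuple_windows() {
            let best = min_by_key(rpc_a, rpc_b, |x| OrderedFloat(x.1));
            println!("{} vs {} -> try {} first", rpc_a.0, rpc_b.0, best.0);
        }
    }

In the real method the loop stops at the first rpc that hands out a request handle; the sketch only shows the pairing and ordering.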
@@ -585,6 +607,7 @@ impl Web3Rpcs {
                    }
                    Ok(OpenRequestResult::NotReady) => {
                        // TODO: log a warning? emit a stat?
+                        trace!("best_rpc not ready");
                    }
                    Err(err) => {
                        warn!("No request handle for {}. err={:?}", best_rpc, err)
@@ -771,7 +794,7 @@ impl Web3Rpcs {
        }

        match self
-            .best_consensus_head_connection(
+            .best_available_rpc(
                authorization,
                request_metadata,
                &skip_rpcs,
@@ -922,65 +945,34 @@ impl Web3Rpcs {
                        request_metadata.no_servers.fetch_add(1, Ordering::Release);
                    }

-                    // todo!(
-                    //     "check if we are requesting an old block and no archive servers are synced"
-                    // );
-
-                    if let Some(min_block_needed) = min_block_needed {
-                        let mut theres_a_chance = false;
-
-                        for potential_conn in self.by_name.values() {
-                            if skip_rpcs.contains(potential_conn) {
-                                continue;
-                            }
-
-                            // TODO: should we instead check if has_block_data but with the current head block?
-                            if potential_conn.has_block_data(min_block_needed) {
-                                trace!("chance for {} on {}", min_block_needed, potential_conn);
-                                theres_a_chance = true;
-                                break;
-                            }
-
-                            skip_rpcs.push(potential_conn.clone());
-                        }
-
-                        if !theres_a_chance {
-                            debug!("no chance of finding data in block #{}", min_block_needed);
-                            break;
-                        }
-                    }
-
-                    debug!("No servers ready. Waiting up for change in synced servers");
-
-                    watch_consensus_connections.changed().await?;
-                    watch_consensus_connections.borrow_and_update();
+                    break;
                }
            }
        }

-        if let Some(r) = method_not_available_response {
-            // TODO: emit a stat for unsupported methods?
-            return Ok(r);
-        }
-
-        // TODO: do we need this here, or do we do it somewhere else?
+        // TODO: do we need this here, or do we do it somewhere else? like, the code could change and a try operator in here would skip this increment
        if let Some(request_metadata) = request_metadata {
            request_metadata
                .error_response
                .store(true, Ordering::Release);
        }

+        if let Some(r) = method_not_available_response {
+            // TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend
+            return Ok(r);
+        }
+
        let num_conns = self.by_name.len();
        let num_skipped = skip_rpcs.len();

        if num_skipped == 0 {
-            error!("No servers synced ({} known)", num_conns);
+            error!("No servers synced ({} known). None skipped", num_conns);

-            return Ok(JsonRpcForwardedResponse::from_str(
+            Ok(JsonRpcForwardedResponse::from_str(
                "No servers synced",
                Some(-32000),
                Some(request.id),
-            ));
+            ))
        } else {
            // TODO: warn? debug? trace?
            warn!(
@@ -990,11 +982,11 @@ impl Web3Rpcs {
            // TODO: what error code?
            // cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
-            return Ok(JsonRpcForwardedResponse::from_str(
+            Ok(JsonRpcForwardedResponse::from_str(
                "Requested data is not available",
                Some(-32043),
                Some(request.id),
-            ));
+            ))
        }
    }
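Both error paths above drop the trailing `return ... ;` because the if/else is now the last expression of the function, so each branch's value becomes the return value. A tiny illustration of that tail-expression style (the function and strings here are made up for the example):

    fn respond(num_skipped: usize) -> Result<&'static str, ()> {
        if num_skipped == 0 {
            // no explicit `return`: this branch is the function's value
            Ok("No servers synced")
        } else {
            Ok("Requested data is not available")
        }
    }

    fn main() {
        assert_eq!(respond(0), Ok("No servers synced"));
        assert_eq!(respond(2), Ok("Requested data is not available"));
    }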
@@ -1396,7 +1388,7 @@ mod tests {
        // best_synced_backend_connection requires servers to be synced with the head block
        let x = rpcs
-            .best_consensus_head_connection(&authorization, None, &[], None, None)
+            .best_available_rpc(&authorization, None, &[], None, None)
            .await
            .unwrap();
@@ -1447,29 +1439,28 @@ mod tests {
        assert_eq!(rpcs.num_synced_rpcs(), 1);

        assert!(matches!(
-            rpcs.best_consensus_head_connection(&authorization, None, &[], None, None)
+            rpcs.best_available_rpc(&authorization, None, &[], None, None)
                .await,
            Ok(OpenRequestResult::Handle(_))
        ));

        assert!(matches!(
-            rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None)
+            rpcs.best_available_rpc(&authorization, None, &[], Some(&0.into()), None)
                .await,
            Ok(OpenRequestResult::Handle(_))
        ));

        assert!(matches!(
-            rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
+            rpcs.best_available_rpc(&authorization, None, &[], Some(&1.into()), None)
                .await,
            Ok(OpenRequestResult::Handle(_))
        ));

        // future block should not get a handle
-        assert!(matches!(
-            rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None)
-                .await,
-            Ok(OpenRequestResult::NotReady)
-        ));
+        let future_rpc = rpcs
+            .best_available_rpc(&authorization, None, &[], Some(&2.into()), None)
+            .await;
+        assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady)));
    }

    #[tokio::test]
@@ -1505,6 +1496,7 @@ mod tests {
            block_data_limit: 64.into(),
            tier: 1,
            head_block: RwLock::new(Some(head_block.clone())),
+            provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
            ..Default::default()
        };

@@ -1516,6 +1508,7 @@ mod tests {
            block_data_limit: u64::MAX.into(),
            tier: 2,
            head_block: RwLock::new(Some(head_block.clone())),
+            provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
            ..Default::default()
        };

@@ -1583,25 +1576,25 @@ mod tests {
        // best_synced_backend_connection requires servers to be synced with the head block
        // TODO: test with and without passing the head_block.number?
-        let best_head_server = rpcs
-            .best_consensus_head_connection(
-                &authorization,
-                None,
-                &[],
-                Some(&head_block.number()),
-                None,
-            )
+        let best_available_server = rpcs
+            .best_available_rpc(&authorization, None, &[], Some(&head_block.number()), None)
            .await;

-        debug!("best_head_server: {:#?}", best_head_server);
+        debug!("best_available_server: {:#?}", best_available_server);

        assert!(matches!(
-            best_head_server.unwrap(),
+            best_available_server.unwrap(),
            OpenRequestResult::Handle(_)
        ));

+        let best_available_server_from_none = rpcs
+            .best_available_rpc(&authorization, None, &[], None, None)
+            .await;
+
+        // assert_eq!(best_available_server, best_available_server_from_none);
+
        let best_archive_server = rpcs
-            .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
+            .best_available_rpc(&authorization, None, &[], Some(&1.into()), None)
            .await;

        match best_archive_server {