improve rpc filtering

Bryan Stitt 2022-11-28 06:52:16 +00:00
parent d06aa3b170
commit 519ba473d9

@@ -390,41 +390,42 @@ impl Web3Connections {
     ) -> anyhow::Result<OpenRequestResult> {
         let mut earliest_retry_at = None;
-        let min_block_needed = if let Some(min_block_needed) = min_block_needed {
-            *min_block_needed
+        let usable_rpcs: Vec<Arc<Web3Connection>> = if let Some(min_block_needed) = min_block_needed
+        {
+            // need a potentially old block. check all the rpcs
+            // TODO: we are going to be checking "has_block_data" a lot now
+            self.conns
+                .values()
+                .filter(|x| !skip.contains(x))
+                .filter(|x| x.has_block_data(min_block_needed))
+                .cloned()
+                .collect()
         } else {
-            match self.head_block_num() {
-                Some(x) => x,
-                None => {
-                    trace!("no head block on {:?}", self);
-                    return Ok(OpenRequestResult::NotSynced);
-                }
-            }
+            // need latest. filter the synced rpcs
+            // TODO: double check has_block_data?
+            self.synced_connections
+                .load()
+                .conns
+                .iter()
+                .filter(|x| !skip.contains(x))
+                .cloned()
+                .collect()
         };
-        trace!("min block needed: {}", min_block_needed);
-        // filter the synced rpcs
-        // TODO: we are going to be checking "has_block_data" a lot now
-        let head_rpcs: Vec<Arc<Web3Connection>> = self
-            .synced_connections
-            .load()
-            .conns
-            .iter()
-            .filter(|x| !skip.contains(x))
-            .filter(|x| x.has_block_data(&min_block_needed))
-            .cloned()
-            .collect();
-        match head_rpcs.len() {
+        match usable_rpcs.len() {
             0 => {
-                warn!("no head rpcs: {:?} (skipped {:?})", self, skip);
+                warn!(
+                    "no rpcs @ {:?}: {:?} (skipped {:?})",
+                    min_block_needed,
+                    self.synced_connections.load(),
+                    skip.iter().map(|x| &x.name).collect::<Vec<_>>()
+                );
                 // TODO: what should happen here? automatic retry?
                 // TODO: more detailed error
                 return Ok(OpenRequestResult::NotSynced);
             }
             1 => {
-                let rpc = head_rpcs.get(0).expect("len is 1");
+                let rpc = usable_rpcs.get(0).expect("len is 1");
                 // TODO: try or wait for a request handle?
                 let handle = rpc
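
The heart of the change is how the candidate list is built: when a specific (possibly historical) block is required, every configured connection is checked with `has_block_data`; otherwise the already-synced set is used as-is. A minimal sketch of that branch with simplified stand-in types (`Conn`, `Conns`, and `block_range` are hypothetical placeholders, not the project's real `Web3Connection`/`Web3Connections`):

```rust
use std::sync::Arc;

// Simplified stand-ins for the real connection types.
struct Conn {
    name: &'static str,
    // oldest and newest block this node can serve
    block_range: (u64, u64),
}

impl Conn {
    fn has_block_data(&self, num: u64) -> bool {
        self.block_range.0 <= num && num <= self.block_range.1
    }
}

struct Conns {
    all: Vec<Arc<Conn>>,
    synced: Vec<Arc<Conn>>,
}

impl Conns {
    // Mirror of the new selection logic: a required (possibly old) block scans
    // *all* connections; otherwise the synced set is taken directly.
    fn usable_rpcs(&self, min_block_needed: Option<u64>, skip: &[Arc<Conn>]) -> Vec<Arc<Conn>> {
        if let Some(min_block_needed) = min_block_needed {
            self.all
                .iter()
                .filter(|x| !skip.iter().any(|s| Arc::ptr_eq(s, *x)))
                .filter(|x| x.has_block_data(min_block_needed))
                .cloned()
                .collect()
        } else {
            self.synced
                .iter()
                .filter(|x| !skip.iter().any(|s| Arc::ptr_eq(s, *x)))
                .cloned()
                .collect()
        }
    }
}

fn main() {
    let archive = Arc::new(Conn { name: "archive", block_range: (0, 16_000_000) });
    let full = Arc::new(Conn { name: "full", block_range: (15_900_000, 16_000_000) });

    let conns = Conns {
        all: vec![archive.clone(), full.clone()],
        synced: vec![full.clone()],
    };

    // an old block only matches the archive node
    let old = conns.usable_rpcs(Some(1_000_000), &[]);
    assert_eq!(old.len(), 1);
    assert_eq!(old[0].name, "archive");

    // "latest" (no min block) just uses the synced set
    let latest = conns.usable_rpcs(None, &[]);
    assert_eq!(latest.len(), 1);
    assert_eq!(latest[0].name, "full");
}
```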
@@ -441,7 +442,7 @@ impl Web3Connections {
                 let mut minimum = f64::MAX;
                 // we sort on a bunch of values. cache them here so that we don't do this math multiple times.
-                let available_request_map: HashMap<_, f64> = head_rpcs
+                let available_request_map: HashMap<_, f64> = usable_rpcs
                     .iter()
                     .map(|rpc| {
                         // TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that?
@@ -482,13 +483,14 @@ impl Web3Connections {
                 };
                 let sorted_rpcs = {
-                    if head_rpcs.len() == 1 {
-                        vec![head_rpcs.get(0).expect("there should be 1")]
+                    if usable_rpcs.len() == 1 {
+                        vec![usable_rpcs.get(0).expect("there should be 1")]
                     } else {
                         let mut rng = thread_fast_rng::thread_fast_rng();
-                        head_rpcs
-                            .choose_multiple_weighted(&mut rng, head_rpcs.len(), |rpc| {
+                        // TODO: sort or weight the non-archive nodes to be first
+                        usable_rpcs
+                            .choose_multiple_weighted(&mut rng, usable_rpcs.len(), |rpc| {
                                 *available_request_map
                                     .get(rpc)
                                     .expect("rpc should always be in the weight map")
@@ -709,8 +711,6 @@ impl Web3Connections {
                     continue;
                 }
                 OpenRequestResult::NotSynced => {
-                    warn!("No synced servers! {:?}", self);
                     if let Some(request_metadata) = request_metadata {
                         request_metadata.no_servers.fetch_add(1, Ordering::Release);
                     }
@@ -730,6 +730,8 @@ impl Web3Connections {
                 .store(true, Ordering::Release);
         }
+        warn!("No synced servers! {:?}", self.synced_connections.load());
         // TODO: what error code? 502?
         Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len()))
     }
@@ -996,18 +998,14 @@ mod tests {
         );
         // best_synced_backend_connection requires servers to be synced with the head block
-        assert!(matches!(
-            conns
-                .best_synced_backend_connection(
-                    &authorization,
-                    None,
-                    &[],
-                    lagged_block.number.as_ref()
-                )
-                .await
-                .unwrap(),
-            OpenRequestResult::NotSynced
-        ));
+        let x = conns
+            .best_synced_backend_connection(&authorization, None, &[], None)
+            .await
+            .unwrap();
+        dbg!(&x);
+        assert!(matches!(x, OpenRequestResult::NotSynced));
         // add lagged blocks to the conns. both servers should be allowed
         conns.save_block(&lagged_block, true).await.unwrap();
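
The test change swaps an inline assertion for a bound value plus `dbg!`, so a failing run prints which variant actually came back. The same pattern in isolation, with a stand-in enum and helper (only `NotSynced` is taken from the real `OpenRequestResult`; the other variant and `pick_server` are placeholders):

```rust
#[derive(Debug)]
enum OpenRequestResult {
    NotSynced,
    // placeholder for the other variants
    Ready,
}

// stand-in for best_synced_backend_connection before any blocks are saved
fn pick_server() -> OpenRequestResult {
    OpenRequestResult::NotSynced
}

fn main() {
    let x = pick_server();

    // dbg! prints the value (with file and line) to stderr, so when the
    // matches! assertion below fails it is obvious which variant came back
    dbg!(&x);

    assert!(matches!(x, OpenRequestResult::NotSynced));

    let _ = OpenRequestResult::Ready; // keep the placeholder variant "used"
}
```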