if not watching heads, send to any server

This commit is contained in:
Bryan Stitt 2023-04-14 00:15:27 -07:00
parent f3435bc6e0
commit 3621d71037

@ -492,142 +492,162 @@ impl Web3Rpcs {
max_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option<U64>), Vec<Arc<Web3Rpc>>> = {
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
if self.watch_consensus_head_sender.is_none() {
// pick any server
let mut m = BTreeMap::new();
if synced_connections.is_none() {
return Ok(OpenRequestResult::NotReady);
}
let synced_connections =
synced_connections.expect("synced_connections can't be None here");
let key = (0, None);
let head_block_num = synced_connections.head_block.number();
let head_block_age = synced_connections.head_block.age();
// TODO: double check the logic on this. especially if only min is set
let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
(None, None) => {
// no required block given. treat this like they requested the consensus head block
cmp::Ordering::Equal
}
(None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
(Some(min_block_needed), None) => min_block_needed.cmp(head_block_num),
(Some(min_block_needed), Some(max_block_needed)) => {
match min_block_needed.cmp(max_block_needed) {
cmp::Ordering::Less | cmp::Ordering::Equal => {
min_block_needed.cmp(head_block_num)
}
cmp::Ordering::Greater => {
// TODO: force a debug log of the original request to see if our logic is wrong?
// TODO: attach the rpc_key_id so we can find the user to ask if they need help
return Err(anyhow::anyhow!(
"Invalid blocks bounds requested. min ({}) > max ({})",
min_block_needed,
max_block_needed
));
}
for x in self.by_name.read().values() {
if skip.contains(x) {
trace!("skipping: {}", x);
continue;
}
trace!("not skipped!");
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
};
trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
m
} else {
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
// collect "usable_rpcs_by_head_num_and_weight"
// TODO: MAKE SURE None SORTS LAST?
let mut m = BTreeMap::new();
match needed_blocks_comparison {
cmp::Ordering::Less => {
// need an old block. check all the rpcs. ignore rpcs that are still syncing
trace!("old block needed");
let min_block_age =
self.max_block_age.map(|x| head_block_age.saturating_sub(x));
let min_sync_num = self.max_block_lag.map(|x| head_block_num.saturating_sub(x));
// TODO: cache this somehow?
// TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
for x in self
.by_name
.read()
.values()
.filter(|x| {
// TODO: move a bunch of this onto a rpc.is_synced function
#[allow(clippy::if_same_then_else)]
if skip.contains(x) {
// we've already tried this server or have some other reason to skip it
false
} else if max_block_needed
.map(|max_block_needed| !x.has_block_data(max_block_needed))
.unwrap_or(false)
{
// server does not have the max block
trace!(
"{} does not have the max block ({:?})",
x,
max_block_needed
);
false
} else {
!min_block_needed
.map(|min_block_needed| !x.has_block_data(min_block_needed))
.unwrap_or(false)
}
})
.cloned()
{
let x_head_block = x.head_block.read().clone();
if let Some(x_head) = x_head_block {
// TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load
let x_head_num = x_head.number().min(head_block_num);
// TODO: do we really need to check head_num and age?
if let Some(min_sync_num) = min_sync_num.as_ref() {
if x_head_num < min_sync_num {
trace!("rpc is still syncing");
continue;
}
}
if let Some(min_block_age) = min_block_age {
if x_head.age() > min_block_age {
// rpc is still syncing
trace!("server's block is too old");
continue;
}
}
let key = (x.tier, Some(*x_head_num));
m.entry(key).or_insert_with(Vec::new).push(x);
}
}
// TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
}
cmp::Ordering::Equal => {
// using the consensus head block. filter the synced rpcs
// the key doesn't matter if we are checking synced connections
// they are all at the same block and it is already sized to what we need
let key = (0, None);
for x in synced_connections.best_rpcs.iter() {
if skip.contains(x) {
trace!("skipping: {}", x);
continue;
}
trace!("not skipped!");
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
}
cmp::Ordering::Greater => {
// TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe()
if synced_connections.is_none() {
return Ok(OpenRequestResult::NotReady);
}
}
let synced_connections =
synced_connections.expect("synced_connections can't be None here");
m
let head_block_num = synced_connections.head_block.number();
let head_block_age = synced_connections.head_block.age();
// TODO: double check the logic on this. especially if only min is set
let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
(None, None) => {
// no required block given. treat this like they requested the consensus head block
cmp::Ordering::Equal
}
(None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
(Some(min_block_needed), None) => min_block_needed.cmp(head_block_num),
(Some(min_block_needed), Some(max_block_needed)) => {
match min_block_needed.cmp(max_block_needed) {
cmp::Ordering::Less | cmp::Ordering::Equal => {
min_block_needed.cmp(head_block_num)
}
cmp::Ordering::Greater => {
// TODO: force a debug log of the original request to see if our logic is wrong?
// TODO: attach the rpc_key_id so we can find the user to ask if they need help
return Err(anyhow::anyhow!(
"Invalid blocks bounds requested. min ({}) > max ({})",
min_block_needed,
max_block_needed
));
}
}
}
};
trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
// collect "usable_rpcs_by_head_num_and_weight"
// TODO: MAKE SURE None SORTS LAST?
let mut m = BTreeMap::new();
match needed_blocks_comparison {
cmp::Ordering::Less => {
// need an old block. check all the rpcs. ignore rpcs that are still syncing
trace!("old block needed");
let min_block_age =
self.max_block_age.map(|x| head_block_age.saturating_sub(x));
let min_sync_num =
self.max_block_lag.map(|x| head_block_num.saturating_sub(x));
// TODO: cache this somehow?
// TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
for x in self
.by_name
.read()
.values()
.filter(|x| {
// TODO: move a bunch of this onto a rpc.is_synced function
#[allow(clippy::if_same_then_else)]
if skip.contains(x) {
// we've already tried this server or have some other reason to skip it
false
} else if max_block_needed
.map(|max_block_needed| !x.has_block_data(max_block_needed))
.unwrap_or(false)
{
// server does not have the max block
trace!(
"{} does not have the max block ({:?})",
x,
max_block_needed
);
false
} else {
!min_block_needed
.map(|min_block_needed| !x.has_block_data(min_block_needed))
.unwrap_or(false)
}
})
.cloned()
{
let x_head_block = x.head_block.read().clone();
if let Some(x_head) = x_head_block {
// TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load
let x_head_num = x_head.number().min(head_block_num);
// TODO: do we really need to check head_num and age?
if let Some(min_sync_num) = min_sync_num.as_ref() {
if x_head_num < min_sync_num {
trace!("rpc is still syncing");
continue;
}
}
if let Some(min_block_age) = min_block_age {
if x_head.age() > min_block_age {
// rpc is still syncing
trace!("server's block is too old");
continue;
}
}
let key = (x.tier, Some(*x_head_num));
m.entry(key).or_insert_with(Vec::new).push(x);
}
}
// TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
}
cmp::Ordering::Equal => {
// using the consensus head block. filter the synced rpcs
// the key doesn't matter if we are checking synced connections
// they are all at the same block and it is already sized to what we need
let key = (0, None);
for x in synced_connections.best_rpcs.iter() {
if skip.contains(x) {
trace!("skipping: {}", x);
continue;
}
trace!("not skipped!");
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
}
cmp::Ordering::Greater => {
// TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe()
return Ok(OpenRequestResult::NotReady);
}
}
m
}
};
trace!(