diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 3d944ac0..b964a9d9 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -492,142 +492,162 @@ impl Web3Rpcs {
         max_block_needed: Option<&U64>,
     ) -> anyhow::Result<OpenRequestResult> {
         let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option<U64>), Vec<Arc<Web3Rpc>>> = {
-            let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
+            if self.watch_consensus_head_sender.is_none() {
+                // pick any server
+                let mut m = BTreeMap::new();
 
-            if synced_connections.is_none() {
-                return Ok(OpenRequestResult::NotReady);
-            }
-            let synced_connections =
-                synced_connections.expect("synced_connections can't be None here");
+                let key = (0, None);
 
-            let head_block_num = synced_connections.head_block.number();
-            let head_block_age = synced_connections.head_block.age();
-
-            // TODO: double check the logic on this. especially if only min is set
-            let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
-                (None, None) => {
-                    // no required block given. treat this like they requested the consensus head block
-                    cmp::Ordering::Equal
-                }
-                (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
-                (Some(min_block_needed), None) => min_block_needed.cmp(head_block_num),
-                (Some(min_block_needed), Some(max_block_needed)) => {
-                    match min_block_needed.cmp(max_block_needed) {
-                        cmp::Ordering::Less | cmp::Ordering::Equal => {
-                            min_block_needed.cmp(head_block_num)
-                        }
-                        cmp::Ordering::Greater => {
-                            // TODO: force a debug log of the original request to see if our logic is wrong?
-                            // TODO: attach the rpc_key_id so we can find the user to ask if they need help
-                            return Err(anyhow::anyhow!(
-                                "Invalid blocks bounds requested. min ({}) > max ({})",
-                                min_block_needed,
-                                max_block_needed
-                            ));
-                        }
+                for x in self.by_name.read().values() {
+                    if skip.contains(x) {
+                        trace!("skipping: {}", x);
+                        continue;
                     }
+                    trace!("not skipped!");
+
+                    m.entry(key).or_insert_with(Vec::new).push(x.clone());
                 }
-            };
 
-            trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
+                m
+            } else {
+                let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
 
-            // collect "usable_rpcs_by_head_num_and_weight"
-            // TODO: MAKE SURE None SORTS LAST?
-            let mut m = BTreeMap::new();
-
-            match needed_blocks_comparison {
-                cmp::Ordering::Less => {
-                    // need an old block. check all the rpcs. ignore rpcs that are still syncing
-                    trace!("old block needed");
-
-                    let min_block_age =
-                        self.max_block_age.map(|x| head_block_age.saturating_sub(x));
-                    let min_sync_num = self.max_block_lag.map(|x| head_block_num.saturating_sub(x));
-
-                    // TODO: cache this somehow?
-                    // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
-                    for x in self
-                        .by_name
-                        .read()
-                        .values()
-                        .filter(|x| {
-                            // TODO: move a bunch of this onto a rpc.is_synced function
-                            #[allow(clippy::if_same_then_else)]
-                            if skip.contains(x) {
-                                // we've already tried this server or have some other reason to skip it
-                                false
-                            } else if max_block_needed
-                                .map(|max_block_needed| !x.has_block_data(max_block_needed))
-                                .unwrap_or(false)
-                            {
-                                // server does not have the max block
-                                trace!(
-                                    "{} does not have the max block ({:?})",
-                                    x,
-                                    max_block_needed
-                                );
-                                false
-                            } else {
-                                !min_block_needed
-                                    .map(|min_block_needed| !x.has_block_data(min_block_needed))
-                                    .unwrap_or(false)
-                            }
-                        })
-                        .cloned()
-                    {
-                        let x_head_block = x.head_block.read().clone();
-
-                        if let Some(x_head) = x_head_block {
-                            // TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load
-                            let x_head_num = x_head.number().min(head_block_num);
-
-                            // TODO: do we really need to check head_num and age?
-                            if let Some(min_sync_num) = min_sync_num.as_ref() {
-                                if x_head_num < min_sync_num {
-                                    trace!("rpc is still syncing");
-                                    continue;
-                                }
-                            }
-                            if let Some(min_block_age) = min_block_age {
-                                if x_head.age() > min_block_age {
-                                    // rpc is still syncing
-                                    trace!("server's block is too old");
-                                    continue;
-                                }
-                            }
-
-                            let key = (x.tier, Some(*x_head_num));
-
-                            m.entry(key).or_insert_with(Vec::new).push(x);
-                        }
-                    }
-
-                    // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
-                }
-                cmp::Ordering::Equal => {
-                    // using the consensus head block. filter the synced rpcs
-
-                    // the key doesn't matter if we are checking synced connections
-                    // they are all at the same block and it is already sized to what we need
-                    let key = (0, None);
-
-                    for x in synced_connections.best_rpcs.iter() {
-                        if skip.contains(x) {
-                            trace!("skipping: {}", x);
-                            continue;
-                        }
-                        trace!("not skipped!");
-
-                        m.entry(key).or_insert_with(Vec::new).push(x.clone());
-                    }
-                }
-                cmp::Ordering::Greater => {
-                    // TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe()
+                if synced_connections.is_none() {
                     return Ok(OpenRequestResult::NotReady);
                 }
-            }
+                let synced_connections =
+                    synced_connections.expect("synced_connections can't be None here");
 
-            m
+                let head_block_num = synced_connections.head_block.number();
+                let head_block_age = synced_connections.head_block.age();
+
+                // TODO: double check the logic on this. especially if only min is set
+                let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
+                    (None, None) => {
+                        // no required block given. treat this like they requested the consensus head block
+                        cmp::Ordering::Equal
+                    }
+                    (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
+                    (Some(min_block_needed), None) => min_block_needed.cmp(head_block_num),
+                    (Some(min_block_needed), Some(max_block_needed)) => {
+                        match min_block_needed.cmp(max_block_needed) {
+                            cmp::Ordering::Less | cmp::Ordering::Equal => {
+                                min_block_needed.cmp(head_block_num)
+                            }
+                            cmp::Ordering::Greater => {
+                                // TODO: force a debug log of the original request to see if our logic is wrong?
+                                // TODO: attach the rpc_key_id so we can find the user to ask if they need help
+                                return Err(anyhow::anyhow!(
+                                    "Invalid blocks bounds requested. min ({}) > max ({})",
+                                    min_block_needed,
+                                    max_block_needed
+                                ));
+                            }
+                        }
+                    }
+                };
+
+                trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
+
+                // collect "usable_rpcs_by_head_num_and_weight"
+                // TODO: MAKE SURE None SORTS LAST?
+                let mut m = BTreeMap::new();
+
+                match needed_blocks_comparison {
+                    cmp::Ordering::Less => {
+                        // need an old block. check all the rpcs. ignore rpcs that are still syncing
+                        trace!("old block needed");
+
+                        let min_block_age =
+                            self.max_block_age.map(|x| head_block_age.saturating_sub(x));
+                        let min_sync_num =
+                            self.max_block_lag.map(|x| head_block_num.saturating_sub(x));
+
+                        // TODO: cache this somehow?
+                        // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
+                        for x in self
+                            .by_name
+                            .read()
+                            .values()
+                            .filter(|x| {
+                                // TODO: move a bunch of this onto a rpc.is_synced function
+                                #[allow(clippy::if_same_then_else)]
+                                if skip.contains(x) {
+                                    // we've already tried this server or have some other reason to skip it
+                                    false
+                                } else if max_block_needed
+                                    .map(|max_block_needed| !x.has_block_data(max_block_needed))
+                                    .unwrap_or(false)
+                                {
+                                    // server does not have the max block
+                                    trace!(
+                                        "{} does not have the max block ({:?})",
+                                        x,
+                                        max_block_needed
+                                    );
+                                    false
+                                } else {
+                                    !min_block_needed
+                                        .map(|min_block_needed| !x.has_block_data(min_block_needed))
+                                        .unwrap_or(false)
+                                }
+                            })
+                            .cloned()
+                        {
+                            let x_head_block = x.head_block.read().clone();
+
+                            if let Some(x_head) = x_head_block {
+                                // TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load
+                                let x_head_num = x_head.number().min(head_block_num);
+
+                                // TODO: do we really need to check head_num and age?
+                                if let Some(min_sync_num) = min_sync_num.as_ref() {
+                                    if x_head_num < min_sync_num {
+                                        trace!("rpc is still syncing");
+                                        continue;
+                                    }
+                                }
+                                if let Some(min_block_age) = min_block_age {
+                                    if x_head.age() > min_block_age {
+                                        // rpc is still syncing
+                                        trace!("server's block is too old");
+                                        continue;
+                                    }
+                                }
+
+                                let key = (x.tier, Some(*x_head_num));
+
+                                m.entry(key).or_insert_with(Vec::new).push(x);
+                            }
+                        }
+
+                        // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
+                    }
+                    cmp::Ordering::Equal => {
+                        // using the consensus head block. filter the synced rpcs
+
+                        // the key doesn't matter if we are checking synced connections
+                        // they are all at the same block and it is already sized to what we need
+                        let key = (0, None);
+
+                        for x in synced_connections.best_rpcs.iter() {
+                            if skip.contains(x) {
+                                trace!("skipping: {}", x);
+                                continue;
+                            }
+                            trace!("not skipped!");
+
+                            m.entry(key).or_insert_with(Vec::new).push(x.clone());
+                        }
+                    }
+                    cmp::Ordering::Greater => {
+                        // TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe()
+                        return Ok(OpenRequestResult::NotReady);
+                    }
+                }
+
+                m
+            }
         };
         trace!(