diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 33e03177..cc671ab7 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -420,7 +420,7 @@ impl Web3Rpcs { unimplemented!("this shouldn't be possible") } - pub async fn best_consensus_head_connection( + pub async fn best_available_rpc( &self, authorization: &Arc, request_metadata: Option<&Arc>, @@ -436,8 +436,6 @@ impl Web3Rpcs { if let Some(head_block) = synced_connections.head_block.as_ref() { (head_block.number(), head_block.age()) } else { - // TODO: optionally wait for a head_block.number() >= min_block_needed - // TODO: though i think that wait would actually need to be earlier in the request return Ok(OpenRequestResult::NotReady); }; @@ -463,6 +461,8 @@ impl Web3Rpcs { } }; + trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison); + // collect "usable_rpcs_by_head_num_and_weight" // TODO: MAKE SURE None SORTS LAST? let mut m = BTreeMap::new(); @@ -470,6 +470,7 @@ impl Web3Rpcs { match needed_blocks_comparison { cmp::Ordering::Less => { // need an old block. check all the rpcs. ignore rpcs that are still syncing + trace!("old block needed"); let min_block_age = self.max_block_age.map(|x| head_block_age.saturating_sub(x)); @@ -517,12 +518,14 @@ impl Web3Rpcs { // TODO: do we really need to check head_num and age? if let Some(min_sync_num) = min_sync_num.as_ref() { if x_head_num < min_sync_num { + trace!("rpc is still syncing"); continue; } } if let Some(min_block_age) = min_block_age { - if x_head.age() < min_block_age { + if x_head.age() > min_block_age { // rpc is still syncing + trace!("block is too old"); continue; } } @@ -536,12 +539,22 @@ impl Web3Rpcs { // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request? } cmp::Ordering::Equal => { - // need the consensus head block. filter the synced rpcs - for x in synced_connections.rpcs.iter().filter(|x| !skip.contains(x)) { - // the key doesn't matter if we are checking synced connections. its already sized to what we need - let key = (0, None); + // using the consensus head block. filter the synced rpcs - m.entry(key).or_insert_with(Vec::new).push(x.clone()); + // the key doesn't matter if we are checking synced connections + // they are all at the same block and it is already sized to what we need + let key = (0, None); + + for x in synced_connections.rpcs.iter() { + if skip.contains(x) { + trace!("skipping: {}", x); + continue; + } + trace!("not skipped!"); + + m.entry(key.clone()) + .or_insert_with(Vec::new) + .push(x.clone()); } } cmp::Ordering::Greater => { @@ -553,6 +566,11 @@ impl Web3Rpcs { m }; + trace!( + "usable_rpcs_by_tier_and_head_number: {:#?}", + usable_rpcs_by_tier_and_head_number + ); + let mut earliest_retry_at = None; for mut usable_rpcs in usable_rpcs_by_tier_and_head_number.into_values() { @@ -570,9 +588,13 @@ impl Web3Rpcs { // pick the first two and try the one with the lower rpc.latency.ewma // TODO: chunks or tuple windows? for (rpc_a, rpc_b) in usable_rpcs.into_iter().circular_tuple_windows() { + trace!("{} vs {}", rpc_a, rpc_b); + // TODO: cached key to save a read lock + // TODO: ties to the server with the smallest block_data_limit let best_rpc = min_by_key(rpc_a, rpc_b, |x| { OrderedFloat(x.request_latency.read().ewma.value()) }); + trace!("winner: {}", best_rpc); // just because it has lower latency doesn't mean we are sure to get a connection match best_rpc.try_request_handle(authorization, None).await { @@ -585,6 +607,7 @@ impl Web3Rpcs { } Ok(OpenRequestResult::NotReady) => { // TODO: log a warning? emit a stat? + trace!("best_rpc not ready"); } Err(err) => { warn!("No request handle for {}. err={:?}", best_rpc, err) @@ -771,7 +794,7 @@ impl Web3Rpcs { } match self - .best_consensus_head_connection( + .best_available_rpc( authorization, request_metadata, &skip_rpcs, @@ -922,65 +945,34 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::Release); } - // todo!( - // "check if we are requesting an old block and no archive servers are synced" - // ); - - if let Some(min_block_needed) = min_block_needed { - let mut theres_a_chance = false; - - for potential_conn in self.by_name.values() { - if skip_rpcs.contains(potential_conn) { - continue; - } - - // TODO: should we instead check if has_block_data but with the current head block? - if potential_conn.has_block_data(min_block_needed) { - trace!("chance for {} on {}", min_block_needed, potential_conn); - theres_a_chance = true; - break; - } - - skip_rpcs.push(potential_conn.clone()); - } - - if !theres_a_chance { - debug!("no chance of finding data in block #{}", min_block_needed); - break; - } - } - - debug!("No servers ready. Waiting up for change in synced servers"); - - watch_consensus_connections.changed().await?; - watch_consensus_connections.borrow_and_update(); + break; } } } - if let Some(r) = method_not_available_response { - // TODO: emit a stat for unsupported methods? - return Ok(r); - } - - // TODO: do we need this here, or do we do it somewhere else? + // TODO: do we need this here, or do we do it somewhere else? like, the code could change and a try operator in here would skip this increment if let Some(request_metadata) = request_metadata { request_metadata .error_response .store(true, Ordering::Release); } + if let Some(r) = method_not_available_response { + // TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend + return Ok(r); + } + let num_conns = self.by_name.len(); let num_skipped = skip_rpcs.len(); if num_skipped == 0 { - error!("No servers synced ({} known)", num_conns); + error!("No servers synced ({} known). None skipped", num_conns); - return Ok(JsonRpcForwardedResponse::from_str( + Ok(JsonRpcForwardedResponse::from_str( "No servers synced", Some(-32000), Some(request.id), - )); + )) } else { // TODO: warn? debug? trace? warn!( @@ -990,11 +982,11 @@ impl Web3Rpcs { // TODO: what error code? // cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1} - return Ok(JsonRpcForwardedResponse::from_str( + Ok(JsonRpcForwardedResponse::from_str( "Requested data is not available", Some(-32043), Some(request.id), - )); + )) } } @@ -1396,7 +1388,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block let x = rpcs - .best_consensus_head_connection(&authorization, None, &[], None, None) + .best_available_rpc(&authorization, None, &[], None, None) .await .unwrap(); @@ -1447,29 +1439,28 @@ mod tests { assert_eq!(rpcs.num_synced_rpcs(), 1); assert!(matches!( - rpcs.best_consensus_head_connection(&authorization, None, &[], None, None) + rpcs.best_available_rpc(&authorization, None, &[], None, None) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( - rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None) + rpcs.best_available_rpc(&authorization, None, &[], Some(&0.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( - rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None) + rpcs.best_available_rpc(&authorization, None, &[], Some(&1.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); // future block should not get a handle - assert!(matches!( - rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None) - .await, - Ok(OpenRequestResult::NotReady) - )); + let future_rpc = rpcs + .best_available_rpc(&authorization, None, &[], Some(&2.into()), None) + .await; + assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady))); } #[tokio::test] @@ -1505,6 +1496,7 @@ mod tests { block_data_limit: 64.into(), tier: 1, head_block: RwLock::new(Some(head_block.clone())), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1516,6 +1508,7 @@ mod tests { block_data_limit: u64::MAX.into(), tier: 2, head_block: RwLock::new(Some(head_block.clone())), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1583,25 +1576,25 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? - let best_head_server = rpcs - .best_consensus_head_connection( - &authorization, - None, - &[], - Some(&head_block.number()), - None, - ) + let best_available_server = rpcs + .best_available_rpc(&authorization, None, &[], Some(&head_block.number()), None) .await; - debug!("best_head_server: {:#?}", best_head_server); + debug!("best_available_server: {:#?}", best_available_server); assert!(matches!( - best_head_server.unwrap(), + best_available_server.unwrap(), OpenRequestResult::Handle(_) )); + let best_available_server_from_none = rpcs + .best_available_rpc(&authorization, None, &[], None, None) + .await; + + // assert_eq!(best_available_server, best_available_server_from_none); + let best_archive_server = rpcs - .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None) + .best_available_rpc(&authorization, None, &[], Some(&1.into()), None) .await; match best_archive_server {