From 9c584354d9ab8c3f1d623fb00094f9d6191e6d16 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 18 May 2023 13:51:28 -0700 Subject: [PATCH] add to skip list earlier --- web3_proxy/src/frontend/users/payment.rs | 6 ++--- web3_proxy/src/rpcs/consensus.rs | 22 ++++++++--------- web3_proxy/src/rpcs/many.rs | 31 ++++++++++++++---------- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index b49c3705..fb2f0d91 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -153,7 +153,7 @@ pub async fn user_balance_post( // Just make an rpc request, idk if i need to call this super extensive code let transaction_receipt: TransactionReceipt = match app .balanced_rpcs - .best_available_rpc(&authorization, None, &[], None, None) + .best_available_rpc(&authorization, None, &mut vec![], None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -188,7 +188,7 @@ pub async fn user_balance_post( debug!("Transaction receipt is: {:?}", transaction_receipt); let accepted_token: Address = match app .balanced_rpcs - .best_available_rpc(&authorization, None, &[], None, None) + .best_available_rpc(&authorization, None, &mut vec![], None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -243,7 +243,7 @@ pub async fn user_balance_post( debug!("Accepted token is: {:?}", accepted_token); let decimals: u32 = match app .balanced_rpcs - .best_available_rpc(&authorization, None, &[], None, None) + .best_available_rpc(&authorization, None, &mut vec![], None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 1502966c..2ca28519 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -130,18 +130,18 @@ impl ConsensusWeb3Rpcs { skip_rpcs: &[Arc], ) -> ShouldWaitForBlock { // TODO: i think checking synced is always a waste of time. though i guess there could be a race - // if self - // .head_rpcs - // .iter() - // .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs)) - // { - // let head_num = self.head_block.number(); + if self + .head_rpcs + .iter() + .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs)) + { + let head_num = self.head_block.number(); - // if Some(head_num) >= needed_block_num { - // debug!("best (head) block: {}", head_num); - // return ShouldWaitForBlock::Ready; - // } - // } + if Some(head_num) >= needed_block_num { + debug!("best (head) block: {}", head_num); + return ShouldWaitForBlock::Ready; + } + } // all of the head rpcs are skipped diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 6311b5b7..5921f895 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -495,7 +495,7 @@ impl Web3Rpcs { &self, authorization: &Arc, request_metadata: Option<&Arc>, - skip: &[Arc], + skip: &mut Vec>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, ) -> Web3ProxyResult { @@ -592,6 +592,8 @@ impl Web3Rpcs { let best_rpc = min_by_key(rpc_a, rpc_b, |x| x.peak_ewma()); trace!("{:?} - winner: {}", request_ulid, best_rpc); + skip.push(best_rpc.clone()); + // just because it has lower latency doesn't mean we are sure to get a connection match best_rpc.try_request_handle(authorization, None).await { Ok(OpenRequestResult::Handle(handle)) => { @@ -805,7 +807,7 @@ impl Web3Rpcs { .best_available_rpc( authorization, request_metadata, - &skip_rpcs, + &mut skip_rpcs, min_block_needed, max_block_needed, ) @@ -822,9 +824,6 @@ impl Web3Rpcs { let is_backup_response = rpc.backup; - // TODO: instead of entirely skipping, maybe demote a tier? - skip_rpcs.push(rpc); - // TODO: get the log percent from the user data let response_result: Result, _> = active_request_handle .request( @@ -1493,7 +1492,7 @@ mod tests { .best_available_rpc( &authorization, None, - &[], + &mut vec![], Some(head_block.number.as_ref().unwrap()), None, ) @@ -1587,28 +1586,28 @@ mod tests { // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.best_available_rpc(&authorization, None, &[], None, None) + rpcs.best_available_rpc(&authorization, None, &mut vec![], None, None) .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.best_available_rpc(&authorization, None, &[], Some(&0.into()), None) + rpcs.best_available_rpc(&authorization, None, &mut vec![], Some(&0.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.best_available_rpc(&authorization, None, &[], Some(&1.into()), None) + rpcs.best_available_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); // future block should not get a handle let future_rpc = rpcs - .best_available_rpc(&authorization, None, &[], Some(&2.into()), None) + .best_available_rpc(&authorization, None, &mut vec![], Some(&2.into()), None) .await; assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady))); } @@ -1733,7 +1732,13 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? let best_available_server = rpcs - .best_available_rpc(&authorization, None, &[], Some(head_block.number()), None) + .best_available_rpc( + &authorization, + None, + &mut vec![], + Some(head_block.number()), + None, + ) .await; debug!("best_available_server: {:#?}", best_available_server); @@ -1744,13 +1749,13 @@ mod tests { )); let _best_available_server_from_none = rpcs - .best_available_rpc(&authorization, None, &[], None, None) + .best_available_rpc(&authorization, None, &mut vec![], None, None) .await; // assert_eq!(best_available_server, best_available_server_from_none); let best_archive_server = rpcs - .best_available_rpc(&authorization, None, &[], Some(&1.into()), None) + .best_available_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) .await; match best_archive_server {