wait for lagged rpcs. v1.43.19

Bryan Stitt 2023-10-07 11:56:52 -07:00
parent 349be463f3
commit 9ed0c70ecd
7 changed files with 138 additions and 185 deletions

Cargo.lock generated

@ -6590,7 +6590,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "1.43.16"
version = "1.43.19"
dependencies = [
"anyhow",
"arc-swap",
@ -6672,7 +6672,7 @@ dependencies = [
[[package]]
name = "web3_proxy_cli"
version = "1.43.16"
version = "1.43.19"
dependencies = [
"env_logger",
"parking_lot",

@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "1.43.16"
version = "1.43.19"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

@ -139,14 +139,13 @@ impl RankedRpcs {
let num_synced = rpcs.len();
// TODO: do we need real data in here? if we are calling from_rpcs, we probably don't even track their block
let mut rpc_data = HashMap::<Arc<Web3Rpc>, ConsensusRpcData>::with_capacity(num_synced);
for rpc in rpcs.iter().cloned() {
let data = ConsensusRpcData::new(&rpc, &head_block);
rpc_data.insert(rpc, data);
}
let rpc_data = Default::default();
// TODO: do we need real data in rpc_data? if we are calling from_rpcs, we probably don't even track their block
// let mut rpc_data = HashMap::<Arc<Web3Rpc>, ConsensusRpcData>::with_capacity(num_synced);
// for rpc in rpcs.iter().cloned() {
// let data = ConsensusRpcData::new(&rpc, &head_block);
// rpc_data.insert(rpc, data);
// }
let sort_mode = SortMethod::Shuffle;
@ -245,51 +244,67 @@ impl RankedRpcs {
return None;
}
let head_block = self.head_block.number();
// these are bigger than we need, but how much does that matter?
let mut inner = Vec::with_capacity(self.num_active_rpcs());
let mut outer = Vec::with_capacity(self.num_active_rpcs());
// TODO: what if min is set to some future block?
let max_block_needed = web3_request
.max_block_needed()
.or_else(|| web3_request.head_block.as_ref().map(|x| x.number()));
let min_block_needed = web3_request.min_block_needed();
for rpc in &self.inner {
match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed) {
ShouldWaitForBlock::NeverReady => continue,
ShouldWaitForBlock::Ready => inner.push(rpc.clone()),
ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()),
}
}
let mut rng = nanorand::tls_rng();
// TODO: use web3_request.start_instant? I think we want it to be now
let now = Instant::now();
let max_block_needed = web3_request.max_block_needed();
match self.sort_mode {
SortMethod::Shuffle => {
// we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest
// if we are shuffling, it is because we don't watch the head_blocks of the rpcs
// clone all of the rpcs
self.inner.clone_into(&mut inner);
let mut rng = nanorand::tls_rng();
// we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream`
// TODO: use web3_request.start_instant? I think we want it to be as recent as possible
let now = Instant::now();
inner.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
});
outer.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
});
}
SortMethod::Sort => {
// we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest
// TODO: what if min is set to some future block?
let min_block_needed = web3_request.min_block_needed();
// TODO: max lag from config
let recent_block_needed = head_block.saturating_sub(U64::from(5));
for rpc in &self.inner {
if self.has_block_data(rpc, recent_block_needed) {
match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed)
{
ShouldWaitForBlock::NeverReady => continue,
ShouldWaitForBlock::Ready => inner.push(rpc.clone()),
ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()),
}
}
}
let now = Instant::now();
// we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream`
inner.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
outer.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
}
}
Some(RpcsForRequest {
inner,
outer,
request: web3_request.clone(),
})
if inner.is_empty() && outer.is_empty() {
warn!(?inner, ?outer, %web3_request, %head_block, "no rpcs for request");
None
} else {
trace!(?inner, ?outer, %web3_request, "for_request");
Some(RpcsForRequest {
inner,
outer,
request: web3_request.clone(),
})
}
}
pub fn all(&self) -> &[Arc<Web3Rpc>] {
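
The two SortMethod branches above differ mainly in how they order candidates that rank the same: Shuffle breaks ties with a random value so load spreads across peers, while Sort breaks ties by observed latency. A minimal sketch of that idea, using hypothetical simplified keys (the real keys come from Web3Rpc::sort_on, sort_for_load_balancing_on, and shuffle_for_load_balancing_on):

use nanorand::Rng;
use ordered_float::OrderedFloat;

// simplified stand-in for a ranked rpc; not the actual Web3Rpc fields
struct Candidate {
    tier: u32,
    median_latency_secs: f64,
}

fn order_candidates(candidates: &mut [Candidate], shuffle: bool) {
    if shuffle {
        // Shuffle: randomize order within a tier so load spreads across peers
        let mut rng = nanorand::tls_rng();
        candidates.sort_by_cached_key(|c| (c.tier, rng.generate::<u64>()));
    } else {
        // Sort: prefer the lowest observed latency within a tier
        candidates.sort_by_cached_key(|c| (c.tier, OrderedFloat::from(c.median_latency_secs)));
    }
}
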
@ -1001,6 +1016,7 @@ impl RpcsForRequest {
stream! {
loop {
let mut earliest_retry_at = None;
let mut wait_for_sync = None;
// first check the inners
// TODO: DRY
@ -1031,6 +1047,14 @@ impl RpcsForRequest {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
continue;
}
Ok(OpenRequestResult::Lagged(..)) => {
trace!("{} is lagged. will not work now", best_rpc);
// this will probably always be the same block, right?
if wait_for_sync.is_none() {
wait_for_sync = Some(best_rpc);
}
continue;
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
@ -1068,6 +1092,7 @@ impl RpcsForRequest {
}
}
// TODO: if block_needed, do something with it here. not sure how best to subscribe
if let Some(retry_at) = earliest_retry_at {
if self.request.expire_instant <= retry_at {
break;

@ -595,139 +595,6 @@ impl Web3Rpcs {
.into())
}
// /// be sure there is a timeout on this or it might loop forever
// #[allow(clippy::too_many_arguments)]
// pub async fn xxx_try_send_all_synced_connections<R: JsonRpcResultData>(
// self: &Arc<Self>,
// web3_request: &Arc<Web3Request>,
// max_wait: Option<Duration>,
// error_level: Option<RequestErrorHandler>,
// max_sends: Option<usize>,
// ) -> Web3ProxyResult<R> {
// let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
// // todo!() we are inconsistent with max_wait and web3_request.expires_at
// let start = Instant::now();
// loop {
// if let Some(max_wait) = max_wait {
// if start.elapsed() > max_wait {
// break;
// }
// }
// match self
// .all_connections(web3_request, max_sends, error_level)
// .await
// {
// Ok(active_request_handles) => {
// let mut only_backups_used = true;
// web3_request
// .backend_requests
// .lock()
// .extend(active_request_handles.iter().map(|x| {
// let rpc = x.clone_connection();
// if !rpc.backup {
// // TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more
// only_backups_used = false;
// }
// rpc
// }));
// warn!("move this to where we turn RequestMetadata into a Stat");
// web3_request
// .response_from_backup_rpc
// .store(only_backups_used, Ordering::Relaxed);
// let x = self
// .try_send_parallel_requests(active_request_handles, max_wait)
// .await?;
// // TODO: count the number of successes and possibly retry if there weren't enough
// return Ok(x);
// }
// Err(None) => {
// warn!(
// ?self,
// min_block_needed=?web3_request.min_block_needed(),
// max_block_needed=?web3_request.max_block_needed(),
// "No servers in sync on! Retrying",
// );
// // TODO: if this times out, i think we drop this
// web3_request.no_servers.fetch_add(1, Ordering::Relaxed);
// let max_sleep = if let Some(max_wait) = max_wait {
// start + max_wait
// } else {
// break;
// };
// select! {
// _ = sleep_until(max_sleep) => {
// // rpcs didn't change and we have waited too long. break to return an error
// warn!(?self, "timeout waiting for try_send_all_synced_connections!");
// break;
// },
// _ = watch_consensus_rpcs.changed() => {
// // consensus rpcs changed!
// watch_consensus_rpcs.borrow_and_update();
// // continue to try again
// continue;
// }
// }
// }
// Err(Some(retry_at)) => {
// web3_request.no_servers.fetch_add(1, Ordering::Relaxed);
// if let Some(max_wait) = max_wait {
// if start.elapsed() > max_wait {
// warn!(
// ?self,
// "All rate limits exceeded. And sleeping would take too long"
// );
// break;
// }
// warn!("All rate limits exceeded. Sleeping");
// // TODO: only make one of these sleep_untils
// let break_at = start + max_wait;
// if break_at <= retry_at {
// select! {
// _ = sleep_until(break_at) => {break}
// _ = watch_consensus_rpcs.changed() => {
// watch_consensus_rpcs.borrow_and_update();
// }
// }
// } else {
// select! {
// _ = sleep_until(retry_at) => {}
// _ = watch_consensus_rpcs.changed() => {
// watch_consensus_rpcs.borrow_and_update();
// }
// }
// }
// continue;
// } else {
// warn!(?self, "all rate limits exceeded");
// break;
// }
// }
// }
// }
// Err(Web3ProxyError::NoServersSynced)
// }
#[allow(clippy::too_many_arguments)]
pub async fn try_proxy_connection<R: JsonRpcResultData>(
&self,

@ -252,7 +252,8 @@ impl Web3Rpc {
/// TODO: tests on this!
/// TODO: should tier or block number take priority?
/// TODO: should this return a struct that implements sorting traits?
/// TODO: move this to consensus.rs
/// TODO: better return type!
/// TODO: move this to consensus.rs?
fn sort_on(
&self,
max_block: Option<U64>,
@ -281,6 +282,7 @@ impl Web3Rpc {
/// This is useful when you care about latency over spreading the load
/// For example, use this when selecting rpcs for balanced_rpcs
/// TODO: move this to consensus.rs?
/// TODO: better return type!
pub fn sort_for_load_balancing_on(
&self,
max_block: Option<U64>,
@ -292,9 +294,13 @@ impl Web3Rpc {
let sort_on = self.sort_on(max_block, start_instant);
// TODO: i think median is better than weighted at this point. we save checking weighted for the very end
let median_latency = OrderedFloat::from(self.median_latency.as_ref().unwrap().seconds());
let median_latency = self
.median_latency
.as_ref()
.map(|x| x.seconds())
.unwrap_or_default();
let x = (sort_on, median_latency);
let x = (sort_on, OrderedFloat::from(median_latency));
trace!("sort_for_load_balancing {}: {:?}", self, x);
@ -304,8 +310,8 @@ impl Web3Rpc {
/// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency
/// This is useful when you care about spreading the load over latency.
/// For example, use this when selecting rpcs for protected_rpcs
/// TODO: move this to consensus.rs
/// TODO: this return type is too complex
/// TODO: move this to consensus.rs?
/// TODO: better return type
pub fn shuffle_for_load_balancing_on(
&self,
max_block: Option<U64>,
@ -458,7 +464,8 @@ impl Web3Rpc {
}
true
} else {
false
// do we want true or false here? false is accurate, but it stops the proxy from sending any requests so I think we want to lie
true
}
}
@ -872,8 +879,6 @@ impl Web3Rpc {
break;
}
// this should always work
// todo!("this has a bug! it gets full very quickly when no one is subscribed!");
pending_txid_firehose.send(x).await;
}
} else {
@ -1017,6 +1022,14 @@ impl Web3Rpc {
sleep_until(retry_at).await;
}
Ok(OpenRequestResult::Lagged(now_synced_f)) => {
select! {
_ = now_synced_f => {}
_ = sleep_until(web3_request.expire_instant) => {
break;
}
}
}
Ok(OpenRequestResult::NotReady) => {
// TODO: when can this happen? log? emit a stat?
trace!("{} has no handle ready", self);
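
The select! in the hunk above is the consumer side of the new Lagged variant: wait for the rpc to catch up, but never past the request's expiration. The same pattern as a self-contained helper (handle_lagged and deadline are hypothetical names, and the future's error type is simplified from Web3ProxyResult):

use std::future::Future;
use std::pin::Pin;
use tokio::time::{sleep_until, Instant};

type SyncedFuture = Pin<Box<dyn Future<Output = Result<(), &'static str>> + Send>>;

// Wait for a lagged rpc to catch up, giving up at `deadline`.
// Returns true if the rpc synced in time and the request should be retried.
async fn handle_lagged(synced_f: SyncedFuture, deadline: Instant) -> bool {
    tokio::select! {
        res = synced_f => res.is_ok(),       // the rpc caught up (or its sync failed)
        _ = sleep_until(deadline) => false,  // the request expired first
    }
}
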
@ -1119,11 +1132,54 @@ impl Web3Rpc {
web3_request: &Arc<Web3Request>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestResult> {
// TODO: check a boolean set by health checks?
// TODO: if websocket is reconnecting, return an error?
// TODO: if this server can't handle this request because it isn't synced, return an error
// make sure this rpc has the oldest block that this request needs
// TODO: should this check be optional? we've probably already done it for RpcsForRequest::inner. for now it's fine to duplicate the check
if let Some(block_needed) = web3_request.min_block_needed() {
if !self.has_block_data(block_needed) {
return Ok(OpenRequestResult::NotReady);
}
}
// check shared rate limits
// make sure this rpc has the newest block that this request needs
// TODO: should this check be optional? we've probably already done it for RpcsForRequest::inner. for now it's fine to duplicate the check
if let Some(block_needed) = web3_request.max_block_needed() {
if !self.has_block_data(block_needed) {
let clone = self.clone();
let expire_instant = web3_request.expire_instant;
let synced_f = async move {
let mut head_block_receiver =
clone.head_block_sender.as_ref().unwrap().subscribe();
// TODO: if head_block is far behind block_needed, return now
loop {
select! {
_ = head_block_receiver.changed() => {
if let Some(head_block) = head_block_receiver.borrow_and_update().clone() {
if head_block.number() >= block_needed {
// the block we needed has arrived!
break;
}
}
}
_ = sleep_until(expire_instant) => {
return Err(Web3ProxyError::NoServersSynced);
}
}
}
Ok(())
};
return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
}
}
// check rate limits
match self.try_throttle().await? {
RedisRateLimitResult::Allowed(_) => {}
RedisRateLimitResult::RetryAt(retry_at, _) => {

@ -10,21 +10,26 @@ use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::ProviderError;
use ethers::types::{Address, Bytes};
use futures::Future;
use http::StatusCode;
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use nanorand::Rng;
use serde_json::json;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::Arc;
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, trace, warn, Level};
#[derive(Debug, From)]
#[derive(From)]
pub enum OpenRequestResult {
Handle(OpenRequestHandle),
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
/// Unable to start a request because no servers are synced
/// The rpc is not synced, but it should be soon.
/// You should wait for the given block number.
Lagged(Pin<Box<dyn Future<Output = Web3ProxyResult<()>> + Send>>),
/// Unable to start a request because no servers are synced or the necessary data has been pruned
NotReady,
}
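
The Lagged future built in Web3Rpc::try_request_handle above boils down to: watch the rpc's head block until it reaches the needed block, or give up when the request expires. A standalone sketch of that loop, with the watch channel simplified to carry Option<u64> block numbers instead of full blocks:

use tokio::select;
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};

// Wait until `head_blocks` reports a block >= `block_needed`, or until `expire_instant` passes.
async fn wait_for_block(
    mut head_blocks: watch::Receiver<Option<u64>>,
    block_needed: u64,
    expire_instant: Instant,
) -> Result<(), &'static str> {
    loop {
        select! {
            changed = head_blocks.changed() => {
                if changed.is_err() {
                    // the sender is gone; this rpc will never catch up
                    return Err("head block channel closed");
                }
                if let Some(head) = *head_blocks.borrow_and_update() {
                    if head >= block_needed {
                        // the block we needed has arrived
                        return Ok(());
                    }
                }
            }
            _ = sleep_until(expire_instant) => {
                // the request expired before this rpc caught up
                return Err("request expired before rpc synced");
            }
        }
    }
}
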

@ -1,6 +1,6 @@
[package]
name = "web3_proxy_cli"
version = "1.43.16"
version = "1.43.19"
edition = "2021"
default-run = "web3_proxy_cli"