From 694e552b5db01dc2338ea367a4f2c36e7eedbb20 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 24 Jan 2023 20:44:50 -0800 Subject: [PATCH] improve waiting for sync when rate limited --- TODO.md | 1 + web3_proxy/src/rpcs/blockchain.rs | 10 +-- web3_proxy/src/rpcs/connection.rs | 68 ++++++++++++++++----- web3_proxy/src/rpcs/connections.rs | 97 ++++++++++++++++-------------- web3_proxy/src/rpcs/request.rs | 36 ++++++++--- 5 files changed, 138 insertions(+), 74 deletions(-) diff --git a/TODO.md b/TODO.md index 986c4816..e1b8711c 100644 --- a/TODO.md +++ b/TODO.md @@ -321,6 +321,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] send sentryd errors to pagerduty - [x] improve handling of unknown methods - [x] don't send pagerduty alerts for websocket panics +- [x] improve waiting for sync when rate limited - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 199fb65b..bcda8579 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -167,13 +167,7 @@ impl Web3Connections { // TODO: request_metadata? maybe we should put it in the authorization? // TODO: think more about this wait_for_sync let response = self - .try_send_best_consensus_head_connection( - authorization, - request, - None, - None, - true, - ) + .try_send_best_consensus_head_connection(authorization, request, None, None) .await?; let block = response.result.context("failed fetching block")?; @@ -260,7 +254,7 @@ impl Web3Connections { // TODO: if error, retry? // TODO: request_metadata or authorization? let response = self - .try_send_best_consensus_head_connection(authorization, request, None, Some(num), true) + .try_send_best_consensus_head_connection(authorization, request, None, Some(num)) .await?; let raw_block = response.result.context("no block result")?; diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index bfb8a9a3..e56a4448 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -24,22 +24,22 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; use thread_fast_rng::rand::Rng; use thread_fast_rng::thread_fast_rng; -use tokio::sync::{broadcast, oneshot, RwLock as AsyncRwLock}; +use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior}; // TODO: maybe provider state should have the block data limit in it. 
but it is inside an async lock and we can't Serialize then #[derive(Clone, Debug)] pub enum ProviderState { None, - NotReady(Arc), - Ready(Arc), + Connecting(Arc), + Connected(Arc), } impl ProviderState { pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc> { match self { ProviderState::None => None, - ProviderState::NotReady(x) => { + ProviderState::Connecting(x) => { if allow_not_ready { Some(x) } else { @@ -47,7 +47,7 @@ impl ProviderState { None } } - ProviderState::Ready(x) => { + ProviderState::Connected(x) => { if x.ready() { Some(x) } else { @@ -76,6 +76,8 @@ pub struct Web3Connection { /// provider is in a RwLock so that we can replace it if re-connecting /// it is an async lock because we hold it open across awaits pub(super) provider_state: AsyncRwLock, + /// keep track of hard limits + pub(super) hard_limit_until: Option>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits /// We do not use the deferred rate limiter because going over limits would cause errors pub(super) hard_limit: Option, @@ -136,6 +138,16 @@ impl Web3Connection { let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some(); + // track hard limit until on backup servers (which might surprise us with rate limit changes) + // and track on servers that have a configured hard limit + let hard_limit_until = if backup || hard_limit.is_some() { + let (sender, _) = watch::channel(Instant::now()); + + Some(sender) + } else { + None + }; + let new_connection = Self { name, db_conn: db_conn.clone(), @@ -147,6 +159,7 @@ impl Web3Connection { internal_requests: 0.into(), provider_state: AsyncRwLock::new(ProviderState::None), hard_limit, + hard_limit_until, soft_limit, automatic_block_limit, backup, @@ -376,7 +389,7 @@ impl Web3Connection { ProviderState::None => { info!("connecting to {}", self); } - ProviderState::NotReady(provider) | ProviderState::Ready(provider) => { + ProviderState::Connecting(provider) | ProviderState::Connected(provider) => { // disconnect the current provider if let Web3Provider::Mock = provider.as_ref() { return Ok(()); @@ -410,7 +423,7 @@ impl Web3Connection { let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?; // trace!("saving provider state as NotReady on {}", self); - *provider_state = ProviderState::NotReady(Arc::new(new_provider)); + *provider_state = ProviderState::Connecting(Arc::new(new_provider)); // drop the lock so that we can get a request handle // trace!("provider_state {} unlocked", self); @@ -464,7 +477,7 @@ impl Web3Connection { .context("provider missing")? .clone(); - *provider_state = ProviderState::Ready(ready_provider); + *provider_state = ProviderState::Connected(ready_provider); // trace!("unlocked for ready..."); } @@ -693,7 +706,7 @@ impl Web3Connection { // trace!("unlocked on new heads"); // TODO: need a timeout - if let ProviderState::Ready(provider) = provider_state { + if let ProviderState::Connected(provider) = provider_state { match provider.as_ref() { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(_provider) => { @@ -865,7 +878,7 @@ impl Web3Connection { authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { - if let ProviderState::Ready(provider) = self + if let ProviderState::Connected(provider) = self .provider_state .try_read() .context("subscribe_pending_transactions")? @@ -938,6 +951,7 @@ impl Web3Connection { /// be careful with this; it might wait forever! 
/// `allow_not_ready` is only for use by health checks while starting the provider + /// TODO: don't use anyhow. use specific error type pub async fn wait_for_request_handle( self: &Arc, authorization: &Arc, @@ -954,21 +968,29 @@ impl Web3Connection { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? - // // trace!(?retry_at); + trace!("{} waiting for request handle until {:?}", self, retry_at); if retry_at > max_wait { // break now since we will wait past our maximum wait time // TODO: don't use anyhow. use specific error type return Err(anyhow::anyhow!("timeout waiting for request handle")); } + sleep_until(retry_at).await; } Ok(OpenRequestResult::NotReady) => { // TODO: when can this happen? log? emit a stat? - // TODO: subscribe to the head block on this + trace!("{} has no handle ready", self); + + let now = Instant::now(); + + if now > max_wait { + return Err(anyhow::anyhow!("unable to retry for request handle")); + } + // TODO: sleep how long? maybe just error? - // TODO: don't use anyhow. use specific error type - return Err(anyhow::anyhow!("unable to retry for request handle")); + // TODO: instead of an arbitrary sleep, subscribe to the head block on this + sleep(Duration::from_millis(10)).await; } Err(err) => return Err(err), } @@ -994,12 +1016,22 @@ impl Web3Connection { return Ok(OpenRequestResult::NotReady); } + if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { + let hard_limit_ready = hard_limit_until.borrow().clone(); + + let now = Instant::now(); + + if now < hard_limit_ready { + return Ok(OpenRequestResult::RetryAt(hard_limit_ready)); + } + } + // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { // TODO: how should we know if we should set expire or not? match ratelimiter.throttle().await? { RedisRateLimitResult::Allowed(_) => { - // // trace!("rate limit succeeded") + // trace!("rate limit succeeded") } RedisRateLimitResult::RetryAt(retry_at, _) => { // rate limit failed @@ -1008,6 +1040,10 @@ impl Web3Connection { // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0? warn!("Exhausted rate limit on {}. Retry at {:?}", self, retry_at); + if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { + hard_limit_until.send(retry_at.clone())?; + } + return Ok(OpenRequestResult::RetryAt(retry_at)); } RedisRateLimitResult::RetryNever => { @@ -1165,6 +1201,7 @@ mod tests { internal_requests: 0.into(), provider_state: AsyncRwLock::new(ProviderState::None), hard_limit: None, + hard_limit_until: None, soft_limit: 1_000, automatic_block_limit: false, backup: false, @@ -1213,6 +1250,7 @@ mod tests { internal_requests: 0.into(), provider_state: AsyncRwLock::new(ProviderState::None), hard_limit: None, + hard_limit_until: None, soft_limit: 1_000, automatic_block_limit: false, backup: false, diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 24e4e856..5b97a49a 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -128,6 +128,7 @@ impl Web3Connections { // turn configs into connections (in parallel) // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) + // TODO: futures unordered? 
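The heart of this change is the `hard_limit_until` watch channel added above: the first request that exhausts a hard limit publishes the retry time, and every later `try_request_handle` call checks it locally before spending a redis round trip on `hard_limit.throttle()`. A minimal sketch of that pattern, with hypothetical `Rpc` and `RequestResult` types standing in for `Web3Connection` and `OpenRequestResult` (not the project's actual types):

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::Instant;

/// Hypothetical stand-in for `OpenRequestResult`, reduced to the two
/// variants that matter for hard-limit tracking.
enum RequestResult {
    Handle,
    RetryAt(Instant),
}

struct Rpc {
    /// Shared "do not retry before" timestamp, mirroring `hard_limit_until`.
    hard_limit_until: watch::Sender<Instant>,
    /// Keep one receiver alive so `send` never fails with "channel closed".
    _hard_limit_rx: watch::Receiver<Instant>,
}

impl Rpc {
    fn new() -> Self {
        // Initialize to "now" so the first request is never blocked.
        let (hard_limit_until, _hard_limit_rx) = watch::channel(Instant::now());
        Self { hard_limit_until, _hard_limit_rx }
    }

    /// Cheap local check before touching the shared (redis) rate limiter.
    fn try_request(&self) -> RequestResult {
        let ready_at = *self.hard_limit_until.borrow();
        if Instant::now() < ready_at {
            // Someone already learned we are rate limited; skip the redis call.
            return RequestResult::RetryAt(ready_at);
        }
        RequestResult::Handle
    }

    /// Called when a request discovers the hard limit was exhausted.
    fn record_rate_limit(&self, retry_at: Instant) {
        // All current and future callers will now see `retry_at`.
        let _ = self.hard_limit_until.send(retry_at);
    }
}

#[tokio::main]
async fn main() {
    let rpc = Rpc::new();

    assert!(matches!(rpc.try_request(), RequestResult::Handle));

    // Simulate the rate limiter rejecting us for one second.
    rpc.record_rate_limit(Instant::now() + Duration::from_secs(1));

    // Every caller now backs off without hitting redis again.
    assert!(matches!(rpc.try_request(), RequestResult::RetryAt(_)));
}
```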
let spawn_handles: Vec<_> = server_configs .into_iter() .filter_map(|(server_name, server_config)| { @@ -175,7 +176,7 @@ impl Web3Connections { let mut connections = HashMap::new(); let mut handles = vec![]; - // TODO: do we need to join this? + // TODO: futures unordered? for x in join_all(spawn_handles).await { // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit match x { @@ -529,7 +530,7 @@ impl Web3Connections { let available_requests = soft_limit - active_requests; - trace!("available requests on {}: {}", rpc, available_requests); + // trace!("available requests on {}: {}", rpc, available_requests); minimum = minimum.min(available_requests); maximum = maximum.max(available_requests); @@ -538,8 +539,8 @@ impl Web3Connections { }) .collect(); - trace!("minimum available requests: {}", minimum); - trace!("maximum available requests: {}", maximum); + // trace!("minimum available requests: {}", minimum); + // trace!("maximum available requests: {}", maximum); if maximum < 0.0 { // TODO: if maximum < 0 and there are other tiers on the same block, we should include them now @@ -588,7 +589,7 @@ impl Web3Connections { .await { Ok(OpenRequestResult::Handle(handle)) => { - trace!("opened handle: {}", best_rpc); + // trace!("opened handle: {}", best_rpc); return Ok(OpenRequestResult::Handle(handle)); } Ok(OpenRequestResult::RetryAt(retry_at)) => { @@ -746,24 +747,25 @@ impl Web3Connections { request: JsonRpcRequest, request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, - wait_for_sync: bool, ) -> anyhow::Result { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; - let mut watch_consensus_connections = if wait_for_sync { - Some(self.watch_consensus_connections_sender.subscribe()) - } else { - None - }; + let mut watch_consensus_connections = self.watch_consensus_connections_sender.subscribe(); // TODO: maximum retries? right now its the total number of servers loop { - // TODO: is self.conns still right now that we split main and backup servers? - // TODO: if a new block arrives, we probably want to reset the skip list - if skip_rpcs.len() == self.conns.len() { - break; + let num_skipped = skip_rpcs.len(); + + if num_skipped > 0 { + // trace!("skip_rpcs: {:?}", skip_rpcs); + + // TODO: is self.conns still right now that we split main and backup servers? + if num_skipped == self.conns.len() { + break; + } } + match self .best_consensus_head_connection( authorization, @@ -890,30 +892,23 @@ impl Web3Connections { // TODO: move this to a helper function // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting - warn!("All rate limits exceeded. Sleeping until {:?}", retry_at); + warn!( + "All rate limits exceeded. waiting for change in synced servers or {:?}", + retry_at + ); // TODO: have a separate column for rate limited? if let Some(request_metadata) = request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::Release); } - // TODO: if there are other servers in synced_connections, we should continue now - - if let Some(watch_consensus_connections) = watch_consensus_connections.as_mut() - { - // wait until retry_at OR synced_connections changes - trace!("waiting for change in synced servers or retry_at"); - tokio::select! 
{ - _ = sleep_until(retry_at) => { - skip_rpcs.pop(); - } - _ = watch_consensus_connections.changed() => { - // TODO: would be nice to save this retry_at so we don't keep hitting limits - let _ = watch_consensus_connections.borrow_and_update(); - } + tokio::select! { + _ = sleep_until(retry_at) => { + skip_rpcs.pop(); + } + _ = watch_consensus_connections.changed() => { + watch_consensus_connections.borrow_and_update(); } - } else { - sleep_until(retry_at).await; } } OpenRequestResult::NotReady => { @@ -921,13 +916,16 @@ impl Web3Connections { request_metadata.no_servers.fetch_add(1, Ordering::Release); } - if wait_for_sync { - trace!("waiting for change in synced servers"); - // TODO: race here. there might have been a change while we were waiting on the previous server - self.watch_consensus_connections_sender - .subscribe() - .changed() - .await?; + trace!("No servers ready. Waiting up to 1 second for change in synced servers"); + + // TODO: exponential backoff? + tokio::select! { + _ = sleep(Duration::from_secs(1)) => { + skip_rpcs.pop(); + } + _ = watch_consensus_connections.changed() => { + watch_consensus_connections.borrow_and_update(); + } } } } @@ -1060,7 +1058,6 @@ impl Web3Connections { request, request_metadata, min_block_needed, - true, ) .await } @@ -1168,8 +1165,11 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( + Web3Provider::Mock, + ))), hard_limit: None, + hard_limit_until: None, soft_limit: 1_000, automatic_block_limit: true, backup: false, @@ -1188,8 +1188,11 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( + Web3Provider::Mock, + ))), hard_limit: None, + hard_limit_until: None, soft_limit: 1_000, automatic_block_limit: false, backup: false, @@ -1395,8 +1398,11 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( + Web3Provider::Mock, + ))), hard_limit: None, + hard_limit_until: None, soft_limit: 3_000, automatic_block_limit: false, backup: false, @@ -1415,8 +1421,11 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( + Web3Provider::Mock, + ))), hard_limit: None, + hard_limit_until: None, soft_limit: 1_000, automatic_block_limit: false, backup: false, diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 8cf22bbf..2c440d26 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -284,8 +284,14 @@ impl OpenRequestHandle { revert_handler }; + enum ResponseTypes { + Revert, + RateLimit, + Ok, + } + // check for "execution reverted" here - let is_revert = if let ProviderError::JsonRpcClientError(err) = err { + let response_type = if let ProviderError::JsonRpcClientError(err) = err { // Http and Ws errors are very similar, but different types let msg = match &*self.provider { Web3Provider::Mock => 
unimplemented!(), @@ -310,23 +316,39 @@ impl OpenRequestHandle { }; if let Some(msg) = msg { - msg.starts_with("execution reverted") + if msg.starts_with("execution reverted") { + trace!("revert from {}", self.conn); + ResponseTypes::Revert + } else if msg.contains("limit") || msg.contains("request") { + trace!("rate limit from {}", self.conn); + ResponseTypes::RateLimit + } else { + ResponseTypes::Ok + } } else { - false + ResponseTypes::Ok } } else { - false + ResponseTypes::Ok }; - if is_revert { - trace!("revert from {}", self.conn); + if matches!(response_type, ResponseTypes::RateLimit) { + if let Some(hard_limit_until) = self.conn.hard_limit_until.as_ref() { + let retry_at = Instant::now() + Duration::from_secs(1); + + trace!("retry {} at: {:?}", self.conn, retry_at); + + hard_limit_until + .send(retry_at) + .expect("sending hard limit retry times should always work"); + } } // TODO: think more about the method and param logs. those can be sensitive information match revert_handler { RequestRevertHandler::DebugLevel => { // TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag - if !is_revert { + if matches!(response_type, ResponseTypes::Revert) { debug!( "bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err
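Both retry paths in `try_send_best_consensus_head_connection` now follow the same shape: race a timer against the consensus watch channel, so a server catching up (or rate limits resetting) ends the wait immediately instead of after a fixed sleep. A self-contained sketch of that `tokio::select!` pattern, with a hypothetical `usize` payload standing in for the synced-connections snapshot carried by `watch_consensus_connections_sender`:

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};

/// Wait until either `retry_at` passes or the set of synced servers changes,
/// whichever comes first.
async fn wait_for_limit_or_sync(
    retry_at: Instant,
    synced: &mut watch::Receiver<usize>,
) -> &'static str {
    tokio::select! {
        _ = sleep_until(retry_at) => "rate limit window elapsed",
        res = synced.changed() => {
            // Mark the new value as seen so the next `changed()` waits again.
            if res.is_ok() {
                synced.borrow_and_update();
            }
            "synced servers changed"
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0usize);

    // Simulate a server finishing sync 100ms from now, well before the
    // 10s rate-limit deadline.
    tokio::spawn(async move {
        sleep_until(Instant::now() + Duration::from_millis(100)).await;
        let _ = tx.send(1);
    });

    let why = wait_for_limit_or_sync(Instant::now() + Duration::from_secs(10), &mut rx).await;
    assert_eq!(why, "synced servers changed");
    println!("stopped waiting because: {why}");
}
```

In the patch itself the timer branch also pops the last entry off `skip_rpcs`, so a server that was skipped only because of its rate limit becomes eligible again once the window elapses.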