improve waiting for sync when rate limited

Bryan Stitt 2023-01-24 20:44:50 -08:00
parent 0ae240492a
commit 694e552b5d
5 changed files with 138 additions and 74 deletions
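The heart of this change is a per-connection `hard_limit_until` watch channel: it records the `Instant` until which a backend's hard rate limit applies, so callers can skip the backend cheaply instead of re-hitting the shared Redis limiter, and waiters can be woken when the picture changes. A minimal standalone sketch of that pattern, using illustrative names (`Backend`, `try_open_request`, `record_rate_limit`) rather than the crate's real API:

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::Instant;

/// Sketch only: a backend that remembers when its hard rate limit clears.
struct Backend {
    // Some(..) only for backends that have, or might develop, a hard limit.
    hard_limit_until: Option<watch::Sender<Instant>>,
}

enum OpenRequest {
    Handle,           // ok to send a request now
    RetryAt(Instant), // rate limited until this deadline
}

impl Backend {
    /// Cheap local check before touching the shared (Redis) rate limiter.
    fn try_open_request(&self) -> OpenRequest {
        if let Some(until) = &self.hard_limit_until {
            let ready_at = *until.borrow();
            if Instant::now() < ready_at {
                return OpenRequest::RetryAt(ready_at);
            }
        }
        OpenRequest::Handle
    }

    /// Remember a rate-limit deadline so other tasks skip this backend until then.
    fn record_rate_limit(&self, retry_at: Instant) {
        if let Some(until) = &self.hard_limit_until {
            let _ = until.send(retry_at);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, _rx) = watch::channel(Instant::now());
    let backend = Backend { hard_limit_until: Some(tx) };

    // pretend the provider just told us to back off for one second
    backend.record_rate_limit(Instant::now() + Duration::from_secs(1));

    match backend.try_open_request() {
        OpenRequest::Handle => println!("send the request"),
        OpenRequest::RetryAt(at) => println!("rate limited until {:?}", at),
    }
}
```

In the diff below, the equivalent check sits at the top of `try_request_handle`, and the sender is updated both when Redis returns `RetryAt` and when a provider error message looks like a rate limit.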

@@ -321,6 +321,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] send sentryd errors to pagerduty
- [x] improve handling of unknown methods
- [x] don't send pagerduty alerts for websocket panics
- [x] improve waiting for sync when rate limited
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly

@@ -167,13 +167,7 @@ impl Web3Connections {
// TODO: request_metadata? maybe we should put it in the authorization?
// TODO: think more about this wait_for_sync
let response = self
.try_send_best_consensus_head_connection(
authorization,
request,
None,
None,
true,
)
.try_send_best_consensus_head_connection(authorization, request, None, None)
.await?;
let block = response.result.context("failed fetching block")?;
@@ -260,7 +254,7 @@ impl Web3Connections {
// TODO: if error, retry?
// TODO: request_metadata or authorization?
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, Some(num), true)
.try_send_best_consensus_head_connection(authorization, request, None, Some(num))
.await?;
let raw_block = response.result.context("no block result")?;

@@ -24,22 +24,22 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, RwLock as AsyncRwLock};
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
// TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then
#[derive(Clone, Debug)]
pub enum ProviderState {
None,
NotReady(Arc<Web3Provider>),
Ready(Arc<Web3Provider>),
Connecting(Arc<Web3Provider>),
Connected(Arc<Web3Provider>),
}
impl ProviderState {
pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc<Web3Provider>> {
match self {
ProviderState::None => None,
ProviderState::NotReady(x) => {
ProviderState::Connecting(x) => {
if allow_not_ready {
Some(x)
} else {
@@ -47,7 +47,7 @@ impl ProviderState {
None
}
}
ProviderState::Ready(x) => {
ProviderState::Connected(x) => {
if x.ready() {
Some(x)
} else {
@@ -76,6 +76,8 @@ pub struct Web3Connection {
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
pub(super) provider_state: AsyncRwLock<ProviderState>,
/// keep track of hard limits
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
/// We do not use the deferred rate limiter because going over limits would cause errors
pub(super) hard_limit: Option<RedisRateLimiter>,
@@ -136,6 +138,16 @@ impl Web3Connection {
let automatic_block_limit =
(block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
// track hard limit until on backup servers (which might surprise us with rate limit changes)
// and track on servers that have a configured hard limit
let hard_limit_until = if backup || hard_limit.is_some() {
let (sender, _) = watch::channel(Instant::now());
Some(sender)
} else {
None
};
let new_connection = Self {
name,
db_conn: db_conn.clone(),
@@ -147,6 +159,7 @@ impl Web3Connection {
internal_requests: 0.into(),
provider_state: AsyncRwLock::new(ProviderState::None),
hard_limit,
hard_limit_until,
soft_limit,
automatic_block_limit,
backup,
@@ -376,7 +389,7 @@ impl Web3Connection {
ProviderState::None => {
info!("connecting to {}", self);
}
ProviderState::NotReady(provider) | ProviderState::Ready(provider) => {
ProviderState::Connecting(provider) | ProviderState::Connected(provider) => {
// disconnect the current provider
if let Web3Provider::Mock = provider.as_ref() {
return Ok(());
@@ -410,7 +423,7 @@ impl Web3Connection {
let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
// trace!("saving provider state as NotReady on {}", self);
*provider_state = ProviderState::NotReady(Arc::new(new_provider));
*provider_state = ProviderState::Connecting(Arc::new(new_provider));
// drop the lock so that we can get a request handle
// trace!("provider_state {} unlocked", self);
@@ -464,7 +477,7 @@ impl Web3Connection {
.context("provider missing")?
.clone();
*provider_state = ProviderState::Ready(ready_provider);
*provider_state = ProviderState::Connected(ready_provider);
// trace!("unlocked for ready...");
}
@@ -693,7 +706,7 @@ impl Web3Connection {
// trace!("unlocked on new heads");
// TODO: need a timeout
if let ProviderState::Ready(provider) = provider_state {
if let ProviderState::Connected(provider) = provider_state {
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_provider) => {
@@ -865,7 +878,7 @@ impl Web3Connection {
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
if let ProviderState::Ready(provider) = self
if let ProviderState::Connected(provider) = self
.provider_state
.try_read()
.context("subscribe_pending_transactions")?
@@ -938,6 +951,7 @@ impl Web3Connection {
/// be careful with this; it might wait forever!
/// `allow_not_ready` is only for use by health checks while starting the provider
/// TODO: don't use anyhow. use specific error type
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
@@ -954,21 +968,29 @@
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
// // trace!(?retry_at);
trace!("{} waiting for request handle until {:?}", self, retry_at);
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
// TODO: don't use anyhow. use specific error type
return Err(anyhow::anyhow!("timeout waiting for request handle"));
}
sleep_until(retry_at).await;
}
Ok(OpenRequestResult::NotReady) => {
// TODO: when can this happen? log? emit a stat?
// TODO: subscribe to the head block on this
trace!("{} has no handle ready", self);
let now = Instant::now();
if now > max_wait {
return Err(anyhow::anyhow!("unable to retry for request handle"));
}
// TODO: sleep how long? maybe just error?
// TODO: don't use anyhow. use specific error type
return Err(anyhow::anyhow!("unable to retry for request handle"));
// TODO: instead of an arbitrary sleep, subscribe to the head block on this
sleep(Duration::from_millis(10)).await;
}
Err(err) => return Err(err),
}
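
The hunk above loosens `wait_for_request_handle`: a `RetryAt` past the caller's deadline still errors out, but `NotReady` now pauses briefly and retries instead of failing immediately. A condensed sketch of that loop, with `try_request_handle` as a hypothetical stand-in for the connection's real rate-limit check and `max_wait` treated as an absolute deadline:

```rust
use anyhow::anyhow;
use tokio::time::{sleep, sleep_until, Duration, Instant};

enum OpenRequestResult {
    Handle,           // a request slot was acquired
    RetryAt(Instant), // rate limited until this time
    NotReady,         // provider still connecting or out of sync
}

// Hypothetical stand-in for the connection's own rate-limit check.
async fn try_request_handle() -> anyhow::Result<OpenRequestResult> {
    Ok(OpenRequestResult::NotReady)
}

// `max_wait` is the deadline past which we give up waiting.
async fn wait_for_request_handle(max_wait: Instant) -> anyhow::Result<()> {
    loop {
        match try_request_handle().await? {
            OpenRequestResult::Handle => return Ok(()),
            OpenRequestResult::RetryAt(retry_at) => {
                if retry_at > max_wait {
                    // waiting would blow past the caller's budget
                    return Err(anyhow!("timeout waiting for request handle"));
                }
                sleep_until(retry_at).await;
            }
            OpenRequestResult::NotReady => {
                if Instant::now() > max_wait {
                    return Err(anyhow!("unable to retry for request handle"));
                }
                // short pause before retrying; the diff notes that subscribing
                // to the head block would beat an arbitrary sleep
                sleep(Duration::from_millis(10)).await;
            }
        }
    }
}
```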
@@ -994,12 +1016,22 @@ impl Web3Connection {
return Ok(OpenRequestResult::NotReady);
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
let hard_limit_ready = hard_limit_until.borrow().clone();
let now = Instant::now();
if now < hard_limit_ready {
return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
}
}
// check rate limits
if let Some(ratelimiter) = self.hard_limit.as_ref() {
// TODO: how should we know if we should set expire or not?
match ratelimiter.throttle().await? {
RedisRateLimitResult::Allowed(_) => {
// // trace!("rate limit succeeded")
// trace!("rate limit succeeded")
}
RedisRateLimitResult::RetryAt(retry_at, _) => {
// rate limit failed
@@ -1008,6 +1040,10 @@
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
warn!("Exhausted rate limit on {}. Retry at {:?}", self, retry_at);
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
hard_limit_until.send(retry_at.clone())?;
}
return Ok(OpenRequestResult::RetryAt(retry_at));
}
RedisRateLimitResult::RetryNever => {
@@ -1165,6 +1201,7 @@ mod tests {
internal_requests: 0.into(),
provider_state: AsyncRwLock::new(ProviderState::None),
hard_limit: None,
hard_limit_until: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
@@ -1213,6 +1250,7 @@ mod tests {
internal_requests: 0.into(),
provider_state: AsyncRwLock::new(ProviderState::None),
hard_limit: None,
hard_limit_until: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,

@@ -128,6 +128,7 @@ impl Web3Connections {
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
// TODO: futures unordered?
let spawn_handles: Vec<_> = server_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
@@ -175,7 +176,7 @@ impl Web3Connections {
let mut connections = HashMap::new();
let mut handles = vec![];
// TODO: do we need to join this?
// TODO: futures unordered?
for x in join_all(spawn_handles).await {
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
match x {
@@ -529,7 +530,7 @@ impl Web3Connections {
let available_requests = soft_limit - active_requests;
trace!("available requests on {}: {}", rpc, available_requests);
// trace!("available requests on {}: {}", rpc, available_requests);
minimum = minimum.min(available_requests);
maximum = maximum.max(available_requests);
@@ -538,8 +539,8 @@
})
.collect();
trace!("minimum available requests: {}", minimum);
trace!("maximum available requests: {}", maximum);
// trace!("minimum available requests: {}", minimum);
// trace!("maximum available requests: {}", maximum);
if maximum < 0.0 {
// TODO: if maximum < 0 and there are other tiers on the same block, we should include them now
@@ -588,7 +589,7 @@ impl Web3Connections {
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
// trace!("opened handle: {}", best_rpc);
return Ok(OpenRequestResult::Handle(handle));
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
@@ -746,24 +747,25 @@ impl Web3Connections {
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
wait_for_sync: bool,
) -> anyhow::Result<JsonRpcForwardedResponse> {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
let mut watch_consensus_connections = if wait_for_sync {
Some(self.watch_consensus_connections_sender.subscribe())
} else {
None
};
let mut watch_consensus_connections = self.watch_consensus_connections_sender.subscribe();
// TODO: maximum retries? right now its the total number of servers
loop {
// TODO: is self.conns still right now that we split main and backup servers?
// TODO: if a new block arrives, we probably want to reset the skip list
if skip_rpcs.len() == self.conns.len() {
break;
let num_skipped = skip_rpcs.len();
if num_skipped > 0 {
// trace!("skip_rpcs: {:?}", skip_rpcs);
// TODO: is self.conns still right now that we split main and backup servers?
if num_skipped == self.conns.len() {
break;
}
}
match self
.best_consensus_head_connection(
authorization,
@@ -890,30 +892,23 @@ impl Web3Connections {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!("All rate limits exceeded. Sleeping until {:?}", retry_at);
warn!(
"All rate limits exceeded. waiting for change in synced servers or {:?}",
retry_at
);
// TODO: have a separate column for rate limited?
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
// TODO: if there are other servers in synced_connections, we should continue now
if let Some(watch_consensus_connections) = watch_consensus_connections.as_mut()
{
// wait until retry_at OR synced_connections changes
trace!("waiting for change in synced servers or retry_at");
tokio::select! {
_ = sleep_until(retry_at) => {
skip_rpcs.pop();
}
_ = watch_consensus_connections.changed() => {
// TODO: would be nice to save this retry_at so we don't keep hitting limits
let _ = watch_consensus_connections.borrow_and_update();
}
tokio::select! {
_ = sleep_until(retry_at) => {
skip_rpcs.pop();
}
_ = watch_consensus_connections.changed() => {
watch_consensus_connections.borrow_and_update();
}
} else {
sleep_until(retry_at).await;
}
}
OpenRequestResult::NotReady => {
@@ -921,13 +916,16 @@
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
if wait_for_sync {
trace!("waiting for change in synced servers");
// TODO: race here. there might have been a change while we were waiting on the previous server
self.watch_consensus_connections_sender
.subscribe()
.changed()
.await?;
trace!("No servers ready. Waiting up to 1 second for change in synced servers");
// TODO: exponential backoff?
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
skip_rpcs.pop();
}
_ = watch_consensus_connections.changed() => {
watch_consensus_connections.borrow_and_update();
}
}
}
}
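
Both branches above (all servers rate limited, and no servers ready) now wait with `tokio::select!` on either a deadline or a change in the synced-server set, so a backend catching up cuts the wait short. A rough sketch of that wait, assuming a `synced_rx` watch receiver standing in for `watch_consensus_connections`:

```rust
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};

// Wait until `retry_at` passes OR the synced-server set changes, whichever
// comes first. The payload type is a placeholder; the real channel carries
// the consensus connection state.
async fn wait_for_sync_or_retry(synced_rx: &mut watch::Receiver<u64>, retry_at: Instant) {
    tokio::select! {
        // the rate-limit window has passed; worth retrying the same servers
        _ = sleep_until(retry_at) => {}
        // consensus changed; retry immediately with the new set of servers
        res = synced_rx.changed() => {
            if res.is_ok() {
                // mark the new value as seen so the next wait blocks correctly
                let _ = synced_rx.borrow_and_update();
            }
        }
    }
}
```

In the diff, the sleep branch also pops the last entry from `skip_rpcs`, giving the rate-limited server another chance once the wait is over.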
@@ -1060,7 +1058,6 @@ impl Web3Connections {
request,
request_metadata,
min_block_needed,
true,
)
.await
}
@@ -1168,8 +1165,11 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
hard_limit: None,
hard_limit_until: None,
soft_limit: 1_000,
automatic_block_limit: true,
backup: false,
@@ -1188,8 +1188,11 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
hard_limit: None,
hard_limit_until: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
@@ -1395,8 +1398,11 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
hard_limit: None,
hard_limit_until: None,
soft_limit: 3_000,
automatic_block_limit: false,
backup: false,
@@ -1415,8 +1421,11 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
hard_limit: None,
hard_limit_until: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,

@@ -284,8 +284,14 @@ impl OpenRequestHandle {
revert_handler
};
enum ResponseTypes {
Revert,
RateLimit,
Ok,
}
// check for "execution reverted" here
let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
let msg = match &*self.provider {
Web3Provider::Mock => unimplemented!(),
@@ -310,23 +316,39 @@
};
if let Some(msg) = msg {
msg.starts_with("execution reverted")
if msg.starts_with("execution reverted") {
trace!("revert from {}", self.conn);
ResponseTypes::Revert
} else if msg.contains("limit") || msg.contains("request") {
trace!("rate limit from {}", self.conn);
ResponseTypes::RateLimit
} else {
ResponseTypes::Ok
}
} else {
false
ResponseTypes::Ok
}
} else {
false
ResponseTypes::Ok
};
if is_revert {
trace!("revert from {}", self.conn);
if matches!(response_type, ResponseTypes::RateLimit) {
if let Some(hard_limit_until) = self.conn.hard_limit_until.as_ref() {
let retry_at = Instant::now() + Duration::from_secs(1);
trace!("retry {} at: {:?}", self.conn, retry_at);
hard_limit_until
.send(retry_at)
.expect("sending hard limit retry times should always work");
}
}
// TODO: think more about the method and param logs. those can be sensitive information
match revert_handler {
RequestRevertHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
if !is_revert {
if matches!(response_type, ResponseTypes::Revert) {
debug!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn, method, params, err