DRY throttling

This commit is contained in:
Bryan Stitt 2023-10-06 18:33:58 -07:00
parent 9a7777e3f9
commit 949c3eeb5d

View File

@ -10,7 +10,7 @@ use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler; use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64}; use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, H256, U256, U64};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
@ -846,12 +846,11 @@ impl Web3Rpc {
if let Some(ws_provider) = self.ws_provider.load().as_ref() { if let Some(ws_provider) = self.ws_provider.load().as_ref() {
// todo: move subscribe_blocks onto the request handle // todo: move subscribe_blocks onto the request handle
let authorization = Default::default();
let error_handler = Some(Level::ERROR.into()); let error_handler = Some(Level::ERROR.into());
// we don't actually care about params here. we aren't going to use this handle for
let active_request_handle = self let active_request_handle = self
.wait_for_request_handle(&authorization, None, error_handler) .internal_request::<_, H256>("eth_subscribe", &(), error_handler, None)
.await; .await;
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?; let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
@ -904,14 +903,10 @@ impl Web3Rpc {
}; };
if let Some(ws_provider) = self.ws_provider.load().as_ref() { if let Some(ws_provider) = self.ws_provider.load().as_ref() {
// todo: move subscribe_blocks onto the request handle self.wait_for_throttle(Instant::now() + Duration::from_secs(5))
let authorization = Default::default(); .await?;
let active_request_handle = self
.wait_for_request_handle(&authorization, None, error_handler)
.await;
let mut blocks = ws_provider.subscribe_blocks().await?; let mut blocks = ws_provider.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block // query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now // there is a very small race condition here where the stream could send us a new block right now
@ -986,13 +981,8 @@ impl Web3Rpc {
pub async fn wait_for_request_handle( pub async fn wait_for_request_handle(
self: &Arc<Self>, self: &Arc<Self>,
web3_request: &Arc<Web3Request>, web3_request: &Arc<Web3Request>,
max_wait: Option<Duration>,
error_handler: Option<RequestErrorHandler>, error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestHandle> { ) -> Web3ProxyResult<OpenRequestHandle> {
// TODO: what should the default be?
// TODO: split max_wait_connect (which might wait if a rate limit is pending) and max_wait_request
let max_wait_until = max_wait.map(|x| Instant::now() + x);
loop { loop {
match self.try_request_handle(web3_request, error_handler).await { match self.try_request_handle(web3_request, error_handler).await {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
@ -1006,11 +996,15 @@ impl Web3Rpc {
self self
); );
if let Some(max_wait_until) = max_wait_until { // if things are slow this could happen in prod. but generally its a problem
if retry_at > max_wait_until { debug_assert!(wait > Duration::from_secs(0));
// break now since we will wait past our maximum wait time
return Err(Web3ProxyError::Timeout(None)); // TODO: have connect_timeout in addition to the full ttl
} if retry_at > web3_request.expire_instant {
// break now since we will wait past our maximum wait time
return Err(Web3ProxyError::Timeout(Some(
web3_request.start_instant.elapsed(),
)));
} }
sleep_until(retry_at).await; sleep_until(retry_at).await;
@ -1019,34 +1013,46 @@ impl Web3Rpc {
// TODO: when can this happen? log? emit a stat? // TODO: when can this happen? log? emit a stat?
trace!("{} has no handle ready", self); trace!("{} has no handle ready", self);
if let Some(max_wait_until) = max_wait_until {
if Instant::now() > max_wait_until {
return Err(Web3ProxyError::NoHandleReady);
}
}
// TODO: sleep how long? maybe just error? // TODO: sleep how long? maybe just error?
// TODO: instead of an arbitrary sleep, subscribe to the head block on this? // TODO: instead of an arbitrary sleep, subscribe to the head block on this?
sleep(Duration::from_millis(10)).await; // TODO: exponential backoff with jitter
todo!();
} }
Err(err) => return Err(err), Err(err) => return Err(err),
} }
} }
} }
pub async fn try_request_handle( async fn wait_for_throttle(self: &Arc<Self>, wait_until: Instant) -> Web3ProxyResult<u64> {
self: &Arc<Self>, loop {
web3_request: &Arc<Web3Request>, match self.try_throttle().await? {
error_handler: Option<RequestErrorHandler>, RedisRateLimitResult::Allowed(y) => return Ok(y),
) -> Web3ProxyResult<OpenRequestResult> { RedisRateLimitResult::RetryAt(retry_at, _) => {
// TODO: if websocket is reconnecting, return an error? // TODO: check timeouts
if wait_until < retry_at {
break;
}
sleep_until(wait_until).await;
}
RedisRateLimitResult::RetryNever => {
// TODO: not sure what this should be
return Err(Web3ProxyError::Timeout(None));
}
}
}
// TODO: not sure what this should be. maybe if when we convert to JSON we can set it?
Err(Web3ProxyError::Timeout(None))
}
async fn try_throttle(self: &Arc<Self>) -> Web3ProxyResult<RedisRateLimitResult> {
// check cached rate limits // check cached rate limits
let now = Instant::now(); let now = Instant::now();
let hard_limit_until = self.next_available(now); let hard_limit_until = self.next_available(now);
if now < hard_limit_until { if now < hard_limit_until {
return Ok(OpenRequestResult::RetryAt(hard_limit_until)); return Ok(RedisRateLimitResult::RetryAt(hard_limit_until, u64::MAX));
} }
// check shared rate limits // check shared rate limits
@ -1057,10 +1063,8 @@ impl Web3Rpc {
.await .await
.context(format!("attempting to throttle {}", self))? .context(format!("attempting to throttle {}", self))?
{ {
RedisRateLimitResult::Allowed(_) => { x @ RedisRateLimitResult::Allowed(_) => Ok(x),
// trace!("rate limit succeeded") RedisRateLimitResult::RetryAt(retry_at, count) => {
}
RedisRateLimitResult::RetryAt(retry_at, _) => {
// rate limit gave us a wait time // rate limit gave us a wait time
// if not a backup server, warn. backups hit rate limits often // if not a backup server, warn. backups hit rate limits often
if !self.backup { if !self.backup {
@ -1076,13 +1080,37 @@ impl Web3Rpc {
hard_limit_until.send_replace(retry_at); hard_limit_until.send_replace(retry_at);
} }
return Ok(OpenRequestResult::RetryAt(retry_at)); Ok(RedisRateLimitResult::RetryAt(retry_at, count))
} }
RedisRateLimitResult::RetryNever => { x @ RedisRateLimitResult::RetryNever => {
warn!("how did retry never on {} happen?", self); error!("how did retry never on {} happen?", self);
return Ok(OpenRequestResult::NotReady); Ok(x)
} }
} }
} else {
Ok(RedisRateLimitResult::Allowed(u64::MAX))
}
}
pub async fn try_request_handle(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestResult> {
// TODO: if websocket is reconnecting, return an error?
// TODO: if this server can't handle this request because it isn't synced, return an error
// check shared rate limits
match self.try_throttle().await? {
RedisRateLimitResult::Allowed(_) => {}
RedisRateLimitResult::RetryAt(retry_at, _) => {
return Ok(OpenRequestResult::RetryAt(retry_at));
}
RedisRateLimitResult::RetryNever => {
warn!("how did retry never on {} happen?", self);
return Ok(OpenRequestResult::NotReady);
}
}; };
let handle = let handle =
@ -1101,18 +1129,16 @@ impl Web3Rpc {
// TODO: think about this more. its hard to do this without being self-referenctial! // TODO: think about this more. its hard to do this without being self-referenctial!
let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?; let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?;
self.authorized_request(&web3_request, error_handler, max_wait) self.authorized_request(&web3_request, error_handler).await
.await
} }
pub async fn authorized_request<R: JsonRpcResultData>( pub async fn authorized_request<R: JsonRpcResultData>(
self: &Arc<Self>, self: &Arc<Self>,
web3_request: &Arc<Web3Request>, web3_request: &Arc<Web3Request>,
error_handler: Option<RequestErrorHandler>, error_handler: Option<RequestErrorHandler>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> { ) -> Web3ProxyResult<R> {
let handle = self let handle = self
.wait_for_request_handle(web3_request, max_wait, error_handler) .wait_for_request_handle(web3_request, error_handler)
.await?; .await?;
let response = handle.request().await?; let response = handle.request().await?;