diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 3c209e3d..71030e68 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -10,7 +10,7 @@ use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; -use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64}; +use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, H256, U256, U64}; use futures::stream::FuturesUnordered; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; @@ -846,12 +846,11 @@ impl Web3Rpc { if let Some(ws_provider) = self.ws_provider.load().as_ref() { // todo: move subscribe_blocks onto the request handle - let authorization = Default::default(); - let error_handler = Some(Level::ERROR.into()); + // we don't actually care about params here. we aren't going to use this handle for let active_request_handle = self - .wait_for_request_handle(&authorization, None, error_handler) + .internal_request::<_, H256>("eth_subscribe", &(), error_handler, None) .await; let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?; @@ -904,14 +903,10 @@ impl Web3Rpc { }; if let Some(ws_provider) = self.ws_provider.load().as_ref() { - // todo: move subscribe_blocks onto the request handle - let authorization = Default::default(); + self.wait_for_throttle(Instant::now() + Duration::from_secs(5)) + .await?; - let active_request_handle = self - .wait_for_request_handle(&authorization, None, error_handler) - .await; let mut blocks = ws_provider.subscribe_blocks().await?; - drop(active_request_handle); // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now @@ -986,13 +981,8 @@ impl Web3Rpc { pub async fn wait_for_request_handle( self: &Arc, web3_request: &Arc, - max_wait: Option, error_handler: Option, ) -> Web3ProxyResult { - // TODO: what should the default be? - // TODO: split max_wait_connect (which might wait if a rate limit is pending) and max_wait_request - let max_wait_until = max_wait.map(|x| Instant::now() + x); - loop { match self.try_request_handle(web3_request, error_handler).await { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), @@ -1006,11 +996,15 @@ impl Web3Rpc { self ); - if let Some(max_wait_until) = max_wait_until { - if retry_at > max_wait_until { - // break now since we will wait past our maximum wait time - return Err(Web3ProxyError::Timeout(None)); - } + // if things are slow this could happen in prod. but generally its a problem + debug_assert!(wait > Duration::from_secs(0)); + + // TODO: have connect_timeout in addition to the full ttl + if retry_at > web3_request.expire_instant { + // break now since we will wait past our maximum wait time + return Err(Web3ProxyError::Timeout(Some( + web3_request.start_instant.elapsed(), + ))); } sleep_until(retry_at).await; @@ -1019,34 +1013,46 @@ impl Web3Rpc { // TODO: when can this happen? log? emit a stat? trace!("{} has no handle ready", self); - if let Some(max_wait_until) = max_wait_until { - if Instant::now() > max_wait_until { - return Err(Web3ProxyError::NoHandleReady); - } - } - // TODO: sleep how long? maybe just error? // TODO: instead of an arbitrary sleep, subscribe to the head block on this? - sleep(Duration::from_millis(10)).await; + // TODO: exponential backoff with jitter + todo!(); } Err(err) => return Err(err), } } } - pub async fn try_request_handle( - self: &Arc, - web3_request: &Arc, - error_handler: Option, - ) -> Web3ProxyResult { - // TODO: if websocket is reconnecting, return an error? + async fn wait_for_throttle(self: &Arc, wait_until: Instant) -> Web3ProxyResult { + loop { + match self.try_throttle().await? { + RedisRateLimitResult::Allowed(y) => return Ok(y), + RedisRateLimitResult::RetryAt(retry_at, _) => { + // TODO: check timeouts + if wait_until < retry_at { + break; + } + sleep_until(wait_until).await; + } + RedisRateLimitResult::RetryNever => { + // TODO: not sure what this should be + return Err(Web3ProxyError::Timeout(None)); + } + } + } + + // TODO: not sure what this should be. maybe if when we convert to JSON we can set it? + Err(Web3ProxyError::Timeout(None)) + } + + async fn try_throttle(self: &Arc) -> Web3ProxyResult { // check cached rate limits let now = Instant::now(); let hard_limit_until = self.next_available(now); if now < hard_limit_until { - return Ok(OpenRequestResult::RetryAt(hard_limit_until)); + return Ok(RedisRateLimitResult::RetryAt(hard_limit_until, u64::MAX)); } // check shared rate limits @@ -1057,10 +1063,8 @@ impl Web3Rpc { .await .context(format!("attempting to throttle {}", self))? { - RedisRateLimitResult::Allowed(_) => { - // trace!("rate limit succeeded") - } - RedisRateLimitResult::RetryAt(retry_at, _) => { + x @ RedisRateLimitResult::Allowed(_) => Ok(x), + RedisRateLimitResult::RetryAt(retry_at, count) => { // rate limit gave us a wait time // if not a backup server, warn. backups hit rate limits often if !self.backup { @@ -1076,13 +1080,37 @@ impl Web3Rpc { hard_limit_until.send_replace(retry_at); } - return Ok(OpenRequestResult::RetryAt(retry_at)); + Ok(RedisRateLimitResult::RetryAt(retry_at, count)) } - RedisRateLimitResult::RetryNever => { - warn!("how did retry never on {} happen?", self); - return Ok(OpenRequestResult::NotReady); + x @ RedisRateLimitResult::RetryNever => { + error!("how did retry never on {} happen?", self); + Ok(x) } } + } else { + Ok(RedisRateLimitResult::Allowed(u64::MAX)) + } + } + + pub async fn try_request_handle( + self: &Arc, + web3_request: &Arc, + error_handler: Option, + ) -> Web3ProxyResult { + // TODO: if websocket is reconnecting, return an error? + + // TODO: if this server can't handle this request because it isn't synced, return an error + + // check shared rate limits + match self.try_throttle().await? { + RedisRateLimitResult::Allowed(_) => {} + RedisRateLimitResult::RetryAt(retry_at, _) => { + return Ok(OpenRequestResult::RetryAt(retry_at)); + } + RedisRateLimitResult::RetryNever => { + warn!("how did retry never on {} happen?", self); + return Ok(OpenRequestResult::NotReady); + } }; let handle = @@ -1101,18 +1129,16 @@ impl Web3Rpc { // TODO: think about this more. its hard to do this without being self-referenctial! let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?; - self.authorized_request(&web3_request, error_handler, max_wait) - .await + self.authorized_request(&web3_request, error_handler).await } pub async fn authorized_request( self: &Arc, web3_request: &Arc, error_handler: Option, - max_wait: Option, ) -> Web3ProxyResult { let handle = self - .wait_for_request_handle(web3_request, max_wait, error_handler) + .wait_for_request_handle(web3_request, error_handler) .await?; let response = handle.request().await?;