From 739947792a53c0d6e3b0637956bb613ccadada8a Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 20 Sep 2022 06:00:27 +0000 Subject: [PATCH] instrument more. add max_wait to wait_for_request_handle --- deferred-rate-limiter/src/lib.rs | 13 ++++--- redis-rate-limiter/src/lib.rs | 6 +++- web3_proxy/src/jsonrpc.rs | 12 +++---- web3_proxy/src/rpcs/blockchain.rs | 4 +-- web3_proxy/src/rpcs/connection.rs | 36 ++++++++++++------- web3_proxy/src/rpcs/request.rs | 60 +++++++++++++++---------------- 6 files changed, 73 insertions(+), 58 deletions(-) diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index 1fbd9de3..0754f85e 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -8,7 +8,7 @@ use std::sync::atomic::Ordering; use std::sync::{atomic::AtomicU64, Arc}; use tokio::sync::Mutex; use tokio::time::{Duration, Instant}; -use tracing::error; +use tracing::{error, info_span, Instrument}; /// A local cache that sits in front of a RedisRateLimiter /// Generic accross the key so it is simple to use with IPs or user keys @@ -62,7 +62,7 @@ where return Ok(DeferredRateLimitResult::RetryNever); } - let arc_deferred_rate_limit_result = Arc::new(Mutex::new(None)); + let deferred_rate_limit_result = Arc::new(Mutex::new(None)); let redis_key = format!("{}:{}", self.prefix, key); @@ -70,7 +70,7 @@ where // TODO: i'm sure this could be a lot better. but race conditions make this hard to think through. brain needs sleep let local_key_count: Arc = { // clone things outside of the `async move` - let deferred_rate_limit_result = arc_deferred_rate_limit_result.clone(); + let deferred_rate_limit_result = deferred_rate_limit_result.clone(); let redis_key = redis_key.clone(); let rrl = Arc::new(self.rrl.clone()); @@ -118,7 +118,7 @@ where .await }; - let mut locked = arc_deferred_rate_limit_result.lock().await; + let mut locked = deferred_rate_limit_result.lock().await; if let Some(deferred_rate_limit_result) = locked.take() { // new entry. redis was already incremented @@ -184,8 +184,11 @@ where // close to period. don't risk it. wait on redis Ok(rate_limit_f.await) } else { + // TODO: pass the frontend request id through + let span = info_span!("deferred rate limit"); + // rate limit has enough headroom that it should be safe to do this in the background - tokio::spawn(rate_limit_f); + tokio::spawn(rate_limit_f.instrument(span)); Ok(DeferredRateLimitResult::Allowed) } diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs index 2ac85b07..d362e65f 100644 --- a/redis-rate-limiter/src/lib.rs +++ b/redis-rate-limiter/src/lib.rs @@ -86,7 +86,11 @@ impl RedisRateLimiter { // TODO: include max per period in the throttle key? 
let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id); - let mut conn = self.pool.get().await.context("throttle")?; + let mut conn = self + .pool + .get() + .await + .context("get redis connection for rate limits")?; // TODO: at high concurency, i think this is giving errors // TODO: i'm starting to think that bb8 has a bug diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 82714576..d3b802bf 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -1,7 +1,7 @@ use derive_more::From; use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; -use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor}; -use serde::Serialize; +use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; +use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use std::fmt; @@ -11,7 +11,7 @@ fn default_jsonrpc() -> String { "2.0".to_string() } -#[derive(Clone, serde::Deserialize)] +#[derive(Clone, Deserialize)] pub struct JsonRpcRequest { // TODO: skip jsonrpc entirely? its against spec to drop it, but some servers bad #[serde(default = "default_jsonrpc")] @@ -45,7 +45,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { where D: Deserializer<'de>, { - #[derive(serde::Deserialize)] + #[derive(Deserialize)] #[serde(field_identifier, rename_all = "lowercase")] enum Field { JsonRpc, @@ -149,7 +149,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { // TODO: impl Error on this? /// All jsonrpc errors use this structure -#[derive(Serialize, Clone)] +#[derive(Deserialize, Serialize, Clone)] pub struct JsonRpcErrorData { /// The error code pub code: i64, @@ -161,7 +161,7 @@ pub struct JsonRpcErrorData { } /// A complete response -#[derive(Clone, Serialize)] +#[derive(Clone, Deserialize, Serialize)] pub struct JsonRpcForwardedResponse { // TODO: jsonrpc a &str? #[serde(default = "default_jsonrpc")] diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index e631bcc0..a847f2e2 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -14,6 +14,7 @@ use serde::Serialize; use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; +use tokio::time::Duration; use tracing::{debug, trace, warn}; // TODO: type for Hydrated Blocks with their full transactions? @@ -87,7 +88,6 @@ impl Web3Connections { /// Get a block from caches with fallback. /// Will query a specific node or the best available. - /// WARNING! If rpc is specified, this may wait forever. be sure this runs with your own timeout pub async fn block( &self, hash: &H256, @@ -104,7 +104,7 @@ impl Web3Connections { // TODO: if error, retry? let block: Block = match rpc { Some(rpc) => { - rpc.wait_for_request_handle() + rpc.wait_for_request_handle(Duration::from_secs(30)) .await? .request("eth_getBlockByHash", get_block_params, false) .await? 
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index ccb8c328..aa1cfc76 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -20,7 +20,7 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; -use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior}; +use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{debug, error, info, instrument, trace, warn}; /// An active connection to a Web3 RPC server like geth or erigon. @@ -112,9 +112,9 @@ impl Web3Connection { // check the server's chain_id here // TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error - // TODO: this will wait forever. do we want that? + // TODO: what should the timeout be? let found_chain_id: Result = new_connection - .wait_for_request_handle() + .wait_for_request_handle(Duration::from_secs(30)) .await? .request("eth_chainId", Option::None::<()>, false) .await; @@ -199,8 +199,9 @@ impl Web3Connection { .max(U64::one()); // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! + // TODO: what should the request be? let archive_result: Result = self - .wait_for_request_handle() + .wait_for_request_handle(Duration::from_secs(30)) .await? .request( "eth_getCode", @@ -532,8 +533,8 @@ impl Web3Connection { let mut last_hash = H256::zero(); loop { - // TODO: try, or wait_for? - match self.wait_for_request_handle().await { + // TODO: what should the max_wait be? + match self.wait_for_request_handle(Duration::from_secs(30)).await { Ok(active_request_handle) => { let block: Result, _> = active_request_handle .request("eth_getBlockByNumber", ("latest", false), false) @@ -596,7 +597,8 @@ impl Web3Connection { } Web3Provider::Ws(provider) => { // todo: move subscribe_blocks onto the request handle? - let active_request_handle = self.wait_for_request_handle().await; + let active_request_handle = + self.wait_for_request_handle(Duration::from_secs(30)).await; let mut stream = provider.subscribe_blocks().await?; drop(active_request_handle); @@ -604,7 +606,7 @@ impl Web3Connection { // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block let block: Result, _> = self - .wait_for_request_handle() + .wait_for_request_handle(Duration::from_secs(30)) .await? .request("eth_getBlockByNumber", ("latest", false), false) .await @@ -691,7 +693,8 @@ impl Web3Connection { } Web3Provider::Ws(provider) => { // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle - let active_request_handle = self.wait_for_request_handle().await; + let active_request_handle = + self.wait_for_request_handle(Duration::from_secs(30)).await; let mut stream = provider.subscribe_pending_txs().await?; @@ -718,10 +721,12 @@ impl Web3Connection { } /// be careful with this; it might wait forever! - // TODO: maximum wait time? #[instrument] - pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { - // TODO: maximum wait time? 
i think timeouts in other parts of the code are probably best + pub async fn wait_for_request_handle( + self: &Arc, + max_wait: Duration, + ) -> anyhow::Result { + let max_wait = Instant::now() + max_wait; loop { let x = self.try_request_handle().await; @@ -733,13 +738,18 @@ impl Web3Connection { Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? trace!(?retry_at); + + if retry_at > max_wait { + // break now since we will wait past our maximum wait time + return Err(anyhow::anyhow!("timeout waiting for request handle")); + } sleep_until(retry_at).await; } Ok(OpenRequestResult::RetryNever) => { // TODO: when can this happen? log? emit a stat? // TODO: subscribe to the head block on this // TODO: sleep how long? maybe just error? - return Err(anyhow::anyhow!("unable to retry")); + return Err(anyhow::anyhow!("unable to retry for request handle")); } Err(err) => return Err(err), } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 2e8c8224..8587b741 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,13 +1,14 @@ use super::connection::Web3Connection; use super::provider::Web3Provider; +use ethers::providers::ProviderError; use metered::metered; use metered::ErrorCount; use metered::HitCount; use metered::ResponseTime; use metered::Throughput; +use parking_lot::Mutex; use std::fmt; use std::sync::atomic; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use tokio::time::{sleep, Duration, Instant}; use tracing::warn; @@ -25,20 +26,20 @@ pub enum OpenRequestResult { /// Make RPC requests through this handle and drop it when you are done. #[derive(Debug)] pub struct OpenRequestHandle { - conn: Arc, - // TODO: this is the same metrics on the conn. use a reference + conn: Mutex>>, + // TODO: this is the same metrics on the conn. use a reference? metrics: Arc, - decremented: AtomicBool, } #[metered(registry = OpenRequestHandleMetrics, visibility = pub)] impl OpenRequestHandle { pub fn new(conn: Arc) -> Self { + // TODO: take request_id as an argument? // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! // TODO: should we be using metered, or not? i think not because we want stats for each handle // TODO: these should maybe be sent to an influxdb instance? - conn.active_requests.fetch_add(1, atomic::Ordering::AcqRel); + conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed); // TODO: handle overflows? // TODO: what ordering? @@ -46,22 +47,21 @@ impl OpenRequestHandle { let metrics = conn.open_request_handle_metrics.clone(); - let decremented = false.into(); + let conn = Mutex::new(Some(conn)); - Self { - conn, - metrics, - decremented, - } + Self { conn, metrics } } pub fn clone_connection(&self) -> Arc { - self.conn.clone() + if let Some(conn) = self.conn.lock().as_ref() { + conn.clone() + } else { + unimplemented!("this shouldn't happen") + } } /// Send a web3 request /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented - /// By taking self here, we ensure that this is dropped after the request is complete. /// TODO: we no longer take self because metered doesn't like that /// TODO: ErrorCount includes too many types of errors, such as transaction reverts #[instrument(skip_all)] @@ -70,23 +70,30 @@ impl OpenRequestHandle { &self, method: &str, params: T, + // TODO: change this to error_log_level? 
silent_errors: bool, - ) -> Result + ) -> Result where T: fmt::Debug + serde::Serialize + Send + Sync, R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, { + let conn = self + .conn + .lock() + .take() + .expect("cannot use request multiple times"); + // TODO: use tracing spans properly // TODO: requests from customers have request ids, but we should add // TODO: including params in this is way too verbose - trace!(rpc=%self.conn, %method, "request"); + trace!(rpc=%conn, %method, "request"); let mut provider = None; while provider.is_none() { - match self.conn.provider.read().await.as_ref() { + match conn.provider.read().await.as_ref() { None => { - warn!(rpc=%self.conn, "no provider!"); + warn!(rpc=%conn, "no provider!"); // TODO: how should this work? a reconnect should be in progress. but maybe force one now? // TODO: sleep how long? subscribe to something instead? sleep(Duration::from_millis(100)).await @@ -100,22 +107,18 @@ impl OpenRequestHandle { Web3Provider::Ws(provider) => provider.request(method, params).await, }; - self.decremented.store(true, atomic::Ordering::Release); - self.conn - .active_requests - .fetch_sub(1, atomic::Ordering::AcqRel); - // todo: do something to make sure this doesn't get called again? i miss having the function sig have self + // no need to do conn.active_requests.fetch_sub because Drop will do that // TODO: i think ethers already has trace logging (and does it much more fancy) if let Err(err) = &response { if !silent_errors { // TODO: this isn't always bad. missing trie node while we are checking initial - warn!(?err, %method, rpc=%self.conn, "bad response!"); + warn!(?err, %method, rpc=%conn, "bad response!"); } } else { // TODO: opt-in response inspection to log reverts with their request. put into redis or what? // trace!(rpc=%self.0, %method, ?response); - trace!(%method, rpc=%self.conn, "response"); + trace!(%method, rpc=%conn, "response"); } response @@ -124,13 +127,8 @@ impl OpenRequestHandle { impl Drop for OpenRequestHandle { fn drop(&mut self) { - if self.decremented.load(atomic::Ordering::Acquire) { - // we already decremented from a successful request - return; + if let Some(conn) = self.conn.lock().take() { + conn.active_requests.fetch_sub(1, atomic::Ordering::AcqRel); } - - self.conn - .active_requests - .fetch_sub(1, atomic::Ordering::AcqRel); } }
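
The deferred rate limiter now spawns its background redis update inside a tracing span. For reference, a minimal standalone sketch of that tokio + tracing pattern, assuming only the tokio and tracing crates; the span name matches the patch, but the async body is a placeholder:

    use tracing::{info_span, Instrument};

    #[tokio::main]
    async fn main() {
        // same span name the deferred rate limiter uses
        let span = info_span!("deferred rate limit");

        // the spawned future runs inside the span, so anything it logs stays
        // correlated with the request that kicked it off
        let background = tokio::spawn(
            async {
                tracing::trace!("incrementing redis in the background");
            }
            .instrument(span),
        );

        // in the rate limiter this is fire-and-forget; awaited here only so the
        // example runs to completion
        background.await.unwrap();
    }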
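
The jsonrpc error and response types now derive Deserialize alongside Serialize, so a forwarded response can be parsed back out of raw JSON. A small standalone serde sketch of that kind of round trip; the ErrorData struct below is a hypothetical stand-in, not the crate's JsonRpcErrorData:

    use serde::{Deserialize, Serialize};

    // hypothetical stand-in for illustration only
    #[derive(Debug, Deserialize, Serialize)]
    struct ErrorData {
        code: i64,
        message: String,
    }

    fn main() -> Result<(), serde_json::Error> {
        let raw = r#"{"code":-32601,"message":"the method eth_foo does not exist"}"#;

        // deriving Deserialize is what makes this direction possible
        let parsed: ErrorData = serde_json::from_str(raw)?;
        assert_eq!(parsed.code, -32601);

        // and Serialize covers the way back out
        let round_tripped = serde_json::to_string(&parsed)?;
        println!("{round_tripped}");

        Ok(())
    }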
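
wait_for_request_handle now takes a max_wait and returns an error instead of waiting forever. A hypothetical caller could look like this sketch; the function name, import paths, and error handling are assumptions, while the eth_chainId request mirrors the one in Web3Connection::new:

    use std::sync::Arc;

    use ethers::prelude::U256;
    use tokio::time::Duration;
    // module path assumed; adjust to wherever Web3Connection actually lives
    use web3_proxy::rpcs::connection::Web3Connection;

    async fn current_chain_id(rpc: &Arc<Web3Connection>) -> anyhow::Result<U256> {
        // bail out if the rate limiter would keep us waiting longer than 30 seconds
        let handle = rpc.wait_for_request_handle(Duration::from_secs(30)).await?;

        // `false` means errors from this request are logged rather than silenced
        let chain_id: U256 = handle
            .request("eth_chainId", Option::None::<()>, false)
            .await?;

        Ok(chain_id)
    }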