From 5bec8bb5b92c4dbf022694d45dd4646ed5af1930 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 5 Dec 2022 13:13:36 -0800 Subject: [PATCH] much smarter connection logic --- .../src/bin/web3_proxy_cli/cost_calculator.rs | 30 +- web3_proxy/src/frontend/errors.rs | 15 + web3_proxy/src/rpcs/blockchain.rs | 33 +- web3_proxy/src/rpcs/connection.rs | 525 +++++++++++------- web3_proxy/src/rpcs/connections.rs | 43 +- web3_proxy/src/rpcs/provider.rs | 1 - web3_proxy/src/rpcs/request.rs | 36 +- web3_proxy/src/rpcs/transactions.rs | 2 +- web3_proxy/src/user_queries.rs | 10 +- 9 files changed, 429 insertions(+), 266 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/cost_calculator.rs b/web3_proxy/src/bin/web3_proxy_cli/cost_calculator.rs index 747d08b2..218709d5 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/cost_calculator.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/cost_calculator.rs @@ -70,12 +70,12 @@ impl CostCalculatorCommand { #[derive(Debug, FromQueryResult)] struct SelectResult { pub total_frontend_requests: Decimal, - pub total_backend_retries: Decimal, - pub total_cache_misses: Decimal, + // pub total_backend_retries: Decimal, + // pub total_cache_misses: Decimal, pub total_cache_hits: Decimal, pub total_response_bytes: Decimal, pub total_error_responses: Decimal, - pub total_response_millis: Decimal, + // pub total_response_millis: Decimal, pub first_period_datetime: DateTimeUtc, pub last_period_datetime: DateTimeUtc, } @@ -86,14 +86,14 @@ impl CostCalculatorCommand { rpc_accounting::Column::FrontendRequests.sum(), "total_frontend_requests", ) - .column_as( - rpc_accounting::Column::BackendRequests.sum(), - "total_backend_retries", - ) - .column_as( - rpc_accounting::Column::CacheMisses.sum(), - "total_cache_misses", - ) + // .column_as( + // rpc_accounting::Column::BackendRequests.sum(), + // "total_backend_retries", + // ) + // .column_as( + // rpc_accounting::Column::CacheMisses.sum(), + // "total_cache_misses", + // ) .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") .column_as( rpc_accounting::Column::SumResponseBytes.sum(), @@ -104,10 +104,10 @@ impl CostCalculatorCommand { rpc_accounting::Column::ErrorResponse.sum(), "total_error_responses", ) - .column_as( - rpc_accounting::Column::SumResponseMillis.sum(), - "total_response_millis", - ) + // .column_as( + // rpc_accounting::Column::SumResponseMillis.sum(), + // "total_response_millis", + // ) .column_as( rpc_accounting::Column::PeriodDatetime.min(), "first_period_datetime", diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 5ec03a5f..ff3a1d1f 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -24,6 +24,7 @@ pub type FrontendResult = Result; // TODO: #[derive(Debug, From)] pub enum FrontendErrorResponse { + AccessDenied, Anyhow(anyhow::Error), Box(Box), Database(DbErr), @@ -45,7 +46,21 @@ pub enum FrontendErrorResponse { impl IntoResponse for FrontendErrorResponse { fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs + // TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY let (status_code, response) = match self { + Self::AccessDenied => { + // TODO: attach something to this trace. probably don't include much in the message though. 
don't want to leak creds by accident + trace!("access denied"); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + // TODO: is it safe to expose all of our anyhow strings? + "FORBIDDEN".to_string(), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } Self::Anyhow(err) => { warn!("anyhow. err={:?}", err); ( diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index d3d4ef4d..0029a096 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -29,11 +29,21 @@ pub type BlockHashesCache = Cache Self { + let mut x = Self { block, lag: 0 }; + + // no need to recalulate lag every time + // if the head block gets too old, a health check restarts this connection + x.lag = x.lag(); + + x + } + + pub fn lag(&self) -> u64 { // TODO: read this from a global config. different chains should probably have different gaps. let allowed_lag: u64 = 60; @@ -45,18 +55,15 @@ impl SavedBlock { // TODO: is this safe enough? what if something about the chain is actually lagged? what if its a chain like BTC with 10 minute blocks? let oldest_allowed = now - Duration::from_secs(allowed_lag); - let block_timestamp = Duration::from_secs(block.timestamp.as_u64()); + let block_timestamp = Duration::from_secs(self.block.timestamp.as_u64()); - // TODO: recalculate this every time? - let lag = if block_timestamp < oldest_allowed { + if block_timestamp < oldest_allowed { // this server is still syncing from too far away to serve requests // u64 is safe because ew checked equality above (oldest_allowed - block_timestamp).as_secs() as u64 } else { 0 - }; - - Self { block, lag } + } } pub fn hash(&self) -> H256 { @@ -143,7 +150,7 @@ impl Web3Connections { // TODO: if error, retry? let block: ArcBlock = match rpc { Some(rpc) => { - rpc.wait_for_request_handle(authorization, Duration::from_secs(30)) + rpc.wait_for_request_handle(authorization, Duration::from_secs(30), false) .await? 
.request( "eth_getBlockByHash", @@ -301,9 +308,6 @@ impl Web3Connections { // add the rpc's block to connection_heads, or remove the rpc from connection_heads let rpc_head_block = match rpc_head_block { Some(rpc_head_block) => { - let rpc_head_num = rpc_head_block.number(); - let rpc_head_hash = rpc_head_block.hash(); - // we don't know if its on the heaviest chain yet self.save_block(&rpc_head_block.block, false).await?; @@ -314,6 +318,8 @@ impl Web3Connections { None } else { + let rpc_head_hash = rpc_head_block.hash(); + if let Some(prev_hash) = connection_heads.insert(rpc.name.to_owned(), rpc_head_hash) { @@ -490,10 +496,11 @@ impl Web3Connections { .filter_map(|conn_name| self.conns.get(conn_name).cloned()) .collect(); - let consensus_head_hash = maybe_head_block + // TODO: DEBUG only check + let _ = maybe_head_block .hash .expect("head blocks always have hashes"); - let consensus_head_num = maybe_head_block + let _ = maybe_head_block .number .expect("head blocks always have numbers"); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 6dd8e628..16941e71 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -7,6 +7,7 @@ use crate::config::BlockAndRpc; use crate::frontend::authorization::Authorization; use anyhow::Context; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; +use ethers::types::{Block, U256}; use futures::future::try_join_all; use futures::StreamExt; use log::{debug, error, info, trace, warn, Level}; @@ -23,9 +24,38 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; use thread_fast_rng::rand::Rng; use thread_fast_rng::thread_fast_rng; -use tokio::sync::broadcast; -use tokio::sync::RwLock as AsyncRwLock; -use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; +use tokio::sync::{broadcast, oneshot, RwLock as AsyncRwLock}; +use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior}; + +#[derive(Debug)] +pub enum ProviderState { + None, + NotReady(Arc), + Ready(Arc), +} + +impl ProviderState { + pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc> { + match self { + ProviderState::None => None, + ProviderState::NotReady(x) => { + if allow_not_ready { + Some(x) + } else { + // TODO: do a ready check here? + None + } + } + ProviderState::Ready(x) => { + if x.ready() { + Some(x) + } else { + None + } + } + } + } +} /// An active connection to a Web3 RPC server like geth or erigon. pub struct Web3Connection { @@ -43,12 +73,14 @@ pub struct Web3Connection { pub(super) internal_requests: AtomicU64, /// provider is in a RwLock so that we can replace it if re-connecting /// it is an async lock because we hold it open across awaits - pub(super) provider: AsyncRwLock>>, + pub(super) provider_state: AsyncRwLock, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits /// We do not use the deferred rate limiter because going over limits would cause errors pub(super) hard_limit: Option, /// used for load balancing to the least loaded server pub(super) soft_limit: u32, + /// use web3 queries to find the block data limit for archive/pruned nodes + pub(super) automatic_block_limit: bool, /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, /// Lower weight are higher priority when sending requests. 0 to 99. 
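A minimal sketch, not part of the patch, of what the new ProviderState gate buys: the startup probes (the eth_chainId check and check_block_data_limit) pass allow_not_ready = true so they can run against a provider that is still NotReady, while the periodic health check and normal request paths pass false and treat None as "not ready". The wrapper below is hypothetical; only ProviderState and Web3Provider come from the patch itself.

    use std::sync::Arc;

    // Hypothetical wrapper around ProviderState::provider() showing the two access modes.
    async fn usable_provider(
        state: &ProviderState,
        allow_not_ready: bool, // true only for startup probes, per wait_for_request_handle's docs
    ) -> Option<Arc<Web3Provider>> {
        // NotReady providers are returned only when allow_not_ready is set;
        // Ready providers are additionally filtered by Web3Provider::ready().
        state.provider(allow_not_ready).await.cloned()
    }

try_request_handle() below uses the same gate and maps a None provider to OpenRequestResult::NotReady.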
@@ -97,7 +129,10 @@ impl Web3Connection { // turn weight 0 into 100% and weight 100 into 0% let weight = (100 - weight) as f64 / 100.0; - let block_data_limit = block_data_limit.unwrap_or_default().into(); + // TODO: should we do this even if block_sender is None? then we would know limits on private relays + let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into(); + let automatic_block_limit = + (block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some(); let new_connection = Self { name, @@ -107,9 +142,10 @@ impl Web3Connection { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(None), + provider_state: AsyncRwLock::new(ProviderState::None), hard_limit, soft_limit, + automatic_block_limit, block_data_limit, head_block: RwLock::new(Default::default()), weight, @@ -118,89 +154,27 @@ impl Web3Connection { let new_connection = Arc::new(new_connection); - // connect to the server (with retries) - // TODO: PROBLEM! THIS RETRIES FOREVER AND BLOCKS THE APP STARTING - new_connection - .retrying_reconnect(block_sender.as_ref(), false) - .await?; - - let authorization = Arc::new(Authorization::internal(db_conn)?); - - // check the server's chain_id here - // TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there - // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error - // TODO: what should the timeout be? - let found_chain_id: Result = new_connection - .wait_for_request_handle(&authorization, Duration::from_secs(30)) - .await? - .request( - "eth_chainId", - &json!(Option::None::<()>), - Level::Error.into(), - ) - .await; - - match found_chain_id { - Ok(found_chain_id) => { - // TODO: there has to be a cleaner way to do this - if chain_id != found_chain_id.as_u64() { - return Err(anyhow::anyhow!( - "incorrect chain id! Config has {}, but RPC has {}", - chain_id, - found_chain_id - ) - .context(format!("failed @ {}", new_connection.name))); - } - } - Err(e) => { - let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection.name)); - return Err(e); - } - } - - // TODO: should we do this even if block_sender is None? then we would know limits on private relays - let check_block_limit_needed = (new_connection - .block_data_limit - .load(atomic::Ordering::Acquire) - == 0) - && block_sender.is_some(); - // subscribe to new blocks and new transactions + // subscribing starts the connection (with retries) // TODO: make transaction subscription optional (just pass None for tx_id_sender) let handle = { let new_connection = new_connection.clone(); - let authorization = authorization.clone(); + let authorization = Arc::new(Authorization::internal(db_conn)?); tokio::spawn(async move { new_connection .subscribe( &authorization, - http_interval_sender, block_map, block_sender, - tx_id_sender, + chain_id, + http_interval_sender, reconnect, + tx_id_sender, ) .await }) }; - // we could take "archive" as a parameter, but we would want a safety check on it regardless - // check common archive thresholds - // TODO: would be great if rpcs exposed this - // TODO: move this to a helper function so we can recheck on errors or as the chain grows - // TODO: move this to a helper function that checks - if check_block_limit_needed { - // TODO: make sure the server isn't still syncing - - // TODO: don't sleep. 
wait for new heads subscription instead - // TODO: i think instead of atomics, we could maybe use a watch channel - sleep(Duration::from_millis(250)).await; - - new_connection - .check_block_data_limit(&authorization) - .await?; - } - Ok((new_connection, handle)) } @@ -208,32 +182,68 @@ impl Web3Connection { self: &Arc, authorization: &Arc, ) -> anyhow::Result> { + if !self.automatic_block_limit { + // TODO: is this a good thing to return + return Ok(None); + } + let mut limit = None; + // check if we are synced + let head_block: ArcBlock = self + .wait_for_request_handle(authorization, Duration::from_secs(30), true) + .await? + .request( + "eth_getBlockByNumber", + &json!(("latest", false)), + // error here are expected, so keep the level low + Level::Debug.into(), + ) + .await?; + + if SavedBlock::from(head_block).syncing() { + // if the node is syncing, we can't check its block data limit + // TODO: once a node stops syncing, how do we make sure this is run? + self.block_data_limit.store(0, atomic::Ordering::Release); + return Ok(Some(0)); + } + + // TODO: add SavedBlock to self? probably best not to. we might not get marked Ready + // TODO: binary search between 90k and max? - // TODO: start at 0 or 1 + // TODO: start at 0 or 1? for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { - let mut head_block_id = self.head_block.read().clone(); + let handle = self + .wait_for_request_handle(authorization, Duration::from_secs(30), true) + .await?; - // TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender? - while head_block_id.is_none() { - warn!("no head block yet. retrying rpc {}", self); + let head_block_num_future = handle.request::, U256>( + "eth_blockNumber", + &None, + // error here are expected, so keep the level low + Level::Debug.into(), + ); - // TODO: sleep for the block time, or maybe subscribe to a channel instead of this simple pull - sleep(Duration::from_secs(13)).await; + let head_block_num = timeout(Duration::from_secs(5), head_block_num_future) + .await + .context("timeout fetching eth_blockNumber")? + .context("provider error")?; - head_block_id = self.head_block.read().clone(); - } - let head_block_num = head_block_id.expect("is_none was checked above").number(); - - // TODO: subtract 1 from block_data_limit for safety? let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into()); + trace!( + "checking maybe_archive_block on {}: {}", + self, + maybe_archive_block + ); + // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? - let archive_result: Result = self - .wait_for_request_handle(authorization, Duration::from_secs(30)) - .await? + let handle = self + .wait_for_request_handle(authorization, Duration::from_secs(30), true) + .await?; + + let archive_result: Result = handle .request( "eth_getCode", &json!(( @@ -246,9 +256,10 @@ impl Web3Connection { .await; trace!( - "archive_result on {} for {}: {:?}", + "archive_result on {} for {} ({}): {:?}", + self, block_data_limit, - self.name, + maybe_archive_block, archive_result ); @@ -304,10 +315,12 @@ impl Web3Connection { /// reconnect to the provider. errors are retried forever with exponential backoff with jitter. /// We use the "Decorrelated" jitter from /// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time. 
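As context for the renamed function below, a standalone sketch of the decorrelated-jitter schedule it retries with; the base, cap, and multiplier constants here are illustrative assumptions rather than the crate's actual values.

    use thread_fast_rng::{rand::Rng, thread_fast_rng};

    // Illustrative only: each delay is drawn from [base, previous * multiplier) and
    // then capped, so retries back off roughly exponentially but stay randomized.
    fn next_backoff_ms(prev_sleep_ms: u64) -> u64 {
        let base_ms: u64 = 100; // assumed floor
        let cap_ms: u64 = 30_000; // assumed cap
        let range_multiplier: u64 = 3; // assumed spread

        let upper = (prev_sleep_ms * range_multiplier).max(base_ms + 1); // keep the range non-empty
        thread_fast_rng().gen_range(base_ms..upper).min(cap_ms)
    }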
- pub async fn retrying_reconnect( + pub async fn retrying_connect( self: &Arc, block_sender: Option<&flume::Sender>, - initial_sleep: bool, + chain_id: u64, + db_conn: Option<&DatabaseConnection>, + delay_start: bool, ) -> anyhow::Result<()> { // there are several crates that have retry helpers, but they all seem more complex than necessary // TODO: move this backoff logic into a helper function so we can use it when doing database locking @@ -317,7 +330,7 @@ impl Web3Connection { // sleep once before the initial retry attempt // TODO: now that we use this method for our initial connection, do we still want this sleep? - let mut sleep_ms = if initial_sleep { + let mut sleep_ms = if delay_start { let first_sleep_ms = min( cap_ms, thread_fast_rng().gen_range(base_ms..(base_ms * range_multiplier)), @@ -334,8 +347,8 @@ impl Web3Connection { }; // retry until we succeed - while let Err(err) = self.reconnect(block_sender).await { - // thread_rng is crytographically secure. we don't need that, but we don't need this super efficient so its fine + while let Err(err) = self.connect(block_sender, chain_id, db_conn).await { + // thread_rng is crytographically secure. we don't need that here sleep_ms = min( cap_ms, thread_fast_rng().gen_range(base_ms..(sleep_ms * range_multiplier)), @@ -352,56 +365,120 @@ impl Web3Connection { sleep(retry_in).await; } + info!("connected to {}", self); + Ok(()) } - /// reconnect a websocket provider - pub async fn reconnect( + /// connect to the web3 provider + async fn connect( self: &Arc, - // websocket doesn't need the http client block_sender: Option<&flume::Sender>, + chain_id: u64, + db_conn: Option<&DatabaseConnection>, ) -> anyhow::Result<()> { - // since this lock is held open over an await, we use tokio's locking - // TODO: timeout on this lock. 
if its slow, something is wrong - let mut provider_option = self.provider.write().await; + trace!("provider_state {} locking...", self); + let mut provider_state = self.provider_state.write().await; + trace!("provider_state {} locked: {:?}", self, provider_state); - if let Some(provider) = &*provider_option { - match provider.as_ref() { - Web3Provider::Http(_) => { - // http clients don't need to do anything for reconnecting - // they *do* need to run this function to setup the first + match &*provider_state { + ProviderState::None => { + info!("connecting to {}", self); + } + ProviderState::NotReady(provider) | ProviderState::Ready(provider) => { + // disconnect the current provider + if let Web3Provider::Mock = provider.as_ref() { return Ok(()); } - Web3Provider::Ws(_) => {} - Web3Provider::Mock => return Ok(()), + + trace!("Reconnecting to {}", self); + + // disconnect the current provider + *provider_state = ProviderState::None; + + // reset sync status + trace!("locking head block on {}", self); + { + let mut head_block = self.head_block.write(); + *head_block = None; + } + trace!("done with head block on {}", self); + + // tell the block subscriber that we don't have any blocks + if let Some(block_sender) = block_sender { + block_sender + .send_async((None, self.clone())) + .await + .context("block_sender during connect")?; + } } - - info!("Reconnecting to {}", self); - - // disconnect the current provider - *provider_option = None; - - // reset sync status - { - let mut head_block_id = self.head_block.write(); - *head_block_id = None; - } - - // tell the block subscriber that we don't have any blocks - if let Some(block_sender) = &block_sender { - block_sender - .send_async((None, self.clone())) - .await - .context("block_sender during connect")?; - } - } else { - info!("connecting to {}", self); } + trace!("Creating new Web3Provider on {}", self); // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again! let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?; - *provider_option = Some(Arc::new(new_provider)); + // TODO: if an error happens, + *provider_state = ProviderState::NotReady(Arc::new(new_provider)); + + // drop the lock so that we can get a request handle + trace!("provider_state {} unlocked", self); + drop(provider_state); + + let authorization = Arc::new(Authorization::internal(db_conn.cloned())?); + + // check the server's chain_id here + // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error + // TODO: what should the timeout be? should there be a request timeout? + trace!("waiting on chain id for {}", self); + let found_chain_id: Result = self + .wait_for_request_handle(&authorization, Duration::from_secs(30), true) + .await? + .request( + "eth_chainId", + &json!(Option::None::<()>), + Level::Trace.into(), + ) + .await; + + trace!("found_chain_id: {:?}", found_chain_id); + + match found_chain_id { + Ok(found_chain_id) => { + // TODO: there has to be a cleaner way to do this + if chain_id != found_chain_id.as_u64() { + return Err(anyhow::anyhow!( + "incorrect chain id! 
Config has {}, but RPC has {}", + chain_id, + found_chain_id + ) + .context(format!("failed @ {}", self))); + } + } + Err(e) => { + return Err(anyhow::Error::from(e)); + } + } + + // we could take "archive" as a parameter, but we would want a safety check on it regardless + // check common archive thresholds + // TODO: would be great if rpcs exposed this + // TODO: move this to a helper function so we can recheck on errors or as the chain grows + // TODO: move this to a helper function that checks + self.check_block_data_limit(&authorization).await?; + + { + let mut provider_state = self.provider_state.write().await; + + // TODO: do this without a clone + let ready_provider = provider_state + .provider(true) + .await + .context("provider missing")? + .clone(); + + *provider_state = ProviderState::Ready(ready_provider); + } info!("successfully connected to {}", self); @@ -413,11 +490,6 @@ impl Web3Connection { self.active_requests.load(atomic::Ordering::Acquire) } - #[inline] - pub async fn has_provider(&self) -> bool { - self.provider.read().await.is_some() - } - async fn send_head_block_result( self: &Arc, new_head_block: Result, ProviderError>, @@ -483,22 +555,82 @@ impl Web3Connection { } /// subscribe to blocks and transactions with automatic reconnects + /// This should only exit when the program is exiting. + /// TODO: should more of these args be on self? + #[allow(clippy::too_many_arguments)] async fn subscribe( self: Arc, authorization: &Arc, - http_interval_sender: Option>>, block_map: BlockHashesCache, block_sender: Option>, - tx_id_sender: Option)>>, + chain_id: u64, + http_interval_sender: Option>>, reconnect: bool, + tx_id_sender: Option)>>, ) -> anyhow::Result<()> { loop { - debug!("subscribing to {}", self); - let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); let mut futures = vec![]; + { + // health check + // TODO: move this into a proper function + let authorization = authorization.clone(); + let block_sender = block_sender.clone(); + let conn = self.clone(); + let (ready_tx, ready_rx) = oneshot::channel(); + let f = async move { + // initial sleep to allow for the initial connection + conn.retrying_connect( + block_sender.as_ref(), + chain_id, + authorization.db_conn.as_ref(), + false, + ) + .await?; + + // provider is ready + ready_tx.send(()).unwrap(); + + // wait before doing the initial health check + // TODO: how often? + let health_sleep_seconds = 10; + sleep(Duration::from_secs(health_sleep_seconds)).await; + + loop { + // TODO: what if we just happened to have this check line up with another restart? 
+ // TODO: think more about this + trace!("health check on {}", conn); + if conn + .provider_state + .read() + .await + .provider(false) + .await + .is_none() + { + // returning error will trigger a reconnect + return Err(anyhow::anyhow!("{} is not ready", conn)); + } + + if let Some(x) = &*conn.head_block.read() { + // if this block is too old, return an error so we reconnect + if x.lag() > 0 { + return Err(anyhow::anyhow!("provider is lagged")); + } + } + + sleep(Duration::from_secs(health_sleep_seconds)).await; + } + }; + + futures.push(flatten_handle(tokio::spawn(f))); + + // wait on the initial connection + ready_rx.await?; + } + if let Some(block_sender) = &block_sender { let f = self.clone().subscribe_new_heads( authorization.clone(), @@ -518,32 +650,6 @@ impl Web3Connection { futures.push(flatten_handle(tokio::spawn(f))); } - { - // TODO: move this into a proper function - let conn = self.clone(); - // health check - let f = async move { - loop { - if let Some(provider) = conn.provider.read().await.as_ref() { - if provider.ready() { - // // trace!(rpc=%conn, "provider is ready"); - } else { - warn!("rpc {} is NOT ready", conn); - // returning error will trigger a reconnect - // TODO: what if we just happened to have this check line up with another restart? - return Err(anyhow::anyhow!("provider is not ready")); - } - } - - // TODO: how often? - // TODO: also check that the head block has changed recently - sleep(Duration::from_secs(10)).await; - } - }; - - futures.push(flatten_handle(tokio::spawn(f))); - } - match try_join_all(futures).await { Ok(_) => { // futures all exited without error. break instead of restarting subscriptions @@ -551,9 +657,16 @@ impl Web3Connection { } Err(err) => { if reconnect { - warn!("{} subscription exited. err={:?}", self, err); + warn!("{} connected ended. err={:?}", self, err); - self.retrying_reconnect(block_sender.as_ref(), true).await?; + self.clone() + .retrying_connect( + block_sender.as_ref(), + chain_id, + authorization.db_conn.as_ref(), + true, + ) + .await?; } else { error!("{} subscription exited. err={:?}", self, err); return Err(err); @@ -575,11 +688,10 @@ impl Web3Connection { block_sender: flume::Sender, block_map: BlockHashesCache, ) -> anyhow::Result<()> { - info!("watching new heads on {}", self); + trace!("watching new heads on {}", self); - // TODO: is a RwLock of an Option the right thing here? - if let Some(provider) = self.provider.read().await.clone() { - match &*provider { + if let ProviderState::Ready(provider) = &*self.provider_state.read().await { + match provider.as_ref() { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(_provider) => { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints @@ -592,7 +704,7 @@ impl Web3Connection { loop { // TODO: what should the max_wait be? match self - .wait_for_request_handle(&authorization, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30), false) .await { Ok(active_request_handle) => { @@ -673,14 +785,12 @@ impl Web3Connection { } } } - - // // trace!(rpc=%self, "ok http interval"); } } Web3Provider::Ws(provider) => { // todo: move subscribe_blocks onto the request handle? 
let active_request_handle = self - .wait_for_request_handle(&authorization, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30), false) .await; let mut stream = provider.subscribe_blocks().await?; drop(active_request_handle); @@ -690,7 +800,7 @@ impl Web3Connection { // all it does is print "new block" for the same block as current block // TODO: how does this get wrapped in an arc? does ethers handle that? let block: Result, _> = self - .wait_for_request_handle(&authorization, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30), false) .await? .request( "eth_getBlockByNumber", @@ -737,12 +847,14 @@ impl Web3Connection { // TODO: is this always an error? // TODO: we probably don't want a warn and to return error warn!("new_heads subscription to {} ended", self); - return Err(anyhow::anyhow!("new_heads subscription ended")); + Err(anyhow::anyhow!("new_heads subscription ended")) } } + } else { + Err(anyhow::anyhow!( + "Provider not ready! Unable to subscribe to heads" + )) } - - Ok(()) } async fn subscribe_pending_transactions( @@ -750,11 +862,9 @@ impl Web3Connection { authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { - info!("watching pending transactions on {}", self); - - // TODO: is a RwLock of an Option the right thing here? - if let Some(provider) = self.provider.read().await.clone() { - match &*provider { + if let ProviderState::Ready(provider) = &*self.provider_state.read().await { + trace!("watching pending transactions on {}", self); + match provider.as_ref() { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints @@ -786,7 +896,7 @@ impl Web3Connection { Web3Provider::Ws(provider) => { // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle let active_request_handle = self - .wait_for_request_handle(&authorization, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30), false) .await; let mut stream = provider.subscribe_pending_txs().await?; @@ -808,26 +918,31 @@ impl Web3Connection { return Err(anyhow::anyhow!("pending_transactions subscription ended")); } } + } else { + warn!( + "Provider not ready! Unable to watch pending transactions on {}", + self + ); } Ok(()) } /// be careful with this; it might wait forever! - + /// `allow_not_ready` is only for use by health checks while starting the provider pub async fn wait_for_request_handle( self: &Arc, authorization: &Arc, max_wait: Duration, + allow_not_ready: bool, ) -> anyhow::Result { let max_wait = Instant::now() + max_wait; loop { - let x = self.try_request_handle(authorization).await; - - // // trace!(?x, "try_request_handle"); - - match x { + match self + .try_request_handle(authorization, allow_not_ready) + .await + { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? @@ -840,7 +955,7 @@ impl Web3Connection { } sleep_until(retry_at).await; } - Ok(OpenRequestResult::NotSynced) => { + Ok(OpenRequestResult::NotReady) => { // TODO: when can this happen? log? emit a stat? // TODO: subscribe to the head block on this // TODO: sleep how long? maybe just error? @@ -855,12 +970,19 @@ impl Web3Connection { pub async fn try_request_handle( self: &Arc, authorization: &Arc, + // TODO? 
ready_provider: Option<&Arc>, + allow_not_ready: bool, ) -> anyhow::Result { - // check that we are connected - if !self.has_provider().await { + if self + .provider_state + .read() + .await + .provider(allow_not_ready) + .await + .is_none() + { // TODO: emit a stat? - // TODO: wait until we have a provider? - return Ok(OpenRequestResult::NotSynced); + return Ok(OpenRequestResult::NotReady); } // check rate limits @@ -880,7 +1002,7 @@ impl Web3Connection { return Ok(OpenRequestResult::RetryAt(retry_at)); } RedisRateLimitResult::RetryNever => { - return Ok(OpenRequestResult::NotSynced); + return Ok(OpenRequestResult::NotReady); } } }; @@ -1027,9 +1149,10 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(None), + provider_state: AsyncRwLock::new(ProviderState::None), hard_limit: None, soft_limit: 1_000, + automatic_block_limit: false, block_data_limit: block_data_limit.into(), weight: 100.0, head_block: RwLock::new(Some(head_block.clone())), @@ -1072,9 +1195,10 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(None), + provider_state: AsyncRwLock::new(ProviderState::None), hard_limit: None, soft_limit: 1_000, + automatic_block_limit: false, block_data_limit: block_data_limit.into(), weight: 100.0, head_block: RwLock::new(Some(head_block.clone())), @@ -1121,9 +1245,10 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(None), + provider_state: AsyncRwLock::new(ProviderState::None), hard_limit: None, soft_limit: 1_000, + automatic_block_limit: false, block_data_limit: block_data_limit.into(), weight: 100.0, head_block: RwLock::new(Some(head_block.clone())), diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 832d5b2a..dbca28cf 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -154,7 +154,7 @@ impl Web3Connections { let mut connections = HashMap::new(); let mut handles = vec![]; - // TODO: futures unordered? + // TODO: do we need to join this? for x in join_all(spawn_handles).await { // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit match x { @@ -405,14 +405,14 @@ impl Web3Connections { ); // TODO: what should happen here? automatic retry? // TODO: more detailed error - return Ok(OpenRequestResult::NotSynced); + return Ok(OpenRequestResult::NotReady); } 1 => { let rpc = usable_rpcs.get(0).expect("len is 1"); // TODO: try or wait for a request handle? 
let handle = rpc - .wait_for_request_handle(authorization, Duration::from_secs(60)) + .wait_for_request_handle(authorization, Duration::from_secs(60), false) .await?; return Ok(OpenRequestResult::Handle(handle)); @@ -486,7 +486,7 @@ impl Web3Connections { // now that the rpcs are sorted, try to get an active request handle for one of them for rpc in sorted_rpcs.iter() { // increment our connection counter - match rpc.try_request_handle(authorization).await { + match rpc.try_request_handle(authorization, false).await { Ok(OpenRequestResult::Handle(handle)) => { // // trace!("next server on {:?}: {:?}", self, rpc); return Ok(OpenRequestResult::Handle(handle)); @@ -494,7 +494,7 @@ impl Web3Connections { Ok(OpenRequestResult::RetryAt(retry_at)) => { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(OpenRequestResult::NotSynced) => { + Ok(OpenRequestResult::NotReady) => { // TODO: log a warning? } Err(err) => { @@ -516,7 +516,7 @@ impl Web3Connections { let handle = sorted_rpcs .get(0) .expect("at least 1 is available") - .wait_for_request_handle(authorization, Duration::from_secs(3)) + .wait_for_request_handle(authorization, Duration::from_secs(3), false) .await?; Ok(OpenRequestResult::Handle(handle)) @@ -553,13 +553,13 @@ impl Web3Connections { } // check rate limits and increment our connection counter - match connection.try_request_handle(authorization).await { + match connection.try_request_handle(authorization, false).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle), - Ok(OpenRequestResult::NotSynced) => { + Ok(OpenRequestResult::NotReady) => { warn!("no request handle for {}", connection) } Err(err) => { @@ -682,7 +682,7 @@ impl Web3Connections { // TODO: move this to a helper function // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting - warn!("All rate limits exceeded. Sleeping untill {:?}", retry_at); + warn!("All rate limits exceeded. Sleeping until {:?}", retry_at); // TODO: have a separate column for rate limited? if let Some(request_metadata) = request_metadata { @@ -693,15 +693,12 @@ impl Web3Connections { continue; } - OpenRequestResult::NotSynced => { + OpenRequestResult::NotReady => { if let Some(request_metadata) = request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::Release); } - // TODO: subscribe to something on synced connections. maybe it should just be a watch channel - sleep(Duration::from_millis(200)).await; - - continue; + break; } } } @@ -832,7 +829,7 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? 
#![allow(unused_imports)] use super::*; - use crate::rpcs::{blockchain::SavedBlock, provider::Web3Provider}; + use crate::rpcs::{blockchain::SavedBlock, connection::ProviderState, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; @@ -886,9 +883,10 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), hard_limit: None, soft_limit: 1_000, + automatic_block_limit: true, block_data_limit: block_data_limit.into(), weight: 100.0, head_block: RwLock::new(Some(head_block.clone())), @@ -903,9 +901,10 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), hard_limit: None, soft_limit: 1_000, + automatic_block_limit: false, block_data_limit: block_data_limit.into(), weight: 100.0, head_block: RwLock::new(Some(lagged_block.clone())), @@ -993,7 +992,7 @@ mod tests { dbg!(&x); - assert!(matches!(x, OpenRequestResult::NotSynced)); + assert!(matches!(x, OpenRequestResult::NotReady)); // add lagged blocks to the conns. both servers should be allowed conns.save_block(&lagged_block.block, true).await.unwrap(); @@ -1066,7 +1065,7 @@ mod tests { conns .best_synced_backend_connection(&authorization, None, &[], Some(&2.into())) .await, - Ok(OpenRequestResult::NotSynced) + Ok(OpenRequestResult::NotReady) )); } @@ -1103,9 +1102,10 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), hard_limit: None, soft_limit: 3_000, + automatic_block_limit: false, block_data_limit: 64.into(), weight: 1.0, head_block: RwLock::new(Some(head_block.clone())), @@ -1120,9 +1120,10 @@ mod tests { active_requests: 0.into(), frontend_requests: 0.into(), internal_requests: 0.into(), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))), hard_limit: None, soft_limit: 1_000, + automatic_block_limit: false, block_data_limit: u64::MAX.into(), // TODO: does weight = 0 work? weight: 0.01, diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index bcffec4e..3093a4e4 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -14,7 +14,6 @@ pub enum Web3Provider { impl Web3Provider { pub fn ready(&self) -> bool { - // TODO: i'm not sure if this is enough match self { Self::Mock => true, Self::Http(_) => true, diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index da9dcb0d..313944ab 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -8,7 +8,7 @@ use entities::revert_log; use entities::sea_orm_active_enums::Method; use ethers::providers::{HttpClientError, ProviderError, WsClientError}; use ethers::types::{Address, Bytes}; -use log::{debug, error, trace, warn, Level}; +use log::{debug, error, info, trace, warn, Level}; use metered::metered; use metered::HitCount; use metered::ResponseTime; @@ -27,7 +27,7 @@ pub enum OpenRequestResult { /// Unable to start a request. 
Retry at the given time. RetryAt(Instant), /// Unable to start a request because the server is not synced - NotSynced, + NotReady, } /// Make RPC requests through this handle and drop it when you are done. @@ -194,29 +194,47 @@ impl OpenRequestHandle { // trace!(rpc=%self.conn, %method, "request"); let mut provider = None; - + let mut logged = false; while provider.is_none() { - match self.conn.provider.read().await.clone() { + let ready_provider = self + .conn + .provider_state + .read() + .await + // TODO: hard code true, or take a bool in the `new` function? + .provider(true) + .await + .cloned(); + + match ready_provider { None => { - warn!("no provider for {}!", self.conn); + if !logged { + logged = true; + warn!("no provider for {}!", self.conn); + } + // TODO: how should this work? a reconnect should be in progress. but maybe force one now? // TODO: sleep how long? subscribe to something instead? maybe use a watch handle? // TODO: this is going to be way too verbose! sleep(Duration::from_millis(100)).await } - Some(found_provider) => provider = Some(found_provider), + Some(x) => provider = Some(x), } } - let provider = &*provider.expect("provider was checked already"); + let provider = provider.expect("provider was checked already"); + + // trace!("got provider for {:?}", self); // TODO: really sucks that we have to clone here - let response = match provider { + let response = match &*provider { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(provider) => provider.request(method, params).await, Web3Provider::Ws(provider) => provider.request(method, params).await, }; + // trace!("got response for {:?}: {:?}", self, response); + if let Err(err) = &response { // only save reverts for some types of calls // TODO: do something special for eth_sendRawTransaction too @@ -255,7 +273,7 @@ impl OpenRequestHandle { // check for "execution reverted" here let is_revert = if let ProviderError::JsonRpcClientError(err) = err { // Http and Ws errors are very similar, but different types - let msg = match provider { + let msg = match &*provider { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(_) => { if let Some(HttpClientError::JsonRpcError(err)) = diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index 88f4db9f..cc5a4011 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -28,7 +28,7 @@ impl Web3Connections { // TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: if one rpc fails, try another? - let tx: Transaction = match rpc.try_request_handle(authorization).await { + let tx: Transaction = match rpc.try_request_handle(authorization, false).await { Ok(OpenRequestResult::Handle(handle)) => { handle .request( diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index 03c6e660..d4e3ae81 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -22,7 +22,7 @@ pub async fn get_user_id_from_params( // this is a long type. should we strip it down? 
bearer: Option>>, params: &HashMap, -) -> anyhow::Result { +) -> Result { match (bearer, params.get("user_id")) { (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => { // check for the bearer cache key @@ -38,8 +38,7 @@ pub async fn get_user_id_from_params( let user_id: u64 = user_id.parse().context("Parsing user_id param")?; if bearer_user_id != user_id { - // TODO: proper HTTP Status code - Err(anyhow::anyhow!("permission denied")) + Err(FrontendErrorResponse::AccessDenied) } else { Ok(bearer_user_id) } @@ -49,13 +48,12 @@ pub async fn get_user_id_from_params( // 0 means all Ok(0) } - (None, Some(x)) => { + (None, Some(_)) => { // they do not have a bearer token, but requested a specific id. block // TODO: proper error code from a useful error code // TODO: maybe instead of this sharp edged warn, we have a config value? // TODO: check config for if we should deny or allow this - Err(anyhow::anyhow!("permission denied")) - + Err(FrontendErrorResponse::AccessDenied) // // TODO: make this a flag // warn!("allowing without auth during development!"); // Ok(x.parse()?)
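To close out, a hedged sketch (not part of the patch) of how a caller branches on the renamed OpenRequestResult; it only mirrors the match arms already visible in the connections.rs hunks above, and the wrapper function itself is hypothetical.

    // Hypothetical caller; mirrors the handling used when picking a backend server.
    async fn use_rpc(
        rpc: &Arc<Web3Connection>,
        authorization: &Arc<Authorization>,
    ) -> anyhow::Result<()> {
        match rpc.try_request_handle(authorization, false).await? {
            OpenRequestResult::Handle(handle) => {
                // the handle counts as an active request until it is dropped
                drop(handle);
            }
            OpenRequestResult::RetryAt(retry_at) => {
                // hard rate limit hit: wait for the window to reopen, then retry
                tokio::time::sleep_until(retry_at).await;
            }
            OpenRequestResult::NotReady => {
                // provider still connecting or failing health checks: try another server
            }
        }
        Ok(())
    }

Compared to the old NotSynced variant, NotReady also covers servers whose connection or health check has not completed yet, which is why the retry loop in connections.rs now breaks instead of sleeping when every server reports it.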