From 33f7256236fdb664ae375ffb3316bdc673ffa587 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Mon, 5 Dec 2022 16:06:28 -0800
Subject: [PATCH] clones to avoid deadlock

---
 web3_proxy/src/rpcs/blockchain.rs |   3 +-
 web3_proxy/src/rpcs/connection.rs | 100 +++++++++++++++++++-----------
 web3_proxy/src/rpcs/request.rs    |  73 +++++++++++-----------
 3 files changed, 105 insertions(+), 71 deletions(-)

diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 638c55c8..e727bde0 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -453,8 +453,9 @@ impl Web3Connections {
                     maybe_head_block = parent_block;
                     continue;
                 } else {
+                    // TODO: this message
                     warn!(
-                        "no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%",
+                        "soft limit {}/{} from {}/{} rpcs: {}%",
                         highest_rpcs_sum_soft_limit,
                         self.min_sum_soft_limit,
                         highest_rpcs.len(),
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 3e7cf6bd..b24d2dbf 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -27,7 +27,7 @@ use thread_fast_rng::thread_fast_rng;
 use tokio::sync::{broadcast, oneshot, RwLock as AsyncRwLock};
 use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub enum ProviderState {
     None,
     NotReady(Arc<Web3Provider>),
@@ -365,8 +365,6 @@ impl Web3Connection {
             sleep(retry_in).await;
         }
 
-        info!("connected to {}", self);
-
         Ok(())
     }
 
@@ -377,9 +375,12 @@ impl Web3Connection {
         chain_id: u64,
         db_conn: Option<&DatabaseConnection>,
     ) -> anyhow::Result<()> {
-        trace!("provider_state {} locking...", self);
-        let mut provider_state = self.provider_state.write().await;
-        trace!("provider_state {} locked: {:?}", self, provider_state);
+        // trace!("provider_state {} locking...", self);
+        let mut provider_state = self
+            .provider_state
+            .try_write()
+            .context("locking provider for write")?;
+        // trace!("provider_state {} locked: {:?}", self, provider_state);
 
         match &*provider_state {
             ProviderState::None => {
@@ -391,18 +392,18 @@ impl Web3Connection {
                     return Ok(());
                 }
 
-                trace!("Reconnecting to {}", self);
+                debug!("reconnecting to {}", self);
 
                 // disconnect the current provider
                 *provider_state = ProviderState::None;
 
                 // reset sync status
-                trace!("locking head block on {}", self);
+                // trace!("locking head block on {}", self);
                 {
                     let mut head_block = self.head_block.write();
                     *head_block = None;
                 }
-                trace!("done with head block on {}", self);
+                // trace!("done with head block on {}", self);
 
                 // tell the block subscriber that we don't have any blocks
                 if let Some(block_sender) = block_sender {
@@ -414,15 +415,15 @@ impl Web3Connection {
                    block_sender
                        .send_async((None, self.clone()))
                        .await
                        .context("block_sender during connect")?;
                }
            }
        }
 
-        trace!("Creating new Web3Provider on {}", self);
+        // trace!("Creating new Web3Provider on {}", self);
         // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
         let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
 
-        // TODO: if an error happens,
+        // trace!("saving provider state as NotReady on {}", self);
         *provider_state = ProviderState::NotReady(Arc::new(new_provider));
 
         // drop the lock so that we can get a request handle
-        trace!("provider_state {} unlocked", self);
+        // trace!("provider_state {} unlocked", self);
         drop(provider_state);
 
         let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
@@ -430,7 +431,7 @@ impl Web3Connection {
         // check the server's chain_id here
         // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
         // TODO: what should the timeout be? should there be a request timeout?
-        trace!("waiting on chain id for {}", self);
+        // trace!("waiting on chain id for {}", self);
         let found_chain_id: Result<U64, _> = self
             .wait_for_request_handle(&authorization, Duration::from_secs(30), true)
             .await?
@@ -440,8 +441,7 @@ impl Web3Connection {
             .request(
                 "eth_chainId",
                 &json!(Vec::new()),
                 Level::Trace.into(),
             )
             .await;
-
-        trace!("found_chain_id: {:?}", found_chain_id);
+        // trace!("found_chain_id: {:?}", found_chain_id);
 
         match found_chain_id {
             Ok(found_chain_id) => {
@@ -468,7 +468,9 @@ impl Web3Connection {
         self.check_block_data_limit(&authorization).await?;
 
         {
+            // trace!("locking for ready...");
             let mut provider_state = self.provider_state.write().await;
+            // trace!("locked for ready...");
 
             // TODO: do this without a clone
             let ready_provider = provider_state
@@ -478,6 +480,7 @@ impl Web3Connection {
                 .clone();
 
             *provider_state = ProviderState::Ready(ready_provider);
+            // trace!("unlocked for ready...");
         }
 
         info!("successfully connected to {}", self);
@@ -595,13 +598,16 @@ impl Web3Connection {
                 // wait before doing the initial health check
                 // TODO: how often?
+                // TODO: subscribe to self.head_block
                 let health_sleep_seconds = 10;
                 sleep(Duration::from_secs(health_sleep_seconds)).await;
 
+                let mut warned = 0;
+
                 loop {
                     // TODO: what if we just happened to have this check line up with another restart?
                     // TODO: think more about this
-                    trace!("health check on {}", conn);
+                    // trace!("health check on {}. locking...", conn);
                     if conn
                         .provider_state
                         .read()
@@ -610,15 +616,37 @@ impl Web3Connection {
                         .await
                         .provider(false)
                         .await
                         .is_none()
                     {
+                        // trace!("health check unlocked with error on {}", conn);
                         // returning error will trigger a reconnect
                         return Err(anyhow::anyhow!("{} is not ready", conn));
                     }
+                    // trace!("health check on {}. unlocked", conn);
 
                     if let Some(x) = &*conn.head_block.read() {
                         // if this block is too old, return an error so we reconnect
-                        if x.lag() > 0 {
-                            // TODO: instead of a full reconnect, we should maybe just set it to None
-                            return Err(anyhow::anyhow!("{} is lagged: {:?}", conn, x));
+                        let current_lag = x.lag();
+                        if current_lag > 0 {
+                            let level = if warned == 0 {
+                                log::Level::Warn
+                            } else if current_lag % 1000 == 0 {
+                                log::Level::Debug
+                            } else {
+                                log::Level::Trace
+                            };
+
+                            log::log!(
+                                level,
+                                "{} is lagged {} secs: {} {}",
+                                conn,
+                                current_lag,
+                                x.number(),
+                                x.hash(),
+                            );
+
+                            warned += 1;
+                        } else {
+                            // reset warnings now that we are connected
+                            warned = 0;
                         }
                     }
@@ -658,7 +686,7 @@ impl Web3Connection {
             }
             Err(err) => {
                 if reconnect {
-                    warn!("{} connected ended. err={:?}", self, err);
+                    warn!("{} connection ended. err={:?}", self, err);
err={:?}", self, err); self.clone() .retrying_connect( @@ -691,7 +719,16 @@ impl Web3Connection { ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); - if let ProviderState::Ready(provider) = &*self.provider_state.read().await { + // trace!("locking on new heads"); + let provider_state = self + .provider_state + .try_read() + .context("subscribe_new_heads")? + .clone(); + // trace!("unlocked on new heads"); + + // TODO: need a timeout + if let ProviderState::Ready(provider) = provider_state { match provider.as_ref() { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(_provider) => { @@ -863,7 +900,12 @@ impl Web3Connection { authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { - if let ProviderState::Ready(provider) = &*self.provider_state.read().await { + if let ProviderState::Ready(provider) = self + .provider_state + .try_read() + .context("subscribe_pending_transactions")? + .clone() + { trace!("watching pending transactions on {}", self); match provider.as_ref() { Web3Provider::Mock => unimplemented!(), @@ -974,18 +1016,6 @@ impl Web3Connection { // TODO? ready_provider: Option<&Arc>, allow_not_ready: bool, ) -> anyhow::Result { - if self - .provider_state - .read() - .await - .provider(allow_not_ready) - .await - .is_none() - { - // TODO: emit a stat? - return Ok(OpenRequestResult::NotReady); - } - // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { // TODO: how should we know if we should set expire or not? @@ -1008,7 +1038,7 @@ impl Web3Connection { } }; - let handle = OpenRequestHandle::new(authorization.clone(), self.clone()); + let handle = OpenRequestHandle::new(authorization.clone(), self.clone()).await; Ok(OpenRequestResult::Handle(handle)) } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index ced5362e..e71ad04a 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -8,7 +8,7 @@ use entities::revert_log; use entities::sea_orm_active_enums::Method; use ethers::providers::{HttpClientError, ProviderError, WsClientError}; use ethers::types::{Address, Bytes}; -use log::{debug, error, trace, warn, Level}; +use log::{debug, error, info, trace, warn, Level}; use metered::metered; use metered::HitCount; use metered::ResponseTime; @@ -37,6 +37,7 @@ pub struct OpenRequestHandle { conn: Arc, // TODO: this is the same metrics on the conn. use a reference? metrics: Arc, + provider: Arc, used: AtomicBool, } @@ -129,7 +130,7 @@ impl Authorization { #[metered(registry = OpenRequestHandleMetrics, visibility = pub)] impl OpenRequestHandle { - pub fn new(authorization: Arc, conn: Arc) -> Self { + pub async fn new(authorization: Arc, conn: Arc) -> Self { // TODO: take request_id as an argument? // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! @@ -137,6 +138,38 @@ impl OpenRequestHandle { // TODO: these should maybe be sent to an influxdb instance? conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed); + let mut provider = None; + let mut logged = false; + while provider.is_none() { + // trace!("waiting on provider: locking..."); + + let ready_provider = conn + .provider_state + .read() + .await + // TODO: hard code true, or take a bool in the `new` function? 
+                .provider(true)
+                .await
+                .cloned();
+            // trace!("waiting on provider: unlocked!");
+
+            match ready_provider {
+                None => {
+                    if !logged {
+                        logged = true;
+                        warn!("no provider for {}!", conn);
+                    }
+
+                    // TODO: how should this work? a reconnect should be in progress. but maybe force one now?
+                    // TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
+                    // TODO: this is going to be way too verbose!
+                    sleep(Duration::from_millis(100)).await
+                }
+                Some(x) => provider = Some(x),
+            }
+        }
+
+        let provider = provider.expect("provider was checked already");
+
         // TODO: handle overflows?
         // TODO: what ordering?
         match authorization.as_ref().authorization_type {
@@ -157,6 +190,7 @@ impl OpenRequestHandle {
             authorization,
             conn,
             metrics,
+            provider,
             used,
         }
     }
@@ -193,41 +227,10 @@ impl OpenRequestHandle {
         // the authorization field is already on a parent span
         // trace!(rpc=%self.conn, %method, "request");
 
-        let mut provider = None;
-        let mut logged = false;
-        while provider.is_none() {
-            let ready_provider = self
-                .conn
-                .provider_state
-                .read()
-                .await
-                // TODO: hard code true, or take a bool in the `new` function?
-                .provider(true)
-                .await
-                .cloned();
-
-            match ready_provider {
-                None => {
-                    if !logged {
-                        logged = true;
-                        warn!("no provider for {}!", self.conn);
-                    }
-
-                    // TODO: how should this work? a reconnect should be in progress. but maybe force one now?
-                    // TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
-                    // TODO: this is going to be way too verbose!
-                    sleep(Duration::from_millis(100)).await
-                }
-                Some(x) => provider = Some(x),
-            }
-        }
-
-        let provider = provider.expect("provider was checked already");
-
         // trace!("got provider for {:?}", self);
 
         // TODO: really sucks that we have to clone here
-        let response = match &*provider {
+        let response = match &*self.provider {
             Web3Provider::Mock => unimplemented!(),
             Web3Provider::Http(provider) => provider.request(method, params).await,
             Web3Provider::Ws(provider) => provider.request(method, params).await,
@@ -273,7 +276,7 @@ impl OpenRequestHandle {
             // check for "execution reverted" here
             let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
                 // Http and Ws errors are very similar, but different types
-                let msg = match &*provider {
+                let msg = match &*self.provider {
                     Web3Provider::Mock => unimplemented!(),
                     Web3Provider::Http(_) => {
                         if let Some(HttpClientError::JsonRpcError(err)) =
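
Note (outside the diff): the deadlock this commit avoids comes from holding a
tokio `RwLock` guard across an `.await` while another task needs the write half
of the same lock. The fix is the same pattern everywhere in the patch:
`ProviderState` derives `Clone`, callers take the lock with
`try_read()`/`try_write()`, clone the small enum (it only holds `Arc`s), and let
the guard drop before any `.await`. Below is a minimal sketch of the two
patterns. The names (`State`, `do_request`) are invented for illustration; this
is not web3_proxy code.

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[derive(Clone, Debug)]
    enum State {
        None,
        Ready(Arc<String>),
    }

    // Hazard: the read guard lives across the `.await`. If the awaited work
    // can only complete after some other task takes `write()` on the same
    // lock (e.g. a reconnect resetting the state), both sides wait forever.
    async fn deadlock_prone(state: &RwLock<State>) {
        let guard = state.read().await;
        if let State::Ready(provider) = &*guard {
            do_request(provider).await; // guard is still held here
        }
    } // guard only dropped here

    // The pattern the patch uses instead: fail fast with `try_read()`, clone
    // the cheap enum, and let the temporary guard drop at the end of the
    // statement, so no lock is held across the `.await`.
    async fn clone_then_await(state: &RwLock<State>) -> anyhow::Result<()> {
        let snapshot = state
            .try_read()
            .map_err(|_| anyhow::anyhow!("state is locked"))?
            .clone(); // guard dropped here; the lock is free again

        if let State::Ready(provider) = snapshot {
            do_request(&provider).await; // no lock held across this await
        }

        Ok(())
    }

    async fn do_request(_url: &str) {}

Cloning costs one enum copy plus an `Arc` refcount bump per call, which is far
cheaper than serializing every request behind a long-lived read guard.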