clones to avoid deadlock

This commit is contained in:
Bryan Stitt 2022-12-05 16:06:28 -08:00
parent f010166ee0
commit 33f7256236
3 changed files with 105 additions and 71 deletions

View File

@ -453,8 +453,9 @@ impl Web3Connections {
maybe_head_block = parent_block;
continue;
} else {
// TODO: this message
warn!(
"no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%",
"soft limit {}/{} from {}/{} rpcs: {}%",
highest_rpcs_sum_soft_limit,
self.min_sum_soft_limit,
highest_rpcs.len(),

View File

@ -27,7 +27,7 @@ use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, RwLock as AsyncRwLock};
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum ProviderState {
None,
NotReady(Arc<Web3Provider>),
@ -365,8 +365,6 @@ impl Web3Connection {
sleep(retry_in).await;
}
info!("connected to {}", self);
Ok(())
}
@ -377,9 +375,12 @@ impl Web3Connection {
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
trace!("provider_state {} locking...", self);
let mut provider_state = self.provider_state.write().await;
trace!("provider_state {} locked: {:?}", self, provider_state);
// trace!("provider_state {} locking...", self);
let mut provider_state = self
.provider_state
.try_write()
.context("locking provider for write")?;
// trace!("provider_state {} locked: {:?}", self, provider_state);
match &*provider_state {
ProviderState::None => {
@ -391,18 +392,18 @@ impl Web3Connection {
return Ok(());
}
trace!("Reconnecting to {}", self);
debug!("reconnecting to {}", self);
// disconnect the current provider
*provider_state = ProviderState::None;
// reset sync status
trace!("locking head block on {}", self);
// trace!("locking head block on {}", self);
{
let mut head_block = self.head_block.write();
*head_block = None;
}
trace!("done with head block on {}", self);
// trace!("done with head block on {}", self);
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = block_sender {
@ -414,15 +415,15 @@ impl Web3Connection {
}
}
trace!("Creating new Web3Provider on {}", self);
// trace!("Creating new Web3Provider on {}", self);
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
// TODO: if an error happens,
// trace!("saving provider state as NotReady on {}", self);
*provider_state = ProviderState::NotReady(Arc::new(new_provider));
// drop the lock so that we can get a request handle
trace!("provider_state {} unlocked", self);
// trace!("provider_state {} unlocked", self);
drop(provider_state);
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
@ -430,7 +431,7 @@ impl Web3Connection {
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
trace!("waiting on chain id for {}", self);
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, Duration::from_secs(30), true)
.await?
@ -440,8 +441,7 @@ impl Web3Connection {
Level::Trace.into(),
)
.await;
trace!("found_chain_id: {:?}", found_chain_id);
// trace!("found_chain_id: {:?}", found_chain_id);
match found_chain_id {
Ok(found_chain_id) => {
@ -468,7 +468,9 @@ impl Web3Connection {
self.check_block_data_limit(&authorization).await?;
{
// trace!("locking for ready...");
let mut provider_state = self.provider_state.write().await;
// trace!("locked for ready...");
// TODO: do this without a clone
let ready_provider = provider_state
@ -478,6 +480,7 @@ impl Web3Connection {
.clone();
*provider_state = ProviderState::Ready(ready_provider);
// trace!("unlocked for ready...");
}
info!("successfully connected to {}", self);
@ -595,13 +598,16 @@ impl Web3Connection {
// wait before doing the initial health check
// TODO: how often?
// TODO: subscribe to self.head_block
let health_sleep_seconds = 10;
sleep(Duration::from_secs(health_sleep_seconds)).await;
let mut warned = 0;
loop {
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
trace!("health check on {}", conn);
// trace!("health check on {}. locking...", conn);
if conn
.provider_state
.read()
@ -610,15 +616,37 @@ impl Web3Connection {
.await
.is_none()
{
// trace!("health check unlocked with error on {}", conn);
// returning error will trigger a reconnect
return Err(anyhow::anyhow!("{} is not ready", conn));
}
// trace!("health check on {}. unlocked", conn);
if let Some(x) = &*conn.head_block.read() {
// if this block is too old, return an error so we reconnect
if x.lag() > 0 {
// TODO: instead of a full reconnect, we should maybe just set it to None
return Err(anyhow::anyhow!("{} is lagged: {:?}", conn, x));
let current_lag = x.lag();
if current_lag > 0 {
let level = if warned == 0 {
log::Level::Warn
} else if current_lag % 1000 == 0 {
log::Level::Debug
} else {
log::Level::Trace
};
log::log!(
level,
"{} is lagged {} secs: {} {}",
conn,
current_lag,
x.number(),
x.hash(),
);
warned += 1;
} else {
// reset warnings now that we are connected
warned = 0;
}
}
@ -658,7 +686,7 @@ impl Web3Connection {
}
Err(err) => {
if reconnect {
warn!("{} connected ended. err={:?}", self, err);
warn!("{} connection ended. err={:?}", self, err);
self.clone()
.retrying_connect(
@ -691,7 +719,16 @@ impl Web3Connection {
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);
if let ProviderState::Ready(provider) = &*self.provider_state.read().await {
// trace!("locking on new heads");
let provider_state = self
.provider_state
.try_read()
.context("subscribe_new_heads")?
.clone();
// trace!("unlocked on new heads");
// TODO: need a timeout
if let ProviderState::Ready(provider) = provider_state {
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_provider) => {
@ -863,7 +900,12 @@ impl Web3Connection {
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
if let ProviderState::Ready(provider) = &*self.provider_state.read().await {
if let ProviderState::Ready(provider) = self
.provider_state
.try_read()
.context("subscribe_pending_transactions")?
.clone()
{
trace!("watching pending transactions on {}", self);
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
@ -974,18 +1016,6 @@ impl Web3Connection {
// TODO? ready_provider: Option<&Arc<Web3Provider>>,
allow_not_ready: bool,
) -> anyhow::Result<OpenRequestResult> {
if self
.provider_state
.read()
.await
.provider(allow_not_ready)
.await
.is_none()
{
// TODO: emit a stat?
return Ok(OpenRequestResult::NotReady);
}
// check rate limits
if let Some(ratelimiter) = self.hard_limit.as_ref() {
// TODO: how should we know if we should set expire or not?
@ -1008,7 +1038,7 @@ impl Web3Connection {
}
};
let handle = OpenRequestHandle::new(authorization.clone(), self.clone());
let handle = OpenRequestHandle::new(authorization.clone(), self.clone()).await;
Ok(OpenRequestResult::Handle(handle))
}

View File

@ -8,7 +8,7 @@ use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::{HttpClientError, ProviderError, WsClientError};
use ethers::types::{Address, Bytes};
use log::{debug, error, trace, warn, Level};
use log::{debug, error, info, trace, warn, Level};
use metered::metered;
use metered::HitCount;
use metered::ResponseTime;
@ -37,6 +37,7 @@ pub struct OpenRequestHandle {
conn: Arc<Web3Connection>,
// TODO: this is the same metrics on the conn. use a reference?
metrics: Arc<OpenRequestHandleMetrics>,
provider: Arc<Web3Provider>,
used: AtomicBool,
}
@ -129,7 +130,7 @@ impl Authorization {
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
impl OpenRequestHandle {
pub fn new(authorization: Arc<Authorization>, conn: Arc<Web3Connection>) -> Self {
pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Connection>) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
@ -137,6 +138,38 @@ impl OpenRequestHandle {
// TODO: these should maybe be sent to an influxdb instance?
conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed);
let mut provider = None;
let mut logged = false;
while provider.is_none() {
// trace!("waiting on provider: locking...");
let ready_provider = conn
.provider_state
.read()
.await
// TODO: hard code true, or take a bool in the `new` function?
.provider(true)
.await
.cloned();
// trace!("waiting on provider: unlocked!");
match ready_provider {
None => {
if !logged {
logged = true;
warn!("no provider for {}!", conn);
}
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
// TODO: this is going to be way too verbose!
sleep(Duration::from_millis(100)).await
}
Some(x) => provider = Some(x),
}
}
let provider = provider.expect("provider was checked already");
// TODO: handle overflows?
// TODO: what ordering?
match authorization.as_ref().authorization_type {
@ -157,6 +190,7 @@ impl OpenRequestHandle {
authorization,
conn,
metrics,
provider,
used,
}
}
@ -193,41 +227,10 @@ impl OpenRequestHandle {
// the authorization field is already on a parent span
// trace!(rpc=%self.conn, %method, "request");
let mut provider = None;
let mut logged = false;
while provider.is_none() {
let ready_provider = self
.conn
.provider_state
.read()
.await
// TODO: hard code true, or take a bool in the `new` function?
.provider(true)
.await
.cloned();
match ready_provider {
None => {
if !logged {
logged = true;
warn!("no provider for {}!", self.conn);
}
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
// TODO: this is going to be way too verbose!
sleep(Duration::from_millis(100)).await
}
Some(x) => provider = Some(x),
}
}
let provider = provider.expect("provider was checked already");
// trace!("got provider for {:?}", self);
// TODO: really sucks that we have to clone here
let response = match &*provider {
let response = match &*self.provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
@ -273,7 +276,7 @@ impl OpenRequestHandle {
// check for "execution reverted" here
let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
let msg = match &*provider {
let msg = match &*self.provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_) => {
if let Some(HttpClientError::JsonRpcError(err)) =