instrument more. add max_wait to wait_for_request_handle
This commit is contained in:
parent
28a4438c0c
commit
739947792a
|
@ -8,7 +8,7 @@ use std::sync::atomic::Ordering;
|
||||||
use std::sync::{atomic::AtomicU64, Arc};
|
use std::sync::{atomic::AtomicU64, Arc};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::time::{Duration, Instant};
|
use tokio::time::{Duration, Instant};
|
||||||
use tracing::error;
|
use tracing::{error, info_span, Instrument};
|
||||||
|
|
||||||
/// A local cache that sits in front of a RedisRateLimiter
|
/// A local cache that sits in front of a RedisRateLimiter
|
||||||
/// Generic accross the key so it is simple to use with IPs or user keys
|
/// Generic accross the key so it is simple to use with IPs or user keys
|
||||||
|
@ -62,7 +62,7 @@ where
|
||||||
return Ok(DeferredRateLimitResult::RetryNever);
|
return Ok(DeferredRateLimitResult::RetryNever);
|
||||||
}
|
}
|
||||||
|
|
||||||
let arc_deferred_rate_limit_result = Arc::new(Mutex::new(None));
|
let deferred_rate_limit_result = Arc::new(Mutex::new(None));
|
||||||
|
|
||||||
let redis_key = format!("{}:{}", self.prefix, key);
|
let redis_key = format!("{}:{}", self.prefix, key);
|
||||||
|
|
||||||
|
@ -70,7 +70,7 @@ where
|
||||||
// TODO: i'm sure this could be a lot better. but race conditions make this hard to think through. brain needs sleep
|
// TODO: i'm sure this could be a lot better. but race conditions make this hard to think through. brain needs sleep
|
||||||
let local_key_count: Arc<AtomicU64> = {
|
let local_key_count: Arc<AtomicU64> = {
|
||||||
// clone things outside of the `async move`
|
// clone things outside of the `async move`
|
||||||
let deferred_rate_limit_result = arc_deferred_rate_limit_result.clone();
|
let deferred_rate_limit_result = deferred_rate_limit_result.clone();
|
||||||
let redis_key = redis_key.clone();
|
let redis_key = redis_key.clone();
|
||||||
let rrl = Arc::new(self.rrl.clone());
|
let rrl = Arc::new(self.rrl.clone());
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ where
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut locked = arc_deferred_rate_limit_result.lock().await;
|
let mut locked = deferred_rate_limit_result.lock().await;
|
||||||
|
|
||||||
if let Some(deferred_rate_limit_result) = locked.take() {
|
if let Some(deferred_rate_limit_result) = locked.take() {
|
||||||
// new entry. redis was already incremented
|
// new entry. redis was already incremented
|
||||||
|
@ -184,8 +184,11 @@ where
|
||||||
// close to period. don't risk it. wait on redis
|
// close to period. don't risk it. wait on redis
|
||||||
Ok(rate_limit_f.await)
|
Ok(rate_limit_f.await)
|
||||||
} else {
|
} else {
|
||||||
|
// TODO: pass the frontend request id through
|
||||||
|
let span = info_span!("deferred rate limit");
|
||||||
|
|
||||||
// rate limit has enough headroom that it should be safe to do this in the background
|
// rate limit has enough headroom that it should be safe to do this in the background
|
||||||
tokio::spawn(rate_limit_f);
|
tokio::spawn(rate_limit_f.instrument(span));
|
||||||
|
|
||||||
Ok(DeferredRateLimitResult::Allowed)
|
Ok(DeferredRateLimitResult::Allowed)
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,11 @@ impl RedisRateLimiter {
|
||||||
// TODO: include max per period in the throttle key?
|
// TODO: include max per period in the throttle key?
|
||||||
let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id);
|
let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id);
|
||||||
|
|
||||||
let mut conn = self.pool.get().await.context("throttle")?;
|
let mut conn = self
|
||||||
|
.pool
|
||||||
|
.get()
|
||||||
|
.await
|
||||||
|
.context("get redis connection for rate limits")?;
|
||||||
|
|
||||||
// TODO: at high concurency, i think this is giving errors
|
// TODO: at high concurency, i think this is giving errors
|
||||||
// TODO: i'm starting to think that bb8 has a bug
|
// TODO: i'm starting to think that bb8 has a bug
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
|
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
|
||||||
use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
|
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
|
||||||
use serde::Serialize;
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ fn default_jsonrpc() -> String {
|
||||||
"2.0".to_string()
|
"2.0".to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize)]
|
#[derive(Clone, Deserialize)]
|
||||||
pub struct JsonRpcRequest {
|
pub struct JsonRpcRequest {
|
||||||
// TODO: skip jsonrpc entirely? its against spec to drop it, but some servers bad
|
// TODO: skip jsonrpc entirely? its against spec to drop it, but some servers bad
|
||||||
#[serde(default = "default_jsonrpc")]
|
#[serde(default = "default_jsonrpc")]
|
||||||
|
@ -45,7 +45,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
|
||||||
where
|
where
|
||||||
D: Deserializer<'de>,
|
D: Deserializer<'de>,
|
||||||
{
|
{
|
||||||
#[derive(serde::Deserialize)]
|
#[derive(Deserialize)]
|
||||||
#[serde(field_identifier, rename_all = "lowercase")]
|
#[serde(field_identifier, rename_all = "lowercase")]
|
||||||
enum Field {
|
enum Field {
|
||||||
JsonRpc,
|
JsonRpc,
|
||||||
|
@ -149,7 +149,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
|
||||||
|
|
||||||
// TODO: impl Error on this?
|
// TODO: impl Error on this?
|
||||||
/// All jsonrpc errors use this structure
|
/// All jsonrpc errors use this structure
|
||||||
#[derive(Serialize, Clone)]
|
#[derive(Deserialize, Serialize, Clone)]
|
||||||
pub struct JsonRpcErrorData {
|
pub struct JsonRpcErrorData {
|
||||||
/// The error code
|
/// The error code
|
||||||
pub code: i64,
|
pub code: i64,
|
||||||
|
@ -161,7 +161,7 @@ pub struct JsonRpcErrorData {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete response
|
/// A complete response
|
||||||
#[derive(Clone, Serialize)]
|
#[derive(Clone, Deserialize, Serialize)]
|
||||||
pub struct JsonRpcForwardedResponse {
|
pub struct JsonRpcForwardedResponse {
|
||||||
// TODO: jsonrpc a &str?
|
// TODO: jsonrpc a &str?
|
||||||
#[serde(default = "default_jsonrpc")]
|
#[serde(default = "default_jsonrpc")]
|
||||||
|
|
|
@ -14,6 +14,7 @@ use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::{cmp::Ordering, fmt::Display, sync::Arc};
|
use std::{cmp::Ordering, fmt::Display, sync::Arc};
|
||||||
use tokio::sync::{broadcast, watch};
|
use tokio::sync::{broadcast, watch};
|
||||||
|
use tokio::time::Duration;
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
|
|
||||||
// TODO: type for Hydrated Blocks with their full transactions?
|
// TODO: type for Hydrated Blocks with their full transactions?
|
||||||
|
@ -87,7 +88,6 @@ impl Web3Connections {
|
||||||
|
|
||||||
/// Get a block from caches with fallback.
|
/// Get a block from caches with fallback.
|
||||||
/// Will query a specific node or the best available.
|
/// Will query a specific node or the best available.
|
||||||
/// WARNING! If rpc is specified, this may wait forever. be sure this runs with your own timeout
|
|
||||||
pub async fn block(
|
pub async fn block(
|
||||||
&self,
|
&self,
|
||||||
hash: &H256,
|
hash: &H256,
|
||||||
|
@ -104,7 +104,7 @@ impl Web3Connections {
|
||||||
// TODO: if error, retry?
|
// TODO: if error, retry?
|
||||||
let block: Block<TxHash> = match rpc {
|
let block: Block<TxHash> = match rpc {
|
||||||
Some(rpc) => {
|
Some(rpc) => {
|
||||||
rpc.wait_for_request_handle()
|
rpc.wait_for_request_handle(Duration::from_secs(30))
|
||||||
.await?
|
.await?
|
||||||
.request("eth_getBlockByHash", get_block_params, false)
|
.request("eth_getBlockByHash", get_block_params, false)
|
||||||
.await?
|
.await?
|
||||||
|
|
|
@ -20,7 +20,7 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
||||||
use std::{cmp::Ordering, sync::Arc};
|
use std::{cmp::Ordering, sync::Arc};
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use tokio::sync::RwLock as AsyncRwLock;
|
use tokio::sync::RwLock as AsyncRwLock;
|
||||||
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
|
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
||||||
use tracing::{debug, error, info, instrument, trace, warn};
|
use tracing::{debug, error, info, instrument, trace, warn};
|
||||||
|
|
||||||
/// An active connection to a Web3 RPC server like geth or erigon.
|
/// An active connection to a Web3 RPC server like geth or erigon.
|
||||||
|
@ -112,9 +112,9 @@ impl Web3Connection {
|
||||||
// check the server's chain_id here
|
// check the server's chain_id here
|
||||||
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
|
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
|
||||||
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
|
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
|
||||||
// TODO: this will wait forever. do we want that?
|
// TODO: what should the timeout be?
|
||||||
let found_chain_id: Result<U64, _> = new_connection
|
let found_chain_id: Result<U64, _> = new_connection
|
||||||
.wait_for_request_handle()
|
.wait_for_request_handle(Duration::from_secs(30))
|
||||||
.await?
|
.await?
|
||||||
.request("eth_chainId", Option::None::<()>, false)
|
.request("eth_chainId", Option::None::<()>, false)
|
||||||
.await;
|
.await;
|
||||||
|
@ -199,8 +199,9 @@ impl Web3Connection {
|
||||||
.max(U64::one());
|
.max(U64::one());
|
||||||
|
|
||||||
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
|
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
|
||||||
|
// TODO: what should the request be?
|
||||||
let archive_result: Result<Bytes, _> = self
|
let archive_result: Result<Bytes, _> = self
|
||||||
.wait_for_request_handle()
|
.wait_for_request_handle(Duration::from_secs(30))
|
||||||
.await?
|
.await?
|
||||||
.request(
|
.request(
|
||||||
"eth_getCode",
|
"eth_getCode",
|
||||||
|
@ -532,8 +533,8 @@ impl Web3Connection {
|
||||||
let mut last_hash = H256::zero();
|
let mut last_hash = H256::zero();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// TODO: try, or wait_for?
|
// TODO: what should the max_wait be?
|
||||||
match self.wait_for_request_handle().await {
|
match self.wait_for_request_handle(Duration::from_secs(30)).await {
|
||||||
Ok(active_request_handle) => {
|
Ok(active_request_handle) => {
|
||||||
let block: Result<Block<TxHash>, _> = active_request_handle
|
let block: Result<Block<TxHash>, _> = active_request_handle
|
||||||
.request("eth_getBlockByNumber", ("latest", false), false)
|
.request("eth_getBlockByNumber", ("latest", false), false)
|
||||||
|
@ -596,7 +597,8 @@ impl Web3Connection {
|
||||||
}
|
}
|
||||||
Web3Provider::Ws(provider) => {
|
Web3Provider::Ws(provider) => {
|
||||||
// todo: move subscribe_blocks onto the request handle?
|
// todo: move subscribe_blocks onto the request handle?
|
||||||
let active_request_handle = self.wait_for_request_handle().await;
|
let active_request_handle =
|
||||||
|
self.wait_for_request_handle(Duration::from_secs(30)).await;
|
||||||
let mut stream = provider.subscribe_blocks().await?;
|
let mut stream = provider.subscribe_blocks().await?;
|
||||||
drop(active_request_handle);
|
drop(active_request_handle);
|
||||||
|
|
||||||
|
@ -604,7 +606,7 @@ impl Web3Connection {
|
||||||
// there is a very small race condition here where the stream could send us a new block right now
|
// there is a very small race condition here where the stream could send us a new block right now
|
||||||
// all it does is print "new block" for the same block as current block
|
// all it does is print "new block" for the same block as current block
|
||||||
let block: Result<Option<ArcBlock>, _> = self
|
let block: Result<Option<ArcBlock>, _> = self
|
||||||
.wait_for_request_handle()
|
.wait_for_request_handle(Duration::from_secs(30))
|
||||||
.await?
|
.await?
|
||||||
.request("eth_getBlockByNumber", ("latest", false), false)
|
.request("eth_getBlockByNumber", ("latest", false), false)
|
||||||
.await
|
.await
|
||||||
|
@ -691,7 +693,8 @@ impl Web3Connection {
|
||||||
}
|
}
|
||||||
Web3Provider::Ws(provider) => {
|
Web3Provider::Ws(provider) => {
|
||||||
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
|
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
|
||||||
let active_request_handle = self.wait_for_request_handle().await;
|
let active_request_handle =
|
||||||
|
self.wait_for_request_handle(Duration::from_secs(30)).await;
|
||||||
|
|
||||||
let mut stream = provider.subscribe_pending_txs().await?;
|
let mut stream = provider.subscribe_pending_txs().await?;
|
||||||
|
|
||||||
|
@ -718,10 +721,12 @@ impl Web3Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// be careful with this; it might wait forever!
|
/// be careful with this; it might wait forever!
|
||||||
// TODO: maximum wait time?
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestHandle> {
|
pub async fn wait_for_request_handle(
|
||||||
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
|
self: &Arc<Self>,
|
||||||
|
max_wait: Duration,
|
||||||
|
) -> anyhow::Result<OpenRequestHandle> {
|
||||||
|
let max_wait = Instant::now() + max_wait;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let x = self.try_request_handle().await;
|
let x = self.try_request_handle().await;
|
||||||
|
@ -733,13 +738,18 @@ impl Web3Connection {
|
||||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||||
// TODO: emit a stat?
|
// TODO: emit a stat?
|
||||||
trace!(?retry_at);
|
trace!(?retry_at);
|
||||||
|
|
||||||
|
if retry_at > max_wait {
|
||||||
|
// break now since we will wait past our maximum wait time
|
||||||
|
return Err(anyhow::anyhow!("timeout waiting for request handle"));
|
||||||
|
}
|
||||||
sleep_until(retry_at).await;
|
sleep_until(retry_at).await;
|
||||||
}
|
}
|
||||||
Ok(OpenRequestResult::RetryNever) => {
|
Ok(OpenRequestResult::RetryNever) => {
|
||||||
// TODO: when can this happen? log? emit a stat?
|
// TODO: when can this happen? log? emit a stat?
|
||||||
// TODO: subscribe to the head block on this
|
// TODO: subscribe to the head block on this
|
||||||
// TODO: sleep how long? maybe just error?
|
// TODO: sleep how long? maybe just error?
|
||||||
return Err(anyhow::anyhow!("unable to retry"));
|
return Err(anyhow::anyhow!("unable to retry for request handle"));
|
||||||
}
|
}
|
||||||
Err(err) => return Err(err),
|
Err(err) => return Err(err),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,14 @@
|
||||||
use super::connection::Web3Connection;
|
use super::connection::Web3Connection;
|
||||||
use super::provider::Web3Provider;
|
use super::provider::Web3Provider;
|
||||||
|
use ethers::providers::ProviderError;
|
||||||
use metered::metered;
|
use metered::metered;
|
||||||
use metered::ErrorCount;
|
use metered::ErrorCount;
|
||||||
use metered::HitCount;
|
use metered::HitCount;
|
||||||
use metered::ResponseTime;
|
use metered::ResponseTime;
|
||||||
use metered::Throughput;
|
use metered::Throughput;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::atomic;
|
use std::sync::atomic;
|
||||||
use std::sync::atomic::AtomicBool;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::time::{sleep, Duration, Instant};
|
use tokio::time::{sleep, Duration, Instant};
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
@ -25,20 +26,20 @@ pub enum OpenRequestResult {
|
||||||
/// Make RPC requests through this handle and drop it when you are done.
|
/// Make RPC requests through this handle and drop it when you are done.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct OpenRequestHandle {
|
pub struct OpenRequestHandle {
|
||||||
conn: Arc<Web3Connection>,
|
conn: Mutex<Option<Arc<Web3Connection>>>,
|
||||||
// TODO: this is the same metrics on the conn. use a reference
|
// TODO: this is the same metrics on the conn. use a reference?
|
||||||
metrics: Arc<OpenRequestHandleMetrics>,
|
metrics: Arc<OpenRequestHandleMetrics>,
|
||||||
decremented: AtomicBool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
|
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
|
||||||
impl OpenRequestHandle {
|
impl OpenRequestHandle {
|
||||||
pub fn new(conn: Arc<Web3Connection>) -> Self {
|
pub fn new(conn: Arc<Web3Connection>) -> Self {
|
||||||
|
// TODO: take request_id as an argument?
|
||||||
// TODO: attach a unique id to this? customer requests have one, but not internal queries
|
// TODO: attach a unique id to this? customer requests have one, but not internal queries
|
||||||
// TODO: what ordering?!
|
// TODO: what ordering?!
|
||||||
// TODO: should we be using metered, or not? i think not because we want stats for each handle
|
// TODO: should we be using metered, or not? i think not because we want stats for each handle
|
||||||
// TODO: these should maybe be sent to an influxdb instance?
|
// TODO: these should maybe be sent to an influxdb instance?
|
||||||
conn.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
|
conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
// TODO: handle overflows?
|
// TODO: handle overflows?
|
||||||
// TODO: what ordering?
|
// TODO: what ordering?
|
||||||
|
@ -46,22 +47,21 @@ impl OpenRequestHandle {
|
||||||
|
|
||||||
let metrics = conn.open_request_handle_metrics.clone();
|
let metrics = conn.open_request_handle_metrics.clone();
|
||||||
|
|
||||||
let decremented = false.into();
|
let conn = Mutex::new(Some(conn));
|
||||||
|
|
||||||
Self {
|
Self { conn, metrics }
|
||||||
conn,
|
|
||||||
metrics,
|
|
||||||
decremented,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn clone_connection(&self) -> Arc<Web3Connection> {
|
pub fn clone_connection(&self) -> Arc<Web3Connection> {
|
||||||
self.conn.clone()
|
if let Some(conn) = self.conn.lock().as_ref() {
|
||||||
|
conn.clone()
|
||||||
|
} else {
|
||||||
|
unimplemented!("this shouldn't happen")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a web3 request
|
/// Send a web3 request
|
||||||
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
|
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
|
||||||
/// By taking self here, we ensure that this is dropped after the request is complete.
|
|
||||||
/// TODO: we no longer take self because metered doesn't like that
|
/// TODO: we no longer take self because metered doesn't like that
|
||||||
/// TODO: ErrorCount includes too many types of errors, such as transaction reverts
|
/// TODO: ErrorCount includes too many types of errors, such as transaction reverts
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
|
@ -70,23 +70,30 @@ impl OpenRequestHandle {
|
||||||
&self,
|
&self,
|
||||||
method: &str,
|
method: &str,
|
||||||
params: T,
|
params: T,
|
||||||
|
// TODO: change this to error_log_level?
|
||||||
silent_errors: bool,
|
silent_errors: bool,
|
||||||
) -> Result<R, ethers::prelude::ProviderError>
|
) -> Result<R, ProviderError>
|
||||||
where
|
where
|
||||||
T: fmt::Debug + serde::Serialize + Send + Sync,
|
T: fmt::Debug + serde::Serialize + Send + Sync,
|
||||||
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
|
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
|
||||||
{
|
{
|
||||||
|
let conn = self
|
||||||
|
.conn
|
||||||
|
.lock()
|
||||||
|
.take()
|
||||||
|
.expect("cannot use request multiple times");
|
||||||
|
|
||||||
// TODO: use tracing spans properly
|
// TODO: use tracing spans properly
|
||||||
// TODO: requests from customers have request ids, but we should add
|
// TODO: requests from customers have request ids, but we should add
|
||||||
// TODO: including params in this is way too verbose
|
// TODO: including params in this is way too verbose
|
||||||
trace!(rpc=%self.conn, %method, "request");
|
trace!(rpc=%conn, %method, "request");
|
||||||
|
|
||||||
let mut provider = None;
|
let mut provider = None;
|
||||||
|
|
||||||
while provider.is_none() {
|
while provider.is_none() {
|
||||||
match self.conn.provider.read().await.as_ref() {
|
match conn.provider.read().await.as_ref() {
|
||||||
None => {
|
None => {
|
||||||
warn!(rpc=%self.conn, "no provider!");
|
warn!(rpc=%conn, "no provider!");
|
||||||
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
|
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
|
||||||
// TODO: sleep how long? subscribe to something instead?
|
// TODO: sleep how long? subscribe to something instead?
|
||||||
sleep(Duration::from_millis(100)).await
|
sleep(Duration::from_millis(100)).await
|
||||||
|
@ -100,22 +107,18 @@ impl OpenRequestHandle {
|
||||||
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.decremented.store(true, atomic::Ordering::Release);
|
// no need to do conn.active_requests.fetch_sub because Drop will do that
|
||||||
self.conn
|
|
||||||
.active_requests
|
|
||||||
.fetch_sub(1, atomic::Ordering::AcqRel);
|
|
||||||
// todo: do something to make sure this doesn't get called again? i miss having the function sig have self
|
|
||||||
|
|
||||||
// TODO: i think ethers already has trace logging (and does it much more fancy)
|
// TODO: i think ethers already has trace logging (and does it much more fancy)
|
||||||
if let Err(err) = &response {
|
if let Err(err) = &response {
|
||||||
if !silent_errors {
|
if !silent_errors {
|
||||||
// TODO: this isn't always bad. missing trie node while we are checking initial
|
// TODO: this isn't always bad. missing trie node while we are checking initial
|
||||||
warn!(?err, %method, rpc=%self.conn, "bad response!");
|
warn!(?err, %method, rpc=%conn, "bad response!");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO: opt-in response inspection to log reverts with their request. put into redis or what?
|
// TODO: opt-in response inspection to log reverts with their request. put into redis or what?
|
||||||
// trace!(rpc=%self.0, %method, ?response);
|
// trace!(rpc=%self.0, %method, ?response);
|
||||||
trace!(%method, rpc=%self.conn, "response");
|
trace!(%method, rpc=%conn, "response");
|
||||||
}
|
}
|
||||||
|
|
||||||
response
|
response
|
||||||
|
@ -124,13 +127,8 @@ impl OpenRequestHandle {
|
||||||
|
|
||||||
impl Drop for OpenRequestHandle {
|
impl Drop for OpenRequestHandle {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.decremented.load(atomic::Ordering::Acquire) {
|
if let Some(conn) = self.conn.lock().take() {
|
||||||
// we already decremented from a successful request
|
conn.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
|
||||||
return;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
self.conn
|
|
||||||
.active_requests
|
|
||||||
.fetch_sub(1, atomic::Ordering::AcqRel);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue