improve health checking
This commit is contained in:
parent
1b144d0aeb
commit
9ba2045dbd
@ -1055,7 +1055,7 @@ impl RpcsForRequest {
|
|||||||
attempted.push(best_rpc);
|
attempted.push(best_rpc);
|
||||||
|
|
||||||
match best_rpc
|
match best_rpc
|
||||||
.try_request_handle(&self.request, error_handler)
|
.try_request_handle(&self.request, error_handler, false)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(OpenRequestResult::Handle(handle)) => {
|
Ok(OpenRequestResult::Handle(handle)) => {
|
||||||
|
@ -26,7 +26,7 @@ use serde_json::json;
|
|||||||
use std::cmp::Reverse;
|
use std::cmp::Reverse;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
|
use std::sync::atomic::{self, AtomicBool, AtomicU32, AtomicU64, AtomicUsize};
|
||||||
use std::{cmp::Ordering, sync::Arc};
|
use std::{cmp::Ordering, sync::Arc};
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
|
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
|
||||||
@ -89,6 +89,8 @@ pub struct Web3Rpc {
|
|||||||
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
|
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
|
||||||
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
|
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
|
||||||
pub(super) created_at: Option<Instant>,
|
pub(super) created_at: Option<Instant>,
|
||||||
|
/// false if a health check has failed
|
||||||
|
pub(super) healthy: AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Web3Rpc {
|
impl Web3Rpc {
|
||||||
@ -190,6 +192,9 @@ impl Web3Rpc {
|
|||||||
|
|
||||||
let (disconnect_watch, _) = watch::channel(false);
|
let (disconnect_watch, _) = watch::channel(false);
|
||||||
|
|
||||||
|
// TODO: start optimistically?
|
||||||
|
let healthy = false.into();
|
||||||
|
|
||||||
let new_rpc = Self {
|
let new_rpc = Self {
|
||||||
automatic_block_limit,
|
automatic_block_limit,
|
||||||
backup,
|
backup,
|
||||||
@ -210,6 +215,7 @@ impl Web3Rpc {
|
|||||||
subscribe_txs: config.subscribe_txs,
|
subscribe_txs: config.subscribe_txs,
|
||||||
ws_url,
|
ws_url,
|
||||||
disconnect_watch: Some(disconnect_watch),
|
disconnect_watch: Some(disconnect_watch),
|
||||||
|
healthy,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -591,8 +597,9 @@ impl Web3Rpc {
|
|||||||
*self.disconnect_watch.as_ref().unwrap().borrow()
|
*self.disconnect_watch.as_ref().unwrap().borrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn healthcheck(
|
async fn check_health(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
|
detailed_healthcheck: bool,
|
||||||
error_handler: Option<RequestErrorHandler>,
|
error_handler: Option<RequestErrorHandler>,
|
||||||
) -> Web3ProxyResult<()> {
|
) -> Web3ProxyResult<()> {
|
||||||
let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone();
|
let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone();
|
||||||
@ -603,39 +610,41 @@ impl Web3Rpc {
|
|||||||
return Err(anyhow::anyhow!("head_block is too old!").into());
|
return Err(anyhow::anyhow!("head_block is too old!").into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let block_number = head_block.number();
|
if detailed_healthcheck {
|
||||||
|
let block_number = head_block.number();
|
||||||
|
|
||||||
let to = if let Some(txid) = head_block.transactions().last().cloned() {
|
let to = if let Some(txid) = head_block.transactions().last().cloned() {
|
||||||
let tx = self
|
let tx = self
|
||||||
.internal_request::<_, Option<Transaction>>(
|
.internal_request::<_, Option<Transaction>>(
|
||||||
"eth_getTransactionByHash",
|
"eth_getTransactionByHash",
|
||||||
&(txid,),
|
&(txid,),
|
||||||
error_handler,
|
error_handler,
|
||||||
Some(Duration::from_secs(5)),
|
Some(Duration::from_secs(5)),
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
.context("no transaction")?;
|
.context("no transaction")?;
|
||||||
|
|
||||||
// TODO: what default? something real?
|
// TODO: what default? something real?
|
||||||
tx.to.unwrap_or_else(|| {
|
tx.to.unwrap_or_else(|| {
|
||||||
|
"0xdead00000000000000000000000000000000beef"
|
||||||
|
.parse::<Address>()
|
||||||
|
.expect("deafbeef")
|
||||||
|
})
|
||||||
|
} else {
|
||||||
"0xdead00000000000000000000000000000000beef"
|
"0xdead00000000000000000000000000000000beef"
|
||||||
.parse::<Address>()
|
.parse::<Address>()
|
||||||
.expect("deafbeef")
|
.expect("deafbeef")
|
||||||
})
|
};
|
||||||
} else {
|
|
||||||
"0xdead00000000000000000000000000000000beef"
|
|
||||||
.parse::<Address>()
|
|
||||||
.expect("deafbeef")
|
|
||||||
};
|
|
||||||
|
|
||||||
let _code = self
|
let _code = self
|
||||||
.internal_request::<_, Option<Bytes>>(
|
.internal_request::<_, Option<Bytes>>(
|
||||||
"eth_getCode",
|
"eth_getCode",
|
||||||
&(to, block_number),
|
&(to, block_number),
|
||||||
error_handler,
|
error_handler,
|
||||||
Some(Duration::from_secs(5)),
|
Some(Duration::from_secs(5)),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO: if head block is none for too long, give an error
|
// TODO: if head block is none for too long, give an error
|
||||||
}
|
}
|
||||||
@ -671,14 +680,13 @@ impl Web3Rpc {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.backup {
|
|
||||||
debug!("reconnecting to {} in 30 seconds", self);
|
|
||||||
} else {
|
|
||||||
info!("reconnecting to {} in 30 seconds", self);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: exponential backoff with jitter
|
// TODO: exponential backoff with jitter
|
||||||
sleep(Duration::from_secs(30)).await;
|
if self.backup {
|
||||||
|
debug!("reconnecting to {} in 10 seconds", self);
|
||||||
|
} else {
|
||||||
|
info!("reconnecting to {} in 10 seconds", self);
|
||||||
|
}
|
||||||
|
sleep(Duration::from_secs(10)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -728,7 +736,7 @@ impl Web3Rpc {
|
|||||||
let mut futures = FuturesUnordered::new();
|
let mut futures = FuturesUnordered::new();
|
||||||
|
|
||||||
// TODO: use this channel instead of self.disconnect_watch
|
// TODO: use this channel instead of self.disconnect_watch
|
||||||
let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false);
|
let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false);
|
||||||
|
|
||||||
// subscribe to the disconnect watch. the app uses this when shutting down or when configs change
|
// subscribe to the disconnect watch. the app uses this when shutting down or when configs change
|
||||||
if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
|
if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
|
||||||
@ -753,13 +761,14 @@ impl Web3Rpc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// health check that runs if there haven't been any recent requests
|
// health check that runs if there haven't been any recent requests
|
||||||
if block_and_rpc_sender.is_some() {
|
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
||||||
// TODO: move this into a proper function
|
// TODO: move this into a proper function
|
||||||
let rpc = self.clone();
|
let rpc = self.clone();
|
||||||
|
let block_map = block_map.clone();
|
||||||
|
|
||||||
// TODO: how often? different depending on the chain?
|
// TODO: how often? different depending on the chain?
|
||||||
// TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
|
// TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
|
||||||
let health_sleep_seconds = 5;
|
let health_sleep_seconds = 10;
|
||||||
|
|
||||||
// health check loop
|
// health check loop
|
||||||
let f = async move {
|
let f = async move {
|
||||||
@ -767,29 +776,54 @@ impl Web3Rpc {
|
|||||||
let mut old_total_requests = 0;
|
let mut old_total_requests = 0;
|
||||||
let mut new_total_requests;
|
let mut new_total_requests;
|
||||||
|
|
||||||
|
if *subscribe_stop_rx.borrow_and_update() {
|
||||||
|
trace!(
|
||||||
|
"stopping healthcheck loop before even starting it on {}",
|
||||||
|
rpc
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// errors here should not cause the loop to exit!
|
// errors here should not cause the loop to exit!
|
||||||
while !(*subscribe_stop_rx.borrow()) {
|
loop {
|
||||||
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
|
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
|
||||||
+ rpc.external_requests.load(atomic::Ordering::Relaxed);
|
+ rpc.external_requests.load(atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
if new_total_requests - old_total_requests < 5 {
|
let detailed_healthcheck = new_total_requests - old_total_requests < 5;
|
||||||
// TODO: if this fails too many times, reset the connection
|
|
||||||
// TODO: move this into a function and the chaining should be easier
|
// TODO: if this fails too many times, reset the connection
|
||||||
if let Err(err) = rpc.healthcheck(error_handler).await {
|
if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await {
|
||||||
// TODO: different level depending on the error handler
|
rpc.healthy.store(false, atomic::Ordering::Relaxed);
|
||||||
// TODO: if rate limit error, set "retry_at"
|
|
||||||
if rpc.backup {
|
// TODO: different level depending on the error handler
|
||||||
warn!(?err, "health check on {} failed", rpc);
|
// TODO: if rate limit error, set "retry_at"
|
||||||
} else {
|
if rpc.backup {
|
||||||
error!(?err, "health check on {} failed", rpc);
|
warn!(?err, "health check on {} failed", rpc);
|
||||||
}
|
} else {
|
||||||
|
error!(?err, "health check on {} failed", rpc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clear the head block since we are unhealthy and shouldn't serve any requests
|
||||||
|
rpc.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
|
||||||
|
.await?;
|
||||||
|
} else {
|
||||||
|
rpc.healthy.store(true, atomic::Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: should we count the requests done inside this health check
|
// TODO: should we count the requests done inside this health check
|
||||||
old_total_requests = new_total_requests;
|
old_total_requests = new_total_requests;
|
||||||
|
|
||||||
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
select! {
|
||||||
|
x = subscribe_stop_rx.changed() => {
|
||||||
|
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
|
||||||
|
trace!(%rpc, "stopping http block subscription");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = sleep(Duration::from_secs(health_sleep_seconds)) => {
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("healthcheck loop on {} exited", rpc);
|
trace!("healthcheck loop on {} exited", rpc);
|
||||||
@ -797,15 +831,40 @@ impl Web3Rpc {
|
|||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// TODO: log quick_check lik
|
||||||
|
let initial_check = if let Err(err) = self.check_health(false, error_handler).await {
|
||||||
|
if self.backup {
|
||||||
|
warn!(?err, "initial health check on {} failed", self);
|
||||||
|
} else {
|
||||||
|
error!(?err, "initial health check on {} failed", self);
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
};
|
||||||
|
|
||||||
|
self.healthy.store(initial_check, atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
futures.push(flatten_handle(tokio::spawn(f)));
|
futures.push(flatten_handle(tokio::spawn(f)));
|
||||||
|
} else {
|
||||||
|
self.healthy.store(true, atomic::Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribe to new heads
|
// subscribe to new heads
|
||||||
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
||||||
let clone = self.clone();
|
let clone = self.clone();
|
||||||
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
let mut subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
||||||
let block_map = block_map.clone();
|
let block_map = block_map.clone();
|
||||||
|
|
||||||
|
if *subscribe_stop_rx.borrow_and_update() {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"exiting subscribe_new_heads before even starting on {}",
|
||||||
|
self
|
||||||
|
)
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
let f = async move {
|
let f = async move {
|
||||||
clone
|
clone
|
||||||
.subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx)
|
.subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx)
|
||||||
@ -818,7 +877,15 @@ impl Web3Rpc {
|
|||||||
// subscribe to new transactions
|
// subscribe to new transactions
|
||||||
if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
|
if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
|
||||||
let clone = self.clone();
|
let clone = self.clone();
|
||||||
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
let mut subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
||||||
|
|
||||||
|
if *subscribe_stop_rx.borrow_and_update() {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"exiting subscribe_new_heads before even starting on {}",
|
||||||
|
self
|
||||||
|
)
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
let f = async move {
|
let f = async move {
|
||||||
clone
|
clone
|
||||||
@ -877,19 +944,23 @@ impl Web3Rpc {
|
|||||||
// TODO: only subscribe if a user has subscribed
|
// TODO: only subscribe if a user has subscribed
|
||||||
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
|
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
|
||||||
|
|
||||||
while let Some(x) = pending_txs_sub.next().await {
|
loop {
|
||||||
// TODO: check this less often
|
select! {
|
||||||
if *subscribe_stop_rx.borrow() {
|
x = subscribe_stop_rx.changed() => {
|
||||||
// TODO: this is checking way too often. have this on a timer instead
|
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
|
||||||
trace!("stopping ws block subscription on {}", self);
|
break;
|
||||||
break;
|
}
|
||||||
|
}
|
||||||
|
x = pending_txs_sub.next() => {
|
||||||
|
if let Some(x) = x {
|
||||||
|
pending_txid_firehose.send(x).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pending_txid_firehose.send(x).await;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// only websockets subscribe to pending transactions
|
// only websockets subscribe to pending transactions
|
||||||
// its possibel to do with http, but not recommended
|
// its possible to do with http, but not recommended
|
||||||
// TODO: what should we do here?
|
// TODO: what should we do here?
|
||||||
loop {
|
loop {
|
||||||
if *subscribe_stop_rx.borrow_and_update() {
|
if *subscribe_stop_rx.borrow_and_update() {
|
||||||
@ -942,16 +1013,21 @@ impl Web3Rpc {
|
|||||||
self.send_head_block_result(latest_block, &block_sender, &block_map)
|
self.send_head_block_result(latest_block, &block_sender, &block_map)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
while let Some(block) = blocks.next().await {
|
loop {
|
||||||
if *subscribe_stop_rx.borrow() {
|
select! {
|
||||||
trace!("stopping ws block subscription on {}", self);
|
x = subscribe_stop_rx.changed() => {
|
||||||
break;
|
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
|
||||||
|
trace!(%self, "stopping websocket block subscription");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
block = blocks.next() => {
|
||||||
|
let block = block.map(Arc::new);
|
||||||
|
|
||||||
|
self.send_head_block_result(Ok(block), &block_sender, &block_map)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let block = Arc::new(block);
|
|
||||||
|
|
||||||
self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
} else if self.http_client.is_some() {
|
} else if self.http_client.is_some() {
|
||||||
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
|
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
|
||||||
@ -960,11 +1036,6 @@ impl Web3Rpc {
|
|||||||
i.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
i.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if *subscribe_stop_rx.borrow_and_update() {
|
|
||||||
trace!(%self, "stopping http block subscription");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let block_result = self
|
let block_result = self
|
||||||
.internal_request::<_, Option<ArcBlock>>(
|
.internal_request::<_, Option<ArcBlock>>(
|
||||||
"eth_getBlockByNumber",
|
"eth_getBlockByNumber",
|
||||||
@ -977,7 +1048,16 @@ impl Web3Rpc {
|
|||||||
self.send_head_block_result(block_result, &block_sender, &block_map)
|
self.send_head_block_result(block_result, &block_sender, &block_map)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
i.tick().await;
|
// TODO: should this select be at the start or end of the loop?
|
||||||
|
select! {
|
||||||
|
x = subscribe_stop_rx.changed() => {
|
||||||
|
if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
|
||||||
|
trace!(%self, "stopping http block subscription");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = i.tick() => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(anyhow!("no ws or http provider!").into());
|
return Err(anyhow!("no ws or http provider!").into());
|
||||||
@ -999,9 +1079,13 @@ impl Web3Rpc {
|
|||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
web3_request: &Arc<Web3Request>,
|
web3_request: &Arc<Web3Request>,
|
||||||
error_handler: Option<RequestErrorHandler>,
|
error_handler: Option<RequestErrorHandler>,
|
||||||
|
allow_unhealthy: bool,
|
||||||
) -> Web3ProxyResult<OpenRequestHandle> {
|
) -> Web3ProxyResult<OpenRequestHandle> {
|
||||||
loop {
|
loop {
|
||||||
match self.try_request_handle(web3_request, error_handler).await {
|
match self
|
||||||
|
.try_request_handle(web3_request, error_handler, allow_unhealthy)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
|
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
|
||||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||||
// TODO: emit a stat?
|
// TODO: emit a stat?
|
||||||
@ -1137,10 +1221,15 @@ impl Web3Rpc {
|
|||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
web3_request: &Arc<Web3Request>,
|
web3_request: &Arc<Web3Request>,
|
||||||
error_handler: Option<RequestErrorHandler>,
|
error_handler: Option<RequestErrorHandler>,
|
||||||
|
allow_unhealthy: bool,
|
||||||
) -> Web3ProxyResult<OpenRequestResult> {
|
) -> Web3ProxyResult<OpenRequestResult> {
|
||||||
// TODO: check a boolean set by health checks?
|
|
||||||
// TODO: if websocket is reconnecting, return an error?
|
// TODO: if websocket is reconnecting, return an error?
|
||||||
|
|
||||||
|
// TODO: what if this is a health check?!
|
||||||
|
if !(self.healthy.load(atomic::Ordering::Relaxed) || allow_unhealthy) {
|
||||||
|
return Ok(OpenRequestResult::Failed);
|
||||||
|
}
|
||||||
|
|
||||||
// make sure this block has the oldest block that this request needs
|
// make sure this block has the oldest block that this request needs
|
||||||
// TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
|
// TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
|
||||||
if let Some(block_needed) = web3_request.min_block_needed() {
|
if let Some(block_needed) = web3_request.min_block_needed() {
|
||||||
@ -1230,16 +1319,20 @@ impl Web3Rpc {
|
|||||||
// TODO: think about this more. its hard to do this without being self-referenctial!
|
// TODO: think about this more. its hard to do this without being self-referenctial!
|
||||||
let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?;
|
let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?;
|
||||||
|
|
||||||
self.authorized_request(&web3_request, error_handler).await
|
// TODO: if we are inside the health checks and we aren't healthy yet. we need some sort of flag to force try_handle to not error
|
||||||
|
|
||||||
|
self.authorized_request(&web3_request, error_handler, true)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn authorized_request<R: JsonRpcResultData>(
|
pub async fn authorized_request<R: JsonRpcResultData>(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
web3_request: &Arc<Web3Request>,
|
web3_request: &Arc<Web3Request>,
|
||||||
error_handler: Option<RequestErrorHandler>,
|
error_handler: Option<RequestErrorHandler>,
|
||||||
|
allow_unhealthy: bool,
|
||||||
) -> Web3ProxyResult<R> {
|
) -> Web3ProxyResult<R> {
|
||||||
let handle = self
|
let handle = self
|
||||||
.wait_for_request_handle(web3_request, error_handler)
|
.wait_for_request_handle(web3_request, error_handler, allow_unhealthy)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let response = handle.request().await?;
|
let response = handle.request().await?;
|
||||||
@ -1295,7 +1388,7 @@ impl Serialize for Web3Rpc {
|
|||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
// 14 if we bring head_delay back
|
// 14 if we bring head_delay back
|
||||||
let mut state = serializer.serialize_struct("Web3Rpc", 13)?;
|
let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
|
||||||
|
|
||||||
// the url is excluded because it likely includes private information. just show the name that we use in keys
|
// the url is excluded because it likely includes private information. just show the name that we use in keys
|
||||||
state.serialize_field("name", &self.name)?;
|
state.serialize_field("name", &self.name)?;
|
||||||
@ -1365,6 +1458,10 @@ impl Serialize for Web3Rpc {
|
|||||||
let weighted_latency_ms = self.weighted_peak_latency().as_secs_f32() * 1000.0;
|
let weighted_latency_ms = self.weighted_peak_latency().as_secs_f32() * 1000.0;
|
||||||
state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
|
state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
let healthy = self.healthy.load(atomic::Ordering::Relaxed);
|
||||||
|
state.serialize_field("healthy", &healthy)?;
|
||||||
|
}
|
||||||
|
|
||||||
state.end()
|
state.end()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user