yes health, no channel
This commit is contained in:
parent
63adbc0587
commit
3c9576c13b
@ -129,7 +129,6 @@ pub struct Web3Rpc {
|
|||||||
pub(super) head_latency: RwLock<Latency>,
|
pub(super) head_latency: RwLock<Latency>,
|
||||||
/// Track request latency
|
/// Track request latency
|
||||||
pub(super) request_latency: RwLock<Latency>,
|
pub(super) request_latency: RwLock<Latency>,
|
||||||
pub(super) request_latency_sender: Option<flume::Sender<Duration>>,
|
|
||||||
/// Track total requests served
|
/// Track total requests served
|
||||||
/// TODO: maybe move this to graphana
|
/// TODO: maybe move this to graphana
|
||||||
pub(super) total_requests: AtomicUsize,
|
pub(super) total_requests: AtomicUsize,
|
||||||
@ -217,9 +216,6 @@ impl Web3Rpc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: what max capacity?
|
|
||||||
let (request_latency_sender, request_latency_receiver) = flume::bounded(10_000);
|
|
||||||
|
|
||||||
let new_connection = Self {
|
let new_connection = Self {
|
||||||
name,
|
name,
|
||||||
db_conn: db_conn.clone(),
|
db_conn: db_conn.clone(),
|
||||||
@ -234,7 +230,6 @@ impl Web3Rpc {
|
|||||||
backup,
|
backup,
|
||||||
block_data_limit,
|
block_data_limit,
|
||||||
tier: config.tier,
|
tier: config.tier,
|
||||||
request_latency_sender: Some(request_latency_sender),
|
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -254,7 +249,6 @@ impl Web3Rpc {
|
|||||||
block_sender,
|
block_sender,
|
||||||
chain_id,
|
chain_id,
|
||||||
http_interval_sender,
|
http_interval_sender,
|
||||||
request_latency_receiver,
|
|
||||||
reconnect,
|
reconnect,
|
||||||
tx_id_sender,
|
tx_id_sender,
|
||||||
)
|
)
|
||||||
@ -641,12 +635,9 @@ impl Web3Rpc {
|
|||||||
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
||||||
chain_id: u64,
|
chain_id: u64,
|
||||||
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
|
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
|
||||||
request_latency_receiver: flume::Receiver<Duration>,
|
|
||||||
reconnect: bool,
|
reconnect: bool,
|
||||||
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
|
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let request_latency_receiver = Arc::new(request_latency_receiver);
|
|
||||||
|
|
||||||
let revert_handler = if self.backup {
|
let revert_handler = if self.backup {
|
||||||
RequestRevertHandler::DebugLevel
|
RequestRevertHandler::DebugLevel
|
||||||
} else {
|
} else {
|
||||||
@ -699,40 +690,40 @@ impl Web3Rpc {
|
|||||||
|
|
||||||
if new_total_requests - old_total_requests < 10 {
|
if new_total_requests - old_total_requests < 10 {
|
||||||
// TODO: if this fails too many times, reset the connection
|
// TODO: if this fails too many times, reset the connection
|
||||||
// let head_block = conn.head_block.read().clone();
|
let head_block = conn.head_block.read().clone();
|
||||||
|
|
||||||
// if let Some((block_hash, txid)) = head_block.and_then(|x| {
|
if let Some((block_hash, txid)) = head_block.and_then(|x| {
|
||||||
// let block = x.block.clone();
|
let block = x.block.clone();
|
||||||
|
|
||||||
// let block_hash = block.hash?;
|
let block_hash = block.hash?;
|
||||||
// let txid = block.transactions.last().cloned()?;
|
let txid = block.transactions.last().cloned()?;
|
||||||
|
|
||||||
// Some((block_hash, txid))
|
Some((block_hash, txid))
|
||||||
// }) {
|
}) {
|
||||||
// let authorization = authorization.clone();
|
let authorization = authorization.clone();
|
||||||
// let conn = conn.clone();
|
let conn = conn.clone();
|
||||||
|
|
||||||
// let x = async move {
|
let x = async move {
|
||||||
// conn.try_request_handle(&authorization, Some(client)).await
|
conn.try_request_handle(&authorization, Some(client)).await
|
||||||
// }
|
}
|
||||||
// .await;
|
.await;
|
||||||
|
|
||||||
// if let Ok(OpenRequestResult::Handle(x)) = x {
|
if let Ok(OpenRequestResult::Handle(x)) = x {
|
||||||
// if let Ok(Some(x)) = x
|
if let Ok(Some(x)) = x
|
||||||
// .request::<_, Option<Transaction>>(
|
.request::<_, Option<Transaction>>(
|
||||||
// "eth_getTransactionByHash",
|
"eth_getTransactionByHash",
|
||||||
// &txid,
|
&txid,
|
||||||
// revert_handler,
|
revert_handler,
|
||||||
// None,
|
None,
|
||||||
// )
|
)
|
||||||
// .await
|
.await
|
||||||
// {
|
{
|
||||||
// // TODO: make this flatter
|
// TODO: make this flatter
|
||||||
// // TODO: do more (fair, not random) things here
|
// TODO: do more (fair, not random) things here
|
||||||
// // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
|
// let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
old_total_requests = new_total_requests;
|
old_total_requests = new_total_requests;
|
||||||
@ -765,23 +756,6 @@ impl Web3Rpc {
|
|||||||
futures.push(flatten_handle(tokio::spawn(f)));
|
futures.push(flatten_handle(tokio::spawn(f)));
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
|
||||||
let conn = self.clone();
|
|
||||||
let request_latency_receiver = request_latency_receiver.clone();
|
|
||||||
|
|
||||||
let f = async move {
|
|
||||||
while let Ok(latency) = request_latency_receiver.recv_async().await {
|
|
||||||
conn.request_latency
|
|
||||||
.write()
|
|
||||||
.record(latency.as_secs_f64() * 1000.0);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
};
|
|
||||||
|
|
||||||
futures.push(flatten_handle(tokio::spawn(f)));
|
|
||||||
}
|
|
||||||
|
|
||||||
match try_join_all(futures).await {
|
match try_join_all(futures).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
// futures all exited without error. break instead of restarting subscriptions
|
// futures all exited without error. break instead of restarting subscriptions
|
||||||
|
@ -374,10 +374,12 @@ impl OpenRequestHandle {
|
|||||||
tokio::spawn(f);
|
tokio::spawn(f);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if let Some(x) = self.rpc.request_latency_sender.as_ref() {
|
} else {
|
||||||
if let Err(err) = x.send(start.elapsed()) {
|
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
|
||||||
error!("no request latency sender! {:#?}", err);
|
|
||||||
}
|
let mut latency_recording = self.rpc.request_latency.write();
|
||||||
|
|
||||||
|
latency_recording.record(latency_ms);
|
||||||
}
|
}
|
||||||
|
|
||||||
response
|
response
|
||||||
|
Loading…
Reference in New Issue
Block a user