less instrumenting

This commit is contained in:
Bryan Stitt 2022-05-17 17:15:18 +00:00
parent c1144e5e42
commit 997d4fa5bb
4 changed files with 53 additions and 22 deletions

@ -17,6 +17,7 @@ use std::time::Duration;
use tokio::sync::watch;
use tokio::task;
use tokio::time::sleep;
use tracing::info_span;
use tracing::{debug, instrument, trace, warn};
static APP_USER_AGENT: &str = concat!(
@ -57,7 +58,7 @@ impl fmt::Debug for Web3ProxyApp {
}
impl Web3ProxyApp {
#[instrument(name = "try_new_Web3ProxyApp", skip_all)]
// #[instrument(name = "try_new_Web3ProxyApp", skip_all)]
pub async fn try_new(
chain_id: usize,
balanced_rpcs: Vec<Web3ConnectionConfig>,
@ -126,7 +127,7 @@ impl Web3ProxyApp {
Ok(warp::reply::json(&response))
}
#[instrument(skip_all)]
// #[instrument(skip_all)]
async fn proxy_web3_rpc_requests(
self: Arc<Web3ProxyApp>,
requests: Vec<JsonRpcRequest>,
@ -158,7 +159,7 @@ impl Web3ProxyApp {
Ok(collected)
}
#[instrument(skip_all)]
// #[instrument(skip_all)]
async fn proxy_web3_rpc_request(
self: Arc<Web3ProxyApp>,
request: JsonRpcRequest,
@ -228,10 +229,16 @@ impl Web3ProxyApp {
} else {
// this is not a private transaction (or no private relays are configured)
// TODO: how much should we retry?
for _ in 0..10 {
for i in 0..10 {
// TODO: think more about this loop.
// TODO: set tracing span to have the loop count in it
let span = info_span!("i", i);
let _enter = span.enter();
// todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
// TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed
trace!("{:?} waiting for best_block_hash", request);
let best_block_hash = self
.balanced_rpcs
.get_synced_rpcs()
@ -239,6 +246,8 @@ impl Web3ProxyApp {
.map(|x| *x.get_head_block_hash())
.unwrap();
trace!("{:?} best_block_hash {}", request, best_block_hash);
// TODO: building this cache key is slow and its large, but i don't see a better way right now
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
let cache_key = (
@ -252,9 +261,11 @@ impl Web3ProxyApp {
let _ = self.active_requests.remove(&cache_key);
// TODO: emit a stat
trace!("cache hit!");
trace!("{:?} cache hit!", request);
return Ok(cached.to_owned());
} else {
trace!("{:?} cache miss!", request);
}
// check if this request is already in flight
@ -271,6 +282,8 @@ impl Web3ProxyApp {
if let Some(mut other_in_flight_rx) = other_in_flight_rx {
// wait for the other request to finish. it can finish successfully or with an error
trace!("{:?} waiting on in-flight request", request);
let _ = other_in_flight_rx.changed().await;
// now that we've waited, lets check the cache again
@ -278,7 +291,19 @@ impl Web3ProxyApp {
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
// TODO: emit a stat
trace!(
"{:?} cache hit after waiting for in-flight request!",
request
);
return Ok(cached.to_owned());
} else {
// TODO: emit a stat
trace!(
"{:?} cache miss after waiting for in-flight request!",
request
);
}
}
@ -291,7 +316,7 @@ impl Web3ProxyApp {
let response = match response {
Ok(partial_response) => {
// TODO: trace here was really slow with millions of requests.
// info!("forwarding request from {}", upstream_server);
// trace!("forwarding request from {}", upstream_server);
let response = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
@ -383,15 +408,18 @@ impl Web3ProxyApp {
}
};
if response.error.is_some() {
trace!("Sending error reply: {:?}", response);
} else {
trace!("Sending reply: {:?}", response);
}
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
if response.error.is_some() {
trace!("Sending error reply: {:?}", response);
// errors already sent false to the in_flight_tx
} else {
trace!("Sending reply: {:?}", response);
let _ = in_flight_tx.send(false);
}
return Ok(response);
}
@ -416,12 +444,15 @@ impl Web3ProxyApp {
sleep(deadline).await;
warn!("All rate limits exceeded. Sleeping");
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
// continue
continue;
}
}
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
}
}

@ -3,7 +3,7 @@ use governor::clock::QuantaClock;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::instrument;
// use tracing::instrument;
use crate::connection::Web3Connection;
use crate::Web3ProxyApp;
@ -43,7 +43,7 @@ pub struct Web3ConnectionConfig {
impl RpcConfig {
/// Create a Web3ProxyApp from config
#[instrument(name = "try_build_RpcConfig", skip_all)]
// #[instrument(name = "try_build_RpcConfig", skip_all)]
pub async fn try_build(self) -> anyhow::Result<Web3ProxyApp> {
let balanced_rpcs = self.balanced_rpcs.into_values().collect();
@ -59,7 +59,7 @@ impl RpcConfig {
impl Web3ConnectionConfig {
/// Create a Web3Connection from config
#[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
pub async fn try_build(
self,
clock: &QuantaClock,

@ -146,7 +146,7 @@ impl fmt::Debug for Web3Connections {
}
impl Web3Connections {
#[instrument(name = "try_new_Web3Connections", skip_all)]
// #[instrument(name = "try_new_Web3Connections", skip_all)]
pub async fn try_new(
chain_id: usize,
servers: Vec<Web3ConnectionConfig>,

@ -22,7 +22,7 @@ use crate::config::{CliConfig, RpcConfig};
fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
// tracing_subscriber::fmt::init();
// TODO: if RUST_LOG isn't set, set it to "web3_proxy=debug" or something
console_subscriber::init();
let cli_config: CliConfig = argh::from_env();