even more metrics

This commit is contained in:
Bryan Stitt 2022-09-09 03:53:16 +00:00
parent c646ca9eab
commit c8da98d12e
6 changed files with 67 additions and 35 deletions

@ -8,6 +8,7 @@ use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
use axum::extract::ws::Message;
@ -29,11 +30,12 @@ use redis_rate_limit::{
RedisConnectionManager, RedisErrorSink, RedisPool, RedisRateLimit,
};
use sea_orm::DatabaseConnection;
use serde::Serialize;
use serde_json::json;
use std::fmt;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
@ -81,9 +83,8 @@ pub struct Web3ProxyApp {
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
/// prometheus metrics
metrics: Arc<Web3ProxyAppMetrics>,
/// store pending queries so that we don't send the same request to our backends multiple times
pub total_queries: AtomicU64,
app_metrics: Arc<Web3ProxyAppMetrics>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus>,
pub frontend_rate_limiter: Option<RedisRateLimit>,
@ -144,7 +145,7 @@ pub async fn get_migrated_db(
Ok(db)
}
#[metered(registry = Web3ProxyAppMetrics)]
#[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)]
impl Web3ProxyApp {
pub async fn spawn(
top_config: TopConfig,
@ -159,6 +160,9 @@ impl Web3ProxyApp {
"redirect user url must contain \"{{user_id}}\""
);
let app_metrics = Default::default();
let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
// first, we connect to mysql and make sure the latest migrations have run
let db_conn = if let Some(db_url) = &top_config.app.db_url {
let db_min_connections = top_config.app.db_min_connections.unwrap_or(num_workers);
@ -262,6 +266,7 @@ impl Web3ProxyApp {
top_config.app.min_synced_rpcs,
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
open_request_handle_metrics.clone(),
)
.await
.context("balanced rpcs")?;
@ -288,6 +293,7 @@ impl Web3ProxyApp {
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
None,
pending_transactions.clone(),
open_request_handle_metrics.clone(),
)
.await
.context("private_rpcs")?;
@ -308,7 +314,6 @@ impl Web3ProxyApp {
});
// TODO: change this to a sized cache
let total_queries = 0.into();
let response_cache = Cache::new(10_000);
let user_cache = Cache::new(10_000);
@ -319,12 +324,12 @@ impl Web3ProxyApp {
response_cache,
head_block_receiver,
pending_tx_sender,
total_queries,
pending_transactions,
frontend_rate_limiter,
db_conn,
redis_pool,
metrics: Default::default(),
app_metrics,
open_request_handle_metrics,
user_cache,
};
@ -340,7 +345,18 @@ impl Web3ProxyApp {
// TODO: what globals? should this be the hostname or what?
// globals.insert("service", "web3_proxy");
let serialized = serde_prometheus::to_string(&self.metrics, Some("web3_proxy"), globals)?;
/// Borrowed pair of both metrics registries, so that a single `Serialize`
/// value can be handed to `serde_prometheus::to_string`.
#[derive(Serialize)]
struct CombinedMetrics<'a> {
/// the app-level registry (filled from `self.app_metrics`)
app: &'a Web3ProxyAppMetrics,
/// the registry that is also handed to the backend rpc connections
/// (filled from `self.open_request_handle_metrics`)
backend_rpc: &'a OpenRequestHandleMetrics,
}
let metrics = CombinedMetrics {
app: &self.app_metrics,
backend_rpc: &self.open_request_handle_metrics,
};
let serialized = serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)?;
Ok(serialized)
}
@ -618,12 +634,10 @@ impl Web3ProxyApp {
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
// // TODO: add more to this span such as
// TODO: add things to this span
let span = info_span!("rpc_request");
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
self.total_queries.fetch_add(1, atomic::Ordering::Relaxed);
// TODO: don't clone
let partial_response: serde_json::Value = match request.method.clone().as_ref() {
// lots of commands are blocked

@ -1,5 +1,6 @@
use crate::rpcs::blockchain::BlockHashesMap;
use crate::rpcs::connection::Web3Connection;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
use argh::FromArgs;
use derive_more::Constructor;
@ -125,6 +126,7 @@ impl Web3ConnectionConfig {
block_map: BlockHashesMap,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = match (self.hard_limit, redis_pool) {
(None, None) => None,
@ -156,6 +158,7 @@ impl Web3ConnectionConfig {
tx_id_sender,
true,
self.weight,
open_request_handle_metrics,
)
.await
}

@ -1,8 +1,6 @@
use crate::app::Web3ProxyApp;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use moka::future::ConcurrentCacheExt;
use serde_json::json;
use std::sync::{atomic, Arc};
use axum::{http::StatusCode, response::IntoResponse, Extension};
use std::sync::Arc;
use tracing::instrument;
/// Health check page for load balancers to use

@ -1,7 +1,7 @@
//! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlockHashesMap, BlockId};
use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestResult};
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use anyhow::Context;
@ -30,6 +30,7 @@ pub struct Web3Connection {
pub(super) active_requests: AtomicU32,
/// keep track of total requests
/// TODO: is this type okay?
/// TODO: replace this with something in metered?
pub(super) total_requests: AtomicU64,
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
@ -44,6 +45,7 @@ pub struct Web3Connection {
pub(super) weight: u32,
// TODO: async lock?
pub(super) head_block_id: RwLock<Option<BlockId>>,
pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
}
impl Web3Connection {
@ -67,6 +69,7 @@ impl Web3Connection {
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
weight: u32,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
// TODO: allow configurable period and max_burst
@ -92,6 +95,7 @@ impl Web3Connection {
block_data_limit: Default::default(),
head_block_id: RwLock::new(Default::default()),
weight,
open_request_handle_metrics,
};
let new_connection = Arc::new(new_connection);

@ -1,7 +1,7 @@
//! Load balanced communication with a group of web3 providers
use super::blockchain::{ArcBlock, BlockHashesMap};
use super::connection::Web3Connection;
use super::request::{OpenRequestHandle, OpenRequestResult};
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
@ -63,6 +63,7 @@ impl Web3Connections {
min_synced_rpcs: usize,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
pending_transactions: Cache<TxHash, TxStatus>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
@ -119,6 +120,7 @@ impl Web3Connections {
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
let block_map = block_map.clone();
let open_request_handle_metrics = open_request_handle_metrics.clone();
tokio::spawn(async move {
server_config
@ -131,6 +133,7 @@ impl Web3Connections {
block_map,
block_sender,
pending_tx_id_sender,
open_request_handle_metrics,
)
.await
})

@ -1,6 +1,11 @@
use super::connection::Web3Connection;
use super::provider::Web3Provider;
// use metered::{measure, ErrorCount, HitCount, InFlight, ResponseTime, Throughput};
use metered::metered;
use metered::ErrorCount;
use metered::HitCount;
use metered::InFlight;
use metered::ResponseTime;
use metered::Throughput;
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
@ -19,34 +24,39 @@ pub enum OpenRequestResult {
/// Make RPC requests through this handle and drop it when you are done.
#[derive(Debug)]
pub struct OpenRequestHandle(Arc<Web3Connection>);
pub struct OpenRequestHandle {
conn: Arc<Web3Connection>,
// TODO: this is the same metrics on the conn. use a reference
metrics: Arc<OpenRequestHandleMetrics>,
}
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
impl OpenRequestHandle {
pub fn new(connection: Arc<Web3Connection>) -> Self {
pub fn new(conn: Arc<Web3Connection>) -> Self {
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
connection
.active_requests
.fetch_add(1, atomic::Ordering::AcqRel);
// TODO: should we be using metered, or not? i think not because we want stats for each handle
// TODO: these should maybe be sent to an influxdb instance?
conn.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
// TODO: handle overflows?
// TODO: what ordering?
connection
.total_requests
.fetch_add(1, atomic::Ordering::Relaxed);
conn.total_requests.fetch_add(1, atomic::Ordering::Relaxed);
Self(connection)
let metrics = conn.open_request_handle_metrics.clone();
Self { conn, metrics }
}
pub fn clone_connection(&self) -> Arc<Web3Connection> {
self.0.clone()
self.conn.clone()
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// This borrows `&self` (it does not consume the handle), so the caller is responsible for dropping the handle after the request is complete
// #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
#[instrument(skip_all)]
#[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
pub async fn request<T, R>(
&self,
method: &str,
@ -59,14 +69,14 @@ impl OpenRequestHandle {
// TODO: use tracing spans properly
// TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose
trace!(rpc=%self.0, %method, "request");
trace!(rpc=%self.conn, %method, "request");
let mut provider = None;
while provider.is_none() {
match self.0.provider.read().await.as_ref() {
match self.conn.provider.read().await.as_ref() {
None => {
warn!(rpc=%self.0, "no provider!");
warn!(rpc=%self.conn, "no provider!");
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: sleep how long? subscribe to something instead?
sleep(Duration::from_millis(100)).await
@ -83,7 +93,7 @@ impl OpenRequestHandle {
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: at least instrument this with more useful information
// trace!(rpc=%self.0, %method, ?response);
trace!(rpc=%self.0, %method, "response");
trace!(rpc=%self.conn, %method, "response");
response
}
@ -91,7 +101,7 @@ impl OpenRequestHandle {
impl Drop for OpenRequestHandle {
fn drop(&mut self) {
self.0
self.conn
.active_requests
.fetch_sub(1, atomic::Ordering::AcqRel);
}