diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 872214d9..5ee3089e 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -320,7 +320,7 @@ impl Web3ProxyApp { .build() .into(); - // Generate the instance name (hostname + random hash) + // Generate the instance name let instance_hash = Ulid::new().to_string(); // create a channel for receiving stats diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 659172ea..9f908ee0 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -26,6 +26,7 @@ use serde::Serialize; use serde_json::value::RawValue; use siwe::VerificationError; use std::sync::Arc; +use std::time::Duration; use std::{borrow::Cow, net::IpAddr}; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; use tracing::{debug, error, trace, warn}; @@ -148,7 +149,7 @@ pub enum Web3ProxyError { /// TODO: what should be attached to the timout? #[display(fmt = "{:?}", _0)] #[error(ignore)] - Timeout(Option), + Timeout(Option), UlidDecode(ulid::DecodeError), #[error(ignore)] UnknownBlockHash(H256), @@ -1128,8 +1129,8 @@ impl From for Web3ProxyError { } impl From for Web3ProxyError { - fn from(err: tokio::time::error::Elapsed) -> Self { - Self::Timeout(Some(err)) + fn from(_: tokio::time::error::Elapsed) -> Self { + Self::Timeout(None) } } diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 0978481c..832936d5 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -10,8 +10,8 @@ pub mod rpc_proxy_ws; pub mod status; pub mod users; -use crate::app::Web3ProxyApp; use crate::errors::Web3ProxyResult; +use crate::{app::Web3ProxyApp, errors::Web3ProxyError}; use axum::{ error_handling::HandleErrorLayer, routing::{get, post}, @@ -269,9 +269,8 @@ pub async fn serve( ServiceBuilder::new() // this middleware goes above `TimeoutLayer` because it will receive // errors returned by `TimeoutLayer` - // TODO: JsonRPC error response .layer(HandleErrorLayer::new(|_: BoxError| async { - StatusCode::REQUEST_TIMEOUT + Web3ProxyError::Timeout(Some(Duration::from_secs(5 * 60))) })) .layer(TimeoutLayer::new(Duration::from_secs(5 * 60))), ) diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index b757363e..df220e0a 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -568,7 +568,6 @@ pub async fn query_user_influx_stats<'a>( } }); - // datapoints.insert(out.get("time"), out); json!(out) }) .collect::>(); diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index a410d943..f5947ded 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -479,37 +479,29 @@ impl BufferedRpcQueryStats { measurement: &str, chain_id: u64, key: RpcQueryKey, - instance: &String, + instance: &str, ) -> anyhow::Result { - let mut builder = DataPoint::builder(measurement); - - builder = builder.tag("chain_id", chain_id.to_string()); - - if let Some(rpc_secret_key_id) = key.rpc_secret_key_id { - builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string()); - } - - builder = builder.tag("instance", instance); - - builder = builder.tag("method", key.method); - - builder = builder + let mut builder = DataPoint::builder(measurement) .tag("archive_needed", key.archive_needed.to_string()) + .tag("chain_id", chain_id.to_string()) .tag("error_response", key.error_response.to_string()) + .tag("instance", instance) + .tag("method", key.method) .tag("user_error_response", key.user_error_response.to_string()) - .field("frontend_requests", self.frontend_requests as i64) + .timestamp(key.response_timestamp) .field("backend_requests", self.backend_requests as i64) - .field("no_servers", self.no_servers as i64) - .field("cache_misses", self.cache_misses as i64) .field("cache_hits", self.cache_hits as i64) + .field("cache_misses", self.cache_misses as i64) + .field("frontend_requests", self.frontend_requests as i64) + .field("no_servers", self.no_servers as i64) .field("sum_request_bytes", self.sum_request_bytes as i64) - .field("sum_response_millis", self.sum_response_millis as i64) .field("sum_response_bytes", self.sum_response_bytes as i64) + .field("sum_response_millis", self.sum_response_millis as i64) .field( - "sum_incl_free_credits_used", - self.sum_credits_used + "balance", + self.approximate_balance_remaining .to_f64() - .context("sum_credits_used is really (too) large")?, + .context("balance is really (too) large")?, ) .field( "sum_credits_used", @@ -518,13 +510,16 @@ impl BufferedRpcQueryStats { .context("sum_credits_used is really (too) large")?, ) .field( - "balance", - self.approximate_balance_remaining + "sum_incl_free_credits_used", + self.sum_credits_used .to_f64() - .context("balance is really (too) large")?, + .context("sum_credits_used is really (too) large")?, ); - builder = builder.timestamp(key.response_timestamp); + // TODO: set the rpc_secret_key_id tag to 0 when anon? will that make other queries easier? + if let Some(rpc_secret_key_id) = key.rpc_secret_key_id { + builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string()); + } let point = builder.build()?; diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index d106e196..a09f946d 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -50,6 +50,7 @@ pub struct StatBuffer { global_timeseries_buffer: HashMap, influxdb_bucket: Option, influxdb_client: Option, + instance_hash: String, opt_in_timeseries_buffer: HashMap, rpc_secret_key_cache: RpcSecretKeyCache, timestamp_precision: TimestampPrecision, @@ -57,7 +58,6 @@ pub struct StatBuffer { user_balance_cache: UserBalanceCache, _flush_sender: mpsc::Sender>, - instance_hash: String, } impl StatBuffer { @@ -92,13 +92,14 @@ impl StatBuffer { global_timeseries_buffer: Default::default(), influxdb_bucket, influxdb_client, + instance_hash, opt_in_timeseries_buffer: Default::default(), rpc_secret_key_cache: rpc_secret_key_cache.unwrap(), timestamp_precision, tsdb_save_interval_seconds, user_balance_cache: user_balance_cache.unwrap(), + _flush_sender: flush_sender, - instance_hash, }; // any errors inside this task will cause the application to exit @@ -149,20 +150,8 @@ impl StatBuffer { x = flush_receiver.recv() => { match x { Some(x) => { - trace!("flush"); + let flushed_stats = self._flush(&mut stat_receiver).await?; - // fill the buffer - while let Ok(stat) = stat_receiver.try_recv() { - self._buffer_app_stat(stat).await?; - } - - // flush the buffers - let tsdb_count = self.save_tsdb_stats().await; - let relational_count = self.save_relational_stats().await; - - // notify - let flushed_stats = FlushedStats{ timeseries: tsdb_count, relational: relational_count}; - trace!(?flushed_stats); if let Err(err) = x.send(flushed_stats) { error!(?flushed_stats, ?err, "unable to notify about flushed stats"); } @@ -201,13 +190,7 @@ impl StatBuffer { // sleep(Duration::from_millis(10)).await; // } - let saved_relational = self.save_relational_stats().await; - - info!("saved {} pending relational stat(s)", saved_relational); - - let saved_tsdb = self.save_tsdb_stats().await; - - info!("saved {} pending tsdb stat(s)", saved_tsdb); + self._flush(&mut stat_receiver).await?; info!("accounting and stat save loop complete"); @@ -320,6 +303,32 @@ impl StatBuffer { Ok(()) } + async fn _flush( + &mut self, + stat_receiver: &mut mpsc::UnboundedReceiver, + ) -> Web3ProxyResult { + trace!("flush"); + + // fill the buffer + while let Ok(stat) = stat_receiver.try_recv() { + self._buffer_app_stat(stat).await?; + } + + // flush the buffers + let tsdb_count = self.save_tsdb_stats().await; + let relational_count = self.save_relational_stats().await; + + // notify + let flushed_stats = FlushedStats { + timeseries: tsdb_count, + relational: relational_count, + }; + + trace!(?flushed_stats); + + Ok(flushed_stats) + } + async fn save_relational_stats(&mut self) -> usize { let mut count = 0; diff --git a/web3_proxy/tests/test_multiple_proxy.rs b/web3_proxy/tests/test_multiple_proxy.rs index a628770a..1033fbee 100644 --- a/web3_proxy/tests/test_multiple_proxy.rs +++ b/web3_proxy/tests/test_multiple_proxy.rs @@ -131,7 +131,7 @@ async fn test_multiple_proxies_stats_add_up() { assert_eq!(flush_1_count_0.relational, 1); assert_eq!(flush_1_count_0.timeseries, 2); - // // no more stats should arrive + // no more stats should arrive let flush_0_count_1 = x_0.flush_stats().await.unwrap(); let flush_1_count_1 = x_1.flush_stats().await.unwrap(); info!("Counts 0 are: {:?}", flush_0_count_1);