From f2d35ba5eb203c047e9a53b92f653c592ec96bed Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 5 Feb 2023 18:16:09 -0800 Subject: [PATCH] remove metered in favor of influxdb stats --- Cargo.lock | 118 ------------------ TODO.md | 3 +- web3_proxy/Cargo.toml | 1 - web3_proxy/examples/metrics.rs | 32 ----- web3_proxy/src/app/mod.rs | 19 +-- .../bin/web3_proxy_cli/popularity_contest.rs | 24 ++-- web3_proxy/src/config.rs | 3 - web3_proxy/src/lib.rs | 1 - web3_proxy/src/metered/jsonrpc_error_count.rs | 6 - .../src/metered/provider_error_count.rs | 6 - web3_proxy/src/rpcs/connection.rs | 44 ++----- web3_proxy/src/rpcs/connections.rs | 9 +- web3_proxy/src/rpcs/request.rs | 36 +----- 13 files changed, 28 insertions(+), 274 deletions(-) delete mode 100644 web3_proxy/examples/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 8ca5d318..4ed501f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,28 +148,6 @@ dependencies = [ "term", ] -[[package]] -name = "aspect" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3927b415bba088539aaaf872d19752c7d00101a25ead1d123fcd7633f9c224d" -dependencies = [ - "aspect-weave", -] - -[[package]] -name = "aspect-weave" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4f672ac5290272725e1453014af99a86d2c1712808d647f469bf9427519f41" -dependencies = [ - "indexmap", - "proc-macro2", - "quote", - "syn", - "synattra", -] - [[package]] name = "async-io" version = "1.12.0" @@ -252,15 +230,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "atomic" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c" -dependencies = [ - "autocfg", -] - [[package]] name = "atty" version = "0.2.14" @@ -272,40 +241,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "auto_enums" -version = "0.7.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe0dfe45d75158751e195799f47ea02e81f570aa24bc5ef999cdd9e888c4b5c3" -dependencies = [ - "auto_enums_core", - "auto_enums_derive", -] - -[[package]] -name = "auto_enums_core" -version = "0.7.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da47c46001293a2c4b744d731958be22cff408a2ab76e2279328f9713b1267b4" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "auto_enums_derive" -version = "0.7.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41aed1da83ecdc799503b7cb94da1b45a34d72b49caf40a61d9cf5b88ec07cfd" -dependencies = [ - "autocfg", - "derive_utils", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "auto_impl" version = "0.5.0" @@ -1330,17 +1265,6 @@ dependencies = [ "syn", ] -[[package]] -name = "derive_utils" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532b4c15dccee12c7044f1fcad956e98410860b22231e44a3b827464797ca7bf" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "dialoguer" version = "0.8.0" @@ -2903,36 +2827,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "metered" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17491527d2ceff20d00d02166bdd18e23056e7ced22b9a8bb0efdfd293f0441a" -dependencies = [ - "aspect", - "atomic", - "cfg-if", - "hdrhistogram", - "metered-macro", - "parking_lot 0.12.1", - "serde", -] - -[[package]] -name = "metered-macro" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5ef9d33baa693e2d449d069f6ef6eb549762ed0c0179976c45bd98f3aa4a4e1" -dependencies = [ - "aspect-weave", - "heck 0.4.0", - "indexmap", - "proc-macro2", - "quote", - "syn", - "synattra", -] - [[package]] name = "migration" version = "0.13.0" @@ -5082,17 +4976,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "synattra" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "378cd5695f9ef5a26668bb70e81a464e7de6144bac3f77f42d5fa596c690be63" -dependencies = [ - "auto_enums", - "proc-macro2", - "syn", -] - [[package]] name = "sync_wrapper" version = "0.1.1" @@ -5927,7 +5810,6 @@ dependencies = [ "ipnet", "itertools", "log", - "metered", "migration", "moka", "notify", diff --git a/TODO.md b/TODO.md index 53849742..58155f94 100644 --- a/TODO.md +++ b/TODO.md @@ -328,6 +328,8 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] cache /status for longer - [x] sort connections during eth_sendRawTransaction - [x] block all admin_ rpc commands +- [x] remove the "metered" crate now that we save aggregate queries? +- [x] add archive depth to app config - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly @@ -379,7 +381,6 @@ These are not yet ordered. There might be duplicates. We might not actually need - [ ] cli commands to search users by key - [ ] cli flag to set prometheus port - [ ] flamegraphs show 25% of the time to be in moka-housekeeper. tune that -- [ ] remove the "metered" crate now that we save aggregate queries? - [ ] remove/change the "active_requests" counter? maybe only once we have dynamic soft limits? - [ ] refactor so configs can change while running - this will probably be a rather large change, but is necessary when we have autoscaling diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index a4c384dc..9e3d2769 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -48,7 +48,6 @@ http = "0.2.8" ipnet = "2.7.1" itertools = "0.10.5" log = "0.4.17" -metered = { version = "0.9.0", features = ["serialize"] } moka = { version = "0.9.6", default-features = false, features = ["future"] } notify = "5.1.0" num = "0.4.0" diff --git a/web3_proxy/examples/metrics.rs b/web3_proxy/examples/metrics.rs deleted file mode 100644 index 59667b18..00000000 --- a/web3_proxy/examples/metrics.rs +++ /dev/null @@ -1,32 +0,0 @@ -use metered::{metered, HitCount, Throughput}; -use serde::Serialize; -use thread_fast_rng::{rand::Rng, thread_fast_rng}; - -#[derive(Default, Debug, Serialize)] -pub struct Biz { - metrics: BizMetrics, -} - -#[metered(registry = BizMetrics)] -impl Biz { - #[measure([HitCount, Throughput])] - pub fn biz(&self) { - let delay = std::time::Duration::from_millis(thread_fast_rng().gen::() % 200); - std::thread::sleep(delay); - } -} - -fn main() { - let buz = Biz::default(); - - for _ in 0..100 { - buz.biz(); - } - - let mut globals = std::collections::HashMap::new(); - globals.insert("service", "web3_proxy_prometheus_example"); - - let serialized = serde_prometheus::to_string(&buz.metrics, Some("example"), globals).unwrap(); - - println!("{}", serialized); -} diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ce56e927..5553cdb6 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -13,7 +13,6 @@ use crate::jsonrpc::{ use crate::rpcs::blockchain::{ArcBlock, SavedBlock}; use crate::rpcs::connection::Web3Connection; use crate::rpcs::connections::Web3Connections; -use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; use crate::user_token::UserBearerToken; use anyhow::Context; @@ -210,9 +209,6 @@ pub struct Web3ProxyApp { pub config: AppConfig, pub db_conn: Option, pub db_replica: Option, - /// prometheus metrics - // app_metrics: Arc, - open_request_handle_metrics: Arc, /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_ip_rate_limiter: Option>, @@ -393,10 +389,6 @@ impl Web3ProxyApp { ); } - // setup metrics - // let app_metrics = Default::default(); - let open_request_handle_metrics: Arc = Default::default(); - let mut db_conn = None::; let mut db_replica = None::; @@ -592,7 +584,6 @@ impl Web3ProxyApp { top_config.app.min_synced_rpcs, Some(pending_tx_sender.clone()), pending_transactions.clone(), - open_request_handle_metrics.clone(), ) .await .context("spawning balanced rpcs")?; @@ -623,7 +614,6 @@ impl Web3ProxyApp { // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits None, pending_transactions.clone(), - open_request_handle_metrics.clone(), ) .await .context("spawning private_rpcs")?; @@ -734,8 +724,6 @@ impl Web3ProxyApp { db_conn, db_replica, vredis_pool, - // app_metrics, - open_request_handle_metrics, rpc_secret_key_cache, bearer_token_semaphores, ip_semaphores, @@ -909,9 +897,7 @@ impl Web3ProxyApp { // "user_cache_size": app.rpc_secret_key_cache.weighted_size(), #[derive(Serialize)] - struct CombinedMetrics<'a> { - // app: &'a Web3ProxyAppMetrics, - backend_rpc: &'a OpenRequestHandleMetrics, + struct CombinedMetrics { recent_ip_counts: RecentCounts, recent_user_id_counts: RecentCounts, recent_tx_counts: RecentCounts, @@ -919,8 +905,6 @@ impl Web3ProxyApp { } let metrics = CombinedMetrics { - // app: &self.app_metrics, - backend_rpc: &self.open_request_handle_metrics, recent_ip_counts, recent_user_id_counts, recent_tx_counts, @@ -1395,6 +1379,7 @@ impl Web3ProxyApp { .context("parsing params 0 into str then bytes")?, ) .map_err(|x| { + trace!("bad request: {:?}", x); FrontendErrorResponse::BadRequest( "param 0 could not be read as H256".to_string(), ) diff --git a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs index b8b0e565..455eb659 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs @@ -18,9 +18,9 @@ pub struct PopularityContestSubCommand { #[derive(Debug)] struct BackendRpcData<'a> { name: &'a str, - tier: u64, - backup: bool, - block_data_limit: u64, + // tier: u64, + // backup: bool, + // block_data_limit: u64, requests: u64, } @@ -62,21 +62,21 @@ impl PopularityContestSubCommand { let tier = conn.get("tier").unwrap().as_u64().unwrap(); - let backup = conn.get("backup").unwrap().as_bool().unwrap(); + // let backup = conn.get("backup").unwrap().as_bool().unwrap(); - let block_data_limit = conn - .get("block_data_limit") - .unwrap() - .as_u64() - .unwrap_or(u64::MAX); + // let block_data_limit = conn + // .get("block_data_limit") + // .unwrap() + // .as_u64() + // .unwrap_or(u64::MAX); let requests = conn.get("total_requests").unwrap().as_u64().unwrap(); let rpc_data = BackendRpcData { name, - tier, - backup, - block_data_limit, + // tier, + // backup, + // block_data_limit, requests, }; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index af5eef9d..a5693c8c 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,6 +1,5 @@ use crate::rpcs::blockchain::BlockHashesCache; use crate::rpcs::connection::Web3Connection; -use crate::rpcs::request::OpenRequestHandleMetrics; use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock}; use argh::FromArgs; use ethers::prelude::TxHash; @@ -246,7 +245,6 @@ impl Web3ConnectionConfig { block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option>, - open_request_handle_metrics: Arc, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { if !self.extra.is_empty() { warn!( @@ -291,7 +289,6 @@ impl Web3ConnectionConfig { tx_id_sender, true, self.tier, - open_request_handle_metrics, ) .await } diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 571e245f..dcf6a8c1 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -4,7 +4,6 @@ pub mod block_number; pub mod config; pub mod frontend; pub mod jsonrpc; -pub mod metered; pub mod metrics_frontend; pub mod pagerduty; pub mod rpcs; diff --git a/web3_proxy/src/metered/jsonrpc_error_count.rs b/web3_proxy/src/metered/jsonrpc_error_count.rs index 424e8b7c..eb8ed33f 100644 --- a/web3_proxy/src/metered/jsonrpc_error_count.rs +++ b/web3_proxy/src/metered/jsonrpc_error_count.rs @@ -1,12 +1,6 @@ //! A module providing the `JsonRpcErrorCount` metric. use ethers::providers::ProviderError; -use metered::metric::{Advice, Enter, OnResult}; -use metered::{ - atomic::AtomicInt, - clear::Clear, - metric::{Counter, Metric}, -}; use serde::Serialize; use std::ops::Deref; diff --git a/web3_proxy/src/metered/provider_error_count.rs b/web3_proxy/src/metered/provider_error_count.rs index 5670e3ba..9025c463 100644 --- a/web3_proxy/src/metered/provider_error_count.rs +++ b/web3_proxy/src/metered/provider_error_count.rs @@ -1,12 +1,6 @@ //! A module providing the `JsonRpcErrorCount` metric. use ethers::providers::ProviderError; -use metered::metric::{Advice, Enter, OnResult}; -use metered::{ - atomic::AtomicInt, - clear::Clear, - metric::{Counter, Metric}, -}; use serde::Serialize; use std::ops::Deref; diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 838023fa..1b34ba4b 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,7 +1,7 @@ ///! Rate-limited communication with a web3 provider. use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock}; use super::provider::Web3Provider; -use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; +use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use crate::frontend::authorization::Authorization; @@ -25,7 +25,7 @@ use std::{cmp::Ordering, sync::Arc}; use thread_fast_rng::rand::Rng; use thread_fast_rng::thread_fast_rng; use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; -use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior}; +use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; // TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then #[derive(Clone, Debug)] @@ -98,9 +98,8 @@ pub struct Web3Connection { pub(super) block_data_limit: AtomicU64, /// Lower tiers are higher priority when sending requests pub(super) tier: u64, - /// TODO: should this be an AsyncRwLock? + /// TODO: change this to a watch channel so that http providers can subscribe and take action on change pub(super) head_block: RwLock>, - pub(super) open_request_handle_metrics: Arc, } impl Web3Connection { @@ -127,7 +126,6 @@ impl Web3Connection { tx_id_sender: Option)>>, reconnect: bool, tier: u64, - open_request_handle_metrics: Arc, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { // TODO: is cache size 1 okay? i think we need @@ -173,7 +171,6 @@ impl Web3Connection { block_data_limit, head_block: RwLock::new(Default::default()), tier, - open_request_handle_metrics, }; let new_connection = Arc::new(new_connection); @@ -892,34 +889,14 @@ impl Web3Connection { .clone() { trace!("watching pending transactions on {}", self); + // TODO: does this keep the lock open for too long? match provider.as_ref() { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably automatically set to some fraction of block time - // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now - // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - let mut interval = interval(Duration::from_secs(60)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - loop { - // TODO: actually do something here - /* - match self.try_request_handle().await { - Ok(active_request_handle) => { - // TODO: check the filter - todo!("actually send a request"); - } - Err(e) => { - warn!("Failed getting latest block from {}: {:?}", self, e); - } - } - */ - - // wait for the interval - // TODO: if error or rate limit, increase interval? - interval.tick().await; - } + // TODO: maybe subscribe to self.head_block? + // TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client? + futures::future::pending::<()>().await; } Web3Provider::Ws(provider) => { // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle @@ -1216,8 +1193,6 @@ mod tests { let head_block = SavedBlock::new(random_block); let block_data_limit = u64::MAX; - let metrics = OpenRequestHandleMetrics::default(); - let x = Web3Connection { name: "name".to_string(), db_conn: None, @@ -1236,7 +1211,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), - open_request_handle_metrics: Arc::new(metrics), }; assert!(x.has_block_data(&0.into())); @@ -1264,8 +1238,6 @@ mod tests { let block_data_limit = 64; - let metrics = OpenRequestHandleMetrics::default(); - // TODO: this is getting long. have a `impl Default` let x = Web3Connection { name: "name".to_string(), @@ -1285,7 +1257,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), - open_request_handle_metrics: Arc::new(metrics), }; assert!(!x.has_block_data(&0.into())); @@ -1339,7 +1310,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), - open_request_handle_metrics: Arc::new(metrics), }; assert!(!x.has_block_data(&0.into())); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 5c536c27..b45c58e2 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -2,7 +2,7 @@ use super::blockchain::{ArcBlock, BlockHashesCache}; use super::connection::Web3Connection; use super::request::{ - OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestRevertHandler, + OpenRequestHandle, OpenRequestResult, RequestRevertHandler, }; use super::synced_connections::ConsensusConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; @@ -69,7 +69,6 @@ impl Web3Connections { min_head_rpcs: usize, pending_tx_sender: Option>, pending_transactions: Cache, - open_request_handle_metrics: Arc, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); @@ -149,7 +148,6 @@ impl Web3Connections { let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); let block_map = block_map.clone(); - let open_request_handle_metrics = open_request_handle_metrics.clone(); let handle = tokio::spawn(async move { server_config @@ -163,7 +161,6 @@ impl Web3Connections { block_map, block_sender, pending_tx_id_sender, - open_request_handle_metrics, ) .await }); @@ -1315,7 +1312,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), - open_request_handle_metrics: Arc::new(Default::default()), }; let lagged_rpc = Web3Connection { @@ -1338,7 +1334,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(lagged_block.clone())), - open_request_handle_metrics: Arc::new(Default::default()), }; assert!(head_rpc.has_block_data(&lagged_block.number())); @@ -1548,7 +1543,6 @@ mod tests { block_data_limit: 64.into(), tier: 1, head_block: RwLock::new(Some(head_block.clone())), - open_request_handle_metrics: Arc::new(Default::default()), }; let archive_rpc = Web3Connection { @@ -1571,7 +1565,6 @@ mod tests { block_data_limit: u64::MAX.into(), tier: 2, head_block: RwLock::new(Some(head_block.clone())), - open_request_handle_metrics: Arc::new(Default::default()), }; assert!(pruned_rpc.has_block_data(&head_block.number())); diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index d7f2aaf9..a586f90b 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,7 +1,6 @@ use super::connection::Web3Connection; use super::provider::Web3Provider; use crate::frontend::authorization::{Authorization, AuthorizationType}; -use crate::metered::{JsonRpcErrorCount, ProviderErrorCount}; use anyhow::Context; use chrono::Utc; use entities::revert_log; @@ -9,14 +8,10 @@ use entities::sea_orm_active_enums::Method; use ethers::providers::{HttpClientError, ProviderError, WsClientError}; use ethers::types::{Address, Bytes}; use log::{debug, error, trace, warn, Level}; -use metered::metered; -use metered::HitCount; -use metered::ResponseTime; -use metered::Throughput; use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait}; use serde_json::json; use std::fmt; -use std::sync::atomic::{self, AtomicBool, Ordering}; +use std::sync::atomic; use std::sync::Arc; use thread_fast_rng::rand::Rng; use tokio::time::{sleep, Duration, Instant}; @@ -36,10 +31,7 @@ pub enum OpenRequestResult { pub struct OpenRequestHandle { authorization: Arc, conn: Arc, - // TODO: this is the same metrics on the conn. use a reference? - metrics: Arc, provider: Arc, - used: AtomicBool, } /// Depending on the context, RPC errors can require different handling. @@ -129,14 +121,11 @@ impl Authorization { } } -#[metered(registry = OpenRequestHandleMetrics, visibility = pub)] impl OpenRequestHandle { pub async fn new(authorization: Arc, conn: Arc) -> Self { // TODO: take request_id as an argument? // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! - // TODO: should we be using metered, or not? i think not because we want stats for each handle - // TODO: these should maybe be sent to an influxdb instance? conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed); let mut provider = None; @@ -184,15 +173,10 @@ impl OpenRequestHandle { } } - let metrics = conn.open_request_handle_metrics.clone(); - let used = false.into(); - Self { authorization, conn, - metrics, provider, - used, } } @@ -207,11 +191,8 @@ impl OpenRequestHandle { /// Send a web3 request /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented - /// TODO: we no longer take self because metered doesn't like that - /// TODO: ErrorCount includes too many types of errors, such as transaction reverts - #[measure([JsonRpcErrorCount, HitCount, ProviderErrorCount, ResponseTime, Throughput])] pub async fn request( - &self, + self, method: &str, params: &P, revert_handler: RequestRevertHandler, @@ -221,20 +202,11 @@ impl OpenRequestHandle { P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static, R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, { - // ensure this function only runs once - if self.used.swap(true, Ordering::Release) { - unimplemented!("a request handle should only be used once"); - } - // TODO: use tracing spans - // TODO: requests from customers have request ids, but we should add - // TODO: including params in this is way too verbose - // the authorization field is already on a parent span + // TODO: including params in this log is way too verbose // trace!(rpc=%self.conn, %method, "request"); - // trace!("got provider for {:?}", self); - - // TODO: really sucks that we have to clone here + // TODO: replace ethers-rs providers with our own that supports streaming the responses let response = match &*self.provider { Web3Provider::Mock => unimplemented!(), Web3Provider::Http(provider) => provider.request(method, params).await,