remove metered in favor of influxdb stats

Bryan Stitt 2023-02-05 18:16:09 -08:00
parent 6d959e2c1f
commit f2d35ba5eb
13 changed files with 28 additions and 274 deletions

Cargo.lock (generated, 118 lines changed)

@@ -148,28 +148,6 @@ dependencies = [
"term",
]
[[package]]
name = "aspect"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3927b415bba088539aaaf872d19752c7d00101a25ead1d123fcd7633f9c224d"
dependencies = [
"aspect-weave",
]
[[package]]
name = "aspect-weave"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea4f672ac5290272725e1453014af99a86d2c1712808d647f469bf9427519f41"
dependencies = [
"indexmap",
"proc-macro2",
"quote",
"syn",
"synattra",
]
[[package]]
name = "async-io"
version = "1.12.0"
@@ -252,15 +230,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "atomic"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c"
dependencies = [
"autocfg",
]
[[package]]
name = "atty"
version = "0.2.14"
@@ -272,40 +241,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "auto_enums"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0dfe45d75158751e195799f47ea02e81f570aa24bc5ef999cdd9e888c4b5c3"
dependencies = [
"auto_enums_core",
"auto_enums_derive",
]
[[package]]
name = "auto_enums_core"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da47c46001293a2c4b744d731958be22cff408a2ab76e2279328f9713b1267b4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "auto_enums_derive"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41aed1da83ecdc799503b7cb94da1b45a34d72b49caf40a61d9cf5b88ec07cfd"
dependencies = [
"autocfg",
"derive_utils",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "auto_impl"
version = "0.5.0"
@@ -1330,17 +1265,6 @@ dependencies = [
"syn",
]
[[package]]
name = "derive_utils"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "532b4c15dccee12c7044f1fcad956e98410860b22231e44a3b827464797ca7bf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "dialoguer"
version = "0.8.0"
@@ -2903,36 +2827,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "metered"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17491527d2ceff20d00d02166bdd18e23056e7ced22b9a8bb0efdfd293f0441a"
dependencies = [
"aspect",
"atomic",
"cfg-if",
"hdrhistogram",
"metered-macro",
"parking_lot 0.12.1",
"serde",
]
[[package]]
name = "metered-macro"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5ef9d33baa693e2d449d069f6ef6eb549762ed0c0179976c45bd98f3aa4a4e1"
dependencies = [
"aspect-weave",
"heck 0.4.0",
"indexmap",
"proc-macro2",
"quote",
"syn",
"synattra",
]
[[package]]
name = "migration"
version = "0.13.0"
@@ -5082,17 +4976,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synattra"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "378cd5695f9ef5a26668bb70e81a464e7de6144bac3f77f42d5fa596c690be63"
dependencies = [
"auto_enums",
"proc-macro2",
"syn",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
@@ -5927,7 +5810,6 @@ dependencies = [
"ipnet",
"itertools",
"log",
"metered",
"migration",
"moka",
"notify",


@@ -328,6 +328,8 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] cache /status for longer
- [x] sort connections during eth_sendRawTransaction
- [x] block all admin_ rpc commands
- [x] remove the "metered" crate now that we save aggregate queries?
- [x] add archive depth to app config
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
@@ -379,7 +381,6 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [ ] cli commands to search users by key
- [ ] cli flag to set prometheus port
- [ ] flamegraphs show 25% of the time to be in moka-housekeeper. tune that
- [ ] remove the "metered" crate now that we save aggregate queries?
- [ ] remove/change the "active_requests" counter? maybe only once we have dynamic soft limits?
- [ ] refactor so configs can change while running
- this will probably be a rather large change, but is necessary when we have autoscaling


@@ -48,7 +48,6 @@ http = "0.2.8"
ipnet = "2.7.1"
itertools = "0.10.5"
log = "0.4.17"
metered = { version = "0.9.0", features = ["serialize"] }
moka = { version = "0.9.6", default-features = false, features = ["future"] }
notify = "5.1.0"
num = "0.4.0"


@@ -1,32 +0,0 @@
use metered::{metered, HitCount, Throughput};
use serde::Serialize;
use thread_fast_rng::{rand::Rng, thread_fast_rng};
#[derive(Default, Debug, Serialize)]
pub struct Biz {
metrics: BizMetrics,
}
#[metered(registry = BizMetrics)]
impl Biz {
#[measure([HitCount, Throughput])]
pub fn biz(&self) {
let delay = std::time::Duration::from_millis(thread_fast_rng().gen::<u64>() % 200);
std::thread::sleep(delay);
}
}
fn main() {
let buz = Biz::default();
for _ in 0..100 {
buz.biz();
}
let mut globals = std::collections::HashMap::new();
globals.insert("service", "web3_proxy_prometheus_example");
let serialized = serde_prometheus::to_string(&buz.metrics, Some("example"), globals).unwrap();
println!("{}", serialized);
}
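For contrast with the example file deleted above, here is a minimal sketch of the direction the commit title points: plain atomic counters flushed as InfluxDB line protocol instead of metered registries serialized to Prometheus. The measurement, tag, and field names are illustrative assumptions, not this proxy's actual stats schema.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Plain atomics standing in for metered's HitCount/Throughput.
#[derive(Default)]
pub struct BizStats {
    hits: AtomicU64,
    errors: AtomicU64,
}

impl BizStats {
    pub fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Render the current totals as one InfluxDB line-protocol point.
    /// Format: measurement,tag_set field_set timestamp
    pub fn to_line_protocol(&self) -> String {
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before unix epoch")
            .as_nanos();
        format!(
            "web3_proxy,service=example hits={}i,errors={}i {}",
            self.hits.load(Ordering::Relaxed),
            self.errors.load(Ordering::Relaxed),
            ts,
        )
    }
}

fn main() {
    let stats = BizStats::default();
    for _ in 0..100 {
        stats.record_hit();
    }
    // A real exporter would POST this string to InfluxDB's write endpoint
    // on an interval; printing stands in for that here.
    println!("{}", stats.to_line_protocol());
}
```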


@@ -13,7 +13,6 @@ use crate::jsonrpc::{
use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
use crate::rpcs::connection::Web3Connection;
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
use crate::user_token::UserBearerToken;
use anyhow::Context;
@@ -210,9 +209,6 @@ pub struct Web3ProxyApp {
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub db_replica: Option<DatabaseReplica>,
/// prometheus metrics
// app_metrics: Arc<Web3ProxyAppMetrics>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
@@ -393,10 +389,6 @@ impl Web3ProxyApp {
);
}
// setup metrics
// let app_metrics = Default::default();
let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
let mut db_conn = None::<DatabaseConnection>;
let mut db_replica = None::<DatabaseReplica>;
@@ -592,7 +584,6 @@ impl Web3ProxyApp {
top_config.app.min_synced_rpcs,
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
open_request_handle_metrics.clone(),
)
.await
.context("spawning balanced rpcs")?;
@@ -623,7 +614,6 @@ impl Web3ProxyApp {
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
None,
pending_transactions.clone(),
open_request_handle_metrics.clone(),
)
.await
.context("spawning private_rpcs")?;
@@ -734,8 +724,6 @@ impl Web3ProxyApp {
db_conn,
db_replica,
vredis_pool,
// app_metrics,
open_request_handle_metrics,
rpc_secret_key_cache,
bearer_token_semaphores,
ip_semaphores,
@@ -909,9 +897,7 @@ impl Web3ProxyApp {
// "user_cache_size": app.rpc_secret_key_cache.weighted_size(),
#[derive(Serialize)]
struct CombinedMetrics<'a> {
// app: &'a Web3ProxyAppMetrics,
backend_rpc: &'a OpenRequestHandleMetrics,
struct CombinedMetrics {
recent_ip_counts: RecentCounts,
recent_user_id_counts: RecentCounts,
recent_tx_counts: RecentCounts,
@@ -919,8 +905,6 @@ impl Web3ProxyApp {
}
let metrics = CombinedMetrics {
// app: &self.app_metrics,
backend_rpc: &self.open_request_handle_metrics,
recent_ip_counts,
recent_user_id_counts,
recent_tx_counts,
@@ -1395,6 +1379,7 @@ impl Web3ProxyApp {
.context("parsing params 0 into str then bytes")?,
)
.map_err(|x| {
trace!("bad request: {:?}", x);
FrontendErrorResponse::BadRequest(
"param 0 could not be read as H256".to_string(),
)


@@ -18,9 +18,9 @@ pub struct PopularityContestSubCommand {
#[derive(Debug)]
struct BackendRpcData<'a> {
name: &'a str,
tier: u64,
backup: bool,
block_data_limit: u64,
// tier: u64,
// backup: bool,
// block_data_limit: u64,
requests: u64,
}
@@ -62,21 +62,21 @@ impl PopularityContestSubCommand {
let tier = conn.get("tier").unwrap().as_u64().unwrap();
let backup = conn.get("backup").unwrap().as_bool().unwrap();
// let backup = conn.get("backup").unwrap().as_bool().unwrap();
let block_data_limit = conn
.get("block_data_limit")
.unwrap()
.as_u64()
.unwrap_or(u64::MAX);
// let block_data_limit = conn
// .get("block_data_limit")
// .unwrap()
// .as_u64()
// .unwrap_or(u64::MAX);
let requests = conn.get("total_requests").unwrap().as_u64().unwrap();
let rpc_data = BackendRpcData {
name,
tier,
backup,
block_data_limit,
// tier,
// backup,
// block_data_limit,
requests,
};


@@ -1,6 +1,5 @@
use crate::rpcs::blockchain::BlockHashesCache;
use crate::rpcs::connection::Web3Connection;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
use argh::FromArgs;
use ethers::prelude::TxHash;
@@ -246,7 +245,6 @@ impl Web3ConnectionConfig {
block_map: BlockHashesCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
if !self.extra.is_empty() {
warn!(
@@ -291,7 +289,6 @@ impl Web3ConnectionConfig {
tx_id_sender,
true,
self.tier,
open_request_handle_metrics,
)
.await
}


@@ -4,7 +4,6 @@ pub mod block_number;
pub mod config;
pub mod frontend;
pub mod jsonrpc;
pub mod metered;
pub mod metrics_frontend;
pub mod pagerduty;
pub mod rpcs;


@@ -1,12 +1,6 @@
//! A module providing the `JsonRpcErrorCount` metric.
use ethers::providers::ProviderError;
use metered::metric::{Advice, Enter, OnResult};
use metered::{
atomic::AtomicInt,
clear::Clear,
metric::{Counter, Metric},
};
use serde::Serialize;
use std::ops::Deref;


@@ -1,12 +1,6 @@
//! A module providing the `JsonRpcErrorCount` metric.
use ethers::providers::ProviderError;
use metered::metric::{Advice, Enter, OnResult};
use metered::{
atomic::AtomicInt,
clear::Clear,
metric::{Counter, Metric},
};
use serde::Serialize;
use std::ops::Deref;


@@ -1,7 +1,7 @@
///! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock};
use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use crate::frontend::authorization::Authorization;
@@ -25,7 +25,7 @@ use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
// TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then
#[derive(Clone, Debug)]
@@ -98,9 +98,8 @@ pub struct Web3Connection {
pub(super) block_data_limit: AtomicU64,
/// Lower tiers are higher priority when sending requests
pub(super) tier: u64,
/// TODO: should this be an AsyncRwLock?
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change
pub(super) head_block: RwLock<Option<SavedBlock>>,
pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
}
impl Web3Connection {
@@ -127,7 +126,6 @@ impl Web3Connection {
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
tier: u64,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
// TODO: is cache size 1 okay? i think we need
@@ -173,7 +171,6 @@ impl Web3Connection {
block_data_limit,
head_block: RwLock::new(Default::default()),
tier,
open_request_handle_metrics,
};
let new_connection = Arc::new(new_connection);
@@ -892,34 +889,14 @@ impl Web3Connection {
.clone()
{
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
// TODO: actually do something here
/*
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: check the filter
todo!("actually send a request");
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
*/
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
}
// TODO: maybe subscribe to self.head_block?
// TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client?
futures::future::pending::<()>().await;
}
Web3Provider::Ws(provider) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
@@ -1216,8 +1193,6 @@ mod tests {
let head_block = SavedBlock::new(random_block);
let block_data_limit = u64::MAX;
let metrics = OpenRequestHandleMetrics::default();
let x = Web3Connection {
name: "name".to_string(),
db_conn: None,
@@ -1236,7 +1211,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};
assert!(x.has_block_data(&0.into()));
@@ -1264,8 +1238,6 @@ mod tests {
let block_data_limit = 64;
let metrics = OpenRequestHandleMetrics::default();
// TODO: this is getting long. have a `impl Default`
let x = Web3Connection {
name: "name".to_string(),
@@ -1285,7 +1257,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};
assert!(!x.has_block_data(&0.into()));
@@ -1339,7 +1310,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};
assert!(!x.has_block_data(&0.into()));


@@ -2,7 +2,7 @@
use super::blockchain::{ArcBlock, BlockHashesCache};
use super::connection::Web3Connection;
use super::request::{
OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestRevertHandler,
OpenRequestHandle, OpenRequestResult, RequestRevertHandler,
};
use super::synced_connections::ConsensusConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
@@ -69,7 +69,6 @@ impl Web3Connections {
min_head_rpcs: usize,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
@@ -149,7 +148,6 @@ impl Web3Connections {
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
let block_map = block_map.clone();
let open_request_handle_metrics = open_request_handle_metrics.clone();
let handle = tokio::spawn(async move {
server_config
@@ -163,7 +161,6 @@ impl Web3Connections {
block_map,
block_sender,
pending_tx_id_sender,
open_request_handle_metrics,
)
.await
});
@@ -1315,7 +1312,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};
let lagged_rpc = Web3Connection {
@@ -1338,7 +1334,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(lagged_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};
assert!(head_rpc.has_block_data(&lagged_block.number()));
@@ -1548,7 +1543,6 @@ mod tests {
block_data_limit: 64.into(),
tier: 1,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};
let archive_rpc = Web3Connection {
@@ -1571,7 +1565,6 @@ mod tests {
block_data_limit: u64::MAX.into(),
tier: 2,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};
assert!(pruned_rpc.has_block_data(&head_block.number()));


@@ -1,7 +1,6 @@
use super::connection::Web3Connection;
use super::provider::Web3Provider;
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::metered::{JsonRpcErrorCount, ProviderErrorCount};
use anyhow::Context;
use chrono::Utc;
use entities::revert_log;
@@ -9,14 +8,10 @@ use entities::sea_orm_active_enums::Method;
use ethers::providers::{HttpClientError, ProviderError, WsClientError};
use ethers::types::{Address, Bytes};
use log::{debug, error, trace, warn, Level};
use metered::metered;
use metered::HitCount;
use metered::ResponseTime;
use metered::Throughput;
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use serde_json::json;
use std::fmt;
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::atomic;
use std::sync::Arc;
use thread_fast_rng::rand::Rng;
use tokio::time::{sleep, Duration, Instant};
@@ -36,10 +31,7 @@ pub enum OpenRequestResult {
pub struct OpenRequestHandle {
authorization: Arc<Authorization>,
conn: Arc<Web3Connection>,
// TODO: this is the same metrics on the conn. use a reference?
metrics: Arc<OpenRequestHandleMetrics>,
provider: Arc<Web3Provider>,
used: AtomicBool,
}
/// Depending on the context, RPC errors can require different handling.
@@ -129,14 +121,11 @@ impl Authorization {
}
}
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
impl OpenRequestHandle {
pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Connection>) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
// TODO: should we be using metered, or not? i think not because we want stats for each handle
// TODO: these should maybe be sent to an influxdb instance?
conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed);
let mut provider = None;
@@ -184,15 +173,10 @@ impl OpenRequestHandle {
}
}
let metrics = conn.open_request_handle_metrics.clone();
let used = false.into();
Self {
authorization,
conn,
metrics,
provider,
used,
}
}
@@ -207,11 +191,8 @@ impl OpenRequestHandle {
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// TODO: we no longer take self because metered doesn't like that
/// TODO: ErrorCount includes too many types of errors, such as transaction reverts
#[measure([JsonRpcErrorCount, HitCount, ProviderErrorCount, ResponseTime, Throughput])]
pub async fn request<P, R>(
&self,
self,
method: &str,
params: &P,
revert_handler: RequestRevertHandler,
@@ -221,20 +202,11 @@ impl OpenRequestHandle {
P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{
// ensure this function only runs once
if self.used.swap(true, Ordering::Release) {
unimplemented!("a request handle should only be used once");
}
// TODO: use tracing spans
// TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose
// the authorization field is already on a parent span
// TODO: including params in this log is way too verbose
// trace!(rpc=%self.conn, %method, "request");
// trace!("got provider for {:?}", self);
// TODO: really sucks that we have to clone here
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match &*self.provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => provider.request(method, params).await,
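Stepping back from the hunk above: metered's `#[measure]` attribute required `request` to take `&self`, so single use had to be policed at runtime with a `used: AtomicBool`. With metered gone, `request` consumes `self` and the compiler enforces at-most-once use. A stripped-down sketch of that pattern, with placeholder types rather than the real `OpenRequestHandle`:

```rust
struct RequestHandle;

impl RequestHandle {
    // Taking `self` by value moves the handle into the call, so a second
    // use is a compile-time error and no AtomicBool guard is needed.
    async fn request(self, method: &str) -> String {
        format!("sent {method}")
    }
}

#[tokio::main]
async fn main() {
    let handle = RequestHandle;
    let _resp = handle.request("eth_blockNumber").await;
    // handle.request("eth_chainId").await; // error[E0382]: use of moved value
}
```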