From a80503ac481068154eb76f7e3095faa04b8cc7dd Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 24 Sep 2022 07:04:11 +0000 Subject: [PATCH] better logging on save reverts checks --- web3_proxy/Cargo.toml | 4 +- web3_proxy/src/app.rs | 2 - web3_proxy/src/config.rs | 3 - web3_proxy/src/frontend/authorization.rs | 19 +++--- web3_proxy/src/rpcs/connection.rs | 4 -- web3_proxy/src/rpcs/connections.rs | 6 +- web3_proxy/src/rpcs/request.rs | 79 +++++++++++++++++------- 7 files changed, 68 insertions(+), 49 deletions(-) diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index b4d2e819..450f23a2 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -25,7 +25,7 @@ argh = "0.1.9" axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] } axum-client-ip = "0.2.0" axum-macros = "0.2.3" -# TODO: import this from ethorm so we always have the same version +# TODO: import chrono from sea-orm so we always have the same version chrono = "0.4.22" counter = "0.5.6" dashmap = "5.4.0" @@ -42,6 +42,8 @@ metered = { version = "0.9.0", features = ["serialize"] } moka = { version = "0.9.4", default-features = false, features = ["future"] } notify = "5.0.0" num = "0.4.0" +# TODO: import num_traits from sea-orm so we always have the same version +num-traits = "0.2.15" parking_lot = { version = "0.12.1", features = ["arc_lock"] } petgraph = "0.6.2" proctitle = "0.1.1" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index d7951389..dcc68c5f 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -295,7 +295,6 @@ impl Web3ProxyApp { Some(pending_tx_sender.clone()), pending_transactions.clone(), open_request_handle_metrics.clone(), - db_conn.clone(), ) .await .context("balanced rpcs")?; @@ -323,7 +322,6 @@ impl Web3ProxyApp { None, pending_transactions.clone(), open_request_handle_metrics.clone(), - db_conn.clone(), ) .await .context("private_rpcs")?; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 7b261373..f420806f 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -6,7 +6,6 @@ use argh::FromArgs; use derive_more::Constructor; use ethers::prelude::TxHash; use hashbrown::HashMap; -use sea_orm::DatabaseConnection; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; @@ -131,7 +130,6 @@ impl Web3ConnectionConfig { block_sender: Option>, tx_id_sender: Option>, open_request_handle_metrics: Arc, - db_conn: Option, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = match (self.hard_limit, redis_pool) { (None, None) => None, @@ -164,7 +162,6 @@ impl Web3ConnectionConfig { true, self.weight, open_request_handle_metrics, - db_conn, ) .await } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 32a92ca3..57d87ce1 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -180,19 +180,20 @@ impl AuthorizedKey { #[derive(Debug, Serialize)] pub enum AuthorizedRequest { - /// Request from the app itself - Internal(#[serde(skip)] Option), + /// Request from this app + Internal, /// Request from an anonymous IP address - Ip(#[serde(skip)] Option, IpAddr), + Ip(#[serde(skip)] IpAddr), /// Request from an authenticated and authorized user User(#[serde(skip)] Option, AuthorizedKey), } impl AuthorizedRequest { + /// Only User has a database connection in case it needs to save a revert to the database. pub fn db_conn(&self) -> Option<&DatabaseConnection> { match self { - Self::Internal(x) => x.as_ref(), - Self::Ip(x, _) => x.as_ref(), + Self::Internal => None, + Self::Ip(_) => None, Self::User(x, _) => x.as_ref(), } } @@ -213,9 +214,7 @@ pub async fn login_is_authorized( x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x), }; - let db = None; - - Ok(AuthorizedRequest::Ip(db, ip)) + Ok(AuthorizedRequest::Ip(ip)) } pub async fn bearer_is_authorized( @@ -272,9 +271,7 @@ pub async fn ip_is_authorized( x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), }; - let db = app.db_conn.clone(); - - Ok(AuthorizedRequest::Ip(db, ip)) + Ok(AuthorizedRequest::Ip(ip)) } pub async fn key_is_authorized( diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 76c8222a..23f2307b 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -12,7 +12,6 @@ use futures::StreamExt; use parking_lot::RwLock; use rand::Rng; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; -use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -54,7 +53,6 @@ pub struct Web3Connection { // TODO: async lock? pub(super) head_block_id: RwLock>, pub(super) open_request_handle_metrics: Arc, - pub(super) db_conn: Option, } impl Web3Connection { @@ -79,7 +77,6 @@ impl Web3Connection { reconnect: bool, weight: u32, open_request_handle_metrics: Arc, - db_conn: Option, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { // TODO: is cache size 1 okay? i think we need @@ -105,7 +102,6 @@ impl Web3Connection { head_block_id: RwLock::new(Default::default()), weight, open_request_handle_metrics, - db_conn, }; let new_connection = Arc::new(new_connection); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 615821d2..c655b09a 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -20,7 +20,6 @@ use futures::StreamExt; use hashbrown::HashMap; use moka::future::{Cache, ConcurrentCacheExt}; use petgraph::graphmap::DiGraphMap; -use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -70,7 +69,6 @@ impl Web3Connections { pending_tx_sender: Option>, pending_transactions: Cache, open_request_handle_metrics: Arc, - db_conn: Option, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); @@ -128,7 +126,6 @@ impl Web3Connections { let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); let block_map = block_map.clone(); let open_request_handle_metrics = open_request_handle_metrics.clone(); - let db_conn = db_conn.clone(); tokio::spawn(async move { server_config @@ -142,7 +139,6 @@ impl Web3Connections { block_sender, pending_tx_id_sender, open_request_handle_metrics, - db_conn, ) .await }) @@ -523,7 +519,7 @@ impl Web3Connections { .request( &request.method, &json!(request.params), - RequestErrorHandler::SaveReverts(0.0), + RequestErrorHandler::SaveReverts, ) .await; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index af6363c0..872d0c0f 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -11,7 +11,9 @@ use metered::metered; use metered::HitCount; use metered::ResponseTime; use metered::Throughput; +use num_traits::cast::FromPrimitive; use rand::Rng; +use sea_orm::prelude::Decimal; use sea_orm::ActiveModelTrait; use serde_json::json; use std::fmt; @@ -42,8 +44,8 @@ pub struct OpenRequestHandle { /// Depending on the context, RPC errors can require different handling. pub enum RequestErrorHandler { - /// Contains the percent chance to save the revert - SaveReverts(f32), + /// Potentially save the revert. Users can tune how often this happens + SaveReverts, /// Log at the debug level. Use when errors are expected. DebugLevel, /// Log at the error level. Use when errors are bad. @@ -52,13 +54,17 @@ pub enum RequestErrorHandler { WarnLevel, } +// TODO: second param could be skipped since we don't need it here #[derive(serde::Deserialize, serde::Serialize)] -struct EthCallParams { +struct EthCallParams((EthCallFirstParams, Option)); + +#[derive(serde::Deserialize, serde::Serialize)] +struct EthCallFirstParams { method: Method, // TODO: do this as Address instead to: Vec, // TODO: do this as a Bytes instead - data: String, + data: Option, } impl From for RequestErrorHandler { @@ -74,7 +80,7 @@ impl From for RequestErrorHandler { impl AuthorizedRequest { /// Save a RPC call that return "execution reverted" to the database. - async fn save_revert(self: Arc, params: EthCallParams) -> anyhow::Result<()> { + async fn save_revert(self: Arc, params: EthCallFirstParams) -> anyhow::Result<()> { if let Self::User(Some(db_conn), authorized_request) = &*self { // TODO: do this on the database side? let timestamp = Utc::now(); @@ -122,10 +128,8 @@ impl OpenRequestHandle { let metrics = conn.open_request_handle_metrics.clone(); let used = false.into(); - let authorized_request = authorized_request.unwrap_or_else(|| { - let db_conn = conn.db_conn.clone(); - Arc::new(AuthorizedRequest::Internal(db_conn)) - }); + let authorized_request = + authorized_request.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal)); Self { authorized_request, @@ -193,17 +197,40 @@ impl OpenRequestHandle { if let Err(err) = &response { // only save reverts for some types of calls // TODO: do something special for eth_sendRawTransaction too - let error_handler = if let RequestErrorHandler::SaveReverts(save_chance) = error_handler - { - if ["eth_call", "eth_estimateGas"].contains(&method) - && self.authorized_request.db_conn().is_some() - && save_chance != 0.0 - && (save_chance == 1.0 - || rand::thread_rng().gen_range(0.0..=1.0) <= save_chance) + let error_handler = if let RequestErrorHandler::SaveReverts = error_handler { + if !["eth_call", "eth_estimateGas"].contains(&method) { + debug!(%method, "skipping save on revert"); + RequestErrorHandler::DebugLevel + } else if self.authorized_request.db_conn().is_none() { + debug!(%method, "no database. skipping save on revert"); + RequestErrorHandler::DebugLevel + } else if let AuthorizedRequest::User(db_conn, y) = self.authorized_request.as_ref() { - error_handler + if db_conn.is_none() { + trace!(%method, "no database. skipping save on revert"); + RequestErrorHandler::DebugLevel + } else { + let log_revert_chance = y.log_revert_chance; + + if log_revert_chance.is_zero() { + trace!(%method, "no chance. skipping save on revert"); + RequestErrorHandler::DebugLevel + } else if log_revert_chance == Decimal::ONE { + trace!(%method, "gaurenteed chance. SAVING on revert"); + error_handler + } else if Decimal::from_f32(rand::thread_rng().gen_range(0.0f32..=1.0)) + .expect("f32 should always convert to a Decimal") + > log_revert_chance + { + trace!(%method, "missed chance. skipping save on revert"); + RequestErrorHandler::DebugLevel + } else { + trace!("Saving on revert"); + // TODO: is always logging at debug level fine? + error_handler + } + } } else { - // TODO: is always logging at debug level fine? RequestErrorHandler::DebugLevel } } else { @@ -220,10 +247,11 @@ impl OpenRequestHandle { RequestErrorHandler::WarnLevel => { warn!(?err, %method, rpc=%self.conn, "bad response!"); } - RequestErrorHandler::SaveReverts(_) => { + RequestErrorHandler::SaveReverts => { // TODO: logging every one is going to flood the database // TODO: have a percent chance to do this. or maybe a "logged reverts per second" if let ProviderError::JsonRpcClientError(err) = err { + // Http and Ws errors are very similar, but different types let msg = match provider { Web3Provider::Http(_) => { if let Some(HttpClientError::JsonRpcError(err)) = @@ -248,14 +276,19 @@ impl OpenRequestHandle { if let Some(msg) = msg { if msg.starts_with("execution reverted") { // TODO: is there a more efficient way to do this? - let params: EthCallParams = serde_json::from_value(json!(params)) - .expect("parsing eth_call"); + debug!(?params); - // spawn saving to the database so we don't slow down the request (or error if no db) - let f = self.authorized_request.clone().save_revert(params); + // TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here + let params: EthCallParams = serde_json::from_value(json!(params)) + .context("parsing params to EthCallParams") + .unwrap(); + + // spawn saving to the database so we don't slow down the request + let f = self.authorized_request.clone().save_revert(params.0 .0); tokio::spawn(async move { f.await }); } else { + // TODO: log any of the errors? debug!(?err, %method, rpc=%self.conn, "bad response!"); } }