better logging on save reverts checks
This commit is contained in:
parent
7da8864a1d
commit
a80503ac48
@ -25,7 +25,7 @@ argh = "0.1.9"
|
|||||||
axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] }
|
axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] }
|
||||||
axum-client-ip = "0.2.0"
|
axum-client-ip = "0.2.0"
|
||||||
axum-macros = "0.2.3"
|
axum-macros = "0.2.3"
|
||||||
# TODO: import this from ethorm so we always have the same version
|
# TODO: import chrono from sea-orm so we always have the same version
|
||||||
chrono = "0.4.22"
|
chrono = "0.4.22"
|
||||||
counter = "0.5.6"
|
counter = "0.5.6"
|
||||||
dashmap = "5.4.0"
|
dashmap = "5.4.0"
|
||||||
@ -42,6 +42,8 @@ metered = { version = "0.9.0", features = ["serialize"] }
|
|||||||
moka = { version = "0.9.4", default-features = false, features = ["future"] }
|
moka = { version = "0.9.4", default-features = false, features = ["future"] }
|
||||||
notify = "5.0.0"
|
notify = "5.0.0"
|
||||||
num = "0.4.0"
|
num = "0.4.0"
|
||||||
|
# TODO: import num_traits from sea-orm so we always have the same version
|
||||||
|
num-traits = "0.2.15"
|
||||||
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
|
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
|
||||||
petgraph = "0.6.2"
|
petgraph = "0.6.2"
|
||||||
proctitle = "0.1.1"
|
proctitle = "0.1.1"
|
||||||
|
@ -295,7 +295,6 @@ impl Web3ProxyApp {
|
|||||||
Some(pending_tx_sender.clone()),
|
Some(pending_tx_sender.clone()),
|
||||||
pending_transactions.clone(),
|
pending_transactions.clone(),
|
||||||
open_request_handle_metrics.clone(),
|
open_request_handle_metrics.clone(),
|
||||||
db_conn.clone(),
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context("balanced rpcs")?;
|
.context("balanced rpcs")?;
|
||||||
@ -323,7 +322,6 @@ impl Web3ProxyApp {
|
|||||||
None,
|
None,
|
||||||
pending_transactions.clone(),
|
pending_transactions.clone(),
|
||||||
open_request_handle_metrics.clone(),
|
open_request_handle_metrics.clone(),
|
||||||
db_conn.clone(),
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context("private_rpcs")?;
|
.context("private_rpcs")?;
|
||||||
|
@ -6,7 +6,6 @@ use argh::FromArgs;
|
|||||||
use derive_more::Constructor;
|
use derive_more::Constructor;
|
||||||
use ethers::prelude::TxHash;
|
use ethers::prelude::TxHash;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use sea_orm::DatabaseConnection;
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
@ -131,7 +130,6 @@ impl Web3ConnectionConfig {
|
|||||||
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
||||||
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
|
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
|
||||||
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
||||||
db_conn: Option<DatabaseConnection>,
|
|
||||||
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
|
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
|
||||||
let hard_limit = match (self.hard_limit, redis_pool) {
|
let hard_limit = match (self.hard_limit, redis_pool) {
|
||||||
(None, None) => None,
|
(None, None) => None,
|
||||||
@ -164,7 +162,6 @@ impl Web3ConnectionConfig {
|
|||||||
true,
|
true,
|
||||||
self.weight,
|
self.weight,
|
||||||
open_request_handle_metrics,
|
open_request_handle_metrics,
|
||||||
db_conn,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -180,19 +180,20 @@ impl AuthorizedKey {
|
|||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
pub enum AuthorizedRequest {
|
pub enum AuthorizedRequest {
|
||||||
/// Request from the app itself
|
/// Request from this app
|
||||||
Internal(#[serde(skip)] Option<DatabaseConnection>),
|
Internal,
|
||||||
/// Request from an anonymous IP address
|
/// Request from an anonymous IP address
|
||||||
Ip(#[serde(skip)] Option<DatabaseConnection>, IpAddr),
|
Ip(#[serde(skip)] IpAddr),
|
||||||
/// Request from an authenticated and authorized user
|
/// Request from an authenticated and authorized user
|
||||||
User(#[serde(skip)] Option<DatabaseConnection>, AuthorizedKey),
|
User(#[serde(skip)] Option<DatabaseConnection>, AuthorizedKey),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AuthorizedRequest {
|
impl AuthorizedRequest {
|
||||||
|
/// Only User has a database connection in case it needs to save a revert to the database.
|
||||||
pub fn db_conn(&self) -> Option<&DatabaseConnection> {
|
pub fn db_conn(&self) -> Option<&DatabaseConnection> {
|
||||||
match self {
|
match self {
|
||||||
Self::Internal(x) => x.as_ref(),
|
Self::Internal => None,
|
||||||
Self::Ip(x, _) => x.as_ref(),
|
Self::Ip(_) => None,
|
||||||
Self::User(x, _) => x.as_ref(),
|
Self::User(x, _) => x.as_ref(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -213,9 +214,7 @@ pub async fn login_is_authorized(
|
|||||||
x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
|
x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
|
||||||
};
|
};
|
||||||
|
|
||||||
let db = None;
|
Ok(AuthorizedRequest::Ip(ip))
|
||||||
|
|
||||||
Ok(AuthorizedRequest::Ip(db, ip))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn bearer_is_authorized(
|
pub async fn bearer_is_authorized(
|
||||||
@ -272,9 +271,7 @@ pub async fn ip_is_authorized(
|
|||||||
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
|
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
|
||||||
};
|
};
|
||||||
|
|
||||||
let db = app.db_conn.clone();
|
Ok(AuthorizedRequest::Ip(ip))
|
||||||
|
|
||||||
Ok(AuthorizedRequest::Ip(db, ip))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn key_is_authorized(
|
pub async fn key_is_authorized(
|
||||||
|
@ -12,7 +12,6 @@ use futures::StreamExt;
|
|||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
|
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
|
||||||
use sea_orm::DatabaseConnection;
|
|
||||||
use serde::ser::{SerializeStruct, Serializer};
|
use serde::ser::{SerializeStruct, Serializer};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
@ -54,7 +53,6 @@ pub struct Web3Connection {
|
|||||||
// TODO: async lock?
|
// TODO: async lock?
|
||||||
pub(super) head_block_id: RwLock<Option<BlockId>>,
|
pub(super) head_block_id: RwLock<Option<BlockId>>,
|
||||||
pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
||||||
pub(super) db_conn: Option<DatabaseConnection>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Web3Connection {
|
impl Web3Connection {
|
||||||
@ -79,7 +77,6 @@ impl Web3Connection {
|
|||||||
reconnect: bool,
|
reconnect: bool,
|
||||||
weight: u32,
|
weight: u32,
|
||||||
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
||||||
db_conn: Option<DatabaseConnection>,
|
|
||||||
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
|
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
|
||||||
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
|
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
|
||||||
// TODO: is cache size 1 okay? i think we need
|
// TODO: is cache size 1 okay? i think we need
|
||||||
@ -105,7 +102,6 @@ impl Web3Connection {
|
|||||||
head_block_id: RwLock::new(Default::default()),
|
head_block_id: RwLock::new(Default::default()),
|
||||||
weight,
|
weight,
|
||||||
open_request_handle_metrics,
|
open_request_handle_metrics,
|
||||||
db_conn,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_connection = Arc::new(new_connection);
|
let new_connection = Arc::new(new_connection);
|
||||||
|
@ -20,7 +20,6 @@ use futures::StreamExt;
|
|||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use moka::future::{Cache, ConcurrentCacheExt};
|
use moka::future::{Cache, ConcurrentCacheExt};
|
||||||
use petgraph::graphmap::DiGraphMap;
|
use petgraph::graphmap::DiGraphMap;
|
||||||
use sea_orm::DatabaseConnection;
|
|
||||||
use serde::ser::{SerializeStruct, Serializer};
|
use serde::ser::{SerializeStruct, Serializer};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
@ -70,7 +69,6 @@ impl Web3Connections {
|
|||||||
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
||||||
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
|
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
|
||||||
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
|
||||||
db_conn: Option<DatabaseConnection>,
|
|
||||||
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
|
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
|
||||||
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
|
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
|
||||||
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
|
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
|
||||||
@ -128,7 +126,6 @@ impl Web3Connections {
|
|||||||
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
|
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
|
||||||
let block_map = block_map.clone();
|
let block_map = block_map.clone();
|
||||||
let open_request_handle_metrics = open_request_handle_metrics.clone();
|
let open_request_handle_metrics = open_request_handle_metrics.clone();
|
||||||
let db_conn = db_conn.clone();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
server_config
|
server_config
|
||||||
@ -142,7 +139,6 @@ impl Web3Connections {
|
|||||||
block_sender,
|
block_sender,
|
||||||
pending_tx_id_sender,
|
pending_tx_id_sender,
|
||||||
open_request_handle_metrics,
|
open_request_handle_metrics,
|
||||||
db_conn,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
})
|
})
|
||||||
@ -523,7 +519,7 @@ impl Web3Connections {
|
|||||||
.request(
|
.request(
|
||||||
&request.method,
|
&request.method,
|
||||||
&json!(request.params),
|
&json!(request.params),
|
||||||
RequestErrorHandler::SaveReverts(0.0),
|
RequestErrorHandler::SaveReverts,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
@ -11,7 +11,9 @@ use metered::metered;
|
|||||||
use metered::HitCount;
|
use metered::HitCount;
|
||||||
use metered::ResponseTime;
|
use metered::ResponseTime;
|
||||||
use metered::Throughput;
|
use metered::Throughput;
|
||||||
|
use num_traits::cast::FromPrimitive;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
use sea_orm::prelude::Decimal;
|
||||||
use sea_orm::ActiveModelTrait;
|
use sea_orm::ActiveModelTrait;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
@ -42,8 +44,8 @@ pub struct OpenRequestHandle {
|
|||||||
|
|
||||||
/// Depending on the context, RPC errors can require different handling.
|
/// Depending on the context, RPC errors can require different handling.
|
||||||
pub enum RequestErrorHandler {
|
pub enum RequestErrorHandler {
|
||||||
/// Contains the percent chance to save the revert
|
/// Potentially save the revert. Users can tune how often this happens
|
||||||
SaveReverts(f32),
|
SaveReverts,
|
||||||
/// Log at the debug level. Use when errors are expected.
|
/// Log at the debug level. Use when errors are expected.
|
||||||
DebugLevel,
|
DebugLevel,
|
||||||
/// Log at the error level. Use when errors are bad.
|
/// Log at the error level. Use when errors are bad.
|
||||||
@ -52,13 +54,17 @@ pub enum RequestErrorHandler {
|
|||||||
WarnLevel,
|
WarnLevel,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: second param could be skipped since we don't need it here
|
||||||
#[derive(serde::Deserialize, serde::Serialize)]
|
#[derive(serde::Deserialize, serde::Serialize)]
|
||||||
struct EthCallParams {
|
struct EthCallParams((EthCallFirstParams, Option<serde_json::Value>));
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize, serde::Serialize)]
|
||||||
|
struct EthCallFirstParams {
|
||||||
method: Method,
|
method: Method,
|
||||||
// TODO: do this as Address instead
|
// TODO: do this as Address instead
|
||||||
to: Vec<u8>,
|
to: Vec<u8>,
|
||||||
// TODO: do this as a Bytes instead
|
// TODO: do this as a Bytes instead
|
||||||
data: String,
|
data: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Level> for RequestErrorHandler {
|
impl From<Level> for RequestErrorHandler {
|
||||||
@ -74,7 +80,7 @@ impl From<Level> for RequestErrorHandler {
|
|||||||
|
|
||||||
impl AuthorizedRequest {
|
impl AuthorizedRequest {
|
||||||
/// Save a RPC call that return "execution reverted" to the database.
|
/// Save a RPC call that return "execution reverted" to the database.
|
||||||
async fn save_revert(self: Arc<Self>, params: EthCallParams) -> anyhow::Result<()> {
|
async fn save_revert(self: Arc<Self>, params: EthCallFirstParams) -> anyhow::Result<()> {
|
||||||
if let Self::User(Some(db_conn), authorized_request) = &*self {
|
if let Self::User(Some(db_conn), authorized_request) = &*self {
|
||||||
// TODO: do this on the database side?
|
// TODO: do this on the database side?
|
||||||
let timestamp = Utc::now();
|
let timestamp = Utc::now();
|
||||||
@ -122,10 +128,8 @@ impl OpenRequestHandle {
|
|||||||
let metrics = conn.open_request_handle_metrics.clone();
|
let metrics = conn.open_request_handle_metrics.clone();
|
||||||
let used = false.into();
|
let used = false.into();
|
||||||
|
|
||||||
let authorized_request = authorized_request.unwrap_or_else(|| {
|
let authorized_request =
|
||||||
let db_conn = conn.db_conn.clone();
|
authorized_request.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal));
|
||||||
Arc::new(AuthorizedRequest::Internal(db_conn))
|
|
||||||
});
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
authorized_request,
|
authorized_request,
|
||||||
@ -193,17 +197,40 @@ impl OpenRequestHandle {
|
|||||||
if let Err(err) = &response {
|
if let Err(err) = &response {
|
||||||
// only save reverts for some types of calls
|
// only save reverts for some types of calls
|
||||||
// TODO: do something special for eth_sendRawTransaction too
|
// TODO: do something special for eth_sendRawTransaction too
|
||||||
let error_handler = if let RequestErrorHandler::SaveReverts(save_chance) = error_handler
|
let error_handler = if let RequestErrorHandler::SaveReverts = error_handler {
|
||||||
{
|
if !["eth_call", "eth_estimateGas"].contains(&method) {
|
||||||
if ["eth_call", "eth_estimateGas"].contains(&method)
|
debug!(%method, "skipping save on revert");
|
||||||
&& self.authorized_request.db_conn().is_some()
|
RequestErrorHandler::DebugLevel
|
||||||
&& save_chance != 0.0
|
} else if self.authorized_request.db_conn().is_none() {
|
||||||
&& (save_chance == 1.0
|
debug!(%method, "no database. skipping save on revert");
|
||||||
|| rand::thread_rng().gen_range(0.0..=1.0) <= save_chance)
|
RequestErrorHandler::DebugLevel
|
||||||
|
} else if let AuthorizedRequest::User(db_conn, y) = self.authorized_request.as_ref()
|
||||||
{
|
{
|
||||||
error_handler
|
if db_conn.is_none() {
|
||||||
|
trace!(%method, "no database. skipping save on revert");
|
||||||
|
RequestErrorHandler::DebugLevel
|
||||||
|
} else {
|
||||||
|
let log_revert_chance = y.log_revert_chance;
|
||||||
|
|
||||||
|
if log_revert_chance.is_zero() {
|
||||||
|
trace!(%method, "no chance. skipping save on revert");
|
||||||
|
RequestErrorHandler::DebugLevel
|
||||||
|
} else if log_revert_chance == Decimal::ONE {
|
||||||
|
trace!(%method, "gaurenteed chance. SAVING on revert");
|
||||||
|
error_handler
|
||||||
|
} else if Decimal::from_f32(rand::thread_rng().gen_range(0.0f32..=1.0))
|
||||||
|
.expect("f32 should always convert to a Decimal")
|
||||||
|
> log_revert_chance
|
||||||
|
{
|
||||||
|
trace!(%method, "missed chance. skipping save on revert");
|
||||||
|
RequestErrorHandler::DebugLevel
|
||||||
|
} else {
|
||||||
|
trace!("Saving on revert");
|
||||||
|
// TODO: is always logging at debug level fine?
|
||||||
|
error_handler
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO: is always logging at debug level fine?
|
|
||||||
RequestErrorHandler::DebugLevel
|
RequestErrorHandler::DebugLevel
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -220,10 +247,11 @@ impl OpenRequestHandle {
|
|||||||
RequestErrorHandler::WarnLevel => {
|
RequestErrorHandler::WarnLevel => {
|
||||||
warn!(?err, %method, rpc=%self.conn, "bad response!");
|
warn!(?err, %method, rpc=%self.conn, "bad response!");
|
||||||
}
|
}
|
||||||
RequestErrorHandler::SaveReverts(_) => {
|
RequestErrorHandler::SaveReverts => {
|
||||||
// TODO: logging every one is going to flood the database
|
// TODO: logging every one is going to flood the database
|
||||||
// TODO: have a percent chance to do this. or maybe a "logged reverts per second"
|
// TODO: have a percent chance to do this. or maybe a "logged reverts per second"
|
||||||
if let ProviderError::JsonRpcClientError(err) = err {
|
if let ProviderError::JsonRpcClientError(err) = err {
|
||||||
|
// Http and Ws errors are very similar, but different types
|
||||||
let msg = match provider {
|
let msg = match provider {
|
||||||
Web3Provider::Http(_) => {
|
Web3Provider::Http(_) => {
|
||||||
if let Some(HttpClientError::JsonRpcError(err)) =
|
if let Some(HttpClientError::JsonRpcError(err)) =
|
||||||
@ -248,14 +276,19 @@ impl OpenRequestHandle {
|
|||||||
if let Some(msg) = msg {
|
if let Some(msg) = msg {
|
||||||
if msg.starts_with("execution reverted") {
|
if msg.starts_with("execution reverted") {
|
||||||
// TODO: is there a more efficient way to do this?
|
// TODO: is there a more efficient way to do this?
|
||||||
let params: EthCallParams = serde_json::from_value(json!(params))
|
debug!(?params);
|
||||||
.expect("parsing eth_call");
|
|
||||||
|
|
||||||
// spawn saving to the database so we don't slow down the request (or error if no db)
|
// TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
|
||||||
let f = self.authorized_request.clone().save_revert(params);
|
let params: EthCallParams = serde_json::from_value(json!(params))
|
||||||
|
.context("parsing params to EthCallParams")
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// spawn saving to the database so we don't slow down the request
|
||||||
|
let f = self.authorized_request.clone().save_revert(params.0 .0);
|
||||||
|
|
||||||
tokio::spawn(async move { f.await });
|
tokio::spawn(async move { f.await });
|
||||||
} else {
|
} else {
|
||||||
|
// TODO: log any of the errors?
|
||||||
debug!(?err, %method, rpc=%self.conn, "bad response!");
|
debug!(?err, %method, rpc=%self.conn, "bad response!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user