From 8d011e0cd17fbd760e76ac4c6f8617b7ce8759d6 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 22 Sep 2022 22:10:28 +0000 Subject: [PATCH] pass db conn through --- web3_proxy/src/app.rs | 2 + web3_proxy/src/config.rs | 3 ++ web3_proxy/src/frontend/authorization.rs | 17 +++--- web3_proxy/src/rpcs/connection.rs | 4 ++ web3_proxy/src/rpcs/connections.rs | 4 ++ web3_proxy/src/rpcs/request.rs | 69 +++++++++++++----------- 6 files changed, 62 insertions(+), 37 deletions(-) diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index f07d2738..9ba59801 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -288,6 +288,7 @@ impl Web3ProxyApp { Some(pending_tx_sender.clone()), pending_transactions.clone(), open_request_handle_metrics.clone(), + db_conn.clone(), ) .await .context("balanced rpcs")?; @@ -315,6 +316,7 @@ impl Web3ProxyApp { None, pending_transactions.clone(), open_request_handle_metrics.clone(), + db_conn.clone(), ) .await .context("private_rpcs")?; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 46f00077..c28dcb0a 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -6,6 +6,7 @@ use argh::FromArgs; use derive_more::Constructor; use ethers::prelude::TxHash; use hashbrown::HashMap; +use sea_orm::DatabaseConnection; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; @@ -124,6 +125,7 @@ impl Web3ConnectionConfig { block_sender: Option>, tx_id_sender: Option>, open_request_handle_metrics: Arc, + db_conn: Option, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = match (self.hard_limit, redis_pool) { (None, None) => None, @@ -156,6 +158,7 @@ impl Web3ConnectionConfig { true, self.weight, open_request_handle_metrics, + db_conn, ) .await } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index b1963c37..40b0db25 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -5,7 +5,8 @@ use axum::headers::{Referer, UserAgent}; use deferred_rate_limiter::DeferredRateLimitResult; use entities::user_keys; use sea_orm::{ - ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, + ColumnTrait, DatabaseConnection, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, + QuerySelect, }; use serde::Serialize; use std::{net::IpAddr, sync::Arc}; @@ -53,11 +54,11 @@ impl AuthorizedKey { #[derive(Debug, Serialize)] pub enum AuthorizedRequest { /// Request from the app itself - Internal, + Internal(#[serde(skip)] Option), /// Request from an anonymous IP address - Ip(IpAddr), + Ip(#[serde(skip)] Option, IpAddr), /// Request from an authenticated and authorized user - User(AuthorizedKey), + User(#[serde(skip)] Option, AuthorizedKey), } pub async fn ip_is_authorized( @@ -74,7 +75,9 @@ pub async fn ip_is_authorized( x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), }; - Ok(AuthorizedRequest::Ip(ip)) + let db = app.db_conn.clone(); + + Ok(AuthorizedRequest::Ip(db, ip)) } pub async fn key_is_authorized( @@ -97,7 +100,9 @@ pub async fn key_is_authorized( let authorized_user = AuthorizedKey::try_new(ip, user_data, referer, user_agent)?; - Ok(AuthorizedRequest::User(authorized_user)) + let db = app.db_conn.clone(); + + Ok(AuthorizedRequest::User(db, authorized_user)) } impl Web3ProxyApp { diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 7d2f3a67..8aab4def 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -12,6 +12,7 @@ use futures::StreamExt; use parking_lot::RwLock; use rand::Rng; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; +use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::cmp::min; @@ -52,6 +53,7 @@ pub struct Web3Connection { // TODO: async lock? pub(super) head_block_id: RwLock>, pub(super) open_request_handle_metrics: Arc, + pub(super) db_conn: Option, } impl Web3Connection { @@ -76,6 +78,7 @@ impl Web3Connection { reconnect: bool, weight: u32, open_request_handle_metrics: Arc, + db_conn: Option, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { // TODO: is cache size 1 okay? i think we need @@ -101,6 +104,7 @@ impl Web3Connection { head_block_id: RwLock::new(Default::default()), weight, open_request_handle_metrics, + db_conn, }; let new_connection = Arc::new(new_connection); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 04b36a6d..783ba2c5 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -20,6 +20,7 @@ use futures::StreamExt; use hashbrown::HashMap; use moka::future::{Cache, ConcurrentCacheExt}; use petgraph::graphmap::DiGraphMap; +use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::value::RawValue; @@ -68,6 +69,7 @@ impl Web3Connections { pending_tx_sender: Option>, pending_transactions: Cache, open_request_handle_metrics: Arc, + db_conn: Option, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); @@ -125,6 +127,7 @@ impl Web3Connections { let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); let block_map = block_map.clone(); let open_request_handle_metrics = open_request_handle_metrics.clone(); + let db_conn = db_conn.clone(); tokio::spawn(async move { server_config @@ -138,6 +141,7 @@ impl Web3Connections { block_sender, pending_tx_id_sender, open_request_handle_metrics, + db_conn, ) .await }) diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index ca95bf9e..5e86d340 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -7,6 +7,7 @@ use metered::metered; use metered::HitCount; use metered::ResponseTime; use metered::Throughput; +use rand::Rng; use std::fmt; use std::sync::atomic::{self, AtomicBool, Ordering}; use std::sync::Arc; @@ -77,7 +78,10 @@ impl OpenRequestHandle { let metrics = conn.open_request_handle_metrics.clone(); let used = false.into(); - let authorization = authorization.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal)); + let authorization = authorization.unwrap_or_else(|| { + let db_conn = conn.db_conn.clone(); + Arc::new(AuthorizedRequest::Internal(db_conn)) + }); Self { authorization, @@ -156,43 +160,46 @@ impl OpenRequestHandle { // TODO: only set SaveReverts if this is an eth_call or eth_estimateGas? we'll need eth_sendRawTransaction somewhere else // TODO: logging every one is going to flood the database // TODO: have a percent chance to do this. or maybe a "logged reverts per second" - if let ProviderError::JsonRpcClientError(err) = err { - match provider { - Web3Provider::Http(_) => { - if let Some(HttpClientError::JsonRpcError(err)) = - err.downcast_ref::() - { - if err.message.starts_with("execution reverted") { - debug!(%method, ?params, "TODO: save the request"); + if save_chance == 1.0 || rand::thread_rng().gen_range(0.0..=1.0) <= save_chance + { + if let ProviderError::JsonRpcClientError(err) = err { + match provider { + Web3Provider::Http(_) => { + if let Some(HttpClientError::JsonRpcError(err)) = + err.downcast_ref::() + { + if err.message.starts_with("execution reverted") { + debug!(%method, ?params, "TODO: save the request"); - let f = self - .authorization - .clone() - .save_revert(method.to_string(), params); + let f = self + .authorization + .clone() + .save_revert(method.to_string(), params); - tokio::spawn(async move { f.await }); + tokio::spawn(async move { f.await }); - // TODO: don't do this on the hot path. spawn it - } else { - debug!(?err, %method, rpc=%self.conn, "bad response!"); + // TODO: don't do this on the hot path. spawn it + } else { + debug!(?err, %method, rpc=%self.conn, "bad response!"); + } } } - } - Web3Provider::Ws(_) => { - if let Some(WsClientError::JsonRpcError(err)) = - err.downcast_ref::() - { - if err.message.starts_with("execution reverted") { - debug!(%method, ?params, "TODO: save the request"); + Web3Provider::Ws(_) => { + if let Some(WsClientError::JsonRpcError(err)) = + err.downcast_ref::() + { + if err.message.starts_with("execution reverted") { + debug!(%method, ?params, "TODO: save the request"); - let f = self - .authorization - .clone() - .save_revert(method.to_string(), params); + let f = self + .authorization + .clone() + .save_revert(method.to_string(), params); - tokio::spawn(async move { f.await }); - } else { - debug!(?err, %method, rpc=%self.conn, "bad response!"); + tokio::spawn(async move { f.await }); + } else { + debug!(?err, %method, rpc=%self.conn, "bad response!"); + } } } }