diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index ac86b93c..de5a5035 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -37,11 +37,11 @@ use uuid::Uuid; use crate::block_helpers::block_needed; use crate::config::{AppConfig, TopConfig}; -use crate::connections::Web3Connections; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; +use crate::rpcs::Web3Connections; use crate::stats::AppStats; // TODO: make this customizable? diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index abbff65c..5354e39b 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use tokio::sync::broadcast; use crate::app::AnyhowJoinHandle; -use crate::connection::Web3Connection; +use crate::rpcs::Web3Connection; pub type BlockAndRpc = (Arc>, Arc); diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/http_proxy.rs index 6f61ade8..86930351 100644 --- a/web3_proxy/src/frontend/http_proxy.rs +++ b/web3_proxy/src/frontend/http_proxy.rs @@ -1,6 +1,6 @@ use super::errors::FrontendResult; use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key}; -use crate::stats::{Protocol, ProxyRequestLabels}; +use crate::stats::Protocol; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index b17a3246..182ff3e4 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -28,8 +28,6 @@ pub enum RequestFrom { User(u64), } -pub type RateLimitFrontendResult = Result; - impl TryFrom for IpAddr { type Error = anyhow::Error; diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 05cbeaba..d0b6e263 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -18,8 +18,8 @@ use axum::{ use axum_auth::AuthBearer; use axum_client_ip::ClientIp; use axum_macros::debug_handler; -use entities::sea_orm_active_enums::Role; -use entities::{user, user_keys}; +// use entities::sea_orm_active_enums::Role; +use entities::user; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use redis_rate_limit::redis::AsyncCommands; diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 5da543cf..289bfa36 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,9 +1,8 @@ pub mod app; pub mod block_helpers; pub mod config; -pub mod connection; -pub mod connections; pub mod frontend; pub mod jsonrpc; +pub mod rpcs; pub mod stats; pub mod users; diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/rpcs/connection.rs similarity index 99% rename from web3_proxy/src/connection.rs rename to web3_proxy/src/rpcs/connection.rs index c225c9a7..64bbec6c 100644 --- a/web3_proxy/src/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -20,6 +20,7 @@ use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; +// TODO: rename this pub enum HandleResult { ActiveRequest(ActiveRequestHandle), RetryAt(Instant), @@ -33,6 +34,25 @@ pub enum Web3Provider { Ws(ethers::providers::Provider), } +/// An active connection to a Web3Rpc +pub struct Web3Connection { + name: String, + /// TODO: can we get this from the provider? do we even need it? + pub url: String, + /// keep track of currently open requests. We sort on this + active_requests: AtomicU32, + /// provider is in a RwLock so that we can replace it if re-connecting + /// it is an async lock because we hold it open across awaits + provider: AsyncRwLock>>, + /// rate limits are stored in a central redis so that multiple proxies can share their rate limits + hard_limit: Option, + /// used for load balancing to the least loaded server + pub soft_limit: u32, + block_data_limit: AtomicU64, + pub weight: u32, + head_block: RwLock<(H256, U64)>, +} + impl Web3Provider { #[instrument] async fn from_str(url_str: &str, http_client: Option) -> anyhow::Result { @@ -71,25 +91,6 @@ impl fmt::Debug for Web3Provider { } } -/// An active connection to a Web3Rpc -pub struct Web3Connection { - name: String, - /// TODO: can we get this from the provider? do we even need it? - pub url: String, - /// keep track of currently open requests. We sort on this - active_requests: AtomicU32, - /// provider is in a RwLock so that we can replace it if re-connecting - /// it is an async lock because we hold it open across awaits - provider: AsyncRwLock>>, - /// rate limits are stored in a central redis so that multiple proxies can share their rate limits - hard_limit: Option, - /// used for load balancing to the least loaded server - pub soft_limit: u32, - block_data_limit: AtomicU64, - pub weight: u32, - head_block: RwLock<(H256, U64)>, -} - impl Web3Connection { /// Connect to a web3 rpc // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/rpcs/connections.rs similarity index 99% rename from web3_proxy/src/connections.rs rename to web3_proxy/src/rpcs/connections.rs index 380978bc..7f423941 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -13,6 +13,11 @@ use indexmap::{IndexMap, IndexSet}; use std::cmp::Reverse; // use parking_lot::RwLock; // use petgraph::graphmap::DiGraphMap; +use super::SyncedConnections; +use super::{ActiveRequestHandle, HandleResult, Web3Connection}; +use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; +use crate::config::Web3ConnectionConfig; +use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -26,34 +31,6 @@ use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior}; use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn}; -use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; -use crate::config::Web3ConnectionConfig; -use crate::connection::{ActiveRequestHandle, HandleResult, Web3Connection}; -use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; - -/// A collection of Web3Connections that are on the same block. -/// Serialize is so we can print it on our debug endpoint -#[derive(Clone, Default, Serialize)] -struct SyncedConnections { - head_block_num: U64, - head_block_hash: H256, - // TODO: this should be able to serialize, but it isn't - #[serde(skip_serializing)] - conns: IndexSet>, -} - -impl fmt::Debug for SyncedConnections { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - // TODO: print the actual conns? - f.debug_struct("SyncedConnections") - .field("head_num", &self.head_block_num) - .field("head_hash", &self.head_block_hash) - .field("num_conns", &self.conns.len()) - .finish_non_exhaustive() - } -} - #[derive(Default)] pub struct BlockChain { /// only includes blocks on the main chain. @@ -1198,3 +1175,15 @@ mod tests { assert_eq!(x, [false, true, true]) } } + +impl fmt::Debug for SyncedConnections { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + // TODO: print the actual conns? + f.debug_struct("SyncedConnections") + .field("head_num", &self.head_block_num) + .field("head_hash", &self.head_block_hash) + .field("num_conns", &self.conns.len()) + .finish_non_exhaustive() + } +} diff --git a/web3_proxy/src/rpcs/mod.rs b/web3_proxy/src/rpcs/mod.rs new file mode 100644 index 00000000..c2337ac9 --- /dev/null +++ b/web3_proxy/src/rpcs/mod.rs @@ -0,0 +1,7 @@ +mod connection; +mod connections; +mod synced_connections; + +pub use connection::{ActiveRequestHandle, HandleResult, Web3Connection}; +pub use connections::Web3Connections; +pub use synced_connections::SyncedConnections; diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs new file mode 100644 index 00000000..d2350bca --- /dev/null +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -0,0 +1,16 @@ +use super::Web3Connection; +use ethers::prelude::{H256, U64}; +use indexmap::IndexSet; +use serde::Serialize; +use std::sync::Arc; + +/// A collection of Web3Connections that are on the same block. +/// Serialize is so we can print it on our debug endpoint +#[derive(Clone, Default, Serialize)] +pub struct SyncedConnections { + pub(super) head_block_num: U64, + pub(super) head_block_hash: H256, + // TODO: this should be able to serialize, but it isn't + #[serde(skip_serializing)] + pub(super) conns: IndexSet>, +}