diff --git a/Cargo.lock b/Cargo.lock
index 6278329d..866dd939 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3770,6 +3770,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "toml",
  "tracing",
  "tracing-subscriber",
  "url",
diff --git a/Cargo.toml b/Cargo.toml
index a02e4919..e5b45fd3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ reqwest = { version = "0.11.10", features = ["json", "rustls"] }
 rustc-hash = "1.1.0"
 serde = { version = "1.0.137", features = [] }
 serde_json = { version = "1.0.80", default-features = false, features = ["alloc"] }
+toml = "*"
 tracing = "0.1.34"
 tracing-subscriber = "0.3.11"
 url = "2.2.2"
diff --git a/src/config.rs b/src/config.rs
index 99ae3106..4ed24a8a 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,13 +1,8 @@
-use ethers::prelude::{Block, TxHash};
 use governor::clock::QuantaClock;
 use serde::Deserialize;
 use std::collections::{BTreeMap, HashMap};
-use std::num::NonZeroU32;
 use std::sync::Arc;
-use tokio::sync::mpsc;
 
-use crate::block_watcher::BlockWatcherSender;
-// use crate::block_watcher::BlockWatcher;
 use crate::connection::Web3Connection;
 use crate::Web3ProxyApp;
 
@@ -47,30 +42,16 @@ impl Web3ConnectionConfig {
     pub async fn try_build(
         self,
         clock: &QuantaClock,
-        block_watcher_sender: BlockWatcherSender,
         http_client: Option<reqwest::Client>,
     ) -> anyhow::Result<Arc<Web3Connection>> {
-        let hard_rate_limiter = if let Some(hard_limit) = self.hard_limit {
-            let quota = governor::Quota::per_second(NonZeroU32::new(hard_limit).unwrap());
-
-            let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
-
-            Some(rate_limiter)
-        } else {
-            None
-        };
-
         Web3Connection::try_new(
             self.url,
             http_client,
-            block_watcher_sender,
-            hard_rate_limiter,
+            self.hard_limit,
+            Some(clock),
             self.soft_limit,
         )
         .await
-    }
-
-    pub fn url(&self) -> &str {
-        &self.url
+        .map(Arc::new)
     }
 }
diff --git a/src/connections.rs b/src/connections.rs
index 91058c4d..5401c062 100644
--- a/src/connections.rs
+++ b/src/connections.rs
@@ -13,6 +13,7 @@ use std::sync::Arc;
 use tokio::sync::mpsc;
 use tracing::warn;
 
+use crate::config::Web3ConnectionConfig;
 use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
 
 #[derive(Clone, Default)]
@@ -60,7 +61,7 @@ impl fmt::Debug for Web3Connections {
 impl Web3Connections {
     pub async fn try_new(
         // TODO: servers should be a Web3ConnectionBuilder struct
-        servers: Vec<(&str, u32, Option<u32>)>,
+        servers: Vec<Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
         clock: &QuantaClock,
     ) -> anyhow::Result<Arc<Self>> {
@@ -68,17 +69,8 @@ impl Web3Connections {
 
         let num_connections = servers.len();
 
-        for (s, soft_rate_limit, hard_rate_limit) in servers.into_iter() {
-            let connection = Web3Connection::try_new(
-                s.to_string(),
-                http_client.clone(),
-                hard_rate_limit,
-                Some(clock),
-                soft_rate_limit,
-            )
-            .await?;
-
-            let connection = Arc::new(connection);
+        for server_config in servers.into_iter() {
+            let connection = server_config.try_build(clock, http_client.clone()).await?;
 
             connections.push(connection);
         }
@@ -90,6 +82,7 @@ impl Web3Connections {
 
         for connection in connections.inner.iter() {
             // subscribe to new heads in a spawned future
+            // TODO: channel instead. then we can have one future with write access to a left-right
             let connection = Arc::clone(connection);
             let connections = connections.clone();
             tokio::spawn(async move {
diff --git a/src/main.rs b/src/main.rs
index 81b095a2..dc75bf3d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,13 @@
+mod config;
 mod connection;
 mod connections;
 
+use config::Web3ConnectionConfig;
 use futures::future;
 use governor::clock::{Clock, QuantaClock};
 use serde_json::json;
 use std::fmt;
+use std::fs;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc;
@@ -13,6 +16,7 @@ use tracing::warn;
 use warp::Filter;
 use warp::Reply;
 
+use crate::config::RootConfig;
 use crate::connection::JsonRpcRequest;
 use crate::connections::Web3Connections;
 
@@ -25,7 +29,7 @@ static APP_USER_AGENT: &str = concat!(
 
 /// The application
 // TODO: this debug impl is way too verbose. make something smaller
-struct Web3ProxyApp {
+pub struct Web3ProxyApp {
     /// clock used for rate limiting
     /// TODO: use tokio's clock (will require a different ratelimiting crate)
     clock: QuantaClock,
@@ -44,25 +48,11 @@ impl fmt::Debug for Web3ProxyApp {
 
 impl Web3ProxyApp {
     async fn try_new(
-        balanced_rpc_tiers: Vec<Vec<(&str, u32, Option<u32>)>>,
-        private_rpcs: Vec<(&str, u32, Option<u32>)>,
+        balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
+        private_rpcs: Vec<Web3ConnectionConfig>,
     ) -> anyhow::Result<Web3ProxyApp> {
         let clock = QuantaClock::default();
 
-        let mut rpcs = vec![];
-        for balanced_rpc_tier in balanced_rpc_tiers.iter() {
-            for rpc_data in balanced_rpc_tier {
-                let rpc = rpc_data.0.to_string();
-
-                rpcs.push(rpc);
-            }
-        }
-        for rpc_data in private_rpcs.iter() {
-            let rpc = rpc_data.0.to_string();
-
-            rpcs.push(rpc);
-        }
-
         // make a http shared client
         // TODO: how should we configure the connection pool?
         // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
@@ -229,51 +219,30 @@ impl Web3ProxyApp {
 }
 
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     // install global collector configured based on RUST_LOG env var.
     tracing_subscriber::fmt::init();
 
+    // TODO: use flags for the config path
+    let config = "./data/config/example.toml";
+
+    let config: String = fs::read_to_string(config)?;
+
+    let config: RootConfig = toml::from_str(&config)?;
+
     // TODO: load the config from yaml instead of hard coding
     // TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
     // TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
-    let listen_port = 8445;
-    // TODO: what should this be? 0 will cause a thundering herd
+    let listen_port = config.config.listen_port;
 
-    let state = Web3ProxyApp::try_new(
-        vec![
-            // local nodes
-            vec![
-                ("ws://127.0.0.1:8545", 68_800, None),
-                ("ws://127.0.0.1:8946", 152_138, None),
-            ],
-            // paid nodes
-            // TODO: add paid nodes (with rate limits)
-            // vec![
-            //     // chainstack.com archive
-            //     // moralis free (25/sec rate limit)
-            // ],
-            // free nodes
-            vec![
-                ("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline
-                ("https://rpc.ankr.com/eth", 23_967, None),
-            ],
-        ],
-        vec![
-            ("https://api.edennetwork.io/v1/", 1_805, None),
-            ("https://api.edennetwork.io/v1/beta", 300, None),
-            ("https://rpc.ethermine.org/", 5_861, None),
-            ("https://rpc.flashbots.net", 7074, None),
-        ],
-    )
-    .await
-    .unwrap();
+    let app = config.try_build().await?;
 
-    let state: Arc<Web3ProxyApp> = Arc::new(state);
+    let app: Arc<Web3ProxyApp> = Arc::new(app);
 
     let proxy_rpc_filter = warp::any()
        .and(warp::post())
        .and(warp::body::json())
-        .then(move |json_body| state.clone().proxy_web3_rpc(json_body));
+        .then(move |json_body| app.clone().proxy_web3_rpc(json_body));
 
     // TODO: filter for displaying connections and their block heights
 
@@ -282,6 +251,8 @@ async fn main() {
     let routes = proxy_rpc_filter.map(handle_anyhow_errors);
 
     warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
+
+    Ok(())
 }
 
 /// convert result into an http response. use this at the end of your warp filter
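Note on the new config plumbing: the `RootConfig` definition behind `toml::from_str` and `config.try_build()` is referenced here but sits outside the hunks shown, and `./data/config/example.toml` is not part of the diff. Below is a minimal, self-contained sketch of what that deserialization side might look like. The field names `listen_port`, `url`, `soft_limit`, and `hard_limit` come from the diff; the `Web3ProxyConfig` name, the `u16` port type, and the map-keyed layout for `balanced_rpc_tiers`/`private_rpcs` (suggested by the `BTreeMap`/`HashMap` imports kept in `src/config.rs`) are assumptions.

```rust
// Sketch only: this is not the project's actual src/config.rs, whose struct
// definitions the diff does not show in full. Requires the `toml` crate and
// serde with the "derive" feature.
use std::collections::{BTreeMap, HashMap};

use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct RootConfig {
    // shared settings, read in main() as `config.config.listen_port`
    pub config: Web3ProxyConfig,
    // assumed layout: tier name -> server name -> connection settings
    pub balanced_rpc_tiers: BTreeMap<String, HashMap<String, Web3ConnectionConfig>>,
    // assumed layout: server name -> connection settings for the private relays
    pub private_rpcs: HashMap<String, Web3ConnectionConfig>,
}

#[derive(Debug, Deserialize)]
pub struct Web3ProxyConfig {
    // hypothetical struct and type; only `listen_port` itself is evidenced by the diff
    pub listen_port: u16,
}

#[derive(Debug, Deserialize)]
pub struct Web3ConnectionConfig {
    // the three fields Web3ConnectionConfig::try_build uses in the diff above
    pub url: String,
    pub soft_limit: u32,
    pub hard_limit: Option<u32>,
}

fn main() -> Result<(), toml::de::Error> {
    // stand-in for ./data/config/example.toml under the assumed layout
    let example = r#"
        [config]
        listen_port = 8445

        [balanced_rpc_tiers.0.local_geth]
        url = "ws://127.0.0.1:8545"
        soft_limit = 68_800

        [balanced_rpc_tiers.1.ankr]
        url = "https://rpc.ankr.com/eth"
        soft_limit = 23_967

        [private_rpcs.flashbots]
        url = "https://rpc.flashbots.net"
        soft_limit = 7_074
        hard_limit = 10
    "#;

    let config: RootConfig = toml::from_str(example)?;

    println!(
        "listen on port {} with {} balanced tiers and {} private rpcs",
        config.config.listen_port,
        config.balanced_rpc_tiers.len(),
        config.private_rpcs.len()
    );

    Ok(())
}
```

Under these assumptions, `RootConfig::try_build` (not shown in the diff) would flatten the maps into the `Vec<Vec<Web3ConnectionConfig>>` and `Vec<Web3ConnectionConfig>` that `Web3ProxyApp::try_new` now takes, and each server is then built through `Web3ConnectionConfig::try_build`, exactly as `Web3Connections::try_new` does above.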