diff --git a/Cargo.lock b/Cargo.lock index fb58cc17..1b4cc439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2496,6 +2496,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "proctitle" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924cd8a0de90723d63fed19c5035ea129913a0bc998b37686a67f1eaf6a2aab5" +dependencies = [ + "lazy_static", + "libc", + "winapi", +] + [[package]] name = "quanta" version = "0.9.3" @@ -3854,6 +3865,7 @@ dependencies = [ "hashbrown 0.12.1", "linkedhashmap", "parking_lot 0.12.0", + "proctitle", "regex", "reqwest", "rustc-hash", diff --git a/README.md b/README.md index 7bae1b54..f77e14d1 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest ## Todo +- [ ] some production configs are using 100% cpu - [ ] after connecting to a server, check that it gives the expected chainId - [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced. - [ ] proper logging with useful instrumentation @@ -79,6 +80,9 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest - [ ] if a request gets a socket timeout, try on another server - maybe always try at least two servers in parallel? and then return the first? or only if the first one doesn't respond very quickly? - [ ] incoming rate limiting (by ip or by api key or what?) +- [ ] improve caching + - [ ] if the params include a block, we can cache for longer + - [ ] if the call is something simple like "symbol" or "decimals", cache that too - [ ] measure latency to nodes? - [ ] one proxy for mulitple chains? - [ ] zero downtime deploys diff --git a/config/example.toml b/config/example.toml index aeef796d..84177003 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,3 +1,6 @@ +[shared] +chain_id = 1 + [balanced_rpc_tiers] [balanced_rpc_tiers.0] diff --git a/linkedhashmap/LICENSE b/linkedhashmap/LICENSE index d62f41a9..e0d985c2 100644 --- a/linkedhashmap/LICENSE +++ b/linkedhashmap/LICENSE @@ -1 +1 @@ -https://github.com/quininer/linkedhashmap \ No newline at end of file +https://github.com/quininer/linkedhashmap diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 1da21221..8698afac 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -17,6 +17,7 @@ governor = { version = "0.4.2", features = ["dashmap", "std"] } hashbrown = "0.12.1" linkedhashmap = { path = "../linkedhashmap" } parking_lot = "0.12.0" +proctitle = "0.1.1" regex = "1.5.5" reqwest = { version = "0.11.10", default-features = false, features = ["json", "rustls"] } rustc-hash = "1.1.0" diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index e811e1a9..cbba03e2 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -30,7 +30,8 @@ static APP_USER_AGENT: &str = concat!( const RESPONSE_CACHE_CAP: usize = 1024; /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work -type ResponseLruCache = RwLock>; +type ResponseLruCache = + RwLock), JsonRpcForwardedResponse>>; /// The application // TODO: this debug impl is way too verbose. make something smaller @@ -61,6 +62,7 @@ impl fmt::Debug for Web3ProxyApp { impl Web3ProxyApp { pub async fn try_new( + chain_id: usize, balanced_rpc_tiers: Vec>, private_rpcs: Vec, ) -> anyhow::Result { @@ -81,6 +83,7 @@ impl Web3ProxyApp { let balanced_rpc_tiers = future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| { Web3Connections::try_new( + chain_id, best_head_block_number.clone(), balanced_rpc_tier, Some(http_client.clone()), @@ -100,6 +103,7 @@ impl Web3ProxyApp { } else { Some( Web3Connections::try_new( + chain_id, best_head_block_number.clone(), private_rpcs, Some(http_client), @@ -225,6 +229,9 @@ impl Web3ProxyApp { let deadline = not_until.wait_time_from(self.clock.now()); sleep(deadline).await; + } else { + // TODO: what should we do here? + return Err(anyhow::anyhow!("no private rpcs!")); } } }; @@ -246,7 +253,7 @@ impl Web3ProxyApp { for balanced_rpcs in rpc_iter { let best_head_block_number = - self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store current block for everything anymore. we store it on the connections + self.best_head_block_number.load(atomic::Ordering::Acquire); let best_rpc_block_number = balanced_rpcs.head_block_number(); @@ -259,11 +266,7 @@ impl Web3ProxyApp { let cache_key = ( best_head_block_number, request.method.clone(), - request - .params - .clone() - .map(|x| x.to_string()) - .unwrap_or_else(|| "[]".to_string()), + request.params.clone().map(|x| x.to_string()), ); if let Some(cached) = self.response_cache.read().await.get(&cache_key) { @@ -302,6 +305,7 @@ impl Web3ProxyApp { // TODO: cache the warp::reply to save us serializing every time response_cache.insert(cache_key, response.clone()); if response_cache.len() >= RESPONSE_CACHE_CAP { + // TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block response_cache.pop_front(); } @@ -412,7 +416,7 @@ impl Web3ProxyApp { // TODO: how long should we wait? // TODO: max wait time? warn!("No servers in sync!"); - // TODO: return json error? return a 502? + // TODO: return a 502? return Err(anyhow::anyhow!("no servers in sync")); }; } diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index c0e92476..dae3644b 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -21,11 +21,19 @@ pub struct CliConfig { #[derive(Deserialize)] pub struct RpcConfig { - // BTreeMap so that iterating keeps the same order + pub shared: RpcSharedConfig, + // BTreeMap so that iterating keeps the same order. we want tier 0 before tier 1! pub balanced_rpc_tiers: BTreeMap>, pub private_rpcs: Option>, } +/// shared configuration between Web3Connections +#[derive(Deserialize)] +pub struct RpcSharedConfig { + /// TODO: what type for chain_id? TODO: this isn't at the right level. this is inside a "Config" + pub chain_id: usize, +} + #[derive(Deserialize)] pub struct Web3ConnectionConfig { url: String, @@ -48,7 +56,7 @@ impl RpcConfig { vec![] }; - Web3ProxyApp::try_new(balanced_rpc_tiers, private_rpcs).await + Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpc_tiers, private_rpcs).await } } @@ -57,9 +65,11 @@ impl Web3ConnectionConfig { pub async fn try_build( self, clock: &QuantaClock, + chain_id: usize, http_client: Option, ) -> anyhow::Result> { Web3Connection::try_new( + chain_id, self.url, http_client, self.hard_limit, @@ -67,6 +77,5 @@ impl Web3ConnectionConfig { self.soft_limit, ) .await - .map(Arc::new) } } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 532c6454..b60402e1 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -7,7 +7,6 @@ use governor::middleware::NoOpMiddleware; use governor::state::{InMemoryState, NotKeyed}; use governor::NotUntil; use governor::RateLimiter; -use serde_json::value::RawValue; use std::fmt; use std::num::NonZeroU32; use std::sync::atomic::{self, AtomicU32, AtomicU64}; @@ -66,13 +65,14 @@ impl fmt::Display for Web3Connection { impl Web3Connection { /// Connect to a web3 rpc and subscribe to new heads pub async fn try_new( + chain_id: usize, url_str: String, http_client: Option, hard_rate_limit: Option, clock: &QuantaClock, // TODO: think more about this type soft_limit: u32, - ) -> anyhow::Result { + ) -> anyhow::Result> { let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit { let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap()); @@ -108,7 +108,7 @@ impl Web3Connection { return Err(anyhow::anyhow!("only http and ws servers are supported")); }; - Ok(Web3Connection { + let connection = Web3Connection { clock: clock.clone(), url: url_str.clone(), active_requests: Default::default(), @@ -116,7 +116,35 @@ impl Web3Connection { ratelimiter: hard_rate_limiter, soft_limit, head_block_number: 0.into(), - }) + }; + + let connection = Arc::new(connection); + + // TODO: check the chain_id here + let active_request_handle = connection.wait_for_request_handle().await; + + // TODO: passing empty_params like this feels awkward. + let empty_params: Option<()> = None; + let found_chain_id: String = active_request_handle + .request("eth_chainId", empty_params) + .await + .unwrap(); + + let found_chain_id = + usize::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap(); + + if chain_id != found_chain_id { + return Err(anyhow::anyhow!( + "incorrect chain id! Expected {}. Found {}", + chain_id, + found_chain_id + )); + } + + // TODO: use anyhow + assert_eq!(chain_id, found_chain_id); + + Ok(connection) } #[inline] @@ -233,7 +261,6 @@ impl Web3Connection { } pub async fn wait_for_request_handle(self: &Arc) -> ActiveRequestHandle { - // rate limits loop { match self.try_request_handle() { Ok(pending_request_handle) => return pending_request_handle, @@ -244,8 +271,6 @@ impl Web3Connection { } } } - - // TODO: return a thing that when we drop it decrements? } pub fn try_request_handle( @@ -290,11 +315,15 @@ impl ActiveRequestHandle { /// Send a web3 request /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented /// By taking self here, we ensure that this is dropped after the request is complete - pub async fn request( - self, + pub async fn request( + &self, method: &str, - params: &Option>, - ) -> Result, ethers::prelude::ProviderError> { + params: T, + ) -> Result + where + T: fmt::Debug + serde::Serialize + Send + Sync, + R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, + { // TODO: this should probably be trace level and use a span // TODO: it would be nice to have the request id on this trace!("Sending {}({:?}) to {}", method, params, self.0); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 583d4612..a32fa31b 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -61,6 +61,7 @@ impl fmt::Debug for Web3Connections { impl Web3Connections { pub async fn try_new( + chain_id: usize, best_head_block_number: Arc, servers: Vec, http_client: Option, @@ -72,7 +73,10 @@ impl Web3Connections { let num_connections = servers.len(); for server_config in servers.into_iter() { - match server_config.try_build(clock, http_client.clone()).await { + match server_config + .try_build(clock, chain_id, http_client.clone()) + .await + { Ok(connection) => connections.push(connection), Err(e) => warn!("Unable to connect to a server! {}", e), } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index cac85089..a659f4c4 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -5,7 +5,9 @@ mod connections; mod jsonrpc; use std::fs; +use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; +use tokio::runtime; use tracing::info; use warp::Filter; use warp::Reply; @@ -13,8 +15,7 @@ use warp::Reply; use crate::app::Web3ProxyApp; use crate::config::{CliConfig, RpcConfig}; -#[tokio::main] -async fn main() -> anyhow::Result<()> { +fn main() -> anyhow::Result<()> { // install global collector configured based on RUST_LOG env var. tracing_subscriber::fmt::init(); @@ -24,27 +25,44 @@ async fn main() -> anyhow::Result<()> { let rpc_config: String = fs::read_to_string(cli_config.rpc_config_path)?; let rpc_config: RpcConfig = toml::from_str(&rpc_config)?; - // TODO: load the config from yaml instead of hard coding - // TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else - // TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable - let listen_port = cli_config.listen_port; + // TODO: setting title inside of tokio doesnt seem to work. lets do it outside of tokio + proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id)); - let app = rpc_config.try_build().await?; + let chain_id = rpc_config.shared.chain_id; - let app: Arc = Arc::new(app); + let rt = runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .thread_name_fn(move || { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + // TODO: what ordering? + let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst); + // TODO: i think these max at 15 characters + format!("web3-{}-{}", chain_id, worker_id) + }) + .build()?; - let proxy_rpc_filter = warp::any() - .and(warp::post()) - .and(warp::body::json()) - .then(move |json_body| app.clone().proxy_web3_rpc(json_body)); + // spawn the root task + rt.block_on(async { + let listen_port = cli_config.listen_port; - // TODO: filter for displaying connections and their block heights + let app = rpc_config.try_build().await.unwrap(); - // TODO: warp trace is super verbose. how do we make this more readable? - // let routes = proxy_rpc_filter.with(warp::trace::request()); - let routes = proxy_rpc_filter.map(handle_anyhow_errors); + let app: Arc = Arc::new(app); - warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await; + let proxy_rpc_filter = warp::any() + .and(warp::post()) + .and(warp::body::json()) + .then(move |json_body| app.clone().proxy_web3_rpc(json_body)); + + // TODO: filter for displaying connections and their block heights + + // TODO: warp trace is super verbose. how do we make this more readable? + // let routes = proxy_rpc_filter.with(warp::trace::request()); + let routes = proxy_rpc_filter.map(handle_anyhow_errors); + + warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await; + }); Ok(()) }