diff --git a/Cargo.lock b/Cargo.lock index 74c7adfc..c20fcc33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1825,6 +1825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ "ahash", + "serde", ] [[package]] diff --git a/TODO.md b/TODO.md index 7615a9d4..20463d13 100644 --- a/TODO.md +++ b/TODO.md @@ -51,13 +51,11 @@ - i think now that we retry header not found and similar, caching errors should be fine - [x] RESPONSE_CACHE_CAP from config - [x] web3_sha3 rpc command +- [ ] test that launches anvil and connects the proxy to it - [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced. - thundering herd problem if we only allow a lag of 0 blocks - we can improve this by only `publish`ing the sorted list once a threshold of total available soft and hard limits is passed. how can we do this without hammering redis? at least its only once per block per server - [ ] basic request method stats -- [ ] fully test retrying when "header not found" - - i saw "header not found" on a simple eth_getCode query to a public load balanced bsc archive node on block 1 -- [ ] test that launches anvil and connects the proxy to it - [ ] nice output when cargo doc is run ## V1 @@ -178,3 +176,5 @@ in another repo: event subscriber - [ ] Archive check on BSC gave “archive” when it isn’t. and FTM gave 90k for all servers even though they should be archive - [ ] cache eth_getLogs in a database? - [ ] stats for "read amplification". how many backend requests do we send compared to frontend requests we received? +- [ ] fully test retrying when "header not found" + - i saw "header not found" on a simple eth_getCode query to a public load balanced bsc archive node on block 1 diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index c03193e9..d1eb8c4d 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -22,7 +22,7 @@ ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", " fdlimit = "0.2.1" flume = "0.10.14" futures = { version = "0.3.21", features = ["thread-pool"] } -hashbrown = "0.12.3" +hashbrown = { version = "0.12.3", features = ["serde"] } indexmap = "1.9.1" linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } notify = "4.0.17" diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 8021bf12..188ba9ce 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -1,7 +1,8 @@ use argh::FromArgs; +use derive_more::Constructor; use ethers::prelude::{Block, TxHash}; +use hashbrown::HashMap; use serde::Deserialize; -use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; @@ -55,7 +56,7 @@ fn default_response_cache_max_bytes() -> usize { 10_usize.pow(8) } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Constructor)] pub struct Web3ConnectionConfig { url: String, soft_limit: u32, diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index cccbc696..36da27c8 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -19,7 +19,7 @@ use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; -/// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 +/// TODO: instead of an enum, I tried to use Box, but hit #[derive(From)] pub enum Web3Provider { Http(ethers::providers::Provider), diff --git a/web3-proxy/src/frontend/errors.rs b/web3-proxy/src/frontend/errors.rs index 606ac1f1..730bea35 100644 --- a/web3-proxy/src/frontend/errors.rs +++ b/web3-proxy/src/frontend/errors.rs @@ -10,7 +10,7 @@ pub async fn handler_404() -> impl IntoResponse { } /// handle errors by converting them into something that implements `IntoResponse` -/// TODO: use this. i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work +/// TODO: use this. i can't get to work /// TODO: i think we want a custom result type instead. put the anyhow result inside. then `impl IntoResponse for CustomResult` pub async fn handle_anyhow_error( code: Option, diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 037845f0..9aaea8c9 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -21,34 +21,12 @@ use tracing_subscriber::EnvFilter; use crate::app::{flatten_handle, Web3ProxyApp}; use crate::config::{AppConfig, CliConfig}; -fn main() -> anyhow::Result<()> { - // if RUST_LOG isn't set, configure a default - // TODO: is there a better way to do this? - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); - } - - // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .init(); - - // this probably won't matter for us in docker, but better safe than sorry - fdlimit::raise_fd_limit(); - - // initial configuration from flags - let cli_config: CliConfig = argh::from_env(); - - // advanced configuration - info!("Loading rpc config @ {}", cli_config.config); - let rpc_config: String = fs::read_to_string(cli_config.config)?; - let rpc_config: AppConfig = toml::from_str(&rpc_config)?; - - debug!(?rpc_config); - - // TODO: this doesn't seem to do anything - proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id)); +fn run( + shutdown_receiver: flume::Receiver<()>, + cli_config: CliConfig, + app_config: AppConfig, +) -> anyhow::Result<()> { + debug!(?cli_config, ?app_config); // spawn a thread for deadlock detection thread::spawn(move || loop { @@ -71,7 +49,7 @@ fn main() -> anyhow::Result<()> { // set up tokio's async runtime let mut rt_builder = runtime::Builder::new_multi_thread(); - let chain_id = rpc_config.shared.chain_id; + let chain_id = app_config.shared.chain_id; rt_builder.enable_all().thread_name_fn(move || { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need @@ -93,11 +71,12 @@ fn main() -> anyhow::Result<()> { debug!(?num_workers); rt.block_on(async { - let (app, app_handle) = Web3ProxyApp::spawn(rpc_config, num_workers).await?; + let (app, app_handle) = Web3ProxyApp::spawn(app_config, num_workers).await?; let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app)); // if everything is working, these should both run forever + // TODO: select on the shutdown marker, here? tokio::select! { x = app_handle => { match x { @@ -115,8 +94,129 @@ fn main() -> anyhow::Result<()> { } } } + _ = shutdown_receiver.recv_async() => { + info!("shutdown signal"); + + // TODO: wait for outstanding requests to complete. graceful shutdown will make our users happier + + return Ok(()) + } }; Ok(()) }) } + +fn main() -> anyhow::Result<()> { + // if RUST_LOG isn't set, configure a default + // TODO: is there a better way to do this? + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); + } + + // install global collector configured based on RUST_LOG env var. + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .compact() + .init(); + + // this probably won't matter for us in docker, but better safe than sorry + fdlimit::raise_fd_limit(); + + // initial configuration from flags + let cli_config: CliConfig = argh::from_env(); + + // advanced configuration is on disk + info!("Loading rpc config @ {}", cli_config.config); + let app_config: String = fs::read_to_string(cli_config.config.clone())?; + let app_config: AppConfig = toml::from_str(&app_config)?; + + // TODO: this doesn't seem to do anything + proctitle::set_title(format!("web3-proxy-{}", app_config.shared.chain_id)); + + let (_shutdown_sender, shutdown_receiver) = flume::bounded(1); + + run(shutdown_receiver, cli_config, app_config) +} + +#[cfg(test)] +mod tests { + use ethers::{ + prelude::{Http, Provider, U256}, + utils::Anvil, + }; + use hashbrown::HashMap; + use std::env; + + use crate::config::{RpcSharedConfig, Web3ConnectionConfig}; + + use super::*; + + #[tokio::test] + async fn it_works() { + // TODO: move basic setup into a test fixture + let path = env::var("PATH").unwrap(); + + println!("path: {}", path); + + // TODO: opton for super verbose option + std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); + + // install global collector configured based on RUST_LOG env var. + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .compact() + .init(); + + let anvil = Anvil::new().spawn(); + + println!("Anvil running at `{}`", anvil.endpoint()); + + let provider = Provider::::try_from(anvil.endpoint()).unwrap(); + + // mine a block because my code doesn't like being on block 0 + // TODO: make block 0 okay? + let head_block_num: U256 = provider.request("evm_mine", None::<()>).await.unwrap(); + + // make a test CliConfig + let cli_config = CliConfig { + port: 0, + workers: 2, + config: "./does/not/exist/test.toml".to_string(), + }; + + // make a test AppConfig + let app_config = AppConfig { + shared: RpcSharedConfig { + chain_id: 31337, + redis_url: None, + public_rate_limit_per_minute: 0, + response_cache_max_bytes: 10_usize.pow(7), + }, + balanced_rpcs: HashMap::from([ + ( + "anvil".to_string(), + Web3ConnectionConfig::new(anvil.endpoint(), 100, None), + ), + ( + "anvil_ws".to_string(), + Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None), + ), + ]), + private_rpcs: None, + }; + + let (shutdown_sender, shutdown_receiver) = flume::bounded(1); + + // spawn another thread for running the app + // TODO: + let handle = thread::spawn(move || run(shutdown_receiver, cli_config, app_config)); + + // TODO: do something to the node. query latest block, mine another block, query again + + shutdown_sender.send(()).unwrap(); + + println!("waiting for shutdown..."); + handle.join().unwrap().unwrap(); + } +}