web3-proxy/web3_proxy/src/bin/web3_proxy.rs

//! Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
//!
//! Signed transactions (eth_sendRawTransaction) are sent in parallel to the configured private RPCs (eden, ethermine, flashbots, etc.).
//!
//! All other requests are sent to an RPC server on the latest block (alchemy, moralis, rivet, your own node, or one of many other providers).
//! If multiple servers are in sync, the fastest server is prioritized. Since the fastest server is most likely to serve requests, slow servers are unlikely to ever get any requests.
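//!
//! A usage sketch (the flag names below are assumptions based on the `CliConfig` fields this
//! binary reads; run the compiled binary with `--help` for the real interface):
//! `RUST_LOG=info,web3_proxy=debug web3_proxy --config ./config/example.toml --port 8544`.
//!
//! Once running, any JSON-RPC client can be pointed at the proxy instead of a single RPC
//! server. A purely illustrative ethers-rs snippet (the URL and port are placeholders):
//!
//! ```ignore
//! use ethers::prelude::{Http, Provider};
//!
//! // the proxy speaks plain JSON-RPC over HTTP, so any provider/client works unchanged
//! let proxy = Provider::<Http>::try_from("http://localhost:8544").unwrap();
//! ```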
//#![warn(missing_docs)]
#![forbid(unsafe_code)]

use parking_lot::deadlock;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
use std::thread;
use tokio::runtime;
use tokio::time::Duration;
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, Web3ProxyApp};
use web3_proxy::config::{AppConfig, CliConfig};
use web3_proxy::frontend;

fn run(
    shutdown_receiver: flume::Receiver<()>,
    cli_config: CliConfig,
    app_config: AppConfig,
) -> anyhow::Result<()> {
    debug!(?cli_config, ?app_config);

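    // NOTE: `parking_lot::deadlock` is only available when parking_lot is built with its
    // "deadlock_detection" feature enabled.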
    // spawn a thread for deadlock detection
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(10));
        let deadlocks = deadlock::check_deadlock();
        if deadlocks.is_empty() {
            continue;
        }
        println!("{} deadlocks detected", deadlocks.len());
        for (i, threads) in deadlocks.iter().enumerate() {
            println!("Deadlock #{}", i);
            for t in threads {
                println!("Thread Id {:#?}", t.thread_id());
                println!("{:#?}", t.backtrace());
            }
        }
    });

    // set up tokio's async runtime
    let mut rt_builder = runtime::Builder::new_multi_thread();

    let chain_id = app_config.shared.chain_id;

    rt_builder.enable_all().thread_name_fn(move || {
        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
        // TODO: what ordering? Relaxed would be enough for a unique-ID counter; SeqCst is stricter than we need, but harmless
        let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst);
        // NOTE: Linux limits thread names to 15 bytes, so longer names get truncated
        format!("web3-{}-{}", chain_id, worker_id)
    });

    if cli_config.workers > 0 {
        rt_builder.worker_threads(cli_config.workers);
    }

    // start tokio's async runtime
    let rt = rt_builder.build()?;

    // we use this worker count to also set our redis connection pool size
    // TODO: think about this more
    let num_workers = rt.metrics().num_workers();
    debug!(?num_workers);

    rt.block_on(async {
        let (app, app_handle) = Web3ProxyApp::spawn(app_config, num_workers).await?;

        let frontend_handle = tokio::spawn(frontend::serve(cli_config.port, app));

        // if everything is working, these should both run forever
        // TODO: try_join these instead? use signal_shutdown here?
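        // One sketch for the signal_shutdown TODO above (not wired up here; it would need
        // tokio's "signal" feature): add another select! branch such as
        //
        //     _ = tokio::signal::ctrl_c() => {
        //         info!("SIGINT received");
        //         return Ok(());
        //     }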
        tokio::select! {
            x = app_handle => {
                match x {
                    Ok(_) => info!("app_handle exited"),
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
            x = flatten_handle(frontend_handle) => {
                match x {
                    Ok(_) => info!("frontend exited"),
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
            _ = shutdown_receiver.recv_async() => {
                // TODO: think more about this. we need some way for tests to tell the app to stop
                info!("received shutdown signal");

                // TODO: wait for outstanding requests to complete. graceful shutdown will make our users happier
                return Ok(());
            }
        };

        Ok(())
    })
}

fn main() -> anyhow::Result<()> {
    // if RUST_LOG isn't set, configure a default
    // TODO: is there a better way to do this?
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info,web3_proxy=debug");
    }

    // install global collector configured based on RUST_LOG env var.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .compact()
        .init();

    // this probably won't matter for us in docker, but better safe than sorry
    fdlimit::raise_fd_limit();

    // initial configuration from flags
    let cli_config: CliConfig = argh::from_env();

    // advanced configuration is on disk
    info!("Loading rpc config @ {}", cli_config.config);
    let app_config: String = fs::read_to_string(cli_config.config.clone())?;
    let app_config: AppConfig = toml::from_str(&app_config)?;
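    // A sketch of what the TOML might look like (the [shared] keys mirror RpcSharedConfig's
    // fields as used in the test below; the per-RPC keys are assumptions guessed from
    // Web3ConnectionConfig::new's arguments, so check config.rs for the real names):
    //
    //     [shared]
    //     chain_id = 1
    //     public_rate_limit_per_minute = 0
    //     response_cache_max_bytes = 10000000
    //
    //     [balanced_rpcs.local]
    //     url = "http://127.0.0.1:8545"
    //     soft_limit = 100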
    // TODO: this doesn't seem to do anything
    proctitle::set_title(format!("web3_proxy-{}", app_config.shared.chain_id));

    let (_shutdown_sender, shutdown_receiver) = flume::bounded(1);

    run(shutdown_receiver, cli_config, app_config)
}

#[cfg(test)]
mod tests {
    use ethers::{
        prelude::{Block, Http, Provider, TxHash, U256},
        utils::Anvil,
    };
    use hashbrown::HashMap;
    use std::env;
    use web3_proxy::config::{RpcSharedConfig, Web3ConnectionConfig};

    use super::*;

    #[tokio::test]
    async fn it_works() {
        // TODO: move basic setup into a test fixture
        let path = env::var("PATH").unwrap();
        println!("path: {}", path);
        // TODO: how should we handle logs in this?
        // TODO: option for super verbose logs
        std::env::set_var("RUST_LOG", "info,web3_proxy=debug");

        // install global collector configured based on RUST_LOG env var.
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .compact()
            .with_test_writer()
            .init();

        let anvil = Anvil::new().spawn();
        println!("Anvil running at `{}`", anvil.endpoint());

        let anvil_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();

        // mine a block because my code doesn't like being on block 0
        // TODO: make block 0 okay?
        let _: U256 = anvil_provider
            .request("evm_mine", None::<()>)
            .await
            .unwrap();

        // make a test CliConfig
        let cli_config = CliConfig {
            // port 0 lets the OS pick a free port
            port: 0,
            workers: 4,
            config: "./does/not/exist/test.toml".to_string(),
        };

        // make a test AppConfig
        let app_config = AppConfig {
            shared: RpcSharedConfig {
                chain_id: 31337,
                db_url: None,
                redis_url: None,
                public_rate_limit_per_minute: 0,
                response_cache_max_bytes: 10_usize.pow(7),
            },
            balanced_rpcs: HashMap::from([
                (
                    "anvil".to_string(),
                    Web3ConnectionConfig::new(anvil.endpoint(), 100, None, 1),
                ),
                (
                    "anvil_ws".to_string(),
                    Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None, 0),
                ),
            ]),
            private_rpcs: None,
        };

        let (shutdown_sender, shutdown_receiver) = flume::bounded(1);

        // spawn another thread for running the app
        // TODO: allow launching into the local tokio runtime instead of creating a new one?
        let handle = thread::spawn(move || run(shutdown_receiver, cli_config, app_config));

        // TODO: do something to the node. query latest block, mine another block, query again
        // NOTE: this provider is built from anvil's endpoint, not the proxy's; the frontend
        // was started on port 0 (a random port) and its address isn't captured here, so the
        // requests below hit anvil directly rather than going through the proxy
        let proxy_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();

        let anvil_result: Block<TxHash> = anvil_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();
        let proxy_result: Block<TxHash> = proxy_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();

        assert_eq!(anvil_result, proxy_result);

        let first_block_num = anvil_result.number.unwrap();

        let _: U256 = anvil_provider
            .request("evm_mine", None::<()>)
            .await
            .unwrap();

        let anvil_result: Block<TxHash> = anvil_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();
        let proxy_result: Block<TxHash> = proxy_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();

        assert_eq!(anvil_result, proxy_result);

        let second_block_num = anvil_result.number.unwrap();

        assert_ne!(first_block_num, second_block_num);

        // tell the test app to shut down
        shutdown_sender.send(()).unwrap();

        println!("waiting for shutdown...");
        // TODO: panic if a timeout is reached
        handle.join().unwrap().unwrap();
    }
}