//! Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
//!
//! Signed transactions (eth_sendRawTransaction) are sent in parallel to the configured private RPCs (eden, ethermine, flashbots, etc.).
//!
//! All other requests are sent to an RPC server on the latest block (alchemy, moralis, rivet, your own node, or one of many other providers).
//! If multiple servers are in sync, the fastest server is prioritized. Since the fastest server is most likely to serve requests, slow servers are unlikely to ever get any requests.
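//!
//! A hypothetical invocation, for illustration only (flag names are assumed from
//! the CliConfig fields and argh's defaults; check `--help` in your build):
//!
//! ```sh
//! web3_proxy --config ./config/example.toml --port 8544 --workers 4
//! ```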

//#![warn(missing_docs)]
#![forbid(unsafe_code)]

use futures::StreamExt;
use parking_lot::deadlock;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
use std::thread;
use tokio::runtime;
use tokio::sync::broadcast;
use tokio::time::Duration;
use tracing::{debug, info, warn};
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use web3_proxy::config::{CliConfig, TopConfig};
use web3_proxy::{frontend, metrics_frontend};
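
/// Start the app: spawn a deadlock-detection thread, build the tokio runtime,
/// and block on the proxy until one of its handles exits or ctrl-c is received.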
fn run(
    shutdown_sender: broadcast::Sender<()>,
    cli_config: CliConfig,
    top_config: TopConfig,
) -> anyhow::Result<()> {
    debug!(?cli_config, ?top_config);

    // spawn a thread for deadlock detection
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(10));

        let deadlocks = deadlock::check_deadlock();

        if deadlocks.is_empty() {
            continue;
        }

        println!("{} deadlocks detected", deadlocks.len());
        for (i, threads) in deadlocks.iter().enumerate() {
            println!("Deadlock #{}", i);
            for t in threads {
                println!("Thread Id {:#?}", t.thread_id());
                println!("{:#?}", t.backtrace());
            }
        }
    });

    // set up tokio's async runtime
    let mut rt_builder = runtime::Builder::new_multi_thread();

    let chain_id = top_config.app.chain_id;
    rt_builder.enable_all().thread_name_fn(move || {
        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
        // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
        let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst);
        // TODO: i think these max at 15 characters
        format!("web3-{}-{}", chain_id, worker_id)
    });

    if cli_config.workers > 0 {
        rt_builder.worker_threads(cli_config.workers);
    }

    // start tokio's async runtime
    let rt = rt_builder.build()?;

    let num_workers = rt.metrics().num_workers();
    debug!(?num_workers);

    rt.block_on(async {
        let app_frontend_port = cli_config.port;
        let app_prometheus_port = cli_config.prometheus_port;

        let mut spawned_app =
            Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?;

        let frontend_handle =
            tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone()));

        let prometheus_handle = tokio::spawn(metrics_frontend::serve(
            spawned_app.app,
            app_prometheus_port,
        ));

        // if everything is working, these handles should all run forever
        // TODO: join these instead and use shutdown handler properly. probably use tokio's ctrl+c helper
        tokio::select! {
            x = flatten_handles(spawned_app.app_handles) => {
                match x {
                    Ok(_) => info!("app_handle exited"),
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
            x = flatten_handle(frontend_handle) => {
                match x {
                    Ok(_) => info!("frontend exited"),
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
            x = flatten_handle(prometheus_handle) => {
                match x {
                    Ok(_) => info!("prometheus exited"),
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
            x = tokio::signal::ctrl_c() => {
                match x {
                    Ok(_) => info!("quitting from ctrl-c"),
                    Err(e) => {
                        return Err(e.into());
                    }
                }
            }
        };

        // one of the handles stopped. send a value so the others know to shut down
        if let Err(err) = shutdown_sender.send(()) {
            warn!(?err, "shutdown sender");
        };

        // wait on all the important background tasks (like saving stats to the database) to complete
        while let Some(x) = spawned_app.background_handles.next().await {
            match x {
                Err(e) => return Err(e.into()),
                Ok(Err(e)) => return Err(e),
                Ok(Ok(_)) => continue,
            }
        }

        info!("finished");

        Ok(())
    })
}

fn main() -> anyhow::Result<()> {
    // if RUST_LOG isn't set, configure a default
    // TODO: is there a better way to do this?
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var(
            "RUST_LOG",
            "info,ethers=debug,redis_rate_limit=debug,web3_proxy=debug",
        );
    }

    // this probably won't matter for us in docker, but better safe than sorry
    fdlimit::raise_fd_limit();

    // initial configuration from flags
    let cli_config: CliConfig = argh::from_env();

    // advanced configuration is on disk
    let top_config: String = fs::read_to_string(cli_config.config.clone())?;
    let top_config: TopConfig = toml::from_str(&top_config)?;

    // TODO: this doesn't seem to do anything
    proctitle::set_title(format!("web3_proxy-{}", top_config.app.chain_id));

    // connect to sentry for error reporting
    // if no sentry, only log to stdout
    let _sentry_guard = if let Some(sentry_url) = top_config.app.sentry_url.clone() {
        let guard = sentry::init((
            sentry_url,
            sentry::ClientOptions {
                release: sentry::release_name!(),
                // TODO: set this to a lower value (from config) in production
                traces_sample_rate: 1.0,
                ..Default::default()
            },
        ));

        // TODO: how do we put the EnvFilter on this?
        tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .compact()
                    .with_filter(EnvFilter::from_default_env()),
            )
            .with(sentry_tracing::layer())
            .init();

        Some(guard)
    } else {
        // install global collector configured based on RUST_LOG env var.
        // TODO: attach sentry here
        tracing_subscriber::fmt()
            .compact()
            .with_env_filter(EnvFilter::from_default_env())
            .init();

        None
    };

    // we used to log this earlier, but now we wait until sentry and tracing are attached
    debug!("CLI config @ {:#?}", cli_config.config);

    // tokio has code for catching ctrl+c so we use that
    // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something
    let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1);

    run(shutdown_sender, cli_config, top_config)
}

#[cfg(test)]
mod tests {
    use ethers::{
        prelude::{Block, Http, Provider, TxHash, U256},
        utils::Anvil,
    };
    use hashbrown::HashMap;
    use std::env;

    use web3_proxy::config::{AppConfig, Web3ConnectionConfig};

    use super::*;

    #[tokio::test]
    async fn it_works() {
        // TODO: move basic setup into a test fixture
        let path = env::var("PATH").unwrap();

        println!("path: {}", path);

        // TODO: how should we handle logs in this?
        // TODO: option for super verbose logs
        std::env::set_var("RUST_LOG", "info,web3_proxy=debug");

        // install global collector configured based on RUST_LOG env var.
        // TODO: sentry is needed here!
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .compact()
            .with_test_writer()
            .init();

        let anvil = Anvil::new().spawn();

        println!("Anvil running at `{}`", anvil.endpoint());

        let anvil_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();

        // mine a block because my code doesn't like being on block 0
        // TODO: make block 0 okay?
        let _: U256 = anvil_provider
            .request("evm_mine", None::<()>)
            .await
            .unwrap();

        // make a test CliConfig
        let cli_config = CliConfig {
            port: 0,
            prometheus_port: 0,
            workers: 4,
            config: "./does/not/exist/test.toml".to_string(),
            cookie_key_filename: "./does/not/exist/development_cookie_key".to_string(),
        };

        // make a test TopConfig
        let app_config = TopConfig {
            app: AppConfig {
                chain_id: 31337,
                default_user_requests_per_minute: Some(6_000_000),
                min_sum_soft_limit: 1,
                min_synced_rpcs: 1,
                public_requests_per_minute: Some(1_000_000),
                response_cache_max_bytes: 10_usize.pow(7),
                redirect_public_url: Some("example.com/".to_string()),
                redirect_user_url: Some("example.com/{{user_id}}".to_string()),
                ..Default::default()
            },
            balanced_rpcs: HashMap::from([
                (
                    "anvil".to_string(),
                    Web3ConnectionConfig::new(false, anvil.endpoint(), 100, None, 1, Some(false)),
                ),
                (
                    "anvil_ws".to_string(),
                    Web3ConnectionConfig::new(false, anvil.ws_endpoint(), 100, None, 0, Some(true)),
                ),
            ]),
            private_rpcs: None,
        };

        let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1);

        // spawn another thread for running the app
        // TODO: allow launching into the local tokio runtime instead of creating a new one?
        let handle = {
            let shutdown_sender = shutdown_sender.clone();

            thread::spawn(move || run(shutdown_sender, cli_config, app_config))
        };

        // TODO: do something to the node. query latest block, mine another block, query again
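        // note: this provider currently points at anvil's own endpoint rather than
        // the proxy; the frontend was started on port 0 (an ephemeral port) that
        // this test does not look up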
        let proxy_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();

        let anvil_result: Block<TxHash> = anvil_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();
        let proxy_result: Block<TxHash> = proxy_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();

        assert_eq!(anvil_result, proxy_result);

        let first_block_num = anvil_result.number.unwrap();

        let _: U256 = anvil_provider
            .request("evm_mine", None::<()>)
            .await
            .unwrap();

        let anvil_result: Block<TxHash> = anvil_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();
        let proxy_result: Block<TxHash> = proxy_provider
            .request("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap();

        assert_eq!(anvil_result, proxy_result);

        let second_block_num = anvil_result.number.unwrap();

        assert_ne!(first_block_num, second_block_num);

        // tell the test app to shut down
        shutdown_sender.send(()).unwrap();

        println!("waiting for shutdown...");
        // TODO: panic if a timeout is reached
        handle.join().unwrap().unwrap();
    }
}