2022-08-10 05:37:34 +03:00
// TODO: this file is way too big now. move things into other modules
2022-10-19 21:38:00 +03:00
use crate ::app_stats ::{ ProxyResponseStat , StatEmitter , Web3ProxyStat } ;
2022-08-24 03:59:05 +03:00
use crate ::block_number ::block_needed ;
use crate ::config ::{ AppConfig , TopConfig } ;
2022-10-10 07:15:07 +03:00
use crate ::frontend ::authorization ::{ AuthorizedRequest , RequestMetadata } ;
2022-08-24 03:59:05 +03:00
use crate ::jsonrpc ::JsonRpcForwardedResponse ;
use crate ::jsonrpc ::JsonRpcForwardedResponseEnum ;
use crate ::jsonrpc ::JsonRpcRequest ;
use crate ::jsonrpc ::JsonRpcRequestEnum ;
2022-09-17 05:17:20 +03:00
use crate ::rpcs ::blockchain ::{ ArcBlock , BlockId } ;
2022-08-30 23:01:42 +03:00
use crate ::rpcs ::connections ::Web3Connections ;
2022-09-09 06:53:16 +03:00
use crate ::rpcs ::request ::OpenRequestHandleMetrics ;
2022-08-24 03:59:05 +03:00
use crate ::rpcs ::transactions ::TxStatus ;
2022-07-26 07:53:38 +03:00
use anyhow ::Context ;
2022-05-29 22:33:10 +03:00
use axum ::extract ::ws ::Message ;
2022-10-27 00:39:26 +03:00
use axum ::headers ::{ Origin , Referer , UserAgent } ;
2022-09-15 20:57:24 +03:00
use deferred_rate_limiter ::DeferredRateLimiter ;
2022-08-10 05:37:34 +03:00
use derive_more ::From ;
2022-07-22 22:30:39 +03:00
use ethers ::core ::utils ::keccak256 ;
2022-08-24 03:59:05 +03:00
use ethers ::prelude ::{ Address , Block , Bytes , TxHash , H256 , U64 } ;
2022-06-16 23:57:48 +03:00
use futures ::future ::Abortable ;
2022-05-30 04:28:22 +03:00
use futures ::future ::{ join_all , AbortHandle } ;
2022-06-14 08:43:28 +03:00
use futures ::stream ::FuturesUnordered ;
2022-05-30 04:28:22 +03:00
use futures ::stream ::StreamExt ;
2022-09-09 00:01:36 +03:00
use hashbrown ::HashMap ;
2022-09-23 08:22:33 +03:00
use ipnet ::IpNet ;
2022-09-14 10:08:48 +03:00
use metered ::{ metered , ErrorCount , HitCount , ResponseTime , Throughput } ;
2022-08-03 03:27:26 +03:00
use migration ::{ Migrator , MigratorTrait } ;
2022-09-05 08:53:58 +03:00
use moka ::future ::Cache ;
2022-09-15 20:57:24 +03:00
use redis_rate_limiter ::{ DeadpoolRuntime , RedisConfig , RedisPool , RedisRateLimiter } ;
2022-08-06 03:07:12 +03:00
use sea_orm ::DatabaseConnection ;
2022-09-09 06:53:16 +03:00
use serde ::Serialize ;
2022-05-30 21:23:55 +03:00
use serde_json ::json ;
2022-05-12 02:50:52 +03:00
use std ::fmt ;
2022-09-15 20:57:24 +03:00
use std ::net ::IpAddr ;
2022-07-14 02:25:01 +03:00
use std ::str ::FromStr ;
2022-09-09 06:53:16 +03:00
use std ::sync ::atomic ::{ self , AtomicUsize } ;
2022-05-12 02:50:52 +03:00
use std ::sync ::Arc ;
use std ::time ::Duration ;
2022-09-27 05:01:45 +03:00
use tokio ::sync ::{ broadcast , watch , Semaphore } ;
2022-06-14 08:43:28 +03:00
use tokio ::task ::JoinHandle ;
2022-09-20 06:26:12 +03:00
use tokio ::time ::timeout ;
2022-06-16 23:57:48 +03:00
use tokio_stream ::wrappers ::{ BroadcastStream , WatchStream } ;
2022-10-29 01:52:47 +03:00
use tracing ::{ error , info , instrument , trace , warn } ;
2022-09-24 08:53:45 +03:00
use ulid ::Ulid ;
2022-05-12 02:50:52 +03:00
2022-07-09 05:23:26 +03:00
// TODO: make this customizable?
2022-05-12 02:50:52 +03:00
static APP_USER_AGENT : & str = concat! (
" satoshiandkin/ " ,
env! ( " CARGO_PKG_NAME " ) ,
" / " ,
env! ( " CARGO_PKG_VERSION " ) ,
) ;
2022-08-24 03:59:05 +03:00
/// block hash, method, params
// TODO: better name
2022-09-07 07:11:47 +03:00
type ResponseCacheKey = ( H256 , String , Option < String > ) ;
2022-09-20 04:33:39 +03:00
type ResponseCache =
Cache < ResponseCacheKey , JsonRpcForwardedResponse , hashbrown ::hash_map ::DefaultHashBuilder > ;
2022-05-21 01:16:15 +03:00
2022-06-14 08:43:28 +03:00
pub type AnyhowJoinHandle < T > = JoinHandle < anyhow ::Result < T > > ;
2022-09-24 05:47:44 +03:00
#[ derive(Clone, Debug, Default, From) ]
2022-09-22 23:27:14 +03:00
pub struct UserKeyData {
2022-10-21 23:59:05 +03:00
/// database id of the primary user
pub user_id : u64 ,
2022-10-27 03:12:42 +03:00
/// database id of the rpc key
pub rpc_key_id : u64 ,
2022-11-01 21:54:39 +03:00
/// if None, allow unlimited queries. inherited from the user_tier
2022-09-28 06:35:55 +03:00
pub max_requests_per_period : Option < u64 > ,
2022-11-01 21:54:39 +03:00
// if None, allow unlimited concurrent requests. inherited from the user_tier
2022-11-01 22:12:57 +03:00
pub max_concurrent_requests : Option < u32 > ,
2022-09-23 08:22:33 +03:00
/// if None, allow any Origin
2022-10-27 00:39:26 +03:00
pub allowed_origins : Option < Vec < Origin > > ,
2022-09-22 23:27:14 +03:00
/// if None, allow any Referer
2022-09-23 08:22:33 +03:00
pub allowed_referers : Option < Vec < Referer > > ,
2022-09-22 23:27:14 +03:00
/// if None, allow any UserAgent
2022-09-23 08:22:33 +03:00
pub allowed_user_agents : Option < Vec < UserAgent > > ,
/// if None, allow any IP Address
pub allowed_ips : Option < Vec < IpNet > > ,
2022-09-24 05:47:44 +03:00
/// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
2022-11-01 21:54:39 +03:00
/// TODO: f32 would be fine
pub log_revert_chance : f64 ,
2022-08-10 08:56:09 +03:00
}
2022-08-24 03:59:05 +03:00
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs : Arc < Web3Connections > ,
/// Send private requests (like eth_sendRawTransaction) to all these servers
2022-09-14 07:27:18 +03:00
pub private_rpcs : Option < Arc < Web3Connections > > ,
2022-09-05 04:52:59 +03:00
response_cache : ResponseCache ,
2022-08-24 03:59:05 +03:00
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
2022-08-30 23:01:42 +03:00
head_block_receiver : watch ::Receiver < ArcBlock > ,
2022-08-24 03:59:05 +03:00
pending_tx_sender : broadcast ::Sender < TxStatus > ,
pub config : AppConfig ,
pub db_conn : Option < sea_orm ::DatabaseConnection > ,
2022-09-09 00:01:36 +03:00
/// prometheus metrics
2022-09-09 06:53:16 +03:00
app_metrics : Arc < Web3ProxyAppMetrics > ,
open_request_handle_metrics : Arc < OpenRequestHandleMetrics > ,
2022-09-05 08:53:58 +03:00
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
2022-09-20 04:33:39 +03:00
pub pending_transactions : Cache < TxHash , TxStatus , hashbrown ::hash_map ::DefaultHashBuilder > ,
2022-09-15 20:57:24 +03:00
pub frontend_ip_rate_limiter : Option < DeferredRateLimiter < IpAddr > > ,
2022-09-24 08:53:45 +03:00
pub frontend_key_rate_limiter : Option < DeferredRateLimiter < Ulid > > ,
2022-09-24 06:59:21 +03:00
pub login_rate_limiter : Option < RedisRateLimiter > ,
2022-10-07 05:15:53 +03:00
pub vredis_pool : Option < RedisPool > ,
2022-11-01 21:54:39 +03:00
// TODO: this key should be our RpcSecretKey class, not Ulid
pub rpc_secret_key_cache : Cache < Ulid , UserKeyData , hashbrown ::hash_map ::DefaultHashBuilder > ,
2022-10-27 03:12:42 +03:00
pub rpc_key_semaphores : Cache < u64 , Arc < Semaphore > , hashbrown ::hash_map ::DefaultHashBuilder > ,
2022-09-27 05:01:45 +03:00
pub ip_semaphores : Cache < IpAddr , Arc < Semaphore > , hashbrown ::hash_map ::DefaultHashBuilder > ,
2022-10-25 21:26:58 +03:00
pub bearer_token_semaphores :
Cache < String , Arc < Semaphore > , hashbrown ::hash_map ::DefaultHashBuilder > ,
2022-10-03 23:02:05 +03:00
pub stat_sender : Option < flume ::Sender < Web3ProxyStat > > ,
2022-08-24 03:59:05 +03:00
}
2022-07-16 03:08:22 +03:00
/// flatten a JoinError into an anyhow error
2022-08-10 05:37:34 +03:00
/// Useful when joining multiple futures.
2022-10-29 01:52:47 +03:00
#[ instrument(skip_all) ]
2022-06-14 08:43:28 +03:00
pub async fn flatten_handle < T > ( handle : AnyhowJoinHandle < T > ) -> anyhow ::Result < T > {
match handle . await {
Ok ( Ok ( result ) ) = > Ok ( result ) ,
Ok ( Err ( err ) ) = > Err ( err ) ,
Err ( err ) = > Err ( err . into ( ) ) ,
}
}
2022-07-16 03:08:22 +03:00
/// return the first error or okay if everything worked
2022-10-29 01:52:47 +03:00
#[ instrument(skip_all) ]
2022-06-16 23:57:48 +03:00
pub async fn flatten_handles < T > (
mut handles : FuturesUnordered < AnyhowJoinHandle < T > > ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < ( ) > {
while let Some ( x ) = handles . next ( ) . await {
match x {
Err ( e ) = > return Err ( e . into ( ) ) ,
Ok ( Err ( e ) ) = > return Err ( e ) ,
2022-07-16 03:08:22 +03:00
Ok ( Ok ( _ ) ) = > continue ,
2022-06-16 20:51:49 +03:00
}
}
Ok ( ( ) )
}
2022-08-10 05:37:34 +03:00
/// Connect to the database and run migrations
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-08-06 03:07:12 +03:00
pub async fn get_migrated_db (
db_url : String ,
2022-09-02 23:16:20 +03:00
min_connections : u32 ,
2022-08-10 07:30:54 +03:00
max_connections : u32 ,
2022-08-06 03:07:12 +03:00
) -> anyhow ::Result < DatabaseConnection > {
2022-09-14 04:43:09 +03:00
// TODO: scrub credentials and then include the db_url in logs
info! ( " Connecting to db " ) ;
2022-08-06 03:07:12 +03:00
let mut db_opt = sea_orm ::ConnectOptions ::new ( db_url ) ;
2022-08-10 05:37:34 +03:00
// TODO: load all these options from the config file. i think mysql default max is 100
2022-08-06 03:07:12 +03:00
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
2022-09-20 01:41:53 +03:00
. connect_timeout ( Duration ::from_secs ( 30 ) )
2022-09-02 23:16:20 +03:00
. min_connections ( min_connections )
2022-08-10 07:30:54 +03:00
. max_connections ( max_connections )
2022-08-06 03:07:12 +03:00
. sqlx_logging ( false ) ;
// .sqlx_logging_level(log::LevelFilter::Info);
2022-10-20 09:17:20 +03:00
let db_conn = sea_orm ::Database ::connect ( db_opt ) . await ? ;
2022-08-06 03:07:12 +03:00
// TODO: if error, roll back?
2022-10-20 09:17:20 +03:00
Migrator ::up ( & db_conn , None ) . await ? ;
2022-08-06 03:07:12 +03:00
2022-10-20 09:17:20 +03:00
Ok ( db_conn )
2022-08-06 03:07:12 +03:00
}
2022-10-31 23:05:58 +03:00
/// Everything produced by `Web3ProxyApp::spawn`: the app itself plus the task
/// handles the caller must drive/join.
#[derive(From)]
pub struct Web3ProxyAppSpawn {
    /// the app. probably clone this to use in other groups of handles
    pub app: Arc<Web3ProxyApp>,
    /// cancellable handles
    pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
    /// these are important and must be allowed to finish
    pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
}
2022-09-09 06:53:16 +03:00
#[ metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub) ]
2022-05-12 02:50:52 +03:00
impl Web3ProxyApp {
2022-10-03 21:08:01 +03:00
/// The main entrypoint.
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-08-12 22:07:14 +03:00
top_config : TopConfig ,
2022-09-14 09:18:13 +03:00
num_workers : usize ,
2022-10-21 01:51:56 +03:00
shutdown_receiver : broadcast ::Receiver < ( ) > ,
2022-10-31 23:05:58 +03:00
) -> anyhow ::Result < Web3ProxyAppSpawn > {
2022-08-12 22:07:14 +03:00
// safety checks on the config
2022-10-18 00:47:58 +03:00
if let Some ( redirect ) = & top_config . app . redirect_user_url {
assert! (
2022-11-08 01:10:19 +03:00
redirect . contains ( " {{rpc_key_id}} " ) ,
" redirect_user_url user url must contain \" {{rpc_key_id}} \" "
2022-10-18 00:47:58 +03:00
) ;
}
2022-08-12 22:07:14 +03:00
2022-10-03 21:08:01 +03:00
// setup metrics
2022-09-09 06:53:16 +03:00
let app_metrics = Default ::default ( ) ;
let open_request_handle_metrics : Arc < OpenRequestHandleMetrics > = Default ::default ( ) ;
2022-10-03 21:08:01 +03:00
// connect to mysql and make sure the latest migrations have run
2022-09-20 06:26:12 +03:00
let db_conn = if let Some ( db_url ) = top_config . app . db_url . clone ( ) {
2022-09-14 09:18:13 +03:00
let db_min_connections = top_config
. app
. db_min_connections
. unwrap_or ( num_workers as u32 ) ;
2022-07-26 07:53:38 +03:00
2022-09-02 23:16:20 +03:00
// TODO: what default multiple?
2022-10-03 21:08:01 +03:00
let db_max_connections = top_config
2022-09-02 23:16:20 +03:00
. app
. db_max_connections
2022-09-06 23:50:37 +03:00
. unwrap_or ( db_min_connections * 2 ) ;
2022-09-02 23:16:20 +03:00
2022-10-20 09:17:20 +03:00
let db_conn = get_migrated_db ( db_url , db_min_connections , db_max_connections ) . await ? ;
2022-08-03 03:27:26 +03:00
2022-10-20 09:17:20 +03:00
Some ( db_conn )
2022-07-26 07:53:38 +03:00
} else {
info! ( " no database " ) ;
None
} ;
2022-08-12 22:07:14 +03:00
let balanced_rpcs = top_config . balanced_rpcs ;
2022-10-03 21:08:01 +03:00
let private_rpcs = top_config . private_rpcs . unwrap_or_default ( ) ;
2022-07-09 02:02:32 +03:00
2022-10-21 01:51:56 +03:00
// these are safe to cancel
let cancellable_handles = FuturesUnordered ::new ( ) ;
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered ::new ( ) ;
2022-06-14 08:43:28 +03:00
2022-05-12 02:50:52 +03:00
// make a http shared client
2022-07-09 02:02:32 +03:00
// TODO: can we configure the connection pool? should we?
2022-10-03 21:08:01 +03:00
// TODO: timeouts from config. defaults are hopefully good
2022-05-22 02:34:05 +03:00
let http_client = Some (
reqwest ::ClientBuilder ::new ( )
. connect_timeout ( Duration ::from_secs ( 5 ) )
. timeout ( Duration ::from_secs ( 60 ) )
. user_agent ( APP_USER_AGENT )
. build ( ) ? ,
) ;
2022-10-03 21:08:01 +03:00
// create a connection pool for redis
// a failure to connect does NOT block the application from starting
2022-10-07 05:15:53 +03:00
let vredis_pool = match top_config . app . volatile_redis_url . as_ref ( ) {
2022-07-26 07:53:38 +03:00
Some ( redis_url ) = > {
2022-09-14 04:43:09 +03:00
// TODO: scrub credentials and then include the redis_url in logs
2022-10-07 05:15:53 +03:00
info! ( " Connecting to vredis " ) ;
2022-05-22 21:39:06 +03:00
2022-09-14 09:18:13 +03:00
// TODO: what is a good default?
2022-09-02 23:16:20 +03:00
let redis_max_connections = top_config
. app
2022-10-07 05:15:53 +03:00
. volatile_redis_max_connections
2022-09-14 09:18:13 +03:00
. unwrap_or ( num_workers * 2 ) ;
2022-09-20 06:26:12 +03:00
// TODO: what are reasonable timeouts?
2022-09-14 09:18:13 +03:00
let redis_pool = RedisConfig ::from_url ( redis_url )
. builder ( ) ?
. max_size ( redis_max_connections )
2022-09-15 20:57:24 +03:00
. runtime ( DeadpoolRuntime ::Tokio1 )
2022-09-14 09:18:13 +03:00
. build ( ) ? ;
2022-09-20 06:26:12 +03:00
// test the redis pool
2022-09-17 04:19:11 +03:00
if let Err ( err ) = redis_pool . get ( ) . await {
2022-09-17 05:17:20 +03:00
error! (
? err ,
2022-10-07 05:15:53 +03:00
" failed to connect to vredis. some features will be disabled "
2022-09-17 05:17:20 +03:00
) ;
2022-09-17 04:19:11 +03:00
} ;
2022-09-14 09:18:13 +03:00
Some ( redis_pool )
2022-05-22 21:39:06 +03:00
}
None = > {
2022-10-07 05:15:53 +03:00
warn! ( " no redis connection. some features will be disabled " ) ;
2022-05-22 21:39:06 +03:00
None
2022-05-22 02:34:05 +03:00
}
} ;
2022-05-12 02:50:52 +03:00
2022-10-07 05:15:53 +03:00
// setup a channel for receiving stats (generally with a high cardinality, such as per-user)
2022-10-03 21:08:01 +03:00
// we do this in a channel so we don't slow down our response to the users
2022-10-10 07:15:07 +03:00
let stat_sender = if let Some ( db_conn ) = db_conn . clone ( ) {
2022-11-03 02:14:16 +03:00
let emitter_spawn =
StatEmitter ::spawn ( top_config . app . chain_id , db_conn , 60 , shutdown_receiver ) ? ;
2022-10-10 07:15:07 +03:00
2022-11-03 02:14:16 +03:00
important_background_handles . push ( emitter_spawn . background_handle ) ;
2022-10-03 21:08:01 +03:00
2022-11-03 02:14:16 +03:00
Some ( emitter_spawn . stat_sender )
2022-10-03 21:08:01 +03:00
} else {
2022-11-03 02:14:16 +03:00
warn! ( " cannot store stats without a database connection " ) ;
2022-10-03 21:08:01 +03:00
2022-11-04 22:52:46 +03:00
// TODO: subscribe to the shutdown_receiver here since the stat emitter isn't running?
2022-10-03 21:08:01 +03:00
None
} ;
2022-09-06 15:29:37 +03:00
// TODO: i don't like doing Block::default here! Change this to "None"?
2022-07-22 08:11:26 +03:00
let ( head_block_sender , head_block_receiver ) = watch ::channel ( Arc ::new ( Block ::default ( ) ) ) ;
2022-07-09 03:00:31 +03:00
// TODO: will one receiver lagging be okay? how big should this be?
2022-07-26 07:53:38 +03:00
let ( pending_tx_sender , pending_tx_receiver ) = broadcast ::channel ( 256 ) ;
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
2022-10-03 21:08:01 +03:00
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them?
2022-09-20 01:24:56 +03:00
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
2022-07-26 07:53:38 +03:00
drop ( pending_tx_receiver ) ;
2022-09-20 04:33:39 +03:00
2022-09-17 05:30:06 +03:00
// TODO: capacity from configs
// all these are the same size, so no need for a weigher
2022-10-11 22:58:25 +03:00
// TODO: ttl on this? or is max_capacity fine?
2022-09-17 05:17:20 +03:00
let pending_transactions = Cache ::builder ( )
. max_capacity ( 10_000 )
2022-09-20 04:33:39 +03:00
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::new ( ) ) ;
2022-06-16 20:51:49 +03:00
2022-09-20 01:24:56 +03:00
// keep 1GB of blocks in the cache
2022-09-17 05:30:06 +03:00
// TODO: limits from config
2022-09-20 01:24:56 +03:00
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
2022-11-06 23:52:11 +03:00
// TODO: how can we do the weigher better?
2022-09-17 05:17:20 +03:00
let block_map = Cache ::builder ( )
2022-09-20 01:24:56 +03:00
. max_capacity ( 1024 * 1024 * 1024 )
2022-11-06 23:52:11 +03:00
. weigher ( | _k , v : & ArcBlock | {
2022-10-11 22:58:25 +03:00
// TODO: is this good enough?
2022-11-06 23:52:11 +03:00
1 + v . transactions . len ( ) . try_into ( ) . unwrap_or ( u32 ::MAX )
2022-10-11 22:58:25 +03:00
} )
2022-09-20 04:33:39 +03:00
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::new ( ) ) ;
2022-08-26 20:26:17 +03:00
2022-10-03 21:08:01 +03:00
// connect to the load balanced rpcs
2022-06-14 08:43:28 +03:00
let ( balanced_rpcs , balanced_handle ) = Web3Connections ::spawn (
2022-08-12 22:07:14 +03:00
top_config . app . chain_id ,
2022-05-22 02:34:05 +03:00
balanced_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
2022-10-07 05:15:53 +03:00
vredis_pool . clone ( ) ,
2022-08-26 20:26:17 +03:00
block_map . clone ( ) ,
2022-06-14 07:04:14 +03:00
Some ( head_block_sender ) ,
2022-08-27 06:11:58 +03:00
top_config . app . min_sum_soft_limit ,
2022-08-27 03:33:45 +03:00
top_config . app . min_synced_rpcs ,
2022-06-16 05:53:37 +03:00
Some ( pending_tx_sender . clone ( ) ) ,
2022-06-16 20:51:49 +03:00
pending_transactions . clone ( ) ,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
2022-10-03 21:08:01 +03:00
. context ( " spawning balanced rpcs " ) ? ;
2022-05-18 19:35:06 +03:00
2022-10-03 21:08:01 +03:00
// save the handle to catch any errors
2022-10-21 01:51:56 +03:00
cancellable_handles . push ( balanced_handle ) ;
2022-06-14 08:43:28 +03:00
2022-10-03 21:08:01 +03:00
// connect to the private rpcs
// only some chains have this, so this is optional
2022-05-12 02:50:52 +03:00
let private_rpcs = if private_rpcs . is_empty ( ) {
2022-09-06 23:12:45 +03:00
// TODO: do None instead of clone?
2022-05-12 02:50:52 +03:00
warn! ( " No private relays configured. Any transactions will be broadcast to the public mempool! " ) ;
2022-09-14 07:27:18 +03:00
None
2022-05-12 02:50:52 +03:00
} else {
2022-06-14 08:43:28 +03:00
let ( private_rpcs , private_handle ) = Web3Connections ::spawn (
2022-08-12 22:07:14 +03:00
top_config . app . chain_id ,
2022-05-22 02:34:05 +03:00
private_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
2022-10-07 05:15:53 +03:00
vredis_pool . clone ( ) ,
2022-08-26 20:26:17 +03:00
block_map ,
2022-08-11 00:29:50 +03:00
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
None ,
2022-10-19 02:27:33 +03:00
0 ,
0 ,
2022-08-11 00:29:50 +03:00
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
2022-06-14 07:04:14 +03:00
None ,
2022-06-16 20:51:49 +03:00
pending_transactions . clone ( ) ,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
2022-10-03 21:08:01 +03:00
. context ( " spawning private_rpcs " ) ? ;
2022-06-14 08:43:28 +03:00
2022-10-20 11:14:38 +03:00
if private_rpcs . conns . is_empty ( ) {
2022-10-20 10:03:58 +03:00
None
} else {
// save the handle to catch any errors
2022-10-21 01:51:56 +03:00
cancellable_handles . push ( private_handle ) ;
2022-06-14 08:43:28 +03:00
2022-10-20 10:03:58 +03:00
Some ( private_rpcs )
}
2022-05-12 02:50:52 +03:00
} ;
2022-10-03 21:08:01 +03:00
// create rate limiters
// these are optional. they require redis
2022-09-15 20:57:24 +03:00
let mut frontend_ip_rate_limiter = None ;
let mut frontend_key_rate_limiter = None ;
2022-09-24 06:59:21 +03:00
let mut login_rate_limiter = None ;
2022-10-07 05:15:53 +03:00
if let Some ( redis_pool ) = vredis_pool . as_ref ( ) {
2022-09-24 06:59:21 +03:00
let rpc_rrl = RedisRateLimiter ::new (
2022-08-06 08:46:33 +03:00
" web3_proxy " ,
2022-08-06 08:26:43 +03:00
" frontend " ,
2022-10-19 02:27:33 +03:00
// TODO: think about this unwrapping
top_config
. app
2022-11-01 22:12:57 +03:00
. public_requests_per_period
2022-10-19 02:27:33 +03:00
. unwrap_or ( u64 ::MAX ) ,
2022-08-30 23:01:42 +03:00
60.0 ,
2022-09-15 20:57:24 +03:00
redis_pool . clone ( ) ,
) ;
2022-09-24 06:59:21 +03:00
// these two rate limiters can share the base limiter
2022-10-25 21:26:58 +03:00
// these are deferred rate limiters because we don't want redis network requests on the hot path
2022-09-15 20:57:24 +03:00
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some ( DeferredRateLimiter ::< IpAddr > ::new (
10_000 ,
" ip " ,
2022-09-24 06:59:21 +03:00
rpc_rrl . clone ( ) ,
None ,
) ) ;
2022-09-24 08:53:45 +03:00
frontend_key_rate_limiter = Some ( DeferredRateLimiter ::< Ulid > ::new (
2022-09-24 06:59:21 +03:00
10_000 , " key " , rpc_rrl , None ,
) ) ;
login_rate_limiter = Some ( RedisRateLimiter ::new (
" web3_proxy " ,
" login " ,
2022-11-01 22:12:57 +03:00
top_config . app . login_rate_limit_per_period ,
2022-09-24 06:59:21 +03:00
60.0 ,
redis_pool . clone ( ) ,
2022-09-15 20:57:24 +03:00
) ) ;
}
2022-07-07 06:22:09 +03:00
2022-09-20 01:24:56 +03:00
// keep 1GB of blocks in the cache
2022-09-17 05:30:06 +03:00
// responses can be very different in sizes, so this definitely needs a weigher
// TODO: max_capacity from config
2022-09-20 01:24:56 +03:00
// TODO: don't allow any response to be bigger than X% of the cache
2022-09-17 05:17:20 +03:00
let response_cache = Cache ::builder ( )
2022-09-20 01:24:56 +03:00
. max_capacity ( 1024 * 1024 * 1024 )
2022-10-11 22:58:25 +03:00
. weigher ( | k : & ( H256 , String , Option < String > ) , v | {
// TODO: make this weigher past. serializing json is not fast
let mut size = ( k . 1 ) . len ( ) ;
if let Some ( params ) = & k . 2 {
size + = params . len ( )
}
if let Ok ( v ) = serde_json ::to_string ( v ) {
size + = v . len ( ) ;
// the or in unwrap_or is probably never called
size . try_into ( ) . unwrap_or ( u32 ::MAX )
} else {
// this seems impossible
u32 ::MAX
}
} )
2022-09-20 04:33:39 +03:00
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::new ( ) ) ;
2022-09-15 20:57:24 +03:00
2022-09-17 05:30:06 +03:00
// all the users are the same size, so no need for a weigher
2022-10-03 21:08:01 +03:00
// if there is no database of users, there will be no keys and so this will be empty
2022-09-17 05:30:06 +03:00
// TODO: max_capacity from config
2022-09-22 02:50:55 +03:00
// TODO: ttl from config
2022-11-01 21:54:39 +03:00
let rpc_secret_key_cache = Cache ::builder ( )
2022-09-17 05:17:20 +03:00
. max_capacity ( 10_000 )
2022-11-01 21:54:39 +03:00
. time_to_live ( Duration ::from_secs ( 600 ) )
2022-09-20 04:33:39 +03:00
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::new ( ) ) ;
2022-09-05 08:53:58 +03:00
2022-10-03 21:08:01 +03:00
// create semaphores for concurrent connection limits
2022-09-27 05:01:45 +03:00
// TODO: what should tti be for semaphores?
2022-10-25 21:26:58 +03:00
let bearer_token_semaphores = Cache ::builder ( )
2022-09-27 05:01:45 +03:00
. time_to_idle ( Duration ::from_secs ( 120 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::new ( ) ) ;
let ip_semaphores = Cache ::builder ( )
. time_to_idle ( Duration ::from_secs ( 120 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::new ( ) ) ;
2022-10-27 03:12:42 +03:00
let rpc_key_semaphores = Cache ::builder ( )
2022-10-25 21:26:58 +03:00
. time_to_idle ( Duration ::from_secs ( 120 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::new ( ) ) ;
2022-09-27 05:01:45 +03:00
2022-07-07 06:22:09 +03:00
let app = Self {
2022-08-12 22:07:14 +03:00
config : top_config . app ,
2022-05-13 23:50:11 +03:00
balanced_rpcs ,
2022-05-12 02:50:52 +03:00
private_rpcs ,
2022-09-05 08:53:58 +03:00
response_cache ,
2022-05-30 04:28:22 +03:00
head_block_receiver ,
2022-06-16 05:53:37 +03:00
pending_tx_sender ,
2022-06-16 20:51:49 +03:00
pending_transactions ,
2022-09-15 20:57:24 +03:00
frontend_ip_rate_limiter ,
frontend_key_rate_limiter ,
2022-09-24 06:59:21 +03:00
login_rate_limiter ,
2022-07-26 07:53:38 +03:00
db_conn ,
2022-10-07 05:15:53 +03:00
vredis_pool ,
2022-09-09 06:53:16 +03:00
app_metrics ,
open_request_handle_metrics ,
2022-11-01 21:54:39 +03:00
rpc_secret_key_cache ,
2022-10-25 21:26:58 +03:00
bearer_token_semaphores ,
2022-09-27 05:01:45 +03:00
ip_semaphores ,
2022-10-27 03:12:42 +03:00
rpc_key_semaphores ,
2022-10-03 23:02:05 +03:00
stat_sender ,
2022-06-14 07:04:14 +03:00
} ;
let app = Arc ::new ( app ) ;
2022-10-31 23:05:58 +03:00
Ok ( ( app , cancellable_handles , important_background_handles ) . into ( ) )
2022-05-12 02:50:52 +03:00
}
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-09-10 05:59:07 +03:00
pub fn prometheus_metrics ( & self ) -> String {
2022-09-09 00:01:36 +03:00
let globals = HashMap ::new ( ) ;
// TODO: what globals? should this be the hostname or what?
// globals.insert("service", "web3_proxy");
2022-09-09 06:53:16 +03:00
#[ derive(Serialize) ]
struct CombinedMetrics < ' a > {
app : & ' a Web3ProxyAppMetrics ,
backend_rpc : & ' a OpenRequestHandleMetrics ,
}
let metrics = CombinedMetrics {
app : & self . app_metrics ,
backend_rpc : & self . open_request_handle_metrics ,
} ;
2022-09-10 05:59:07 +03:00
serde_prometheus ::to_string ( & metrics , Some ( " web3_proxy " ) , globals )
. expect ( " prometheus metrics should always serialize " )
2022-09-09 00:01:36 +03:00
}
2022-09-14 10:08:48 +03:00
#[ measure( [ ErrorCount, HitCount, ResponseTime, Throughput ] ) ]
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-09-09 00:01:36 +03:00
pub async fn eth_subscribe < ' a > (
self : & ' a Arc < Self > ,
2022-09-24 08:53:45 +03:00
authorized_request : Arc < AuthorizedRequest > ,
2022-05-29 22:33:10 +03:00
payload : JsonRpcRequest ,
2022-09-09 00:01:36 +03:00
subscription_count : & ' a AtomicUsize ,
2022-05-30 04:28:22 +03:00
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
2022-07-09 01:14:45 +03:00
response_sender : flume ::Sender < Message > ,
2022-05-30 04:28:22 +03:00
) -> anyhow ::Result < ( AbortHandle , JsonRpcForwardedResponse ) > {
2022-06-14 10:13:42 +03:00
let ( subscription_abort_handle , subscription_registration ) = AbortHandle ::new_pair ( ) ;
2022-05-30 04:28:22 +03:00
2022-06-14 07:04:14 +03:00
// TODO: this only needs to be unique per connection. we don't need it globably unique
2022-07-09 01:14:45 +03:00
let subscription_id = subscription_count . fetch_add ( 1 , atomic ::Ordering ::SeqCst ) ;
2022-08-04 02:17:02 +03:00
let subscription_id = U64 ::from ( subscription_id ) ;
2022-05-30 21:23:55 +03:00
2022-06-05 23:39:58 +03:00
// save the id so we can use it in the response
let id = payload . id . clone ( ) ;
2022-07-16 03:35:54 +03:00
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
2022-07-08 22:01:11 +03:00
match payload . params {
Some ( x ) if x = = json! ( [ " newHeads " ] ) = > {
2022-06-16 23:57:48 +03:00
let head_block_receiver = self . head_block_receiver . clone ( ) ;
trace! ( ? subscription_id , " new heads subscription " ) ;
tokio ::spawn ( async move {
let mut head_block_receiver = Abortable ::new (
WatchStream ::new ( head_block_receiver ) ,
subscription_registration ,
) ;
while let Some ( new_head ) = head_block_receiver . next ( ) . await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " :" eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
2022-08-11 00:29:50 +03:00
// TODO: option to include full transaction objects instead of just the hashes?
2022-07-22 08:11:26 +03:00
" result " : new_head . as_ref ( ) ,
2022-06-16 23:57:48 +03:00
} ,
} ) ;
2022-08-11 00:29:50 +03:00
// TODO: do clients support binary messages?
let msg = Message ::Text (
serde_json ::to_string ( & msg ) . expect ( " this should always be valid json " ) ,
) ;
2022-06-16 23:57:48 +03:00
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
2022-05-30 04:28:22 +03:00
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
2022-08-24 03:59:05 +03:00
TxStatus ::Pending ( tx ) = > tx ,
TxStatus ::Confirmed ( .. ) = > continue ,
TxStatus ::Orphaned ( tx ) = > tx ,
2022-06-16 23:57:48 +03:00
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
" result " : new_tx . hash ,
} ,
} ) ;
2022-09-20 06:26:12 +03:00
let msg =
Message ::Text ( serde_json ::to_string ( & msg ) . expect ( " we made this `msg` " ) ) ;
2022-06-16 23:57:48 +03:00
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingFullTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
2022-08-24 03:59:05 +03:00
TxStatus ::Pending ( tx ) = > tx ,
TxStatus ::Confirmed ( .. ) = > continue ,
TxStatus ::Orphaned ( tx ) = > tx ,
2022-06-16 23:57:48 +03:00
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
// upstream just sends the txid, but we want to send the whole transaction
" result " : new_tx ,
} ,
} ) ;
2022-09-20 06:26:12 +03:00
let msg = Message ::Text (
serde_json ::to_string ( & msg ) . expect ( " we made this message " ) ,
) ;
2022-06-16 23:57:48 +03:00
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-18 10:06:54 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingRawTransactions " ] ) = > {
2022-06-18 10:06:54 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
2022-08-24 03:59:05 +03:00
TxStatus ::Pending ( tx ) = > tx ,
TxStatus ::Confirmed ( .. ) = > continue ,
TxStatus ::Orphaned ( tx ) = > tx ,
2022-06-18 10:06:54 +03:00
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
2022-07-08 22:01:11 +03:00
// upstream just sends the txid, but we want to send the raw transaction
2022-06-18 10:06:54 +03:00
" result " : new_tx . rlp ( ) ,
} ,
} ) ;
2022-09-30 07:18:18 +03:00
let msg = Message ::Text (
serde_json ::to_string ( & msg ) . expect ( " this message was just built " ) ,
) ;
2022-06-18 10:06:54 +03:00
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
_ = > return Err ( anyhow ::anyhow! ( " unimplemented " ) ) ,
}
2022-05-30 04:28:22 +03:00
2022-06-14 10:13:42 +03:00
// TODO: do something with subscription_join_handle?
2022-05-30 04:28:22 +03:00
2022-07-22 22:30:39 +03:00
let response = JsonRpcForwardedResponse ::from_value ( json! ( subscription_id ) , id ) ;
2022-05-30 04:28:22 +03:00
2022-06-16 05:53:37 +03:00
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
2022-06-14 10:13:42 +03:00
Ok ( ( subscription_abort_handle , response ) )
2022-05-29 22:33:10 +03:00
}
2022-07-22 22:30:39 +03:00
/// send the request or batch of requests to the approriate RPCs
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-05-12 02:50:52 +03:00
pub async fn proxy_web3_rpc (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-10-10 07:15:07 +03:00
authorized_request : Arc < AuthorizedRequest > ,
2022-05-12 02:50:52 +03:00
request : JsonRpcRequestEnum ,
2022-05-20 08:27:18 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponseEnum > {
2022-09-05 04:52:59 +03:00
// TODO: this should probably be trace level
2022-09-06 23:12:45 +03:00
trace! ( ? request , " proxy_web3_rpc " ) ;
2022-05-12 02:50:52 +03:00
2022-05-22 07:22:30 +03:00
// even though we have timeouts on the requests to our backend providers,
2022-07-22 22:30:39 +03:00
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. per user max? expiration time instead of duration?
let max_time = Duration ::from_secs ( 120 ) ;
2022-05-12 02:50:52 +03:00
let response = match request {
2022-05-22 07:22:30 +03:00
JsonRpcRequestEnum ::Single ( request ) = > JsonRpcForwardedResponseEnum ::Single (
2022-09-22 23:27:14 +03:00
timeout (
max_time ,
2022-09-24 08:53:45 +03:00
self . proxy_web3_rpc_request ( authorized_request , request ) ,
2022-09-22 23:27:14 +03:00
)
. await ? ? ,
2022-05-22 07:22:30 +03:00
) ,
JsonRpcRequestEnum ::Batch ( requests ) = > JsonRpcForwardedResponseEnum ::Batch (
2022-09-22 23:27:14 +03:00
timeout (
max_time ,
2022-09-24 08:53:45 +03:00
self . proxy_web3_rpc_requests ( authorized_request , requests ) ,
2022-09-22 23:27:14 +03:00
)
. await ? ? ,
2022-05-22 07:22:30 +03:00
) ,
2022-05-12 02:50:52 +03:00
} ;
2022-09-05 04:52:59 +03:00
// TODO: this should probably be trace level
2022-09-06 23:12:45 +03:00
trace! ( ? response , " Forwarding " ) ;
2022-05-17 03:56:56 +03:00
2022-05-20 08:27:18 +03:00
Ok ( response )
2022-05-12 02:50:52 +03:00
}
2022-09-09 00:01:36 +03:00
/// cut up the request and send to potentually different servers
/// TODO: make sure this isn't a problem
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_requests (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-10-10 07:15:07 +03:00
authorized_request : Arc < AuthorizedRequest > ,
2022-05-12 02:50:52 +03:00
requests : Vec < JsonRpcRequest > ,
) -> anyhow ::Result < Vec < JsonRpcForwardedResponse > > {
// TODO: we should probably change ethers-rs to support this directly
let num_requests = requests . len ( ) ;
2022-10-10 07:15:07 +03:00
2022-05-12 02:50:52 +03:00
let responses = join_all (
requests
. into_iter ( )
2022-10-10 07:15:07 +03:00
. map ( | request | {
let authorized_request = authorized_request . clone ( ) ;
// TODO: spawn so the requests go in parallel
// TODO: i think we will need to flatten
self . proxy_web3_rpc_request ( authorized_request , request )
} )
2022-05-12 02:50:52 +03:00
. collect ::< Vec < _ > > ( ) ,
)
. await ;
// TODO: i'm sure this could be done better with iterators
let mut collected : Vec < JsonRpcForwardedResponse > = Vec ::with_capacity ( num_requests ) ;
for response in responses {
2022-05-18 19:35:06 +03:00
collected . push ( response ? ) ;
2022-05-12 02:50:52 +03:00
}
Ok ( collected )
}
2022-10-19 03:56:57 +03:00
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-10-19 03:56:57 +03:00
pub fn db_conn ( & self ) -> Option < DatabaseConnection > {
self . db_conn . clone ( )
2022-09-24 07:31:06 +03:00
}
2022-10-29 01:52:47 +03:00
#[ instrument(level = " trace " ) ]
2022-09-15 20:57:24 +03:00
pub async fn redis_conn ( & self ) -> anyhow ::Result < redis_rate_limiter ::RedisConnection > {
2022-10-07 05:15:53 +03:00
match self . vredis_pool . as_ref ( ) {
2022-09-02 08:40:56 +03:00
None = > Err ( anyhow ::anyhow! ( " no redis server configured " ) ) ,
Some ( redis_pool ) = > {
let redis_conn = redis_pool . get ( ) . await ? ;
Ok ( redis_conn )
}
}
}
2022-09-14 10:08:48 +03:00
/// Handle one JSON-RPC request: block unsupported methods, answer a few
/// methods from local data, route `eth_sendRawTransaction` to the private
/// RPCs, and send everything else to the balanced backends through the
/// response cache. Emits a `ProxyResponseStat` when a stat sender is
/// configured and the request is an authorized user request.
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
#[instrument(level = "trace")]
async fn proxy_web3_rpc_request(
    self: &Arc<Self>,
    authorized_request: Arc<AuthorizedRequest>,
    mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
    trace!("Received request: {:?}", request);

    // metadata (timings, archive flag, ...) travels with the request and is
    // later handed to the stat emitter
    // TODO: allow customizing the period?
    let request_metadata = Arc::new(RequestMetadata::new(60, &request)?);

    // save the id so we can attach it to the response
    // TODO: instead of cloning, take the id out
    let request_id = request.id.clone();

    // TODO: if eth_chainId or net_version, serve those without querying the backend
    // TODO: don't clone?
    let partial_response: serde_json::Value = match request.method.clone().as_ref() {
        // lots of commands are blocked
        method @ ("admin_addPeer"
        | "admin_datadir"
        | "admin_startRPC"
        | "admin_startWS"
        | "admin_stopRPC"
        | "admin_stopWS"
        | "db_getHex"
        | "db_getString"
        | "db_putHex"
        | "db_putString"
        | "debug_chaindbCompact"
        | "debug_freezeClient"
        | "debug_goTrace"
        | "debug_mutexProfile"
        | "debug_setBlockProfileRate"
        | "debug_setGCPercent"
        | "debug_setHead"
        | "debug_setMutexProfileFraction"
        | "debug_standardTraceBlockToFile"
        | "debug_standardTraceBadBlockToFile"
        | "debug_startCPUProfile"
        | "debug_startGoTrace"
        | "debug_stopCPUProfile"
        | "debug_stopGoTrace"
        | "debug_writeBlockProfile"
        | "debug_writeMemProfile"
        | "debug_writeMutexProfile"
        | "eth_compileLLL"
        | "eth_compileSerpent"
        | "eth_compileSolidity"
        | "eth_getCompilers"
        | "eth_sendTransaction"
        | "eth_sign"
        | "eth_signTransaction"
        | "eth_submitHashrate"
        | "eth_submitWork"
        | "les_addBalance"
        | "les_setClientParams"
        | "les_setDefaultParams"
        | "miner_setExtra"
        | "miner_setGasPrice"
        | "miner_start"
        | "miner_stop"
        | "miner_setEtherbase"
        | "miner_setGasLimit"
        | "personal_importRawKey"
        | "personal_listAccounts"
        | "personal_lockAccount"
        | "personal_newAccount"
        | "personal_unlockAccount"
        | "personal_sendTransaction"
        | "personal_sign"
        | "personal_ecRecover"
        | "shh_addToGroup"
        | "shh_getFilterChanges"
        | "shh_getMessages"
        | "shh_hasIdentity"
        | "shh_newFilter"
        | "shh_newGroup"
        | "shh_newIdentity"
        | "shh_post"
        | "shh_uninstallFilter"
        | "shh_version") => {
            // TODO: client error stat
            // TODO: proper error code
            return Err(anyhow::anyhow!("method unsupported: {}", method));
        }
        // TODO: implement these commands
        method @ ("eth_getFilterChanges"
        | "eth_getFilterLogs"
        | "eth_newBlockFilter"
        | "eth_newFilter"
        | "eth_newPendingTransactionFilter"
        | "eth_uninstallFilter") => {
            // TODO: unsupported command stat
            return Err(anyhow::anyhow!("not yet implemented: {}", method));
        }
        // some commands can use local data or caches
        "eth_accounts" => {
            // no stats on this. its cheap
            // the proxy holds no accounts, so always an empty array
            serde_json::Value::Array(vec![])
        }
        "eth_blockNumber" => {
            // answered from the consensus head tracked by balanced_rpcs; no backend round trip
            match self.balanced_rpcs.head_block_num() {
                Some(head_block_num) => {
                    json!(head_block_num)
                }
                None => {
                    // TODO: what does geth do if this happens?
                    return Err(anyhow::anyhow!(
                        "no servers synced. unknown eth_blockNumber"
                    ));
                }
            }
        }
        // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
        // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
        // TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
        "eth_coinbase" => {
            // no need for serving coinbase
            // we could return a per-user payment address here, but then we might leak that to dapps
            // no stats on this. its cheap
            json!(Address::zero())
        }
        // TODO: eth_estimateGas using anvil?
        // TODO: eth_gasPrice that does awesome magic to predict the future
        "eth_hashrate" => {
            // no stats on this. its cheap
            // the proxy does not mine, so hashrate is always zero
            json!(U64::zero())
        }
        "eth_mining" => {
            // no stats on this. its cheap
            json!(false)
        }
        // TODO: eth_sendBundle (flashbots command)
        // broadcast transactions to all private rpcs at once
        "eth_sendRawTransaction" => {
            // emit stats
            // prefer the private rpcs when configured; otherwise fan out to the balanced set
            let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);

            return rpcs
                .try_send_all_upstream_servers(
                    Some(&authorized_request),
                    request,
                    Some(request_metadata),
                    None,
                )
                .await;
        }
        "eth_syncing" => {
            // no stats on this. its cheap
            // TODO: return a real response if all backends are syncing or if no servers in sync
            json!(false)
        }
        "net_listening" => {
            // no stats on this. its cheap
            // TODO: only if there are some backends on balanced_rpcs?
            json!(true)
        }
        "net_peerCount" => {
            // emit stats
            // reported as the number of synced backend rpcs, not real p2p peers
            self.balanced_rpcs.num_synced_rpcs().into()
        }
        "web3_clientVersion" => {
            // no stats on this. its cheap
            serde_json::Value::String(APP_USER_AGENT.to_string())
        }
        "web3_sha3" => {
            // emit stats
            // returns Keccak-256 (not the standardized SHA3-256) of the given data.
            match &request.params {
                Some(serde_json::Value::Array(params)) => {
                    // TODO: make a struct and use serde conversion to clean this up
                    if params.len() != 1 || !params[0].is_string() {
                        // TODO: this needs the correct error code in the response
                        return Err(anyhow::anyhow!("invalid request"));
                    }

                    let param = Bytes::from_str(
                        params[0]
                            .as_str()
                            .context("parsing params 0 into str then bytes")?,
                    )?;

                    let hash = H256::from(keccak256(param));

                    json!(hash)
                }
                _ => {
                    // TODO: this needs the correct error code in the response
                    // TODO: emit stat?
                    return Err(anyhow::anyhow!("invalid request"));
                }
            }
        }
        // anything else gets sent to backend rpcs and cached
        method => {
            // emit stats

            // TODO: if no servers synced, wait for them to be synced?
            let head_block_id = self
                .balanced_rpcs
                .head_block_id()
                .context("no servers synced")?;

            // we do this check before checking caches because it might modify the request params
            // TODO: add a stat for archive vs full since they should probably cost different
            let request_block_id = if let Some(request_block_needed) = block_needed(
                method,
                request.params.as_mut(),
                head_block_id.num,
                &self.balanced_rpcs,
            )
            .await?
            {
                // TODO: maybe this should be on the app and not on balanced_rpcs
                let (request_block_hash, archive_needed) =
                    self.balanced_rpcs.block_hash(&request_block_needed).await?;

                // record that this request reached behind the recent-block window
                if archive_needed {
                    request_metadata
                        .archive_request
                        .store(true, atomic::Ordering::Relaxed);
                }

                BlockId {
                    num: request_block_needed,
                    hash: request_block_hash,
                }
            } else {
                head_block_id
            };

            // cache key includes the block hash so a reorg naturally invalidates entries
            // TODO: struct for this?
            // TODO: this can be rather large. is that okay?
            let cache_key = (
                request_block_id.hash,
                request.method.clone(),
                request.params.clone().map(|x| x.to_string()),
            );

            let mut response = {
                // clones keep the outer Arcs alive for the stat block below while the
                // cache closure takes its own handles
                let request_metadata = request_metadata.clone();

                let authorized_request = authorized_request.clone();

                self.response_cache
                    .try_get_with(cache_key, async move {
                        // TODO: retry some failures automatically!
                        // TODO: try private_rpcs if all the balanced_rpcs fail!
                        // TODO: put the hash here instead?
                        let mut response = self
                            .balanced_rpcs
                            .try_send_best_upstream_server(
                                Some(&authorized_request),
                                request,
                                Some(&request_metadata),
                                Some(&request_block_id.num),
                            )
                            .await?;

                        // discard their id by replacing it with an empty
                        response.id = Default::default();

                        // TODO: only cache the inner response (or error)
                        Ok::<_, anyhow::Error>(response)
                    })
                    .await
                    // TODO: what is the best way to handle an Arc here?
                    .map_err(|err| {
                        // TODO: emit a stat for an error
                        anyhow::anyhow!(err)
                    })
                    .context("caching response")?
            };

            // since this data came likely out of a cache, the id is not going to match
            // replace the id with our request's id.
            response.id = request_id;

            // NOTE(review): stats are only emitted when Arc::try_unwrap succeeds, i.e.
            // this is the last live reference to the AuthorizedRequest. If another
            // clone outlives this point the stat is silently skipped — TODO confirm
            // that is intended.
            // DRY this up by just returning the partial result (or error) here
            if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
                self.stat_sender.as_ref(),
                Arc::try_unwrap(authorized_request),
            ) {
                let response_stat = ProxyResponseStat::new(
                    method.to_string(),
                    authorized_key,
                    request_metadata,
                    &response,
                );

                stat_sender
                    .send_async(response_stat.into())
                    .await
                    .context("stat_sender sending response_stat")?;
            }

            return Ok(response);
        }
    };

    // locally-served methods fall through to here; wrap the value with the caller's id
    let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);

    // same try_unwrap-gated stat emission as the cached path above
    if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
        self.stat_sender.as_ref(),
        Arc::try_unwrap(authorized_request),
    ) {
        let response_stat =
            ProxyResponseStat::new(request.method, authorized_key, request_metadata, &response);

        stat_sender
            .send_async(response_stat.into())
            .await
            .context("stat_sender sending response stat")?;
    }

    Ok(response)
}
}
2022-08-24 03:59:05 +03:00
impl fmt::Debug for Web3ProxyApp {
    /// Compact debug output: the derived formatter would walk every field and
    /// take far too long to write, so we print only the type name with a
    /// non-exhaustive marker.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Web3ProxyApp");
        builder.finish_non_exhaustive()
    }
}