2022-08-10 05:37:34 +03:00
// TODO: this file is way too big now. move things into other modules
2022-11-22 01:52:47 +03:00
mod ws ;
2022-08-10 05:37:34 +03:00
2022-10-19 21:38:00 +03:00
use crate ::app_stats ::{ ProxyResponseStat , StatEmitter , Web3ProxyStat } ;
2022-12-17 07:05:01 +03:00
use crate ::block_number ::{ block_needed , BlockNeeded } ;
2022-08-24 03:59:05 +03:00
use crate ::config ::{ AppConfig , TopConfig } ;
2023-01-19 03:17:43 +03:00
use crate ::frontend ::authorization ::{ Authorization , RequestMetadata , RpcSecretKey } ;
2022-12-20 08:37:12 +03:00
use crate ::frontend ::errors ::FrontendErrorResponse ;
2023-01-17 09:54:40 +03:00
use crate ::frontend ::rpc_proxy_ws ::ProxyMode ;
2022-12-29 00:50:34 +03:00
use crate ::jsonrpc ::{
JsonRpcForwardedResponse , JsonRpcForwardedResponseEnum , JsonRpcRequest , JsonRpcRequestEnum ,
} ;
2023-02-27 09:44:09 +03:00
use crate ::rpcs ::blockchain ::Web3ProxyBlock ;
2023-02-06 20:55:27 +03:00
use crate ::rpcs ::many ::Web3Rpcs ;
use crate ::rpcs ::one ::Web3Rpc ;
2022-08-24 03:59:05 +03:00
use crate ::rpcs ::transactions ::TxStatus ;
2022-12-14 05:13:23 +03:00
use crate ::user_token ::UserBearerToken ;
2022-07-26 07:53:38 +03:00
use anyhow ::Context ;
2022-10-27 00:39:26 +03:00
use axum ::headers ::{ Origin , Referer , UserAgent } ;
2022-12-28 09:11:18 +03:00
use chrono ::Utc ;
2022-09-15 20:57:24 +03:00
use deferred_rate_limiter ::DeferredRateLimiter ;
2022-08-10 05:37:34 +03:00
use derive_more ::From ;
2022-12-12 07:39:54 +03:00
use entities ::sea_orm_active_enums ::LogLevel ;
2023-01-03 04:06:36 +03:00
use entities ::user ;
2022-07-22 22:30:39 +03:00
use ethers ::core ::utils ::keccak256 ;
2023-02-14 23:14:50 +03:00
use ethers ::prelude ::{ Address , Bytes , Transaction , TxHash , H256 , U64 } ;
2023-01-20 05:08:53 +03:00
use ethers ::types ::U256 ;
2022-12-24 04:32:58 +03:00
use ethers ::utils ::rlp ::{ Decodable , Rlp } ;
2023-02-27 10:52:37 +03:00
use futures ::future ::join_all ;
2022-12-29 00:50:34 +03:00
use futures ::stream ::{ FuturesUnordered , StreamExt } ;
2022-12-20 08:37:12 +03:00
use hashbrown ::{ HashMap , HashSet } ;
2022-09-23 08:22:33 +03:00
use ipnet ::IpNet ;
2023-01-02 21:44:36 +03:00
use log ::{ debug , error , info , trace , warn , Level } ;
2023-01-03 04:06:36 +03:00
use migration ::sea_orm ::{
self , ConnectionTrait , Database , DatabaseConnection , EntityTrait , PaginatorTrait ,
} ;
2022-11-14 21:24:52 +03:00
use migration ::sea_query ::table ::ColumnDef ;
2022-11-14 22:13:42 +03:00
use migration ::{ Alias , DbErr , Migrator , MigratorTrait , Table } ;
2022-09-05 08:53:58 +03:00
use moka ::future ::Cache ;
2023-03-03 04:39:50 +03:00
use rdkafka ::message ::{ Header , OwnedHeaders } ;
use rdkafka ::producer ::FutureRecord ;
2022-12-29 09:21:09 +03:00
use redis_rate_limiter ::redis ::AsyncCommands ;
2022-12-29 00:50:34 +03:00
use redis_rate_limiter ::{ redis , DeadpoolRuntime , RedisConfig , RedisPool , RedisRateLimiter } ;
2022-09-09 06:53:16 +03:00
use serde ::Serialize ;
2022-05-30 21:23:55 +03:00
use serde_json ::json ;
2022-12-24 04:32:58 +03:00
use serde_json ::value ::to_raw_value ;
2023-02-27 10:52:37 +03:00
use std ::fmt ;
2022-12-17 07:05:01 +03:00
use std ::hash ::{ Hash , Hasher } ;
2022-09-15 20:57:24 +03:00
use std ::net ::IpAddr ;
2022-11-10 02:58:07 +03:00
use std ::num ::NonZeroU64 ;
2022-07-14 02:25:01 +03:00
use std ::str ::FromStr ;
2022-12-29 00:50:34 +03:00
use std ::sync ::{ atomic , Arc } ;
2022-05-12 02:50:52 +03:00
use std ::time ::Duration ;
2022-09-27 05:01:45 +03:00
use tokio ::sync ::{ broadcast , watch , Semaphore } ;
2022-06-14 08:43:28 +03:00
use tokio ::task ::JoinHandle ;
2022-11-14 21:24:52 +03:00
use tokio ::time ::{ sleep , timeout } ;
2022-09-24 08:53:45 +03:00
use ulid ::Ulid ;
2022-05-12 02:50:52 +03:00
2022-07-09 05:23:26 +03:00
// TODO: make this customizable?
2023-01-18 07:18:18 +03:00
// TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore
2022-11-08 22:58:11 +03:00
pub static APP_USER_AGENT : & str = concat! (
2023-01-18 07:18:18 +03:00
" llamanodes_ " ,
2022-05-12 02:50:52 +03:00
env! ( " CARGO_PKG_NAME " ) ,
2023-01-18 07:18:18 +03:00
" /v " ,
env! ( " CARGO_PKG_VERSION " )
2022-05-12 02:50:52 +03:00
) ;
2022-11-20 01:05:51 +03:00
/// TODO: allow customizing the request period?
pub static REQUEST_PERIOD : u64 = 60 ;
2023-02-27 09:59:42 +03:00
/// Key for the JSON-RPC response cache.
/// Identity is the (from_block, to_block, method, params, cache_errors) tuple — see the PartialEq/Hash impls below.
#[derive(Debug, From)]
struct ResponseCacheKey {
    // if none, this is cached until evicted
    from_block: Option<Web3ProxyBlock>,
    // to_block is only set when ranges of blocks are requested (like with eth_getLogs)
    to_block: Option<Web3ProxyBlock>,
    // the JSON-RPC method name for the cached request
    method: String,
    // TODO: better type for this
    params: Option<serde_json::Value>,
    // if true, an error response may be cached under this key too
    cache_errors: bool,
}
impl ResponseCacheKey {
fn weight ( & self ) -> usize {
let mut w = self . method . len ( ) ;
if let Some ( p ) = self . params . as_ref ( ) {
w + = p . to_string ( ) . len ( ) ;
}
w
}
}
impl PartialEq for ResponseCacheKey {
    /// Two keys are equal when every identity field matches.
    /// Option<Web3ProxyBlock> equality already treats (None, Some) and (Some, None) as unequal,
    /// so plain field comparisons are sufficient.
    fn eq(&self, other: &Self) -> bool {
        self.cache_errors == other.cache_errors
            && self.from_block == other.from_block
            && self.to_block == other.to_block
            && self.method == other.method
            && self.params == other.params
    }
}

impl Eq for ResponseCacheKey {}
impl Hash for ResponseCacheKey {
fn hash < H : Hasher > ( & self , state : & mut H ) {
2023-01-31 19:30:24 +03:00
self . from_block . as_ref ( ) . map ( | x | x . hash ( ) ) . hash ( state ) ;
self . to_block . as_ref ( ) . map ( | x | x . hash ( ) ) . hash ( state ) ;
2022-12-17 07:05:01 +03:00
self . method . hash ( state ) ;
self . params . as_ref ( ) . map ( | x | x . to_string ( ) ) . hash ( state ) ;
self . cache_errors . hash ( state )
}
}
2022-09-20 04:33:39 +03:00
/// cache mapping a request fingerprint (ResponseCacheKey) to the response we forwarded for it
type ResponseCache =
    Cache<ResponseCacheKey, JsonRpcForwardedResponse, hashbrown::hash_map::DefaultHashBuilder>;

/// a tokio JoinHandle for a task that resolves to an anyhow::Result
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
2022-09-24 05:47:44 +03:00
/// Limits and permissions resolved for a caller (anonymous or keyed).
/// Cached by rpc key in Web3ProxyApp::rpc_secret_key_cache so the database isn't hit for every request.
#[derive(Clone, Debug, Default, From)]
pub struct AuthorizationChecks {
    /// database id of the primary user. 0 if anon
    /// TODO: do we need this? its on the authorization so probably not
    pub user_id: u64,
    /// the key used (if any)
    pub rpc_secret_key: Option<RpcSecretKey>,
    /// database id of the rpc key
    /// if this is None, then this request is being rate limited by ip
    pub rpc_secret_key_id: Option<NonZeroU64>,
    /// if None, allow unlimited queries. inherited from the user_tier
    pub max_requests_per_period: Option<u64>,
    /// if None, allow unlimited concurrent requests. inherited from the user_tier
    pub max_concurrent_requests: Option<u32>,
    /// if None, allow any Origin
    pub allowed_origins: Option<Vec<Origin>>,
    /// if None, allow any Referer
    pub allowed_referers: Option<Vec<Referer>>,
    /// if None, allow any UserAgent
    pub allowed_user_agents: Option<Vec<UserAgent>>,
    /// if None, allow any IP Address
    pub allowed_ips: Option<Vec<IpNet>>,
    /// database LogLevel for this key
    /// NOTE(review): presumably controls how much of this key's traffic is logged — confirm against the stat/logging code
    pub log_level: LogLevel,
    /// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
    /// TODO: f32 would be fine
    pub log_revert_chance: f64,
    /// if true, transactions are broadcast to private mempools. They will still be public on the blockchain!
    pub private_txs: bool,
    /// proxying strategy requested for this key (see frontend::rpc_proxy_ws::ProxyMode)
    pub proxy_mode: ProxyMode,
}
2022-12-16 11:48:24 +03:00
/// Simple wrapper so that we can keep track of read only connections.
/// This does no blocking of writing in the compiler!
#[derive(Clone)]
pub struct DatabaseReplica(pub DatabaseConnection);

impl DatabaseReplica {
    /// borrow the underlying read-only connection
    pub fn conn(&self) -> &DatabaseConnection {
        &self.0
    }
}

/// Resolves the old TODO about Deref/AsRef/Borrow: allow a DatabaseReplica to be
/// passed anywhere an `impl AsRef<DatabaseConnection>` is accepted.
/// (Deref was deliberately avoided: it would hide that this is a replica.)
impl AsRef<DatabaseConnection> for DatabaseReplica {
    fn as_ref(&self) -> &DatabaseConnection {
        &self.0
    }
}
2022-08-24 03:59:05 +03:00
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
    /// Send requests to the best server available
    pub balanced_rpcs: Arc<Web3Rpcs>,
    /// Send 4337 Abstraction Bundler requests to one of these servers
    pub bundler_4337_rpcs: Option<Arc<Web3Rpcs>>,
    /// shared HTTP client for outgoing requests (identified by APP_USER_AGENT)
    pub http_client: Option<reqwest::Client>,
    /// Send private requests (like eth_sendRawTransaction) to all these servers
    pub private_rpcs: Option<Arc<Web3Rpcs>>,
    /// cache of forwarded JSON-RPC responses, keyed by ResponseCacheKey (method/params/block range)
    response_cache: ResponseCache,
    // don't drop this or the sender will stop working
    // TODO: broadcast channel instead?
    /// receives the consensus head block as the balanced rpcs agree on it
    watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
    /// subscribe here for pending transaction status updates
    pending_tx_sender: broadcast::Sender<TxStatus>,
    /// the app-level section of the TopConfig
    pub config: AppConfig,
    /// primary database connection. None when running without a database
    pub db_conn: Option<sea_orm::DatabaseConnection>,
    /// read-only replica. a clone of db_conn when no separate replica url is configured
    pub db_replica: Option<DatabaseReplica>,
    /// hostname of this machine, if it could be determined at startup
    pub hostname: Option<String>,
    /// store pending transactions that we've seen so that we don't send duplicates to subscribers
    pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
    /// per-ip rate limiter for anonymous traffic. None without redis
    pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
    /// per-user rate limiter for keyed traffic. None without redis
    pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
    /// rate limiter for the login endpoint. None without redis
    pub login_rate_limiter: Option<RedisRateLimiter>,
    /// volatile redis connection pool
    pub vredis_pool: Option<RedisPool>,
    // TODO: this key should be our RpcSecretKey class, not Ulid
    /// cache of AuthorizationChecks by rpc secret key
    pub rpc_secret_key_cache:
        Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>,
    /// concurrent-request semaphores, one per registered user id
    pub registered_user_semaphores:
        Cache<NonZeroU64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
    /// concurrent-request semaphores, one per ip
    pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
    /// concurrent-request semaphores, one per bearer token
    pub bearer_token_semaphores:
        Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
    /// channel for emitting per-request stats. None when there is no database to store them in
    pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
    /// kafka producer for logging requests from the /debug/ urls. None when kafka is not configured
    pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
}
2022-07-16 03:08:22 +03:00
/// flatten a JoinError into an anyhow error
2022-08-10 05:37:34 +03:00
/// Useful when joining multiple futures.
2022-06-14 08:43:28 +03:00
pub async fn flatten_handle < T > ( handle : AnyhowJoinHandle < T > ) -> anyhow ::Result < T > {
match handle . await {
Ok ( Ok ( result ) ) = > Ok ( result ) ,
Ok ( Err ( err ) ) = > Err ( err ) ,
Err ( err ) = > Err ( err . into ( ) ) ,
}
}
2022-07-16 03:08:22 +03:00
/// return the first error or okay if everything worked
2022-11-12 11:24:32 +03:00
2022-06-16 23:57:48 +03:00
pub async fn flatten_handles < T > (
mut handles : FuturesUnordered < AnyhowJoinHandle < T > > ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < ( ) > {
while let Some ( x ) = handles . next ( ) . await {
match x {
Err ( e ) = > return Err ( e . into ( ) ) ,
Ok ( Err ( e ) ) = > return Err ( e ) ,
2022-07-16 03:08:22 +03:00
Ok ( Ok ( _ ) ) = > continue ,
2022-06-16 20:51:49 +03:00
}
}
Ok ( ( ) )
}
2022-11-14 22:13:42 +03:00
pub async fn get_db (
2022-08-06 03:07:12 +03:00
db_url : String ,
2022-09-02 23:16:20 +03:00
min_connections : u32 ,
2022-08-10 07:30:54 +03:00
max_connections : u32 ,
2022-11-14 22:13:42 +03:00
) -> Result < DatabaseConnection , DbErr > {
2022-09-14 04:43:09 +03:00
// TODO: scrub credentials and then include the db_url in logs
info! ( " Connecting to db " ) ;
2022-08-06 03:07:12 +03:00
let mut db_opt = sea_orm ::ConnectOptions ::new ( db_url ) ;
2022-08-10 05:37:34 +03:00
// TODO: load all these options from the config file. i think mysql default max is 100
2022-08-06 03:07:12 +03:00
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
2022-09-20 01:41:53 +03:00
. connect_timeout ( Duration ::from_secs ( 30 ) )
2022-09-02 23:16:20 +03:00
. min_connections ( min_connections )
2022-08-10 07:30:54 +03:00
. max_connections ( max_connections )
2022-08-06 03:07:12 +03:00
. sqlx_logging ( false ) ;
// .sqlx_logging_level(log::LevelFilter::Info);
2022-11-14 22:13:42 +03:00
Database ::connect ( db_opt ) . await
}
2022-08-06 03:07:12 +03:00
2022-11-14 22:13:42 +03:00
pub async fn drop_migration_lock ( db_conn : & DatabaseConnection ) -> Result < ( ) , DbErr > {
2022-11-14 21:24:52 +03:00
let db_backend = db_conn . get_database_backend ( ) ;
2022-11-14 22:13:42 +03:00
let drop_lock_statment = db_backend . build ( Table ::drop ( ) . table ( Alias ::new ( " migration_lock " ) ) ) ;
db_conn . execute ( drop_lock_statment ) . await ? ;
2022-12-12 07:39:54 +03:00
debug! ( " migration lock unlocked " ) ;
2022-11-14 22:13:42 +03:00
Ok ( ( ) )
}
2023-01-20 08:28:33 +03:00
/// Be super careful with override_existing_lock! It is very important that only one process is running the migrations at a time!
pub async fn migrate_db (
db_conn : & DatabaseConnection ,
override_existing_lock : bool ,
) -> Result < ( ) , DbErr > {
2022-11-14 22:13:42 +03:00
let db_backend = db_conn . get_database_backend ( ) ;
2022-11-14 21:24:52 +03:00
2023-01-20 08:28:33 +03:00
// TODO: put the timestamp and hostname into this as columns?
2022-11-14 21:24:52 +03:00
let create_lock_statment = db_backend . build (
Table ::create ( )
2022-11-14 22:13:42 +03:00
. table ( Alias ::new ( " migration_lock " ) )
2022-11-14 21:24:52 +03:00
. col ( ColumnDef ::new ( Alias ::new ( " locked " ) ) . boolean ( ) . default ( true ) ) ,
) ;
loop {
2023-02-08 22:11:16 +03:00
if Migrator ::get_pending_migrations ( db_conn ) . await ? . is_empty ( ) {
2022-11-14 21:24:52 +03:00
info! ( " no migrations to apply " ) ;
2023-01-20 08:28:33 +03:00
return Ok ( ( ) ) ;
2022-11-14 21:24:52 +03:00
}
// there are migrations to apply
// acquire a lock
if let Err ( err ) = db_conn . execute ( create_lock_statment . clone ( ) ) . await {
2023-01-20 08:28:33 +03:00
if override_existing_lock {
warn! ( " OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running! " ) ;
sleep ( Duration ::from_secs ( 10 ) ) . await
} else {
debug! ( " Unable to acquire lock. if you are positive no migration is running, run \" web3_proxy_cli drop_migration_lock \" . err={:?} " , err ) ;
2022-11-14 21:24:52 +03:00
2023-01-20 08:28:33 +03:00
// TODO: exponential backoff with jitter?
sleep ( Duration ::from_secs ( 1 ) ) . await ;
2022-11-14 21:24:52 +03:00
2023-01-20 08:28:33 +03:00
continue ;
}
2022-11-14 21:24:52 +03:00
}
debug! ( " migration lock acquired " ) ;
break ;
}
2023-02-08 22:11:16 +03:00
let migration_result = Migrator ::up ( db_conn , None ) . await ;
2022-11-14 21:24:52 +03:00
// drop the distributed lock
2023-02-08 22:11:16 +03:00
drop_migration_lock ( db_conn ) . await ? ;
2022-11-14 21:24:52 +03:00
// return if migrations erred
2023-01-20 08:28:33 +03:00
migration_result
}
/// Connect to the database and run migrations
pub async fn get_migrated_db(
    db_url: String,
    min_connections: u32,
    max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
    // TODO: this seems to fail silently
    let conn = get_db(db_url, min_connections, max_connections).await?;

    // never override an existing lock from this path
    migrate_db(&conn, false).await?;

    Ok(conn)
}
2022-10-31 23:05:58 +03:00
/// Everything needed to run the proxy, as returned by Web3ProxyApp::spawn.
#[derive(From)]
pub struct Web3ProxyAppSpawn {
    /// the app. probably clone this to use in other groups of handles
    pub app: Arc<Web3ProxyApp>,
    /// handles for the balanced and private rpcs
    pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
    /// these are important and must be allowed to finish
    pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
    /// config changes are sent here
    pub new_top_config_sender: watch::Sender<TopConfig>,
}
2022-05-12 02:50:52 +03:00
impl Web3ProxyApp {
2022-10-03 21:08:01 +03:00
/// The main entrypoint.
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-08-12 22:07:14 +03:00
top_config : TopConfig ,
2022-09-14 09:18:13 +03:00
num_workers : usize ,
2022-10-21 01:51:56 +03:00
shutdown_receiver : broadcast ::Receiver < ( ) > ,
2022-10-31 23:05:58 +03:00
) -> anyhow ::Result < Web3ProxyAppSpawn > {
2023-02-26 10:52:33 +03:00
// safety checks on the config
// while i would prefer this to be in a "apply_top_config" function, that is a larger refactor
2023-02-27 09:44:09 +03:00
// TODO: maybe don't spawn with a config at all. have all config updates come through an apply_top_config call
2023-02-26 10:52:33 +03:00
if let Some ( redirect ) = & top_config . app . redirect_rpc_key_url {
assert! (
redirect . contains ( " {{rpc_key_id}} " ) ,
" redirect_rpc_key_url user url must contain \" {{rpc_key_id}} \" "
) ;
}
if ! top_config . extra . is_empty ( ) {
warn! (
" unknown TopConfig fields!: {:?} " ,
top_config . app . extra . keys ( )
) ;
}
if ! top_config . app . extra . is_empty ( ) {
warn! (
" unknown Web3ProxyAppConfig fields!: {:?} " ,
top_config . app . extra . keys ( )
) ;
}
2023-02-27 09:44:09 +03:00
// these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error
let app_handles = FuturesUnordered ::new ( ) ;
2023-02-25 20:48:40 +03:00
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered ::new ( ) ;
2022-12-28 19:36:22 +03:00
2023-02-27 09:44:09 +03:00
// connect to the database and make sure the latest migrations have run
2022-12-16 11:48:24 +03:00
let mut db_conn = None ::< DatabaseConnection > ;
let mut db_replica = None ::< DatabaseReplica > ;
if let Some ( db_url ) = top_config . app . db_url . clone ( ) {
2022-09-14 09:18:13 +03:00
let db_min_connections = top_config
. app
. db_min_connections
. unwrap_or ( num_workers as u32 ) ;
2022-07-26 07:53:38 +03:00
2022-09-02 23:16:20 +03:00
// TODO: what default multiple?
2022-10-03 21:08:01 +03:00
let db_max_connections = top_config
2022-09-02 23:16:20 +03:00
. app
. db_max_connections
2022-09-06 23:50:37 +03:00
. unwrap_or ( db_min_connections * 2 ) ;
2022-09-02 23:16:20 +03:00
2022-12-21 00:38:10 +03:00
db_conn = Some (
get_migrated_db ( db_url . clone ( ) , db_min_connections , db_max_connections ) . await ? ,
) ;
2022-12-16 11:48:24 +03:00
db_replica = if let Some ( db_replica_url ) = top_config . app . db_replica_url . clone ( ) {
2022-12-21 00:38:10 +03:00
if db_replica_url = = db_url {
// url is the same. do not make a new connection or we might go past our max connections
db_conn . clone ( ) . map ( DatabaseReplica )
} else {
let db_replica_min_connections = top_config
. app
. db_replica_min_connections
. unwrap_or ( db_min_connections ) ;
let db_replica_max_connections = top_config
. app
. db_replica_max_connections
. unwrap_or ( db_max_connections ) ;
let db_replica = get_db (
db_replica_url ,
db_replica_min_connections ,
db_replica_max_connections ,
)
. await ? ;
Some ( DatabaseReplica ( db_replica ) )
}
2022-12-16 11:48:24 +03:00
} else {
// just clone so that we don't need a bunch of checks all over our code
db_conn . clone ( ) . map ( DatabaseReplica )
} ;
2022-07-26 07:53:38 +03:00
} else {
2022-12-16 11:48:24 +03:00
if top_config . app . db_replica_url . is_some ( ) {
return Err ( anyhow ::anyhow! (
" if there is a db_replica_url, there must be a db_url "
) ) ;
}
2022-11-20 01:05:51 +03:00
warn! ( " no database. some features will be disabled " ) ;
2022-07-26 07:53:38 +03:00
} ;
2023-03-03 04:39:50 +03:00
// connect to kafka for logging requests from the /debug/ urls
let mut kafka_producer : Option < rdkafka ::producer ::FutureProducer > = None ;
if let Some ( kafka_brokers ) = top_config . app . kafka_urls . clone ( ) {
match rdkafka ::ClientConfig ::new ( )
. set ( " bootstrap.servers " , kafka_brokers )
. set ( " message.timeout.ms " , " 5000 " )
. create ( )
{
Ok ( k ) = > kafka_producer = Some ( k ) ,
Err ( err ) = > error! ( " Failed connecting to kafka. This will not retry. {:?} " , err ) ,
}
}
2023-02-25 20:48:40 +03:00
// TODO: do this during apply_config so that we can change redis url while running
2022-10-03 21:08:01 +03:00
// create a connection pool for redis
// a failure to connect does NOT block the application from starting
2022-10-07 05:15:53 +03:00
let vredis_pool = match top_config . app . volatile_redis_url . as_ref ( ) {
2022-07-26 07:53:38 +03:00
Some ( redis_url ) = > {
2022-09-14 04:43:09 +03:00
// TODO: scrub credentials and then include the redis_url in logs
2022-10-07 05:15:53 +03:00
info! ( " Connecting to vredis " ) ;
2022-05-22 21:39:06 +03:00
2022-09-14 09:18:13 +03:00
// TODO: what is a good default?
2022-09-02 23:16:20 +03:00
let redis_max_connections = top_config
. app
2022-10-07 05:15:53 +03:00
. volatile_redis_max_connections
2022-09-14 09:18:13 +03:00
. unwrap_or ( num_workers * 2 ) ;
2022-09-20 06:26:12 +03:00
// TODO: what are reasonable timeouts?
2022-09-14 09:18:13 +03:00
let redis_pool = RedisConfig ::from_url ( redis_url )
. builder ( ) ?
. max_size ( redis_max_connections )
2022-09-15 20:57:24 +03:00
. runtime ( DeadpoolRuntime ::Tokio1 )
2022-09-14 09:18:13 +03:00
. build ( ) ? ;
2022-09-20 06:26:12 +03:00
// test the redis pool
2022-09-17 04:19:11 +03:00
if let Err ( err ) = redis_pool . get ( ) . await {
2022-09-17 05:17:20 +03:00
error! (
2022-11-12 11:24:32 +03:00
" failed to connect to vredis. some features will be disabled. err={:?} " ,
err
2022-09-17 05:17:20 +03:00
) ;
2022-09-17 04:19:11 +03:00
} ;
2022-09-14 09:18:13 +03:00
Some ( redis_pool )
2022-05-22 21:39:06 +03:00
}
None = > {
2022-10-07 05:15:53 +03:00
warn! ( " no redis connection. some features will be disabled " ) ;
2022-05-22 21:39:06 +03:00
None
2022-05-22 02:34:05 +03:00
}
} ;
2022-05-12 02:50:52 +03:00
2022-10-07 05:15:53 +03:00
// setup a channel for receiving stats (generally with a high cardinality, such as per-user)
2022-10-03 21:08:01 +03:00
// we do this in a channel so we don't slow down our response to the users
2022-10-10 07:15:07 +03:00
let stat_sender = if let Some ( db_conn ) = db_conn . clone ( ) {
2022-11-03 02:14:16 +03:00
let emitter_spawn =
StatEmitter ::spawn ( top_config . app . chain_id , db_conn , 60 , shutdown_receiver ) ? ;
2022-10-10 07:15:07 +03:00
2022-11-03 02:14:16 +03:00
important_background_handles . push ( emitter_spawn . background_handle ) ;
2022-10-03 21:08:01 +03:00
2022-11-03 02:14:16 +03:00
Some ( emitter_spawn . stat_sender )
2022-10-03 21:08:01 +03:00
} else {
2022-11-03 02:14:16 +03:00
warn! ( " cannot store stats without a database connection " ) ;
2022-10-03 21:08:01 +03:00
2022-11-04 22:52:46 +03:00
// TODO: subscribe to the shutdown_receiver here since the stat emitter isn't running?
2022-10-03 21:08:01 +03:00
None
} ;
2023-02-25 20:48:40 +03:00
// make a http shared client
// TODO: can we configure the connection pool? should we?
// TODO: timeouts from config. defaults are hopefully good
let http_client = Some (
reqwest ::ClientBuilder ::new ( )
. connect_timeout ( Duration ::from_secs ( 5 ) )
. timeout ( Duration ::from_secs ( 5 * 60 ) )
. user_agent ( APP_USER_AGENT )
. build ( ) ? ,
) ;
2023-02-26 10:52:33 +03:00
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None ;
let mut frontend_registered_user_rate_limiter = None ;
let mut login_rate_limiter = None ;
if let Some ( redis_pool ) = vredis_pool . as_ref ( ) {
if let Some ( public_requests_per_period ) = top_config . app . public_requests_per_period {
// chain id is included in the app name so that rpc rate limits are per-chain
let rpc_rrl = RedisRateLimiter ::new (
& format! ( " web3_proxy: {} " , top_config . app . chain_id ) ,
" frontend " ,
public_requests_per_period ,
60.0 ,
redis_pool . clone ( ) ,
) ;
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some ( DeferredRateLimiter ::< IpAddr > ::new (
10_000 ,
" ip " ,
rpc_rrl . clone ( ) ,
None ,
) ) ;
frontend_registered_user_rate_limiter = Some ( DeferredRateLimiter ::< u64 > ::new (
10_000 , " key " , rpc_rrl , None ,
) ) ;
}
// login rate limiter
login_rate_limiter = Some ( RedisRateLimiter ::new (
" web3_proxy " ,
" login " ,
top_config . app . login_rate_limit_per_period ,
60.0 ,
redis_pool . clone ( ) ,
) ) ;
}
2022-09-06 15:29:37 +03:00
// TODO: i don't like doing Block::default here! Change this to "None"?
2023-02-15 04:41:40 +03:00
let ( watch_consensus_head_sender , watch_consensus_head_receiver ) = watch ::channel ( None ) ;
2022-07-09 03:00:31 +03:00
// TODO: will one receiver lagging be okay? how big should this be?
2022-07-26 07:53:38 +03:00
let ( pending_tx_sender , pending_tx_receiver ) = broadcast ::channel ( 256 ) ;
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
2022-10-03 21:08:01 +03:00
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them?
2022-09-20 01:24:56 +03:00
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
2022-07-26 07:53:38 +03:00
drop ( pending_tx_receiver ) ;
2022-09-20 04:33:39 +03:00
2022-09-17 05:30:06 +03:00
// TODO: capacity from configs
// all these are the same size, so no need for a weigher
2022-10-11 22:58:25 +03:00
// TODO: ttl on this? or is max_capacity fine?
2022-09-17 05:17:20 +03:00
let pending_transactions = Cache ::builder ( )
. max_capacity ( 10_000 )
2023-02-15 22:42:25 +03:00
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
. time_to_idle ( Duration ::from_secs ( 300 ) )
2022-11-11 21:40:52 +03:00
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::default ( ) ) ;
2022-06-16 20:51:49 +03:00
2023-02-25 20:48:40 +03:00
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
let response_cache = Cache ::builder ( )
. max_capacity ( top_config . app . response_cache_max_bytes )
. weigher ( | k : & ResponseCacheKey , v | {
// TODO: is this good enough?
if let Ok ( v ) = serde_json ::to_string ( v ) {
let weight = k . weight ( ) + v . len ( ) ;
// the or in unwrap_or is probably never called
weight . try_into ( ) . unwrap_or ( u32 ::MAX )
} else {
// this seems impossible
u32 ::MAX
}
} )
// TODO: what should we set? 10 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
. time_to_idle ( Duration ::from_secs ( 600 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::default ( ) ) ;
// all the users are the same size, so no need for a weigher
// if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config
// TODO: ttl from config
let rpc_secret_key_cache = Cache ::builder ( )
. max_capacity ( 10_000 )
. time_to_live ( Duration ::from_secs ( 600 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::default ( ) ) ;
// create semaphores for concurrent connection limits
// TODO: what should tti be for semaphores?
let bearer_token_semaphores = Cache ::builder ( )
. time_to_idle ( Duration ::from_secs ( 120 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::default ( ) ) ;
let ip_semaphores = Cache ::builder ( )
. time_to_idle ( Duration ::from_secs ( 120 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::default ( ) ) ;
let registered_user_semaphores = Cache ::builder ( )
. time_to_idle ( Duration ::from_secs ( 120 ) )
. build_with_hasher ( hashbrown ::hash_map ::DefaultHashBuilder ::default ( ) ) ;
2023-02-26 10:52:33 +03:00
// prepare a Web3Rpcs to hold all our balanced connections
let ( balanced_rpcs , balanced_rpcs_handle ) = Web3Rpcs ::spawn (
2022-08-12 22:07:14 +03:00
top_config . app . chain_id ,
2022-11-08 22:58:11 +03:00
db_conn . clone ( ) ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
2023-02-14 23:14:50 +03:00
top_config . app . max_block_age ,
top_config . app . max_block_lag ,
2022-08-27 03:33:45 +03:00
top_config . app . min_synced_rpcs ,
2023-02-12 12:22:53 +03:00
top_config . app . min_sum_soft_limit ,
2022-06-16 20:51:49 +03:00
pending_transactions . clone ( ) ,
2023-02-12 12:22:53 +03:00
Some ( pending_tx_sender . clone ( ) ) ,
Some ( watch_consensus_head_sender ) ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
2022-10-03 21:08:01 +03:00
. context ( " spawning balanced rpcs " ) ? ;
2022-05-18 19:35:06 +03:00
2023-02-26 10:52:33 +03:00
app_handles . push ( balanced_rpcs_handle ) ;
2022-06-14 08:43:28 +03:00
2023-02-26 10:52:33 +03:00
// prepare a Web3Rpcs to hold all our private connections
2022-10-03 21:08:01 +03:00
// only some chains have this, so this is optional
2023-02-26 10:52:33 +03:00
let private_rpcs = if top_config . private_rpcs . is_none ( ) {
2022-05-12 02:50:52 +03:00
warn! ( " No private relays configured. Any transactions will be broadcast to the public mempool! " ) ;
2022-09-14 07:27:18 +03:00
None
2022-05-12 02:50:52 +03:00
} else {
2023-02-25 20:48:40 +03:00
// TODO: do something with the spawn handle
2023-02-26 10:52:33 +03:00
let ( private_rpcs , private_rpcs_handle ) = Web3Rpcs ::spawn (
2022-08-12 22:07:14 +03:00
top_config . app . chain_id ,
2022-11-08 22:58:11 +03:00
db_conn . clone ( ) ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
2023-02-14 23:14:50 +03:00
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None ,
None ,
2023-02-12 12:22:53 +03:00
0 ,
0 ,
pending_transactions . clone ( ) ,
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None ,
2022-08-11 00:29:50 +03:00
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
2023-01-23 09:02:08 +03:00
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
2023-02-12 12:22:53 +03:00
// TODO: but maybe we could include privates in the "backup" tier
2022-08-11 00:29:50 +03:00
None ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
2022-10-03 21:08:01 +03:00
. context ( " spawning private_rpcs " ) ? ;
2022-06-14 08:43:28 +03:00
2023-02-26 10:52:33 +03:00
app_handles . push ( private_rpcs_handle ) ;
2023-02-25 20:48:40 +03:00
2023-02-26 10:52:33 +03:00
Some ( private_rpcs )
2022-05-12 02:50:52 +03:00
} ;
2023-04-14 10:04:35 +03:00
// prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections
// only some chains have this, so this is optional
let bundler_4337_rpcs = if top_config . bundler_4337_rpcs . is_none ( ) {
warn! ( " No bundler_4337_rpcs configured " ) ;
None
} else {
// TODO: do something with the spawn handle
let ( bundler_4337_rpcs , bundler_4337_rpcs_handle ) = Web3Rpcs ::spawn (
top_config . app . chain_id ,
db_conn . clone ( ) ,
http_client . clone ( ) ,
// bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None ,
None ,
0 ,
0 ,
pending_transactions . clone ( ) ,
None ,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None ,
)
. await
. context ( " spawning bundler_4337_rpcs " ) ? ;
app_handles . push ( bundler_4337_rpcs_handle ) ;
Some ( bundler_4337_rpcs )
} ;
let hostname = hostname ::get ( )
. ok ( )
. and_then ( | x | x . to_str ( ) . map ( | x | x . to_string ( ) ) ) ;
2023-03-23 01:18:54 +03:00
2023-02-25 20:48:40 +03:00
let app = Self {
2023-02-26 10:52:33 +03:00
config : top_config . app . clone ( ) ,
2023-02-25 20:48:40 +03:00
balanced_rpcs ,
2023-04-14 10:04:35 +03:00
bundler_4337_rpcs ,
2023-02-26 10:52:33 +03:00
http_client ,
2023-03-03 04:39:50 +03:00
kafka_producer ,
2023-02-25 20:48:40 +03:00
private_rpcs ,
response_cache ,
watch_consensus_head_receiver ,
pending_tx_sender ,
pending_transactions ,
frontend_ip_rate_limiter ,
frontend_registered_user_rate_limiter ,
login_rate_limiter ,
db_conn ,
db_replica ,
2023-03-23 01:18:54 +03:00
hostname ,
2023-02-25 20:48:40 +03:00
vredis_pool ,
rpc_secret_key_cache ,
bearer_token_semaphores ,
ip_semaphores ,
registered_user_semaphores ,
stat_sender ,
} ;
let app = Arc ::new ( app ) ;
2023-02-27 09:44:09 +03:00
// watch for config changes
// TODO: initial config reload should be from this channel. not from the call to spawn
2023-02-27 10:52:37 +03:00
let ( new_top_config_sender , mut new_top_config_receiver ) = watch ::channel ( top_config ) ;
2023-02-27 09:44:09 +03:00
2023-02-27 10:52:37 +03:00
{
2023-02-27 09:44:09 +03:00
let app = app . clone ( ) ;
let config_handle = tokio ::spawn ( async move {
loop {
2023-02-27 10:52:37 +03:00
let new_top_config = new_top_config_receiver . borrow_and_update ( ) . to_owned ( ) ;
2023-02-27 09:44:09 +03:00
app . apply_top_config ( new_top_config )
. await
. context ( " failed applying new top_config " ) ? ;
2023-02-27 10:52:37 +03:00
new_top_config_receiver
2023-02-27 09:44:09 +03:00
. changed ( )
. await
. context ( " failed awaiting top_config change " ) ? ;
info! ( " config changed " ) ;
}
} ) ;
app_handles . push ( config_handle ) ;
}
2023-02-25 20:48:40 +03:00
2023-02-27 10:52:37 +03:00
Ok ( (
app ,
app_handles ,
important_background_handles ,
new_top_config_sender ,
)
. into ( ) )
2023-02-25 20:48:40 +03:00
}
2023-02-26 10:52:33 +03:00
/// Apply a freshly-loaded `TopConfig` to the running app.
///
/// Pushes the new server lists onto the balanced, private, and 4337-bundler
/// `Web3Rpcs` groups. Toggling a group from configured to unconfigured (or
/// vice versa) at runtime is not handled yet and will panic via `todo!`.
///
/// # Errors
/// Returns any error from `apply_server_configs` on the backend groups.
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {
    // TODO: also update self.config from new_top_config.app

    // connect to the backends
    self.balanced_rpcs
        .apply_server_configs(self, new_top_config.balanced_rpcs)
        .await?;

    if let Some(private_rpc_configs) = new_top_config.private_rpcs {
        if let Some(private_rpcs) = self.private_rpcs.as_ref() {
            private_rpcs
                .apply_server_configs(self, private_rpc_configs)
                .await?;
        } else {
            // the config now has private rpcs but the app was started without any
            // TODO: maybe we should have private_rpcs just be empty instead of being None
            todo!("handle toggling private_rpcs")
        }
    }

    if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs {
        if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() {
            bundler_4337_rpcs
                .apply_server_configs(self, bundler_4337_rpc_configs)
                .await?;
        } else {
            // the config now has bundler rpcs but the app was started without any
            // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None
            todo!("handle toggling bundler_4337_rpcs")
        }
    }

    Ok(())
}
2023-02-15 04:41:40 +03:00
/// A watch channel receiver tracking the consensus head block.
///
/// The channel carries `Option<Web3ProxyBlock>`; presumably `None` means no
/// head block has been agreed on yet — TODO(review): confirm against the sender.
pub fn head_block_receiver(&self) -> watch::Receiver<Option<Web3ProxyBlock>> {
    self.watch_consensus_head_receiver.clone()
}
2022-12-28 09:11:18 +03:00
/// Render app-wide metrics in the prometheus exposition format.
///
/// Collects: total registered users from the database, and recent-activity
/// counts (per week/day/hour/minute) for user ids, ips, and raw transactions
/// from redis sorted sets. `-1` is used as a sentinel for "count unavailable
/// because the backing store errored".
pub async fn prometheus_metrics(&self) -> String {
    let globals = HashMap::new();
    // TODO: what globals? should this be the hostname or what?
    // globals.insert("service", "web3_proxy");

    // newtype wrapper so serde_prometheus emits the count as a named metric
    #[derive(Default, Serialize)]
    struct UserCount(i64);

    // total registered users; -1 if no db or if the count query fails
    let user_count: UserCount = if let Some(db) = self.db_conn() {
        match user::Entity::find().count(&db).await {
            Ok(user_count) => UserCount(user_count as i64),
            Err(err) => {
                warn!("unable to count users: {:?}", err);
                UserCount(-1)
            }
        }
    } else {
        UserCount(-1)
    };

    #[derive(Default, Serialize)]
    struct RecentCounts {
        one_week: i64,
        one_day: i64,
        one_hour: i64,
        one_minute: i64,
    }

    impl RecentCounts {
        // sentinel counts reported when redis is unavailable or errors
        fn for_err() -> Self {
            Self {
                one_week: -1,
                one_day: -1,
                one_hour: -1,
                one_minute: -1,
            }
        }
    }

    let (recent_ip_counts, recent_user_id_counts, recent_tx_counts): (
        RecentCounts,
        RecentCounts,
        RecentCounts,
    ) = match self.redis_conn().await {
        Ok(Some(mut redis_conn)) => {
            // TODO: delete any hash entries where
            const ONE_MINUTE: i64 = 60;
            const ONE_HOUR: i64 = ONE_MINUTE * 60;
            const ONE_DAY: i64 = ONE_HOUR * 24;
            const ONE_WEEK: i64 = ONE_DAY * 7;

            // window cutoffs as unix timestamps; sorted-set scores are timestamps
            let one_week_ago = Utc::now().timestamp() - ONE_WEEK;
            let one_day_ago = Utc::now().timestamp() - ONE_DAY;
            let one_hour_ago = Utc::now().timestamp() - ONE_HOUR;
            let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE;

            // keys are per-chain so multiple proxies can share one redis
            let recent_users_by_id = format!("recent_users:id:{}", self.config.chain_id);
            let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id);
            let recent_transactions =
                format!("eth_sendRawTransaction:{}", self.config.chain_id);

            // one atomic pipeline: prune entries older than a week, then take
            // ZCOUNTs for each window. result order must match the destructure below.
            match redis::pipe()
                .atomic()
                // delete any entries older than 1 week
                .zrembyscore(&recent_users_by_id, i64::MIN, one_week_ago)
                .ignore()
                .zrembyscore(&recent_users_by_ip, i64::MIN, one_week_ago)
                .ignore()
                .zrembyscore(&recent_transactions, i64::MIN, one_week_ago)
                .ignore()
                // get counts for last week
                .zcount(&recent_users_by_id, one_week_ago, i64::MAX)
                .zcount(&recent_users_by_ip, one_week_ago, i64::MAX)
                .zcount(&recent_transactions, one_week_ago, i64::MAX)
                // get counts for last day
                .zcount(&recent_users_by_id, one_day_ago, i64::MAX)
                .zcount(&recent_users_by_ip, one_day_ago, i64::MAX)
                .zcount(&recent_transactions, one_day_ago, i64::MAX)
                // get counts for last hour
                .zcount(&recent_users_by_id, one_hour_ago, i64::MAX)
                .zcount(&recent_users_by_ip, one_hour_ago, i64::MAX)
                .zcount(&recent_transactions, one_hour_ago, i64::MAX)
                // get counts for last minute
                .zcount(&recent_users_by_id, one_minute_ago, i64::MAX)
                .zcount(&recent_users_by_ip, one_minute_ago, i64::MAX)
                .zcount(&recent_transactions, one_minute_ago, i64::MAX)
                .query_async(&mut redis_conn)
                .await
            {
                Ok((
                    user_id_in_week,
                    ip_in_week,
                    txs_in_week,
                    user_id_in_day,
                    ip_in_day,
                    txs_in_day,
                    user_id_in_hour,
                    ip_in_hour,
                    txs_in_hour,
                    user_id_in_minute,
                    ip_in_minute,
                    txs_in_minute,
                )) => {
                    let recent_user_id_counts = RecentCounts {
                        one_week: user_id_in_week,
                        one_day: user_id_in_day,
                        one_hour: user_id_in_hour,
                        one_minute: user_id_in_minute,
                    };
                    let recent_ip_counts = RecentCounts {
                        one_week: ip_in_week,
                        one_day: ip_in_day,
                        one_hour: ip_in_hour,
                        one_minute: ip_in_minute,
                    };
                    let recent_tx_counts = RecentCounts {
                        one_week: txs_in_week,
                        one_day: txs_in_day,
                        one_hour: txs_in_hour,
                        one_minute: txs_in_minute,
                    };

                    (recent_ip_counts, recent_user_id_counts, recent_tx_counts)
                }
                Err(err) => {
                    warn!("unable to count recent users: {}", err);
                    (
                        RecentCounts::for_err(),
                        RecentCounts::for_err(),
                        RecentCounts::for_err(),
                    )
                }
            }
        }
        // no redis configured: zero counts, not an error
        Ok(None) => (
            RecentCounts::default(),
            RecentCounts::default(),
            RecentCounts::default(),
        ),
        Err(err) => {
            warn!("unable to connect to redis while counting users: {:?}", err);
            (
                RecentCounts::for_err(),
                RecentCounts::for_err(),
                RecentCounts::for_err(),
            )
        }
    };

    // app.pending_transactions.sync();
    // app.rpc_secret_key_cache.sync();
    // "pending_transactions_count": app.pending_transactions.entry_count(),
    // "pending_transactions_size": app.pending_transactions.weighted_size(),
    // "user_cache_count": app.rpc_secret_key_cache.entry_count(),
    // "user_cache_size": app.rpc_secret_key_cache.weighted_size(),

    #[derive(Serialize)]
    struct CombinedMetrics {
        recent_ip_counts: RecentCounts,
        recent_user_id_counts: RecentCounts,
        recent_tx_counts: RecentCounts,
        user_count: UserCount,
    }

    let metrics = CombinedMetrics {
        recent_ip_counts,
        recent_user_id_counts,
        recent_tx_counts,
        user_count,
    };

    // TODO: i don't like this library. it doesn't include HELP or TYPE lines and so our prometheus server fails to parse it
    serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)
        .expect("prometheus metrics should always serialize")
}
2022-07-22 22:30:39 +03:00
/// send the request or batch of requests to the approriate RPCs
2022-05-12 02:50:52 +03:00
pub async fn proxy_web3_rpc (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : Arc < Authorization > ,
2022-05-12 02:50:52 +03:00
request : JsonRpcRequestEnum ,
2023-02-06 20:55:27 +03:00
) -> Result < ( JsonRpcForwardedResponseEnum , Vec < Arc < Web3Rpc > > ) , FrontendErrorResponse > {
2023-01-17 09:54:40 +03:00
// trace!(?request, "proxy_web3_rpc");
2022-05-12 02:50:52 +03:00
2022-05-22 07:22:30 +03:00
// even though we have timeouts on the requests to our backend providers,
2022-07-22 22:30:39 +03:00
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. per user max? expiration time instead of duration?
let max_time = Duration ::from_secs ( 120 ) ;
2022-05-12 02:50:52 +03:00
let response = match request {
2022-12-20 08:37:12 +03:00
JsonRpcRequestEnum ::Single ( request ) = > {
let ( response , rpcs ) = timeout (
2022-09-22 23:27:14 +03:00
max_time ,
2023-03-03 04:39:50 +03:00
self . proxy_cached_request ( & authorization , request , None ) ,
2022-09-22 23:27:14 +03:00
)
2022-12-20 08:37:12 +03:00
. await ? ? ;
( JsonRpcForwardedResponseEnum ::Single ( response ) , rpcs )
}
JsonRpcRequestEnum ::Batch ( requests ) = > {
let ( responses , rpcs ) = timeout (
2022-09-22 23:27:14 +03:00
max_time ,
2023-03-03 04:39:50 +03:00
self . proxy_web3_rpc_requests ( & authorization , requests ) ,
2022-09-22 23:27:14 +03:00
)
2022-12-20 08:37:12 +03:00
. await ? ? ;
2022-05-12 02:50:52 +03:00
2022-12-20 08:37:12 +03:00
( JsonRpcForwardedResponseEnum ::Batch ( responses ) , rpcs )
}
} ;
2022-05-17 03:56:56 +03:00
2022-05-20 08:27:18 +03:00
Ok ( response )
2022-05-12 02:50:52 +03:00
}
2022-09-09 00:01:36 +03:00
/// cut up the request and send to potentually different servers
/// TODO: make sure this isn't a problem
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_requests (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2022-05-12 02:50:52 +03:00
requests : Vec < JsonRpcRequest > ,
2023-02-06 20:55:27 +03:00
) -> Result < ( Vec < JsonRpcForwardedResponse > , Vec < Arc < Web3Rpc > > ) , FrontendErrorResponse > {
2022-12-20 08:37:12 +03:00
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
2022-05-12 02:50:52 +03:00
let num_requests = requests . len ( ) ;
2022-10-10 07:15:07 +03:00
2022-12-20 08:37:12 +03:00
// TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that
// TODO: improve flattening
2023-02-15 04:41:40 +03:00
// get the head block now so that any requests that need it all use the same block
// TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block_num = self
. balanced_rpcs
. head_block_num ( )
. context ( anyhow ::anyhow! ( " no servers synced " ) ) ? ;
2022-05-12 02:50:52 +03:00
let responses = join_all (
requests
. into_iter ( )
2023-02-15 04:41:40 +03:00
. map ( | request | {
2023-03-03 04:39:50 +03:00
self . proxy_cached_request ( authorization , request , Some ( head_block_num ) )
2023-02-15 04:41:40 +03:00
} )
2022-05-12 02:50:52 +03:00
. collect ::< Vec < _ > > ( ) ,
)
. await ;
2022-12-20 08:37:12 +03:00
// TODO: i'm sure this could be done better with iterators
2022-11-08 22:58:11 +03:00
// TODO: stream the response?
2022-05-12 02:50:52 +03:00
let mut collected : Vec < JsonRpcForwardedResponse > = Vec ::with_capacity ( num_requests ) ;
2023-02-28 22:01:34 +03:00
let mut collected_rpc_names : HashSet < String > = HashSet ::new ( ) ;
let mut collected_rpcs : Vec < Arc < Web3Rpc > > = vec! [ ] ;
2022-05-12 02:50:52 +03:00
for response in responses {
2022-12-20 08:37:12 +03:00
// TODO: any way to attach the tried rpcs to the error? it is likely helpful
let ( response , rpcs ) = response ? ;
collected . push ( response ) ;
2023-02-28 22:01:34 +03:00
collected_rpcs . extend ( rpcs . into_iter ( ) . filter ( | x | {
if collected_rpc_names . contains ( & x . name ) {
false
} else {
collected_rpc_names . insert ( x . name . clone ( ) ) ;
true
}
} ) ) ;
2022-05-12 02:50:52 +03:00
}
2022-12-20 08:37:12 +03:00
Ok ( ( collected , collected_rpcs ) )
2022-05-12 02:50:52 +03:00
}
2022-10-19 03:56:57 +03:00
/// Clone of the optional primary database connection handle.
///
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
pub fn db_conn(&self) -> Option<DatabaseConnection> {
    self.db_conn.clone()
}
2022-12-16 11:48:24 +03:00
/// Clone of the optional read-replica database handle.
pub fn db_replica(&self) -> Option<DatabaseReplica> {
    self.db_replica.clone()
}
2022-12-29 09:21:09 +03:00
/// Check a connection out of the optional redis pool.
///
/// Returns `Ok(None)` when no redis pool is configured — that is not an
/// error. Only a failed checkout from an existing pool returns `Err`.
pub async fn redis_conn(&self) -> anyhow::Result<Option<redis_rate_limiter::RedisConnection>> {
    if let Some(redis_pool) = self.vredis_pool.as_ref() {
        let redis_conn = redis_pool.get().await?;

        Ok(Some(redis_conn))
    } else {
        // TODO: don't do an error. return None
        Ok(None)
    }
}
2023-02-03 21:56:05 +03:00
// #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
2023-04-14 10:04:35 +03:00
// TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved!
2023-01-17 09:54:40 +03:00
async fn proxy_cached_request (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2022-07-09 05:23:26 +03:00
mut request : JsonRpcRequest ,
2023-02-15 04:41:40 +03:00
head_block_num : Option < U64 > ,
2023-02-06 20:55:27 +03:00
) -> Result < ( JsonRpcForwardedResponse , Vec < Arc < Web3Rpc > > ) , FrontendErrorResponse > {
2022-11-12 11:24:32 +03:00
// trace!("Received request: {:?}", request);
2022-05-12 02:50:52 +03:00
2022-11-20 01:05:51 +03:00
let request_metadata = Arc ::new ( RequestMetadata ::new ( REQUEST_PERIOD , request . num_bytes ( ) ) ? ) ;
2022-10-10 07:15:07 +03:00
2023-03-03 04:39:50 +03:00
let mut kafka_stuff = None ;
2023-03-03 08:12:35 +03:00
2023-03-03 17:58:45 +03:00
if matches! ( authorization . checks . proxy_mode , ProxyMode ::Debug ) {
if let Some ( kafka_producer ) = self . kafka_producer . clone ( ) {
let kafka_topic = " proxy_cached_request " . to_string ( ) ;
let rpc_secret_key_id = authorization
. checks
. rpc_secret_key_id
. map ( | x | x . get ( ) )
. unwrap_or_default ( ) ;
let kafka_key = rmp_serde ::to_vec ( & rpc_secret_key_id ) ? ;
let request_bytes = rmp_serde ::to_vec ( & request ) ? ;
let request_hash = Some ( keccak256 ( & request_bytes ) ) ;
let chain_id = self . config . chain_id ;
// another item is added with the response, so initial_capacity is +1 what is needed here
let kafka_headers = OwnedHeaders ::new_with_capacity ( 4 )
. insert ( Header {
key : " request_hash " ,
value : request_hash . as_ref ( ) ,
} )
. insert ( Header {
key : " head_block_num " ,
value : head_block_num . map ( | x | x . to_string ( ) ) . as_ref ( ) ,
} )
. insert ( Header {
key : " chain_id " ,
value : Some ( & chain_id . to_le_bytes ( ) ) ,
} ) ;
// save the key and headers for when we log the response
kafka_stuff = Some ( (
kafka_topic . clone ( ) ,
kafka_key . clone ( ) ,
kafka_headers . clone ( ) ,
) ) ;
2023-03-03 04:39:50 +03:00
2023-03-03 17:58:45 +03:00
let f = async move {
let produce_future = kafka_producer . send (
FutureRecord ::to ( & kafka_topic )
. key ( & kafka_key )
. payload ( & request_bytes )
. headers ( kafka_headers ) ,
Duration ::from_secs ( 0 ) ,
) ;
2023-03-03 04:39:50 +03:00
2023-03-03 17:58:45 +03:00
if let Err ( ( err , _ ) ) = produce_future . await {
error! ( " produce kafka request log: {} " , err ) ;
// TODO: re-queue the msg?
}
} ;
2023-03-03 04:39:50 +03:00
2023-03-03 17:58:45 +03:00
tokio ::spawn ( f ) ;
}
2023-03-03 04:39:50 +03:00
}
2022-09-06 20:56:49 +03:00
// save the id so we can attach it to the response
2022-11-08 22:58:11 +03:00
// TODO: instead of cloning, take the id out?
2022-09-07 06:54:16 +03:00
let request_id = request . id . clone ( ) ;
2022-12-14 22:03:42 +03:00
let request_method = request . method . clone ( ) ;
2022-09-06 20:56:49 +03:00
2022-05-31 04:55:04 +03:00
// TODO: if eth_chainId or net_version, serve those without querying the backend
2022-09-22 02:50:55 +03:00
// TODO: don't clone?
2022-12-14 22:03:42 +03:00
let partial_response : serde_json ::Value = match request_method . as_ref ( ) {
2022-07-09 05:23:26 +03:00
// lots of commands are blocked
2023-02-03 21:56:05 +03:00
method @ ( " db_getHex "
2022-07-09 05:23:26 +03:00
| " db_getString "
| " db_putHex "
| " db_putString "
2023-02-12 12:22:53 +03:00
| " debug_accountRange "
| " debug_backtraceAt "
| " debug_blockProfile "
2022-06-14 09:54:19 +03:00
| " debug_chaindbCompact "
2023-02-12 12:22:53 +03:00
| " debug_chaindbProperty "
| " debug_cpuProfile "
| " debug_freeOSMemory "
2022-06-14 09:54:19 +03:00
| " debug_freezeClient "
2023-02-12 12:22:53 +03:00
| " debug_gcStats "
2022-06-14 09:54:19 +03:00
| " debug_goTrace "
2023-02-12 12:22:53 +03:00
| " debug_memStats "
2022-06-14 09:54:19 +03:00
| " debug_mutexProfile "
| " debug_setBlockProfileRate "
| " debug_setGCPercent "
| " debug_setHead "
| " debug_setMutexProfileFraction "
| " debug_standardTraceBlockToFile "
| " debug_standardTraceBadBlockToFile "
| " debug_startCPUProfile "
| " debug_startGoTrace "
| " debug_stopCPUProfile "
| " debug_stopGoTrace "
| " debug_writeBlockProfile "
| " debug_writeMemProfile "
| " debug_writeMutexProfile "
2022-07-09 05:23:26 +03:00
| " eth_compileLLL "
| " eth_compileSerpent "
| " eth_compileSolidity "
| " eth_getCompilers "
| " eth_sendTransaction "
| " eth_sign "
| " eth_signTransaction "
| " eth_submitHashrate "
| " eth_submitWork "
2022-11-21 20:49:41 +03:00
| " erigon_cacheCheck "
2022-06-14 09:54:19 +03:00
| " les_addBalance "
| " les_setClientParams "
| " les_setDefaultParams "
| " miner_setExtra "
| " miner_setGasPrice "
| " miner_start "
| " miner_stop "
| " miner_setEtherbase "
| " miner_setGasLimit "
| " personal_importRawKey "
| " personal_listAccounts "
| " personal_lockAccount "
| " personal_newAccount "
| " personal_unlockAccount "
| " personal_sendTransaction "
| " personal_sign "
2022-07-09 05:23:26 +03:00
| " personal_ecRecover "
| " shh_addToGroup "
| " shh_getFilterChanges "
| " shh_getMessages "
| " shh_hasIdentity "
| " shh_newFilter "
| " shh_newGroup "
| " shh_newIdentity "
| " shh_post "
| " shh_uninstallFilter "
2022-09-09 00:01:36 +03:00
| " shh_version " ) = > {
2022-08-13 01:12:46 +03:00
// TODO: client error stat
2023-01-24 19:07:10 +03:00
// TODO: what error code?
return Ok ( (
JsonRpcForwardedResponse ::from_string (
format! ( " method unsupported: {} " , method ) ,
None ,
Some ( request_id ) ,
) ,
vec! [ ] ,
) ) ;
2022-07-09 05:23:26 +03:00
}
// TODO: implement these commands
2022-09-09 00:01:36 +03:00
method @ ( " eth_getFilterChanges "
2022-07-09 05:23:26 +03:00
| " eth_getFilterLogs "
| " eth_newBlockFilter "
| " eth_newFilter "
| " eth_newPendingTransactionFilter "
2023-02-02 00:56:43 +03:00
| " eth_pollSubscriptions "
2022-09-09 00:01:36 +03:00
| " eth_uninstallFilter " ) = > {
2022-08-13 01:12:46 +03:00
// TODO: unsupported command stat
2023-01-24 19:07:10 +03:00
// TODO: what error code?
return Ok ( (
JsonRpcForwardedResponse ::from_string (
format! ( " not yet implemented: {} " , method ) ,
None ,
Some ( request_id ) ,
) ,
vec! [ ] ,
) ) ;
2022-08-13 01:12:46 +03:00
}
2023-04-14 10:04:35 +03:00
method @ ( " debug_bundler_sendBundleNow "
| " debug_bundler_clearState "
| " debug_bundler_dumpMempool " ) = > {
return Ok ( (
JsonRpcForwardedResponse ::from_string (
// TODO: we should probably have some escaping on this. but maybe serde will protect us enough
format! ( " method unsupported: {} " , method ) ,
None ,
Some ( request_id ) ,
) ,
vec! [ ] ,
) ) ;
}
_method @ ( " eth_sendUserOperation "
| " eth_estimateUserOperationGas "
| " eth_getUserOperationByHash "
| " eth_getUserOperationReceipt "
| " eth_supportedEntryPoints " ) = > match self . bundler_4337_rpcs . as_ref ( ) {
Some ( bundler_4337_rpcs ) = > {
let response = bundler_4337_rpcs
. try_proxy_connection (
authorization ,
request ,
Some ( & request_metadata ) ,
None ,
None ,
)
. await ? ;
// TODO: DRY
let rpcs = request_metadata . backend_requests . lock ( ) . clone ( ) ;
if let Some ( stat_sender ) = self . stat_sender . as_ref ( ) {
let response_stat = ProxyResponseStat ::new (
request_method ,
authorization . clone ( ) ,
request_metadata ,
response . num_bytes ( ) ,
) ;
stat_sender
. send_async ( response_stat . into ( ) )
. await
. context ( " stat_sender sending bundler_4337 response stat " ) ? ;
}
return Ok ( ( response , rpcs ) ) ;
}
None = > {
// TODO: stats!
return Err ( anyhow ::anyhow! ( " no bundler_4337_rpcs available " ) . into ( ) ) ;
}
} ,
2022-07-09 05:23:26 +03:00
// some commands can use local data or caches
2022-08-13 01:12:46 +03:00
" eth_accounts " = > {
// no stats on this. its cheap
serde_json ::Value ::Array ( vec! [ ] )
}
2022-07-09 05:23:26 +03:00
" eth_blockNumber " = > {
2023-02-15 04:41:40 +03:00
match head_block_num . or ( self . balanced_rpcs . head_block_num ( ) ) {
2022-09-01 08:58:55 +03:00
Some ( head_block_num ) = > {
json! ( head_block_num )
}
None = > {
// TODO: what does geth do if this happens?
2023-02-03 21:56:05 +03:00
// TODO: i think we want a 502 so that haproxy retries on another server
return Err (
anyhow ::anyhow! ( " no servers synced. unknown eth_blockNumber " ) . into ( ) ,
) ;
2022-09-01 08:58:55 +03:00
}
2022-07-09 05:23:26 +03:00
}
}
2023-02-16 08:19:24 +03:00
" eth_chainId " = > json! ( U64 ::from ( self . config . chain_id ) ) ,
2022-07-09 05:23:26 +03:00
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
" eth_coinbase " = > {
2022-07-22 22:30:39 +03:00
// no need for serving coinbase
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-22 22:30:39 +03:00
json! ( Address ::zero ( ) )
2022-07-09 05:23:26 +03:00
}
2022-12-14 22:03:42 +03:00
" eth_estimateGas " = > {
let mut response = self
. balanced_rpcs
2023-01-20 05:08:53 +03:00
. try_proxy_connection (
2022-12-14 22:03:42 +03:00
authorization ,
request ,
Some ( & request_metadata ) ,
None ,
2023-02-11 07:45:57 +03:00
None ,
2022-12-14 22:03:42 +03:00
)
. await ? ;
2023-01-20 05:08:53 +03:00
let mut gas_estimate : U256 = if let Some ( gas_estimate ) = response . result . take ( ) {
serde_json ::from_str ( gas_estimate . get ( ) )
. context ( " gas estimate result is not an U256 " ) ?
2022-12-14 22:03:42 +03:00
} else {
// i think this is always an error response
2022-12-20 08:37:12 +03:00
let rpcs = request_metadata . backend_requests . lock ( ) . clone ( ) ;
2023-04-14 10:04:35 +03:00
// TODO! save stats
2022-12-20 08:37:12 +03:00
return Ok ( ( response , rpcs ) ) ;
2022-12-14 22:03:42 +03:00
} ;
2023-01-20 05:08:53 +03:00
let gas_increase =
if let Some ( gas_increase_percent ) = self . config . gas_increase_percent {
let gas_increase = gas_estimate * gas_increase_percent / U256 ::from ( 100 ) ;
let min_gas_increase = self . config . gas_increase_min . unwrap_or_default ( ) ;
gas_increase . max ( min_gas_increase )
} else {
self . config . gas_increase_min . unwrap_or_default ( )
} ;
gas_estimate + = gas_increase ;
2022-12-14 22:03:42 +03:00
2023-01-20 05:08:53 +03:00
json! ( gas_estimate )
2022-12-14 22:03:42 +03:00
}
2022-07-09 05:23:26 +03:00
// TODO: eth_gasPrice that does awesome magic to predict the future
" eth_hashrate " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2023-02-16 08:19:24 +03:00
json! ( U64 ::zero ( ) )
2022-06-30 03:52:04 +03:00
}
2022-07-09 05:23:26 +03:00
" eth_mining " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2023-02-16 08:16:33 +03:00
serde_json ::Value ::Bool ( false )
2022-07-09 05:23:26 +03:00
}
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
2022-08-09 19:54:05 +03:00
" eth_sendRawTransaction " = > {
2023-01-17 09:54:40 +03:00
// TODO: how should we handle private_mode here?
2023-03-03 04:39:50 +03:00
let default_num = match authorization . checks . proxy_mode {
2023-01-17 09:54:40 +03:00
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
2023-03-03 04:39:50 +03:00
ProxyMode ::Best | ProxyMode ::Debug = > Some ( 4 ) ,
2023-01-17 09:54:40 +03:00
ProxyMode ::Fastest ( 0 ) = > None ,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
2023-02-06 20:55:27 +03:00
// TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this
2023-01-24 12:58:31 +03:00
ProxyMode ::Fastest ( x ) = > Some ( x * 4 ) ,
2023-01-17 09:54:40 +03:00
ProxyMode ::Versus = > None ,
} ;
2023-01-12 01:51:01 +03:00
let ( private_rpcs , num ) = if let Some ( private_rpcs ) = self . private_rpcs . as_ref ( ) {
2023-03-02 02:21:09 +03:00
if ! private_rpcs . is_empty ( ) & & authorization . checks . private_txs {
2023-01-17 09:54:40 +03:00
// if we are sending the transaction privately, no matter the proxy_mode, we send to ALL private rpcs
2023-01-12 01:51:01 +03:00
( private_rpcs , None )
} else {
2023-02-09 22:47:56 +03:00
// TODO: send to balanced_rpcs AND private_rpcs
2023-01-17 09:54:40 +03:00
( & self . balanced_rpcs , default_num )
2023-01-12 01:51:01 +03:00
}
} else {
2023-01-17 09:54:40 +03:00
( & self . balanced_rpcs , default_num )
2023-01-12 01:51:01 +03:00
} ;
2022-09-14 07:27:18 +03:00
2023-02-15 04:41:40 +03:00
let head_block_num = head_block_num
. or ( self . balanced_rpcs . head_block_num ( ) )
. ok_or_else ( | | anyhow ::anyhow! ( " no servers synced " ) ) ? ;
// TODO: error/wait if no head block!
2023-02-11 07:24:20 +03:00
2022-12-24 04:32:58 +03:00
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
2023-03-02 02:21:09 +03:00
// TODO: what lag should we allow?
2022-12-20 08:37:12 +03:00
let mut response = private_rpcs
2023-01-17 09:54:40 +03:00
. try_send_all_synced_connections (
2022-11-08 22:58:11 +03:00
authorization ,
2022-12-24 04:32:58 +03:00
& request ,
2022-12-20 08:37:12 +03:00
Some ( request_metadata . clone ( ) ) ,
2023-03-02 02:21:09 +03:00
Some ( & head_block_num . saturating_sub ( 2. into ( ) ) ) ,
2023-02-11 07:45:57 +03:00
None ,
2022-12-21 08:55:12 +03:00
Level ::Trace ,
2023-01-12 01:51:01 +03:00
num ,
2023-02-09 22:47:56 +03:00
true ,
2022-10-11 22:58:25 +03:00
)
2022-12-20 08:37:12 +03:00
. await ? ;
2022-12-24 04:32:58 +03:00
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Just return the hash like a successful response would.
if let Some ( response_error ) = response . error . as_ref ( ) {
if response_error . code = = - 32000
& & ( response_error . message = = " ALREADY_EXISTS: already known "
| | response_error . message
= = " INTERNAL_ERROR: existing tx with same hash " )
{
let params = request
. params
. context ( " there must be params if we got this far " ) ? ;
let params = params
. as_array ( )
. context ( " there must be an array if we got this far " ) ?
. get ( 0 )
. context ( " there must be an item if we got this far " ) ?
. as_str ( )
. context ( " there must be a string if we got this far " ) ? ;
let params = Bytes ::from_str ( params )
. expect ( " there must be Bytes if we got this far " ) ;
let rlp = Rlp ::new ( params . as_ref ( ) ) ;
if let Ok ( tx ) = Transaction ::decode ( & rlp ) {
let tx_hash = json! ( tx . hash ( ) ) ;
2023-01-02 21:34:16 +03:00
trace! ( " tx_hash: {:#?} " , tx_hash ) ;
2022-12-24 04:32:58 +03:00
let tx_hash = to_raw_value ( & tx_hash ) . unwrap ( ) ;
response . error = None ;
response . result = Some ( tx_hash ) ;
}
}
}
2022-12-20 08:37:12 +03:00
let rpcs = request_metadata . backend_requests . lock ( ) . clone ( ) ;
2023-01-12 01:51:01 +03:00
// emit stats
2022-12-29 09:21:09 +03:00
if let Some ( salt ) = self . config . public_recent_ips_salt . as_ref ( ) {
if let Some ( tx_hash ) = response . result . clone ( ) {
let now = Utc ::now ( ) . timestamp ( ) ;
let salt = salt . clone ( ) ;
let app = self . clone ( ) ;
let f = async move {
match app . redis_conn ( ) . await {
Ok ( Some ( mut redis_conn ) ) = > {
let salted_tx_hash = format! ( " {} : {} " , salt , tx_hash ) ;
let hashed_tx_hash =
Bytes ::from ( keccak256 ( salted_tx_hash . as_bytes ( ) ) ) ;
let recent_tx_hash_key =
format! ( " eth_sendRawTransaction: {} " , app . config . chain_id ) ;
redis_conn
. zadd ( recent_tx_hash_key , hashed_tx_hash . to_string ( ) , now )
. await ? ;
}
Ok ( None ) = > { }
Err ( err ) = > {
warn! (
" unable to save stats for eth_sendRawTransaction: {:?} " ,
err
)
}
}
Ok ::< _ , anyhow ::Error > ( ( ) )
} ;
tokio ::spawn ( f ) ;
}
}
2022-12-20 08:37:12 +03:00
return Ok ( ( response , rpcs ) ) ;
2022-08-09 19:54:05 +03:00
}
2022-07-09 05:23:26 +03:00
" eth_syncing " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-09 05:23:26 +03:00
// TODO: return a real response if all backends are syncing or if no servers in sync
2023-02-16 08:16:33 +03:00
serde_json ::Value ::Bool ( false )
2022-07-09 05:23:26 +03:00
}
2022-12-22 23:05:15 +03:00
" eth_subscribe " = > {
2023-01-24 19:07:10 +03:00
return Ok ( (
2023-01-25 09:45:20 +03:00
JsonRpcForwardedResponse ::from_str (
" notifications not supported. eth_subscribe is only available over a websocket " ,
2023-01-24 19:07:10 +03:00
Some ( - 32601 ) ,
Some ( request_id ) ,
) ,
vec! [ ] ,
2022-12-22 23:05:15 +03:00
) ) ;
}
" eth_unsubscribe " = > {
2023-01-24 19:07:10 +03:00
return Ok ( (
2023-01-25 09:45:20 +03:00
JsonRpcForwardedResponse ::from_str (
" notifications not supported. eth_unsubscribe is only available over a websocket " ,
2023-01-24 19:07:10 +03:00
Some ( - 32601 ) ,
Some ( request_id ) ,
) ,
vec! [ ] ,
2022-12-22 23:05:15 +03:00
) ) ;
}
2022-07-09 05:23:26 +03:00
" net_listening " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-09 05:23:26 +03:00
// TODO: only if there are some backends on balanced_rpcs?
2023-02-16 08:16:33 +03:00
serde_json ::Value ::Bool ( true )
2022-07-09 05:23:26 +03:00
}
2022-08-13 01:12:46 +03:00
" net_peerCount " = > {
2023-01-17 09:54:40 +03:00
// no stats on this. its cheap
// TODO: do something with proxy_mode here?
2023-02-16 08:19:24 +03:00
json! ( U64 ::from ( self . balanced_rpcs . num_synced_rpcs ( ) ) )
2022-08-13 01:12:46 +03:00
}
" web3_clientVersion " = > {
// no stats on this. its cheap
serde_json ::Value ::String ( APP_USER_AGENT . to_string ( ) )
}
2022-07-22 22:30:39 +03:00
" web3_sha3 " = > {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match & request . params {
Some ( serde_json ::Value ::Array ( params ) ) = > {
2022-08-10 05:37:34 +03:00
// TODO: make a struct and use serde conversion to clean this up
2023-03-09 20:32:30 +03:00
if params . len ( ) ! = 1
| | ! params . get ( 0 ) . map ( | x | x . is_string ( ) ) . unwrap_or ( false )
{
2023-01-24 19:07:10 +03:00
// TODO: what error code?
return Ok ( (
JsonRpcForwardedResponse ::from_str (
" Invalid request " ,
Some ( - 32600 ) ,
Some ( request_id ) ,
) ,
vec! [ ] ,
) ) ;
2022-07-22 22:30:39 +03:00
}
2022-07-09 05:23:26 +03:00
2022-09-20 06:26:12 +03:00
let param = Bytes ::from_str (
params [ 0 ]
. as_str ( )
. context ( " parsing params 0 into str then bytes " ) ? ,
2023-02-03 21:56:05 +03:00
)
. map_err ( | x | {
2023-02-06 05:16:09 +03:00
trace! ( " bad request: {:?} " , x ) ;
2023-02-03 21:56:05 +03:00
FrontendErrorResponse ::BadRequest (
" param 0 could not be read as H256 " . to_string ( ) ,
)
} ) ? ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
let hash = H256 ::from ( keccak256 ( param ) ) ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
json! ( hash )
}
2022-09-10 03:58:33 +03:00
_ = > {
// TODO: this needs the correct error code in the response
2022-10-12 00:31:34 +03:00
// TODO: emit stat?
2023-01-24 19:07:10 +03:00
return Ok ( (
JsonRpcForwardedResponse ::from_str (
" invalid request " ,
None ,
Some ( request_id ) ,
) ,
vec! [ ] ,
) ) ;
2022-09-10 03:58:33 +03:00
}
2022-07-22 22:30:39 +03:00
}
2022-07-09 05:23:26 +03:00
}
2023-01-24 19:07:10 +03:00
" test " = > {
return Ok ( (
JsonRpcForwardedResponse ::from_str (
" The method test does not exist/is not available. " ,
Some ( - 32601 ) ,
Some ( request_id ) ,
) ,
vec! [ ] ,
) ) ;
}
2022-07-22 22:30:39 +03:00
// anything else gets sent to backend rpcs and cached
2022-06-30 03:52:04 +03:00
method = > {
2023-02-03 21:56:05 +03:00
if method . starts_with ( " admin_ " ) {
// TODO: emit a stat? will probably just be noise
return Err ( FrontendErrorResponse ::AccessDenied ) ;
}
2023-01-23 09:02:08 +03:00
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
2023-02-15 04:41:40 +03:00
let head_block_num = head_block_num
. or ( self . balanced_rpcs . head_block_num ( ) )
2022-09-01 08:58:55 +03:00
. context ( " no servers synced " ) ? ;
2022-07-16 07:13:02 +03:00
2022-07-09 05:23:26 +03:00
// we do this check before checking caches because it might modify the request params
2022-07-16 08:48:02 +03:00
// TODO: add a stat for archive vs full since they should probably cost different
2022-12-17 07:05:01 +03:00
// TODO: this cache key can be rather large. is that okay?
let cache_key : Option < ResponseCacheKey > = match block_needed (
2022-11-08 22:58:11 +03:00
authorization ,
2022-09-22 02:50:55 +03:00
method ,
request . params . as_mut ( ) ,
2023-01-23 09:02:08 +03:00
head_block_num ,
2022-09-22 02:50:55 +03:00
& self . balanced_rpcs ,
)
2022-09-30 07:18:18 +03:00
. await ?
2022-09-05 09:13:36 +03:00
{
2022-12-17 07:05:01 +03:00
BlockNeeded ::CacheSuccessForever = > Some ( ResponseCacheKey {
2023-01-31 19:30:24 +03:00
from_block : None ,
to_block : None ,
2022-12-17 07:05:01 +03:00
method : method . to_string ( ) ,
params : request . params . clone ( ) ,
cache_errors : false ,
} ) ,
BlockNeeded ::CacheNever = > None ,
BlockNeeded ::Cache {
block_num ,
cache_errors ,
} = > {
2023-02-06 04:58:03 +03:00
let ( request_block_hash , block_depth ) = self
2022-12-17 07:05:01 +03:00
. balanced_rpcs
. block_hash ( authorization , & block_num )
. await ? ;
2023-02-06 04:58:03 +03:00
if block_depth < self . config . archive_depth {
2022-12-17 07:05:01 +03:00
request_metadata
. archive_request
. store ( true , atomic ::Ordering ::Relaxed ) ;
}
2022-12-03 08:31:03 +03:00
2022-12-17 07:05:01 +03:00
let request_block = self
. balanced_rpcs
. block ( authorization , & request_block_hash , None )
. await ? ;
Some ( ResponseCacheKey {
2023-02-14 23:14:50 +03:00
from_block : Some ( request_block ) ,
2023-01-31 19:30:24 +03:00
to_block : None ,
method : method . to_string ( ) ,
// TODO: hash here?
params : request . params . clone ( ) ,
cache_errors ,
} )
}
BlockNeeded ::CacheRange {
from_block_num ,
to_block_num ,
cache_errors ,
} = > {
2023-02-06 04:58:03 +03:00
let ( from_block_hash , block_depth ) = self
2023-01-31 19:30:24 +03:00
. balanced_rpcs
. block_hash ( authorization , & from_block_num )
. await ? ;
2023-02-06 04:58:03 +03:00
if block_depth < self . config . archive_depth {
2023-01-31 19:30:24 +03:00
request_metadata
. archive_request
. store ( true , atomic ::Ordering ::Relaxed ) ;
}
let from_block = self
. balanced_rpcs
. block ( authorization , & from_block_hash , None )
. await ? ;
let ( to_block_hash , _ ) = self
. balanced_rpcs
. block_hash ( authorization , & to_block_num )
. await ? ;
let to_block = self
. balanced_rpcs
. block ( authorization , & to_block_hash , None )
. await ? ;
Some ( ResponseCacheKey {
2023-02-14 23:14:50 +03:00
from_block : Some ( from_block ) ,
to_block : Some ( to_block ) ,
2022-12-17 07:05:01 +03:00
method : method . to_string ( ) ,
// TODO: hash here?
params : request . params . clone ( ) ,
cache_errors ,
} )
}
2022-06-14 07:04:14 +03:00
} ;
2023-02-27 09:59:42 +03:00
trace! ( " cache_key: {:#?} " , cache_key ) ;
2022-06-14 07:04:14 +03:00
2022-10-03 23:02:05 +03:00
let mut response = {
2022-10-11 22:58:25 +03:00
let request_metadata = request_metadata . clone ( ) ;
2022-10-03 23:02:05 +03:00
2022-11-08 22:58:11 +03:00
let authorization = authorization . clone ( ) ;
2022-10-10 07:15:07 +03:00
2022-12-17 07:05:01 +03:00
if let Some ( cache_key ) = cache_key {
2023-02-14 23:14:50 +03:00
let from_block_num = cache_key . from_block . as_ref ( ) . map ( | x | * x . number ( ) ) ;
let to_block_num = cache_key . to_block . as_ref ( ) . map ( | x | * x . number ( ) ) ;
2022-12-17 07:05:01 +03:00
self . response_cache
. try_get_with ( cache_key , async move {
2023-01-17 09:54:40 +03:00
// TODO: put the hash here instead of the block number? its in the request already.
2022-12-17 07:05:01 +03:00
let mut response = self
. balanced_rpcs
2023-01-17 09:54:40 +03:00
. try_proxy_connection (
2022-12-17 07:05:01 +03:00
& authorization ,
request ,
Some ( & request_metadata ) ,
2023-01-31 19:30:24 +03:00
from_block_num . as_ref ( ) ,
2023-02-11 07:45:57 +03:00
to_block_num . as_ref ( ) ,
2022-12-17 07:05:01 +03:00
)
. await ? ;
// discard their id by replacing it with an empty
response . id = Default ::default ( ) ;
2022-12-24 04:32:58 +03:00
// TODO: only cache the inner response
2023-02-09 22:47:56 +03:00
// TODO: how are we going to stream this?
2023-02-11 07:45:57 +03:00
// TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us?
2022-12-17 07:05:01 +03:00
Ok ::< _ , anyhow ::Error > ( response )
} )
. await
// TODO: what is the best way to handle an Arc here?
. map_err ( | err | {
// TODO: emit a stat for an error
2023-01-13 00:56:39 +03:00
anyhow ::anyhow! (
" error while caching and forwarding response: {} " ,
err
)
} ) ?
2022-12-17 07:05:01 +03:00
} else {
2022-12-24 04:32:58 +03:00
self . balanced_rpcs
2023-01-17 09:54:40 +03:00
. try_proxy_connection (
2022-12-17 07:05:01 +03:00
& authorization ,
request ,
Some ( & request_metadata ) ,
None ,
2023-02-11 07:45:57 +03:00
None ,
2022-12-17 07:05:01 +03:00
)
2023-01-17 09:54:40 +03:00
. await ?
2022-12-17 07:05:01 +03:00
}
2022-10-03 23:02:05 +03:00
} ;
2022-10-11 22:58:25 +03:00
// since this data came likely out of a cache, the id is not going to match
// replace the id with our request's id.
response . id = request_id ;
2022-12-20 08:37:12 +03:00
// TODO: DRY!
let rpcs = request_metadata . backend_requests . lock ( ) . clone ( ) ;
2022-11-08 22:58:11 +03:00
if let Some ( stat_sender ) = self . stat_sender . as_ref ( ) {
2022-10-03 23:02:05 +03:00
let response_stat = ProxyResponseStat ::new (
method . to_string ( ) ,
2022-11-08 22:58:11 +03:00
authorization . clone ( ) ,
2022-10-10 07:15:07 +03:00
request_metadata ,
2022-11-20 01:05:51 +03:00
response . num_bytes ( ) ,
2022-10-03 23:02:05 +03:00
) ;
2022-10-21 02:50:23 +03:00
stat_sender
. send_async ( response_stat . into ( ) )
. await
. context ( " stat_sender sending response_stat " ) ? ;
2022-10-03 23:02:05 +03:00
}
2022-05-29 04:23:58 +03:00
2022-12-20 08:37:12 +03:00
return Ok ( ( response , rpcs ) ) ;
2022-06-14 07:04:14 +03:00
}
2022-07-22 22:30:39 +03:00
} ;
2022-09-07 06:54:16 +03:00
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request_id ) ;
2022-07-22 22:30:39 +03:00
2022-12-20 08:37:12 +03:00
// TODO: DRY
let rpcs = request_metadata . backend_requests . lock ( ) . clone ( ) ;
2022-11-08 22:58:11 +03:00
if let Some ( stat_sender ) = self . stat_sender . as_ref ( ) {
let response_stat = ProxyResponseStat ::new (
2022-12-14 22:03:42 +03:00
request_method ,
2022-11-08 22:58:11 +03:00
authorization . clone ( ) ,
request_metadata ,
2022-11-20 01:05:51 +03:00
response . num_bytes ( ) ,
2022-11-08 22:58:11 +03:00
) ;
2022-10-12 00:31:34 +03:00
2022-10-21 02:50:23 +03:00
stat_sender
. send_async ( response_stat . into ( ) )
. await
. context ( " stat_sender sending response stat " ) ? ;
2022-10-12 00:31:34 +03:00
}
2023-03-03 13:54:52 +03:00
if let Some ( ( kafka_topic , kafka_key , kafka_headers ) ) = kafka_stuff {
2023-03-03 04:39:50 +03:00
let kafka_producer = self
. kafka_producer
. clone ( )
. expect ( " if headers are set, producer must exist " ) ;
let response_bytes =
rmp_serde ::to_vec ( & response ) . context ( " failed msgpack serialize response " ) ? ;
let f = async move {
let produce_future = kafka_producer . send (
2023-03-03 13:54:52 +03:00
FutureRecord ::to ( & kafka_topic )
2023-03-03 04:39:50 +03:00
. key ( & kafka_key )
. payload ( & response_bytes )
. headers ( kafka_headers ) ,
Duration ::from_secs ( 0 ) ,
) ;
2023-03-03 17:58:45 +03:00
if let Err ( ( err , _ ) ) = produce_future . await {
error! ( " produce kafka response log: {} " , err ) ;
2023-03-03 04:39:50 +03:00
}
} ;
tokio ::spawn ( f ) ;
}
2022-12-20 08:37:12 +03:00
Ok ( ( response , rpcs ) )
2022-05-12 02:50:52 +03:00
}
}
2022-08-24 03:59:05 +03:00
impl fmt::Debug for Web3ProxyApp {
    /// Compact debug representation.
    ///
    /// The derived formatter would walk every field and is far too slow/verbose
    /// for logging, so we print only the type name and elide all fields
    /// (rendered as `Web3ProxyApp { .. }`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Web3ProxyApp");

        // no fields registered on purpose; `..` marks them as intentionally omitted
        builder.finish_non_exhaustive()
    }
}