2022-07-26 07:53:38 +03:00
use anyhow ::Context ;
2022-05-29 22:33:10 +03:00
use axum ::extract ::ws ::Message ;
2022-06-14 09:42:52 +03:00
use dashmap ::mapref ::entry ::Entry as DashMapEntry ;
2022-05-16 22:15:40 +03:00
use dashmap ::DashMap ;
2022-07-22 22:30:39 +03:00
use ethers ::core ::utils ::keccak256 ;
2022-07-19 04:31:12 +03:00
use ethers ::prelude ::{ Address , Block , BlockNumber , Bytes , Transaction , TxHash , H256 , U64 } ;
2022-06-16 23:57:48 +03:00
use futures ::future ::Abortable ;
2022-05-30 04:28:22 +03:00
use futures ::future ::{ join_all , AbortHandle } ;
2022-06-14 08:43:28 +03:00
use futures ::stream ::FuturesUnordered ;
2022-05-30 04:28:22 +03:00
use futures ::stream ::StreamExt ;
2022-06-16 20:51:49 +03:00
use futures ::Future ;
2022-05-16 01:02:14 +03:00
use linkedhashmap ::LinkedHashMap ;
2022-08-03 03:27:26 +03:00
use migration ::{ Migrator , MigratorTrait } ;
2022-05-16 01:02:14 +03:00
use parking_lot ::RwLock ;
2022-07-09 02:02:32 +03:00
use redis_cell_client ::bb8 ::ErrorSink ;
2022-07-07 06:29:47 +03:00
use redis_cell_client ::{ bb8 , RedisCellClient , RedisConnectionManager } ;
2022-08-06 03:07:12 +03:00
use sea_orm ::DatabaseConnection ;
2022-05-30 21:23:55 +03:00
use serde_json ::json ;
2022-05-12 02:50:52 +03:00
use std ::fmt ;
2022-07-22 22:30:39 +03:00
use std ::mem ::size_of_val ;
2022-06-16 20:51:49 +03:00
use std ::pin ::Pin ;
2022-07-14 02:25:01 +03:00
use std ::str ::FromStr ;
2022-05-30 21:51:19 +03:00
use std ::sync ::atomic ::{ self , AtomicUsize } ;
2022-05-12 02:50:52 +03:00
use std ::sync ::Arc ;
use std ::time ::Duration ;
2022-06-16 05:53:37 +03:00
use tokio ::sync ::{ broadcast , watch } ;
2022-06-14 08:43:28 +03:00
use tokio ::task ::JoinHandle ;
2022-05-29 04:23:58 +03:00
use tokio ::time ::timeout ;
2022-06-16 23:57:48 +03:00
use tokio_stream ::wrappers ::{ BroadcastStream , WatchStream } ;
2022-07-22 22:30:39 +03:00
use tracing ::{ debug , info , info_span , instrument , trace , warn , Instrument } ;
2022-05-12 02:50:52 +03:00
2022-07-09 02:02:32 +03:00
use crate ::bb8_helpers ;
use crate ::config ::AppConfig ;
2022-06-16 05:53:37 +03:00
use crate ::connections ::Web3Connections ;
2022-07-14 02:25:01 +03:00
use crate ::firewall ::check_firewall_raw ;
2022-06-16 05:53:37 +03:00
use crate ::jsonrpc ::JsonRpcForwardedResponse ;
use crate ::jsonrpc ::JsonRpcForwardedResponseEnum ;
use crate ::jsonrpc ::JsonRpcRequest ;
use crate ::jsonrpc ::JsonRpcRequestEnum ;
2022-07-09 05:23:26 +03:00
// TODO: make this customizable?
2022-05-12 02:50:52 +03:00
/// User-Agent header for the shared http client: `satoshiandkin/<CARGO_PKG_NAME>/<CARGO_PKG_VERSION>`.
static APP_USER_AGENT : & str = concat! (
" satoshiandkin/ " ,
env! ( " CARGO_PKG_NAME " ) ,
" / " ,
env! ( " CARGO_PKG_VERSION " ) ,
) ;
2022-07-09 05:23:26 +03:00
// block hash, method, params
/// Response cache key: (block hash, method, params serialized with `to_string`).
type CacheKey = ( H256 , String , Option < String > ) ;
2022-05-16 22:15:40 +03:00
2022-07-16 03:08:22 +03:00
// TODO: make something more advanced that keeps track of cache size in bytes
2022-05-22 07:22:30 +03:00
/// Cache of forwarded responses. `LinkedHashMap` keeps insertion order; presumably that order
/// drives LRU-style eviction — TODO confirm against the insert path (not visible here).
type ResponseLrcCache = RwLock < LinkedHashMap < CacheKey , JsonRpcForwardedResponse > > ;
2022-05-12 02:50:52 +03:00
2022-05-21 01:16:15 +03:00
/// Requests currently in flight, keyed by `CacheKey`. NOTE(review): presumably the
/// `watch::Receiver` lets duplicate requests wait for the first one to finish — confirm.
type ActiveRequestsMap = DashMap < CacheKey , watch ::Receiver < bool > > ;
2022-06-14 08:43:28 +03:00
/// A spawned tokio task whose output is an `anyhow::Result`.
pub type AnyhowJoinHandle < T > = JoinHandle < anyhow ::Result < T > > ;
2022-07-16 03:08:22 +03:00
/// flatten a JoinError into an anyhow error
2022-06-14 08:43:28 +03:00
pub async fn flatten_handle < T > ( handle : AnyhowJoinHandle < T > ) -> anyhow ::Result < T > {
match handle . await {
Ok ( Ok ( result ) ) = > Ok ( result ) ,
Ok ( Err ( err ) ) = > Err ( err ) ,
Err ( err ) = > Err ( err . into ( ) ) ,
}
}
2022-07-16 03:08:22 +03:00
/// return the first error or okay if everything worked
2022-06-16 23:57:48 +03:00
pub async fn flatten_handles < T > (
mut handles : FuturesUnordered < AnyhowJoinHandle < T > > ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < ( ) > {
while let Some ( x ) = handles . next ( ) . await {
match x {
Err ( e ) = > return Err ( e . into ( ) ) ,
Ok ( Err ( e ) ) = > return Err ( e ) ,
2022-07-16 03:08:22 +03:00
Ok ( Ok ( _ ) ) = > continue ,
2022-06-16 20:51:49 +03:00
}
}
Ok ( ( ) )
}
2022-07-19 04:31:12 +03:00
fn block_num_to_u64 ( block_num : BlockNumber , latest_block : U64 ) -> ( bool , U64 ) {
2022-07-16 07:13:02 +03:00
match block_num {
2022-07-19 04:31:12 +03:00
BlockNumber ::Earliest = > ( false , U64 ::zero ( ) ) ,
2022-07-16 07:13:02 +03:00
BlockNumber ::Latest = > {
// change "latest" to a number
( true , latest_block )
2022-07-16 03:35:54 +03:00
}
2022-07-19 04:31:12 +03:00
BlockNumber ::Number ( x ) = > ( false , x ) ,
2022-07-16 07:13:02 +03:00
// TODO: think more about how to handle Pending
BlockNumber ::Pending = > ( false , latest_block ) ,
2022-07-09 05:23:26 +03:00
}
2022-07-16 07:13:02 +03:00
}
2022-08-04 01:23:10 +03:00
fn clean_block_number (
2022-07-16 07:13:02 +03:00
params : & mut serde_json ::Value ,
block_param_id : usize ,
2022-07-19 04:31:12 +03:00
latest_block : U64 ,
) -> anyhow ::Result < U64 > {
2022-07-16 07:13:02 +03:00
match params . as_array_mut ( ) {
None = > Err ( anyhow ::anyhow! ( " params not an array " ) ) ,
Some ( params ) = > match params . get_mut ( block_param_id ) {
None = > {
if params . len ( ) ! = block_param_id - 1 {
return Err ( anyhow ::anyhow! ( " unexpected params length " ) ) ;
}
// add the latest block number to the end of the params
2022-07-19 04:31:12 +03:00
params . push ( serde_json ::to_value ( latest_block ) ? ) ;
2022-07-16 07:13:02 +03:00
Ok ( latest_block )
}
Some ( x ) = > {
// convert the json value to a BlockNumber
2022-07-19 04:31:12 +03:00
let block_num : BlockNumber = serde_json ::from_value ( x . clone ( ) ) ? ;
2022-07-16 07:13:02 +03:00
let ( modified , block_num ) = block_num_to_u64 ( block_num , latest_block ) ;
// if we changed "latest" to a number, update the params to match
if modified {
2022-07-19 04:31:12 +03:00
* x = serde_json ::to_value ( block_num ) ? ;
2022-07-16 07:13:02 +03:00
}
Ok ( block_num )
}
} ,
}
}
2022-07-21 02:49:29 +03:00
// TODO: change this to return also return the hash needed
2022-08-04 01:23:10 +03:00
fn block_needed (
2022-07-16 07:13:02 +03:00
method : & str ,
params : Option < & mut serde_json ::Value > ,
2022-07-22 22:30:39 +03:00
head_block : U64 ,
2022-07-19 04:31:12 +03:00
) -> Option < U64 > {
let params = params ? ;
2022-07-16 07:13:02 +03:00
// TODO: double check these. i think some of the getBlock stuff will never need archive
let block_param_id = match method {
" eth_call " = > 1 ,
" eth_estimateGas " = > 1 ,
" eth_getBalance " = > 1 ,
" eth_getBlockByHash " = > {
2022-07-22 08:11:26 +03:00
// TODO: double check that any node can serve this
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
2022-07-16 08:48:02 +03:00
" eth_getBlockByNumber " = > {
2022-07-22 08:11:26 +03:00
// TODO: double check that any node can serve this
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 08:48:02 +03:00
}
2022-07-16 07:13:02 +03:00
" eth_getBlockTransactionCountByHash " = > {
2022-07-22 08:11:26 +03:00
// TODO: double check that any node can serve this
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getBlockTransactionCountByNumber " = > 0 ,
" eth_getCode " = > 1 ,
" eth_getLogs " = > {
let obj = params [ 0 ] . as_object_mut ( ) . unwrap ( ) ;
if let Some ( x ) = obj . get_mut ( " fromBlock " ) {
2022-07-19 04:31:12 +03:00
let block_num : BlockNumber = serde_json ::from_value ( x . clone ( ) ) . ok ( ) ? ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
let ( modified , block_num ) = block_num_to_u64 ( block_num , head_block ) ;
2022-07-09 05:23:26 +03:00
2022-07-19 04:31:12 +03:00
if modified {
* x = serde_json ::to_value ( block_num ) . unwrap ( ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-19 04:31:12 +03:00
return Some ( block_num ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-09 05:23:26 +03:00
2022-07-16 07:13:02 +03:00
if let Some ( x ) = obj . get_mut ( " toBlock " ) {
2022-07-19 04:31:12 +03:00
let block_num : BlockNumber = serde_json ::from_value ( x . clone ( ) ) . ok ( ) ? ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
let ( modified , block_num ) = block_num_to_u64 ( block_num , head_block ) ;
2022-07-09 05:23:26 +03:00
2022-07-19 04:31:12 +03:00
if modified {
* x = serde_json ::to_value ( block_num ) . unwrap ( ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-19 04:31:12 +03:00
return Some ( block_num ) ;
2022-07-16 07:13:02 +03:00
}
if let Some ( x ) = obj . get ( " blockHash " ) {
// TODO: check a linkedhashmap of recent hashes
// TODO: error if fromBlock or toBlock were set
2022-07-22 22:30:39 +03:00
todo! ( " handle blockHash {} " , x ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getStorageAt " = > 2 ,
" eth_getTransactionByHash " = > {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getTransactionByBlockHashAndIndex " = > {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getTransactionByBlockNumberAndIndex " = > 0 ,
" eth_getTransactionCount " = > 1 ,
" eth_getTransactionReceipt " = > {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getUncleByBlockHashAndIndex " = > {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getUncleByBlockNumberAndIndex " = > 0 ,
" eth_getUncleCountByBlockHash " = > {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getUncleCountByBlockNumber " = > 0 ,
_ = > {
// some other command that doesn't take block numbers as an argument
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
} ;
2022-08-04 01:23:10 +03:00
match clean_block_number ( params , block_param_id , head_block ) {
2022-07-19 04:31:12 +03:00
Ok ( block ) = > Some ( block ) ,
Err ( err ) = > {
// TODO: seems unlikely that we will get here
// if this is incorrect, it should retry on an archive server
warn! ( ? err , " could not get block from params " ) ;
None
}
2022-07-16 07:13:02 +03:00
}
2022-07-09 05:23:26 +03:00
}
2022-08-06 03:07:12 +03:00
/// Open a database connection pool and bring the schema up to date.
///
/// The pool keeps at least `min_connections` connections and at most 100. Every migration
/// known to `Migrator` is applied before the connection is returned.
pub async fn get_migrated_db(
    db_url: String,
    min_connections: u32,
) -> anyhow::Result<DatabaseConnection> {
    // TODO: load all these options from the config file
    let mut options = sea_orm::ConnectOptions::new(db_url);
    options.max_connections(100);
    options.min_connections(min_connections);
    options.connect_timeout(Duration::from_secs(8));
    options.idle_timeout(Duration::from_secs(8));
    options.max_lifetime(Duration::from_secs(60));
    // TODO: sqlx logging only in debug. way too verbose for production
    options.sqlx_logging(false);

    let db = sea_orm::Database::connect(options).await?;

    // TODO: if error, roll back?
    Migrator::up(&db, None).await?;

    Ok(db)
}
2022-06-16 23:57:48 +03:00
// TODO: think more about TxState.
2022-06-16 05:53:37 +03:00
/// A transaction seen by the connected rpcs, tagged with its state. Broadcast to
/// `eth_subscribe` pending-transaction subscribers and stored in `pending_transactions`.
#[ derive(Clone) ]
2022-06-14 09:42:52 +03:00
pub enum TxState {
2022-06-15 01:02:18 +03:00
/// Presumably not yet included in a block. Forwarded to pending-transaction subscribers.
Pending ( Transaction ) ,
2022-06-16 05:53:37 +03:00
/// Presumably included in a block. Skipped by pending-transaction subscribers.
Confirmed ( Transaction ) ,
2022-06-15 01:02:18 +03:00
/// Presumably dropped from the canonical chain (e.g. by a reorg) — TODO confirm.
/// Forwarded to pending-transaction subscribers.
Orphaned ( Transaction ) ,
2022-06-14 09:42:52 +03:00
}
2022-05-12 02:50:52 +03:00
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
2022-06-16 20:51:49 +03:00
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
2022-05-12 02:50:52 +03:00
pub struct Web3ProxyApp {
/// Send requests to the best server available
2022-05-13 23:50:11 +03:00
balanced_rpcs : Arc < Web3Connections > ,
2022-05-12 02:50:52 +03:00
/// Send private requests (like eth_sendRawTransaction) to all these servers
/// (this is the same set as `balanced_rpcs` when no private rpcs are configured)
2022-05-13 23:50:11 +03:00
private_rpcs : Arc < Web3Connections > ,
2022-08-04 01:23:10 +03:00
/// requests currently in flight, keyed by `CacheKey`
active_requests : ActiveRequestsMap ,
2022-07-22 22:30:39 +03:00
/// bytes available to response_cache (it will be slightly larger than this)
response_cache_max_bytes : AtomicUsize ,
2022-05-22 07:22:30 +03:00
/// cached responses, keyed by (block hash, method, params)
response_cache : ResponseLrcCache ,
2022-05-30 07:30:13 +03:00
// don't drop this or the sender will stop working
2022-06-16 05:53:37 +03:00
// TODO: broadcast channel instead?
2022-07-22 08:11:26 +03:00
/// latest head block, sent by the balanced rpcs. cloned for `newHeads` subscriptions
head_block_receiver : watch ::Receiver < Arc < Block < TxHash > > > ,
2022-06-16 05:53:37 +03:00
/// broadcasts transaction state changes to `eth_subscribe` subscribers
pending_tx_sender : broadcast ::Sender < TxState > ,
2022-06-16 20:51:49 +03:00
/// transactions seen by the rpcs, keyed by hash. NOTE: `spawn` warns this grows unbounded
pending_transactions : Arc < DashMap < TxHash , TxState > > ,
2022-08-04 04:10:27 +03:00
/// frontend rate limiter. `None` when no redis is configured
rate_limiter : Option < RedisCellClient > ,
2022-07-26 07:53:38 +03:00
/// database connection. `None` when no `db_url` is configured
db_conn : Option < sea_orm ::DatabaseConnection > ,
2022-05-12 02:50:52 +03:00
}
impl fmt ::Debug for Web3ProxyApp {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
2022-05-13 23:50:11 +03:00
f . debug_struct ( " Web3ProxyApp " ) . finish_non_exhaustive ( )
2022-05-12 02:50:52 +03:00
}
}
impl Web3ProxyApp {
2022-08-04 04:10:27 +03:00
pub fn db_conn ( & self ) -> & sea_orm ::DatabaseConnection {
self . db_conn . as_ref ( ) . unwrap ( )
}
2022-08-04 01:23:10 +03:00
pub fn pending_transactions ( & self ) -> & DashMap < TxHash , TxState > {
2022-06-16 23:57:48 +03:00
& self . pending_transactions
}
2022-08-04 04:10:27 +03:00
pub fn rate_limiter ( & self ) -> Option < & RedisCellClient > {
self . rate_limiter . as_ref ( )
2022-07-07 06:22:09 +03:00
}
2022-07-09 02:02:32 +03:00
// TODO: should we just take the rpc config as the only arg instead?
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-07-09 02:02:32 +03:00
app_config : AppConfig ,
num_workers : usize ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < (
Arc < Web3ProxyApp > ,
Pin < Box < dyn Future < Output = anyhow ::Result < ( ) > > > > ,
) > {
2022-08-03 03:27:26 +03:00
// first, we connect to mysql and make sure the latest migrations have run
2022-07-26 07:53:38 +03:00
let db_conn = if let Some ( db_url ) = app_config . shared . db_url {
2022-08-06 03:07:12 +03:00
let min_connections = num_workers . try_into ( ) ? ;
2022-07-26 07:53:38 +03:00
2022-08-06 03:07:12 +03:00
let db = get_migrated_db ( db_url , min_connections ) . await ? ;
2022-08-03 03:27:26 +03:00
2022-08-06 03:07:12 +03:00
Some ( db )
2022-07-26 07:53:38 +03:00
} else {
info! ( " no database " ) ;
None
} ;
2022-07-09 02:02:32 +03:00
let balanced_rpcs = app_config . balanced_rpcs . into_values ( ) . collect ( ) ;
let private_rpcs = if let Some ( private_rpcs ) = app_config . private_rpcs {
private_rpcs . into_values ( ) . collect ( )
} else {
vec! [ ]
} ;
// TODO: try_join_all instead?
2022-06-16 20:51:49 +03:00
let handles = FuturesUnordered ::new ( ) ;
2022-06-14 08:43:28 +03:00
2022-05-12 02:50:52 +03:00
// make a http shared client
2022-07-09 02:02:32 +03:00
// TODO: can we configure the connection pool? should we?
2022-05-12 02:50:52 +03:00
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
2022-05-22 02:34:05 +03:00
let http_client = Some (
reqwest ::ClientBuilder ::new ( )
. connect_timeout ( Duration ::from_secs ( 5 ) )
. timeout ( Duration ::from_secs ( 60 ) )
. user_agent ( APP_USER_AGENT )
. build ( ) ? ,
) ;
2022-07-22 22:30:39 +03:00
let redis_client_pool = match app_config . shared . redis_url {
2022-07-26 07:53:38 +03:00
Some ( redis_url ) = > {
info! ( " Connecting to redis on {} " , redis_url ) ;
2022-05-22 21:39:06 +03:00
2022-07-26 07:53:38 +03:00
let manager = RedisConnectionManager ::new ( redis_url ) ? ;
2022-07-09 02:02:32 +03:00
2022-07-09 03:00:31 +03:00
let min_size = num_workers as u32 ;
let max_size = min_size * 4 ;
2022-07-09 02:11:22 +03:00
// TODO: min_idle?
// TODO: set max_size based on max expected concurrent connections? set based on num_workers?
2022-07-09 02:02:32 +03:00
let builder = bb8 ::Pool ::builder ( )
. error_sink ( bb8_helpers ::RedisErrorSink . boxed_clone ( ) )
2022-07-09 03:00:31 +03:00
. min_idle ( Some ( min_size ) )
. max_size ( max_size ) ;
2022-07-09 02:02:32 +03:00
let pool = builder . build ( manager ) . await ? ;
2022-05-22 21:39:06 +03:00
2022-07-07 06:22:09 +03:00
Some ( pool )
2022-05-22 21:39:06 +03:00
}
None = > {
2022-07-26 07:53:38 +03:00
warn! ( " no redis connection " ) ;
2022-05-22 21:39:06 +03:00
None
2022-05-22 02:34:05 +03:00
}
} ;
2022-05-12 02:50:52 +03:00
2022-07-22 08:11:26 +03:00
let ( head_block_sender , head_block_receiver ) = watch ::channel ( Arc ::new ( Block ::default ( ) ) ) ;
2022-07-09 03:00:31 +03:00
// TODO: will one receiver lagging be okay? how big should this be?
2022-07-26 07:53:38 +03:00
let ( pending_tx_sender , pending_tx_receiver ) = broadcast ::channel ( 256 ) ;
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
drop ( pending_tx_receiver ) ;
2022-06-16 20:51:49 +03:00
2022-07-22 08:11:26 +03:00
// TODO: this will grow unbounded!! add some expiration to this. and probably move to redis
2022-06-16 20:51:49 +03:00
let pending_transactions = Arc ::new ( DashMap ::new ( ) ) ;
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
2022-06-14 07:04:14 +03:00
2022-05-12 02:50:52 +03:00
// TODO: attach context to this error
2022-06-14 08:43:28 +03:00
let ( balanced_rpcs , balanced_handle ) = Web3Connections ::spawn (
2022-07-09 02:02:32 +03:00
app_config . shared . chain_id ,
2022-05-22 02:34:05 +03:00
balanced_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
redis_client_pool . clone ( ) ,
2022-06-14 07:04:14 +03:00
Some ( head_block_sender ) ,
2022-06-16 05:53:37 +03:00
Some ( pending_tx_sender . clone ( ) ) ,
2022-06-16 20:51:49 +03:00
pending_transactions . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
. context ( " balanced rpcs " ) ? ;
2022-05-18 19:35:06 +03:00
2022-06-14 08:43:28 +03:00
handles . push ( balanced_handle ) ;
2022-05-12 02:50:52 +03:00
let private_rpcs = if private_rpcs . is_empty ( ) {
warn! ( " No private relays configured. Any transactions will be broadcast to the public mempool! " ) ;
2022-05-13 23:50:11 +03:00
balanced_rpcs . clone ( )
2022-05-12 02:50:52 +03:00
} else {
2022-06-14 07:04:14 +03:00
// TODO: attach context to this error
2022-06-14 08:43:28 +03:00
let ( private_rpcs , private_handle ) = Web3Connections ::spawn (
2022-07-09 02:02:32 +03:00
app_config . shared . chain_id ,
2022-05-22 02:34:05 +03:00
private_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
redis_client_pool . clone ( ) ,
2022-06-14 07:04:14 +03:00
// subscribing to new heads here won't work well
None ,
2022-06-14 08:43:28 +03:00
// TODO: subscribe to pending transactions on the private rpcs?
2022-06-16 20:51:49 +03:00
Some ( pending_tx_sender . clone ( ) ) ,
pending_transactions . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
. context ( " private_rpcs " ) ? ;
2022-06-14 08:43:28 +03:00
handles . push ( private_handle ) ;
private_rpcs
2022-05-12 02:50:52 +03:00
} ;
2022-07-07 06:22:09 +03:00
// TODO: how much should we allow?
2022-07-09 02:02:32 +03:00
let public_max_burst = app_config . shared . public_rate_limit_per_minute / 3 ;
2022-07-07 06:22:09 +03:00
2022-08-06 08:26:43 +03:00
let frontend_rate_limiter = redis_client_pool . as_ref ( ) . map ( | redis_client_pool | {
RedisCellClient ::new (
redis_client_pool . clone ( ) ,
2022-08-06 08:46:33 +03:00
" web3_proxy " ,
2022-08-06 08:26:43 +03:00
" frontend " ,
public_max_burst ,
app_config . shared . public_rate_limit_per_minute ,
60 ,
)
} ) ;
2022-07-07 06:22:09 +03:00
let app = Self {
2022-05-13 23:50:11 +03:00
balanced_rpcs ,
2022-05-12 02:50:52 +03:00
private_rpcs ,
2022-08-04 01:23:10 +03:00
active_requests : Default ::default ( ) ,
2022-07-22 22:30:39 +03:00
response_cache_max_bytes : AtomicUsize ::new ( app_config . shared . response_cache_max_bytes ) ,
2022-05-16 01:02:14 +03:00
response_cache : Default ::default ( ) ,
2022-05-30 04:28:22 +03:00
head_block_receiver ,
2022-06-16 05:53:37 +03:00
pending_tx_sender ,
2022-06-16 20:51:49 +03:00
pending_transactions ,
2022-08-06 08:26:43 +03:00
rate_limiter : frontend_rate_limiter ,
2022-07-26 07:53:38 +03:00
db_conn ,
2022-06-14 07:04:14 +03:00
} ;
let app = Arc ::new ( app ) ;
2022-06-14 08:43:28 +03:00
// create a handle that returns on the first error
2022-06-16 20:51:49 +03:00
// TODO: move this to a helper. i think Web3Connections needs it too
let handle = Box ::pin ( flatten_handles ( handles ) ) ;
2022-06-14 08:43:28 +03:00
Ok ( ( app , handle ) )
2022-05-12 02:50:52 +03:00
}
2022-05-29 22:33:10 +03:00
pub async fn eth_subscribe (
2022-06-14 10:13:42 +03:00
self : Arc < Self > ,
2022-05-29 22:33:10 +03:00
payload : JsonRpcRequest ,
2022-07-09 01:14:45 +03:00
subscription_count : & AtomicUsize ,
2022-05-30 04:28:22 +03:00
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
2022-07-09 01:14:45 +03:00
response_sender : flume ::Sender < Message > ,
2022-05-30 04:28:22 +03:00
) -> anyhow ::Result < ( AbortHandle , JsonRpcForwardedResponse ) > {
2022-06-14 10:13:42 +03:00
let ( subscription_abort_handle , subscription_registration ) = AbortHandle ::new_pair ( ) ;
2022-05-30 04:28:22 +03:00
2022-06-14 07:04:14 +03:00
// TODO: this only needs to be unique per connection. we don't need it globably unique
2022-07-09 01:14:45 +03:00
let subscription_id = subscription_count . fetch_add ( 1 , atomic ::Ordering ::SeqCst ) ;
2022-08-04 02:17:02 +03:00
let subscription_id = U64 ::from ( subscription_id ) ;
2022-05-30 21:23:55 +03:00
2022-06-05 23:39:58 +03:00
// save the id so we can use it in the response
let id = payload . id . clone ( ) ;
2022-07-16 03:35:54 +03:00
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
2022-07-08 22:01:11 +03:00
match payload . params {
Some ( x ) if x = = json! ( [ " newHeads " ] ) = > {
2022-06-16 23:57:48 +03:00
let head_block_receiver = self . head_block_receiver . clone ( ) ;
trace! ( ? subscription_id , " new heads subscription " ) ;
tokio ::spawn ( async move {
let mut head_block_receiver = Abortable ::new (
WatchStream ::new ( head_block_receiver ) ,
subscription_registration ,
) ;
while let Some ( new_head ) = head_block_receiver . next ( ) . await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " :" eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
2022-07-22 08:11:26 +03:00
" result " : new_head . as_ref ( ) ,
2022-06-16 23:57:48 +03:00
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
2022-05-30 04:28:22 +03:00
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
TxState ::Pending ( tx ) = > tx ,
TxState ::Confirmed ( .. ) = > continue ,
TxState ::Orphaned ( tx ) = > tx ,
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
" result " : new_tx . hash ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingFullTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
TxState ::Pending ( tx ) = > tx ,
TxState ::Confirmed ( .. ) = > continue ,
TxState ::Orphaned ( tx ) = > tx ,
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
// upstream just sends the txid, but we want to send the whole transaction
" result " : new_tx ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-18 10:06:54 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingRawTransactions " ] ) = > {
2022-06-18 10:06:54 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
TxState ::Pending ( tx ) = > tx ,
TxState ::Confirmed ( .. ) = > continue ,
TxState ::Orphaned ( tx ) = > tx ,
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
2022-07-08 22:01:11 +03:00
// upstream just sends the txid, but we want to send the raw transaction
2022-06-18 10:06:54 +03:00
" result " : new_tx . rlp ( ) ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
_ = > return Err ( anyhow ::anyhow! ( " unimplemented " ) ) ,
}
2022-05-30 04:28:22 +03:00
2022-06-14 10:13:42 +03:00
// TODO: do something with subscription_join_handle?
2022-05-30 04:28:22 +03:00
2022-07-22 22:30:39 +03:00
let response = JsonRpcForwardedResponse ::from_value ( json! ( subscription_id ) , id ) ;
2022-05-30 04:28:22 +03:00
2022-06-16 05:53:37 +03:00
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
2022-06-14 10:13:42 +03:00
Ok ( ( subscription_abort_handle , response ) )
2022-05-29 22:33:10 +03:00
}
2022-08-04 01:23:10 +03:00
pub fn balanced_rpcs ( & self ) -> & Web3Connections {
2022-05-21 01:16:15 +03:00
& self . balanced_rpcs
}
2022-08-04 01:23:10 +03:00
pub fn private_rpcs ( & self ) -> & Web3Connections {
2022-05-21 01:16:15 +03:00
& self . private_rpcs
}
2022-08-04 01:23:10 +03:00
pub fn active_requests ( & self ) -> & ActiveRequestsMap {
& self . active_requests
2022-05-21 01:16:15 +03:00
}
2022-07-22 22:30:39 +03:00
/// send the request or batch of requests to the approriate RPCs
2022-05-17 03:56:56 +03:00
#[ instrument(skip_all) ]
2022-05-12 02:50:52 +03:00
pub async fn proxy_web3_rpc (
2022-05-29 20:28:41 +03:00
& self ,
2022-05-12 02:50:52 +03:00
request : JsonRpcRequestEnum ,
2022-05-20 08:27:18 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponseEnum > {
2022-07-22 22:30:39 +03:00
debug! ( ? request , " proxy_web3_rpc " ) ;
2022-05-12 02:50:52 +03:00
2022-05-22 07:22:30 +03:00
// even though we have timeouts on the requests to our backend providers,
2022-07-22 22:30:39 +03:00
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. per user max? expiration time instead of duration?
let max_time = Duration ::from_secs ( 120 ) ;
// TODO: instrument this with a unique id
2022-05-22 07:22:30 +03:00
2022-05-12 02:50:52 +03:00
let response = match request {
2022-05-22 07:22:30 +03:00
JsonRpcRequestEnum ::Single ( request ) = > JsonRpcForwardedResponseEnum ::Single (
timeout ( max_time , self . proxy_web3_rpc_request ( request ) ) . await ? ? ,
) ,
JsonRpcRequestEnum ::Batch ( requests ) = > JsonRpcForwardedResponseEnum ::Batch (
timeout ( max_time , self . proxy_web3_rpc_requests ( requests ) ) . await ? ? ,
) ,
2022-05-12 02:50:52 +03:00
} ;
2022-07-22 22:30:39 +03:00
debug! ( ? response , " Forwarding response " ) ;
2022-05-17 03:56:56 +03:00
2022-05-20 08:27:18 +03:00
Ok ( response )
2022-05-12 02:50:52 +03:00
}
2022-05-17 20:15:18 +03:00
// #[instrument(skip_all)]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_requests (
2022-05-29 20:28:41 +03:00
& self ,
2022-05-12 02:50:52 +03:00
requests : Vec < JsonRpcRequest > ,
) -> anyhow ::Result < Vec < JsonRpcForwardedResponse > > {
// TODO: we should probably change ethers-rs to support this directly
// we cut up the request and send to potentually different servers. this could be a problem.
// if the client needs consistent blocks, they should specify instead of assume batches work on the same
// TODO: is spawning here actually slower?
let num_requests = requests . len ( ) ;
let responses = join_all (
requests
. into_iter ( )
2022-05-29 20:28:41 +03:00
. map ( | request | self . proxy_web3_rpc_request ( request ) )
2022-05-12 02:50:52 +03:00
. collect ::< Vec < _ > > ( ) ,
)
. await ;
// TODO: i'm sure this could be done better with iterators
let mut collected : Vec < JsonRpcForwardedResponse > = Vec ::with_capacity ( num_requests ) ;
for response in responses {
2022-05-18 19:35:06 +03:00
collected . push ( response ? ) ;
2022-05-12 02:50:52 +03:00
}
Ok ( collected )
}
2022-08-04 01:23:10 +03:00
async fn cached_response (
2022-05-22 07:22:30 +03:00
& self ,
2022-07-21 02:49:29 +03:00
// TODO: accept a block hash here also?
2022-07-22 22:30:39 +03:00
min_block_needed : Option < & U64 > ,
2022-05-22 07:22:30 +03:00
request : & JsonRpcRequest ,
2022-07-19 04:31:12 +03:00
) -> anyhow ::Result < (
2022-05-22 07:22:30 +03:00
CacheKey ,
Result < JsonRpcForwardedResponse , & ResponseLrcCache > ,
2022-07-19 04:31:12 +03:00
) > {
2022-05-22 07:22:30 +03:00
// TODO: inspect the request to pick the right cache
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
2022-07-19 04:31:12 +03:00
let request_block_hash = if let Some ( min_block_needed ) = min_block_needed {
2022-07-21 02:49:29 +03:00
// TODO: maybe this should be on the app and not on balanced_rpcs
2022-08-04 01:23:10 +03:00
self . balanced_rpcs . block_hash ( min_block_needed ) . await ?
2022-07-19 04:31:12 +03:00
} else {
2022-07-21 02:49:29 +03:00
// TODO: maybe this should be on the app and not on balanced_rpcs
2022-08-04 01:23:10 +03:00
self . balanced_rpcs . head_block_hash ( )
2022-07-19 04:31:12 +03:00
} ;
2022-05-22 07:22:30 +03:00
// TODO: better key? benchmark this
let key = (
2022-07-19 04:31:12 +03:00
request_block_hash ,
2022-05-22 07:22:30 +03:00
request . method . clone ( ) ,
request . params . clone ( ) . map ( | x | x . to_string ( ) ) ,
) ;
if let Some ( response ) = self . response_cache . read ( ) . get ( & key ) {
// TODO: emit a stat
2022-07-22 22:30:57 +03:00
trace! ( ? request . method , " cache hit! " ) ;
2022-05-22 07:22:30 +03:00
// TODO: can we make references work? maybe put them in an Arc?
2022-07-19 04:31:12 +03:00
return Ok ( ( key , Ok ( response . to_owned ( ) ) ) ) ;
2022-05-22 07:22:30 +03:00
} else {
// TODO: emit a stat
2022-07-22 22:30:57 +03:00
trace! ( ? request . method , " cache miss! " ) ;
2022-05-22 07:22:30 +03:00
}
// TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?)
let cache = & self . response_cache ;
2022-07-19 04:31:12 +03:00
Ok ( ( key , Err ( cache ) ) )
2022-05-22 07:22:30 +03:00
}
2022-05-17 20:15:18 +03:00
// #[instrument(skip_all)]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_request (
2022-05-29 20:28:41 +03:00
& self ,
2022-07-09 05:23:26 +03:00
mut request : JsonRpcRequest ,
2022-05-12 02:50:52 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponse > {
trace! ( " Received request: {:?} " , request ) ;
2022-05-31 04:55:04 +03:00
// TODO: if eth_chainId or net_version, serve those without querying the backend
2022-05-22 07:22:30 +03:00
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
2022-06-14 07:04:14 +03:00
// // TODO: add more to this span such as
let span = info_span! ( " rpc_request " ) ;
2022-05-29 04:23:58 +03:00
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
2022-07-22 22:30:39 +03:00
let partial_response : serde_json ::Value = match request . method . as_ref ( ) {
2022-07-09 05:23:26 +03:00
// lots of commands are blocked
2022-06-14 09:54:19 +03:00
" admin_addPeer "
| " admin_datadir "
| " admin_startRPC "
| " admin_startWS "
| " admin_stopRPC "
| " admin_stopWS "
2022-07-09 05:23:26 +03:00
| " db_getHex "
| " db_getString "
| " db_putHex "
| " db_putString "
2022-06-14 09:54:19 +03:00
| " debug_chaindbCompact "
| " debug_freezeClient "
| " debug_goTrace "
| " debug_mutexProfile "
| " debug_setBlockProfileRate "
| " debug_setGCPercent "
| " debug_setHead "
| " debug_setMutexProfileFraction "
| " debug_standardTraceBlockToFile "
| " debug_standardTraceBadBlockToFile "
| " debug_startCPUProfile "
| " debug_startGoTrace "
| " debug_stopCPUProfile "
| " debug_stopGoTrace "
| " debug_writeBlockProfile "
| " debug_writeMemProfile "
| " debug_writeMutexProfile "
2022-07-09 05:23:26 +03:00
| " eth_compileLLL "
| " eth_compileSerpent "
| " eth_compileSolidity "
| " eth_getCompilers "
| " eth_sendTransaction "
| " eth_sign "
| " eth_signTransaction "
| " eth_submitHashrate "
| " eth_submitWork "
2022-06-14 09:54:19 +03:00
| " les_addBalance "
| " les_setClientParams "
| " les_setDefaultParams "
| " miner_setExtra "
| " miner_setGasPrice "
| " miner_start "
| " miner_stop "
| " miner_setEtherbase "
| " miner_setGasLimit "
| " personal_importRawKey "
| " personal_listAccounts "
| " personal_lockAccount "
| " personal_newAccount "
| " personal_unlockAccount "
| " personal_sendTransaction "
| " personal_sign "
2022-07-09 05:23:26 +03:00
| " personal_ecRecover "
| " shh_addToGroup "
| " shh_getFilterChanges "
| " shh_getMessages "
| " shh_hasIdentity "
| " shh_newFilter "
| " shh_newGroup "
| " shh_newIdentity "
| " shh_post "
| " shh_uninstallFilter "
| " shh_version " = > {
2022-06-30 03:52:04 +03:00
// TODO: proper error code
2022-07-22 22:30:39 +03:00
return Err ( anyhow ::anyhow! ( " unsupported " ) ) ;
2022-07-09 05:23:26 +03:00
}
// TODO: implement these commands
" eth_getFilterChanges "
| " eth_getFilterLogs "
| " eth_newBlockFilter "
| " eth_newFilter "
| " eth_newPendingTransactionFilter "
2022-07-22 22:30:39 +03:00
| " eth_uninstallFilter " = > return Err ( anyhow ::anyhow! ( " not yet implemented " ) ) ,
2022-07-09 05:23:26 +03:00
// some commands can use local data or caches
2022-07-22 22:30:39 +03:00
" eth_accounts " = > serde_json ::Value ::Array ( vec! [ ] ) ,
2022-07-09 05:23:26 +03:00
" eth_blockNumber " = > {
2022-08-04 01:23:10 +03:00
let head_block_number = self . balanced_rpcs . head_block_num ( ) ;
2022-07-09 05:23:26 +03:00
2022-07-25 03:27:00 +03:00
// TODO: technically, block 0 is okay. i guess we should be using an option
2022-07-22 22:30:39 +03:00
if head_block_number . as_u64 ( ) = = 0 {
2022-07-09 05:23:26 +03:00
return Err ( anyhow ::anyhow! ( " no servers synced " ) ) ;
}
2022-07-22 22:30:39 +03:00
json! ( head_block_number )
2022-07-09 05:23:26 +03:00
}
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
" eth_coinbase " = > {
2022-07-22 22:30:39 +03:00
// no need for serving coinbase
// we could return a per-user payment address here, but then we might leak that to dapps
json! ( Address ::zero ( ) )
2022-07-09 05:23:26 +03:00
}
// TODO: eth_estimateGas using anvil?
// TODO: eth_gasPrice that does awesome magic to predict the future
" eth_hashrate " = > {
2022-07-22 22:30:39 +03:00
json! ( U64 ::zero ( ) )
2022-06-30 03:52:04 +03:00
}
2022-07-09 05:23:26 +03:00
" eth_mining " = > {
2022-07-22 22:30:39 +03:00
json! ( false )
2022-07-09 05:23:26 +03:00
}
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
2022-07-14 02:25:01 +03:00
" eth_sendRawTransaction " = > match & request . params {
Some ( serde_json ::Value ::Array ( params ) ) = > {
2022-07-22 22:30:39 +03:00
// parsing params like this is gross. make struct and use serde to do all these checks and error handling
2022-07-14 02:25:01 +03:00
if params . len ( ) ! = 1 | | ! params [ 0 ] . is_string ( ) {
return Err ( anyhow ::anyhow! ( " invalid request " ) ) ;
}
let raw_tx = Bytes ::from_str ( params [ 0 ] . as_str ( ) . unwrap ( ) ) ? ;
if check_firewall_raw ( & raw_tx ) . await ? {
2022-07-22 22:30:39 +03:00
return self
. private_rpcs
2022-07-19 04:31:12 +03:00
. try_send_all_upstream_servers ( request , None )
2022-07-14 02:25:01 +03:00
. instrument ( span )
2022-07-22 22:30:39 +03:00
. await ;
2022-07-14 02:25:01 +03:00
} else {
2022-07-22 22:30:39 +03:00
return Err ( anyhow ::anyhow! ( " transaction blocked by firewall " ) ) ;
2022-07-14 02:25:01 +03:00
}
}
2022-07-22 22:30:39 +03:00
_ = > return Err ( anyhow ::anyhow! ( " invalid request " ) ) ,
2022-07-14 02:25:01 +03:00
} ,
2022-07-09 05:23:26 +03:00
" eth_syncing " = > {
// TODO: return a real response if all backends are syncing or if no servers in sync
2022-07-22 22:30:39 +03:00
json! ( false )
2022-07-09 05:23:26 +03:00
}
" net_listening " = > {
// TODO: only if there are some backends on balanced_rpcs?
2022-07-22 22:30:39 +03:00
json! ( true )
2022-07-09 05:23:26 +03:00
}
2022-07-22 22:30:39 +03:00
" net_peerCount " = > self . balanced_rpcs . num_synced_rpcs ( ) . into ( ) ,
" web3_clientVersion " = > serde_json ::Value ::String ( APP_USER_AGENT . to_string ( ) ) ,
" web3_sha3 " = > {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match & request . params {
Some ( serde_json ::Value ::Array ( params ) ) = > {
if params . len ( ) ! = 1 | | ! params [ 0 ] . is_string ( ) {
return Err ( anyhow ::anyhow! ( " invalid request " ) ) ;
}
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
let param = Bytes ::from_str ( params [ 0 ] . as_str ( ) . unwrap ( ) ) ? ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
let hash = H256 ::from ( keccak256 ( param ) ) ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
json! ( hash )
}
_ = > return Err ( anyhow ::anyhow! ( " invalid request " ) ) ,
}
2022-07-09 05:23:26 +03:00
}
2022-07-22 22:30:39 +03:00
2022-07-09 05:23:26 +03:00
// TODO: web3_sha3?
2022-07-22 22:30:39 +03:00
// anything else gets sent to backend rpcs and cached
2022-06-30 03:52:04 +03:00
method = > {
2022-08-04 01:23:10 +03:00
let head_block_number = self . balanced_rpcs . head_block_num ( ) ;
2022-07-16 07:13:02 +03:00
2022-07-09 05:23:26 +03:00
// we do this check before checking caches because it might modify the request params
2022-07-16 08:48:02 +03:00
// TODO: add a stat for archive vs full since they should probably cost different
2022-07-19 04:31:12 +03:00
let min_block_needed =
2022-08-04 01:23:10 +03:00
block_needed ( method , request . params . as_mut ( ) , head_block_number ) ;
2022-07-22 22:30:39 +03:00
let min_block_needed = min_block_needed . as_ref ( ) ;
2022-07-19 04:31:12 +03:00
trace! ( ? min_block_needed , ? method ) ;
2022-05-16 22:15:40 +03:00
2022-07-19 04:31:12 +03:00
let ( cache_key , cache_result ) =
2022-08-04 01:23:10 +03:00
self . cached_response ( min_block_needed , & request ) . await ? ;
2022-07-16 08:48:02 +03:00
2022-07-19 04:31:12 +03:00
let response_cache = match cache_result {
Ok ( response ) = > {
2022-08-04 01:23:10 +03:00
let _ = self . active_requests . remove ( & cache_key ) ;
2022-05-29 04:23:58 +03:00
2022-07-16 08:48:02 +03:00
// TODO: if the response is cached, should it count less against the account's costs?
2022-06-14 07:04:14 +03:00
return Ok ( response ) ;
}
2022-07-19 04:31:12 +03:00
Err ( response_cache ) = > response_cache ,
2022-06-14 07:04:14 +03:00
} ;
// check if this request is already in flight
// TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
let ( incoming_tx , incoming_rx ) = watch ::channel ( true ) ;
let mut other_incoming_rx = None ;
2022-08-04 01:23:10 +03:00
match self . active_requests . entry ( cache_key . clone ( ) ) {
2022-06-14 09:42:52 +03:00
DashMapEntry ::Occupied ( entry ) = > {
2022-06-14 07:04:14 +03:00
other_incoming_rx = Some ( entry . get ( ) . clone ( ) ) ;
}
2022-06-14 09:42:52 +03:00
DashMapEntry ::Vacant ( entry ) = > {
2022-06-14 07:04:14 +03:00
entry . insert ( incoming_rx ) ;
}
2022-05-29 04:23:58 +03:00
}
2022-05-16 22:15:40 +03:00
2022-06-14 07:04:14 +03:00
if let Some ( mut other_incoming_rx ) = other_incoming_rx {
// wait for the other request to finish. it might have finished successfully or with an error
trace! ( " {:?} waiting on in-flight request " , request ) ;
let _ = other_incoming_rx . changed ( ) . await ;
// now that we've waited, lets check the cache again
if let Some ( cached ) = response_cache . read ( ) . get ( & cache_key ) {
2022-08-04 01:23:10 +03:00
let _ = self . active_requests . remove ( & cache_key ) ;
2022-06-14 07:04:14 +03:00
let _ = incoming_tx . send ( false ) ;
// TODO: emit a stat
trace! (
" {:?} cache hit after waiting for in-flight request! " ,
request
) ;
return Ok ( cached . to_owned ( ) ) ;
} else {
// TODO: emit a stat
trace! (
" {:?} cache miss after waiting for in-flight request! " ,
request
) ;
}
2022-05-13 23:50:11 +03:00
}
2022-05-16 22:15:40 +03:00
2022-06-30 03:52:04 +03:00
let response = match method {
" eth_getTransactionByHash " | " eth_getTransactionReceipt " = > {
// TODO: try_send_all serially with retries instead of parallel
self . private_rpcs
2022-07-19 04:31:12 +03:00
. try_send_all_upstream_servers ( request , min_block_needed )
2022-06-30 03:52:04 +03:00
. await ?
}
_ = > {
// TODO: retries?
self . balanced_rpcs
2022-07-19 04:31:12 +03:00
. try_send_best_upstream_server ( request , min_block_needed )
2022-06-30 03:52:04 +03:00
. await ?
}
} ;
2022-05-29 04:23:58 +03:00
2022-07-22 22:30:39 +03:00
// TODO: move this caching outside this match and cache some of the other responses?
// TODO: cache the warp::reply to save us serializing every time?
2022-07-09 05:23:26 +03:00
{
let mut response_cache = response_cache . write ( ) ;
2022-05-29 04:23:58 +03:00
2022-07-22 22:30:39 +03:00
let response_cache_max_bytes = self
. response_cache_max_bytes
. load ( atomic ::Ordering ::Acquire ) ;
// TODO: this might be too naive. not sure how much overhead the object has
let new_size = size_of_val ( & cache_key ) + size_of_val ( & response ) ;
// no item is allowed to take more than 1% of the cache
// TODO: get this from config?
if new_size < response_cache_max_bytes / 100 {
// TODO: this probably has wildly variable timings
while size_of_val ( & response_cache ) + new_size > = response_cache_max_bytes {
// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
response_cache . pop_front ( ) ;
}
2022-05-29 04:23:58 +03:00
2022-07-22 22:30:39 +03:00
response_cache . insert ( cache_key . clone ( ) , response . clone ( ) ) ;
} else {
// TODO: emit a stat instead?
warn! ( ? new_size , " value too large for caching " ) ;
2022-07-09 05:23:26 +03:00
}
}
2022-05-29 04:23:58 +03:00
2022-08-04 01:23:10 +03:00
let _ = self . active_requests . remove ( & cache_key ) ;
2022-06-14 07:04:14 +03:00
let _ = incoming_tx . send ( false ) ;
2022-05-29 04:23:58 +03:00
2022-07-22 22:30:39 +03:00
return Ok ( response ) ;
2022-06-14 07:04:14 +03:00
}
2022-07-22 22:30:39 +03:00
} ;
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
2022-05-12 02:50:52 +03:00
}
}