2022-05-29 22:33:10 +03:00
use axum ::extract ::ws ::Message ;
2022-06-14 09:42:52 +03:00
use dashmap ::mapref ::entry ::Entry as DashMapEntry ;
2022-05-16 22:15:40 +03:00
use dashmap ::DashMap ;
2022-07-19 04:31:12 +03:00
use ethers ::prelude ::{ Address , Block , BlockNumber , Bytes , Transaction , TxHash , H256 , U64 } ;
2022-06-16 23:57:48 +03:00
use futures ::future ::Abortable ;
2022-05-30 04:28:22 +03:00
use futures ::future ::{ join_all , AbortHandle } ;
2022-06-14 08:43:28 +03:00
use futures ::stream ::FuturesUnordered ;
2022-05-30 04:28:22 +03:00
use futures ::stream ::StreamExt ;
2022-06-16 20:51:49 +03:00
use futures ::Future ;
2022-05-16 01:02:14 +03:00
use linkedhashmap ::LinkedHashMap ;
use parking_lot ::RwLock ;
2022-07-09 02:02:32 +03:00
use redis_cell_client ::bb8 ::ErrorSink ;
2022-07-07 06:29:47 +03:00
use redis_cell_client ::{ bb8 , RedisCellClient , RedisConnectionManager } ;
2022-05-30 21:23:55 +03:00
use serde_json ::json ;
2022-05-12 02:50:52 +03:00
use std ::fmt ;
2022-06-16 20:51:49 +03:00
use std ::pin ::Pin ;
2022-07-14 02:25:01 +03:00
use std ::str ::FromStr ;
2022-05-30 21:51:19 +03:00
use std ::sync ::atomic ::{ self , AtomicUsize } ;
2022-05-12 02:50:52 +03:00
use std ::sync ::Arc ;
use std ::time ::Duration ;
2022-06-16 05:53:37 +03:00
use tokio ::sync ::{ broadcast , watch } ;
2022-06-14 08:43:28 +03:00
use tokio ::task ::JoinHandle ;
2022-05-29 04:23:58 +03:00
use tokio ::time ::timeout ;
2022-06-16 23:57:48 +03:00
use tokio_stream ::wrappers ::{ BroadcastStream , WatchStream } ;
2022-07-19 04:31:12 +03:00
use tracing ::{ info , info_span , instrument , trace , warn , Instrument } ;
2022-05-12 02:50:52 +03:00
2022-07-09 02:02:32 +03:00
use crate ::bb8_helpers ;
use crate ::config ::AppConfig ;
2022-06-16 05:53:37 +03:00
use crate ::connections ::Web3Connections ;
2022-07-14 02:25:01 +03:00
use crate ::firewall ::check_firewall_raw ;
2022-06-16 05:53:37 +03:00
use crate ::jsonrpc ::JsonRpcForwardedResponse ;
use crate ::jsonrpc ::JsonRpcForwardedResponseEnum ;
use crate ::jsonrpc ::JsonRpcRequest ;
use crate ::jsonrpc ::JsonRpcRequestEnum ;
2022-07-09 05:23:26 +03:00
// TODO: make this customizable?
2022-05-12 02:50:52 +03:00
static APP_USER_AGENT : & str = concat! (
" satoshiandkin/ " ,
env! ( " CARGO_PKG_NAME " ) ,
" / " ,
env! ( " CARGO_PKG_VERSION " ) ,
) ;
2022-07-09 02:02:32 +03:00
// TODO: put this in config? what size should we do? probably should structure this to be a size in MB
2022-05-16 01:02:14 +03:00
const RESPONSE_CACHE_CAP : usize = 1024 ;
2022-05-12 02:50:52 +03:00
2022-07-09 05:23:26 +03:00
// block hash, method, params
type CacheKey = ( H256 , String , Option < String > ) ;
2022-05-16 22:15:40 +03:00
2022-07-16 03:08:22 +03:00
// TODO: make something more advanced that keeps track of cache size in bytes
2022-05-22 07:22:30 +03:00
type ResponseLrcCache = RwLock < LinkedHashMap < CacheKey , JsonRpcForwardedResponse > > ;
2022-05-12 02:50:52 +03:00
2022-05-21 01:16:15 +03:00
type ActiveRequestsMap = DashMap < CacheKey , watch ::Receiver < bool > > ;
2022-06-14 08:43:28 +03:00
pub type AnyhowJoinHandle < T > = JoinHandle < anyhow ::Result < T > > ;
2022-07-16 03:08:22 +03:00
/// flatten a JoinError into an anyhow error
2022-06-14 08:43:28 +03:00
pub async fn flatten_handle < T > ( handle : AnyhowJoinHandle < T > ) -> anyhow ::Result < T > {
match handle . await {
Ok ( Ok ( result ) ) = > Ok ( result ) ,
Ok ( Err ( err ) ) = > Err ( err ) ,
Err ( err ) = > Err ( err . into ( ) ) ,
}
}
2022-07-16 03:08:22 +03:00
/// return the first error or okay if everything worked
2022-06-16 23:57:48 +03:00
pub async fn flatten_handles < T > (
mut handles : FuturesUnordered < AnyhowJoinHandle < T > > ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < ( ) > {
while let Some ( x ) = handles . next ( ) . await {
match x {
Err ( e ) = > return Err ( e . into ( ) ) ,
Ok ( Err ( e ) ) = > return Err ( e ) ,
2022-07-16 03:08:22 +03:00
Ok ( Ok ( _ ) ) = > continue ,
2022-06-16 20:51:49 +03:00
}
}
Ok ( ( ) )
}
2022-07-19 04:31:12 +03:00
fn block_num_to_u64 ( block_num : BlockNumber , latest_block : U64 ) -> ( bool , U64 ) {
2022-07-16 07:13:02 +03:00
match block_num {
2022-07-19 04:31:12 +03:00
BlockNumber ::Earliest = > ( false , U64 ::zero ( ) ) ,
2022-07-16 07:13:02 +03:00
BlockNumber ::Latest = > {
// change "latest" to a number
( true , latest_block )
2022-07-16 03:35:54 +03:00
}
2022-07-19 04:31:12 +03:00
BlockNumber ::Number ( x ) = > ( false , x ) ,
2022-07-16 07:13:02 +03:00
// TODO: think more about how to handle Pending
BlockNumber ::Pending = > ( false , latest_block ) ,
2022-07-09 05:23:26 +03:00
}
2022-07-16 07:13:02 +03:00
}
fn get_or_set_block_number (
params : & mut serde_json ::Value ,
block_param_id : usize ,
2022-07-19 04:31:12 +03:00
latest_block : U64 ,
) -> anyhow ::Result < U64 > {
2022-07-16 07:13:02 +03:00
match params . as_array_mut ( ) {
None = > Err ( anyhow ::anyhow! ( " params not an array " ) ) ,
Some ( params ) = > match params . get_mut ( block_param_id ) {
None = > {
if params . len ( ) ! = block_param_id - 1 {
return Err ( anyhow ::anyhow! ( " unexpected params length " ) ) ;
}
// add the latest block number to the end of the params
2022-07-19 04:31:12 +03:00
params . push ( serde_json ::to_value ( latest_block ) ? ) ;
2022-07-16 07:13:02 +03:00
Ok ( latest_block )
}
Some ( x ) = > {
// convert the json value to a BlockNumber
2022-07-19 04:31:12 +03:00
let block_num : BlockNumber = serde_json ::from_value ( x . clone ( ) ) ? ;
2022-07-16 07:13:02 +03:00
let ( modified , block_num ) = block_num_to_u64 ( block_num , latest_block ) ;
// if we changed "latest" to a number, update the params to match
if modified {
2022-07-19 04:31:12 +03:00
* x = serde_json ::to_value ( block_num ) ? ;
2022-07-16 07:13:02 +03:00
}
Ok ( block_num )
}
} ,
}
}
2022-07-21 02:49:29 +03:00
// TODO: change this to return also return the hash needed
2022-07-19 04:31:12 +03:00
fn get_min_block_needed (
2022-07-16 07:13:02 +03:00
method : & str ,
params : Option < & mut serde_json ::Value > ,
2022-07-19 04:31:12 +03:00
latest_block : U64 ,
) -> Option < U64 > {
let params = params ? ;
2022-07-16 07:13:02 +03:00
// TODO: double check these. i think some of the getBlock stuff will never need archive
let block_param_id = match method {
" eth_call " = > 1 ,
" eth_estimateGas " = > 1 ,
" eth_getBalance " = > 1 ,
" eth_getBlockByHash " = > {
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
2022-07-16 08:48:02 +03:00
" eth_getBlockByNumber " = > {
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 08:48:02 +03:00
}
2022-07-16 07:13:02 +03:00
" eth_getBlockTransactionCountByHash " = > {
// TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getBlockTransactionCountByNumber " = > 0 ,
" eth_getCode " = > 1 ,
" eth_getLogs " = > {
let obj = params [ 0 ] . as_object_mut ( ) . unwrap ( ) ;
if let Some ( x ) = obj . get_mut ( " fromBlock " ) {
2022-07-19 04:31:12 +03:00
let block_num : BlockNumber = serde_json ::from_value ( x . clone ( ) ) . ok ( ) ? ;
2022-07-09 05:23:26 +03:00
2022-07-19 04:31:12 +03:00
let ( modified , block_num ) = block_num_to_u64 ( block_num , latest_block ) ;
2022-07-09 05:23:26 +03:00
2022-07-19 04:31:12 +03:00
if modified {
* x = serde_json ::to_value ( block_num ) . unwrap ( ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-19 04:31:12 +03:00
return Some ( block_num ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-09 05:23:26 +03:00
2022-07-16 07:13:02 +03:00
if let Some ( x ) = obj . get_mut ( " toBlock " ) {
2022-07-19 04:31:12 +03:00
let block_num : BlockNumber = serde_json ::from_value ( x . clone ( ) ) . ok ( ) ? ;
2022-07-09 05:23:26 +03:00
2022-07-19 04:31:12 +03:00
let ( modified , block_num ) = block_num_to_u64 ( block_num , latest_block ) ;
2022-07-09 05:23:26 +03:00
2022-07-19 04:31:12 +03:00
if modified {
* x = serde_json ::to_value ( block_num ) . unwrap ( ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-19 04:31:12 +03:00
return Some ( block_num ) ;
2022-07-16 07:13:02 +03:00
}
if let Some ( x ) = obj . get ( " blockHash " ) {
// TODO: check a linkedhashmap of recent hashes
// TODO: error if fromBlock or toBlock were set
2022-07-19 04:31:12 +03:00
unimplemented! ( " handle blockHash {} " , x ) ;
2022-07-16 07:13:02 +03:00
}
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getStorageAt " = > 2 ,
" eth_getTransactionByHash " = > {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getTransactionByBlockHashAndIndex " = > {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getTransactionByBlockNumberAndIndex " = > 0 ,
" eth_getTransactionCount " = > 1 ,
" eth_getTransactionReceipt " = > {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getUncleByBlockHashAndIndex " = > {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getUncleByBlockNumberAndIndex " = > 0 ,
" eth_getUncleCountByBlockHash " = > {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
" eth_getUncleCountByBlockNumber " = > 0 ,
_ = > {
// some other command that doesn't take block numbers as an argument
2022-07-19 04:31:12 +03:00
return None ;
2022-07-16 07:13:02 +03:00
}
} ;
2022-07-19 04:31:12 +03:00
match get_or_set_block_number ( params , block_param_id , latest_block ) {
Ok ( block ) = > Some ( block ) ,
Err ( err ) = > {
// TODO: seems unlikely that we will get here
// if this is incorrect, it should retry on an archive server
warn! ( ? err , " could not get block from params " ) ;
None
}
2022-07-16 07:13:02 +03:00
}
2022-07-09 05:23:26 +03:00
}
2022-06-16 23:57:48 +03:00
// TODO: think more about TxState. d
2022-06-16 05:53:37 +03:00
#[ derive(Clone) ]
2022-06-14 09:42:52 +03:00
pub enum TxState {
2022-06-15 01:02:18 +03:00
Pending ( Transaction ) ,
2022-06-16 05:53:37 +03:00
Confirmed ( Transaction ) ,
2022-06-15 01:02:18 +03:00
Orphaned ( Transaction ) ,
2022-06-14 09:42:52 +03:00
}
2022-05-12 02:50:52 +03:00
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
2022-06-16 20:51:49 +03:00
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
2022-05-12 02:50:52 +03:00
pub struct Web3ProxyApp {
/// Send requests to the best server available
2022-05-13 23:50:11 +03:00
balanced_rpcs : Arc < Web3Connections > ,
2022-05-12 02:50:52 +03:00
/// Send private requests (like eth_sendRawTransaction) to all these servers
2022-05-13 23:50:11 +03:00
private_rpcs : Arc < Web3Connections > ,
2022-05-22 21:11:42 +03:00
incoming_requests : ActiveRequestsMap ,
2022-05-22 07:22:30 +03:00
response_cache : ResponseLrcCache ,
2022-05-30 07:30:13 +03:00
// don't drop this or the sender will stop working
2022-06-16 05:53:37 +03:00
// TODO: broadcast channel instead?
2022-05-30 07:30:13 +03:00
head_block_receiver : watch ::Receiver < Block < TxHash > > ,
2022-06-16 05:53:37 +03:00
pending_tx_sender : broadcast ::Sender < TxState > ,
2022-06-16 20:51:49 +03:00
pending_transactions : Arc < DashMap < TxHash , TxState > > ,
2022-07-07 06:22:09 +03:00
public_rate_limiter : Option < RedisCellClient > ,
2022-05-12 02:50:52 +03:00
}
impl fmt ::Debug for Web3ProxyApp {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
2022-05-13 23:50:11 +03:00
f . debug_struct ( " Web3ProxyApp " ) . finish_non_exhaustive ( )
2022-05-12 02:50:52 +03:00
}
}
impl Web3ProxyApp {
2022-06-16 23:57:48 +03:00
/// Borrow the map of tracked transactions, keyed by transaction hash.
pub fn get_pending_transactions(&self) -> &DashMap<TxHash, TxState> {
    // explicit deref through the Arc
    &*self.pending_transactions
}
2022-07-07 06:22:09 +03:00
/// Borrow the rate limiter for public requests, if one was configured.
pub fn get_public_rate_limiter(&self) -> Option<&RedisCellClient> {
    let limiter = &self.public_rate_limiter;

    limiter.as_ref()
}
2022-07-09 02:02:32 +03:00
// TODO: should we just take the rpc config as the only arg instead?
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-07-09 02:02:32 +03:00
app_config : AppConfig ,
num_workers : usize ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < (
Arc < Web3ProxyApp > ,
Pin < Box < dyn Future < Output = anyhow ::Result < ( ) > > > > ,
) > {
2022-07-09 02:02:32 +03:00
let balanced_rpcs = app_config . balanced_rpcs . into_values ( ) . collect ( ) ;
let private_rpcs = if let Some ( private_rpcs ) = app_config . private_rpcs {
private_rpcs . into_values ( ) . collect ( )
} else {
vec! [ ]
} ;
// TODO: try_join_all instead?
2022-06-16 20:51:49 +03:00
let handles = FuturesUnordered ::new ( ) ;
2022-06-14 08:43:28 +03:00
2022-05-12 02:50:52 +03:00
// make a http shared client
2022-07-09 02:02:32 +03:00
// TODO: can we configure the connection pool? should we?
2022-05-12 02:50:52 +03:00
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
2022-05-22 02:34:05 +03:00
let http_client = Some (
reqwest ::ClientBuilder ::new ( )
. connect_timeout ( Duration ::from_secs ( 5 ) )
. timeout ( Duration ::from_secs ( 60 ) )
. user_agent ( APP_USER_AGENT )
. build ( ) ? ,
) ;
2022-07-09 03:00:31 +03:00
let redis_client_pool = match app_config . shared . rate_limit_redis {
2022-05-22 02:34:05 +03:00
Some ( redis_address ) = > {
2022-05-22 21:39:06 +03:00
info! ( " Connecting to redis on {} " , redis_address ) ;
2022-07-07 06:22:09 +03:00
let manager = RedisConnectionManager ::new ( redis_address ) ? ;
2022-07-09 02:02:32 +03:00
2022-07-09 03:00:31 +03:00
let min_size = num_workers as u32 ;
let max_size = min_size * 4 ;
2022-07-09 02:11:22 +03:00
// TODO: min_idle?
// TODO: set max_size based on max expected concurrent connections? set based on num_workers?
2022-07-09 02:02:32 +03:00
let builder = bb8 ::Pool ::builder ( )
. error_sink ( bb8_helpers ::RedisErrorSink . boxed_clone ( ) )
2022-07-09 03:00:31 +03:00
. min_idle ( Some ( min_size ) )
. max_size ( max_size ) ;
2022-07-09 02:02:32 +03:00
let pool = builder . build ( manager ) . await ? ;
2022-05-22 21:39:06 +03:00
2022-07-07 06:22:09 +03:00
Some ( pool )
2022-05-22 21:39:06 +03:00
}
None = > {
info! ( " No redis address " ) ;
None
2022-05-22 02:34:05 +03:00
}
} ;
2022-05-12 02:50:52 +03:00
2022-06-14 07:04:14 +03:00
let ( head_block_sender , head_block_receiver ) = watch ::channel ( Block ::default ( ) ) ;
2022-07-09 03:00:31 +03:00
// TODO: will one receiver lagging be okay? how big should this be?
2022-06-16 05:53:37 +03:00
let ( pending_tx_sender , pending_tx_receiver ) = broadcast ::channel ( 16 ) ;
2022-06-16 20:51:49 +03:00
let pending_transactions = Arc ::new ( DashMap ::new ( ) ) ;
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
2022-06-14 07:04:14 +03:00
2022-05-12 02:50:52 +03:00
// TODO: attach context to this error
2022-06-14 08:43:28 +03:00
let ( balanced_rpcs , balanced_handle ) = Web3Connections ::spawn (
2022-07-09 02:02:32 +03:00
app_config . shared . chain_id ,
2022-05-22 02:34:05 +03:00
balanced_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
redis_client_pool . clone ( ) ,
2022-06-14 07:04:14 +03:00
Some ( head_block_sender ) ,
2022-06-16 05:53:37 +03:00
Some ( pending_tx_sender . clone ( ) ) ,
2022-06-16 20:51:49 +03:00
pending_transactions . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
. await ? ;
2022-05-18 19:35:06 +03:00
2022-06-14 08:43:28 +03:00
handles . push ( balanced_handle ) ;
2022-05-12 02:50:52 +03:00
let private_rpcs = if private_rpcs . is_empty ( ) {
warn! ( " No private relays configured. Any transactions will be broadcast to the public mempool! " ) ;
2022-05-13 23:50:11 +03:00
balanced_rpcs . clone ( )
2022-05-12 02:50:52 +03:00
} else {
2022-06-14 07:04:14 +03:00
// TODO: attach context to this error
2022-06-14 08:43:28 +03:00
let ( private_rpcs , private_handle ) = Web3Connections ::spawn (
2022-07-09 02:02:32 +03:00
app_config . shared . chain_id ,
2022-05-22 02:34:05 +03:00
private_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
redis_client_pool . clone ( ) ,
2022-06-14 07:04:14 +03:00
// subscribing to new heads here won't work well
None ,
2022-06-14 08:43:28 +03:00
// TODO: subscribe to pending transactions on the private rpcs?
2022-06-16 20:51:49 +03:00
Some ( pending_tx_sender . clone ( ) ) ,
pending_transactions . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
2022-06-14 08:43:28 +03:00
. await ? ;
handles . push ( private_handle ) ;
private_rpcs
2022-05-12 02:50:52 +03:00
} ;
2022-06-16 23:57:48 +03:00
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
drop ( pending_tx_receiver ) ;
2022-07-07 06:22:09 +03:00
// TODO: how much should we allow?
2022-07-09 02:02:32 +03:00
let public_max_burst = app_config . shared . public_rate_limit_per_minute / 3 ;
2022-07-07 06:22:09 +03:00
2022-07-09 03:00:31 +03:00
let public_rate_limiter = if app_config . shared . public_rate_limit_per_minute = = 0 {
None
} else {
redis_client_pool . as_ref ( ) . map ( | redis_client_pool | {
RedisCellClient ::new (
redis_client_pool . clone ( ) ,
2022-07-09 03:02:04 +03:00
" ip " . to_string ( ) ,
2022-07-09 03:00:31 +03:00
public_max_burst ,
app_config . shared . public_rate_limit_per_minute ,
60 ,
)
} )
} ;
2022-07-07 06:22:09 +03:00
let app = Self {
2022-05-13 23:50:11 +03:00
balanced_rpcs ,
2022-05-12 02:50:52 +03:00
private_rpcs ,
2022-05-22 21:11:42 +03:00
incoming_requests : Default ::default ( ) ,
2022-05-16 01:02:14 +03:00
response_cache : Default ::default ( ) ,
2022-05-30 04:28:22 +03:00
head_block_receiver ,
2022-06-16 05:53:37 +03:00
pending_tx_sender ,
2022-06-16 20:51:49 +03:00
pending_transactions ,
2022-07-07 06:22:09 +03:00
public_rate_limiter ,
2022-06-14 07:04:14 +03:00
} ;
let app = Arc ::new ( app ) ;
2022-06-14 08:43:28 +03:00
// create a handle that returns on the first error
2022-06-16 20:51:49 +03:00
// TODO: move this to a helper. i think Web3Connections needs it too
let handle = Box ::pin ( flatten_handles ( handles ) ) ;
2022-06-14 08:43:28 +03:00
Ok ( ( app , handle ) )
2022-05-12 02:50:52 +03:00
}
2022-05-29 22:33:10 +03:00
pub async fn eth_subscribe (
2022-06-14 10:13:42 +03:00
self : Arc < Self > ,
2022-05-29 22:33:10 +03:00
payload : JsonRpcRequest ,
2022-07-09 01:14:45 +03:00
subscription_count : & AtomicUsize ,
2022-05-30 04:28:22 +03:00
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
2022-07-09 01:14:45 +03:00
response_sender : flume ::Sender < Message > ,
2022-05-30 04:28:22 +03:00
) -> anyhow ::Result < ( AbortHandle , JsonRpcForwardedResponse ) > {
2022-06-14 10:13:42 +03:00
let ( subscription_abort_handle , subscription_registration ) = AbortHandle ::new_pair ( ) ;
2022-05-30 04:28:22 +03:00
2022-06-14 07:04:14 +03:00
// TODO: this only needs to be unique per connection. we don't need it globably unique
2022-07-09 01:14:45 +03:00
let subscription_id = subscription_count . fetch_add ( 1 , atomic ::Ordering ::SeqCst ) ;
2022-05-30 21:51:19 +03:00
let subscription_id = format! ( " {:#x} " , subscription_id ) ;
2022-05-30 21:23:55 +03:00
2022-06-05 23:39:58 +03:00
// save the id so we can use it in the response
let id = payload . id . clone ( ) ;
2022-07-16 03:35:54 +03:00
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
2022-07-08 22:01:11 +03:00
match payload . params {
Some ( x ) if x = = json! ( [ " newHeads " ] ) = > {
2022-06-16 23:57:48 +03:00
let head_block_receiver = self . head_block_receiver . clone ( ) ;
let subscription_id = subscription_id . clone ( ) ;
trace! ( ? subscription_id , " new heads subscription " ) ;
tokio ::spawn ( async move {
let mut head_block_receiver = Abortable ::new (
WatchStream ::new ( head_block_receiver ) ,
subscription_registration ,
) ;
while let Some ( new_head ) = head_block_receiver . next ( ) . await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " :" eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
" result " : new_head ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
2022-05-30 04:28:22 +03:00
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
2022-07-09 01:14:45 +03:00
let subscription_id = subscription_id . clone ( ) ;
2022-06-16 23:57:48 +03:00
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
TxState ::Pending ( tx ) = > tx ,
TxState ::Confirmed ( .. ) = > continue ,
TxState ::Orphaned ( tx ) = > tx ,
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
" result " : new_tx . hash ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingFullTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
2022-07-09 01:14:45 +03:00
let subscription_id = subscription_id . clone ( ) ;
2022-06-16 23:57:48 +03:00
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
TxState ::Pending ( tx ) = > tx ,
TxState ::Confirmed ( .. ) = > continue ,
TxState ::Orphaned ( tx ) = > tx ,
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
// upstream just sends the txid, but we want to send the whole transaction
" result " : new_tx ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-18 10:06:54 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingRawTransactions " ] ) = > {
2022-06-18 10:06:54 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
let subscription_id = subscription_id . clone ( ) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
TxState ::Pending ( tx ) = > tx ,
TxState ::Confirmed ( .. ) = > continue ,
TxState ::Orphaned ( tx ) = > tx ,
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
2022-07-08 22:01:11 +03:00
// upstream just sends the txid, but we want to send the raw transaction
2022-06-18 10:06:54 +03:00
" result " : new_tx . rlp ( ) ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
_ = > return Err ( anyhow ::anyhow! ( " unimplemented " ) ) ,
}
2022-05-30 04:28:22 +03:00
2022-06-14 10:13:42 +03:00
// TODO: do something with subscription_join_handle?
2022-05-30 04:28:22 +03:00
let response = JsonRpcForwardedResponse ::from_string ( subscription_id , id ) ;
2022-06-16 05:53:37 +03:00
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
2022-06-14 10:13:42 +03:00
Ok ( ( subscription_abort_handle , response ) )
2022-05-29 22:33:10 +03:00
}
2022-05-21 01:16:15 +03:00
/// Borrow the connections that regular requests are sent to.
pub fn get_balanced_rpcs(&self) -> &Web3Connections {
    // explicit deref through the Arc
    &*self.balanced_rpcs
}
/// Borrow the connections that private requests are sent to.
pub fn get_private_rpcs(&self) -> &Web3Connections {
    // explicit deref through the Arc
    &*self.private_rpcs
}
/// Borrow the map of requests that are currently being handled.
pub fn get_active_requests(&self) -> &ActiveRequestsMap {
    let in_flight = &self.incoming_requests;

    in_flight
}
2022-05-12 02:50:52 +03:00
/// send the request to the approriate RPCs
/// TODO: dry this up
2022-05-17 03:56:56 +03:00
#[ instrument(skip_all) ]
2022-05-12 02:50:52 +03:00
pub async fn proxy_web3_rpc (
2022-05-29 20:28:41 +03:00
& self ,
2022-05-12 02:50:52 +03:00
request : JsonRpcRequestEnum ,
2022-05-20 08:27:18 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponseEnum > {
2022-05-20 06:35:45 +03:00
// TODO: i don't always see this in the logs. why?
2022-07-07 06:22:09 +03:00
trace! ( " Received request: {:?} " , request ) ;
2022-05-12 02:50:52 +03:00
2022-05-22 07:22:30 +03:00
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that delays from
let max_time = Duration ::from_secs ( 60 ) ;
2022-05-12 02:50:52 +03:00
let response = match request {
2022-05-22 07:22:30 +03:00
JsonRpcRequestEnum ::Single ( request ) = > JsonRpcForwardedResponseEnum ::Single (
timeout ( max_time , self . proxy_web3_rpc_request ( request ) ) . await ? ? ,
) ,
JsonRpcRequestEnum ::Batch ( requests ) = > JsonRpcForwardedResponseEnum ::Batch (
timeout ( max_time , self . proxy_web3_rpc_requests ( requests ) ) . await ? ? ,
) ,
2022-05-12 02:50:52 +03:00
} ;
2022-05-20 06:35:45 +03:00
// TODO: i don't always see this in the logs. why?
2022-07-07 06:22:09 +03:00
trace! ( " Forwarding response: {:?} " , response ) ;
2022-05-17 03:56:56 +03:00
2022-05-20 08:27:18 +03:00
Ok ( response )
2022-05-12 02:50:52 +03:00
}
2022-05-17 20:15:18 +03:00
// #[instrument(skip_all)]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_requests (
2022-05-29 20:28:41 +03:00
& self ,
2022-05-12 02:50:52 +03:00
requests : Vec < JsonRpcRequest > ,
) -> anyhow ::Result < Vec < JsonRpcForwardedResponse > > {
// TODO: we should probably change ethers-rs to support this directly
// we cut up the request and send to potentually different servers. this could be a problem.
// if the client needs consistent blocks, they should specify instead of assume batches work on the same
// TODO: is spawning here actually slower?
let num_requests = requests . len ( ) ;
let responses = join_all (
requests
. into_iter ( )
2022-05-29 20:28:41 +03:00
. map ( | request | self . proxy_web3_rpc_request ( request ) )
2022-05-12 02:50:52 +03:00
. collect ::< Vec < _ > > ( ) ,
)
. await ;
// TODO: i'm sure this could be done better with iterators
let mut collected : Vec < JsonRpcForwardedResponse > = Vec ::with_capacity ( num_requests ) ;
for response in responses {
2022-05-18 19:35:06 +03:00
collected . push ( response ? ) ;
2022-05-12 02:50:52 +03:00
}
Ok ( collected )
}
2022-07-19 04:31:12 +03:00
async fn get_cached_response (
2022-05-22 07:22:30 +03:00
& self ,
2022-07-21 02:49:29 +03:00
// TODO: accept a block hash here also?
2022-07-19 04:31:12 +03:00
min_block_needed : Option < U64 > ,
2022-05-22 07:22:30 +03:00
request : & JsonRpcRequest ,
2022-07-19 04:31:12 +03:00
) -> anyhow ::Result < (
2022-05-22 07:22:30 +03:00
CacheKey ,
Result < JsonRpcForwardedResponse , & ResponseLrcCache > ,
2022-07-19 04:31:12 +03:00
) > {
2022-05-22 07:22:30 +03:00
// TODO: inspect the request to pick the right cache
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
2022-07-19 04:31:12 +03:00
let request_block_hash = if let Some ( min_block_needed ) = min_block_needed {
2022-07-21 02:49:29 +03:00
// TODO: maybe this should be on the app and not on balanced_rpcs
self . balanced_rpcs . get_block_hash ( min_block_needed ) . await ?
2022-07-19 04:31:12 +03:00
} else {
2022-07-21 02:49:29 +03:00
// TODO: maybe this should be on the app and not on balanced_rpcs
2022-07-19 04:31:12 +03:00
self . balanced_rpcs . get_head_block_hash ( )
} ;
2022-05-22 07:22:30 +03:00
// TODO: better key? benchmark this
let key = (
2022-07-19 04:31:12 +03:00
request_block_hash ,
2022-05-22 07:22:30 +03:00
request . method . clone ( ) ,
request . params . clone ( ) . map ( | x | x . to_string ( ) ) ,
) ;
if let Some ( response ) = self . response_cache . read ( ) . get ( & key ) {
// TODO: emit a stat
2022-07-19 04:31:12 +03:00
trace! ( ? request . method , " cache hit! " ) ;
2022-05-22 07:22:30 +03:00
// TODO: can we make references work? maybe put them in an Arc?
2022-07-19 04:31:12 +03:00
return Ok ( ( key , Ok ( response . to_owned ( ) ) ) ) ;
2022-05-22 07:22:30 +03:00
} else {
// TODO: emit a stat
2022-07-19 04:31:12 +03:00
trace! ( ? request . method , " cache miss! " ) ;
2022-05-22 07:22:30 +03:00
}
// TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?)
let cache = & self . response_cache ;
2022-07-19 04:31:12 +03:00
Ok ( ( key , Err ( cache ) ) )
2022-05-22 07:22:30 +03:00
}
2022-05-17 20:15:18 +03:00
// #[instrument(skip_all)]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_request (
2022-05-29 20:28:41 +03:00
& self ,
2022-07-09 05:23:26 +03:00
mut request : JsonRpcRequest ,
2022-05-12 02:50:52 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponse > {
trace! ( " Received request: {:?} " , request ) ;
2022-05-31 04:55:04 +03:00
// TODO: if eth_chainId or net_version, serve those without querying the backend
2022-05-22 07:22:30 +03:00
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
2022-06-14 07:04:14 +03:00
// // TODO: add more to this span such as
let span = info_span! ( " rpc_request " ) ;
2022-05-29 04:23:58 +03:00
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
2022-07-19 04:31:12 +03:00
match request . method . as_ref ( ) {
2022-07-09 05:23:26 +03:00
// lots of commands are blocked
2022-06-14 09:54:19 +03:00
" admin_addPeer "
| " admin_datadir "
| " admin_startRPC "
| " admin_startWS "
| " admin_stopRPC "
| " admin_stopWS "
2022-07-09 05:23:26 +03:00
| " db_getHex "
| " db_getString "
| " db_putHex "
| " db_putString "
2022-06-14 09:54:19 +03:00
| " debug_chaindbCompact "
| " debug_freezeClient "
| " debug_goTrace "
| " debug_mutexProfile "
| " debug_setBlockProfileRate "
| " debug_setGCPercent "
| " debug_setHead "
| " debug_setMutexProfileFraction "
| " debug_standardTraceBlockToFile "
| " debug_standardTraceBadBlockToFile "
| " debug_startCPUProfile "
| " debug_startGoTrace "
| " debug_stopCPUProfile "
| " debug_stopGoTrace "
| " debug_writeBlockProfile "
| " debug_writeMemProfile "
| " debug_writeMutexProfile "
2022-07-09 05:23:26 +03:00
| " eth_compileLLL "
| " eth_compileSerpent "
| " eth_compileSolidity "
| " eth_getCompilers "
| " eth_sendTransaction "
| " eth_sign "
| " eth_signTransaction "
| " eth_submitHashrate "
| " eth_submitWork "
2022-06-14 09:54:19 +03:00
| " les_addBalance "
| " les_setClientParams "
| " les_setDefaultParams "
| " miner_setExtra "
| " miner_setGasPrice "
| " miner_start "
| " miner_stop "
| " miner_setEtherbase "
| " miner_setGasLimit "
| " personal_importRawKey "
| " personal_listAccounts "
| " personal_lockAccount "
| " personal_newAccount "
| " personal_unlockAccount "
| " personal_sendTransaction "
| " personal_sign "
2022-07-09 05:23:26 +03:00
| " personal_ecRecover "
| " shh_addToGroup "
| " shh_getFilterChanges "
| " shh_getMessages "
| " shh_hasIdentity "
| " shh_newFilter "
| " shh_newGroup "
| " shh_newIdentity "
| " shh_post "
| " shh_uninstallFilter "
| " shh_version " = > {
2022-06-30 03:52:04 +03:00
// TODO: proper error code
2022-07-09 05:23:26 +03:00
Err ( anyhow ::anyhow! ( " unsupported " ) )
}
// TODO: implement these commands
" eth_getFilterChanges "
| " eth_getFilterLogs "
| " eth_newBlockFilter "
| " eth_newFilter "
| " eth_newPendingTransactionFilter "
| " eth_uninstallFilter " = > Err ( anyhow ::anyhow! ( " not yet implemented " ) ) ,
// some commands can use local data or caches
" eth_accounts " = > {
let partial_response = serde_json ::Value ::Array ( vec! [ ] ) ;
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
}
" eth_blockNumber " = > {
let head_block_number = self . balanced_rpcs . get_head_block_num ( ) ;
if head_block_number = = 0 {
return Err ( anyhow ::anyhow! ( " no servers synced " ) ) ;
}
2022-07-09 05:33:53 +03:00
let response = JsonRpcForwardedResponse ::from_number ( head_block_number , request . id ) ;
2022-07-09 05:23:26 +03:00
Ok ( response )
}
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
" eth_coinbase " = > {
// no need for serving coinbase. we could return a per-user payment address here, but then we might leak that to dapps
let partial_response = json! ( Address ::zero ( ) ) ;
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
}
// TODO: eth_estimateGas using anvil?
// TODO: eth_gasPrice that does awesome magic to predict the future
// TODO: eth_getBlockByHash from caches
// TODO: eth_getBlockByNumber from caches
// TODO: eth_getBlockTransactionCountByHash from caches
// TODO: eth_getBlockTransactionCountByNumber from caches
// TODO: eth_getUncleCountByBlockHash from caches
// TODO: eth_getUncleCountByBlockNumber from caches
" eth_hashrate " = > {
let partial_response = json! ( " 0x0 " ) ;
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
2022-06-30 03:52:04 +03:00
}
2022-07-09 05:23:26 +03:00
" eth_mining " = > {
let partial_response = json! ( false ) ;
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
}
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
2022-07-14 02:25:01 +03:00
" eth_sendRawTransaction " = > match & request . params {
Some ( serde_json ::Value ::Array ( params ) ) = > {
if params . len ( ) ! = 1 | | ! params [ 0 ] . is_string ( ) {
return Err ( anyhow ::anyhow! ( " invalid request " ) ) ;
}
let raw_tx = Bytes ::from_str ( params [ 0 ] . as_str ( ) . unwrap ( ) ) ? ;
if check_firewall_raw ( & raw_tx ) . await ? {
self . private_rpcs
2022-07-19 04:31:12 +03:00
. try_send_all_upstream_servers ( request , None )
2022-07-14 02:25:01 +03:00
. instrument ( span )
. await
} else {
Err ( anyhow ::anyhow! ( " transaction blocked by firewall " ) )
}
}
_ = > Err ( anyhow ::anyhow! ( " invalid request " ) ) ,
} ,
2022-07-09 05:23:26 +03:00
" eth_syncing " = > {
// TODO: return a real response if all backends are syncing or if no servers in sync
2022-07-09 05:37:58 +03:00
let partial_response = json! ( false ) ;
2022-07-09 05:23:26 +03:00
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
}
" net_listening " = > {
// TODO: only if there are some backends on balanced_rpcs?
2022-07-09 05:37:58 +03:00
let partial_response = json! ( true ) ;
2022-07-09 05:23:26 +03:00
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
}
" net_peerCount " = > {
2022-07-09 05:33:53 +03:00
let response = JsonRpcForwardedResponse ::from_number (
self . balanced_rpcs . num_synced_rpcs ( ) ,
request . id ,
) ;
2022-07-09 05:23:26 +03:00
Ok ( response )
}
" web3_clientVersion " = > {
// TODO: return a real response if all backends are syncing or if no servers in sync
let partial_response = serde_json ::Value ::String ( APP_USER_AGENT . to_string ( ) ) ;
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request . id ) ;
Ok ( response )
}
// TODO: web3_sha3?
2022-06-30 03:52:04 +03:00
method = > {
2022-07-09 05:23:26 +03:00
// everything else is relayed to a backend
// this is not a private transaction
2022-07-16 07:13:02 +03:00
let head_block_number = self . balanced_rpcs . get_head_block_num ( ) ;
2022-07-09 05:23:26 +03:00
// we do this check before checking caches because it might modify the request params
2022-07-16 08:48:02 +03:00
// TODO: add a stat for archive vs full since they should probably cost different
2022-07-19 04:31:12 +03:00
let min_block_needed =
get_min_block_needed ( method , request . params . as_mut ( ) , head_block_number . into ( ) ) ;
trace! ( ? min_block_needed , ? method ) ;
2022-05-16 22:15:40 +03:00
2022-07-19 04:31:12 +03:00
let ( cache_key , cache_result ) =
self . get_cached_response ( min_block_needed , & request ) . await ? ;
2022-07-16 08:48:02 +03:00
2022-07-19 04:31:12 +03:00
let response_cache = match cache_result {
Ok ( response ) = > {
2022-06-14 07:04:14 +03:00
let _ = self . incoming_requests . remove ( & cache_key ) ;
2022-05-29 04:23:58 +03:00
2022-07-16 08:48:02 +03:00
// TODO: if the response is cached, should it count less against the account's costs?
2022-06-14 07:04:14 +03:00
return Ok ( response ) ;
}
2022-07-19 04:31:12 +03:00
Err ( response_cache ) = > response_cache ,
2022-06-14 07:04:14 +03:00
} ;
// check if this request is already in flight
// TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
let ( incoming_tx , incoming_rx ) = watch ::channel ( true ) ;
let mut other_incoming_rx = None ;
match self . incoming_requests . entry ( cache_key . clone ( ) ) {
2022-06-14 09:42:52 +03:00
DashMapEntry ::Occupied ( entry ) = > {
2022-06-14 07:04:14 +03:00
other_incoming_rx = Some ( entry . get ( ) . clone ( ) ) ;
}
2022-06-14 09:42:52 +03:00
DashMapEntry ::Vacant ( entry ) = > {
2022-06-14 07:04:14 +03:00
entry . insert ( incoming_rx ) ;
}
2022-05-29 04:23:58 +03:00
}
2022-05-16 22:15:40 +03:00
2022-06-14 07:04:14 +03:00
if let Some ( mut other_incoming_rx ) = other_incoming_rx {
// wait for the other request to finish. it might have finished successfully or with an error
trace! ( " {:?} waiting on in-flight request " , request ) ;
let _ = other_incoming_rx . changed ( ) . await ;
// now that we've waited, lets check the cache again
if let Some ( cached ) = response_cache . read ( ) . get ( & cache_key ) {
let _ = self . incoming_requests . remove ( & cache_key ) ;
let _ = incoming_tx . send ( false ) ;
// TODO: emit a stat
trace! (
" {:?} cache hit after waiting for in-flight request! " ,
request
) ;
return Ok ( cached . to_owned ( ) ) ;
} else {
// TODO: emit a stat
trace! (
" {:?} cache miss after waiting for in-flight request! " ,
request
) ;
}
2022-05-13 23:50:11 +03:00
}
2022-05-16 22:15:40 +03:00
2022-06-30 03:52:04 +03:00
let response = match method {
" eth_getTransactionByHash " | " eth_getTransactionReceipt " = > {
// TODO: try_send_all serially with retries instead of parallel
self . private_rpcs
2022-07-19 04:31:12 +03:00
. try_send_all_upstream_servers ( request , min_block_needed )
2022-06-30 03:52:04 +03:00
. await ?
}
_ = > {
// TODO: retries?
self . balanced_rpcs
2022-07-19 04:31:12 +03:00
. try_send_best_upstream_server ( request , min_block_needed )
2022-06-30 03:52:04 +03:00
. await ?
}
} ;
2022-05-29 04:23:58 +03:00
2022-07-09 05:23:26 +03:00
{
let mut response_cache = response_cache . write ( ) ;
2022-05-29 04:23:58 +03:00
2022-07-09 05:23:26 +03:00
// TODO: cache the warp::reply to save us serializing every time?
response_cache . insert ( cache_key . clone ( ) , response . clone ( ) ) ;
2022-05-29 04:23:58 +03:00
2022-07-09 05:23:26 +03:00
// TODO: instead of checking length, check size in bytes
if response_cache . len ( ) > = RESPONSE_CACHE_CAP {
// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
response_cache . pop_front ( ) ;
}
}
2022-05-29 04:23:58 +03:00
2022-06-14 07:04:14 +03:00
let _ = self . incoming_requests . remove ( & cache_key ) ;
let _ = incoming_tx . send ( false ) ;
2022-05-29 04:23:58 +03:00
2022-06-14 07:04:14 +03:00
Ok ( response )
}
2022-05-29 04:23:58 +03:00
}
2022-05-12 02:50:52 +03:00
}
}