2022-08-10 05:37:34 +03:00
// TODO: this file is way too big now. move things into other modules
2022-08-24 03:59:05 +03:00
use crate ::block_number ::block_needed ;
use crate ::config ::{ AppConfig , TopConfig } ;
use crate ::jsonrpc ::JsonRpcForwardedResponse ;
use crate ::jsonrpc ::JsonRpcForwardedResponseEnum ;
use crate ::jsonrpc ::JsonRpcRequest ;
use crate ::jsonrpc ::JsonRpcRequestEnum ;
2022-09-07 06:54:16 +03:00
use crate ::rpcs ::blockchain ::{ ArcBlock , BlockHashesMap , BlockId } ;
2022-08-30 23:01:42 +03:00
use crate ::rpcs ::connections ::Web3Connections ;
2022-08-24 03:59:05 +03:00
use crate ::rpcs ::transactions ::TxStatus ;
2022-07-26 07:53:38 +03:00
use anyhow ::Context ;
2022-05-29 22:33:10 +03:00
use axum ::extract ::ws ::Message ;
2022-08-10 05:37:34 +03:00
use derive_more ::From ;
2022-07-22 22:30:39 +03:00
use ethers ::core ::utils ::keccak256 ;
2022-08-24 03:59:05 +03:00
use ethers ::prelude ::{ Address , Block , Bytes , TxHash , H256 , U64 } ;
2022-06-16 23:57:48 +03:00
use futures ::future ::Abortable ;
2022-05-30 04:28:22 +03:00
use futures ::future ::{ join_all , AbortHandle } ;
2022-06-14 08:43:28 +03:00
use futures ::stream ::FuturesUnordered ;
2022-05-30 04:28:22 +03:00
use futures ::stream ::StreamExt ;
2022-06-16 20:51:49 +03:00
use futures ::Future ;
2022-09-09 00:01:36 +03:00
use hashbrown ::HashMap ;
use metered ::{ metered , ErrorCount , HitCount , InFlight , ResponseTime , Throughput } ;
2022-08-03 03:27:26 +03:00
use migration ::{ Migrator , MigratorTrait } ;
2022-09-05 08:53:58 +03:00
use moka ::future ::Cache ;
2022-08-23 21:48:27 +03:00
use redis_rate_limit ::bb8 ::PooledConnection ;
2022-08-16 01:50:56 +03:00
use redis_rate_limit ::{
bb8 ::{ self , ErrorSink } ,
RedisConnectionManager , RedisErrorSink , RedisPool , RedisRateLimit ,
} ;
2022-08-06 03:07:12 +03:00
use sea_orm ::DatabaseConnection ;
2022-05-30 21:23:55 +03:00
use serde_json ::json ;
2022-05-12 02:50:52 +03:00
use std ::fmt ;
2022-06-16 20:51:49 +03:00
use std ::pin ::Pin ;
2022-07-14 02:25:01 +03:00
use std ::str ::FromStr ;
2022-09-07 06:54:16 +03:00
use std ::sync ::atomic ::{ self , AtomicU64 , AtomicUsize } ;
2022-05-12 02:50:52 +03:00
use std ::sync ::Arc ;
use std ::time ::Duration ;
2022-09-07 07:11:47 +03:00
use tokio ::sync ::{ broadcast , watch } ;
2022-06-14 08:43:28 +03:00
use tokio ::task ::JoinHandle ;
2022-08-10 05:37:34 +03:00
use tokio ::time ::{ timeout , Instant } ;
2022-06-16 23:57:48 +03:00
use tokio_stream ::wrappers ::{ BroadcastStream , WatchStream } ;
2022-09-06 23:12:45 +03:00
use tracing ::{ info , info_span , instrument , trace , warn , Instrument } ;
2022-08-10 05:37:34 +03:00
use uuid ::Uuid ;
2022-05-12 02:50:52 +03:00
2022-07-09 05:23:26 +03:00
// TODO: make this customizable?
2022-05-12 02:50:52 +03:00
static APP_USER_AGENT : & str = concat! (
" satoshiandkin/ " ,
env! ( " CARGO_PKG_NAME " ) ,
" / " ,
env! ( " CARGO_PKG_VERSION " ) ,
) ;
2022-08-24 03:59:05 +03:00
/// block hash, method, params
// TODO: better name
2022-09-07 07:11:47 +03:00
type ResponseCacheKey = ( H256 , String , Option < String > ) ;
type ResponseCache = Cache < ResponseCacheKey , JsonRpcForwardedResponse > ;
2022-05-21 01:16:15 +03:00
2022-06-14 08:43:28 +03:00
/// Join handle for a spawned task whose output is an `anyhow::Result<T>`.
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
2022-08-10 08:56:09 +03:00
/// Value stored in [`Web3ProxyApp::user_cache`].
// NOTE(review): field meanings below are inferred from the names; confirm against the code that fills the cache.
#[derive(Clone, Copy, From)]
pub struct UserCacheValue {
    /// Presumably the instant after which this entry must be refreshed.
    pub expires_at: Instant,
    /// Presumably the database id of the user.
    pub user_id: u64,
    /// Presumably how many requests the user may make per rate limit period.
    pub user_count_per_period: u64,
}
2022-08-24 03:59:05 +03:00
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs : Arc < Web3Connections > ,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs : Arc < Web3Connections > ,
2022-09-05 04:52:59 +03:00
response_cache : ResponseCache ,
2022-08-24 03:59:05 +03:00
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
2022-08-30 23:01:42 +03:00
head_block_receiver : watch ::Receiver < ArcBlock > ,
2022-08-24 03:59:05 +03:00
pending_tx_sender : broadcast ::Sender < TxStatus > ,
pub config : AppConfig ,
pub db_conn : Option < sea_orm ::DatabaseConnection > ,
2022-09-09 00:01:36 +03:00
/// prometheus metrics
metrics : Arc < Web3ProxyAppMetrics > ,
2022-09-07 06:54:16 +03:00
/// store pending queries so that we don't send the same request to our backends multiple times
pub total_queries : AtomicU64 ,
2022-09-05 08:53:58 +03:00
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions : Cache < TxHash , TxStatus > ,
2022-09-07 06:54:16 +03:00
pub frontend_rate_limiter : Option < RedisRateLimit > ,
2022-08-24 03:59:05 +03:00
pub redis_pool : Option < RedisPool > ,
2022-09-05 08:53:58 +03:00
pub user_cache : Cache < Uuid , UserCacheValue > ,
2022-08-24 03:59:05 +03:00
}
2022-07-16 03:08:22 +03:00
/// flatten a JoinError into an anyhow error
2022-08-10 05:37:34 +03:00
/// Useful when joining multiple futures.
2022-06-14 08:43:28 +03:00
pub async fn flatten_handle < T > ( handle : AnyhowJoinHandle < T > ) -> anyhow ::Result < T > {
match handle . await {
Ok ( Ok ( result ) ) = > Ok ( result ) ,
Ok ( Err ( err ) ) = > Err ( err ) ,
Err ( err ) = > Err ( err . into ( ) ) ,
}
}
2022-07-16 03:08:22 +03:00
/// return the first error or okay if everything worked
2022-06-16 23:57:48 +03:00
pub async fn flatten_handles < T > (
mut handles : FuturesUnordered < AnyhowJoinHandle < T > > ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < ( ) > {
while let Some ( x ) = handles . next ( ) . await {
match x {
Err ( e ) = > return Err ( e . into ( ) ) ,
Ok ( Err ( e ) ) = > return Err ( e ) ,
2022-07-16 03:08:22 +03:00
Ok ( Ok ( _ ) ) = > continue ,
2022-06-16 20:51:49 +03:00
}
}
Ok ( ( ) )
}
2022-08-10 05:37:34 +03:00
/// Connect to the database and run migrations
2022-09-09 00:01:36 +03:00
#[ instrument(skip_all) ]
2022-08-06 03:07:12 +03:00
pub async fn get_migrated_db (
db_url : String ,
2022-09-02 23:16:20 +03:00
min_connections : u32 ,
2022-08-10 07:30:54 +03:00
max_connections : u32 ,
2022-08-06 03:07:12 +03:00
) -> anyhow ::Result < DatabaseConnection > {
let mut db_opt = sea_orm ::ConnectOptions ::new ( db_url ) ;
2022-08-10 05:37:34 +03:00
// TODO: load all these options from the config file. i think mysql default max is 100
2022-08-06 03:07:12 +03:00
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
2022-09-02 23:16:20 +03:00
. min_connections ( min_connections )
2022-08-10 07:30:54 +03:00
. max_connections ( max_connections )
2022-08-06 03:07:12 +03:00
. connect_timeout ( Duration ::from_secs ( 8 ) )
. idle_timeout ( Duration ::from_secs ( 8 ) )
. max_lifetime ( Duration ::from_secs ( 60 ) )
. sqlx_logging ( false ) ;
// .sqlx_logging_level(log::LevelFilter::Info);
let db = sea_orm ::Database ::connect ( db_opt ) . await ? ;
// TODO: if error, roll back?
Migrator ::up ( & db , None ) . await ? ;
Ok ( db )
}
2022-09-09 00:01:36 +03:00
#[ metered(registry = Web3ProxyAppMetrics) ]
2022-05-12 02:50:52 +03:00
impl Web3ProxyApp {
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-08-12 22:07:14 +03:00
top_config : TopConfig ,
2022-09-06 23:50:37 +03:00
num_workers : u32 ,
2022-06-16 20:51:49 +03:00
) -> anyhow ::Result < (
Arc < Web3ProxyApp > ,
Pin < Box < dyn Future < Output = anyhow ::Result < ( ) > > > > ,
) > {
2022-08-12 22:07:14 +03:00
// safety checks on the config
assert! (
2022-09-05 09:29:27 +03:00
top_config . app . redirect_user_url . contains ( " {{user_id}} " ) ,
" redirect user url must contain \" {{user_id}} \" "
2022-08-12 22:07:14 +03:00
) ;
2022-08-03 03:27:26 +03:00
// first, we connect to mysql and make sure the latest migrations have run
2022-08-12 22:07:14 +03:00
let db_conn = if let Some ( db_url ) = & top_config . app . db_url {
2022-09-06 23:50:37 +03:00
let db_min_connections = top_config . app . db_min_connections . unwrap_or ( num_workers ) ;
2022-07-26 07:53:38 +03:00
2022-09-02 23:16:20 +03:00
// TODO: what default multiple?
let redis_max_connections = top_config
. app
. db_max_connections
2022-09-06 23:50:37 +03:00
. unwrap_or ( db_min_connections * 2 ) ;
2022-09-02 23:16:20 +03:00
let db =
get_migrated_db ( db_url . clone ( ) , db_min_connections , redis_max_connections ) . await ? ;
2022-08-03 03:27:26 +03:00
2022-08-06 03:07:12 +03:00
Some ( db )
2022-07-26 07:53:38 +03:00
} else {
info! ( " no database " ) ;
None
} ;
2022-08-12 22:07:14 +03:00
let balanced_rpcs = top_config . balanced_rpcs ;
2022-07-09 02:02:32 +03:00
2022-08-12 22:07:14 +03:00
let private_rpcs = if let Some ( private_rpcs ) = top_config . private_rpcs {
2022-08-10 08:56:09 +03:00
private_rpcs
2022-07-09 02:02:32 +03:00
} else {
2022-08-10 08:56:09 +03:00
Default ::default ( )
2022-07-09 02:02:32 +03:00
} ;
// TODO: try_join_all instead?
2022-06-16 20:51:49 +03:00
let handles = FuturesUnordered ::new ( ) ;
2022-06-14 08:43:28 +03:00
2022-05-12 02:50:52 +03:00
// make a http shared client
2022-07-09 02:02:32 +03:00
// TODO: can we configure the connection pool? should we?
2022-05-12 02:50:52 +03:00
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
2022-05-22 02:34:05 +03:00
let http_client = Some (
reqwest ::ClientBuilder ::new ( )
. connect_timeout ( Duration ::from_secs ( 5 ) )
. timeout ( Duration ::from_secs ( 60 ) )
. user_agent ( APP_USER_AGENT )
. build ( ) ? ,
) ;
2022-08-12 22:07:14 +03:00
let redis_pool = match top_config . app . redis_url . as_ref ( ) {
2022-07-26 07:53:38 +03:00
Some ( redis_url ) = > {
info! ( " Connecting to redis on {} " , redis_url ) ;
2022-05-22 21:39:06 +03:00
2022-08-12 22:07:14 +03:00
let manager = RedisConnectionManager ::new ( redis_url . as_ref ( ) ) ? ;
2022-07-09 02:02:32 +03:00
2022-09-06 23:50:37 +03:00
let redis_min_connections =
top_config . app . redis_min_connections . unwrap_or ( num_workers ) ;
2022-09-02 23:16:20 +03:00
let redis_max_connections = top_config
. app
. redis_max_connections
2022-09-06 23:50:37 +03:00
. unwrap_or ( redis_min_connections * 2 ) ;
2022-09-02 23:16:20 +03:00
2022-07-09 02:11:22 +03:00
// TODO: min_idle?
// TODO: set max_size based on max expected concurrent connections? set based on num_workers?
2022-07-09 02:02:32 +03:00
let builder = bb8 ::Pool ::builder ( )
2022-08-16 01:50:56 +03:00
. error_sink ( RedisErrorSink . boxed_clone ( ) )
2022-09-02 23:16:20 +03:00
. min_idle ( Some ( redis_min_connections ) )
. max_size ( redis_max_connections ) ;
2022-07-09 02:02:32 +03:00
let pool = builder . build ( manager ) . await ? ;
2022-05-22 21:39:06 +03:00
2022-07-07 06:22:09 +03:00
Some ( pool )
2022-05-22 21:39:06 +03:00
}
None = > {
2022-07-26 07:53:38 +03:00
warn! ( " no redis connection " ) ;
2022-05-22 21:39:06 +03:00
None
2022-05-22 02:34:05 +03:00
}
} ;
2022-05-12 02:50:52 +03:00
2022-09-06 15:29:37 +03:00
// TODO: i don't like doing Block::default here! Change this to "None"?
2022-07-22 08:11:26 +03:00
let ( head_block_sender , head_block_receiver ) = watch ::channel ( Arc ::new ( Block ::default ( ) ) ) ;
2022-07-09 03:00:31 +03:00
// TODO: will one receiver lagging be okay? how big should this be?
2022-07-26 07:53:38 +03:00
let ( pending_tx_sender , pending_tx_receiver ) = broadcast ::channel ( 256 ) ;
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
drop ( pending_tx_receiver ) ;
2022-06-16 20:51:49 +03:00
2022-09-05 08:53:58 +03:00
// TODO: sized and timed expiration!
// TODO: put some in Redis, too?
let pending_transactions = Cache ::new ( 10000 ) ;
2022-06-16 20:51:49 +03:00
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
2022-06-14 07:04:14 +03:00
2022-08-26 20:26:17 +03:00
// this block map is shared between balanced_rpcs and private_rpcs.
2022-09-05 08:53:58 +03:00
// TODO: what limits should we have for expiration?
let block_map = BlockHashesMap ::new ( 10_000 ) ;
2022-08-26 20:26:17 +03:00
2022-06-14 08:43:28 +03:00
let ( balanced_rpcs , balanced_handle ) = Web3Connections ::spawn (
2022-08-12 22:07:14 +03:00
top_config . app . chain_id ,
2022-05-22 02:34:05 +03:00
balanced_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
2022-08-10 05:37:34 +03:00
redis_pool . clone ( ) ,
2022-08-26 20:26:17 +03:00
block_map . clone ( ) ,
2022-06-14 07:04:14 +03:00
Some ( head_block_sender ) ,
2022-08-27 06:11:58 +03:00
top_config . app . min_sum_soft_limit ,
2022-08-27 03:33:45 +03:00
top_config . app . min_synced_rpcs ,
2022-06-16 05:53:37 +03:00
Some ( pending_tx_sender . clone ( ) ) ,
2022-06-16 20:51:49 +03:00
pending_transactions . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
. context ( " balanced rpcs " ) ? ;
2022-05-18 19:35:06 +03:00
2022-06-14 08:43:28 +03:00
handles . push ( balanced_handle ) ;
2022-05-12 02:50:52 +03:00
let private_rpcs = if private_rpcs . is_empty ( ) {
2022-09-06 23:12:45 +03:00
// TODO: do None instead of clone?
2022-05-12 02:50:52 +03:00
warn! ( " No private relays configured. Any transactions will be broadcast to the public mempool! " ) ;
2022-05-13 23:50:11 +03:00
balanced_rpcs . clone ( )
2022-05-12 02:50:52 +03:00
} else {
2022-06-14 07:04:14 +03:00
// TODO: attach context to this error
2022-06-14 08:43:28 +03:00
let ( private_rpcs , private_handle ) = Web3Connections ::spawn (
2022-08-12 22:07:14 +03:00
top_config . app . chain_id ,
2022-05-22 02:34:05 +03:00
private_rpcs ,
2022-07-19 07:21:32 +03:00
http_client . clone ( ) ,
2022-08-10 05:37:34 +03:00
redis_pool . clone ( ) ,
2022-08-26 20:26:17 +03:00
block_map ,
2022-08-11 00:29:50 +03:00
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
None ,
2022-08-27 03:33:45 +03:00
// minimum doesn't really matter on private rpcs
1 ,
2022-08-27 06:11:58 +03:00
1 ,
2022-08-11 00:29:50 +03:00
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
2022-06-14 07:04:14 +03:00
None ,
2022-06-16 20:51:49 +03:00
pending_transactions . clone ( ) ,
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
. await
. context ( " private_rpcs " ) ? ;
2022-06-14 08:43:28 +03:00
handles . push ( private_handle ) ;
private_rpcs
2022-05-12 02:50:52 +03:00
} ;
2022-08-10 05:37:34 +03:00
let frontend_rate_limiter = redis_pool . as_ref ( ) . map ( | redis_pool | {
2022-08-16 01:50:56 +03:00
RedisRateLimit ::new (
2022-08-10 05:37:34 +03:00
redis_pool . clone ( ) ,
2022-08-06 08:46:33 +03:00
" web3_proxy " ,
2022-08-06 08:26:43 +03:00
" frontend " ,
2022-08-12 22:07:14 +03:00
top_config . app . public_rate_limit_per_minute ,
2022-08-30 23:01:42 +03:00
60.0 ,
2022-08-06 08:26:43 +03:00
)
} ) ;
2022-07-07 06:22:09 +03:00
2022-09-05 08:53:58 +03:00
// TODO: change this to a sized cache
2022-09-07 06:54:16 +03:00
let total_queries = 0. into ( ) ;
2022-09-06 20:56:49 +03:00
let response_cache = Cache ::new ( 10_000 ) ;
2022-09-05 08:53:58 +03:00
let user_cache = Cache ::new ( 10_000 ) ;
2022-07-07 06:22:09 +03:00
let app = Self {
2022-08-12 22:07:14 +03:00
config : top_config . app ,
2022-05-13 23:50:11 +03:00
balanced_rpcs ,
2022-05-12 02:50:52 +03:00
private_rpcs ,
2022-09-05 08:53:58 +03:00
response_cache ,
2022-05-30 04:28:22 +03:00
head_block_receiver ,
2022-06-16 05:53:37 +03:00
pending_tx_sender ,
2022-09-07 06:54:16 +03:00
total_queries ,
2022-06-16 20:51:49 +03:00
pending_transactions ,
2022-09-07 06:54:16 +03:00
frontend_rate_limiter ,
2022-07-26 07:53:38 +03:00
db_conn ,
2022-08-10 05:37:34 +03:00
redis_pool ,
2022-09-09 00:01:36 +03:00
metrics : Default ::default ( ) ,
2022-09-05 08:53:58 +03:00
user_cache ,
2022-06-14 07:04:14 +03:00
} ;
let app = Arc ::new ( app ) ;
2022-06-16 20:51:49 +03:00
let handle = Box ::pin ( flatten_handles ( handles ) ) ;
2022-06-14 08:43:28 +03:00
Ok ( ( app , handle ) )
2022-05-12 02:50:52 +03:00
}
2022-09-09 00:01:36 +03:00
/// Serialize the app's metrics with `serde_prometheus`, using `web3_proxy` as the prefix.
///
/// # Errors
///
/// Fails if the metrics cannot be serialized.
pub fn prometheus_metrics(&self) -> anyhow::Result<String> {
    // TODO: what globals? should this be the hostname or what?
    // (one candidate: "service" => "web3_proxy")
    let global_labels = HashMap::new();

    Ok(serde_prometheus::to_string(
        &self.metrics,
        Some("web3_proxy"),
        global_labels,
    )?)
}
#[ instrument(skip_all) ]
#[ measure( [ ErrorCount, HitCount, InFlight, ResponseTime, Throughput ] ) ]
pub async fn eth_subscribe < ' a > (
self : & ' a Arc < Self > ,
2022-05-29 22:33:10 +03:00
payload : JsonRpcRequest ,
2022-09-09 00:01:36 +03:00
subscription_count : & ' a AtomicUsize ,
2022-05-30 04:28:22 +03:00
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
2022-07-09 01:14:45 +03:00
response_sender : flume ::Sender < Message > ,
2022-05-30 04:28:22 +03:00
) -> anyhow ::Result < ( AbortHandle , JsonRpcForwardedResponse ) > {
2022-06-14 10:13:42 +03:00
let ( subscription_abort_handle , subscription_registration ) = AbortHandle ::new_pair ( ) ;
2022-05-30 04:28:22 +03:00
2022-06-14 07:04:14 +03:00
// TODO: this only needs to be unique per connection. we don't need it globably unique
2022-07-09 01:14:45 +03:00
let subscription_id = subscription_count . fetch_add ( 1 , atomic ::Ordering ::SeqCst ) ;
2022-08-04 02:17:02 +03:00
let subscription_id = U64 ::from ( subscription_id ) ;
2022-05-30 21:23:55 +03:00
2022-06-05 23:39:58 +03:00
// save the id so we can use it in the response
let id = payload . id . clone ( ) ;
2022-07-16 03:35:54 +03:00
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
2022-07-08 22:01:11 +03:00
match payload . params {
Some ( x ) if x = = json! ( [ " newHeads " ] ) = > {
2022-06-16 23:57:48 +03:00
let head_block_receiver = self . head_block_receiver . clone ( ) ;
trace! ( ? subscription_id , " new heads subscription " ) ;
tokio ::spawn ( async move {
let mut head_block_receiver = Abortable ::new (
WatchStream ::new ( head_block_receiver ) ,
subscription_registration ,
) ;
while let Some ( new_head ) = head_block_receiver . next ( ) . await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " :" eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
2022-08-11 00:29:50 +03:00
// TODO: option to include full transaction objects instead of just the hashes?
2022-07-22 08:11:26 +03:00
" result " : new_head . as_ref ( ) ,
2022-06-16 23:57:48 +03:00
} ,
} ) ;
2022-08-11 00:29:50 +03:00
// TODO: do clients support binary messages?
let msg = Message ::Text (
serde_json ::to_string ( & msg ) . expect ( " this should always be valid json " ) ,
) ;
2022-06-16 23:57:48 +03:00
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
2022-05-30 04:28:22 +03:00
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
2022-08-24 03:59:05 +03:00
TxStatus ::Pending ( tx ) = > tx ,
TxStatus ::Confirmed ( .. ) = > continue ,
TxStatus ::Orphaned ( tx ) = > tx ,
2022-06-16 23:57:48 +03:00
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
" result " : new_tx . hash ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingFullTransactions " ] ) = > {
2022-06-16 23:57:48 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
2022-08-24 03:59:05 +03:00
TxStatus ::Pending ( tx ) = > tx ,
TxStatus ::Confirmed ( .. ) = > continue ,
TxStatus ::Orphaned ( tx ) = > tx ,
2022-06-16 23:57:48 +03:00
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
// upstream just sends the txid, but we want to send the whole transaction
" result " : new_tx ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-18 10:06:54 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
2022-07-08 22:01:11 +03:00
Some ( x ) if x = = json! ( [ " newPendingRawTransactions " ] ) = > {
2022-06-18 10:06:54 +03:00
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self . pending_tx_sender . subscribe ( ) ;
let mut pending_tx_receiver = Abortable ::new (
BroadcastStream ::new ( pending_tx_receiver ) ,
subscription_registration ,
) ;
trace! ( ? subscription_id , " pending transactions subscription " ) ;
// TODO: do something with this handle?
tokio ::spawn ( async move {
while let Some ( Ok ( new_tx_state ) ) = pending_tx_receiver . next ( ) . await {
let new_tx = match new_tx_state {
2022-08-24 03:59:05 +03:00
TxStatus ::Pending ( tx ) = > tx ,
TxStatus ::Confirmed ( .. ) = > continue ,
TxStatus ::Orphaned ( tx ) = > tx ,
2022-06-18 10:06:54 +03:00
} ;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json! ( {
" jsonrpc " : " 2.0 " ,
" method " : " eth_subscription " ,
" params " : {
" subscription " : subscription_id ,
2022-07-08 22:01:11 +03:00
// upstream just sends the txid, but we want to send the raw transaction
2022-06-18 10:06:54 +03:00
" result " : new_tx . rlp ( ) ,
} ,
} ) ;
let msg = Message ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) ;
2022-07-09 01:14:45 +03:00
if response_sender . send_async ( msg ) . await . is_err ( ) {
2022-06-16 23:57:48 +03:00
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break ;
} ;
}
trace! ( ? subscription_id , " closed new heads subscription " ) ;
} ) ;
}
_ = > return Err ( anyhow ::anyhow! ( " unimplemented " ) ) ,
}
2022-05-30 04:28:22 +03:00
2022-06-14 10:13:42 +03:00
// TODO: do something with subscription_join_handle?
2022-05-30 04:28:22 +03:00
2022-07-22 22:30:39 +03:00
let response = JsonRpcForwardedResponse ::from_value ( json! ( subscription_id ) , id ) ;
2022-05-30 04:28:22 +03:00
2022-06-16 05:53:37 +03:00
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
2022-06-14 10:13:42 +03:00
Ok ( ( subscription_abort_handle , response ) )
2022-05-29 22:33:10 +03:00
}
2022-07-22 22:30:39 +03:00
/// send the request or batch of requests to the approriate RPCs
2022-05-17 03:56:56 +03:00
#[ instrument(skip_all) ]
2022-05-12 02:50:52 +03:00
pub async fn proxy_web3_rpc (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-05-12 02:50:52 +03:00
request : JsonRpcRequestEnum ,
2022-05-20 08:27:18 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponseEnum > {
2022-09-05 04:52:59 +03:00
// TODO: this should probably be trace level
2022-09-06 23:12:45 +03:00
trace! ( ? request , " proxy_web3_rpc " ) ;
2022-05-12 02:50:52 +03:00
2022-05-22 07:22:30 +03:00
// even though we have timeouts on the requests to our backend providers,
2022-07-22 22:30:39 +03:00
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. per user max? expiration time instead of duration?
let max_time = Duration ::from_secs ( 120 ) ;
2022-05-12 02:50:52 +03:00
let response = match request {
2022-05-22 07:22:30 +03:00
JsonRpcRequestEnum ::Single ( request ) = > JsonRpcForwardedResponseEnum ::Single (
2022-08-16 03:33:26 +03:00
timeout ( max_time , self . proxy_web3_rpc_request ( request ) ) . await ? ? ,
2022-05-22 07:22:30 +03:00
) ,
JsonRpcRequestEnum ::Batch ( requests ) = > JsonRpcForwardedResponseEnum ::Batch (
2022-08-16 03:33:26 +03:00
timeout ( max_time , self . proxy_web3_rpc_requests ( requests ) ) . await ? ? ,
2022-05-22 07:22:30 +03:00
) ,
2022-05-12 02:50:52 +03:00
} ;
2022-09-05 04:52:59 +03:00
// TODO: this should probably be trace level
2022-09-06 23:12:45 +03:00
trace! ( ? response , " Forwarding " ) ;
2022-05-17 03:56:56 +03:00
2022-05-20 08:27:18 +03:00
Ok ( response )
2022-05-12 02:50:52 +03:00
}
2022-09-09 00:01:36 +03:00
/// cut up the request and send to potentually different servers
/// TODO: make sure this isn't a problem
#[ instrument(skip_all) ]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_requests (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-05-12 02:50:52 +03:00
requests : Vec < JsonRpcRequest > ,
) -> anyhow ::Result < Vec < JsonRpcForwardedResponse > > {
// TODO: we should probably change ethers-rs to support this directly
let num_requests = requests . len ( ) ;
let responses = join_all (
requests
. into_iter ( )
2022-08-16 03:33:26 +03:00
. map ( | request | self . proxy_web3_rpc_request ( request ) )
2022-05-12 02:50:52 +03:00
. collect ::< Vec < _ > > ( ) ,
)
. await ;
// TODO: i'm sure this could be done better with iterators
let mut collected : Vec < JsonRpcForwardedResponse > = Vec ::with_capacity ( num_requests ) ;
for response in responses {
2022-05-18 19:35:06 +03:00
collected . push ( response ? ) ;
2022-05-12 02:50:52 +03:00
}
Ok ( collected )
}
2022-09-09 00:01:36 +03:00
#[ instrument(skip_all) ]
2022-09-02 08:40:56 +03:00
pub async fn redis_conn ( & self ) -> anyhow ::Result < PooledConnection < RedisConnectionManager > > {
match self . redis_pool . as_ref ( ) {
None = > Err ( anyhow ::anyhow! ( " no redis server configured " ) ) ,
Some ( redis_pool ) = > {
let redis_conn = redis_pool . get ( ) . await ? ;
Ok ( redis_conn )
}
}
}
2022-09-09 00:01:36 +03:00
#[ measure( [ ErrorCount, HitCount, InFlight, ResponseTime, Throughput ] ) ]
#[ instrument(skip_all) ]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_request (
2022-09-09 00:01:36 +03:00
self : & Arc < Self > ,
2022-07-09 05:23:26 +03:00
mut request : JsonRpcRequest ,
2022-05-12 02:50:52 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponse > {
trace! ( " Received request: {:?} " , request ) ;
2022-09-06 20:56:49 +03:00
// save the id so we can attach it to the response
2022-09-07 06:54:16 +03:00
let request_id = request . id . clone ( ) ;
2022-09-06 20:56:49 +03:00
2022-05-31 04:55:04 +03:00
// TODO: if eth_chainId or net_version, serve those without querying the backend
2022-05-22 07:22:30 +03:00
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
2022-06-14 07:04:14 +03:00
// // TODO: add more to this span such as
let span = info_span! ( " rpc_request " ) ;
2022-05-29 04:23:58 +03:00
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
2022-07-22 22:30:39 +03:00
2022-09-07 07:11:47 +03:00
self . total_queries . fetch_add ( 1 , atomic ::Ordering ::Relaxed ) ;
2022-09-05 08:53:58 +03:00
// TODO: don't clone
let partial_response : serde_json ::Value = match request . method . clone ( ) . as_ref ( ) {
2022-07-09 05:23:26 +03:00
// lots of commands are blocked
2022-09-09 00:01:36 +03:00
method @ ( " admin_addPeer "
2022-06-14 09:54:19 +03:00
| " admin_datadir "
| " admin_startRPC "
| " admin_startWS "
| " admin_stopRPC "
| " admin_stopWS "
2022-07-09 05:23:26 +03:00
| " db_getHex "
| " db_getString "
| " db_putHex "
| " db_putString "
2022-06-14 09:54:19 +03:00
| " debug_chaindbCompact "
| " debug_freezeClient "
| " debug_goTrace "
| " debug_mutexProfile "
| " debug_setBlockProfileRate "
| " debug_setGCPercent "
| " debug_setHead "
| " debug_setMutexProfileFraction "
| " debug_standardTraceBlockToFile "
| " debug_standardTraceBadBlockToFile "
| " debug_startCPUProfile "
| " debug_startGoTrace "
| " debug_stopCPUProfile "
| " debug_stopGoTrace "
| " debug_writeBlockProfile "
| " debug_writeMemProfile "
| " debug_writeMutexProfile "
2022-07-09 05:23:26 +03:00
| " eth_compileLLL "
| " eth_compileSerpent "
| " eth_compileSolidity "
| " eth_getCompilers "
| " eth_sendTransaction "
| " eth_sign "
| " eth_signTransaction "
| " eth_submitHashrate "
| " eth_submitWork "
2022-06-14 09:54:19 +03:00
| " les_addBalance "
| " les_setClientParams "
| " les_setDefaultParams "
| " miner_setExtra "
| " miner_setGasPrice "
| " miner_start "
| " miner_stop "
| " miner_setEtherbase "
| " miner_setGasLimit "
| " personal_importRawKey "
| " personal_listAccounts "
| " personal_lockAccount "
| " personal_newAccount "
| " personal_unlockAccount "
| " personal_sendTransaction "
| " personal_sign "
2022-07-09 05:23:26 +03:00
| " personal_ecRecover "
| " shh_addToGroup "
| " shh_getFilterChanges "
| " shh_getMessages "
| " shh_hasIdentity "
| " shh_newFilter "
| " shh_newGroup "
| " shh_newIdentity "
| " shh_post "
| " shh_uninstallFilter "
2022-09-09 00:01:36 +03:00
| " shh_version " ) = > {
2022-08-13 01:12:46 +03:00
// TODO: client error stat
2022-06-30 03:52:04 +03:00
// TODO: proper error code
2022-09-09 00:01:36 +03:00
return Err ( anyhow ::anyhow! ( " method unsupported: {} " , method ) ) ;
2022-07-09 05:23:26 +03:00
}
// TODO: implement these commands
2022-09-09 00:01:36 +03:00
method @ ( " eth_getFilterChanges "
2022-07-09 05:23:26 +03:00
| " eth_getFilterLogs "
| " eth_newBlockFilter "
| " eth_newFilter "
| " eth_newPendingTransactionFilter "
2022-09-09 00:01:36 +03:00
| " eth_uninstallFilter " ) = > {
2022-08-13 01:12:46 +03:00
// TODO: unsupported command stat
2022-09-09 00:01:36 +03:00
return Err ( anyhow ::anyhow! ( " not yet implemented: {} " , method ) ) ;
2022-08-13 01:12:46 +03:00
}
2022-07-09 05:23:26 +03:00
// some commands can use local data or caches
2022-08-13 01:12:46 +03:00
" eth_accounts " = > {
// no stats on this. its cheap
serde_json ::Value ::Array ( vec! [ ] )
}
2022-07-09 05:23:26 +03:00
" eth_blockNumber " = > {
2022-09-01 08:58:55 +03:00
match self . balanced_rpcs . head_block_num ( ) {
Some ( head_block_num ) = > {
json! ( head_block_num )
}
None = > {
// TODO: what does geth do if this happens?
2022-09-09 00:01:36 +03:00
return Err ( anyhow ::anyhow! (
" no servers synced. unknown eth_blockNumber "
) ) ;
2022-09-01 08:58:55 +03:00
}
2022-07-09 05:23:26 +03:00
}
}
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
" eth_coinbase " = > {
2022-07-22 22:30:39 +03:00
// no need for serving coinbase
// we could return a per-user payment address here, but then we might leak that to dapps
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-22 22:30:39 +03:00
json! ( Address ::zero ( ) )
2022-07-09 05:23:26 +03:00
}
// TODO: eth_estimateGas using anvil?
// TODO: eth_gasPrice that does awesome magic to predict the future
" eth_hashrate " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-22 22:30:39 +03:00
json! ( U64 ::zero ( ) )
2022-06-30 03:52:04 +03:00
}
2022-07-09 05:23:26 +03:00
" eth_mining " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-22 22:30:39 +03:00
json! ( false )
2022-07-09 05:23:26 +03:00
}
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
2022-08-09 19:54:05 +03:00
" eth_sendRawTransaction " = > {
2022-08-13 01:12:46 +03:00
// emit stats
2022-08-09 19:54:05 +03:00
return self
. private_rpcs
. try_send_all_upstream_servers ( request , None )
. instrument ( span )
. await ;
}
2022-07-09 05:23:26 +03:00
" eth_syncing " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-09 05:23:26 +03:00
// TODO: return a real response if all backends are syncing or if no servers in sync
2022-07-22 22:30:39 +03:00
json! ( false )
2022-07-09 05:23:26 +03:00
}
" net_listening " = > {
2022-08-13 01:12:46 +03:00
// no stats on this. its cheap
2022-07-09 05:23:26 +03:00
// TODO: only if there are some backends on balanced_rpcs?
2022-07-22 22:30:39 +03:00
json! ( true )
2022-07-09 05:23:26 +03:00
}
2022-08-13 01:12:46 +03:00
" net_peerCount " = > {
// emit stats
self . balanced_rpcs . num_synced_rpcs ( ) . into ( )
}
" web3_clientVersion " = > {
// no stats on this. its cheap
serde_json ::Value ::String ( APP_USER_AGENT . to_string ( ) )
}
2022-07-22 22:30:39 +03:00
" web3_sha3 " = > {
2022-08-13 01:12:46 +03:00
// emit stats
2022-07-22 22:30:39 +03:00
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match & request . params {
Some ( serde_json ::Value ::Array ( params ) ) = > {
2022-08-10 05:37:34 +03:00
// TODO: make a struct and use serde conversion to clean this up
2022-07-22 22:30:39 +03:00
if params . len ( ) ! = 1 | | ! params [ 0 ] . is_string ( ) {
return Err ( anyhow ::anyhow! ( " invalid request " ) ) ;
}
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
let param = Bytes ::from_str ( params [ 0 ] . as_str ( ) . unwrap ( ) ) ? ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
let hash = H256 ::from ( keccak256 ( param ) ) ;
2022-07-09 05:23:26 +03:00
2022-07-22 22:30:39 +03:00
json! ( hash )
}
_ = > return Err ( anyhow ::anyhow! ( " invalid request " ) ) ,
}
2022-07-09 05:23:26 +03:00
}
2022-07-22 22:30:39 +03:00
// anything else gets sent to backend rpcs and cached
2022-06-30 03:52:04 +03:00
method = > {
2022-08-13 01:12:46 +03:00
// emit stats
2022-09-07 06:54:16 +03:00
// TODO: wait for them to be synced?
let head_block_id = self
2022-09-01 08:58:55 +03:00
. balanced_rpcs
2022-09-07 06:54:16 +03:00
. head_block_id ( )
2022-09-01 08:58:55 +03:00
. context ( " no servers synced " ) ? ;
2022-07-16 07:13:02 +03:00
2022-07-09 05:23:26 +03:00
// we do this check before checking caches because it might modify the request params
2022-07-16 08:48:02 +03:00
// TODO: add a stat for archive vs full since they should probably cost different
2022-09-07 06:54:16 +03:00
let request_block_id = if let Some ( request_block_needed ) =
block_needed ( method , request . params . as_mut ( ) , head_block_id . num )
2022-09-05 09:13:36 +03:00
{
2022-09-07 06:54:16 +03:00
// TODO: maybe this should be on the app and not on balanced_rpcs
let request_block_hash =
self . balanced_rpcs . block_hash ( & request_block_needed ) . await ? ;
2022-09-05 08:53:58 +03:00
2022-09-07 06:54:16 +03:00
BlockId {
num : request_block_needed ,
hash : request_block_hash ,
2022-06-14 07:04:14 +03:00
}
2022-09-07 06:54:16 +03:00
} else {
head_block_id
2022-06-14 07:04:14 +03:00
} ;
2022-09-07 06:54:16 +03:00
// TODO: struct for this?
// TODO: this can be rather large. is that okay?
let cache_key = (
request_block_id . hash ,
request . method . clone ( ) ,
request . params . clone ( ) . map ( | x | x . to_string ( ) ) ,
) ;
2022-07-22 22:30:39 +03:00
// TODO: move this caching outside this match and cache some of the other responses?
// TODO: cache the warp::reply to save us serializing every time?
2022-09-06 20:56:49 +03:00
let mut response = self
2022-09-05 04:52:59 +03:00
. response_cache
2022-09-05 08:53:58 +03:00
. try_get_with ( cache_key , async move {
match method {
" temporarily disabled " = > {
// "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
// TODO: try_send_all serially with retries instead of parallel
self . private_rpcs
2022-09-07 06:54:16 +03:00
. try_send_all_upstream_servers (
request ,
Some ( & request_block_id . num ) ,
)
2022-09-05 08:53:58 +03:00
. await
}
_ = > {
2022-09-07 06:54:16 +03:00
// TODO: retry some failures automatically!
2022-09-05 08:53:58 +03:00
self . balanced_rpcs
2022-09-07 06:54:16 +03:00
. try_send_best_upstream_server (
request ,
Some ( & request_block_id . num ) ,
)
2022-09-05 08:53:58 +03:00
. await
}
}
} )
. await
. unwrap ( ) ;
2022-05-29 04:23:58 +03:00
2022-09-07 06:54:16 +03:00
// since this data came out of a cache, the id is likely wrong.
// replace the id with our request's id.
response . id = request_id ;
2022-09-06 20:56:49 +03:00
2022-07-22 22:30:39 +03:00
return Ok ( response ) ;
2022-06-14 07:04:14 +03:00
}
2022-07-22 22:30:39 +03:00
} ;
2022-09-07 06:54:16 +03:00
let response = JsonRpcForwardedResponse ::from_value ( partial_response , request_id ) ;
2022-07-22 22:30:39 +03:00
Ok ( response )
2022-05-12 02:50:52 +03:00
}
}
2022-08-24 03:59:05 +03:00
/// Deliberately terse `Debug` output for [`Web3ProxyApp`].
///
/// Only the type name is written, followed by the `{ .. }` marker that
/// `finish_non_exhaustive` emits to signal omitted fields. No field of the
/// app is ever formatted.
impl fmt::Debug for Web3ProxyApp {
    /// Writes `Web3ProxyApp { .. }` to `f`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        // The name must be the bare type name: any padding inside the literal
        // would show up verbatim in every log line that prints the app.
        f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
    }
}