// TODO: don't use RwLock<HashMap>. i think we need a concurrent hashmap or we will hit all sorts of deadlocks

use derive_more::From;
use ethers::prelude::{Block, TxHash};
use ethers::providers::Middleware;
use futures::future;
use futures::StreamExt;
use governor::clock::{Clock, QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{NotUntil, RateLimiter};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio::time::sleep;
use tracing::info;
use warp::Filter;

static APP_USER_AGENT: &str = concat!(
    "satoshiandkin/",
    env!("CARGO_PKG_NAME"),
    "/",
    env!("CARGO_PKG_VERSION"),
);
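// at compile time the concat! above expands to something like "satoshiandkin/<pkg-name>/<pkg-version>",
// with the name and version taken from this crate's Cargo.toml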

type RpcRateLimiter =
    RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;

type BlockMap = RwLock<HashMap<String, Block<TxHash>>>;
type RateLimiterMap = RwLock<HashMap<String, RpcRateLimiter>>;
// TODO: include the ethers client on this map
type ConnectionsMap = RwLock<HashMap<String, EthersConnection>>;
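// all three of these maps are keyed by the rpc server's url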

// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
enum EthersProvider {
    Http(ethers::providers::Provider<ethers::providers::Http>),
    Ws(ethers::providers::Provider<ethers::providers::Ws>),
}

/// Forward functions to the inner ethers::providers::Provider
impl EthersProvider {
    /// Send a web3 request
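    /// e.g. `request("eth_blockNumber", serde_json::json!([]))` asks the connected node for its latest block number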
    pub async fn request(
        &self,
        method: &str,
        params: serde_json::Value,
    ) -> Result<serde_json::Value, ethers::prelude::ProviderError> {
        match self {
            Self::Http(provider) => provider.request(method, params).await,
            Self::Ws(provider) => provider.request(method, params).await,
        }
    }

    /// Subscribe to new block heads
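    /// each new head is written into the shared `blocks` map, keyed by this connection's url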
    pub async fn new_heads(&self, url: String, blocks: Arc<BlockMap>) -> anyhow::Result<()> {
        // TODO: automatically reconnect
        match &self {
            EthersProvider::Http(provider) => {
                let mut stream = provider.watch_blocks().await?;
                while let Some(block_number) = stream.next().await {
                    let block = provider.get_block(block_number).await?.unwrap();

                    println!(
                        "{:?} = {} Ts: {:?}, block number: {}",
                        block.hash.unwrap(),
                        url,
                        block.timestamp,
                        block.number.unwrap(),
                    );

                    let mut blocks = blocks.write().await;

                    blocks.insert(url.clone(), block);
                }
            }
            EthersProvider::Ws(provider) => {
                let mut stream = provider.subscribe_blocks().await?;
                while let Some(block) = stream.next().await {
                    // TODO: save the block into a dashmap on
                    println!(
                        "{:?} = {} Ts: {:?}, block number: {}",
                        block.hash.unwrap(),
                        url,
                        block.timestamp,
                        block.number.unwrap(),
                    );

                    let mut blocks = blocks.write().await;

                    blocks.insert(url.clone(), block);
                }
            }
        }

        Ok(())
    }
}

/// An active connection to a Web3Rpc
struct EthersConnection {
    /// keep track of currently open requests. We sort on this
    active_requests: u32,
    provider: Arc<EthersProvider>,
}

impl EthersConnection {
    /// Connect to a web3 rpc and subscribe to new heads
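    /// e.g. `try_new("ws://localhost:8545".to_string(), None, blocks)` (hypothetical local node; http urls also need an http_client)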
    async fn try_new(
        url_str: String,
        http_client: Option<reqwest::Client>,
        blocks: Arc<BlockMap>,
    ) -> anyhow::Result<EthersConnection> {
        // TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task
        let provider = if url_str.starts_with("http") {
            let url: url::Url = url_str.parse()?;

            let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;

            let provider = ethers::providers::Http::new_with_client(url, http_client);

            // TODO: dry this up
            ethers::providers::Provider::new(provider)
                .interval(Duration::from_secs(1))
                .into()
        } else if url_str.starts_with("ws") {
            let provider = ethers::providers::Ws::connect(url_str.clone()).await?;

            // TODO: make sure this survives disconnects
            // TODO: dry this up
            ethers::providers::Provider::new(provider)
                .interval(Duration::from_secs(1))
                .into()
        } else {
            return Err(anyhow::anyhow!("only http and ws servers are supported"));
        };

        let provider = Arc::new(provider);

        // subscribe to new heads in a spawned future
        // TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check?
        let provider_clone: Arc<EthersProvider> = Arc::clone(&provider);
        tokio::spawn(async move { provider_clone.new_heads(url_str, blocks).await });

        Ok(EthersConnection {
            active_requests: 0,
            provider,
        })
    }

    fn inc_active_requests(&mut self) {
        self.active_requests += 1;
    }

    fn dec_active_requests(&mut self) {
        self.active_requests -= 1;
    }
}

impl Eq for EthersConnection {}

impl Ord for EthersConnection {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.active_requests.cmp(&other.active_requests)
    }
}

impl PartialOrd for EthersConnection {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
impl PartialEq for EthersConnection {
    fn eq(&self, other: &Self) -> bool {
        self.active_requests == other.active_requests
    }
}
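// e.g. a connection with 3 active requests to one server compares equal to a connection with 3 active requests to a different server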

/// Load balance to the rpc
struct RpcTier {
    /// RPC urls, sorted by active request count
    /// TODO: what type?
    rpcs: RwLock<Vec<String>>,
    connections: Arc<ConnectionsMap>,
    ratelimits: RateLimiterMap,
}

impl RpcTier {
    async fn try_new(
        servers: Vec<(&str, u32)>,
        http_client: Option<reqwest::Client>,
        blocks: Arc<BlockMap>,
        clock: &QuantaClock,
    ) -> anyhow::Result<RpcTier> {
        let mut rpcs: Vec<String> = vec![];
        let mut connections = HashMap::new();
        let mut ratelimits = HashMap::new();

        for (s, limit) in servers.into_iter() {
            rpcs.push(s.to_string());

            let connection =
                EthersConnection::try_new(s.to_string(), http_client.clone(), blocks.clone())
                    .await?;

            connections.insert(s.to_string(), connection);

            if limit > 0 {
                let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap());

                let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);

                ratelimits.insert(s.to_string(), rate_limiter);
            }
        }

        /*
        let new_heads_handles = rpcs
            .clone()
            .into_iter()
            .map(|rpc| {
                // start the subscription inside an abort handler. this way, dropping this BalancedRpcs will close these connections
                let (abort_handle, abort_registration) = AbortHandle::new_pair();

                tokio::spawn(Abortable::new(
                    async move {
                        // replace "http" at the start with "ws"
                        // TODO: this is fragile. some nodes use different ports, too. use proper config
                        // TODO: maybe we should use this websocket for more than just the new heads subscription. we could send all our requests over it (but would need to modify ids)
                        let re = Regex::new("^http").expect("bad regex");
                        let ws_rpc = re.replace(&rpc, "ws");

                        // TODO: if websocket not supported, use polling?
                        let ws_rpc = url::Url::parse(&ws_rpc).expect("invalid websocket url");

                        // loop so that if it disconnects, we reconnect
                        loop {
                            match connect_async(&ws_rpc).await {
                                Ok((ws_stream, _)) => {
                                    let (mut write, mut read) = ws_stream.split();

                                    // TODO: send eth_subscribe New Heads
                                    if (write.send(tungstenite::Message::Text("{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"]}".to_string())).await).is_ok() {
                                        if let Some(Ok(_first)) = read.next().await {
                                            // TODO: what should we do with the first message?
                                            while let Some(Ok(message)) = read.next().await {
                                                if let Err(e) = handle_new_head_message(message).await {
                                                    eprintln!("error handling new head message @ {}: {}", ws_rpc, e);
                                                    break;
                                                }
                                            }
                                        }
                                        // no more messages or we got an error
                                    }
                                }
                                Err(e) => {
                                    // TODO: proper logging
                                    eprintln!("error connecting to websocket @ {}: {}", ws_rpc, e);
                                }
                            }

                            // TODO: log that we are going to reconnect to ws_rpc in 1 second
                            // TODO: how long should we wait? exponential backoff?
                            sleep(Duration::from_secs(1)).await;
                        }
                    },
                    abort_registration,
                ));

                abort_handle
            })
            .collect();
        */

        Ok(RpcTier {
            rpcs: RwLock::new(rpcs),
            connections: Arc::new(RwLock::new(connections)),
            ratelimits: RwLock::new(ratelimits),
        })
    }

    /// get the best available rpc server
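    /// (i.e. the server with the fewest active requests that passes its rate limit check)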
    async fn next_upstream_server(&self) -> Result<String, NotUntil<QuantaInstant>> {
        let mut balanced_rpcs = self.rpcs.write().await;

        // sort rpcs by their active connections
        let connections = self.connections.read().await;

        balanced_rpcs
            .sort_unstable_by(|a, b| connections.get(a).unwrap().cmp(connections.get(b).unwrap()));

        // release the read lock. we take a write lock on connections further down
        drop(connections);

        let mut earliest_not_until = None;

        for selected_rpc in balanced_rpcs.iter() {
            // TODO: check current block number. if behind, make our own NotUntil here
            let ratelimits = self.ratelimits.write().await;

            // check rate limits. servers with no configured limit have no entry in `ratelimits`
            if let Some(ratelimiter) = ratelimits.get(selected_rpc) {
                match ratelimiter.check() {
                    Ok(_) => {
                        // rate limit succeeded
                    }
                    Err(not_until) => {
                        // rate limit failed
                        // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
                        if earliest_not_until.is_none() {
                            earliest_not_until = Some(not_until);
                        } else {
                            let earliest_possible =
                                earliest_not_until.as_ref().unwrap().earliest_possible();
                            let new_earliest_possible = not_until.earliest_possible();

                            if earliest_possible > new_earliest_possible {
                                earliest_not_until = Some(not_until);
                            }
                        }

                        continue;
                    }
                }
            }

            // increment our connection counter
            self.connections
                .write()
                .await
                .get_mut(selected_rpc)
                .unwrap()
                .inc_active_requests();

            // return the selected RPC
            return Ok(selected_rpc.clone());
        }

        // return the smallest not_until
        if let Some(not_until) = earliest_not_until {
            Err(not_until)
        } else {
            unimplemented!();
        }
    }

    /// get all available rpc servers
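    /// every server that passes its rate limit check is returned, and each one's active request count is incremented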
    async fn get_upstream_servers(&self) -> Result<Vec<String>, NotUntil<QuantaInstant>> {
        let mut earliest_not_until = None;
        let mut selected_rpcs = vec![];

        for selected_rpc in self.rpcs.read().await.iter() {
            // check rate limits. servers with no configured limit have no entry in `ratelimits`
            let ratelimits = self.ratelimits.write().await;

            if let Some(ratelimiter) = ratelimits.get(selected_rpc) {
                match ratelimiter.check() {
                    Ok(_) => {
                        // rate limit succeeded
                    }
                    Err(not_until) => {
                        // rate limit failed
                        // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
                        if earliest_not_until.is_none() {
                            earliest_not_until = Some(not_until);
                        } else {
                            let earliest_possible =
                                earliest_not_until.as_ref().unwrap().earliest_possible();
                            let new_earliest_possible = not_until.earliest_possible();

                            if earliest_possible > new_earliest_possible {
                                earliest_not_until = Some(not_until);
                            }
                        }

                        continue;
                    }
                }
            }

            // increment our connection counter
            self.connections
                .write()
                .await
                .get_mut(selected_rpc)
                .unwrap()
                .inc_active_requests();

            // this rpc should work
            selected_rpcs.push(selected_rpc.clone());
        }

        if !selected_rpcs.is_empty() {
            return Ok(selected_rpcs);
        }

        // return the earliest not_until
        if let Some(not_until) = earliest_not_until {
            Err(not_until)
        } else {
            // TODO: is this right?
            Ok(vec![])
        }
    }
}

/// Application state
struct Web3ProxyState {
    /// clock used for rate limiting
    /// TODO: use tokio's clock (will require a different ratelimiting crate)
    clock: QuantaClock,
    /// Send requests to the best server available
    balanced_rpc_tiers: Arc<Vec<RpcTier>>,
    /// Send private requests (like eth_sendRawTransaction) to all these servers
    private_rpcs: Option<Arc<RpcTier>>,
    /// write lock on these when all rate limits are hit
    balanced_rpc_ratelimiter_lock: RwLock<()>,
    private_rpcs_ratelimiter_lock: RwLock<()>,
}

impl Web3ProxyState {
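    /// `balanced_rpc_tiers` is tried tier by tier, e.g. a first tier of local nodes and a second tier of public fallbacks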
    async fn try_new(
        balanced_rpc_tiers: Vec<Vec<(&str, u32)>>,
        private_rpcs: Vec<(&str, u32)>,
    ) -> anyhow::Result<Web3ProxyState> {
        let clock = QuantaClock::default();

        let blocks = Arc::new(BlockMap::default());

        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something
        let http_client = reqwest::ClientBuilder::new()
            .timeout(Duration::from_secs(300))
            .user_agent(APP_USER_AGENT)
            .build()?;

        // TODO: i'm sure we s
        let balanced_rpc_tiers = Arc::new(
            future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
                RpcTier::try_new(
                    balanced_rpc_tier,
                    Some(http_client.clone()),
                    blocks.clone(),
                    &clock,
                )
            }))
            .await
            .into_iter()
            .collect::<anyhow::Result<Vec<RpcTier>>>()?,
        );

        let private_rpcs = if private_rpcs.is_empty() {
            None
        } else {
            Some(Arc::new(
                RpcTier::try_new(private_rpcs, Some(http_client), blocks.clone(), &clock).await?,
            ))
        };

        // TODO: warn if no private relays
        Ok(Web3ProxyState {
            clock,
            balanced_rpc_tiers,
            private_rpcs,
            balanced_rpc_ratelimiter_lock: Default::default(),
            private_rpcs_ratelimiter_lock: Default::default(),
        })
    }

    /// send the request to the appropriate RPCs
    /// TODO: dry this up
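    /// e.g. a body of `{"jsonrpc":"2.0","id":1,"method":"eth_sendRawTransaction","params":["0x..."]}` goes to the private rpcs (if configured); everything else goes to the balanced tiers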
    async fn proxy_web3_rpc(
        self: Arc<Web3ProxyState>,
        json_body: serde_json::Value,
    ) -> anyhow::Result<impl warp::Reply> {
        let eth_send_raw_transaction =
            serde_json::Value::String("eth_sendRawTransaction".to_string());

        if self.private_rpcs.is_some() && json_body.get("method") == Some(&eth_send_raw_transaction)
        {
            let private_rpcs = self.private_rpcs.clone().unwrap();

            // there are private rpcs configured and the request is eth_sendRawTransaction. send to all private rpcs
            loop {
                let read_lock = self.private_rpcs_ratelimiter_lock.read().await;

                match private_rpcs.get_upstream_servers().await {
                    Ok(upstream_servers) => {
                        let (tx, mut rx) =
                            mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();

                        let clone = self.clone();
                        let connections = private_rpcs.connections.clone();
                        let json_body = json_body.clone();

                        tokio::spawn(async move {
                            clone
                                .try_send_requests(upstream_servers, connections, json_body, tx)
                                .await
                        });

                        let response = rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow::anyhow!("no successful response"))?;

                        if let Ok(response) = response {
                            return Ok(warp::reply::json(&response));
                        }
                    }
                    Err(not_until) => {
                        // TODO: move this to a helper function
                        // sleep (with a lock) until our rate limits should be available
                        drop(read_lock);

                        // note: this must pair with the private_rpcs read lock taken above
                        let write_lock = self.private_rpcs_ratelimiter_lock.write().await;

                        let deadline = not_until.wait_time_from(self.clock.now());
                        sleep(deadline).await;

                        drop(write_lock);
                    }
                };
            }
        } else {
            // this is not a private transaction (or no private relays are configured)
            // try to send to each tier, stopping at the first success
            loop {
                // TODO: i'm not positive that this locking is correct
                let read_lock = self.balanced_rpc_ratelimiter_lock.read().await;

                // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
                let mut earliest_not_until = None;

                for balanced_rpcs in self.balanced_rpc_tiers.iter() {
                    match balanced_rpcs.next_upstream_server().await {
                        Ok(upstream_server) => {
                            let (tx, mut rx) =
                                mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();

                            let clone = self.clone();
                            let connections = balanced_rpcs.connections.clone();
                            let json_body = json_body.clone();

                            tokio::spawn(async move {
                                clone
                                    .try_send_requests(
                                        vec![upstream_server],
                                        connections,
                                        json_body,
                                        tx,
                                    )
                                    .await
                            });

                            let response = rx
                                .recv()
                                .await
                                .ok_or_else(|| anyhow::anyhow!("no successful response"))?;

                            if let Ok(response) = response {
                                return Ok(warp::reply::json(&response));
                            }
                        }
                        Err(not_until) => {
                            // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
                            if earliest_not_until.is_none() {
                                earliest_not_until = Some(not_until);
                            } else {
                                // TODO: do we need to unwrap this far? can we just compare the not_untils
                                let earliest_possible =
                                    earliest_not_until.as_ref().unwrap().earliest_possible();
                                let new_earliest_possible = not_until.earliest_possible();

                                if earliest_possible > new_earliest_possible {
                                    earliest_not_until = Some(not_until);
                                }
                            }
                        }
                    }
                }

                // we haven't returned an Ok, sleep and try again
                // TODO: move this to a helper function
                drop(read_lock);

                let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;

                let deadline = if let Some(earliest_not_until) = earliest_not_until {
                    earliest_not_until.wait_time_from(self.clock.now())
                } else {
                    // TODO: exponential backoff?
                    Duration::from_secs(1)
                };

                sleep(deadline).await;

                drop(write_lock);
            }
        }
    }
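
    /// send the jsonrpc request to all of the given servers, forwarding responses to `tx` as they arrive
    /// (the caller takes the first response; the incoming request id is copied onto each upstream response)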
    async fn try_send_requests(
        &self,
        rpc_servers: Vec<String>,
        connections: Arc<ConnectionsMap>,
        json_request_body: serde_json::Value,
        tx: mpsc::UnboundedSender<anyhow::Result<serde_json::Value>>,
    ) -> anyhow::Result<()> {
        // {"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":1}
        let incoming_id = json_request_body
            .get("id")
            .ok_or_else(|| anyhow::anyhow!("bad id"))?
            .to_owned();

        let method = json_request_body
            .get("method")
            .and_then(|x| x.as_str())
            .ok_or_else(|| anyhow::anyhow!("bad method"))?
            .to_string();

        let params = json_request_body
            .get("params")
            .ok_or_else(|| anyhow::anyhow!("no params"))?
            .to_owned();

        // send the query to all the servers
        let bodies = future::join_all(rpc_servers.into_iter().map(|rpc| {
            let incoming_id = incoming_id.clone();
            let connections = connections.clone();
            let method = method.clone();
            let params = params.clone();
            let tx = tx.clone();

            async move {
                // get the client for this rpc server
                let provider = connections.read().await.get(&rpc).unwrap().provider.clone();

                let response = provider.request(&method, params).await;

                connections
                    .write()
                    .await
                    .get_mut(&rpc)
                    .unwrap()
                    .dec_active_requests();

                let mut response = response?;

                if let Some(response_id) = response.get_mut("id") {
                    *response_id = incoming_id;
                }

                // send the first good response over the channel. that way we respond quickly
                // drop the result because errors are expected after the first send
                // TODO: if "no block with that header" or some other jsonrpc errors, skip this response
                let _ = tx.send(Ok(response));

                Ok::<(), anyhow::Error>(())
            }
        }))
        .await;

        // TODO: use iterators instead of pushing into a vec
        let mut errs = vec![];
        for x in bodies {
            match x {
                Ok(_) => {}
                Err(e) => {
                    // TODO: better errors
                    eprintln!("request failed: {}", e);
                    errs.push(e);
                }
            }
        }

        let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
            Err(errs.pop().unwrap())
        } else {
            Err(anyhow::anyhow!("no successful responses"))
        };

        // TODO: think about this more. we want to send it
        if tx.send(e).is_ok() {
            // if we were able to send an error, then we never sent a success
            return Err(anyhow::anyhow!("no successful responses"));
        } else {
            // sending the error failed. the other side must be closed (which means we sent a success)
            Ok(())
        }
    }
}

#[tokio::main]
async fn main() {
    // install global collector configured based on RUST_LOG env var.
    tracing_subscriber::fmt::init();

    info!("starting");

    // TODO: load the config from yaml instead of hard coding
    // TODO: support multiple chains in one process. then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
    // TODO: i kind of want to make use of caddy's load balancing and health checking and such though
    let listen_port = 8445;

    // TODO: be smart about using archive nodes?
    let state = Web3ProxyState::try_new(
        vec![
            // local nodes
            vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
            // paid nodes
            // TODO: add paid nodes (with rate limits)
            // free nodes
            // TODO: add rate limits
            vec![
                ("https://main-rpc.linkpool.io", 0),
                ("https://rpc.ankr.com/eth", 0),
            ],
        ],
        vec![
            ("https://api.edennetwork.io/v1/beta", 0),
            ("https://api.edennetwork.io/v1/", 0),
        ],
    )
    .await
    .unwrap();

    let state: Arc<Web3ProxyState> = Arc::new(state);

    let proxy_rpc_filter = warp::any()
        .and(warp::post())
        .and(warp::body::json())
        .then(move |json_body| state.clone().proxy_web3_rpc(json_body))
        .map(handle_anyhow_errors);

    println!("Listening on 0.0.0.0:{}", listen_port);

    warp::serve(proxy_rpc_filter)
        .run(([0, 0, 0, 0], listen_port))
        .await;
}

/// convert a result into an http response. use this at the end of your warp filter
pub fn handle_anyhow_errors<T: warp::Reply>(res: anyhow::Result<T>) -> Box<dyn warp::Reply> {
    match res {
        Ok(r) => Box::new(r.into_response()),
        // TODO: json error?
        Err(e) => Box::new(warp::reply::with_status(
            format!("{}", e),
            reqwest::StatusCode::INTERNAL_SERVER_ERROR,
        )),
    }
}
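
// a minimal sketch of a test for handle_anyhow_errors. it assumes warp and reqwest
// agree on the http StatusCode type (they must, since with_status above compiles)
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn anyhow_errors_become_http_500s() {
        let res: anyhow::Result<String> = Err(anyhow::anyhow!("oops"));

        let reply = handle_anyhow_errors(res);

        // warp renders the boxed reply into a full http response
        let response = warp::reply::Reply::into_response(reply);

        assert_eq!(response.status(), reqwest::StatusCode::INTERNAL_SERVER_ERROR);
    }
}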