From 4d3b851b2c140ba5aa7a39b28e4ae10000912130 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 26 Apr 2022 17:03:38 +0000 Subject: [PATCH] decrement even if error --- src/main.rs | 52 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/main.rs b/src/main.rs index 4bdbad58..f2f0fd2f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,13 +26,22 @@ static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); -// TODO: i tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 +type RpcRateLimiter = + RateLimiter>; + +type BlockMap = RwLock>>; +type RateLimiterMap = RwLock>; +// TODO: include the ethers client on this map +type ConnectionsMap = RwLock>; + +// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] enum EthersProvider { Http(ethers::providers::Provider), Ws(ethers::providers::Provider), } +/// Forward functions to the inner ethers::providers::Provider impl EthersProvider { /// Send a web3 request pub async fn request( @@ -91,6 +100,7 @@ impl EthersProvider { } } +/// An active connection to a Web3Rpc struct EthersConnection { /// keep track of currently open requests. We sort on this active_requests: u32, @@ -98,6 +108,7 @@ struct EthersConnection { } impl EthersConnection { + /// Connect to a web3 rpc and subscribe to new heads async fn try_new( url_str: String, http_client: Option, @@ -141,11 +152,11 @@ impl EthersConnection { }) } - fn inc(&mut self) { + fn inc_active_requests(&mut self) { self.active_requests += 1; } - fn dec(&mut self) { + fn dec_active_requests(&mut self) { self.active_requests -= 1; } } @@ -164,22 +175,17 @@ impl PartialOrd for EthersConnection { } } +/// note that this is just comparing the active requests. two providers with different rpc urls are equal! impl PartialEq for EthersConnection { fn eq(&self, other: &Self) -> bool { self.active_requests == other.active_requests } } -type BlockMap = RwLock>>; -type RateLimiterMap = RwLock>; -// TODO: include the ethers client on this map -type ConnectionsMap = RwLock>; - -type RpcRateLimiter = - RateLimiter>; - -/// Load balance to the least-connection rpc +/// Load balance to the rpc struct RpcTier { + /// RPC urls sorted by + /// TODO: what type? rpcs: RwLock>, connections: Arc, ratelimits: RateLimiterMap, @@ -325,7 +331,7 @@ impl RpcTier { .await .get_mut(selected_rpc) .unwrap() - .inc(); + .inc_active_requests(); // return the selected RPC return Ok(selected_rpc.clone()); @@ -382,7 +388,7 @@ impl RpcTier { .await .get_mut(selected_rpc) .unwrap() - .inc(); + .inc_active_requests(); // this is rpc should work selected_rpcs.push(selected_rpc.clone()); @@ -402,9 +408,14 @@ impl RpcTier { } } +/// Application state struct Web3ProxyState { + /// clock used for rate limiting + /// TODO: use tokio's clock (will require a different ratelimiting crate) clock: QuantaClock, + /// Send requests to the best server available balanced_rpc_tiers: Arc>, + /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Option>, /// write lock on these when all rate limits are hit balanced_rpc_ratelimiter_lock: RwLock<()>, @@ -518,6 +529,7 @@ impl Web3ProxyState { // this is not a private transaction (or no private relays are configured) // try to send to each tier, stopping at the first success loop { + // TODO: i'm not positive that this locking is correct let read_lock = self.balanced_rpc_ratelimiter_lock.read().await; // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again @@ -625,10 +637,16 @@ impl Web3ProxyState { // get the client for this rpc server let provider = connections.read().await.get(&rpc).unwrap().provider.clone(); - // TODO: there has to be a better way to attach the url to the result - let mut response = provider.request(&method, params).await?; + let response = provider.request(&method, params).await; - connections.write().await.get_mut(&rpc).unwrap().dec(); + connections + .write() + .await + .get_mut(&rpc) + .unwrap() + .dec_active_requests(); + + let mut response = response?; if let Some(response_id) = response.get_mut("id") { *response_id = incoming_id;