From 49ca541f2ceb47d3636371b4bb75bc1b4c24a34b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 24 Apr 2022 22:36:51 +0000 Subject: [PATCH] start adding rate limits for loud rpcs --- src/main.rs | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/main.rs b/src/main.rs index d6802ad0..6e4afd6a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,13 +13,14 @@ use warp::Filter; type RpcRateLimiter = RateLimiter>; +type RateLimiterMap = DashMap; type ConnectionsMap = DashMap; /// Load balance to the least-connection rpc struct BalancedRpcs { rpcs: RwLock>, connections: ConnectionsMap, - ratelimits: DashMap, + ratelimits: RateLimiterMap, } // TODO: also pass rate limits to this? @@ -29,9 +30,6 @@ impl Into for Vec<(&str, u32)> { let connections = DashMap::new(); let ratelimits = DashMap::new(); - // TODO: where should we get the rate limits from? - // TODO: this is not going to work. we need different rate limits for different endpoints - for (s, limit) in self.into_iter() { rpcs.push(s.to_string()); connections.insert(s.to_string(), 0); @@ -108,31 +106,38 @@ impl BalancedRpcs { } /// Send to all the Rpcs +/// Unlike BalancedRpcs, there is no tracking of connections +/// We do still track rate limits struct LoudRpcs { rpcs: Vec, // TODO: what type? store with connections? - // ratelimits: DashMap, + ratelimits: RateLimiterMap, } -impl Into for Vec<&str> { +impl Into for Vec<(&str, u32)> { fn into(self) -> LoudRpcs { let mut rpcs: Vec = vec![]; - // let ratelimits = DashMap::new(); + let ratelimits = RateLimiterMap::new(); - for s in self.into_iter() { + for (s, limit) in self.into_iter() { rpcs.push(s.to_string()); - // ratelimits.insert(s.to_string(), 0); + + if limit > 0 { + let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap()); + + let rate_limiter = governor::RateLimiter::direct(quota); + + ratelimits.insert(s.to_string(), rate_limiter); + } } - LoudRpcs { - rpcs, - // ratelimits, - } + LoudRpcs { rpcs, ratelimits } } } impl LoudRpcs { async fn get_upstream_servers(&self) -> Vec { + // self.rpcs.clone() } @@ -148,7 +153,10 @@ struct Web3ProxyState { } impl Web3ProxyState { - fn new(balanced_rpc_tiers: Vec>, private_rpcs: Vec<&str>) -> Web3ProxyState { + fn new( + balanced_rpc_tiers: Vec>, + private_rpcs: Vec<(&str, u32)>, + ) -> Web3ProxyState { // TODO: warn if no private relays Web3ProxyState { client: reqwest::Client::new(), @@ -294,8 +302,8 @@ async fn main() { ], ], vec![ - "https://api.edennetwork.io/v1/beta", - "https://api.edennetwork.io/v1/", + ("https://api.edennetwork.io/v1/beta", 0), + ("https://api.edennetwork.io/v1/", 0), ], );