more dashmap

This commit is contained in:
Bryan Stitt 2022-04-27 06:03:00 +00:00
parent 2feb5d213c
commit 8d68900fd4
2 changed files with 38 additions and 52 deletions
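
The change throughout this commit is the same in both files: maps that were `RwLock<HashMap<..>>` become `dashmap::DashMap`, which shards its locking internally, so call sites drop the explicit `.read().await` / `.write().await` and go straight to `.get()` / `.get_mut()`. Below is a minimal before/after sketch of that access pattern only; it is not the project's code. The `Counter` type and the `"rpc-a"` key are stand-ins, and it assumes `dashmap` plus `tokio` (with the runtime and macro features) as dependencies.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::RwLock;

// Stand-in for the per-connection bookkeeping (hypothetical, not Web3Connection).
#[derive(Default)]
struct Counter(u32);

#[tokio::main]
async fn main() {
    // Before: the whole map lives behind one async RwLock, so every access
    // takes the map-wide lock first.
    let locked: Arc<RwLock<HashMap<String, Counter>>> =
        Arc::new(RwLock::new(HashMap::new()));
    locked
        .write()
        .await
        .insert("rpc-a".to_string(), Counter::default());
    locked.write().await.get_mut("rpc-a").unwrap().0 += 1;

    // After: DashMap shards its locks internally, so callers call
    // get()/get_mut() directly and hold a short-lived guard for one entry.
    let sharded: Arc<DashMap<String, Counter>> = Arc::new(DashMap::new());
    sharded.insert("rpc-a".to_string(), Counter::default());
    sharded.get_mut("rpc-a").unwrap().0 += 1;

    println!("locked:  {}", locked.read().await.get("rpc-a").unwrap().0);
    println!("sharded: {}", sharded.get("rpc-a").unwrap().0);
}
```
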

View File

@ -269,16 +269,11 @@ impl Web3ProxyApp {
     async move {
         // get the client for this rpc server
-        let provider = connections.read().await.get(&rpc).unwrap().clone_provider();
+        let provider = connections.get(&rpc).unwrap().clone_provider();

         let response = provider.request(&method, params).await;

-        connections
-            .write()
-            .await
-            .get_mut(&rpc)
-            .unwrap()
-            .dec_active_requests();
+        connections.get_mut(&rpc).unwrap().dec_active_requests();

         let mut response = response?;

View File

@ -1,10 +1,10 @@
-///! Communicate with groups of web3 providers
+/// Communicate with groups of web3 providers
+use dashmap::DashMap;
 use governor::clock::{QuantaClock, QuantaInstant};
 use governor::middleware::NoOpMiddleware;
 use governor::state::{InMemoryState, NotKeyed};
 use governor::NotUntil;
 use governor::RateLimiter;
-use std::collections::HashMap;
 use std::num::NonZeroU32;
 use std::sync::Arc;
 use tokio::sync::RwLock;
@ -15,9 +15,9 @@ use crate::provider::Web3Connection;
 type Web3RateLimiter =
     RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;

-type Web3RateLimiterMap = RwLock<HashMap<String, Web3RateLimiter>>;
+type Web3RateLimiterMap = DashMap<String, Web3RateLimiter>;

-pub type Web3ConnectionMap = RwLock<HashMap<String, Web3Connection>>;
+pub type Web3ConnectionMap = DashMap<String, Web3Connection>;

 /// Load balance to the rpc
 /// TODO: i'm not sure about having 3 locks here. can we share them?
@ -26,7 +26,7 @@ pub struct Web3ProviderTier {
     /// TODO: what type for the rpc?
     rpcs: RwLock<Vec<String>>,
     connections: Arc<Web3ConnectionMap>,
-    ratelimits: Web3RateLimiterMap,
+    ratelimiters: Web3RateLimiterMap,
 }

 impl Web3ProviderTier {
@ -37,8 +37,8 @@ impl Web3ProviderTier {
         clock: &QuantaClock,
     ) -> anyhow::Result<Web3ProviderTier> {
         let mut rpcs: Vec<String> = vec![];
-        let mut connections = HashMap::new();
-        let mut ratelimits = HashMap::new();
+        let connections = DashMap::new();
+        let ratelimits = DashMap::new();

         for (s, limit) in servers.into_iter() {
             rpcs.push(s.to_string());
@ -63,8 +63,8 @@ impl Web3ProviderTier {
         Ok(Web3ProviderTier {
             rpcs: RwLock::new(rpcs),
-            connections: Arc::new(RwLock::new(connections)),
-            ratelimits: RwLock::new(ratelimits),
+            connections: Arc::new(connections),
+            ratelimiters: ratelimits,
         })
     }
@ -80,10 +80,12 @@ impl Web3ProviderTier {
         let mut balanced_rpcs = self.rpcs.write().await;

         // sort rpcs by their active connections
-        let connections = self.connections.read().await;
-
-        balanced_rpcs
-            .sort_unstable_by(|a, b| connections.get(a).unwrap().cmp(connections.get(b).unwrap()));
+        balanced_rpcs.sort_unstable_by(|a, b| {
+            self.connections
+                .get(a)
+                .unwrap()
+                .cmp(&self.connections.get(b).unwrap())
+        });

         let mut earliest_not_until = None;
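
The sort now looks each key up in the `DashMap` inside the comparator: each `get()` returns a `Ref` guard that dereferences to the value, so the two lookups can be compared directly. A self-contained sketch of the same shape, with a hypothetical `u32` active-request count standing in for `Web3Connection`:

```rust
use dashmap::DashMap;

fn main() {
    // Hypothetical data: active-request counts per server, keyed by name.
    let connections: DashMap<String, u32> = DashMap::new();
    connections.insert("rpc-a".to_string(), 7);
    connections.insert("rpc-b".to_string(), 2);
    connections.insert("rpc-c".to_string(), 4);

    let mut rpcs: Vec<String> = vec!["rpc-a".into(), "rpc-b".into(), "rpc-c".into()];

    // Same shape as the comparator above: both get() calls return Ref guards
    // that deref to u32, so the counts compare directly.
    rpcs.sort_unstable_by(|a, b| {
        connections
            .get(a)
            .unwrap()
            .cmp(&connections.get(b).unwrap())
    });

    // Fewest active requests first.
    assert_eq!(rpcs, ["rpc-b", "rpc-c", "rpc-a"]);
    println!("{:?}", rpcs);
}
```
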
@ -98,35 +100,33 @@ impl Web3ProviderTier {
                 continue;
             }

-            let ratelimits = self.ratelimits.write().await;
-
             // check rate limits
-            match ratelimits.get(selected_rpc).unwrap().check() {
-                Ok(_) => {
-                    // rate limit succeeded
-                }
-                Err(not_until) => {
-                    // rate limit failed
-                    // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
-                    if earliest_not_until.is_none() {
-                        earliest_not_until = Some(not_until);
-                    } else {
-                        let earliest_possible =
-                            earliest_not_until.as_ref().unwrap().earliest_possible();
-                        let new_earliest_possible = not_until.earliest_possible();
-
-                        if earliest_possible > new_earliest_possible {
-                            earliest_not_until = Some(not_until);
-                        }
-                    }
-
-                    continue;
-                }
-            };
+            if let Some(ratelimiter) = self.ratelimiters.get(selected_rpc) {
+                match ratelimiter.check() {
+                    Ok(_) => {
+                        // rate limit succeeded
+                    }
+                    Err(not_until) => {
+                        // rate limit failed
+                        // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
+                        if earliest_not_until.is_none() {
+                            earliest_not_until = Some(not_until);
+                        } else {
+                            let earliest_possible =
+                                earliest_not_until.as_ref().unwrap().earliest_possible();
+                            let new_earliest_possible = not_until.earliest_possible();
+
+                            if earliest_possible > new_earliest_possible {
+                                earliest_not_until = Some(not_until);
+                            }
+                        }
+
+                        continue;
+                    }
+                };
+            }

             // increment our connection counter
             self.connections
-                .write()
-                .await
                 .get_mut(selected_rpc)
                 .unwrap()
                 .inc_active_requests();
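
Besides dropping the `.write().await`, this hunk changes the lookup from `ratelimits.get(selected_rpc).unwrap()` to `if let Some(ratelimiter) = self.ratelimiters.get(selected_rpc)`, so a server with no configured limiter now falls through to the connection counter instead of panicking. A small sketch of that lookup pattern, assuming `dashmap` and `governor` as dependencies; the server names and quota are made up:

```rust
use std::num::NonZeroU32;

use dashmap::DashMap;
use governor::{Quota, RateLimiter};

fn main() {
    // Hypothetical setup: one direct (unkeyed) limiter per server name.
    let ratelimiters = DashMap::new();
    ratelimiters.insert(
        "rpc-a".to_string(),
        RateLimiter::direct(Quota::per_second(NonZeroU32::new(2).unwrap())),
    );

    for rpc in ["rpc-a", "rpc-b"] {
        // get() returns an Option of a guard: a server with no limiter
        // configured skips the check instead of panicking on unwrap().
        if let Some(ratelimiter) = ratelimiters.get(rpc) {
            match ratelimiter.check() {
                Ok(_) => println!("{}: allowed", rpc),
                Err(_not_until) => println!("{}: rate limited", rpc),
            }
        } else {
            println!("{}: no limiter configured", rpc);
        }
    }
}
```
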
@ -164,14 +164,7 @@ impl Web3ProviderTier {
         }

         // check rate limits
-        match self
-            .ratelimits
-            .write()
-            .await
-            .get(selected_rpc)
-            .unwrap()
-            .check()
-        {
+        match self.ratelimiters.get(selected_rpc).unwrap().check() {
             Ok(_) => {
                 // rate limit succeeded
             }
@ -195,8 +188,6 @@ impl Web3ProviderTier {
         // increment our connection counter
         self.connections
-            .write()
-            .await
             .get_mut(selected_rpc)
             .unwrap()
             .inc_active_requests();