better loops. (i think)

This commit is contained in:
Bryan Stitt 2022-04-24 21:54:29 +00:00
parent 7f14c32828
commit 6f1d367776

View File

@ -1,6 +1,5 @@
use dashmap::DashMap; use dashmap::DashMap;
use futures::stream; use futures::future;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant}; use governor::clock::{QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware; use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed}; use governor::state::{InMemoryState, NotKeyed};
@ -11,9 +10,6 @@ use tokio::sync::RwLock;
// use tokio::time::{sleep, Duration}; // use tokio::time::{sleep, Duration};
use warp::Filter; use warp::Filter;
// TODO: what should this be?
const PARALLEL_REQUESTS: usize = 4;
type RpcRateLimiter = type RpcRateLimiter =
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>; RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
@ -211,33 +207,34 @@ impl Web3ProxyState {
json_body: &serde_json::Value, json_body: &serde_json::Value,
) -> anyhow::Result<String> { ) -> anyhow::Result<String> {
// send the query to all the servers // send the query to all the servers
let mut bodies = stream::iter(upstream_servers) let bodies = future::join_all(upstream_servers.into_iter().map(|url| {
.map(|url| { let client = self.client.clone();
let client = self.client.clone(); let json_body = json_body.clone();
let json_body = json_body.clone(); tokio::spawn(async move {
tokio::spawn(async move { // TODO: there has to be a better way to do this map. i think maybe put the map outside this spawn?
let resp = client let resp = client
.post(&url) .post(&url)
.json(&json_body) .json(&json_body)
.send() .send()
.await .await
.map_err(|e| (url.clone(), e))?; .map_err(|e| (url.clone(), e))?;
resp.text() resp.text()
.await .await
.map(|t| (url.clone(), t)) .map(|t| (url.clone(), t))
.map_err(|e| (url, e)) .map_err(|e| (url, e))
})
}) })
.buffer_unordered(PARALLEL_REQUESTS); }))
.await;
// we are going to collect successes and failures
let mut oks = vec![]; let mut oks = vec![];
let mut errs = vec![]; let mut errs = vec![];
while let Some(b) = bodies.next().await { // TODO: parallel?
// TODO: reduce connection counter for b in bodies {
match b { match b {
Ok(Ok((url, b))) => { Ok(Ok((url, b))) => {
// reduce connection counter
if let Some(connections) = connections { if let Some(connections) = connections {
*connections.get_mut(&url).unwrap() -= 1; *connections.get_mut(&url).unwrap() -= 1;
} }
@ -246,6 +243,7 @@ impl Web3ProxyState {
oks.push(b); oks.push(b);
} }
Ok(Err((url, e))) => { Ok(Err((url, e))) => {
// reduce connection counter
if let Some(connections) = connections { if let Some(connections) = connections {
*connections.get_mut(&url).unwrap() -= 1; *connections.get_mut(&url).unwrap() -= 1;
} }