diff --git a/src/main.rs b/src/main.rs index aa9b903f..bce8df08 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ use dashmap::DashMap; -use futures::stream; -use futures::StreamExt; +use futures::future; use governor::clock::{QuantaClock, QuantaInstant}; use governor::middleware::NoOpMiddleware; use governor::state::{InMemoryState, NotKeyed}; @@ -11,9 +10,6 @@ use tokio::sync::RwLock; // use tokio::time::{sleep, Duration}; use warp::Filter; -// TODO: what should this be? -const PARALLEL_REQUESTS: usize = 4; - type RpcRateLimiter = RateLimiter>; @@ -211,33 +207,34 @@ impl Web3ProxyState { json_body: &serde_json::Value, ) -> anyhow::Result { // send the query to all the servers - let mut bodies = stream::iter(upstream_servers) - .map(|url| { - let client = self.client.clone(); - let json_body = json_body.clone(); - tokio::spawn(async move { - let resp = client - .post(&url) - .json(&json_body) - .send() - .await - .map_err(|e| (url.clone(), e))?; - resp.text() - .await - .map(|t| (url.clone(), t)) - .map_err(|e| (url, e)) - }) + let bodies = future::join_all(upstream_servers.into_iter().map(|url| { + let client = self.client.clone(); + let json_body = json_body.clone(); + tokio::spawn(async move { + // TODO: there has to be a better way to do this map. i think maybe put the map outside this spawn? + let resp = client + .post(&url) + .json(&json_body) + .send() + .await + .map_err(|e| (url.clone(), e))?; + resp.text() + .await + .map(|t| (url.clone(), t)) + .map_err(|e| (url, e)) }) - .buffer_unordered(PARALLEL_REQUESTS); + })) + .await; + // we are going to collect successes and failures let mut oks = vec![]; let mut errs = vec![]; - while let Some(b) = bodies.next().await { - // TODO: reduce connection counter - + // TODO: parallel? + for b in bodies { match b { Ok(Ok((url, b))) => { + // reduce connection counter if let Some(connections) = connections { *connections.get_mut(&url).unwrap() -= 1; } @@ -246,6 +243,7 @@ impl Web3ProxyState { oks.push(b); } Ok(Err((url, e))) => { + // reduce connection counter if let Some(connections) = connections { *connections.get_mut(&url).unwrap() -= 1; }