From 1669c15a3274f993925307216bdd0569e37b67f7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 29 Apr 2022 02:22:54 +0000 Subject: [PATCH] minor improvement --- src/main.rs | 104 +++++++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1d85b78f..d09d0228 100644 --- a/src/main.rs +++ b/src/main.rs @@ -373,66 +373,80 @@ impl Web3ProxyApp { .ok_or_else(|| anyhow::anyhow!("no params"))? .to_owned(); - // TODO: lets just use a usize index or something - let method = Arc::new(method); + if rpc_servers.len() == 1 { + let rpc = rpc_servers.first().unwrap(); - let mut unordered_futures = FuturesUnordered::new(); + let provider = connections.get(rpc).unwrap().clone_provider(); - for rpc in rpc_servers { - let connections = connections.clone(); - let method = method.clone(); - let params = params.clone(); - let tx = tx.clone(); + let response = provider.request(&method, params).await; - let handle = tokio::spawn(async move { - // get the client for this rpc server - let provider = connections.get(&rpc).unwrap().clone_provider(); + connections.get(rpc).unwrap().dec_active_requests(); - let response = provider.request(&method, params).await; + tx.send(response.map_err(Into::into))?; - connections.get(&rpc).unwrap().dec_active_requests(); + Ok(()) + } else { + // TODO: lets just use a usize index or something + let method = Arc::new(method); - let response = response?; + let mut unordered_futures = FuturesUnordered::new(); - // TODO: if "no block with that header" or some other jsonrpc errors, skip this response + for rpc in rpc_servers { + let connections = connections.clone(); + let method = method.clone(); + let params = params.clone(); + let tx = tx.clone(); - // send the first good response to a one shot channel. that way we respond quickly - // drop the result because errors are expected after the first send - let _ = tx.send(Ok(response)); + let handle = tokio::spawn(async move { + // get the client for this rpc server + let provider = connections.get(&rpc).unwrap().clone_provider(); - Ok::<(), anyhow::Error>(()) - }); + let response = provider.request(&method, params).await; - unordered_futures.push(handle); - } + connections.get(&rpc).unwrap().dec_active_requests(); - // TODO: use iterators instead of pushing into a vec - let mut errs = vec![]; - if let Some(x) = unordered_futures.next().await { - match x.unwrap() { - Ok(_) => {} - Err(e) => { - // TODO: better errors - warn!("Got an error sending request: {}", e); - errs.push(e); + let response = response?; + + // TODO: if "no block with that header" or some other jsonrpc errors, skip this response + + // send the first good response to a one shot channel. that way we respond quickly + // drop the result because errors are expected after the first send + let _ = tx.send(Ok(response)); + + Ok::<(), anyhow::Error>(()) + }); + + unordered_futures.push(handle); + } + + // TODO: use iterators instead of pushing into a vec + let mut errs = vec![]; + if let Some(x) = unordered_futures.next().await { + match x.unwrap() { + Ok(_) => {} + Err(e) => { + // TODO: better errors + warn!("Got an error sending request: {}", e); + errs.push(e); + } } } - } - // get the first error (if any) - let e: anyhow::Result = if !errs.is_empty() { - Err(errs.pop().unwrap()) - } else { - Err(anyhow::anyhow!("no successful responses")) - }; + // get the first error (if any) + let e: anyhow::Result = if !errs.is_empty() { + Err(errs.pop().unwrap()) + } else { + Err(anyhow::anyhow!("no successful responses")) + }; - // send the error to the channel - if tx.send(e).is_ok() { - // if we were able to send an error, then we never sent a success - return Err(anyhow::anyhow!("no successful responses")); - } else { - // if sending the error failed. the other side must be closed (which means we sent a success earlier) - Ok(()) + // send the error to the channel + if tx.send(e).is_ok() { + // if we were able to send an error, then we never sent a success + return Err(anyhow::anyhow!("no successful responses")); + } else { + // if sending the error failed. the other side must be closed (which means we sent a success earlier) + Ok(()) + } } } }