minor improvement

This commit is contained in:
Bryan Stitt 2022-04-29 02:22:54 +00:00
parent 7510db4989
commit 1669c15a32

@ -373,66 +373,80 @@ impl Web3ProxyApp {
.ok_or_else(|| anyhow::anyhow!("no params"))?
.to_owned();
// TODO: lets just use a usize index or something
let method = Arc::new(method);
if rpc_servers.len() == 1 {
let rpc = rpc_servers.first().unwrap();
let mut unordered_futures = FuturesUnordered::new();
let provider = connections.get(rpc).unwrap().clone_provider();
for rpc in rpc_servers {
let connections = connections.clone();
let method = method.clone();
let params = params.clone();
let tx = tx.clone();
let response = provider.request(&method, params).await;
let handle = tokio::spawn(async move {
// get the client for this rpc server
let provider = connections.get(&rpc).unwrap().clone_provider();
connections.get(rpc).unwrap().dec_active_requests();
let response = provider.request(&method, params).await;
tx.send(response.map_err(Into::into))?;
connections.get(&rpc).unwrap().dec_active_requests();
Ok(())
} else {
// TODO: lets just use a usize index or something
let method = Arc::new(method);
let response = response?;
let mut unordered_futures = FuturesUnordered::new();
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
for rpc in rpc_servers {
let connections = connections.clone();
let method = method.clone();
let params = params.clone();
let tx = tx.clone();
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
let _ = tx.send(Ok(response));
let handle = tokio::spawn(async move {
// get the client for this rpc server
let provider = connections.get(&rpc).unwrap().clone_provider();
Ok::<(), anyhow::Error>(())
});
let response = provider.request(&method, params).await;
unordered_futures.push(handle);
}
connections.get(&rpc).unwrap().dec_active_requests();
// TODO: use iterators instead of pushing into a vec
let mut errs = vec![];
if let Some(x) = unordered_futures.next().await {
match x.unwrap() {
Ok(_) => {}
Err(e) => {
// TODO: better errors
warn!("Got an error sending request: {}", e);
errs.push(e);
let response = response?;
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
let _ = tx.send(Ok(response));
Ok::<(), anyhow::Error>(())
});
unordered_futures.push(handle);
}
// TODO: use iterators instead of pushing into a vec
let mut errs = vec![];
if let Some(x) = unordered_futures.next().await {
match x.unwrap() {
Ok(_) => {}
Err(e) => {
// TODO: better errors
warn!("Got an error sending request: {}", e);
errs.push(e);
}
}
}
}
// get the first error (if any)
let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
Err(errs.pop().unwrap())
} else {
Err(anyhow::anyhow!("no successful responses"))
};
// get the first error (if any)
let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
Err(errs.pop().unwrap())
} else {
Err(anyhow::anyhow!("no successful responses"))
};
// send the error to the channel
if tx.send(e).is_ok() {
// if we were able to send an error, then we never sent a success
return Err(anyhow::anyhow!("no successful responses"));
} else {
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
Ok(())
// send the error to the channel
if tx.send(e).is_ok() {
// if we were able to send an error, then we never sent a success
return Err(anyhow::anyhow!("no successful responses"));
} else {
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
Ok(())
}
}
}
}