flatten transaction sending (untested)

Bryan Stitt 2022-05-28 04:26:24 +00:00
parent 4335d2120e
commit 588a6c7924
5 changed files with 101 additions and 108 deletions

@@ -15,6 +15,7 @@ pub struct RedisCellClient {
impl RedisCellClient {
// todo: seems like this could be derived
// TODO: take something generic for conn
pub fn new(
conn: MultiplexedConnection,
key: String,

@@ -1,5 +1,7 @@
mod client;
pub use client::RedisCellClient;
pub use redis;
// TODO: don't hard code MultiplexedConnection
pub use redis::aio::MultiplexedConnection;
pub use redis::Client;
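For context on the hard-coded type: a caller of this crate currently has to build the `MultiplexedConnection` itself through the re-exported redis crate. A minimal sketch, assuming the crate is imported as `redis_cell_client` and redis's tokio support is enabled; the URL and error handling are illustrative, not from this commit:

```rust
use redis_cell_client::{Client, MultiplexedConnection};

async fn connect(redis_url: &str) -> anyhow::Result<MultiplexedConnection> {
    let client = Client::open(redis_url)?;

    // a MultiplexedConnection is cheap to clone and can be shared across
    // tasks, which is presumably why it is the hard-coded choice here
    let conn = client.get_multiplexed_tokio_connection().await?;

    Ok(conn)
}
```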

@@ -1,3 +1,4 @@
# A redis server with the libredis_cell module installed
FROM rust:1-bullseye as builder
WORKDIR /usr/src/redis-cell

@@ -242,65 +242,14 @@ impl Web3ProxyApp {
for _i in 0..10usize {
// // TODO: add more to this span
// let span = info_span!("i", ?i);
// let _enter = span.enter();
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
if request.method == "eth_sendRawTransaction" {
// private rpcs are configured and the request is eth_sendRawTransaction. send it to all of them
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
match self.private_rpcs.get_upstream_servers().await {
Ok(active_request_handles) => {
// TODO: refactor this to block until it has responses from all and also to return the most common success response
// TODO: i don't think we need to spawn it if we do that.
let (tx, rx) = flume::bounded(1);
let connections = self.private_rpcs.clone();
let method = request.method.clone();
let params = request.params.clone();
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
// TODO:
task::Builder::default()
.name("try_send_parallel_requests")
.spawn(async move {
connections
.try_send_parallel_requests(
active_request_handles,
method,
params,
tx,
)
.await
});
// wait for the first response
// TODO: we don't want the first response. we want the quorum response
let backend_response = rx.recv_async().await?;
if let Ok(backend_response) = backend_response {
// TODO: i think we
let response = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(backend_response),
error: None,
};
return Ok(response);
}
}
Err(None) => {
// TODO: return a 502?
// TODO: i don't think this will ever happen
return Err(anyhow::anyhow!("no private rpcs!"));
}
Err(Some(retry_after)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!("All rate limits exceeded. Sleeping");
sleep(retry_after).await;
}
};
return self
.private_rpcs
.try_send_all_upstream_servers(request)
.await;
} else {
// this is not a private transaction (or no private relays are configured)
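The shape of the flattening above: the old path spawned a task per request and raced a bounded flume channel for the first reply, while the new path simply awaits a method that gathers every reply and picks one. A stripped-down sketch of the two shapes, assuming tokio, flume, futures, and anyhow as dependencies; `do_work` is a hypothetical stand-in for `active_request_handle.request(..)`:

```rust
use futures::future::join_all;

async fn do_work(i: usize) -> anyhow::Result<usize> {
    Ok(i * 2) // stand-in for active_request_handle.request(..)
}

// old shape: spawn + channel, return whatever answers first
async fn first_reply(n: usize) -> anyhow::Result<usize> {
    let (tx, rx) = flume::bounded(1);
    for i in 0..n {
        let tx = tx.clone();
        tokio::spawn(async move {
            // errors are expected after the first send, so drop them
            let _ = tx.send_async(do_work(i).await).await;
        });
    }
    // drop our sender so recv fails instead of hanging if n == 0
    drop(tx);
    rx.recv_async().await?
}

// new shape: no spawn and no channel; await every reply, then choose
async fn all_replies(n: usize) -> Vec<anyhow::Result<usize>> {
    join_all((0..n).map(do_work)).await
}
```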
@@ -353,6 +302,7 @@ impl Web3ProxyApp {
}
}
// TODO: move this whole match to a function on self.balanced_rpcs
match self.balanced_rpcs.next_upstream_server().await {
Ok(active_request_handle) => {
let response = active_request_handle
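Taken with the new `try_send_all_upstream_servers` below, that TODO suggests the balanced path could be flattened the same way. A sketch of what such a helper on `Web3Connections` might look like; the name `try_send_best_upstream_server`, the retry shape, and the assumption that `next_upstream_server` errors mirror `get_upstream_servers` are all mine, not part of this commit:

```rust
impl Web3Connections {
    pub async fn try_send_best_upstream_server(
        &self,
        request: JsonRpcRequest,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        loop {
            match self.next_upstream_server().await {
                Ok(active_request_handle) => {
                    let partial_response = active_request_handle
                        .request(request.method.as_ref(), request.params.as_deref())
                        .await?;

                    return Ok(JsonRpcForwardedResponse {
                        jsonrpc: "2.0".to_string(),
                        id: request.id,
                        result: Some(partial_response),
                        error: None,
                    });
                }
                Err(None) => return Err(anyhow::anyhow!("no balanced rpcs!")),
                Err(Some(retry_after)) => {
                    // rate limited everywhere; wait for the earliest limit to clear
                    warn!("All rate limits exceeded. Sleeping");
                    sleep(retry_after).await;
                }
            }
        }
    }
}
```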

@@ -1,7 +1,8 @@
//! Load balanced communication with a group of web3 providers
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
use ethers::prelude::H256;
use ethers::prelude::{ProviderError, H256};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
@@ -15,11 +16,13 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::task;
use tokio::time::sleep;
use tracing::Instrument;
use tracing::{debug, info, info_span, instrument, trace, warn};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
// Serialize so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
@@ -155,70 +158,61 @@ impl Web3Connections {
*self.synced_connections.load().get_head_block_hash()
}
/// Send the same request to all the handles. Return the fastest successful result.
/// Send the same request to all the handles. Return the most common success, or failing that, the most common error.
#[instrument(skip_all)]
pub async fn try_send_parallel_requests(
self: Arc<Self>,
&self,
active_request_handles: Vec<ActiveRequestHandle>,
method: String,
params: Option<Box<RawValue>>,
response_sender: flume::Sender<anyhow::Result<Box<RawValue>>>,
) -> anyhow::Result<()> {
method: &str,
// TODO: remove this box once i figure out how to do the options
params: Option<&RawValue>,
) -> Result<Box<RawValue>, ProviderError> {
// TODO: if only 1 active_request_handles, do self.try_send_request
let mut unordered_futures = FuturesUnordered::new();
for active_request_handle in active_request_handles {
// clone things so we can pass them to a future
let method = method.clone();
let params = params.clone();
let response_sender = response_sender.clone();
let responses = active_request_handles
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> =
active_request_handle.request(method, params).await;
result
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<Box<RawValue>, ProviderError>>>()
.await;
let handle = task::Builder::default()
.name("send_request")
.spawn(async move {
let response: Box<RawValue> =
active_request_handle.request(&method, &params).await?;
// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys
let mut count_map: HashMap<String, Result<Box<RawValue>, ProviderError>> = HashMap::new();
let mut counts: Counter<String> = Counter::new();
let mut any_ok = false;
for response in responses {
let s = format!("{:?}", response);
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
response_sender
.send_async(Ok(response))
.await
.map_err(Into::into)
});
if count_map.get(&s).is_none() {
if response.is_ok() {
any_ok = true;
}
unordered_futures.push(handle);
count_map.insert(s.clone(), response);
}
counts.update([s].into_iter());
}
// TODO: use iterators instead of pushing into a vec?
let mut errs = vec![];
if let Some(x) = unordered_futures.next().await {
match x.unwrap() {
Ok(_) => {}
Err(e) => {
// TODO: better errors
warn!("Got an error sending request: {}", e);
errs.push(e);
}
for (most_common, _) in counts.most_common_ordered() {
// TODO: how do we take this?
let most_common = count_map.remove(&most_common).unwrap();
if any_ok && most_common.is_err() {
// errors were more common, but we are going to skip them because we got an okay
continue;
} else {
// return the most common
return most_common;
}
}
// get the first error (if any)
// TODO: why collect multiple errors if we only pop one?
let e = if !errs.is_empty() {
Err(errs.pop().unwrap())
} else {
Err(anyhow::anyhow!("no successful responses"))
};
// send the error to the channel
if response_sender.send_async(e).await.is_ok() {
// if we were able to send an error, then we never sent a success
return Err(anyhow::anyhow!("no successful responses"));
} else {
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
Ok(())
}
// TODO: what should we do if we get here? i don't think we will
unreachable!("i don't think this is possible")
}
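The counting above works around `RawValue` and `ProviderError` not being usable as map keys by keying on each result's `Debug` text. The same idea in isolation, as a runnable sketch over plain `String` results; `Counter`, `update`, and `most_common_ordered` come from the `counter` crate already used in the diff:

```rust
use counter::Counter;
use std::collections::HashMap;

fn most_common_response(responses: Vec<Result<String, String>>) -> Result<String, String> {
    // Debug text stands in as a hashable key for otherwise un-hashable results
    let mut count_map: HashMap<String, Result<String, String>> = HashMap::new();
    let mut counts: Counter<String> = Counter::new();
    let mut any_ok = false;

    for response in responses {
        let s = format!("{:?}", response);
        if !count_map.contains_key(&s) {
            any_ok |= response.is_ok();
            // keep one representative value per distinct key
            count_map.insert(s.clone(), response);
        }
        counts.update([s]);
    }

    for (key, _count) in counts.most_common_ordered() {
        let candidate = count_map.remove(&key).unwrap();
        // prefer the most common success even if some error was more common
        if any_ok && candidate.is_err() {
            continue;
        }
        return candidate;
    }

    // only reachable when `responses` was empty
    Err("no responses".to_string())
}
```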
/// TODO: possible dead lock here. investigate more. probably refactor
@@ -440,4 +434,49 @@ impl Web3Connections {
// return the earliest retry_after (if no rpcs are synced, this will be None)
Err(earliest_retry_after)
}
pub async fn try_send_all_upstream_servers(
&self,
request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
// TODO: timeout on this loop
loop {
match self.get_upstream_servers().await {
Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
// TODO: this is not working right. simplify
let quorum_response = self
.try_send_parallel_requests(
active_request_handles,
request.method.as_ref(),
request.params.as_deref(),
)
.await?;
let response = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(quorum_response),
error: None,
};
return Ok(response);
}
Err(None) => {
// TODO: return a 502?
// TODO: i don't think this will ever happen
return Err(anyhow::anyhow!("no private rpcs!"));
}
Err(Some(retry_after)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!("All rate limits exceeded. Sleeping");
sleep(retry_after).await;
}
}
}
}
}
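One way to satisfy the `TODO: timeout on this loop` without threading a deadline through it is to cancel at the call site with `tokio::time::timeout`. A minimal sketch; the helper name and the 30-second budget are assumptions:

```rust
use std::time::Duration;
use tokio::time::timeout;

async fn send_with_deadline(
    connections: &Web3Connections,
    request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
    // dropping the inner future on timeout cancels the whole retry loop
    timeout(
        Duration::from_secs(30),
        connections.try_send_all_upstream_servers(request),
    )
    .await
    .map_err(|_elapsed| anyhow::anyhow!("timed out waiting for upstream rpcs"))?
}
```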