diff --git a/redis-cell-client/src/client.rs b/redis-cell-client/src/client.rs
index fce57c03..d3b3d0bd 100644
--- a/redis-cell-client/src/client.rs
+++ b/redis-cell-client/src/client.rs
@@ -15,6 +15,7 @@ pub struct RedisCellClient {
 
 impl RedisCellClient {
     // todo: seems like this could be derived
+    // TODO: take something generic for conn
     pub fn new(
         conn: MultiplexedConnection,
         key: String,
diff --git a/redis-cell-client/src/lib.rs b/redis-cell-client/src/lib.rs
index be0ef92d..d28c9f93 100644
--- a/redis-cell-client/src/lib.rs
+++ b/redis-cell-client/src/lib.rs
@@ -1,5 +1,7 @@
 mod client;
 
 pub use client::RedisCellClient;
+pub use redis;
+// TODO: don't hard code MultiplexedConnection
 pub use redis::aio::MultiplexedConnection;
 pub use redis::Client;
diff --git a/redis-cell-server/Dockerfile b/redis-cell-server/Dockerfile
index f7cc5999..077806a0 100644
--- a/redis-cell-server/Dockerfile
+++ b/redis-cell-server/Dockerfile
@@ -1,3 +1,4 @@
+# A redis server with the libredis_cell module installed
 FROM rust:1-bullseye as builder
 
 WORKDIR /usr/src/redis-cell
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index 44973f69..dcb2401f 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -242,65 +242,14 @@ impl Web3ProxyApp {
         for _i in 0..10usize {
             // // TODO: add more to this span
             // let span = info_span!("i", ?i);
-            // let _enter = span.enter();
+            // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
 
             if request.method == "eth_sendRawTransaction" {
                 // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
                 // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                match self.private_rpcs.get_upstream_servers().await {
-                    Ok(active_request_handles) => {
-                        // TODO: refactor this to block until it has responses from all and also to return the most common success response
-                        // TODO: i don't think we need to spawn it if we do that.
-                        let (tx, rx) = flume::bounded(1);
-
-                        let connections = self.private_rpcs.clone();
-                        let method = request.method.clone();
-                        let params = request.params.clone();
-
-                        // TODO: benchmark this compared to waiting on unbounded futures
-                        // TODO: do something with this handle?
-                        // TODO:
-                        task::Builder::default()
-                            .name("try_send_parallel_requests")
-                            .spawn(async move {
-                                connections
-                                    .try_send_parallel_requests(
-                                        active_request_handles,
-                                        method,
-                                        params,
-                                        tx,
-                                    )
-                                    .await
-                            });
-
-                        // wait for the first response
-                        // TODO: we don't want the first response. we want the quorum response
-                        let backend_response = rx.recv_async().await?;
-
-                        if let Ok(backend_response) = backend_response {
-                            // TODO: i think we
-                            let response = JsonRpcForwardedResponse {
-                                jsonrpc: "2.0".to_string(),
-                                id: request.id,
-                                result: Some(backend_response),
-                                error: None,
-                            };
-                            return Ok(response);
-                        }
-                    }
-                    Err(None) => {
-                        // TODO: return a 502?
-                        // TODO: i don't think this will ever happen
-                        return Err(anyhow::anyhow!("no private rpcs!"));
-                    }
-                    Err(Some(retry_after)) => {
-                        // TODO: move this to a helper function
-                        // sleep (TODO: with a lock?) until our rate limits should be available
-                        // TODO: if a server catches up sync while we are waiting, we could stop waiting
-                        sleep(retry_after).await;
-
-                        warn!("All rate limits exceeded. Sleeping");
-                    }
-                };
+                return self
+                    .private_rpcs
+                    .try_send_all_upstream_servers(request)
+                    .await;
             } else {
                 // this is not a private transaction (or no private relays are configured)
@@ -353,6 +302,7 @@ impl Web3ProxyApp {
             }
         }
 
+        // TODO: move this whole match to a function on self.balanced_rpcs
         match self.balanced_rpcs.next_upstream_server().await {
             Ok(active_request_handle) => {
                 let response = active_request_handle
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 89c71e89..14fa1881 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -1,7 +1,8 @@
 ///! Load balanced communication with a group of web3 providers
 use arc_swap::ArcSwap;
+use counter::Counter;
 use derive_more::From;
-use ethers::prelude::H256;
+use ethers::prelude::{ProviderError, H256};
 use futures::future::join_all;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
@@ -15,11 +16,13 @@ use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::task;
+use tokio::time::sleep;
 use tracing::Instrument;
 use tracing::{debug, info, info_span, instrument, trace, warn};
 
 use crate::config::Web3ConnectionConfig;
 use crate::connection::{ActiveRequestHandle, Web3Connection};
+use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
 
 // Serialize so we can print it on our debug endpoint
 #[derive(Clone, Default, Serialize)]
@@ -155,70 +158,61 @@ impl Web3Connections {
         *self.synced_connections.load().get_head_block_hash()
     }
 
-    /// Send the same request to all the handles. Returning the fastest successful result.
+    /// Send the same request to all the handles. Returning the most common success or most common error.
     #[instrument(skip_all)]
     pub async fn try_send_parallel_requests(
-        self: Arc<Self>,
+        &self,
         active_request_handles: Vec<ActiveRequestHandle>,
-        method: String,
-        params: Option<Box<RawValue>>,
-        response_sender: flume::Sender<anyhow::Result<Box<RawValue>>>,
-    ) -> anyhow::Result<()> {
+        method: &str,
+        // TODO: remove this box once i figure out how to do the options
+        params: Option<&RawValue>,
+    ) -> Result<Box<RawValue>, ProviderError> {
         // TODO: if only 1 active_request_handles, do self.try_send_request
-        let mut unordered_futures = FuturesUnordered::new();
 
-        for active_request_handle in active_request_handles {
-            // clone things so we can pass them to a future
-            let method = method.clone();
-            let params = params.clone();
-            let response_sender = response_sender.clone();
+        let responses = active_request_handles
+            .into_iter()
+            .map(|active_request_handle| async move {
+                let result: Result<Box<RawValue>, _> =
+                    active_request_handle.request(method, params).await;
+                result
+            })
+            .collect::<FuturesUnordered<_>>()
+            .collect::<Vec<Result<Box<RawValue>, ProviderError>>>()
+            .await;
 
-            let handle = task::Builder::default()
-                .name("send_request")
-                .spawn(async move {
-                    let response: Box<RawValue> =
-                        active_request_handle.request(&method, &params).await?;
+        // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys
+        let mut count_map: HashMap<String, Result<Box<RawValue>, ProviderError>> = HashMap::new();
+        let mut counts: Counter<String> = Counter::new();
+        let mut any_ok = false;
+        for response in responses {
+            let s = format!("{:?}", response);
 
-                    // send the first good response to a one shot channel. that way we respond quickly
-                    // drop the result because errors are expected after the first send
-                    response_sender
-                        .send_async(Ok(response))
-                        .await
-                        .map_err(Into::into)
-                });
+            if count_map.get(&s).is_none() {
+                if response.is_ok() {
+                    any_ok = true;
+                }
 
-            unordered_futures.push(handle);
+                count_map.insert(s.clone(), response);
+            }
+
+            counts.update([s].into_iter());
         }
 
-        // TODO: use iterators instead of pushing into a vec?
-        let mut errs = vec![];
-        if let Some(x) = unordered_futures.next().await {
-            match x.unwrap() {
-                Ok(_) => {}
-                Err(e) => {
-                    // TODO: better errors
-                    warn!("Got an error sending request: {}", e);
-                    errs.push(e);
-                }
+        for (most_common, _) in counts.most_common_ordered() {
+            // TODO: how do we take this?
+            let most_common = count_map.remove(&most_common).unwrap();
+
+            if any_ok && most_common.is_err() {
+                // errors were more common, but we are going to skip them because we got an okay
+                continue;
+            } else {
+                // return the most common
+                return most_common;
             }
         }
 
-        // get the first error (if any)
-        // TODO: why collect multiple errors if we only pop one?
-        let e = if !errs.is_empty() {
-            Err(errs.pop().unwrap())
-        } else {
-            Err(anyhow::anyhow!("no successful responses"))
-        };
-
-        // send the error to the channel
-        if response_sender.send_async(e).await.is_ok() {
-            // if we were able to send an error, then we never sent a success
-            return Err(anyhow::anyhow!("no successful responses"));
-        } else {
-            // if sending the error failed. the other side must be closed (which means we sent a success earlier)
-            Ok(())
-        }
+        // TODO: what should we do if we get here? i don't think we will
+        panic!("i don't think this is possible")
     }
 
     /// TODO: possible dead lock here. investigate more. probably refactor
@@ -440,4 +434,49 @@ impl Web3Connections {
         // return the earliest retry_after (if no rpcs are synced, this will be None)
         Err(earliest_retry_after)
     }
+
+    pub async fn try_send_all_upstream_servers(
+        &self,
+        request: JsonRpcRequest,
+    ) -> anyhow::Result<JsonRpcForwardedResponse> {
+        // TODO: timeout on this loop
+        loop {
+            match self.get_upstream_servers().await {
+                Ok(active_request_handles) => {
+                    // TODO: benchmark this compared to waiting on unbounded futures
+                    // TODO: do something with this handle?
+                    // TODO: this is not working right. simplify
+                    let quorum_response = self
+                        .try_send_parallel_requests(
+                            active_request_handles,
+                            request.method.as_ref(),
+                            request.params.as_deref(),
+                        )
+                        .await?;
+
+                    let response = JsonRpcForwardedResponse {
+                        jsonrpc: "2.0".to_string(),
+                        id: request.id,
+                        result: Some(quorum_response),
+                        error: None,
+                    };
+
+                    return Ok(response);
+                }
+                Err(None) => {
+                    // TODO: return a 502?
+                    // TODO: i don't think this will ever happen
+                    return Err(anyhow::anyhow!("no private rpcs!"));
+                }
+                Err(Some(retry_after)) => {
+                    // TODO: move this to a helper function
+                    // sleep (TODO: with a lock?) until our rate limits should be available
+                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
+                    sleep(retry_after).await;
+
+                    warn!("All rate limits exceeded. Sleeping");
+                }
+            }
+        }
+    }
 }
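For reviewers skimming the patch, here is a minimal, dependency-free sketch of the quorum selection that the new try_send_parallel_requests body performs: bucket responses by their Debug representation, count duplicates, and return the most common success, falling back to the most common error only when nothing succeeded. The function name most_common_response, the plain String payloads, and the HashMap tally (standing in for the counter::Counter the patch adds) are illustrative assumptions rather than code from this commit.

use std::collections::HashMap;

// Simplified stand-in for the selection logic in try_send_parallel_requests:
// pick the most common Ok(..) response; fall back to the most common Err(..)
// only when every response was an error.
fn most_common_response(responses: Vec<Result<String, String>>) -> Result<String, String> {
    // key each response by its Debug form so Ok and Err values can share one tally map
    let mut counts: HashMap<String, (usize, Result<String, String>)> = HashMap::new();
    let mut any_ok = false;

    for response in responses {
        if response.is_ok() {
            any_ok = true;
        }

        let key = format!("{:?}", response);
        counts
            .entry(key)
            .and_modify(|(n, _)| *n += 1)
            .or_insert((1, response));
    }

    // walk candidates from most to least common, skipping errors whenever any Ok exists
    let mut candidates: Vec<(usize, Result<String, String>)> = counts.into_values().collect();
    candidates.sort_by(|a, b| b.0.cmp(&a.0));

    for (_, response) in candidates {
        if any_ok && response.is_err() {
            continue;
        }
        return response;
    }

    unreachable!("responses was empty")
}

fn main() {
    let responses = vec![
        Ok("0xabc".to_string()),
        Err("timeout".to_string()),
        Ok("0xabc".to_string()),
        Err("timeout".to_string()),
        Err("timeout".to_string()),
    ];

    // three identical errors outnumber two identical successes, but the success still wins
    assert_eq!(most_common_response(responses), Ok("0xabc".to_string()));
}

As the example shows, an error can only win the "quorum" when no backend returned a success, which matches the any_ok guard in the patched connections.rs.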