From e8cc27bd37fd2747d5b705665b98137705234b18 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 29 Apr 2022 19:22:46 +0000 Subject: [PATCH] more structs --- src/main.rs | 189 ++++++------------------------------------ src/provider.rs | 39 ++++++++- src/provider_tiers.rs | 110 ++++++++++++++++++++++-- 3 files changed, 167 insertions(+), 171 deletions(-) diff --git a/src/main.rs b/src/main.rs index be3ae6dc..2f730d6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,12 +3,8 @@ mod provider; mod provider_tiers; use futures::future; -use futures::stream::FuturesUnordered; -use futures::StreamExt; use governor::clock::{Clock, QuantaClock}; -use serde::{Deserialize, Serialize}; use serde_json::json; -use serde_json::value::RawValue; use std::collections::HashMap; use std::fmt; use std::sync::Arc; @@ -20,7 +16,8 @@ use warp::Filter; // use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap}; use crate::block_watcher::BlockWatcher; -use crate::provider_tiers::{Web3ConnectionMap, Web3ProviderTier}; +use crate::provider::JsonRpcRequest; +use crate::provider_tiers::Web3ProviderTier; static APP_USER_AGENT: &str = concat!( "satoshiandkin/", @@ -29,21 +26,6 @@ static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); -#[derive(Clone, Deserialize)] -struct JsonRpcRequest { - jsonrpc: Box, - id: Box, - method: String, - params: Box, -} - -#[derive(Clone, Serialize)] -struct JsonRpcForwardedResponse { - jsonrpc: Box, - id: Box, - result: Box, -} - /// The application // TODO: this debug impl is way too verbose. make something smaller struct Web3ProxyApp { @@ -64,7 +46,7 @@ struct Web3ProxyApp { impl fmt::Debug for Web3ProxyApp { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though - write!(f, "Web3ProxyApp(...)") + f.debug_struct("Web3ProxyApp").finish_non_exhaustive() } } @@ -195,36 +177,28 @@ impl Web3ProxyApp { json_body: JsonRpcRequest, ) -> anyhow::Result { if self.private_rpcs.is_some() && json_body.method == "eth_sendRawTransaction" { - let private_rpcs = self.private_rpcs.clone().unwrap(); + let private_rpcs = self.private_rpcs.as_ref().unwrap(); // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs loop { + // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit let read_lock = self.private_rpcs_ratelimiter_lock.read().await; - let json_body_clone = json_body.clone(); - match private_rpcs.get_upstream_servers().await { Ok(upstream_servers) => { - let (tx, mut rx) = - mpsc::unbounded_channel::>(); + let (tx, mut rx) = mpsc::unbounded_channel(); - let clone = self.clone(); let connections = private_rpcs.clone_connections(); - - // check incoming_id before sending any requests - let incoming_id = &*json_body.id; + let method = json_body.method.clone(); + let params = json_body.params.clone(); tokio::spawn(async move { - clone - .try_send_requests( - upstream_servers, - connections, - json_body_clone, - tx, - ) + connections + .try_send_requests(upstream_servers, method, params, tx) .await }); + // wait for the first response let response = rx .recv() .await @@ -233,7 +207,7 @@ impl Web3ProxyApp { if let Ok(partial_response) = response { let response = json!({ "jsonrpc": "2.0", - "id": incoming_id, + "id": json_body.id, "result": partial_response }); return Ok(warp::reply::json(&response)); @@ -265,58 +239,37 @@ impl Web3ProxyApp { // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again let mut earliest_not_until = None; - // check incoming_id before sending any requests - let incoming_id = &*json_body.id; - for balanced_rpcs in self.balanced_rpc_tiers.iter() { // TODO: what allowed lag? match balanced_rpcs.next_upstream_server().await { Ok(upstream_server) => { - // TODO: better type for this. right now its request (the full jsonrpc object), response (just the inner result) - let (tx, mut rx) = - mpsc::unbounded_channel::>(); + let connections = balanced_rpcs.connections(); - { - // clone things so we can move them into the future and still use them here - let clone = self.clone(); - let connections = balanced_rpcs.clone_connections(); - let json_body = json_body.clone(); - let upstream_server = upstream_server.clone(); - - tokio::spawn(async move { - clone - .try_send_requests( - vec![upstream_server], - connections, - json_body, - tx, - ) - .await - }); - } - - let response = rx - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("no successful response"))?; + let response = connections + .try_send_request( + upstream_server, + json_body.method.clone(), + json_body.params.clone(), + ) + .await; let response = match response { Ok(partial_response) => { - // TODO: trace + // TODO: trace here was really slow with millions of requests. // info!("forwarding request from {}", upstream_server); json!({ - "jsonrpc": "2.0", - "id": incoming_id, + // TODO: re-use their jsonrpc? + "jsonrpc": json_body.jsonrpc, + "id": json_body.id, "result": partial_response }) } Err(e) => { // TODO: what is the proper format for an error? - // TODO: use e json!({ - "jsonrpc": "2.0", - "id": incoming_id, + "jsonrpc": json_body.jsonrpc, + "id": json_body.id, "error": format!("{}", e) }) } @@ -329,6 +282,7 @@ impl Web3ProxyApp { } Err(Some(not_until)) => { // save the smallest not_until. if nothing succeeds, return an Err with not_until in it + // TODO: helper function for this if earliest_not_until.is_none() { earliest_not_until.replace(not_until); } else { @@ -366,95 +320,6 @@ impl Web3ProxyApp { } } } - - async fn try_send_requests( - &self, - rpc_servers: Vec, - connections: Arc, - json_request_body: JsonRpcRequest, - // TODO: better type for this - tx: mpsc::UnboundedSender>, - ) -> anyhow::Result<()> { - // {"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":1} - let method = json_request_body.method.clone(); - let params = json_request_body.params; - - if rpc_servers.len() == 1 { - let rpc = rpc_servers.first().unwrap(); - - let provider = connections.get(rpc).unwrap().clone_provider(); - - let response = provider.request(&method, params).await; - - connections.get(rpc).unwrap().dec_active_requests(); - - tx.send(response.map_err(Into::into))?; - - Ok(()) - } else { - // TODO: lets just use a usize index or something - let method = Arc::new(method); - - let mut unordered_futures = FuturesUnordered::new(); - - for rpc in rpc_servers { - let connections = connections.clone(); - let method = method.clone(); - let params = params.clone(); - let tx = tx.clone(); - - let handle = tokio::spawn(async move { - // get the client for this rpc server - let provider = connections.get(&rpc).unwrap().clone_provider(); - - let response = provider.request(&method, params).await; - - connections.get(&rpc).unwrap().dec_active_requests(); - - let response = response?; - - // TODO: if "no block with that header" or some other jsonrpc errors, skip this response - - // send the first good response to a one shot channel. that way we respond quickly - // drop the result because errors are expected after the first send - let _ = tx.send(Ok(response)); - - Ok::<(), anyhow::Error>(()) - }); - - unordered_futures.push(handle); - } - - // TODO: use iterators instead of pushing into a vec - let mut errs = vec![]; - if let Some(x) = unordered_futures.next().await { - match x.unwrap() { - Ok(_) => {} - Err(e) => { - // TODO: better errors - warn!("Got an error sending request: {}", e); - errs.push(e); - } - } - } - - // get the first error (if any) - let e: anyhow::Result = if !errs.is_empty() { - Err(errs.pop().unwrap()) - } else { - Err(anyhow::anyhow!("no successful responses")) - }; - - // send the error to the channel - if tx.send(e).is_ok() { - // if we were able to send an error, then we never sent a success - return Err(anyhow::anyhow!("no successful responses")); - } else { - // if sending the error failed. the other side must be closed (which means we sent a success earlier) - Ok(()) - } - } - } } #[tokio::main] diff --git a/src/provider.rs b/src/provider.rs index a39d6c01..d3dfe3cb 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -7,6 +7,8 @@ use governor::middleware::NoOpMiddleware; use governor::state::{InMemoryState, NotKeyed}; use governor::NotUntil; use governor::RateLimiter; +use serde::{Deserialize, Serialize}; +use serde_json::value::RawValue; use std::fmt; use std::sync::atomic::{self, AtomicUsize}; use std::time::Duration; @@ -19,6 +21,39 @@ use crate::block_watcher::BlockWatcherSender; type Web3RateLimiter = RateLimiter>; +#[derive(Clone, Deserialize)] +pub struct JsonRpcRequest { + pub jsonrpc: Box, + pub id: Box, + pub method: String, + pub params: Box, +} + +impl fmt::Debug for JsonRpcRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + f.debug_struct("JsonRpcRequest") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +#[derive(Clone, Deserialize, Serialize)] +pub struct JsonRpcForwardedResponse { + pub jsonrpc: Box, + pub id: Box, + pub result: Box, +} + +impl fmt::Debug for JsonRpcForwardedResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + f.debug_struct("JsonRpcForwardedResponse") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + // TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] pub enum Web3Provider { @@ -29,7 +64,7 @@ pub enum Web3Provider { impl fmt::Debug for Web3Provider { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though - write!(f, "Web3Provider(...)") + f.debug_struct("Web3Provider").finish_non_exhaustive() } } @@ -40,7 +75,7 @@ impl Web3Provider { &self, method: &str, params: Box, - ) -> Result { + ) -> Result { match self { Self::Http(provider) => provider.request(method, params).await, Self::Ws(provider) => provider.request(method, params).await, diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs index 2026721d..e94a0486 100644 --- a/src/provider_tiers.rs +++ b/src/provider_tiers.rs @@ -1,18 +1,110 @@ ///! Communicate with groups of web3 providers use arc_swap::ArcSwap; +use derive_more::From; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; +use serde_json::value::RawValue; use std::cmp; use std::collections::HashMap; use std::fmt; use std::num::NonZeroU32; use std::sync::Arc; +use tokio::sync::mpsc; +use tracing::warn; use crate::block_watcher::{BlockWatcher, SyncStatus}; -use crate::provider::Web3Connection; +use crate::provider::{JsonRpcForwardedResponse, Web3Connection}; -// TODO: move the rate limiter into the connection -pub type Web3ConnectionMap = HashMap; +#[derive(From)] +pub struct Web3Connections(HashMap); + +impl Web3Connections { + pub fn get(&self, rpc: &str) -> Option<&Web3Connection> { + self.0.get(rpc) + } + + pub async fn try_send_request( + &self, + rpc: String, + method: String, + params: Box, + ) -> anyhow::Result { + let connection = self.get(&rpc).unwrap(); + + // TODO: do we need this clone or can we do a reference? + let provider = connection.clone_provider(); + + let response = provider.request(&method, params).await; + + connection.dec_active_requests(); + + // TODO: if "no block with that header" or some other jsonrpc errors, skip this response + + response.map_err(Into::into) + } + + pub async fn try_send_requests( + self: Arc, + rpc_servers: Vec, + method: String, + params: Box, + // TODO: i think this should actually be a oneshot + response_sender: mpsc::UnboundedSender>, + ) -> anyhow::Result<()> { + let method = Arc::new(method); + + let mut unordered_futures = FuturesUnordered::new(); + + for rpc in rpc_servers { + let connections = self.clone(); + let method = method.to_string(); + let params = params.clone(); + let response_sender = response_sender.clone(); + + let handle = tokio::spawn(async move { + // get the client for this rpc server + let response = connections.try_send_request(rpc, method, params).await?; + + // send the first good response to a one shot channel. that way we respond quickly + // drop the result because errors are expected after the first send + response_sender.send(Ok(response)).map_err(Into::into) + }); + + unordered_futures.push(handle); + } + + // TODO: use iterators instead of pushing into a vec + let mut errs = vec![]; + if let Some(x) = unordered_futures.next().await { + match x.unwrap() { + Ok(_) => {} + Err(e) => { + // TODO: better errors + warn!("Got an error sending request: {}", e); + errs.push(e); + } + } + } + + // get the first error (if any) + let e = if !errs.is_empty() { + Err(errs.pop().unwrap()) + } else { + Err(anyhow::anyhow!("no successful responses")) + }; + + // send the error to the channel + if response_sender.send(e).is_ok() { + // if we were able to send an error, then we never sent a success + return Err(anyhow::anyhow!("no successful responses")); + } else { + // if sending the error failed. the other side must be closed (which means we sent a success earlier) + Ok(()) + } + } +} /// Load balance to the rpc pub struct Web3ProviderTier { @@ -20,13 +112,13 @@ pub struct Web3ProviderTier { /// TODO: we probably want a better lock synced_rpcs: ArcSwap>, rpcs: Vec, - connections: Arc, + connections: Arc, } impl fmt::Debug for Web3ProviderTier { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though - write!(f, "Web3ProviderTier") + f.debug_struct("Web3ProviderTier").finish_non_exhaustive() } } @@ -67,11 +159,15 @@ impl Web3ProviderTier { Ok(Web3ProviderTier { synced_rpcs: ArcSwap::from(Arc::new(vec![])), rpcs, - connections: Arc::new(connections), + connections: Arc::new(connections.into()), }) } - pub fn clone_connections(&self) -> Arc { + pub fn connections(&self) -> &Web3Connections { + &self.connections + } + + pub fn clone_connections(&self) -> Arc { self.connections.clone() }