From 3914a41fa27349b5075fe552578eeb16bde736e5 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 11 May 2022 23:50:52 +0000
Subject: [PATCH] handle batched requests

---
 Cargo.lock                    |   4 +-
 config/example.toml           |  21 --
 config/example.toml.bac       |  29 +++
 web3-proxy/Cargo.toml         |   2 +-
 web3-proxy/src/app.rs         | 415 ++++++++++++++++++++++++++++++++++
 web3-proxy/src/connection.rs  |  52 +----
 web3-proxy/src/connections.rs |  15 +-
 web3-proxy/src/jsonrpc.rs     | 211 +++++++++++++++++
 web3-proxy/src/main.rs        | 358 +----------------------------
 9 files changed, 678 insertions(+), 429 deletions(-)
 create mode 100644 config/example.toml.bac
 create mode 100644 web3-proxy/src/app.rs
 create mode 100644 web3-proxy/src/jsonrpc.rs

diff --git a/Cargo.lock b/Cargo.lock
index 2cfd51c0..fb58cc17 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3333,9 +3333,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"

 [[package]]
 name = "tokio"
-version = "1.18.1"
+version = "1.18.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc"
+checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395"
 dependencies = [
  "bytes",
  "libc",
diff --git a/config/example.toml b/config/example.toml
index aeef796d..3cf01124 100644
--- a/config/example.toml
+++ b/config/example.toml
@@ -6,24 +6,3 @@
     url = "ws://127.0.0.1:8546"
     soft_limit = 200_000

-    [balanced_rpc_tiers.0.ankr]
-    url = "https://rpc.ankr.com/eth"
-    soft_limit = 3_000
-
-[private_rpcs]
-
-    [private_rpcs.eden]
-    url = "https://api.edennetwork.io/v1/"
-    soft_limit = 1_805
-
-    [private_rpcs.eden_beta]
-    url = "https://api.edennetwork.io/v1/beta"
-    soft_limit = 5_861
-
-    [private_rpcs.ethermine]
-    url = "https://rpc.ethermine.org"
-    soft_limit = 5_861
-
-    [private_rpcs.flashbots]
-    url = "https://rpc.flashbots.net"
-    soft_limit = 7074
diff --git a/config/example.toml.bac b/config/example.toml.bac
new file mode 100644
index 00000000..aeef796d
--- /dev/null
+++ b/config/example.toml.bac
@@ -0,0 +1,29 @@
+[balanced_rpc_tiers]
+
+[balanced_rpc_tiers.0]
+
+    [balanced_rpc_tiers.0.geth]
+    url = "ws://127.0.0.1:8546"
+    soft_limit = 200_000
+
+    [balanced_rpc_tiers.0.ankr]
+    url = "https://rpc.ankr.com/eth"
+    soft_limit = 3_000
+
+[private_rpcs]
+
+    [private_rpcs.eden]
+    url = "https://api.edennetwork.io/v1/"
+    soft_limit = 1_805
+
+    [private_rpcs.eden_beta]
+    url = "https://api.edennetwork.io/v1/beta"
+    soft_limit = 5_861
+
+    [private_rpcs.ethermine]
+    url = "https://rpc.ethermine.org"
+    soft_limit = 5_861
+
+    [private_rpcs.flashbots]
+    url = "https://rpc.flashbots.net"
+    soft_limit = 7074
diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml
index 1eef379c..1da21221 100644
--- a/web3-proxy/Cargo.toml
+++ b/web3-proxy/Cargo.toml
@@ -22,7 +22,7 @@ reqwest = { version = "0.11.10", default-features = false, features = ["json", "
 rustc-hash = "1.1.0"
 serde = { version = "1.0.137", features = [] }
 serde_json = { version = "1.0.81", default-features = false, features = ["alloc"] }
-tokio = { version = "1.18.1", features = ["full"] }
+tokio = { version = "1.18.2", features = ["full"] }
 toml = "0.5.9"
 tracing = "0.1.34"
 tracing-subscriber = "0.3.11"
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
new file mode 100644
index 00000000..14d010f5
--- /dev/null
+++ b/web3-proxy/src/app.rs
@@ -0,0 +1,415 @@
+use crate::config::Web3ConnectionConfig;
+use crate::connections::Web3Connections;
+use crate::jsonrpc::JsonRpcErrorData;
+use crate::jsonrpc::JsonRpcForwardedResponse;
+use crate::jsonrpc::JsonRpcForwardedResponseEnum;
+use crate::jsonrpc::JsonRpcRequest;
+use crate::jsonrpc::JsonRpcRequestEnum;
+use ethers::prelude::ProviderError;
+use ethers::prelude::{HttpClientError, WsClientError};
+use futures::future;
+use futures::future::join_all;
+use governor::clock::{Clock, QuantaClock};
+use linkedhashmap::LinkedHashMap;
+use std::fmt;
+use std::sync::atomic::{self, AtomicU64};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::RwLock;
+use tokio::time::sleep;
+use tracing::{trace, warn};
+
+static APP_USER_AGENT: &str = concat!(
+    "satoshiandkin/",
+    env!("CARGO_PKG_NAME"),
+    "/",
+    env!("CARGO_PKG_VERSION"),
+);
+
+// TODO: put this in config? what size should we do?
+const RESPONSE_CACHE_CAP: usize = 1024;
+
+/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
+type ResponseLruCache = RwLock<LinkedHashMap<(u64, String, String), JsonRpcForwardedResponse>>;
+
+/// The application
+// TODO: this debug impl is way too verbose. make something smaller
+// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
+pub struct Web3ProxyApp {
+    best_head_block_number: Arc<AtomicU64>,
+    /// clock used for rate limiting
+    /// TODO: use tokio's clock (will require a different ratelimiting crate)
+    clock: QuantaClock,
+    /// Send requests to the best server available
+    balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
+    /// Send private requests (like eth_sendRawTransaction) to all these servers
+    private_rpcs: Option<Arc<Web3Connections>>,
+    response_cache: ResponseLruCache,
+}
+
+impl fmt::Debug for Web3ProxyApp {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: the default formatter takes forever to write. this is too quiet though
+        f.debug_struct("Web3ProxyApp")
+            .field(
+                "best_head_block_number",
+                &self.best_head_block_number.load(atomic::Ordering::Relaxed),
+            )
+            .finish_non_exhaustive()
+    }
+}
+
+impl Web3ProxyApp {
+    pub async fn try_new(
+        balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
+        private_rpcs: Vec<Web3ConnectionConfig>,
+    ) -> anyhow::Result<Web3ProxyApp> {
+        let clock = QuantaClock::default();
+
+        let best_head_block_number = Arc::new(AtomicU64::new(0));
+
+        // make a http shared client
+        // TODO: how should we configure the connection pool?
+        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
+        let http_client = reqwest::ClientBuilder::new()
+            .connect_timeout(Duration::from_secs(5))
+            .timeout(Duration::from_secs(300))
+            .user_agent(APP_USER_AGENT)
+            .build()?;
+
+        // TODO: attach context to this error
+        let balanced_rpc_tiers =
+            future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
+                Web3Connections::try_new(
+                    best_head_block_number.clone(),
+                    balanced_rpc_tier,
+                    Some(http_client.clone()),
+                    &clock,
+                )
+            }))
+            .await
+            .into_iter()
+            .collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
+
+        // TODO: attach context to this error
+        let private_rpcs = if private_rpcs.is_empty() {
+            warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
+            // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
+            None
+        } else {
+            Some(
+                Web3Connections::try_new(
+                    best_head_block_number.clone(),
+                    private_rpcs,
+                    Some(http_client),
+                    &clock,
+                )
+                .await?,
+            )
+        };
+
+        Ok(Web3ProxyApp {
+            best_head_block_number,
+            clock,
+            balanced_rpc_tiers,
+            private_rpcs,
+            response_cache: Default::default(),
+        })
+    }
+
+    /// send the request to the appropriate RPCs
+    /// TODO: dry this up
+    pub async fn proxy_web3_rpc(
+        self: Arc<Web3ProxyApp>,
+        request: JsonRpcRequestEnum,
+    ) -> anyhow::Result<impl warp::Reply> {
+        trace!("Received request: {:?}", request);
+
+        let response = match request {
+            JsonRpcRequestEnum::Single(request) => {
+                JsonRpcForwardedResponseEnum::Single(self.proxy_web3_rpc_request(request).await?)
+            }
+            JsonRpcRequestEnum::Batch(requests) => {
+                JsonRpcForwardedResponseEnum::Batch(self.proxy_web3_rpc_requests(requests).await?)
+            }
+        };
+
+        Ok(warp::reply::json(&response))
+    }
+
+    async fn proxy_web3_rpc_requests(
+        self: Arc<Web3ProxyApp>,
+        requests: Vec<JsonRpcRequest>,
+    ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
+        // TODO: we should probably change ethers-rs to support this directly
+        // we cut up the request and send to potentially different servers. this could be a problem.
+        // if the client needs consistent blocks, they should specify instead of assuming batches work on the same block
+        // TODO: is spawning here actually slower?
+        let num_requests = requests.len();
+        let responses = join_all(
+            requests
+                .into_iter()
+                .map(|request| {
+                    let clone = self.clone();
+                    tokio::spawn(async move { clone.proxy_web3_rpc_request(request).await })
+                })
+                .collect::<Vec<_>>(),
+        )
+        .await;
+
+        // TODO: i'm sure this could be done better with iterators
+        let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
+        for response in responses {
+            collected.push(response??);
+        }
+
+        Ok(collected)
+    }
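+    // The TODO above asks whether spawning is actually slower. A non-spawning
+    // variant (hypothetical, untested) would poll every sub-request future on
+    // this task:
+    //
+    //     let responses = join_all(
+    //         requests
+    //             .into_iter()
+    //             .map(|request| self.clone().proxy_web3_rpc_request(request)),
+    //     )
+    //     .await;
+    //
+    // That keeps the sub-requests concurrent but not parallel across worker
+    // threads, and drops the JoinError layer (one `?` instead of `??`).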
+    async fn proxy_web3_rpc_request(
+        self: Arc<Web3ProxyApp>,
+        request: JsonRpcRequest,
+    ) -> anyhow::Result<JsonRpcForwardedResponse> {
+        trace!("Received request: {:?}", request);
+
+        if self.private_rpcs.is_some() && request.method == "eth_sendRawTransaction" {
+            let private_rpcs = self.private_rpcs.as_ref().unwrap();
+
+            // there are private rpcs configured and the request is eth_sendRawTransaction. send to all private rpcs
+            loop {
+                // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
+                match private_rpcs.get_upstream_servers() {
+                    Ok(active_request_handles) => {
+                        let (tx, rx) = flume::unbounded();
+
+                        let connections = private_rpcs.clone();
+                        let method = request.method.clone();
+                        let params = request.params.clone();
+
+                        // TODO: benchmark this compared to waiting on unbounded futures
+                        // TODO: do something with this handle?
+                        tokio::spawn(async move {
+                            connections
+                                .try_send_parallel_requests(
+                                    active_request_handles,
+                                    method,
+                                    params,
+                                    tx,
+                                )
+                                .await
+                        });
+
+                        // wait for the first response
+                        let backend_response = rx.recv_async().await?;
+
+                        if let Ok(backend_response) = backend_response {
+                            // TODO: i think we
+                            let response = JsonRpcForwardedResponse {
+                                jsonrpc: "2.0".to_string(),
+                                id: request.id,
+                                result: Some(backend_response),
+                                error: None,
+                            };
+                            return Ok(response);
+                        }
+                    }
+                    Err(not_until) => {
+                        // TODO: move this to a helper function
+                        // sleep (TODO: with a lock?) until our rate limits should be available
+                        // TODO: if a server catches up sync while we are waiting, we could stop waiting
+                        if let Some(not_until) = not_until {
+                            let deadline = not_until.wait_time_from(self.clock.now());
+
+                            sleep(deadline).await;
+                        }
+                    }
+                };
+            }
+        } else {
+            // this is not a private transaction (or no private relays are configured)
+            // try to send to each tier, stopping at the first success
+            // if no tiers are synced, fallback to privates
+            loop {
+                // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
+                let mut earliest_not_until = None;
+
+                // TODO: how can we better build this iterator?
+                let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
+                    self.balanced_rpc_tiers.iter().chain(vec![private_rpcs])
+                } else {
+                    self.balanced_rpc_tiers.iter().chain(vec![])
+                };
+
+                for balanced_rpcs in rpc_iter {
+                    let best_head_block_number =
+                        self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store the current block for everything anymore. we store it on the connections
+
+                    let best_rpc_block_number = balanced_rpcs.head_block_number();
+
+                    if best_rpc_block_number < best_head_block_number {
+                        continue;
+                    }
+
+                    // TODO: building this cache key is slow and it's large, but i don't see a better way right now
+                    // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
+                    let cache_key = (
+                        best_head_block_number,
+                        request.method.clone(),
+                        request.params.to_string(),
+                    );
+
+                    if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
+                        // TODO: this still serializes every time
+                        // TODO: return a reference in the other places so that this works without a clone?
+                        return Ok(cached.to_owned());
+                    }
+
+                    // TODO: what allowed lag?
+                    match balanced_rpcs.next_upstream_server().await {
+                        Ok(active_request_handle) => {
+                            let response = balanced_rpcs
+                                .try_send_request(
+                                    active_request_handle,
+                                    &request.method,
+                                    &request.params,
+                                )
+                                .await;
+
+                            let response = match response {
+                                Ok(partial_response) => {
+                                    // TODO: trace here was really slow with millions of requests.
+                                    // info!("forwarding request from {}", upstream_server);
+
+                                    let response = JsonRpcForwardedResponse {
+                                        jsonrpc: "2.0".to_string(),
+                                        id: request.id,
+                                        // TODO: since we only use the result here, should that be all we return from try_send_request?
+                                        result: Some(partial_response),
+                                        error: None,
+                                    };
+
+                                    // TODO: small race condition here. parallel requests with the same query will both be saved to the cache
+                                    let mut response_cache = self.response_cache.write().await;
+
+                                    // TODO: cache the warp::reply to save us serializing every time
+                                    response_cache.insert(cache_key, response.clone());
+                                    if response_cache.len() >= RESPONSE_CACHE_CAP {
+                                        response_cache.pop_front();
+                                    }
+
+                                    response
+                                }
+                                Err(e) => {
+                                    let code;
+                                    let message: String;
+                                    let data;
+
+                                    match e {
+                                        ProviderError::JsonRpcClientError(e) => {
+                                            // TODO: we should check what type the provider is rather than trying to downcast both types of errors
+                                            if let Some(e) = e.downcast_ref::<HttpClientError>() {
+                                                match &*e {
+                                                    HttpClientError::JsonRpcError(e) => {
+                                                        code = e.code;
+                                                        message = e.message.clone();
+                                                        data = e.data.clone();
+                                                    }
+                                                    e => {
+                                                        // TODO: improve this
+                                                        code = -32603;
+                                                        message = format!("{}", e);
+                                                        data = None;
+                                                    }
+                                                }
+                                            } else if let Some(e) =
+                                                e.downcast_ref::<WsClientError>()
+                                            {
+                                                match &*e {
+                                                    WsClientError::JsonRpcError(e) => {
+                                                        code = e.code;
+                                                        message = e.message.clone();
+                                                        data = e.data.clone();
+                                                    }
+                                                    e => {
+                                                        // TODO: improve this
+                                                        code = -32603;
+                                                        message = format!("{}", e);
+                                                        data = None;
+                                                    }
+                                                }
+                                            } else {
+                                                unimplemented!();
+                                            }
+                                        }
+                                        _ => {
+                                            code = -32603;
+                                            message = format!("{}", e);
+                                            data = None;
+                                        }
+                                    }
+
+                                    JsonRpcForwardedResponse {
+                                        jsonrpc: "2.0".to_string(),
+                                        id: request.id,
+                                        result: None,
+                                        error: Some(JsonRpcErrorData {
+                                            code,
+                                            message,
+                                            data,
+                                        }),
+                                    }
+                                }
+                            };
+
+                            if response.error.is_some() {
+                                trace!("Sending error reply: {:?}", response);
+                            } else {
+                                trace!("Sending reply: {:?}", response);
+                            }
+
+                            return Ok(response);
+                        }
+                        Err(None) => {
+                            // TODO: this is too verbose. if there are other servers in other tiers, we use those!
+                            // warn!("No servers in sync!");
+                        }
+                        Err(Some(not_until)) => {
+                            // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
+                            // TODO: helper function for this
+                            if earliest_not_until.is_none() {
+                                earliest_not_until.replace(not_until);
+                            } else {
+                                let earliest_possible =
+                                    earliest_not_until.as_ref().unwrap().earliest_possible();
+
+                                let new_earliest_possible = not_until.earliest_possible();
+
+                                if earliest_possible > new_earliest_possible {
+                                    earliest_not_until = Some(not_until);
+                                }
+                            }
+                        }
+                    }
+                }
+
+                // we haven't returned an Ok
+                // if we did return a rate limit error, sleep and try again
+                if let Some(earliest_not_until) = earliest_not_until {
+                    let deadline = earliest_not_until.wait_time_from(self.clock.now());
+
+                    // TODO: max wait
+
+                    sleep(deadline).await;
+                } else {
+                    // TODO: how long should we wait?
+                    // TODO: max wait time?
+                    warn!("No servers in sync!");
+                    // TODO: return json error? return a 502?
+                    return Err(anyhow::anyhow!("no servers in sync"));
+                };
+            }
+        }
+    }
+}
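The "sleep until our rate limits should be available" block above appears twice and carries a "move this to a helper function" TODO. A minimal sketch of what that helper could look like (hypothetical name `sleep_until_ready`; assumes the same governor QuantaClock/QuantaInstant pairing and the `wait_time_from` call already used in this file):

    use governor::clock::{Clock, QuantaClock, QuantaInstant};
    use governor::NotUntil;
    use tokio::time::sleep;

    // Hypothetical helper for the repeated "wait out the rate limit" blocks.
    async fn sleep_until_ready(clock: &QuantaClock, not_until: NotUntil<QuantaInstant>) {
        // wait_time_from converts the limiter's deadline into a tokio-friendly Duration
        let deadline = not_until.wait_time_from(clock.now());

        // TODO (mirroring the patch): cap this with a maximum wait
        sleep(deadline).await;
    }
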
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index 2ccb39dc..28a55bfa 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -1,4 +1,4 @@
-///! Communicate with a web3 provider
+///! Rate-limited communication with a web3 provider
 use derive_more::From;
 use ethers::prelude::Middleware;
 use futures::StreamExt;
@@ -7,7 +7,6 @@ use governor::middleware::NoOpMiddleware;
 use governor::state::{InMemoryState, NotKeyed};
 use governor::NotUntil;
 use governor::RateLimiter;
-use serde::{Deserialize, Serialize};
 use serde_json::value::RawValue;
 use std::fmt;
 use std::num::NonZeroU32;
@@ -274,7 +273,7 @@ impl Web3Connection {
     }
 }

-/// Drop this once a connection to completes
+/// Drop this once a connection completes
 pub struct ActiveRequestHandle(Arc<Web3Connection>);

 impl ActiveRequestHandle {
@@ -351,50 +350,3 @@ impl PartialEq for Web3Connection {
             == other.active_requests.load(atomic::Ordering::Acquire)
     }
 }
-
-#[derive(Clone, Deserialize)]
-pub struct JsonRpcRequest {
-    pub id: Box<RawValue>,
-    pub method: String,
-    pub params: Box<RawValue>,
-}
-
-impl fmt::Debug for JsonRpcRequest {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // TODO: the default formatter takes forever to write. this is too quiet though
-        f.debug_struct("JsonRpcRequest")
-            .field("id", &self.id)
-            .finish_non_exhaustive()
-    }
-}
-
-#[derive(Serialize, Clone)]
-pub struct JsonRpcErrorData {
-    /// The error code
-    pub code: i64,
-    /// The error message
-    pub message: String,
-    /// Additional data
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub data: Option<serde_json::Value>,
-}
-
-#[derive(Clone, Serialize)]
-pub struct JsonRpcForwardedResponse {
-    pub jsonrpc: String,
-    pub id: Box<RawValue>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub result: Option<Box<RawValue>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub error: Option<JsonRpcErrorData>,
-    // TODO: optional error
-}
-
-impl fmt::Debug for JsonRpcForwardedResponse {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // TODO: the default formatter takes forever to write. this is too quiet though
-        f.debug_struct("JsonRpcForwardedResponse")
-            .field("id", &self.id)
-            .finish_non_exhaustive()
-    }
-}
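The doc fix above ("Drop this once a connection completes") points at ActiveRequestHandle being an RAII guard: holding one presumably counts as an in-flight request, and dropping it releases the slot. A minimal illustration of that pattern (hypothetical, reduced to a bare atomic counter rather than the real Web3Connection):

    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    // Simplified stand-in for per-connection active-request accounting.
    struct CountedHandle(Arc<AtomicU32>);

    impl CountedHandle {
        fn new(counter: Arc<AtomicU32>) -> Self {
            counter.fetch_add(1, Ordering::AcqRel);
            CountedHandle(counter)
        }
    }

    impl Drop for CountedHandle {
        // dropping the guard releases the slot, so `?` and early returns stay correct
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::AcqRel);
        }
    }
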
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index d66d5131..f4e67454 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -1,4 +1,4 @@
-///! Communicate with a group of web3 providers
+///! Load balanced communication with a group of web3 providers
 use derive_more::From;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
@@ -109,27 +109,28 @@ impl Web3Connections {

     pub async fn try_send_request(
         &self,
-        connection_handle: ActiveRequestHandle,
+        active_request_handle: ActiveRequestHandle,
         method: &str,
         params: &RawValue,
     ) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
-        let response = connection_handle.request(method, params).await;
+        let response = active_request_handle.request(method, params).await;

         // TODO: if "no block with that header" or some other jsonrpc errors, skip this response?
         response
     }

-    pub async fn try_send_requests(
+    /// Send the same request to all the handles, returning the fastest successful result.
+    pub async fn try_send_parallel_requests(
         self: Arc<Web3Connections>,
-        connections: Vec<ActiveRequestHandle>,
+        active_request_handles: Vec<ActiveRequestHandle>,
         method: String,
         params: Box<RawValue>,
         response_sender: flume::Sender<anyhow::Result<Box<RawValue>>>,
     ) -> anyhow::Result<()> {
         let mut unordered_futures = FuturesUnordered::new();

-        for connection in connections {
+        for connection in active_request_handles {
             // clone things so we can pass them to a future
             let connections = self.clone();
             let method = method.clone();
@@ -150,7 +151,7 @@ impl Web3Connections {
             unordered_futures.push(handle);
         }

-        // TODO: use iterators instead of pushing into a vec
+        // TODO: use iterators instead of pushing into a vec?
         let mut errs = vec![];
         if let Some(x) = unordered_futures.next().await {
             match x.unwrap() {
diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs
new file mode 100644
index 00000000..e705f252
--- /dev/null
+++ b/web3-proxy/src/jsonrpc.rs
@@ -0,0 +1,211 @@
+use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
+use serde::Serialize;
+use serde_json::value::RawValue;
+use std::fmt;
+
+#[derive(Clone, serde::Deserialize)]
+pub struct JsonRpcRequest {
+    // TODO: skip jsonrpc entirely?
+    // pub jsonrpc: Box<RawValue>,
+    pub id: Box<RawValue>,
+    pub method: String,
+    pub params: Box<RawValue>,
+}
+
+impl fmt::Debug for JsonRpcRequest {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: the default formatter takes forever to write. this is too quiet though
+        f.debug_struct("JsonRpcRequest")
+            .field("id", &self.id)
+            .field("method", &self.method)
+            .field("params", &self.params)
+            .finish()
+    }
+}
+
+/// Requests can come in multiple formats
+#[derive(Debug)]
+pub enum JsonRpcRequestEnum {
+    Batch(Vec<JsonRpcRequest>),
+    Single(JsonRpcRequest),
+}
+
+impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        #[derive(serde::Deserialize)]
+        #[serde(field_identifier, rename_all = "lowercase")]
+        enum Field {
+            JsonRpc,
+            Id,
+            Method,
+            Params,
+        }
+
+        struct JsonRpcBatchVisitor;
+
+        impl<'de> Visitor<'de> for JsonRpcBatchVisitor {
+            type Value = JsonRpcRequestEnum;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                formatter.write_str("JsonRpcRequestEnum")
+            }
+
+            fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error>
+            where
+                V: SeqAccess<'de>,
+            {
+                // TODO: what size should we use as the default?
+                let mut batch: Vec<JsonRpcRequest> =
+                    Vec::with_capacity(seq.size_hint().unwrap_or(10));
+
+                // this was easier than expected
+                while let Ok(Some(s)) = seq.next_element::<JsonRpcRequest>() {
+                    batch.push(s);
+                }
+
+                Ok(JsonRpcRequestEnum::Batch(batch))
+            }
+
+            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+            where
+                A: MapAccess<'de>,
+            {
+                // TODO: i feel like this should be easier
+                let mut id = None;
+                let mut method = None;
+                let mut params = None;
+
+                while let Some(key) = map.next_key()? {
+                    match key {
+                        Field::JsonRpc => {
+                            // throw away the value
+                            let _: String = map.next_value()?;
+                        }
+                        Field::Id => {
+                            if id.is_some() {
+                                return Err(de::Error::duplicate_field("id"));
+                            }
+                            id = Some(map.next_value()?);
+                        }
+                        Field::Method => {
+                            if method.is_some() {
+                                return Err(de::Error::duplicate_field("method"));
+                            }
+                            method = Some(map.next_value()?);
+                        }
+                        Field::Params => {
+                            if params.is_some() {
+                                return Err(de::Error::duplicate_field("params"));
+                            }
+                            params = Some(map.next_value()?);
+                        }
+                    }
+                }
+                let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
+                let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
+                let params = params.ok_or_else(|| de::Error::missing_field("params"))?;
+
+                let single = JsonRpcRequest { id, method, params };
+
+                Ok(JsonRpcRequestEnum::Single(single))
+            }
+        }
+
+        let batch_visitor = JsonRpcBatchVisitor {};
+
+        deserializer.deserialize_any(batch_visitor)
+    }
+}
+
+/// All jsonrpc errors use this structure
+#[derive(Serialize, Clone)]
+pub struct JsonRpcErrorData {
+    /// The error code
+    pub code: i64,
+    /// The error message
+    pub message: String,
+    /// Additional data
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub data: Option<serde_json::Value>,
+}
+
+/// A complete response
+#[derive(Clone, Serialize)]
+pub struct JsonRpcForwardedResponse {
+    pub jsonrpc: String,
+    pub id: Box<RawValue>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub result: Option<Box<RawValue>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<JsonRpcErrorData>,
+}
+
+// TODO: the default formatter takes forever to write. this is too quiet though
+impl fmt::Debug for JsonRpcForwardedResponse {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("JsonRpcForwardedResponse")
+            .field("id", &self.id)
+            .finish_non_exhaustive()
+    }
+}
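+
+// The response enum below is #[serde(untagged)], so a Single serializes as a
+// bare JSON object and a Batch as a bare JSON array, matching the JSON-RPC
+// wire format. A sanity check could look like this (hypothetical test):
+//
+//     let batch = JsonRpcForwardedResponseEnum::Batch(vec![]);
+//     assert_eq!(serde_json::to_string(&batch).unwrap(), "[]");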
+
+/// JSONRPC Responses can include one or many response objects.
+#[derive(Clone, Serialize)]
+#[serde(untagged)]
+pub enum JsonRpcForwardedResponseEnum {
+    Single(JsonRpcForwardedResponse),
+    Batch(Vec<JsonRpcForwardedResponse>),
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn this_deserialize_single() {
+        let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#;
+
+        // test deserializing it directly to a single request object
+        let output: JsonRpcRequest = serde_json::from_str(input).unwrap();
+
+        assert_eq!(output.id.to_string(), "1");
+        assert_eq!(output.method, "eth_blockNumber");
+        assert_eq!(output.params.to_string(), "[]");
+
+        // test deserializing it into an enum
+        let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
+
+        assert!(matches!(output, JsonRpcRequestEnum::Single(_)));
+    }
+
+    #[test]
+    fn this_deserialize_batch() {
+        let input = r#"[{"jsonrpc":"2.0","method":"eth_getCode","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":27},{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":28},{"jsonrpc":"2.0","method":"eth_getBalance","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":29}]"#;

+        // test deserializing it directly to a batch of request objects
+        let output: Vec<JsonRpcRequest> = serde_json::from_str(input).unwrap();
+
+        assert_eq!(output.len(), 3);
+
+        assert_eq!(output[0].id.to_string(), "27");
+        assert_eq!(output[0].method, "eth_getCode");
+        assert_eq!(
+            output[0].params.to_string(),
+            r#"["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"]"#
+        );
+
+        assert_eq!(output[1].id.to_string(), "28");
+        assert_eq!(output[2].id.to_string(), "29");
+
+        // test deserializing it into an enum
+        let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
+
+        assert!(matches!(output, JsonRpcRequestEnum::Batch(_)));
+    }
+}
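serde can usually derive this single-or-batch split with #[serde(untagged)], but untagged enums buffer the input into an intermediate form, and that buffering is known not to round-trip Box<RawValue> fields; that is presumably why jsonrpc.rs hand-writes the visitor. The derive works fine when every field is an owned value (hypothetical SimpleRequest for illustration):

    use serde::Deserialize;

    // hypothetical owned-field request; with Box<RawValue> fields the untagged
    // derive fails at runtime, which the hand-rolled visitor avoids
    #[derive(Deserialize)]
    struct SimpleRequest {
        id: u64,
        method: String,
        params: serde_json::Value,
    }

    #[derive(Deserialize)]
    #[serde(untagged)]
    enum SimpleRequestEnum {
        Batch(Vec<SimpleRequest>),
        Single(SimpleRequest),
    }
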
diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs
index 00128554..534e12c5 100644
--- a/web3-proxy/src/main.rs
+++ b/web3-proxy/src/main.rs
@@ -1,365 +1,27 @@
+mod app;
 mod config;
 mod connection;
 mod connections;
+mod jsonrpc;

-use config::Web3ConnectionConfig;
-use connection::JsonRpcErrorData;
-use connection::JsonRpcForwardedResponse;
-use ethers::prelude::ProviderError;
-use ethers::prelude::{HttpClientError, WsClientError};
-// use ethers::providers::transports::common::JsonRpcError;
-
-use futures::future;
-use governor::clock::{Clock, QuantaClock};
-use linkedhashmap::LinkedHashMap;
-use std::fmt;
 use std::fs;
-use std::sync::atomic::{self, AtomicU64};
 use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::RwLock;
-use tokio::time::sleep;
-use tracing::{info, warn};
+use tracing::{info, trace, warn};
 use warp::Filter;
 use warp::Reply;

+use crate::app::Web3ProxyApp;
 use crate::config::{CliConfig, RpcConfig};
-use crate::connection::JsonRpcRequest;
-use crate::connections::Web3Connections;
-
-static APP_USER_AGENT: &str = concat!(
-    "satoshiandkin/",
-    env!("CARGO_PKG_NAME"),
-    "/",
-    env!("CARGO_PKG_VERSION"),
-);
-
-// TODO: put this in config? what size should we do?
-const RESPONSE_CACHE_CAP: usize = 1024;
-
-/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
-type ResponseLruCache = RwLock<LinkedHashMap<(u64, String, String), JsonRpcForwardedResponse>>;
-
-/// The application
-// TODO: this debug impl is way too verbose. make something smaller
-// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
-pub struct Web3ProxyApp {
-    best_head_block_number: Arc<AtomicU64>,
-    /// clock used for rate limiting
-    /// TODO: use tokio's clock (will require a different ratelimiting crate)
-    clock: QuantaClock,
-    /// Send requests to the best server available
-    balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
-    /// Send private requests (like eth_sendRawTransaction) to all these servers
-    private_rpcs: Option<Arc<Web3Connections>>,
-    response_cache: ResponseLruCache,
-}
-
-impl fmt::Debug for Web3ProxyApp {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // TODO: the default formatter takes forever to write. this is too quiet though
-        f.debug_struct("Web3ProxyApp")
-            .field(
-                "best_head_block_number",
-                &self.best_head_block_number.load(atomic::Ordering::Relaxed),
-            )
-            .finish_non_exhaustive()
-    }
-}
-
-impl Web3ProxyApp {
-    async fn try_new(
-        balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
-        private_rpcs: Vec<Web3ConnectionConfig>,
-    ) -> anyhow::Result<Web3ProxyApp> {
-        let clock = QuantaClock::default();
-
-        let best_head_block_number = Arc::new(AtomicU64::new(0));
-
-        // make a http shared client
-        // TODO: how should we configure the connection pool?
-        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
-        let http_client = reqwest::ClientBuilder::new()
-            .connect_timeout(Duration::from_secs(5))
-            .timeout(Duration::from_secs(300))
-            .user_agent(APP_USER_AGENT)
-            .build()?;
-
-        // TODO: attach context to this error
-        let balanced_rpc_tiers =
-            future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
-                Web3Connections::try_new(
-                    best_head_block_number.clone(),
-                    balanced_rpc_tier,
-                    Some(http_client.clone()),
-                    &clock,
-                )
-            }))
-            .await
-            .into_iter()
-            .collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
-
-        // TODO: attach context to this error
-        let private_rpcs = if private_rpcs.is_empty() {
-            warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
-            // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
-            None
-        } else {
-            Some(
-                Web3Connections::try_new(
-                    best_head_block_number.clone(),
-                    private_rpcs,
-                    Some(http_client),
-                    &clock,
-                )
-                .await?,
-            )
-        };
-
-        Ok(Web3ProxyApp {
-            best_head_block_number,
-            clock,
-            balanced_rpc_tiers,
-            private_rpcs,
-            response_cache: Default::default(),
-        })
-    }
-
-    /// send the request to the approriate RPCs
-    /// TODO: dry this up
-    async fn proxy_web3_rpc(
-        self: Arc<Web3ProxyApp>,
-        json_body: JsonRpcRequest,
-    ) -> anyhow::Result<impl warp::Reply> {
-        if self.private_rpcs.is_some() && json_body.method == "eth_sendRawTransaction" {
-            let private_rpcs = self.private_rpcs.as_ref().unwrap();
-
-            // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
-            loop {
-                // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                match private_rpcs.get_upstream_servers() {
-                    Ok(upstream_servers) => {
-                        let (tx, rx) = flume::unbounded();
-
-                        let connections = private_rpcs.clone();
-                        let method = json_body.method.clone();
-                        let params = json_body.params.clone();
-
-                        // TODO: benchmark this compared to waiting on unbounded futures
-                        tokio::spawn(async move {
-                            connections
-                                .try_send_requests(upstream_servers, method, params, tx)
-                                .await
-                        });
-
-                        // wait for the first response
-                        let backend_response = rx.recv_async().await?;
-
-                        if let Ok(backend_response) = backend_response {
-                            // TODO: i think we
-                            let response = JsonRpcForwardedResponse {
-                                jsonrpc: "2.0".to_string(),
-                                id: json_body.id,
-                                result: Some(backend_response),
-                                error: None,
-                            };
-                            return Ok(warp::reply::json(&response));
-                        }
-                    }
-                    Err(not_until) => {
-                        // TODO: move this to a helper function
-                        // sleep (with a lock) until our rate limits should be available
-                        if let Some(not_until) = not_until {
-                            let deadline = not_until.wait_time_from(self.clock.now());
-
-                            sleep(deadline).await;
-                        }
-                    }
-                };
-            }
-        } else {
-            // this is not a private transaction (or no private relays are configured)
-            // try to send to each tier, stopping at the first success
-            // if no tiers are synced, fallback to privates
-            loop {
-                // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
-                let mut earliest_not_until = None;
-
-                // TODO: how can we better build this iterator?
-                let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
-                    self.balanced_rpc_tiers.iter().chain(vec![private_rpcs])
-                } else {
-                    self.balanced_rpc_tiers.iter().chain(vec![])
-                };
-
-                for balanced_rpcs in rpc_iter {
-                    let best_head_block_number =
-                        self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store current block for everything anymore. we store it on the connections
-
-                    let best_rpc_block_number = balanced_rpcs.head_block_number();
-
-                    if best_rpc_block_number < best_head_block_number {
-                        continue;
-                    }
-
-                    // TODO: building this cache key is slow and its large, but i don't see a better way right now
-                    // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
-                    let cache_key = (
-                        best_head_block_number,
-                        json_body.method.clone(),
-                        json_body.params.to_string(),
-                    );
-
-                    if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
-                        // TODO: this still serializes every time
-                        return Ok(warp::reply::json(cached));
-                    }
-
-                    // TODO: what allowed lag?
-                    match balanced_rpcs.next_upstream_server().await {
-                        Ok(upstream_server) => {
-                            let response = balanced_rpcs
-                                .try_send_request(
-                                    upstream_server,
-                                    &json_body.method,
-                                    &json_body.params,
-                                )
-                                .await;
-
-                            let response = match response {
-                                Ok(partial_response) => {
-                                    // TODO: trace here was really slow with millions of requests.
-                                    // info!("forwarding request from {}", upstream_server);
-
-                                    let response = JsonRpcForwardedResponse {
-                                        jsonrpc: "2.0".to_string(),
-                                        id: json_body.id,
-                                        // TODO: since we only use the result here, should that be all we return from try_send_request?
-                                        result: Some(partial_response),
-                                        error: None,
-                                    };
-
-                                    // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
-                                    let mut response_cache = self.response_cache.write().await;
-
-                                    // TODO: cache the warp::reply to save us serializing every time
-                                    response_cache.insert(cache_key, response.clone());
-                                    if response_cache.len() >= RESPONSE_CACHE_CAP {
-                                        response_cache.pop_front();
-                                    }
-
-                                    response
-                                }
-                                Err(e) => {
-                                    let code;
-                                    let message: String;
-                                    let data;
-
-                                    match e {
-                                        ProviderError::JsonRpcClientError(e) => {
-                                            // TODO: we should check what type the provider is rather than trying to downcast both types of errors
-                                            if let Some(e) = e.downcast_ref::<HttpClientError>() {
-                                                match &*e {
-                                                    HttpClientError::JsonRpcError(e) => {
-                                                        code = e.code;
-                                                        message = e.message.clone();
-                                                        data = e.data.clone();
-                                                    }
-                                                    e => {
-                                                        // TODO: improve this
-                                                        code = -32603;
-                                                        message = format!("{}", e);
-                                                        data = None;
-                                                    }
-                                                }
-                                            } else if let Some(e) =
-                                                e.downcast_ref::<WsClientError>()
-                                            {
-                                                match &*e {
-                                                    WsClientError::JsonRpcError(e) => {
-                                                        code = e.code;
-                                                        message = e.message.clone();
-                                                        data = e.data.clone();
-                                                    }
-                                                    e => {
-                                                        // TODO: improve this
-                                                        code = -32603;
-                                                        message = format!("{}", e);
-                                                        data = None;
-                                                    }
-                                                }
-                                            } else {
-                                                unimplemented!();
-                                            }
-                                        }
-                                        _ => {
-                                            code = -32603;
-                                            message = format!("{}", e);
-                                            data = None;
-                                        }
-                                    }
-
-                                    JsonRpcForwardedResponse {
-                                        jsonrpc: "2.0".to_string(),
-                                        id: json_body.id,
-                                        result: None,
-                                        error: Some(JsonRpcErrorData {
-                                            code,
-                                            message,
-                                            data,
-                                        }),
-                                    }
-                                }
-                            };
-
-                            return Ok(warp::reply::json(&response));
-                        }
-                        Err(None) => {
-                            // TODO: this is too verbose. if there are other servers in other tiers, we use those!
-                            // warn!("No servers in sync!");
-                        }
-                        Err(Some(not_until)) => {
-                            // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
-                            // TODO: helper function for this
-                            if earliest_not_until.is_none() {
-                                earliest_not_until.replace(not_until);
-                            } else {
-                                let earliest_possible =
-                                    earliest_not_until.as_ref().unwrap().earliest_possible();
-
-                                let new_earliest_possible = not_until.earliest_possible();
-
-                                if earliest_possible > new_earliest_possible {
-                                    earliest_not_until = Some(not_until);
-                                }
-                            }
-                        }
-                    }
-                }
-
-                // we haven't returned an Ok
-                // if we did return a rate limit error, sleep and try again
-                if let Some(earliest_not_until) = earliest_not_until {
-                    let deadline = earliest_not_until.wait_time_from(self.clock.now());
-
-                    sleep(deadline).await;
-                } else {
-                    // TODO: how long should we wait?
-                    // TODO: max wait time?
-                    warn!("No servers in sync!");
-                    // TODO: return json error? return a 502?
-                    return Err(anyhow::anyhow!("no servers in sync"));
-                };
-            }
-        }
-    }
-}

 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     // install global collector configured based on RUST_LOG env var.
     tracing_subscriber::fmt::init();

+    info!("test info");
+    warn!("test warn");
+    trace!("test trace");
+
     let cli_config: CliConfig = argh::from_env();

     info!("Loading rpc config @ {}", cli_config.rpc_config_path);
@@ -391,14 +53,14 @@ async fn main() -> anyhow::Result<()> {
     Ok(())
 }

-/// convert result into an http response. use this at the end of your warp filter
-/// TODO: using boxes can't be the best way. think about this more
+/// convert a result into a jsonrpc error. use this at the end of your warp filter
 fn handle_anyhow_errors<T: warp::Reply>(
     res: anyhow::Result<T>,
 ) -> warp::http::Response<warp::hyper::Body> {
     match res {
         Ok(r) => r.into_response(),
         Err(e) => warp::reply::with_status(
+            // TODO: json error
             format!("{}", e),
             warp::http::StatusCode::INTERNAL_SERVER_ERROR,
         )
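
The "TODO: json error" above wants the anyhow fallback to produce a JSON-RPC error body instead of plain text. A minimal sketch using the JsonRpcErrorData and JsonRpcForwardedResponse types this patch adds (hypothetical helper; the failed request's id is no longer available here, so it falls back to the JSON null id):

    use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse};
    use serde_json::value::RawValue;

    // hypothetical replacement for the Err arm of handle_anyhow_errors
    fn anyhow_to_jsonrpc_reply(e: anyhow::Error) -> warp::reply::WithStatus<warp::reply::Json> {
        let response = JsonRpcForwardedResponse {
            jsonrpc: "2.0".to_string(),
            // the original request id is gone by this point, so send a JSON null id
            id: RawValue::from_string("null".to_string()).expect("null is valid JSON"),
            result: None,
            error: Some(JsonRpcErrorData {
                code: -32603, // internal error, matching the codes used in app.rs
                message: format!("{}", e),
                data: None,
            }),
        };

        warp::reply::with_status(
            warp::reply::json(&response),
            warp::http::StatusCode::INTERNAL_SERVER_ERROR,
        )
    }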