diff --git a/Cargo.toml b/Cargo.toml
index 71b1e1ce..addd8055 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,7 @@ members = [
     "web3-proxy",
 ]
 
-[profile.release]
-lto = true
-panic = "abort"
+# TODO: enable these once rapid development is done
+#[profile.release]
+#lto = true
+#panic = "abort"
diff --git a/config/example.toml b/config/example.toml
index 0792c927..7168593e 100644
--- a/config/example.toml
+++ b/config/example.toml
@@ -1,20 +1,18 @@
 [shared]
 chain_id = 1
 
-[balanced_rpc_tiers]
+[balanced_rpcs]
 
-[balanced_rpc_tiers.0]
-
-    [balanced_rpc_tiers.0.erigon_archive]
+    [balanced_rpcs.erigon_archive]
     url = "ws://127.0.0.1:8549"
     # TODO: double check soft_limit on erigon
     soft_limit = 100_000
 
-    [balanced_rpc_tiers.0.geth]
+    [balanced_rpcs.geth]
     url = "ws://127.0.0.1:8546"
     soft_limit = 200_000
 
-    [balanced_rpc_tiers.0.ankr]
+    [balanced_rpcs.ankr]
     url = "https://rpc.ankr.com/eth"
     soft_limit = 3_000
 
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index 700079e2..5cdc58ef 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -7,15 +7,14 @@ use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
 use ethers::prelude::ProviderError;
 use ethers::prelude::{HttpClientError, WsClientError};
-use futures::future;
 use futures::future::join_all;
 use governor::clock::{Clock, QuantaClock};
 use linkedhashmap::LinkedHashMap;
+use parking_lot::RwLock;
 use std::fmt;
-use std::sync::atomic::{self, AtomicU64};
+use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::RwLock;
 use tokio::time::sleep;
 use tracing::{trace, warn};
 
@@ -37,33 +36,27 @@ type ResponseLruCache =
 // TODO: this debug impl is way too verbose. make something smaller
 // TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
 pub struct Web3ProxyApp {
-    best_head_block_number: Arc<AtomicU64>,
     /// clock used for rate limiting
-    /// TODO: use tokio's clock (will require a different ratelimiting crate)
+    /// TODO: use tokio's clock? (will require a different ratelimiting crate)
     clock: QuantaClock,
     /// Send requests to the best server available
-    balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
+    balanced_rpcs: Arc<Web3Connections>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
-    private_rpcs: Option<Arc<Web3Connections>>,
+    private_rpcs: Arc<Web3Connections>,
     response_cache: ResponseLruCache,
 }
 
 impl fmt::Debug for Web3ProxyApp {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // TODO: the default formatter takes forever to write. this is too quiet though
-        f.debug_struct("Web3ProxyApp")
-            .field(
-                "best_head_block_number",
-                &self.best_head_block_number.load(atomic::Ordering::Relaxed),
-            )
-            .finish_non_exhaustive()
+        f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
     }
 }
 
 impl Web3ProxyApp {
     pub async fn try_new(
         chain_id: usize,
-        balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
+        balanced_rpcs: Vec<Web3ConnectionConfig>,
         private_rpcs: Vec<Web3ConnectionConfig>,
     ) -> anyhow::Result<Web3ProxyApp> {
         let clock = QuantaClock::default();
@@ -80,44 +73,35 @@ impl Web3ProxyApp {
             .build()?;
 
         // TODO: attach context to this error
-        let balanced_rpc_tiers =
-            future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
-                Web3Connections::try_new(
-                    chain_id,
-                    best_head_block_number.clone(),
-                    balanced_rpc_tier,
-                    Some(http_client.clone()),
-                    &clock,
-                    true,
-                )
-            }))
-            .await
-            .into_iter()
-            .collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
+        let balanced_rpcs = Web3Connections::try_new(
+            chain_id,
+            best_head_block_number.clone(),
+            balanced_rpcs,
+            Some(http_client.clone()),
+            &clock,
+            true,
+        )
+        .await?;
 
         // TODO: attach context to this error
         let private_rpcs = if private_rpcs.is_empty() {
             warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
-            // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
-            None
+            balanced_rpcs.clone()
         } else {
-            Some(
-                Web3Connections::try_new(
-                    chain_id,
-                    best_head_block_number.clone(),
-                    private_rpcs,
-                    Some(http_client),
-                    &clock,
-                    false,
-                )
-                .await?,
+            Web3Connections::try_new(
+                chain_id,
+                best_head_block_number.clone(),
+                private_rpcs,
+                Some(http_client),
+                &clock,
+                false,
             )
+            .await?
         };
 
         Ok(Web3ProxyApp {
-            best_head_block_number,
             clock,
-            balanced_rpc_tiers,
+            balanced_rpcs,
             private_rpcs,
             response_cache: Default::default(),
         })
@@ -180,17 +164,15 @@ impl Web3ProxyApp {
 
         // TODO: apparently json_body can be a vec of multiple requests. should we split them up? we need to respond with a Vec too
-        if self.private_rpcs.is_some() && request.method == "eth_sendRawTransaction" {
-            let private_rpcs = self.private_rpcs.as_ref().unwrap();
-
+        if request.method == "eth_sendRawTransaction" {
             // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
             loop {
                 // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                match private_rpcs.get_upstream_servers() {
+                match self.private_rpcs.get_upstream_servers() {
                     Ok(active_request_handles) => {
                         let (tx, rx) = flume::unbounded();
 
-                        let connections = private_rpcs.clone();
+                        let connections = self.private_rpcs.clone();
                         let method = request.method.clone();
                         let params = request.params.clone();
 
@@ -221,18 +203,17 @@ impl Web3ProxyApp {
                             return Ok(response);
                         }
                     }
-                    Err(not_until) => {
+                    Err(None) => {
+                        // TODO: return a 502?
+                        return Err(anyhow::anyhow!("no private rpcs!"));
+                    }
+                    Err(Some(not_until)) => {
                         // TODO: move this to a helper function
                         // sleep (TODO: with a lock?) until our rate limits should be available
                         // TODO: if a server catches up sync while we are waiting, we could stop waiting
-                        if let Some(not_until) = not_until {
-                            let deadline = not_until.wait_time_from(self.clock.now());
+                        let deadline = not_until.wait_time_from(self.clock.now());
 
-                            sleep(deadline).await;
-                        } else {
-                            // TODO: what should we do here?
-                            return Err(anyhow::anyhow!("no private rpcs!"));
-                        }
+                        sleep(deadline).await;
                     }
                 };
             }
@@ -241,181 +222,138 @@ impl Web3ProxyApp {
         // try to send to each tier, stopping at the first success
         // if no tiers are synced, fallback to privates
         loop {
-            // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
-            let mut earliest_not_until = None;
+            let best_block_number = self.balanced_rpcs.head_block_number();
 
-            // TODO: how can we better build this iterator?
-            let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
-                self.balanced_rpc_tiers.iter().chain(vec![private_rpcs])
-            } else {
-                self.balanced_rpc_tiers.iter().chain(vec![])
-            };
+            // TODO: building this cache key is slow and its large, but i don't see a better way right now
+            // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
+            let cache_key = (
+                best_block_number,
+                request.method.clone(),
+                request.params.clone().map(|x| x.to_string()),
+            );
 
-            for balanced_rpcs in rpc_iter {
-                let best_head_block_number =
-                    self.best_head_block_number.load(atomic::Ordering::Acquire);
-
-                let best_rpc_block_number = balanced_rpcs.head_block_number();
-
-                if best_rpc_block_number < best_head_block_number {
-                    continue;
-                }
-
-                // TODO: building this cache key is slow and its large, but i don't see a better way right now
-                // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
-                let cache_key = (
-                    best_head_block_number,
-                    request.method.clone(),
-                    request.params.clone().map(|x| x.to_string()),
-                );
-
-                if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
-                    // TODO: this still serializes every time
-                    // TODO: return a reference in the other places so that this works without a clone?
-                    return Ok(cached.to_owned());
-                }
-
-                // TODO: what allowed lag?
-                match balanced_rpcs.next_upstream_server().await {
-                    Ok(active_request_handle) => {
-                        let response = active_request_handle
-                            .request(&request.method, &request.params)
-                            .await;
-
-                        let response = match response {
-                            Ok(partial_response) => {
-                                // TODO: trace here was really slow with millions of requests.
-                                // info!("forwarding request from {}", upstream_server);
-
-                                let response = JsonRpcForwardedResponse {
-                                    jsonrpc: "2.0".to_string(),
-                                    id: request.id,
-                                    // TODO: since we only use the result here, should that be all we return from try_send_request?
-                                    result: Some(partial_response),
-                                    error: None,
-                                };
-
-                                // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
-                                let mut response_cache = self.response_cache.write().await;
-
-                                // TODO: cache the warp::reply to save us serializing every time
-                                response_cache.insert(cache_key, response.clone());
-                                if response_cache.len() >= RESPONSE_CACHE_CAP {
-                                    // TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block
-                                    response_cache.pop_front();
-                                }
-
-                                response
-                            }
-                            Err(e) => {
-                                // TODO: move this to a helper function?
-                                let code;
-                                let message: String;
-                                let data;
-
-                                match e {
-                                    ProviderError::JsonRpcClientError(e) => {
-                                        // TODO: we should check what type the provider is rather than trying to downcast both types of errors
-                                        if let Some(e) = e.downcast_ref::<HttpClientError>() {
-                                            match &*e {
-                                                HttpClientError::JsonRpcError(e) => {
-                                                    code = e.code;
-                                                    message = e.message.clone();
-                                                    data = e.data.clone();
-                                                }
-                                                e => {
-                                                    // TODO: improve this
-                                                    code = -32603;
-                                                    message = format!("{}", e);
-                                                    data = None;
-                                                }
-                                            }
-                                        } else if let Some(e) =
-                                            e.downcast_ref::<WsClientError>()
-                                        {
-                                            match &*e {
-                                                WsClientError::JsonRpcError(e) => {
-                                                    code = e.code;
-                                                    message = e.message.clone();
-                                                    data = e.data.clone();
-                                                }
-                                                e => {
-                                                    // TODO: improve this
-                                                    code = -32603;
-                                                    message = format!("{}", e);
-                                                    data = None;
-                                                }
-                                            }
-                                        } else {
-                                            unimplemented!();
-                                        }
-                                    }
-                                    _ => {
-                                        code = -32603;
-                                        message = format!("{}", e);
-                                        data = None;
-                                    }
-                                }
-
-                                JsonRpcForwardedResponse {
-                                    jsonrpc: "2.0".to_string(),
-                                    id: request.id,
-                                    result: None,
-                                    error: Some(JsonRpcErrorData {
-                                        code,
-                                        message,
-                                        data,
-                                    }),
-                                }
-                            }
-                        };
-
-                        if response.error.is_some() {
-                            trace!("Sending error reply: {:?}", response);
-                        } else {
-                            trace!("Sending reply: {:?}", response);
-                        }
-
-                        return Ok(response);
-                    }
-                    Err(None) => {
-                        // TODO: this is too verbose. if there are other servers in other tiers, we use those!
-                        // warn!("No servers in sync!");
-                    }
-                    Err(Some(not_until)) => {
-                        // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
-                        // TODO: helper function for this
-                        if earliest_not_until.is_none() {
-                            earliest_not_until.replace(not_until);
-                        } else {
-                            let earliest_possible =
-                                earliest_not_until.as_ref().unwrap().earliest_possible();
-
-                            let new_earliest_possible = not_until.earliest_possible();
-
-                            if earliest_possible > new_earliest_possible {
-                                earliest_not_until = Some(not_until);
-                            }
-                        }
-                    }
-                }
+            if let Some(cached) = self.response_cache.read().get(&cache_key) {
+                // TODO: this still serializes every time
+                // TODO: return a reference in the other places so that this works without a clone?
+                return Ok(cached.to_owned());
             }
 
-            // we haven't returned an Ok
-            // if we did return a rate limit error, sleep and try again
-            if let Some(earliest_not_until) = earliest_not_until {
-                let deadline = earliest_not_until.wait_time_from(self.clock.now());
+            match self.balanced_rpcs.next_upstream_server().await {
+                Ok(active_request_handle) => {
+                    let response = active_request_handle
+                        .request(&request.method, &request.params)
+                        .await;
 
-                // TODO: max wait
+                    let response = match response {
+                        Ok(partial_response) => {
+                            // TODO: trace here was really slow with millions of requests.
+                            // info!("forwarding request from {}", upstream_server);
 
-                sleep(deadline).await;
-            } else {
-                // TODO: how long should we wait?
-                // TODO: max wait time?
-                warn!("No servers in sync!");
-                // TODO: return a 502?
-                return Err(anyhow::anyhow!("no servers in sync"));
-            };
+                            let response = JsonRpcForwardedResponse {
+                                jsonrpc: "2.0".to_string(),
+                                id: request.id,
+                                // TODO: since we only use the result here, should that be all we return from try_send_request?
+                                result: Some(partial_response),
+                                error: None,
+                            };
+
+                            // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
+                            let mut response_cache = self.response_cache.write();
+
+                            // TODO: cache the warp::reply to save us serializing every time
+                            response_cache.insert(cache_key, response.clone());
+                            if response_cache.len() >= RESPONSE_CACHE_CAP {
+                                // TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block
+                                response_cache.pop_front();
+                            }
+
+                            drop(response_cache);
+
+                            response
+                        }
+                        Err(e) => {
+                            // TODO: move this to a helper function?
+                            let code;
+                            let message: String;
+                            let data;
+
+                            match e {
+                                ProviderError::JsonRpcClientError(e) => {
+                                    // TODO: we should check what type the provider is rather than trying to downcast both types of errors
+                                    if let Some(e) = e.downcast_ref::<HttpClientError>() {
+                                        match &*e {
+                                            HttpClientError::JsonRpcError(e) => {
+                                                code = e.code;
+                                                message = e.message.clone();
+                                                data = e.data.clone();
+                                            }
+                                            e => {
+                                                // TODO: improve this
+                                                code = -32603;
+                                                message = format!("{}", e);
+                                                data = None;
+                                            }
+                                        }
+                                    } else if let Some(e) = e.downcast_ref::<WsClientError>() {
+                                        match &*e {
+                                            WsClientError::JsonRpcError(e) => {
+                                                code = e.code;
+                                                message = e.message.clone();
+                                                data = e.data.clone();
+                                            }
+                                            e => {
+                                                // TODO: improve this
+                                                code = -32603;
+                                                message = format!("{}", e);
+                                                data = None;
+                                            }
+                                        }
+                                    } else {
+                                        unimplemented!();
+                                    }
+                                }
+                                _ => {
+                                    code = -32603;
+                                    message = format!("{}", e);
+                                    data = None;
+                                }
+                            }
+
+                            JsonRpcForwardedResponse {
+                                jsonrpc: "2.0".to_string(),
+                                id: request.id,
+                                result: None,
+                                error: Some(JsonRpcErrorData {
+                                    code,
+                                    message,
+                                    data,
+                                }),
+                            }
+                        }
+                    };
+
+                    if response.error.is_some() {
+                        trace!("Sending error reply: {:?}", response);
+                    } else {
+                        trace!("Sending reply: {:?}", response);
+                    }
+
+                    return Ok(response);
+                }
+                Err(None) => {
+                    // TODO: this is too verbose. if there are other servers in other tiers, we use those!
+                    warn!("No servers in sync!");
+                    return Err(anyhow::anyhow!("no servers in sync"));
+                }
+                Err(Some(not_until)) => {
+                    // TODO: move this to a helper function
+                    // sleep (TODO: with a lock?) until our rate limits should be available
+                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
+                    let deadline = not_until.wait_time_from(self.clock.now());
+
+                    sleep(deadline).await;
+                }
+            }
         }
     }
 }
diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs
index dae3644b..f92d339a 100644
--- a/web3-proxy/src/config.rs
+++ b/web3-proxy/src/config.rs
@@ -1,7 +1,7 @@
 use argh::FromArgs;
 use governor::clock::QuantaClock;
 use serde::Deserialize;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::connection::Web3Connection;
@@ -22,8 +22,7 @@ pub struct CliConfig {
 #[derive(Deserialize)]
 pub struct RpcConfig {
     pub shared: RpcSharedConfig,
-    // BTreeMap so that iterating keeps the same order. we want tier 0 before tier 1!
-    pub balanced_rpc_tiers: BTreeMap<String, HashMap<String, Web3ConnectionConfig>>,
+    pub balanced_rpcs: HashMap<String, Web3ConnectionConfig>,
     pub private_rpcs: Option<HashMap<String, Web3ConnectionConfig>>,
 }
 
@@ -44,11 +43,7 @@ pub struct Web3ConnectionConfig {
 impl RpcConfig {
     /// Create a Web3ProxyApp from config
     pub async fn try_build(self) -> anyhow::Result<Web3ProxyApp> {
-        let balanced_rpc_tiers = self
-            .balanced_rpc_tiers
-            .into_values()
-            .map(|x| x.into_values().collect())
-            .collect();
+        let balanced_rpcs = self.balanced_rpcs.into_values().collect();
 
         let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {
             private_rpcs.into_values().collect()
@@ -56,7 +51,7 @@ impl RpcConfig {
             vec![]
         };
 
-        Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpc_tiers, private_rpcs).await
+        Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpcs, private_rpcs).await
     }
 }
 
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index fb03b941..38b73a5a 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -45,6 +45,7 @@ pub struct Web3Connection {
     /// used for load balancing to the least loaded server
     soft_limit: u32,
     head_block_number: AtomicU64,
+    /// the same clock that is used by the rate limiter
    clock: QuantaClock,
 }
 
@@ -268,6 +269,7 @@ impl Web3Connection {
     }
 
     pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
+        // TODO: maximum wait time
         loop {
             match self.try_request_handle() {
                 Ok(pending_request_handle) => return pending_request_handle,
@@ -288,7 +290,7 @@
         match ratelimiter.check() {
             Ok(_) => {
                 // rate limit succeeded
-                return Ok(ActiveRequestHandle(self.clone()));
+                return Ok(ActiveRequestHandle::new(self.clone()));
             }
             Err(not_until) => {
                 // rate limit failed
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 7bdbee7d..9601cc70 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -345,7 +345,7 @@ impl Web3Connections {
     }
 
     /// get all rpc servers that are not rate limited
-    /// even fetches if they aren't in sync. This is useful for broadcasting signed transactions
+    /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
     pub fn get_upstream_servers(
         &self,
     ) -> Result<Vec<ActiveRequestHandle>, Option<NotUntil<QuantaInstant>>> {