From 6ab2b3a533929dfdfa6913330cf8afc9c66a19d2 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 16 May 2022 19:15:40 +0000 Subject: [PATCH] in-flight request checks --- Cargo.lock | 1 + Cargo.toml | 5 +- web3-proxy/Cargo.toml | 1 + web3-proxy/src/app.rs | 165 +++++++++++++++++++++++------------ web3-proxy/src/connection.rs | 46 ++++++---- 5 files changed, 142 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f2d1221..d2c6b736 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3965,6 +3965,7 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", + "dashmap", "derive_more", "ethers", "flume", diff --git a/Cargo.toml b/Cargo.toml index 810e197d..2a483cf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ ] # TODO: enable these once rapid development is done -[profile.release] +# TODO: we can't do panic = abort because the websockets disconnect by panicing sometimes +#[profile.release] +#panic = abort #lto = true -panic = "abort" diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 31330e58..69bc3d5c 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" anyhow = "1.0.57" argh = "0.1.7" # axum = "*" # TODO: use this instead of warp? +dashmap = "5.3.3" derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } flume = "0.10.12" diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 2761abcc..34e149b0 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -5,6 +5,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; +use dashmap::DashMap; use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256}; use futures::future::join_all; use governor::clock::{Clock, QuantaClock}; @@ -13,6 +14,7 @@ use parking_lot::RwLock; use std::fmt; use std::sync::Arc; use std::time::Duration; +use tokio::sync::watch; use tokio::time::sleep; use tracing::{trace, warn}; @@ -27,8 +29,9 @@ static APP_USER_AGENT: &str = concat!( const RESPONSE_CACHE_CAP: usize = 1024; /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work -type ResponseLruCache = - RwLock), JsonRpcForwardedResponse>>; +type CacheKey = (H256, String, Option); + +type ResponseLruCache = RwLock>; /// The application // TODO: this debug impl is way too verbose. make something smaller @@ -41,6 +44,7 @@ pub struct Web3ProxyApp { balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, + active_requests: DashMap>, response_cache: ResponseLruCache, } @@ -91,6 +95,7 @@ impl Web3ProxyApp { clock, balanced_rpcs, private_rpcs, + active_requests: Default::default(), response_cache: Default::default(), }) } @@ -154,64 +159,62 @@ impl Web3ProxyApp { if request.method == "eth_sendRawTransaction" { // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs - loop { - // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit - match self.private_rpcs.get_upstream_servers() { - Ok(active_request_handles) => { - let (tx, rx) = flume::unbounded(); + // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit + match self.private_rpcs.get_upstream_servers() { + Ok(active_request_handles) => { + let (tx, rx) = flume::unbounded(); - let connections = self.private_rpcs.clone(); - let method = request.method.clone(); - let params = request.params.clone(); + let connections = self.private_rpcs.clone(); + let method = request.method.clone(); + let params = request.params.clone(); - // TODO: benchmark this compared to waiting on unbounded futures - // TODO: do something with this handle? - tokio::spawn(async move { - connections - .try_send_parallel_requests( - active_request_handles, - method, - params, - tx, - ) - .await - }); + // TODO: benchmark this compared to waiting on unbounded futures + // TODO: do something with this handle? + tokio::spawn(async move { + connections + .try_send_parallel_requests(active_request_handles, method, params, tx) + .await + }); - // wait for the first response - let backend_response = rx.recv_async().await?; + // wait for the first response + // TODO: we don't want the first response. we want the quorum response + let backend_response = rx.recv_async().await?; - if let Ok(backend_response) = backend_response { - // TODO: i think we - let response = JsonRpcForwardedResponse { - jsonrpc: "2.0".to_string(), - id: request.id, - result: Some(backend_response), - error: None, - }; - return Ok(response); - } + if let Ok(backend_response) = backend_response { + // TODO: i think we + let response = JsonRpcForwardedResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: Some(backend_response), + error: None, + }; + return Ok(response); } - Err(None) => { - // TODO: return a 502? - return Err(anyhow::anyhow!("no private rpcs!")); - } - Err(Some(not_until)) => { - // TODO: move this to a helper function - // sleep (TODO: with a lock?) until our rate limits should be available - // TODO: if a server catches up sync while we are waiting, we could stop waiting - let deadline = not_until.wait_time_from(self.clock.now()); + } + Err(None) => { + // TODO: return a 502? + return Err(anyhow::anyhow!("no private rpcs!")); + } + Err(Some(not_until)) => { + // TODO: move this to a helper function + // sleep (TODO: with a lock?) until our rate limits should be available + // TODO: if a server catches up sync while we are waiting, we could stop waiting + let deadline = not_until.wait_time_from(self.clock.now()); - sleep(deadline).await; - } - }; - } + let deadline = deadline.min(Duration::from_millis(200)); + + sleep(deadline).await; + + warn!("All rate limits exceeded. Sleeping"); + } + }; } else { // this is not a private transaction (or no private relays are configured) - // try to send to each tier, stopping at the first success - // if no tiers are synced, fallback to privates - // TODO: think more about this loop. - loop { - // todo: bring back this caching + // TODO: how much should we retry? + for _ in 0..10 { + // TODO: think more about this loop. + + // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches let best_block_hash = self .balanced_rpcs .get_synced_rpcs() @@ -227,12 +230,38 @@ impl Web3ProxyApp { request.params.clone().map(|x| x.to_string()), ); + // first check to see if this is cached if let Some(cached) = self.response_cache.read().get(&cache_key) { - // TODO: this still serializes every time - // TODO: return a reference in the other places so that this works without a clone? + let _ = self.active_requests.remove(&cache_key); + return Ok(cached.to_owned()); } + // check if this request is already in flight + let (in_flight_tx, in_flight_rx) = watch::channel(true); + let mut other_in_flight_rx = None; + match self.active_requests.entry(cache_key.clone()) { + dashmap::mapref::entry::Entry::Occupied(entry) => { + other_in_flight_rx = Some(entry.get().clone()); + } + dashmap::mapref::entry::Entry::Vacant(entry) => { + entry.insert(in_flight_rx); + } + } + + if let Some(mut other_in_flight_rx) = other_in_flight_rx { + // wait for the other request to finish. it can finish successfully or with an error + let _ = other_in_flight_rx.changed().await; + + // now that we've waited, lets check the cache again + if let Some(cached) = self.response_cache.read().get(&cache_key) { + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); + + return Ok(cached.to_owned()); + } + } + match self.balanced_rpcs.next_upstream_server().await { Ok(active_request_handle) => { let response = active_request_handle @@ -256,7 +285,7 @@ impl Web3ProxyApp { let mut response_cache = self.response_cache.write(); // TODO: cache the warp::reply to save us serializing every time - response_cache.insert(cache_key, response.clone()); + response_cache.insert(cache_key.clone(), response.clone()); if response_cache.len() >= RESPONSE_CACHE_CAP { // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block response_cache.pop_front(); @@ -264,9 +293,16 @@ impl Web3ProxyApp { drop(response_cache); + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); + response } Err(e) => { + // send now since we aren't going to cache an error response + let _ = in_flight_tx.send(false); + // TODO: move this to a helper function? let code; let message: String; @@ -333,11 +369,20 @@ impl Web3ProxyApp { trace!("Sending reply: {:?}", response); } + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); + return Ok(response); } Err(None) => { // TODO: this is too verbose. if there are other servers in other tiers, we use those! warn!("No servers in sync!"); + + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); + return Err(anyhow::anyhow!("no servers in sync")); } Err(Some(not_until)) => { @@ -346,10 +391,20 @@ impl Web3ProxyApp { // TODO: if a server catches up sync while we are waiting, we could stop waiting let deadline = not_until.wait_time_from(self.clock.now()); + let deadline = deadline.min(Duration::from_millis(200)); + sleep(deadline).await; + + warn!("All rate limits exceeded. Sleeping"); } } + + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); } } + + Err(anyhow::anyhow!("internal error")) } } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 6ffff20e..90809b81 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -199,6 +199,7 @@ impl Web3Connection { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints // TODO: what should this interval be? probably some fraction of block time. set automatically? // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now + // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though let mut interval = interval(Duration::from_secs(2)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -209,27 +210,32 @@ impl Web3Connection { // TODO: if error or rate limit, increase interval? interval.tick().await; - let active_request_handle = self.wait_for_request_handle().await; + match self.try_request_handle() { + Ok(active_request_handle) => { + // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" + let block: Result, _> = provider + .request("eth_getBlockByNumber", ("latest", false)) + .await; - // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" - let block: Result, _> = provider - .request("eth_getBlockByNumber", ("latest", false)) - .await; + drop(active_request_handle); - drop(active_request_handle); + // don't send repeat blocks + if let Ok(block) = &block { + let new_hash = block.hash.unwrap(); - // don't send repeat blocks - if let Ok(block) = &block { - let new_hash = block.hash.unwrap(); + if new_hash == last_hash { + continue; + } - if new_hash == last_hash { - continue; + last_hash = new_hash; + } + + self.send_block(block, &block_sender).await; + } + Err(e) => { + warn!("Failed getting latest block from {}: {:?}", self, e); } - - last_hash = new_hash; } - - self.send_block(block, &block_sender).await; } } Web3Provider::Ws(provider) => { @@ -248,12 +254,10 @@ impl Web3Connection { // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block // TODO: rate limit! - let block: Result, _> = provider + let block: Result, _> = active_request_handle .request("eth_getBlockByNumber", ("latest", false)) .await; - drop(active_request_handle); - self.send_block(block, &block_sender).await; while let Some(new_block) = stream.next().await { @@ -269,7 +273,8 @@ impl Web3Connection { pub async fn wait_for_request_handle(self: &Arc) -> ActiveRequestHandle { // TODO: maximum wait time - loop { + + for _ in 0..10 { match self.try_request_handle() { Ok(pending_request_handle) => return pending_request_handle, Err(not_until) => { @@ -279,6 +284,9 @@ impl Web3Connection { } } } + + // TODO: what should we do? + panic!("no request handle after 10 tries"); } pub fn try_request_handle(