diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index f68676c5..9f3d04df 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -14,7 +14,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::watch;
 use tokio::task;
-use tokio::time::{sleep, timeout};
+use tokio::time::timeout;
 use tracing::{debug, info, instrument, trace, warn};
 
 static APP_USER_AGENT: &str = concat!(
@@ -239,158 +239,89 @@ impl Web3ProxyApp {
         // TODO: how much should we retry? probably with a timeout and not with a count like this
         // TODO: think more about this loop.
-        for _i in 0..10usize {
-            // // TODO: add more to this span
-            // let span = info_span!("i", ?i);
-            // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
-            if request.method == "eth_sendRawTransaction" {
-                // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
-                // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                return self
-                    .private_rpcs
-                    .try_send_all_upstream_servers(request)
-                    .await;
-            } else {
-                // this is not a private transaction (or no private relays are configured)
+        // // TODO: add more to this span
+        // let span = info_span!("i", ?i);
+        // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
+        if request.method == "eth_sendRawTransaction" {
+            // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
+            // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
+            return self
+                .private_rpcs
+                .try_send_all_upstream_servers(request)
+                .await;
+        } else {
+            // this is not a private transaction (or no private relays are configured)
 
-                let (cache_key, response_cache) = match self.get_cached_response(&request) {
-                    (cache_key, Ok(response)) => {
-                        let _ = self.incoming_requests.remove(&cache_key);
+            let (cache_key, response_cache) = match self.get_cached_response(&request) {
+                (cache_key, Ok(response)) => {
+                    let _ = self.incoming_requests.remove(&cache_key);
 
-                        return Ok(response);
-                    }
-                    (cache_key, Err(response_cache)) => (cache_key, response_cache),
-                };
-
-                // check if this request is already in flight
-                // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
-                let (incoming_tx, incoming_rx) = watch::channel(true);
-                let mut other_incoming_rx = None;
-                match self.incoming_requests.entry(cache_key.clone()) {
-                    dashmap::mapref::entry::Entry::Occupied(entry) => {
-                        other_incoming_rx = Some(entry.get().clone());
-                    }
-                    dashmap::mapref::entry::Entry::Vacant(entry) => {
-                        entry.insert(incoming_rx);
-                    }
+                    return Ok(response);
                 }
+                (cache_key, Err(response_cache)) => (cache_key, response_cache),
+            };
 
-                if let Some(mut other_incoming_rx) = other_incoming_rx {
-                    // wait for the other request to finish. it might have finished successfully or with an error
-                    trace!("{:?} waiting on in-flight request", request);
-
-                    let _ = other_incoming_rx.changed().await;
-
-                    // now that we've waited, lets check the cache again
-                    if let Some(cached) = response_cache.read().get(&cache_key) {
-                        let _ = self.incoming_requests.remove(&cache_key);
-                        let _ = incoming_tx.send(false);
-
-                        // TODO: emit a stat
-                        trace!(
-                            "{:?} cache hit after waiting for in-flight request!",
-                            request
-                        );
-
-                        return Ok(cached.to_owned());
-                    } else {
-                        // TODO: emit a stat
-                        trace!(
-                            "{:?} cache miss after waiting for in-flight request!",
-                            request
-                        );
-                    }
+            // check if this request is already in flight
+            // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
+            let (incoming_tx, incoming_rx) = watch::channel(true);
+            let mut other_incoming_rx = None;
+            match self.incoming_requests.entry(cache_key.clone()) {
+                dashmap::mapref::entry::Entry::Occupied(entry) => {
+                    other_incoming_rx = Some(entry.get().clone());
                 }
-
-                // TODO: move this whole match to a function on self.balanced_rpcs. incoming requests checks makes it awkward
-                match self.balanced_rpcs.next_upstream_server().await {
-                    Ok(active_request_handle) => {
-                        let response = active_request_handle
-                            .request(&request.method, &request.params)
-                            .await;
-
-                        let response = match response {
-                            Ok(partial_response) => {
-                                // TODO: trace here was really slow with millions of requests.
-                                // trace!("forwarding request from {}", upstream_server);
-
-                                let response = JsonRpcForwardedResponse {
-                                    jsonrpc: "2.0".to_string(),
-                                    id: request.id,
-                                    // TODO: since we only use the result here, should that be all we return from try_send_request?
-                                    result: Some(partial_response),
-                                    error: None,
-                                };
-
-                                // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
-                                let mut response_cache = response_cache.write();
-
-                                // TODO: cache the warp::reply to save us serializing every time
-                                response_cache.insert(cache_key.clone(), response.clone());
-                                if response_cache.len() >= RESPONSE_CACHE_CAP {
-                                    // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
-                                    response_cache.pop_front();
-                                }
-
-                                drop(response_cache);
-
-                                // TODO: needing to remove manually here makes me think we should do this differently
-                                let _ = self.incoming_requests.remove(&cache_key);
-                                let _ = incoming_tx.send(false);
-
-                                response
-                            }
-                            Err(e) => {
-                                // send now since we aren't going to cache an error response
-                                let _ = incoming_tx.send(false);
-
-                                JsonRpcForwardedResponse::from_ethers_error(e, request.id)
-                            }
-                        };
-
-                        // TODO: needing to remove manually here makes me think we should do this differently
-                        let _ = self.incoming_requests.remove(&cache_key);
-
-                        if response.error.is_some() {
-                            trace!("Sending error reply: {:?}", response);
-
-                            // errors already sent false to the in_flight_tx
-                        } else {
-                            trace!("Sending reply: {:?}", response);
-
-                            let _ = incoming_tx.send(false);
-                        }
-
-                        return Ok(response);
-                    }
-                    Err(None) => {
-                        // TODO: this is too verbose. if there are other servers in other tiers, we use those!
-                        warn!("No servers in sync!");
-
-                        // TODO: needing to remove manually here makes me think we should do this differently
-                        let _ = self.incoming_requests.remove(&cache_key);
-                        let _ = incoming_tx.send(false);
-
-                        return Err(anyhow::anyhow!("no servers in sync"));
-                    }
-                    Err(Some(retry_after)) => {
-                        // TODO: move this to a helper function
-                        // sleep (TODO: with a lock?) until our rate limits should be available
-                        // TODO: if a server catches up sync while we are waiting, we could stop waiting
-                        warn!("All rate limits exceeded. Sleeping");
-
-                        sleep(retry_after).await;
-
-                        // TODO: needing to remove manually here makes me think we should do this differently
-                        let _ = self.incoming_requests.remove(&cache_key);
-                        let _ = incoming_tx.send(false);
-
-                        continue;
-                    }
+                dashmap::mapref::entry::Entry::Vacant(entry) => {
+                    entry.insert(incoming_rx);
                 }
             }
-        }
 
-        Err(anyhow::anyhow!("internal error"))
+            if let Some(mut other_incoming_rx) = other_incoming_rx {
+                // wait for the other request to finish. it might have finished successfully or with an error
+                trace!("{:?} waiting on in-flight request", request);
+
+                let _ = other_incoming_rx.changed().await;
+
+                // now that we've waited, lets check the cache again
+                if let Some(cached) = response_cache.read().get(&cache_key) {
+                    let _ = self.incoming_requests.remove(&cache_key);
+                    let _ = incoming_tx.send(false);
+
+                    // TODO: emit a stat
+                    trace!(
+                        "{:?} cache hit after waiting for in-flight request!",
+                        request
+                    );
+
+                    return Ok(cached.to_owned());
+                } else {
+                    // TODO: emit a stat
+                    trace!(
+                        "{:?} cache miss after waiting for in-flight request!",
+                        request
+                    );
+                }
+            }
+
+            let response = self
+                .balanced_rpcs
+                .try_send_best_upstream_server(request)
+                .await?;
+
+            // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
+            let mut response_cache = response_cache.write();
+
+            // TODO: cache the warp::reply to save us serializing every time
+            response_cache.insert(cache_key.clone(), response.clone());
+            if response_cache.len() >= RESPONSE_CACHE_CAP {
+                // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
+                response_cache.pop_front();
+            }
+
+            drop(response_cache);
+
+            let _ = self.incoming_requests.remove(&cache_key);
+            let _ = incoming_tx.send(false);
+
+            Ok(response)
+        }
     }
 }
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 98c58cdc..104c6a25 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -434,6 +434,49 @@ impl Web3Connections {
         Err(earliest_retry_after)
     }
 
+    /// be sure there is a timeout on this or it might loop forever
+    pub async fn try_send_best_upstream_server(
+        &self,
+        request: JsonRpcRequest,
+    ) -> anyhow::Result<JsonRpcForwardedResponse> {
+        loop {
+            match self.next_upstream_server().await {
+                Ok(active_request_handle) => {
+                    let response_result = active_request_handle
+                        .request(&request.method, &request.params)
+                        .await;
+
+                    let response =
+                        JsonRpcForwardedResponse::from_response_result(response_result, request.id);
+
+                    if response.error.is_some() {
+                        trace!(?response, "Sending error reply",);
+                        // errors already sent false to the in_flight_tx
+                    } else {
+                        trace!(?response, "Sending reply");
+                    }
+
+                    return Ok(response);
+                }
+                Err(None) => {
+                    warn!(?self, "No servers in sync!");
+
+                    return Err(anyhow::anyhow!("no servers in sync"));
+                }
+                Err(Some(retry_after)) => {
+                    // TODO: move this to a helper function
+                    // sleep (TODO: with a lock?) until our rate limits should be available
+                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
+                    warn!(?retry_after, "All rate limits exceeded. Sleeping");
+
+                    sleep(retry_after).await;
+
+                    continue;
+                }
+            }
+        }
+    }
+
     pub async fn try_send_all_upstream_servers(
         &self,
         request: JsonRpcRequest,
@@ -463,6 +506,8 @@ impl Web3Connections {
                 return Ok(response);
             }
             Err(None) => {
+                warn!(?self, "No servers in sync!");
+
                 // TODO: return a 502?
                 // TODO: i don't think this will ever happen
                 return Err(anyhow::anyhow!("no available rpcs!"));
diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs
index 48dfafed..e1c226bc 100644
--- a/web3-proxy/src/jsonrpc.rs
+++ b/web3-proxy/src/jsonrpc.rs
@@ -162,7 +162,27 @@ impl fmt::Debug for JsonRpcForwardedResponse {
 }
 
 impl JsonRpcForwardedResponse {
-    pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> Self {
+    pub fn from_response_result(
+        result: Result<Box<RawValue>, ProviderError>,
+        id: Box<RawValue>,
+    ) -> Self {
+        match result {
+            Ok(response) => Self::from_response(response, id),
+            Err(e) => Self::from_ethers_error(e, id),
+        }
+    }
+
+    pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self {
+        JsonRpcForwardedResponse {
+            jsonrpc: "2.0".to_string(),
+            id,
+            // TODO: since we only use the result here, should that be all we return from try_send_request?
+            result: Some(partial_response),
+            error: None,
+        }
+    }
+
+    pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> Self {
         // TODO: move turning ClientError into json to a helper function?
         let code;
         let message: String;
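
The doc comment on the new try_send_best_upstream_server warns that it can loop, sleeping through rate limits, until an upstream finally answers, so callers are expected to bound it with a deadline. A minimal caller-side sketch of that guard using tokio::time::timeout, assuming the crate's Web3Connections, JsonRpcRequest, and JsonRpcForwardedResponse types; the name proxy_with_deadline and the 60-second duration are illustrative and not part of this diff:

    use std::time::Duration;

    use tokio::time::timeout;

    // hypothetical caller: bound try_send_best_upstream_server with an overall deadline,
    // since the method itself retries (and sleeps) until some server accepts the request
    async fn proxy_with_deadline(
        balanced_rpcs: &Web3Connections,
        request: JsonRpcRequest,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        match timeout(
            // illustrative deadline; not a value taken from this diff
            Duration::from_secs(60),
            balanced_rpcs.try_send_best_upstream_server(request),
        )
        .await
        {
            // the inner future finished (with a response or its own error) before the deadline
            Ok(result) => result,
            // the deadline elapsed while still waiting on rate limits or lagging servers
            Err(_elapsed) => Err(anyhow::anyhow!("timed out waiting for an upstream server")),
        }
    }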
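
The request-coalescing path that app.rs keeps around the new call works the same way as before: the first caller for a cache key parks a watch receiver in incoming_requests, later callers wait on that receiver and re-check the shared cache before doing any upstream work themselves. A stand-alone sketch of the pattern, assuming only dashmap and tokio; coalesced_get, the String value type, and the placeholder work are illustrative and not taken from the codebase:

    use std::collections::HashMap;
    use std::sync::RwLock;

    use dashmap::DashMap;
    use tokio::sync::watch;

    async fn coalesced_get(
        in_flight: &DashMap<String, watch::Receiver<bool>>,
        cache: &RwLock<HashMap<String, String>>,
        key: &str,
    ) -> String {
        // fast path: someone already cached this key
        if let Some(hit) = cache.read().unwrap().get(key) {
            return hit.clone();
        }

        let (tx, rx) = watch::channel(true);

        // adopt another caller's receiver, or register our own and do the work
        let other_rx = match in_flight.entry(key.to_string()) {
            dashmap::mapref::entry::Entry::Occupied(entry) => Some(entry.get().clone()),
            dashmap::mapref::entry::Entry::Vacant(entry) => {
                entry.insert(rx);
                None
            }
        };

        if let Some(mut other_rx) = other_rx {
            // wait for the in-flight request to finish, successfully or not...
            let _ = other_rx.changed().await;

            // ...then check the cache again before doing the work ourselves
            if let Some(hit) = cache.read().unwrap().get(key) {
                return hit.clone();
            }
        }

        // placeholder for the real upstream call
        let value = format!("value for {key}");

        cache.write().unwrap().insert(key.to_string(), value.clone());

        // drop the in-flight marker and wake any waiters
        let _ = in_flight.remove(key);
        let _ = tx.send(false);

        value
    }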