diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 0e7a4176..1b0eed8a 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; use tokio::task; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; use tracing::{debug, info, instrument, trace, warn}; static APP_USER_AGENT: &str = concat!( @@ -30,9 +30,9 @@ static APP_USER_AGENT: &str = concat!( const RESPONSE_CACHE_CAP: usize = 1024; /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work -type CacheKey = (H256, String, Option); +type CacheKey = (Option, String, Option); -type ResponseLruCache = RwLock>; +type ResponseLrcCache = RwLock>; type ActiveRequestsMap = DashMap>; @@ -45,7 +45,7 @@ pub struct Web3ProxyApp { /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, active_requests: ActiveRequestsMap, - response_cache: ResponseLruCache, + response_cache: ResponseLrcCache, } impl fmt::Debug for Web3ProxyApp { @@ -56,7 +56,6 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { - // #[instrument(name = "try_new_Web3ProxyApp", skip_all)] pub async fn try_new( chain_id: usize, redis_address: Option, @@ -143,13 +142,17 @@ impl Web3ProxyApp { // TODO: i don't always see this in the logs. why? debug!("Received request: {:?}", request); + // even though we have timeouts on the requests to our backend providers, + // we need a timeout for the incoming request so that delays from slow or stalled backends can't hold this connection open forever + let max_time = Duration::from_secs(60); + let response = match request { - JsonRpcRequestEnum::Single(request) => { - JsonRpcForwardedResponseEnum::Single(self.proxy_web3_rpc_request(request).await?) - } - JsonRpcRequestEnum::Batch(requests) => { - JsonRpcForwardedResponseEnum::Batch(self.proxy_web3_rpc_requests(requests).await?)
- } + JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( + timeout(max_time, self.proxy_web3_rpc_request(request)).await??, + ), + JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( + timeout(max_time, self.proxy_web3_rpc_requests(requests)).await??, + ), }; // TODO: i don't always see this in the logs. why? @@ -185,6 +188,43 @@ impl Web3ProxyApp { Ok(collected) } + fn get_cached_response( + &self, + request: &JsonRpcRequest, + ) -> ( + CacheKey, + Result, + ) { + // TODO: inspect the request to pick the right cache + // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py + + // TODO: Some requests should skip caching on the head_block_hash + let head_block_hash = Some(self.balanced_rpcs.get_head_block_hash()); + + // TODO: better key? benchmark this + let key = ( + head_block_hash, + request.method.clone(), + request.params.clone().map(|x| x.to_string()), + ); + + if let Some(response) = self.response_cache.read().get(&key) { + // TODO: emit a stat + trace!("{:?} cache hit!", request); + + // TODO: can we make references work? maybe put them in an Arc? + return (key, Ok(response.to_owned())); + } else { + // TODO: emit a stat + trace!("{:?} cache miss!", request); + } + + // TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?) + let cache = &self.response_cache; + + (key, Err(cache)) + } + // #[instrument(skip_all)] async fn proxy_web3_rpc_request( self: Arc, @@ -192,96 +232,81 @@ impl Web3ProxyApp { ) -> anyhow::Result { trace!("Received request: {:?}", request); - if request.method == "eth_sendRawTransaction" { - // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs - // TODO: think more about this lock. i think it won't actually help the herd. 
it probably makes it worse if we have a tight lag_limit - match self.private_rpcs.get_upstream_servers().await { - Ok(active_request_handles) => { - let (tx, rx) = flume::unbounded(); + // TODO: how much should we retry? probably with a timeout and not with a count like this + // TODO: think more about this loop. + for _i in 0..10usize { + // // TODO: add more to this span + // let span = info_span!("i", ?i); + // let _enter = span.enter(); + if request.method == "eth_sendRawTransaction" { + // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs + // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit + match self.private_rpcs.get_upstream_servers().await { + Ok(active_request_handles) => { + // TODO: refactor this to block until it has responses from all and also to return the most common success response + // TODO: i don't think we need to spawn it if we do that. + let (tx, rx) = flume::bounded(1); - let connections = self.private_rpcs.clone(); - let method = request.method.clone(); - let params = request.params.clone(); + let connections = self.private_rpcs.clone(); + let method = request.method.clone(); + let params = request.params.clone(); - // TODO: benchmark this compared to waiting on unbounded futures - // TODO: do something with this handle? - task::Builder::default() - .name("try_send_parallel_requests") - .spawn(async move { - connections - .try_send_parallel_requests( - active_request_handles, - method, - params, - tx, - ) - .await - }); + // TODO: benchmark this compared to waiting on unbounded futures + // TODO: do something with this handle? 
+ // TODO: + task::Builder::default() + .name("try_send_parallel_requests") + .spawn(async move { + connections + .try_send_parallel_requests( + active_request_handles, + method, + params, + tx, + ) + .await + }); - // wait for the first response - // TODO: we don't want the first response. we want the quorum response - let backend_response = rx.recv_async().await?; + // wait for the first response + // TODO: we don't want the first response. we want the quorum response + let backend_response = rx.recv_async().await?; + + if let Ok(backend_response) = backend_response { + // TODO: i think we + let response = JsonRpcForwardedResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: Some(backend_response), + error: None, + }; + return Ok(response); + } + } + Err(None) => { + // TODO: return a 502? + // TODO: i don't think this will ever happen + return Err(anyhow::anyhow!("no private rpcs!")); + } + Err(Some(retry_after)) => { + // TODO: move this to a helper function + // sleep (TODO: with a lock?) until our rate limits should be available + // TODO: if a server catches up sync while we are waiting, we could stop waiting + sleep(retry_after).await; + + warn!("All rate limits exceeded. Sleeping"); + } + }; + } else { + // this is not a private transaction (or no private relays are configured) + + let (cache_key, response_cache) = match self.get_cached_response(&request) { + (cache_key, Ok(response)) => { + let _ = self.active_requests.remove(&cache_key); - if let Ok(backend_response) = backend_response { - // TODO: i think we - let response = JsonRpcForwardedResponse { - jsonrpc: "2.0".to_string(), - id: request.id, - result: Some(backend_response), - error: None, - }; return Ok(response); } - } - Err(None) => { - // TODO: return a 502? - return Err(anyhow::anyhow!("no private rpcs!")); - } - Err(Some(retry_after)) => { - // TODO: move this to a helper function - // sleep (TODO: with a lock?) 
until our rate limits should be available - // TODO: if a server catches up sync while we are waiting, we could stop waiting - sleep(retry_after).await; - - warn!("All rate limits exceeded. Sleeping"); - } - }; - } else { - // this is not a private transaction (or no private relays are configured) - // TODO: how much should we retry? - for _i in 0..10usize { - // TODO: think more about this loop. - // // TODO: add more to this span. and do it properly - // let span = info_span!("i", ?i); - // let _enter = span.enter(); - - // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches - // TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed - trace!("{:?} waiting for head_block_hash", request); - - let head_block_hash = self.balanced_rpcs.get_head_block_hash(); - - trace!("{:?} head_block_hash {}", request, head_block_hash); - - // TODO: building this cache key is slow and its large, but i don't see a better way right now - // TODO: inspect the params and see if a block is specified. 
if so, use that block number instead of current_block - let cache_key = ( - head_block_hash, - request.method.clone(), - request.params.clone().map(|x| x.to_string()), - ); - - // first check to see if this is cached - if let Some(cached) = self.response_cache.read().get(&cache_key) { - let _ = self.active_requests.remove(&cache_key); - - // TODO: emit a stat - trace!("{:?} cache hit!", request); - - return Ok(cached.to_owned()); - } else { - trace!("{:?} cache miss!", request); - } + (cache_key, Err(response_cache)) => (cache_key, response_cache), + }; // check if this request is already in flight let (in_flight_tx, in_flight_rx) = watch::channel(true); @@ -302,7 +327,7 @@ impl Web3ProxyApp { let _ = other_in_flight_rx.changed().await; // now that we've waited, lets check the cache again - if let Some(cached) = self.response_cache.read().get(&cache_key) { + if let Some(cached) = response_cache.read().get(&cache_key) { let _ = self.active_requests.remove(&cache_key); let _ = in_flight_tx.send(false); @@ -342,7 +367,7 @@ impl Web3ProxyApp { }; // TODO: small race condidition here. 
parallel requests with the same query will both be saved to the cache - let mut response_cache = self.response_cache.write(); + let mut response_cache = response_cache.write(); // TODO: cache the warp::reply to save us serializing every time response_cache.insert(cache_key.clone(), response.clone()); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index bd0cfc95..05ac3915 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::task; use tracing::Instrument; -use tracing::{info, info_span, instrument, trace, warn}; +use tracing::{debug, info, info_span, instrument, trace, warn}; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; @@ -112,7 +112,8 @@ impl Web3Connections { } pub async fn subscribe_heads(self: &Arc) { - let (block_sender, block_receiver) = flume::unbounded(); + // TODO: i don't think this needs to be very big + let (block_sender, block_receiver) = flume::bounded(16); let mut handles = vec![]; @@ -330,6 +331,11 @@ impl Web3Connections { // the synced connections have changed let synced_connections = Arc::new(pending_synced_connections.clone()); + if synced_connections.inner.len() == max_connections { + // TODO: more metrics + debug!("all head: {}", new_block_hash); + } + trace!( "rpcs at {}: {:?}", synced_connections.head_block_hash, diff --git a/web3-proxy/src/frontend.rs b/web3-proxy/src/frontend.rs index 1a58d29b..6f3a3e54 100644 --- a/web3-proxy/src/frontend.rs +++ b/web3-proxy/src/frontend.rs @@ -44,13 +44,13 @@ pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> .map_err(Into::into) } -/// basic handler that responds with a page for configuration your -/// TODO: check auth (from authp?) here? +/// a page for configuring your wallet with all the rpcs +/// TODO: check auth (from authp?) here async fn root() -> impl IntoResponse { "Hello, World!" 
} -// TODO: i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work +/// TODO: check auth (from authp?) here async fn proxy_web3_rpc( payload: Json, app: Extension>, @@ -81,12 +81,13 @@ async fn status(app: Extension>) -> impl IntoResponse { (StatusCode::INTERNAL_SERVER_ERROR, body.to_string()) } +/// TODO: pretty 404 page async fn handler_404() -> impl IntoResponse { (StatusCode::NOT_FOUND, "nothing to see here") } -// handle errors by converting them into something that implements `IntoResponse` -// TODO: use this +/// handle errors by converting them into something that implements `IntoResponse` +/// TODO: use this. i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work async fn _handle_anyhow_error(err: anyhow::Error) -> impl IntoResponse { let err = format!("{:?}", err);