From dc037e663a52136dd2636165046ef6d5465f957d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 20 Sep 2023 10:11:07 -0700 Subject: [PATCH] tokio spawn the cached future (#217) * tokio spawn the cached future * lint --- web3_proxy/src/app/mod.rs | 87 +++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index e7fe3ff6..cfcd8c1b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1254,12 +1254,9 @@ impl Web3ProxyApp { head_block: Option<&Web3ProxyBlock>, request_metadata: &Arc, ) -> Web3ProxyResult>> { - // TODO: don't clone into a new string? - let request_method = method.to_string(); - // TODO: serve net_version without querying the backend // TODO: don't force RawValue - let response_data: JsonRpcResponseEnum> = match request_method.as_ref() { + let response_data: JsonRpcResponseEnum> = match method { // lots of commands are blocked method @ ("db_getHex" | "db_getString" @@ -1733,7 +1730,7 @@ impl Web3ProxyApp { } }; - // TODO: think more about timeouts + // TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata // TODO: different user tiers should have different timeouts // erigon's timeout is 300, so keep this a few seconds shorter let max_wait = Some(Duration::from_secs(295)); @@ -1748,47 +1745,57 @@ impl Web3ProxyApp { // TODO: try to fetch out of s3 - self - .jsonrpc_response_cache - .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async { - // TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata - let response_data = timeout(Duration::from_secs(290), self.balanced_rpcs - .try_proxy_connection::<_, Arc>( - method, - params, - Some(request_metadata), - max_wait, - from_block_num.as_ref(), - to_block_num.as_ref(), - )).await; + // TODO: clone less? + let app = self.clone(); + let method = method.to_string(); + let params = params.clone(); + let request_metadata = request_metadata.clone(); - match response_data { - Ok(response_data) => { - if !cache_jsonrpc_errors && let Err(err) = response_data { - // if we are not supposed to cache jsonrpc errors, - // then we must not convert Provider errors into a JsonRpcResponseEnum - // return all the errors now. moka will not cache Err results - Err(err) - } else { - // convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors - let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + let f = async move { + app + .jsonrpc_response_cache + .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async { + let response_data = timeout(Duration::from_secs(290), app.balanced_rpcs + .try_proxy_connection::<_, Arc>( + &method, + ¶ms, + Some(&request_metadata), + max_wait, + from_block_num.as_ref(), + to_block_num.as_ref(), + )).await; - if response_data.is_null() { - // don't ever cache "null" as a success. its too likely to be a problem - Err(Web3ProxyError::NullJsonRpcResult) - } else if response_data.num_bytes() > max_response_cache_bytes { - // don't cache really large requests - // TODO: emit a stat - Err(Web3ProxyError::JsonRpcResponse(response_data)) + match response_data { + Ok(response_data) => { + if !cache_jsonrpc_errors && let Err(err) = response_data { + // if we are not supposed to cache jsonrpc errors, + // then we must not convert Provider errors into a JsonRpcResponseEnum + // return all the errors now. moka will not cache Err results + Err(err) } else { - // TODO: response data should maybe be Arc>>, but that's more work - Ok(response_data) + // convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors + let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + + if response_data.is_null() { + // don't ever cache "null" as a success. its too likely to be a problem + Err(Web3ProxyError::NullJsonRpcResult) + } else if response_data.num_bytes() > max_response_cache_bytes { + // don't cache really large requests + // TODO: emit a stat + Err(Web3ProxyError::JsonRpcResponse(response_data)) + } else { + // TODO: response data should maybe be Arc>>, but that's more work + Ok(response_data) + } } } + Err(err) => Err(Web3ProxyError::from(err)), } - Err(err) => Err(Web3ProxyError::from(err)), - } - }).await? + }).await + }; + + // this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache + tokio::spawn(f).await?? } else { let x = timeout( Duration::from_secs(300),