tokio spawn the cached future (#217)
* tokio spawn the cached future * lint
This commit is contained in:
parent
e4d3a736d0
commit
dc037e663a
|
@ -1254,12 +1254,9 @@ impl Web3ProxyApp {
|
||||||
head_block: Option<&Web3ProxyBlock>,
|
head_block: Option<&Web3ProxyBlock>,
|
||||||
request_metadata: &Arc<RequestMetadata>,
|
request_metadata: &Arc<RequestMetadata>,
|
||||||
) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> {
|
) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> {
|
||||||
// TODO: don't clone into a new string?
|
|
||||||
let request_method = method.to_string();
|
|
||||||
|
|
||||||
// TODO: serve net_version without querying the backend
|
// TODO: serve net_version without querying the backend
|
||||||
// TODO: don't force RawValue
|
// TODO: don't force RawValue
|
||||||
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = match request_method.as_ref() {
|
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = match method {
|
||||||
// lots of commands are blocked
|
// lots of commands are blocked
|
||||||
method @ ("db_getHex"
|
method @ ("db_getHex"
|
||||||
| "db_getString"
|
| "db_getString"
|
||||||
|
@ -1733,7 +1730,7 @@ impl Web3ProxyApp {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: think more about timeouts
|
// TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata
|
||||||
// TODO: different user tiers should have different timeouts
|
// TODO: different user tiers should have different timeouts
|
||||||
// erigon's timeout is 300, so keep this a few seconds shorter
|
// erigon's timeout is 300, so keep this a few seconds shorter
|
||||||
let max_wait = Some(Duration::from_secs(295));
|
let max_wait = Some(Duration::from_secs(295));
|
||||||
|
@ -1748,47 +1745,57 @@ impl Web3ProxyApp {
|
||||||
|
|
||||||
// TODO: try to fetch out of s3
|
// TODO: try to fetch out of s3
|
||||||
|
|
||||||
self
|
// TODO: clone less?
|
||||||
.jsonrpc_response_cache
|
let app = self.clone();
|
||||||
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
|
let method = method.to_string();
|
||||||
// TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata
|
let params = params.clone();
|
||||||
let response_data = timeout(Duration::from_secs(290), self.balanced_rpcs
|
let request_metadata = request_metadata.clone();
|
||||||
.try_proxy_connection::<_, Arc<RawValue>>(
|
|
||||||
method,
|
|
||||||
params,
|
|
||||||
Some(request_metadata),
|
|
||||||
max_wait,
|
|
||||||
from_block_num.as_ref(),
|
|
||||||
to_block_num.as_ref(),
|
|
||||||
)).await;
|
|
||||||
|
|
||||||
match response_data {
|
let f = async move {
|
||||||
Ok(response_data) => {
|
app
|
||||||
if !cache_jsonrpc_errors && let Err(err) = response_data {
|
.jsonrpc_response_cache
|
||||||
// if we are not supposed to cache jsonrpc errors,
|
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
|
||||||
// then we must not convert Provider errors into a JsonRpcResponseEnum
|
let response_data = timeout(Duration::from_secs(290), app.balanced_rpcs
|
||||||
// return all the errors now. moka will not cache Err results
|
.try_proxy_connection::<_, Arc<RawValue>>(
|
||||||
Err(err)
|
&method,
|
||||||
} else {
|
¶ms,
|
||||||
// convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors
|
Some(&request_metadata),
|
||||||
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()?;
|
max_wait,
|
||||||
|
from_block_num.as_ref(),
|
||||||
|
to_block_num.as_ref(),
|
||||||
|
)).await;
|
||||||
|
|
||||||
if response_data.is_null() {
|
match response_data {
|
||||||
// don't ever cache "null" as a success. its too likely to be a problem
|
Ok(response_data) => {
|
||||||
Err(Web3ProxyError::NullJsonRpcResult)
|
if !cache_jsonrpc_errors && let Err(err) = response_data {
|
||||||
} else if response_data.num_bytes() > max_response_cache_bytes {
|
// if we are not supposed to cache jsonrpc errors,
|
||||||
// don't cache really large requests
|
// then we must not convert Provider errors into a JsonRpcResponseEnum
|
||||||
// TODO: emit a stat
|
// return all the errors now. moka will not cache Err results
|
||||||
Err(Web3ProxyError::JsonRpcResponse(response_data))
|
Err(err)
|
||||||
} else {
|
} else {
|
||||||
// TODO: response data should maybe be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
|
// convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors
|
||||||
Ok(response_data)
|
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()?;
|
||||||
|
|
||||||
|
if response_data.is_null() {
|
||||||
|
// don't ever cache "null" as a success. its too likely to be a problem
|
||||||
|
Err(Web3ProxyError::NullJsonRpcResult)
|
||||||
|
} else if response_data.num_bytes() > max_response_cache_bytes {
|
||||||
|
// don't cache really large requests
|
||||||
|
// TODO: emit a stat
|
||||||
|
Err(Web3ProxyError::JsonRpcResponse(response_data))
|
||||||
|
} else {
|
||||||
|
// TODO: response data should maybe be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
|
||||||
|
Ok(response_data)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(err) => Err(Web3ProxyError::from(err)),
|
||||||
}
|
}
|
||||||
Err(err) => Err(Web3ProxyError::from(err)),
|
}).await
|
||||||
}
|
};
|
||||||
}).await?
|
|
||||||
|
// this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache
|
||||||
|
tokio::spawn(f).await??
|
||||||
} else {
|
} else {
|
||||||
let x = timeout(
|
let x = timeout(
|
||||||
Duration::from_secs(300),
|
Duration::from_secs(300),
|
||||||
|
|
Loading…
Reference in New Issue