move cache helper in preparation for larger refactor

Bryan Stitt 2022-05-22 04:22:30 +00:00
parent 32b03ad3dd
commit 8a2535da74
3 changed files with 136 additions and 104 deletions

@@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task;
use tokio::time::sleep;
use tokio::time::{sleep, timeout};
use tracing::{debug, info, instrument, trace, warn};
static APP_USER_AGENT: &str = concat!(
@@ -30,9 +30,9 @@ static APP_USER_AGENT: &str = concat!(
const RESPONSE_CACHE_CAP: usize = 1024;
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
type CacheKey = (H256, String, Option<String>);
type CacheKey = (Option<H256>, String, Option<String>);
type ResponseLruCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;
type ResponseLrcCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;
type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
@@ -45,7 +45,7 @@ pub struct Web3ProxyApp {
/// Send private requests (like eth_sendRawTransaction) to all these servers
private_rpcs: Arc<Web3Connections>,
active_requests: ActiveRequestsMap,
response_cache: ResponseLruCache,
response_cache: ResponseLrcCache,
}
impl fmt::Debug for Web3ProxyApp {
@@ -56,7 +56,6 @@ impl fmt::Debug for Web3ProxyApp {
}
impl Web3ProxyApp {
// #[instrument(name = "try_new_Web3ProxyApp", skip_all)]
pub async fn try_new(
chain_id: usize,
redis_address: Option<String>,
@@ -143,13 +142,17 @@ impl Web3ProxyApp {
// TODO: i don't always see this in the logs. why?
debug!("Received request: {:?}", request);
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that delays from
let max_time = Duration::from_secs(60);
let response = match request {
JsonRpcRequestEnum::Single(request) => {
JsonRpcForwardedResponseEnum::Single(self.proxy_web3_rpc_request(request).await?)
}
JsonRpcRequestEnum::Batch(requests) => {
JsonRpcForwardedResponseEnum::Batch(self.proxy_web3_rpc_requests(requests).await?)
}
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout(max_time, self.proxy_web3_rpc_request(request)).await??,
),
JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
timeout(max_time, self.proxy_web3_rpc_requests(requests)).await??,
),
};
// TODO: i don't always see this in the logs. why?
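
For context on the new code above: `timeout(max_time, ...).await??` unwraps two layers of `Result`. The outer `?` handles `tokio::time::error::Elapsed` when the 60 second budget is exceeded (it converts into `anyhow::Error`), and the inner `?` propagates the handler's own error. A minimal, self-contained sketch of the same pattern; `slow_handler` is a hypothetical stand-in for `proxy_web3_rpc_request`:

use std::time::Duration;
use tokio::time::timeout;

// stand-in for proxy_web3_rpc_request: any async fn returning anyhow::Result
async fn slow_handler() -> anyhow::Result<String> {
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok("pong".to_string())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let max_time = Duration::from_secs(60);
    // outer ? converts a tokio Elapsed error into anyhow::Error,
    // inner ? propagates slow_handler's own anyhow::Error
    let response = timeout(max_time, slow_handler()).await??;
    println!("{}", response);
    Ok(())
}
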
@@ -185,6 +188,43 @@ impl Web3ProxyApp {
Ok(collected)
}
fn get_cached_response(
&self,
request: &JsonRpcRequest,
) -> (
CacheKey,
Result<JsonRpcForwardedResponse, &ResponseLrcCache>,
) {
// TODO: inspect the request to pick the right cache
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
// TODO: Some requests should skip caching on the head_block_hash
let head_block_hash = Some(self.balanced_rpcs.get_head_block_hash());
// TODO: better key? benchmark this
let key = (
head_block_hash,
request.method.clone(),
request.params.clone().map(|x| x.to_string()),
);
if let Some(response) = self.response_cache.read().get(&key) {
// TODO: emit a stat
trace!("{:?} cache hit!", request);
// TODO: can we make references work? maybe put them in an Arc?
return (key, Ok(response.to_owned()));
} else {
// TODO: emit a stat
trace!("{:?} cache miss!", request);
}
// TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?)
let cache = &self.response_cache;
(key, Err(cache))
}
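
The shape of `get_cached_response` is worth spelling out: it hands back the computed key plus either a cache hit or a reference to the cache itself, so the caller can fall through to a backend and then insert the result under the same key. A simplified, self-contained sketch of that calling convention, with `String` standing in for `CacheKey` and `JsonRpcForwardedResponse`, and `std::sync::RwLock`/`HashMap` standing in for the real cache types:

use std::collections::HashMap;
use std::sync::RwLock;

type Cache = RwLock<HashMap<String, String>>;

struct App {
    response_cache: Cache,
}

impl App {
    // same contract as get_cached_response: the key plus a hit, or the cache on a miss
    fn get_cached_response(&self, request: &str) -> (String, Result<String, &Cache>) {
        let key = format!("head_block_hash + {}", request);
        if let Some(hit) = self.response_cache.read().unwrap().get(&key) {
            return (key, Ok(hit.clone()));
        }
        (key, Err(&self.response_cache))
    }

    fn proxy(&self, request: &str) -> String {
        let (key, cache) = match self.get_cached_response(request) {
            (_key, Ok(response)) => return response, // cache hit
            (key, Err(cache)) => (key, cache),       // miss: remember where to store
        };
        let response = format!("backend response for {}", request); // pretend backend call
        cache.write().unwrap().insert(key, response.clone());
        response
    }
}

fn main() {
    let app = App { response_cache: RwLock::new(HashMap::new()) };
    println!("{}", app.proxy("eth_blockNumber")); // first call: miss, fills the cache
    println!("{}", app.proxy("eth_blockNumber")); // second call: hit
}
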
// #[instrument(skip_all)]
async fn proxy_web3_rpc_request(
self: Arc<Web3ProxyApp>,
@@ -192,96 +232,81 @@ impl Web3ProxyApp {
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
if request.method == "eth_sendRawTransaction" {
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
match self.private_rpcs.get_upstream_servers().await {
Ok(active_request_handles) => {
let (tx, rx) = flume::unbounded();
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
for _i in 0..10usize {
// // TODO: add more to this span
// let span = info_span!("i", ?i);
// let _enter = span.enter();
if request.method == "eth_sendRawTransaction" {
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
match self.private_rpcs.get_upstream_servers().await {
Ok(active_request_handles) => {
// TODO: refactor this to block until it has responses from all and also to return the most common success response
// TODO: i don't think we need to spawn it if we do that.
let (tx, rx) = flume::bounded(1);
let connections = self.private_rpcs.clone();
let method = request.method.clone();
let params = request.params.clone();
let connections = self.private_rpcs.clone();
let method = request.method.clone();
let params = request.params.clone();
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
task::Builder::default()
.name("try_send_parallel_requests")
.spawn(async move {
connections
.try_send_parallel_requests(
active_request_handles,
method,
params,
tx,
)
.await
});
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
// TODO:
task::Builder::default()
.name("try_send_parallel_requests")
.spawn(async move {
connections
.try_send_parallel_requests(
active_request_handles,
method,
params,
tx,
)
.await
});
// wait for the first response
// TODO: we don't want the first response. we want the quorum response
let backend_response = rx.recv_async().await?;
// wait for the first response
// TODO: we don't want the first response. we want the quorum response
let backend_response = rx.recv_async().await?;
if let Ok(backend_response) = backend_response {
// TODO: i think we
let response = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(backend_response),
error: None,
};
return Ok(response);
}
}
Err(None) => {
// TODO: return a 502?
// TODO: i don't think this will ever happen
return Err(anyhow::anyhow!("no private rpcs!"));
}
Err(Some(retry_after)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
sleep(retry_after).await;
warn!("All rate limits exceeded. Sleeping");
}
};
} else {
// this is not a private transaction (or no private relays are configured)
let (cache_key, response_cache) = match self.get_cached_response(&request) {
(cache_key, Ok(response)) => {
let _ = self.active_requests.remove(&cache_key);
if let Ok(backend_response) = backend_response {
// TODO: i think we
let response = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(backend_response),
error: None,
};
return Ok(response);
}
}
Err(None) => {
// TODO: return a 502?
return Err(anyhow::anyhow!("no private rpcs!"));
}
Err(Some(retry_after)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
sleep(retry_after).await;
warn!("All rate limits exceeded. Sleeping");
}
};
} else {
// this is not a private transaction (or no private relays are configured)
// TODO: how much should we retry?
for _i in 0..10usize {
// TODO: think more about this loop.
// // TODO: add more to this span. and do it properly
// let span = info_span!("i", ?i);
// let _enter = span.enter();
// todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
// TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed
trace!("{:?} waiting for head_block_hash", request);
let head_block_hash = self.balanced_rpcs.get_head_block_hash();
trace!("{:?} head_block_hash {}", request, head_block_hash);
// TODO: building this cache key is slow and its large, but i don't see a better way right now
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
let cache_key = (
head_block_hash,
request.method.clone(),
request.params.clone().map(|x| x.to_string()),
);
// first check to see if this is cached
if let Some(cached) = self.response_cache.read().get(&cache_key) {
let _ = self.active_requests.remove(&cache_key);
// TODO: emit a stat
trace!("{:?} cache hit!", request);
return Ok(cached.to_owned());
} else {
trace!("{:?} cache miss!", request);
}
(cache_key, Err(response_cache)) => (cache_key, response_cache),
};
// check if this request is already in flight
let (in_flight_tx, in_flight_rx) = watch::channel(true);
@@ -302,7 +327,7 @@ impl Web3ProxyApp {
let _ = other_in_flight_rx.changed().await;
// now that we've waited, lets check the cache again
if let Some(cached) = self.response_cache.read().get(&cache_key) {
if let Some(cached) = response_cache.read().get(&cache_key) {
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
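
The surrounding (unchanged) logic coalesces identical in-flight requests: the first caller registers a `watch::Receiver` in `active_requests`, later callers with the same cache key wait on it via `changed()` and then re-read the cache instead of querying a backend themselves. A stripped-down sketch of that idea using only the standard library plus tokio's watch channel; the names and the fake backend call are illustrative, not this codebase's API:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;

type Cache = Mutex<HashMap<String, String>>;
type ActiveRequests = Mutex<HashMap<String, watch::Receiver<bool>>>;

async fn get(key: String, cache: Arc<Cache>, active: Arc<ActiveRequests>) -> String {
    if let Some(hit) = cache.lock().unwrap().get(&key) {
        return hit.clone(); // already cached
    }
    // is someone else already fetching this key?
    let other_rx = active.lock().unwrap().get(&key).cloned();
    if let Some(mut rx) = other_rx {
        // wait for the in-flight request to finish, then re-check the cache
        let _ = rx.changed().await;
        if let Some(hit) = cache.lock().unwrap().get(&key) {
            return hit.clone();
        }
    }
    // we are first (or lost a race): register so duplicates wait on rx
    let (tx, rx) = watch::channel(true);
    active.lock().unwrap().insert(key.clone(), rx);
    let value = format!("backend result for {}", key); // pretend backend call
    cache.lock().unwrap().insert(key.clone(), value.clone());
    active.lock().unwrap().remove(&key);
    let _ = tx.send(false); // wake everyone waiting on changed()
    value
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(Mutex::new(HashMap::new()));
    let active = Arc::new(Mutex::new(HashMap::new()));
    let a = tokio::spawn(get("eth_blockNumber".into(), cache.clone(), active.clone()));
    let b = tokio::spawn(get("eth_blockNumber".into(), cache.clone(), active.clone()));
    println!("{} / {}", a.await.unwrap(), b.await.unwrap());
}
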
@@ -342,7 +367,7 @@ impl Web3ProxyApp {
};
// TODO: small race condition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = self.response_cache.write();
let mut response_cache = response_cache.write();
// TODO: cache the warp::reply to save us serializing every time
response_cache.insert(cache_key.clone(), response.clone());

@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::task;
use tracing::Instrument;
use tracing::{info, info_span, instrument, trace, warn};
use tracing::{debug, info, info_span, instrument, trace, warn};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
@@ -112,7 +112,8 @@ impl Web3Connections {
}
pub async fn subscribe_heads(self: &Arc<Self>) {
let (block_sender, block_receiver) = flume::unbounded();
// TODO: i don't think this needs to be very big
let (block_sender, block_receiver) = flume::bounded(16);
let mut handles = vec![];
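
Switching `block_sender` from `flume::unbounded` to `flume::bounded(16)` adds backpressure: once 16 new-block notifications are queued, `send_async` waits until the consumer catches up instead of letting the queue grow without limit. In miniature (the producer/consumer here are illustrative, not this crate's tasks):

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // small bound: senders park in send_async once 16 items are queued
    let (block_sender, block_receiver) = flume::bounded::<u64>(16);

    let producer = tokio::spawn(async move {
        for block_number in 0..100u64 {
            // waits here whenever the consumer falls 16 blocks behind
            if block_sender.send_async(block_number).await.is_err() {
                break; // receiver was dropped
            }
        }
    });

    while let Ok(block_number) = block_receiver.recv_async().await {
        if block_number % 25 == 0 {
            println!("processed block {}", block_number); // pretend per-block work
        }
    }

    producer.await?;
    Ok(())
}
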
@@ -330,6 +331,11 @@ impl Web3Connections {
// the synced connections have changed
let synced_connections = Arc::new(pending_synced_connections.clone());
if synced_connections.inner.len() == max_connections {
// TODO: more metrics
debug!("all head: {}", new_block_hash);
}
trace!(
"rpcs at {}: {:?}",
synced_connections.head_block_hash,

@@ -44,13 +44,13 @@ pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()>
.map_err(Into::into)
}
/// basic handler that responds with a page for configuration your
/// TODO: check auth (from authp?) here?
/// a page for configuring your wallet with all the rpcs
/// TODO: check auth (from authp?) here
async fn root() -> impl IntoResponse {
"Hello, World!"
}
// TODO: i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work
/// TODO: check auth (from authp?) here
async fn proxy_web3_rpc(
payload: Json<JsonRpcRequestEnum>,
app: Extension<Arc<Web3ProxyApp>>,
@@ -81,12 +81,13 @@ async fn status(app: Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
(StatusCode::INTERNAL_SERVER_ERROR, body.to_string())
}
/// TODO: pretty 404 page
async fn handler_404() -> impl IntoResponse {
(StatusCode::NOT_FOUND, "nothing to see here")
}
// handle errors by converting them into something that implements `IntoResponse`
// TODO: use this
/// handle errors by converting them into something that implements `IntoResponse`
/// TODO: use this. i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work
async fn _handle_anyhow_error(err: anyhow::Error) -> impl IntoResponse {
let err = format!("{:?}", err);
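
On the "i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work" note above: a common way to make `?` usable inside axum handlers is a newtype around `anyhow::Error` that implements `IntoResponse`, essentially wrapping what `_handle_anyhow_error` does. A sketch of that well-known pattern (not what this commit ships; `example_handler` is hypothetical):

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};

// wrapper so handlers can return Result<_, AppError> and use `?` freely
struct AppError(anyhow::Error);

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        // same shape as _handle_anyhow_error: debug-format the error into the body
        (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", self.0)).into_response()
    }
}

// lets `?` convert anything convertible to anyhow::Error into AppError
impl<E: Into<anyhow::Error>> From<E> for AppError {
    fn from(err: E) -> Self {
        AppError(err.into())
    }
}

async fn example_handler() -> Result<String, AppError> {
    let block: u64 = "not a number".parse()?; // ParseIntError -> anyhow::Error -> AppError
    Ok(block.to_string())
}
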