//! web3_proxy/src/response_cache.rs
//!
//! Cache key, cached response type, and size-based weigher for JSON-RPC responses.

use crate::{
    block_number::{BlockNumAndHash, CacheMode},
    errors::{Web3ProxyError, Web3ProxyResult},
    frontend::authorization::RequestOrMethod,
    jsonrpc::{self, JsonRpcErrorData},
};
use derive_more::From;
use ethers::{
    providers::{HttpClientError, JsonRpcError, ProviderError, WsClientError},
    types::U64,
};
use hashbrown::hash_map::DefaultHashBuilder;
use moka::future::Cache;
use parking_lot::Mutex;
use serde_json::value::RawValue;
use std::{
    hash::{BuildHasher, Hash, Hasher},
    sync::Arc,
};
/// Cache key for a JSON-RPC query.
///
/// Equality and hashing are implemented by hand elsewhere in this file and only consider
/// `hash`; the remaining fields are kept so they can be read back through the accessors.
#[derive(Clone, Debug, Eq, From)]
pub struct JsonRpcQueryCacheKey<'a> {
/// precomputed hash of the block bounds, method, params, and error-caching flag (built in `new`)
hash: u64,
/// `from_block` reported by the `CacheMode` this key was built from
from_block: Option<&'a BlockNumAndHash>,
/// `to_block` reported by the `CacheMode` this key was built from
to_block: Option<&'a BlockNumAndHash>,
/// whether the `CacheMode` asked for JSON-RPC errors to be cached
cache_jsonrpc_errors: bool,
}
impl JsonRpcQueryCacheKey<'_> {
2023-06-07 23:57:38 +03:00
pub fn hash(&self) -> u64 {
self.hash
}
pub fn from_block_num(&self) -> Option<&U64> {
self.from_block.as_ref().map(|x| x.num())
}
pub fn to_block_num(&self) -> Option<&U64> {
self.to_block.as_ref().map(|x| x.num())
}
pub fn cache_errors(&self) -> bool {
self.cache_jsonrpc_errors
}
}
impl PartialEq for JsonRpcQueryCacheKey<'_> {
    /// Two keys are equal when their precomputed hashes match; no other field is compared.
    fn eq(&self, other: &Self) -> bool {
        self.hash == other.hash
    }
}
impl Hash for JsonRpcQueryCacheKey<'_> {
    /// Feeds only the precomputed `hash` field into `state`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        // TODO: i feel like this hashes twice. oh well
        Hash::hash(&self.hash, state);
    }
}
impl<'a> JsonRpcQueryCacheKey<'a> {
    /// Build the cache key for `request` under `cache_mode`.
    ///
    /// The block bounds, the method, the stringified params, and the error-caching flag are
    /// fed into a single hasher, in that order; the result is stored as the key's `hash`.
    pub fn new(cache_mode: &'a CacheMode, request: &'a RequestOrMethod) -> Self {
        // TODO: do this without clone
        let from_block = cache_mode.from_block();
        let to_block = cache_mode.to_block();
        let cache_jsonrpc_errors = cache_mode.cache_jsonrpc_errors();

        // the order of the `hash` calls below determines the resulting key
        let hash = {
            let mut state = DefaultHashBuilder::default().build_hasher();

            from_block.hash(&mut state);
            to_block.hash(&mut state);
            request.method().hash(&mut state);

            // TODO: make sure preserve_order feature is OFF
            // TODO: is there a faster way to do this?
            request.params().to_string().hash(&mut state);

            cache_jsonrpc_errors.hash(&mut state);

            state.finish()
        };

        Self {
            hash,
            from_block,
            to_block,
            cache_jsonrpc_errors,
        }
    }
}
2023-06-08 03:26:38 +03:00
pub type JsonRpcResponseCache = Cache<u64, JsonRpcResponseEnum<Arc<RawValue>>>;
/// TODO: we might need one that holds RawValue and one that holds serde_json::Value
#[derive(Clone, Debug)]
pub enum JsonRpcResponseEnum<R> {
2023-08-01 00:26:07 +03:00
NullResult,
Result {
value: R,
2023-06-08 03:26:38 +03:00
num_bytes: u32,
},
RpcError {
error_data: JsonRpcErrorData,
2023-06-08 03:26:38 +03:00
num_bytes: u32,
},
}
// TODO: impl for other inner result types?
impl<R> JsonRpcResponseEnum<R> {
2023-06-08 03:26:38 +03:00
pub fn num_bytes(&self) -> u32 {
match self {
2023-08-01 00:26:07 +03:00
Self::NullResult => 1,
Self::Result { num_bytes, .. } => *num_bytes,
Self::RpcError { num_bytes, .. } => *num_bytes,
}
}
2023-08-03 01:22:26 +03:00
pub fn is_error(&self) -> bool {
match self {
Self::NullResult => false,
Self::Result { .. } => false,
Self::RpcError { .. } => true,
}
}
}
2023-08-01 00:26:07 +03:00
impl<R> JsonRpcResponseEnum<Option<R>> {
pub fn is_null(&self) -> bool {
matches!(self, Self::NullResult | Self::Result { value: None, .. })
}
}
impl JsonRpcResponseEnum<Arc<RawValue>> {
    /// True for `NullResult` and for a `Result` whose raw JSON text is exactly `null`.
    pub fn is_null(&self) -> bool {
        if let Self::Result { value, .. } = self {
            return value.get() == "null";
        }

        // of the remaining variants only `NullResult` counts as null
        matches!(self, Self::NullResult)
    }
}
impl TryFrom<Web3ProxyResult<jsonrpc::SingleResponse>> for JsonRpcResponseEnum<Arc<RawValue>> {
    type Error = Web3ProxyError;

    /// Convert a proxied response into its cacheable form.
    ///
    /// Parsed successes and parsed errors become `Result` / `RpcError`. A streamed response is
    /// handed back as `Web3ProxyError::StreamResponse`. An `Err` input goes through
    /// `TryFrom<Web3ProxyError>`.
    fn try_from(response: Web3ProxyResult<jsonrpc::SingleResponse>) -> Result<Self, Self::Error> {
        match response {
            Ok(jsonrpc::SingleResponse::Parsed(parsed)) => match parsed.payload {
                jsonrpc::Payload::Success { result } => {
                    // saturate instead of truncating: a wrapped-around length would make a
                    // huge payload look small to the cache weigher
                    let num_bytes = u32::try_from(result.get().len()).unwrap_or(u32::MAX);

                    Ok(JsonRpcResponseEnum::Result {
                        value: result,
                        num_bytes,
                    })
                }
                jsonrpc::Payload::Error { error } => {
                    // TODO: this double serializes
                    // NOTE(review): the return type of `num_bytes()` is not visible here, so
                    // the cast is left as it was
                    let num_bytes = error.num_bytes() as u32;

                    Ok(JsonRpcResponseEnum::RpcError {
                        error_data: error,
                        num_bytes,
                    })
                }
            },
            Ok(jsonrpc::SingleResponse::Stream(stream)) => {
                Err(Web3ProxyError::StreamResponse(Mutex::new(Some(stream))))
            }
            Err(err) => err.try_into(),
        }
    }
}
2023-06-07 23:57:38 +03:00
impl From<serde_json::Value> for JsonRpcResponseEnum<Arc<RawValue>> {
fn from(value: serde_json::Value) -> Self {
let value = RawValue::from_string(value.to_string()).unwrap();
value.into()
}
}
2023-06-07 23:57:38 +03:00
impl From<Arc<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
fn from(value: Arc<RawValue>) -> Self {
let num_bytes = value.get().len();
2023-06-08 03:26:38 +03:00
let num_bytes = num_bytes as u32;
2023-06-07 23:57:38 +03:00
Self::Result { value, num_bytes }
}
}
impl From<Box<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
fn from(value: Box<RawValue>) -> Self {
let num_bytes = value.get().len();
2023-06-08 03:26:38 +03:00
let num_bytes = num_bytes as u32;
2023-06-07 23:57:38 +03:00
let value = value.into();
Self::Result { value, num_bytes }
}
}
2023-08-03 01:22:26 +03:00
impl TryFrom<Web3ProxyError> for JsonRpcResponseEnum<Arc<RawValue>> {
type Error = Web3ProxyError;
fn try_from(value: Web3ProxyError) -> Result<Self, Self::Error> {
2023-08-03 01:22:26 +03:00
match value {
Web3ProxyError::EthersProvider(err) => {
if let Ok(x) = JsonRpcErrorData::try_from(&err) {
let x = x.into();
Ok(x)
} else {
Err(err.into())
}
}
2023-08-03 01:22:26 +03:00
Web3ProxyError::NullJsonRpcResult => Ok(JsonRpcResponseEnum::NullResult),
Web3ProxyError::JsonRpcResponse(x) => Ok(x),
Web3ProxyError::JsonRpcErrorData(err) => Ok(err.into()),
err => Err(err),
}
}
}
2023-06-07 23:57:38 +03:00
impl TryFrom<Result<Arc<RawValue>, Web3ProxyError>> for JsonRpcResponseEnum<Arc<RawValue>> {
type Error = Web3ProxyError;
fn try_from(value: Result<Arc<RawValue>, Web3ProxyError>) -> Result<Self, Self::Error> {
match value {
Ok(x) => Ok(x.into()),
Err(err) => {
let x: Self = err.try_into()?;
Ok(x)
}
}
}
}
impl TryFrom<Result<Box<RawValue>, Web3ProxyError>> for JsonRpcResponseEnum<Arc<RawValue>> {
    type Error = Web3ProxyError;

    /// `Ok` values are wrapped as a `Result`; errors go through `TryFrom<Web3ProxyError>`.
    fn try_from(value: Result<Box<RawValue>, Web3ProxyError>) -> Result<Self, Self::Error> {
        let err = match value {
            Ok(raw) => return Ok(raw.into()),
            Err(err) => err,
        };

        err.try_into()
    }
}
impl<R> From<JsonRpcErrorData> for JsonRpcResponseEnum<R> {
fn from(value: JsonRpcErrorData) -> Self {
// TODO: wrap the error in a complete response?
let num_bytes = serde_json::to_string(&value).unwrap().len();
2023-06-08 03:26:38 +03:00
let num_bytes = num_bytes as u32;
Self::RpcError {
error_data: value,
num_bytes,
}
}
}
2023-06-27 08:40:00 +03:00
impl<'a> From<&'a JsonRpcError> for JsonRpcErrorData {
fn from(value: &'a JsonRpcError) -> Self {
Self {
code: value.code,
message: value.message.clone().into(),
data: value.data.clone(),
}
}
}
2023-06-27 08:40:00 +03:00
impl<'a> TryFrom<&'a ProviderError> for JsonRpcErrorData {
type Error = &'a ProviderError;
2023-06-27 08:40:00 +03:00
fn try_from(e: &'a ProviderError) -> Result<Self, Self::Error> {
match e {
ProviderError::JsonRpcClientError(err) => {
if let Some(err) = err.as_error_response() {
2023-06-27 08:40:00 +03:00
Ok(err.into())
} else {
2023-06-27 08:40:00 +03:00
Err(e)
}
}
2023-06-27 08:40:00 +03:00
e => Err(e),
}
2023-06-27 08:40:00 +03:00
}
}
2023-06-27 08:40:00 +03:00
impl<'a> TryFrom<&'a HttpClientError> for JsonRpcErrorData {
type Error = &'a HttpClientError;
fn try_from(e: &'a HttpClientError) -> Result<Self, Self::Error> {
match e {
HttpClientError::JsonRpcError(err) => Ok(err.into()),
e => Err(e),
}
}
}
impl<'a> TryFrom<&'a WsClientError> for JsonRpcErrorData {
    type Error = &'a WsClientError;

    /// Succeeds only for the `JsonRpcError` variant; any other error is handed back as-is.
    fn try_from(e: &'a WsClientError) -> Result<Self, Self::Error> {
        if let WsClientError::JsonRpcError(err) = e {
            Ok(err.into())
        } else {
            Err(e)
        }
    }
}
/// Weigher for cached JSON-RPC responses.
///
/// The inner u32 is the maximum weight per item; `weigh` reports `u32::MAX` for anything larger.
#[derive(Copy, Clone)]
pub struct JsonRpcResponseWeigher(pub u32);
impl JsonRpcResponseWeigher {
    /// Weight of `value`: its `num_bytes()`, or `u32::MAX` when that exceeds the per-item maximum.
    pub fn weigh<K, R>(&self, _key: &K, value: &JsonRpcResponseEnum<R>) -> u32 {
        let size = value.num_bytes();

        if size <= self.0 {
            return size;
        }

        // return max. the item may start to be inserted into the cache, but it will be
        // immediately removed
        u32::MAX
    }
}
#[cfg(test)]
mod tests {
    use super::JsonRpcResponseEnum;
    use crate::response_cache::JsonRpcResponseWeigher;
    use moka::future::{Cache, CacheBuilder};
    use serde_json::value::RawValue;
    use std::{sync::Arc, time::Duration};

    /// Checks the weigher on its own, then checks that an oversized entry is dropped from a
    /// weighted cache once pending tasks have run.
    #[tokio::test(start_paused = true)]
    async fn test_json_rpc_query_weigher() {
        let max_item_weight = 200;
        let weight_capacity = 1_000;

        let weigher = JsonRpcResponseWeigher(max_item_weight);

        let small_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
            value: Box::<RawValue>::default().into(),
            num_bytes: max_item_weight / 2,
        };

        assert_eq!(weigher.weigh(&(), &small_data), max_item_weight / 2);

        let max_sized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
            value: Box::<RawValue>::default().into(),
            num_bytes: max_item_weight,
        };

        assert_eq!(weigher.weigh(&(), &max_sized_data), max_item_weight);

        let oversized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
            value: Box::<RawValue>::default().into(),
            num_bytes: max_item_weight * 2,
        };

        assert_eq!(weigher.weigh(&(), &oversized_data), u32::MAX);

        let test_cache: Cache<u32, JsonRpcResponseEnum<Arc<RawValue>>> =
            CacheBuilder::new(weight_capacity)
                .weigher(move |k, v| weigher.weigh(k, v))
                .time_to_live(Duration::from_secs(2))
                .build();

        test_cache.insert(0, small_data).await;

        test_cache.get(&0).await.unwrap();

        test_cache.insert(1, max_sized_data).await;

        test_cache.get(&0).await.unwrap();
        test_cache.get(&1).await.unwrap();

        test_cache.insert(2, oversized_data).await;

        test_cache.get(&0).await.unwrap();
        test_cache.get(&1).await.unwrap();

        // oversized data will be in the cache temporarily (it should just be an arc though, so that should be fine)
        test_cache.get(&2).await.unwrap();

        // sync should do necessary cleanup
        test_cache.run_pending_tasks().await;

        // now the oversized entry should be gone
        assert!(test_cache.get(&2).await.is_none());
    }
}