use ahash

This might actually be slower; do real benchmarks.
Bryan Stitt 2022-09-17 02:17:20 +00:00
parent 3d4bfbfde0
commit 05af0551c9
8 changed files with 46 additions and 30 deletions
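
The commit message asks for real benchmarks before trusting the switch. A minimal sketch of such a benchmark, assuming criterion as a dev-dependency; the key shape loosely mirrors ResponseCacheKey from app.rs below, and the names here (Key, hash_key, hash_benches) are illustrative, not part of the repo:

use std::hash::{BuildHasher, Hash, Hasher};

use criterion::{black_box, criterion_group, criterion_main, Criterion};

// roughly the shape of ResponseCacheKey below: (H256, String, Option<String>)
type Key = ([u8; 32], String, Option<String>);

fn hash_key<S: BuildHasher>(state: &S, key: &Key) -> u64 {
    let mut hasher = state.build_hasher();
    key.hash(&mut hasher);
    hasher.finish()
}

fn hash_benches(c: &mut Criterion) {
    let key: Key = ([0xab; 32], "eth_getBlockByHash".into(), Some("[true]".into()));

    // std's default hasher (SipHash) vs ahash on the same key
    let std_state = std::collections::hash_map::RandomState::new();
    c.bench_function("std siphash", |b| b.iter(|| hash_key(&std_state, black_box(&key))));

    let ahash_state = ahash::RandomState::new();
    c.bench_function("ahash", |b| b.iter(|| hash_key(&ahash_state, black_box(&key))));
}

criterion_group!(benches, hash_benches);
criterion_main!(benches);

Hashing in isolation will usually favor ahash; a more honest test would measure end-to-end cache get/insert throughput under the real request mix, which is presumably what the commit message means by real benchmarks.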

Cargo.lock (generated)

@@ -5544,6 +5544,7 @@ dependencies = [
 name = "web3_proxy"
 version = "0.1.0"
 dependencies = [
+ "ahash 0.8.0",
  "anyhow",
  "arc-swap",
  "argh",


@@ -34,14 +34,12 @@ where
     pub fn new(cache_size: u64, prefix: &str, rrl: RedisRateLimiter) -> Self {
         let ttl = rrl.period as u64;
 
-        let hasher = ahash::RandomState::new();
-
-        // TODO: think more about this ttl. if
+        // TODO: time to live is not right. we want this ttl counter to start only after redis is down. this works for now
         let local_cache = Cache::builder()
             .time_to_live(Duration::from_secs(ttl))
             .max_capacity(cache_size)
             .name(prefix)
-            .build_with_hasher(hasher);
+            .build_with_hasher(ahash::RandomState::new());
 
         Self {
             local_cache,
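
For context on the pattern used throughout this commit: moka's builder accepts any BuildHasher via build_with_hasher, and the chosen hasher then appears as the cache's third type parameter. A minimal standalone sketch (using moka's sync cache for brevity; the crate itself may use the future flavor, whose builder is analogous):

use std::time::Duration;

use moka::sync::Cache;

// the hasher is part of the returned type: Cache<K, V, S>
fn build_local_cache(cache_size: u64, ttl: u64) -> Cache<String, u64, ahash::RandomState> {
    Cache::builder()
        .time_to_live(Duration::from_secs(ttl))
        .max_capacity(cache_size)
        .build_with_hasher(ahash::RandomState::new())
}

Because the hasher is passed by value straight into build_with_hasher, the separate let hasher binding removed above was not needed.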


@@ -19,6 +19,7 @@ entities = { path = "../entities" }
 migration = { path = "../migration" }
 redis-rate-limiter = { path = "../redis-rate-limiter" }
 
+ahash = "0.8.0"
 anyhow = { version = "1.0.65", features = ["backtrace"] }
 arc-swap = "1.5.1"
 argh = "0.1.8"


@@ -6,7 +6,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
 use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
-use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId};
+use crate::rpcs::blockchain::{ArcBlock, BlockId};
 use crate::rpcs::connections::Web3Connections;
 use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::rpcs::transactions::TxStatus;
@@ -54,7 +54,7 @@ static APP_USER_AGENT: &str = concat!(
 /// block hash, method, params
 // TODO: better name
 type ResponseCacheKey = (H256, String, Option<String>);
-type ResponseCache = Cache<ResponseCacheKey, JsonRpcForwardedResponse>;
+type ResponseCache = Cache<ResponseCacheKey, JsonRpcForwardedResponse, ahash::RandomState>;
 
 pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
@@ -84,11 +84,11 @@ pub struct Web3ProxyApp {
     app_metrics: Arc<Web3ProxyAppMetrics>,
     open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     /// store pending transactions that we've seen so that we don't send duplicates to subscribers
-    pub pending_transactions: Cache<TxHash, TxStatus>,
+    pub pending_transactions: Cache<TxHash, TxStatus, ahash::RandomState>,
     pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
     pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>,
     pub redis_pool: Option<RedisPool>,
-    pub user_cache: Cache<Uuid, UserCacheValue>,
+    pub user_cache: Cache<Uuid, UserCacheValue, ahash::RandomState>,
 }
 
 /// flatten a JoinError into an anyhow error
@@ -232,7 +232,10 @@ impl Web3ProxyApp {
             // test the pool
             if let Err(err) = redis_pool.get().await {
-                error!("failed to connect to redis. some features will be disabled");
+                error!(
+                    ?err,
+                    "failed to connect to redis. some features will be disabled"
+                );
             };
 
             Some(redis_pool)
@@ -252,8 +255,9 @@ impl Web3ProxyApp {
         drop(pending_tx_receiver);
 
         // TODO: sized and timed expiration!
-        // TODO: put some in Redis, too?
-        let pending_transactions = Cache::new(10000);
+        let pending_transactions = Cache::builder()
+            .max_capacity(10_000)
+            .build_with_hasher(ahash::RandomState::new());
 
         // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
         // TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
@@ -261,7 +265,9 @@ impl Web3ProxyApp {
         // this block map is shared between balanced_rpcs and private_rpcs.
         // TODO: what limits should we have for expiration?
-        let block_map = BlockHashesMap::new(10_000);
+        let block_map = Cache::builder()
+            .max_capacity(10_000)
+            .build_with_hasher(ahash::RandomState::new());
 
         let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
             top_config.app.chain_id,
@@ -332,9 +338,13 @@ impl Web3ProxyApp {
         }
 
         // TODO: change this to a sized cache. theres some potentially giant responses that will break things
-        let response_cache = Cache::new(10_000);
+        let response_cache = Cache::builder()
+            .max_capacity(10_000)
+            .build_with_hasher(ahash::RandomState::new());
 
-        let user_cache = Cache::new(10_000);
+        let user_cache = Cache::builder()
+            .max_capacity(10_000)
+            .build_with_hasher(ahash::RandomState::new());
 
         let app = Self {
             config: top_config.app,
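
A note on why every construction site changes: moka's Cache::new(max_capacity) builds with the standard library's SipHash-based RandomState, so swapping hashers means going through the builder, and the hasher then has to be spelled out wherever the cache type is written (the struct fields and aliases in this commit). A small sketch of the difference, with illustrative key and value types rather than the app's own:

use moka::sync::Cache;

fn main() {
    // Cache::new uses std's RandomState (SipHash) as the default hasher
    let std_hashed: Cache<u64, String> = Cache::new(10_000);

    // with build_with_hasher, the hasher becomes the third type parameter,
    // which is why the fields and type aliases above gain it too
    let ahash_hashed: Cache<u64, String, ahash::RandomState> = Cache::builder()
        .max_capacity(10_000)
        .build_with_hasher(ahash::RandomState::new());

    std_hashed.insert(1, "siphash".to_string());
    ahash_hashed.insert(1, "ahash".to_string());
}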


@@ -1,4 +1,4 @@
-use crate::rpcs::blockchain::BlockHashesMap;
+use crate::rpcs::blockchain::BlockHashesCache;
 use crate::rpcs::connection::Web3Connection;
 use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
@@ -120,7 +120,7 @@ impl Web3ConnectionConfig
         chain_id: u64,
         http_client: Option<reqwest::Client>,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
-        block_map: BlockHashesMap,
+        block_map: BlockHashesCache,
         block_sender: Option<flume::Sender<BlockAndRpc>>,
         tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
         open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,


@@ -19,7 +19,7 @@ use tracing::{debug, trace, warn};
 // TODO: type for Hydrated Blocks with their full transactions?
 pub type ArcBlock = Arc<Block<TxHash>>;
 
-pub type BlockHashesMap = Cache<H256, ArcBlock>;
+pub type BlockHashesCache = Cache<H256, ArcBlock, ahash::RandomState>;
 
 /// A block's hash and number.
 #[derive(Clone, Debug, Default, From, Serialize)]
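
The renamed alias bakes the hasher into the type, which keeps the block_map parameters in the files below short; and because moka caches are cheap to clone and clones share the same underlying store, the same cache can be handed by value to each spawn call. An illustrative sketch with stand-in types, not the crate's real H256 or ArcBlock:

use std::sync::Arc;

use moka::sync::Cache;

// stand-ins for H256 and ArcBlock
type BlockHash = [u8; 32];
type Block = Arc<String>;

// same shape as BlockHashesCache: the hasher is part of the alias
type BlockCache = Cache<BlockHash, Block, ahash::RandomState>;

// clones are cheap and share storage, so callers can pass the cache
// by value to several tasks without wrapping it in another Arc
fn share(block_map: BlockCache) -> (BlockCache, BlockCache) {
    (block_map.clone(), block_map)
}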


@@ -1,5 +1,5 @@
 ///! Rate-limited communication with a web3 provider.
-use super::blockchain::{ArcBlock, BlockHashesMap, BlockId};
+use super::blockchain::{ArcBlock, BlockHashesCache, BlockId};
 use super::provider::Web3Provider;
 use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
 use crate::app::{flatten_handle, AnyhowJoinHandle};
@@ -69,7 +69,7 @@ impl Web3Connection {
         hard_limit: Option<(u64, RedisPool)>,
         // TODO: think more about this type
         soft_limit: u32,
-        block_map: BlockHashesMap,
+        block_map: BlockHashesCache,
         block_sender: Option<flume::Sender<BlockAndRpc>>,
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
         reconnect: bool,
@@ -366,7 +366,7 @@ impl Web3Connection {
         self: &Arc<Self>,
         new_head_block: Result<Option<ArcBlock>, ProviderError>,
         block_sender: &flume::Sender<BlockAndRpc>,
-        block_map: BlockHashesMap,
+        block_map: BlockHashesCache,
     ) -> anyhow::Result<()> {
         match new_head_block {
             Ok(None) => {
@@ -428,7 +428,7 @@ impl Web3Connection {
     async fn subscribe(
         self: Arc<Self>,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
-        block_map: BlockHashesMap,
+        block_map: BlockHashesCache,
         block_sender: Option<flume::Sender<BlockAndRpc>>,
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
         reconnect: bool,
@@ -516,7 +516,7 @@ impl Web3Connection {
         self: Arc<Self>,
         http_interval_receiver: Option<broadcast::Receiver<()>>,
         block_sender: flume::Sender<BlockAndRpc>,
-        block_map: BlockHashesMap,
+        block_map: BlockHashesCache,
     ) -> anyhow::Result<()> {
         info!(%self, "watching new heads");


@@ -1,5 +1,5 @@
 ///! Load balanced communication with a group of web3 providers
-use super::blockchain::{ArcBlock, BlockHashesMap};
+use super::blockchain::{ArcBlock, BlockHashesCache};
 use super::connection::Web3Connection;
 use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
 use super::synced_connections::SyncedConnections;
@@ -36,12 +36,12 @@ pub struct Web3Connections {
     pub(super) conns: HashMap<String, Arc<Web3Connection>>,
     /// any requests will be forwarded to one (or more) of these connections
     pub(super) synced_connections: ArcSwap<SyncedConnections>,
-    pub(super) pending_transactions: Cache<TxHash, TxStatus>,
+    pub(super) pending_transactions: Cache<TxHash, TxStatus, ahash::RandomState>,
     /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
     /// all blocks, including orphans
-    pub(super) block_hashes: BlockHashesMap,
+    pub(super) block_hashes: BlockHashesCache,
     /// blocks on the heaviest chain
-    pub(super) block_numbers: Cache<U64, H256>,
+    pub(super) block_numbers: Cache<U64, H256, ahash::RandomState>,
     /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
     /// TODO: what should we use for edges?
     pub(super) blockchain_graphmap: AsyncRwLock<DiGraphMap<H256, u32>>,
@@ -57,12 +57,12 @@ impl Web3Connections {
         server_configs: HashMap<String, Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
         redis_pool: Option<redis_rate_limiter::RedisPool>,
-        block_map: BlockHashesMap,
+        block_map: BlockHashesCache,
         head_block_sender: Option<watch::Sender<ArcBlock>>,
         min_sum_soft_limit: u32,
         min_synced_rpcs: usize,
         pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
-        pending_transactions: Cache<TxHash, TxStatus>,
+        pending_transactions: Cache<TxHash, TxStatus, ahash::RandomState>,
         open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
         let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
@@ -176,8 +176,14 @@ impl Web3Connections {
         let synced_connections = SyncedConnections::default();
 
         // TODO: sizing and expiration on these caches!
-        let block_hashes = Cache::new(10000);
-        let block_numbers = Cache::new(10000);
+        let block_hashes = Cache::builder()
+            .time_to_idle(Duration::from_secs(600))
+            .max_capacity(10_000)
+            .build_with_hasher(ahash::RandomState::new());
+        let block_numbers = Cache::builder()
+            .time_to_idle(Duration::from_secs(600))
+            .max_capacity(10_000)
+            .build_with_hasher(ahash::RandomState::new());
 
         let connections = Arc::new(Self {
             conns: connections,
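
The block caches above also gain time_to_idle, which evicts an entry after ten minutes without a get or insert; time_to_live (used in the rate limiter cache earlier) instead counts from insertion regardless of access. A minimal sketch of the same builder shape, with illustrative key and value types:

use std::time::Duration;

use moka::sync::Cache;

fn main() {
    // idle entries are dropped 600s after their last read or write;
    // entries that keep getting hit stay cached, bounded only by max_capacity
    let block_numbers: Cache<u64, [u8; 32], ahash::RandomState> = Cache::builder()
        .time_to_idle(Duration::from_secs(600))
        .max_capacity(10_000)
        .build_with_hasher(ahash::RandomState::new());

    block_numbers.insert(1, [0u8; 32]);
    assert!(block_numbers.get(&1u64).is_some());
}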