more dashmap

This commit is contained in:
Bryan Stitt 2022-09-05 01:52:59 +00:00
parent 1c2f3e1445
commit 593eb461b8
10 changed files with 79 additions and 117 deletions

4
Cargo.lock generated

@ -85,9 +85,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.63"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26fa4d7e3f2eebadf743988fc8aec9fa9a9e82611acafd77c1462ed6262440a"
checksum = "b9a8f622bcf6ff3df478e9deba3e03e4e04b300f8e6a139e192c05fa3490afc7"
dependencies = [
"backtrace",
]

@ -137,6 +137,8 @@
- [ ] when using a bunch of slow public servers, i see "no servers in sync" even when things should be right
- [ ] i think checking the parents of the heaviest chain works most of the time, but not always
- maybe iterate connection heads by total weight? i still think we need to include parent hashes
- [ ] some of the DashMaps grow unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task
## V1

@ -5,6 +5,6 @@ authors = ["Bryan Stitt <bryan@stitthappens.com>"]
edition = "2021"
[dependencies]
anyhow = "1.0.63"
anyhow = "1.0.64"
bb8-redis = "0.11.0"
tracing = "0.1.36"

@ -16,7 +16,7 @@ entities = { path = "../entities" }
migration = { path = "../migration" }
redis-rate-limit = { path = "../redis-rate-limit" }
anyhow = { version = "1.0.63", features = ["backtrace"] }
anyhow = { version = "1.0.64", features = ["backtrace"] }
arc-swap = "1.5.1"
argh = "0.1.8"
axum = { version = "0.5.15", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] }

@ -12,19 +12,16 @@ use crate::rpcs::transactions::TxStatus;
use crate::stats::AppStats;
use anyhow::Context;
use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use derive_more::From;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
use fifomap::{FifoCountMap, FifoSizedMap};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::Future;
use migration::{Migrator, MigratorTrait};
use parking_lot::RwLock;
use redis_rate_limit::bb8::PooledConnection;
use redis_rate_limit::{
bb8::{self, ErrorSink},
@ -55,11 +52,10 @@ static APP_USER_AGENT: &str = concat!(
/// block hash, method, params
// TODO: better name
type CacheKey = (H256, String, Option<String>);
type ResponseCacheKey = (H256, String, Option<String>);
type ResponseLrcCache = RwLock<FifoSizedMap<CacheKey, JsonRpcForwardedResponse>>;
type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
// TODO!! a RWLock on this made us super slow. But a DashMap makes this grow unbounded!
type ResponseCache = DashMap<ResponseCacheKey, JsonRpcForwardedResponse>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
@ -78,20 +74,20 @@ pub struct Web3ProxyApp {
pub balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs: Arc<Web3Connections>,
/// Track active requests so that we don't send the same query to multiple backends
pub active_requests: ActiveRequestsMap,
response_cache: ResponseLrcCache,
response_cache: ResponseCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<ArcBlock>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub active_requests: AtomicUsize,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
pub rate_limiter: Option<RedisRateLimit>,
pub redis_pool: Option<RedisPool>,
pub stats: AppStats,
pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
// TODO: this grows unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task
pub user_cache: DashMap<Uuid, UserCacheValue>,
}
/// flatten a JoinError into an anyhow error
@ -304,16 +300,12 @@ impl Web3ProxyApp {
)
});
// keep the borrow checker happy
let response_cache_max_bytes = top_config.app.response_cache_max_bytes;
let app = Self {
config: top_config.app,
balanced_rpcs,
private_rpcs,
active_requests: Default::default(),
// TODO: make the share configurable. or maybe take a number as bytes?
response_cache: RwLock::new(FifoSizedMap::new(response_cache_max_bytes, 100)),
response_cache: Default::default(),
head_block_receiver,
pending_tx_sender,
pending_transactions,
@ -323,7 +315,7 @@ impl Web3ProxyApp {
stats: app_stats,
// TODO: make the size configurable
// TODO: better type for this?
user_cache: RwLock::new(FifoCountMap::new(1_000)),
user_cache: Default::default(),
};
let app = Arc::new(app);
@ -530,7 +522,8 @@ impl Web3ProxyApp {
&self,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
debug!(?request, "proxy_web3_rpc");
// TODO: this should probably be trace level
trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever
@ -546,7 +539,8 @@ impl Web3ProxyApp {
),
};
debug!(?response, "Forwarding");
// TODO: this should probably be trace level
trace!(?response, "Forwarding");
Ok(response)
}
@ -593,10 +587,7 @@ impl Web3ProxyApp {
// TODO: accept a block hash here also?
min_block_needed: Option<&U64>,
request: &JsonRpcRequest,
) -> anyhow::Result<(
CacheKey,
Result<JsonRpcForwardedResponse, &ResponseLrcCache>,
)> {
) -> anyhow::Result<Result<JsonRpcForwardedResponse, ResponseCacheKey>> {
// TODO: inspect the request to pick the right cache
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
@ -617,20 +608,20 @@ impl Web3ProxyApp {
request.params.clone().map(|x| x.to_string()),
);
if let Some(response) = self.response_cache.read().get(&key) {
if let Some(response) = self.response_cache.get(&key) {
// TODO: emit a stat
trace!(?request.method, "cache hit!");
// TODO: can we make references work? maybe put them in an Arc?
return Ok((key, Ok(response.to_owned())));
} else {
// TODO: emit a stat
trace!(?request.method, "cache miss!");
return Ok(Ok(response.to_owned()));
}
let cache = &self.response_cache;
// TODO: another lock here so that we don't send the same request to a backend more than onces xzs
Ok((key, Err(cache)))
// TODO: emit a stat
trace!(?request.method, "cache miss!");
Ok(Err(key))
}
async fn proxy_web3_rpc_request(
@ -828,64 +819,21 @@ impl Web3ProxyApp {
trace!(?min_block_needed, ?method);
// TODO: emit a stat on error. maybe with .map_err?
let (cache_key, cache_result) =
let cached_response_result =
self.cached_response(min_block_needed, &request).await?;
let response_cache = match cache_result {
Ok(mut response) => {
let _ = self.active_requests.remove(&cache_key);
// TODO: emit a stat on error. maybe with .map_err?
let cache_key = match cached_response_result {
Ok(mut cache_result) => {
// put our request id on the cached response
// TODO: maybe only cache the inner result?
cache_result.id = request.id;
// TODO: if the response is cached, should it count less against the account's costs?
// TODO: cache just the result part of the response?
response.id = request.id;
return Ok(response);
return Ok(cache_result);
}
Err(response_cache) => response_cache,
Err(cache_key) => cache_key,
};
// check if this request is already in flight
// TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
let (incoming_tx, incoming_rx) = watch::channel(true);
let mut other_incoming_rx = None;
match self.active_requests.entry(cache_key.clone()) {
DashMapEntry::Occupied(entry) => {
other_incoming_rx = Some(entry.get().clone());
}
DashMapEntry::Vacant(entry) => {
entry.insert(incoming_rx);
}
}
if let Some(mut other_incoming_rx) = other_incoming_rx {
// wait for the other request to finish. it might have finished successfully or with an error
trace!("{:?} waiting on in-flight request", request);
let _ = other_incoming_rx.changed().await;
// now that we've waited, lets check the cache again
if let Some(cached) = response_cache.read().get(&cache_key) {
let _ = self.active_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
// TODO: emit a stat
trace!(
"{:?} cache hit after waiting for in-flight request!",
request
);
return Ok(cached.to_owned());
} else {
// TODO: emit a stat
trace!(
"{:?} cache miss after waiting for in-flight request!",
request
);
}
}
let response = match method {
"temporarily disabled" => {
// "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
@ -904,19 +852,15 @@ impl Web3ProxyApp {
// TODO: move this caching outside this match and cache some of the other responses?
// TODO: cache the warp::reply to save us serializing every time?
if self
.response_cache
.insert(cache_key.clone(), response.clone())
.is_some()
{
let mut response_cache = response_cache.write();
if response_cache.insert(cache_key.clone(), response.clone()) {
} else {
// TODO: emit a stat instead? what else should be in the log
trace!(?cache_key, "value too large for caching");
}
// TODO: we had another lock to prevent this, but its not worth the extra locking
debug!("already cached")
}
let _ = self.active_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
return Ok(response);
}
};

@ -1,7 +1,7 @@
use crate::app::Web3ProxyApp;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use serde_json::json;
use std::sync::Arc;
use std::sync::{atomic::Ordering, Arc};
/// Health check page for load balancers to use
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
@ -19,7 +19,7 @@ pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
let body = json!({
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
"num_active_requests": app.active_requests.len(),
"num_active_requests": app.active_requests.load(Ordering::Acquire),
"num_pending_transactions": app.pending_transactions.len(),
});

@ -195,14 +195,14 @@ impl Web3ProxyApp {
};
// save for the next run
self.user_cache.write().insert(user_key, user_data);
self.user_cache.insert(user_key, user_data);
Ok(user_data)
}
pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result<RateLimitResult> {
// check the local cache
let user_data = if let Some(cached_user) = self.user_cache.read().get(&user_key) {
let user_data = if let Some(cached_user) = self.user_cache.get(&user_key) {
// TODO: also include the time this value was last checked! otherwise we cache forever!
if cached_user.expires_at < Instant::now() {
// old record found

@ -253,6 +253,7 @@ pub async fn post_login(
}
/// the JSON input to the `post_user` handler
/// This handles updating
#[derive(Deserialize)]
pub struct PostUser {
primary_address: Address,
@ -262,7 +263,7 @@ pub struct PostUser {
}
#[debug_handler]
/// post to the user endpoint to modify your account
/// post to the user endpoint to modify your existing account
pub async fn post_user(
AuthBearer(bearer_token): AuthBearer,
ClientIp(ip): ClientIp,
@ -271,10 +272,26 @@ pub async fn post_user(
) -> FrontendResult {
let _ip = rate_limit_by_ip(&app, ip).await?;
ProtectedAction::PostUser
let user = ProtectedAction::PostUser
.verify(app.as_ref(), bearer_token, &payload.primary_address)
.await?;
let mut user: user::ActiveModel = user.into();
// TODO: rate limit by user, too?
if let Some(x) = payload.email {
if x.is_empty() {
user.email = sea_orm::Set(None);
} else {
user.email = sea_orm::Set(Some(x));
}
}
let db = app.db_conn.as_ref().unwrap();
user.save(db).await?;
// let user = user::ActiveModel {
// address: sea_orm::Set(payload.address.to_fixed_bytes().into()),
// email: sea_orm::Set(payload.email),
@ -295,7 +312,7 @@ impl ProtectedAction {
app: &Web3ProxyApp,
bearer_token: String,
primary_address: &Address,
) -> anyhow::Result<()> {
) -> anyhow::Result<user::Model> {
// get the attached address from redis for the given auth_token.
let bearer_key = format!("bearer:{}", bearer_token);

@ -17,7 +17,7 @@ use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
pub type ArcBlock = Arc<Block<TxHash>>;
@ -60,10 +60,10 @@ impl Web3Connections {
Entry::Occupied(mut x) => {
let old_hash = x.insert(*block_hash);
// if block_hash == &old_hash {
// // this block has already been saved
// return Ok(());
// }
if block_hash == &old_hash {
// this block has already been saved
return Ok(());
}
// TODO: what should we do?
// TODO: if old_hash's number is > block_num, we need to remove more entries
@ -284,19 +284,19 @@ impl Web3Connections {
// iterate the known heads to find the highest_work_block
let mut checked_heads = HashSet::new();
let mut highest_work_block: Option<Ref<H256, ArcBlock>> = None;
for rpc_head_hash in connection_heads.values() {
if checked_heads.contains(rpc_head_hash) {
for conn_head_hash in connection_heads.values() {
if checked_heads.contains(conn_head_hash) {
// we already checked this head from another rpc
continue;
}
// don't check the same hash multiple times
checked_heads.insert(rpc_head_hash);
checked_heads.insert(conn_head_hash);
let rpc_head_block = if let Some(x) = self.block_hashes.get(rpc_head_hash) {
let rpc_head_block = if let Some(x) = self.block_hashes.get(conn_head_hash) {
x
} else {
// TODO: why does this happen?
warn!(%rpc_head_hash, %rpc, "No block found");
warn!(%conn_head_hash, %rpc, "No block found");
continue;
};
@ -404,6 +404,7 @@ impl Web3Connections {
.synced_connections
.swap(Arc::new(new_synced_connections));
// TODO: add these to the log messages
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
@ -422,11 +423,10 @@ impl Web3Connections {
// multiple blocks with the same fork!
if heavy_block_id.hash == old_block_id.hash {
// no change in hash. no need to use head_block_sender
debug!(head=%heavy_block_id, %rpc, "con block")
debug!(head=%heavy_block_id, old=%old_block_id, %rpc, "con block")
} else {
// hash changed
// TODO: better log
warn!(heavy=%heavy_block_id, %rpc, "fork detected");
info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block");
// todo!("handle equal by updating the cannonical chain");
self.save_block(&rpc_head_block, Some(true))?;

@ -71,7 +71,6 @@ impl Web3Connections {
}
trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
if self.pending_transactions.contains_key(&pending_tx_id) {
// this transaction has already been processed
return Ok(());