diff --git a/Cargo.lock b/Cargo.lock index db1867d0..5d6a2b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,9 +85,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26fa4d7e3f2eebadf743988fc8aec9fa9a9e82611acafd77c1462ed6262440a" +checksum = "b9a8f622bcf6ff3df478e9deba3e03e4e04b300f8e6a139e192c05fa3490afc7" dependencies = [ "backtrace", ] diff --git a/TODO.md b/TODO.md index 7b88e3da..e514cc3c 100644 --- a/TODO.md +++ b/TODO.md @@ -137,6 +137,8 @@ - [ ] when using a bunch of slow public servers, i see "no servers in sync" even when things should be right - [ ] i think checking the parents of the heaviest chain works most of the time, but not always - maybe iterate connection heads by total weight? i still think we need to include parent hashes +- [ ] some of the DashMaps grow unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task + ## V1 diff --git a/redis-rate-limit/Cargo.toml b/redis-rate-limit/Cargo.toml index 399ead19..e1178aa0 100644 --- a/redis-rate-limit/Cargo.toml +++ b/redis-rate-limit/Cargo.toml @@ -5,6 +5,6 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -anyhow = "1.0.63" +anyhow = "1.0.64" bb8-redis = "0.11.0" tracing = "0.1.36" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index f3452256..f3ca836f 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -16,7 +16,7 @@ entities = { path = "../entities" } migration = { path = "../migration" } redis-rate-limit = { path = "../redis-rate-limit" } -anyhow = { version = "1.0.63", features = ["backtrace"] } +anyhow = { version = "1.0.64", features = ["backtrace"] } arc-swap = "1.5.1" argh = "0.1.8" axum = { version = "0.5.15", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 0ec1d36c..4dbbf971 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -12,19 +12,16 @@ use crate::rpcs::transactions::TxStatus; use crate::stats::AppStats; use anyhow::Context; use axum::extract::ws::Message; -use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; use derive_more::From; use ethers::core::utils::keccak256; use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64}; -use fifomap::{FifoCountMap, FifoSizedMap}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::Future; use migration::{Migrator, MigratorTrait}; -use parking_lot::RwLock; use redis_rate_limit::bb8::PooledConnection; use redis_rate_limit::{ bb8::{self, ErrorSink}, @@ -55,11 +52,10 @@ static APP_USER_AGENT: &str = concat!( /// block hash, method, params // TODO: better name -type CacheKey = (H256, String, Option); +type ResponseCacheKey = (H256, String, Option); -type ResponseLrcCache = RwLock>; - -type ActiveRequestsMap = DashMap>; +// TODO!! a RWLock on this made us super slow. But a DashMap makes this grow unbounded! +type ResponseCache = DashMap; pub type AnyhowJoinHandle = JoinHandle>; @@ -78,20 +74,20 @@ pub struct Web3ProxyApp { pub balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers pub private_rpcs: Arc, - /// Track active requests so that we don't send the same query to multiple backends - pub active_requests: ActiveRequestsMap, - response_cache: ResponseLrcCache, + response_cache: ResponseCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? head_block_receiver: watch::Receiver, pending_tx_sender: broadcast::Sender, + pub active_requests: AtomicUsize, pub config: AppConfig, pub db_conn: Option, pub pending_transactions: Arc>, pub rate_limiter: Option, pub redis_pool: Option, pub stats: AppStats, - pub user_cache: RwLock>, + // TODO: this grows unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task + pub user_cache: DashMap, } /// flatten a JoinError into an anyhow error @@ -304,16 +300,12 @@ impl Web3ProxyApp { ) }); - // keep the borrow checker happy - let response_cache_max_bytes = top_config.app.response_cache_max_bytes; - let app = Self { config: top_config.app, balanced_rpcs, private_rpcs, active_requests: Default::default(), - // TODO: make the share configurable. or maybe take a number as bytes? - response_cache: RwLock::new(FifoSizedMap::new(response_cache_max_bytes, 100)), + response_cache: Default::default(), head_block_receiver, pending_tx_sender, pending_transactions, @@ -323,7 +315,7 @@ impl Web3ProxyApp { stats: app_stats, // TODO: make the size configurable // TODO: better type for this? - user_cache: RwLock::new(FifoCountMap::new(1_000)), + user_cache: Default::default(), }; let app = Arc::new(app); @@ -530,7 +522,8 @@ impl Web3ProxyApp { &self, request: JsonRpcRequestEnum, ) -> anyhow::Result { - debug!(?request, "proxy_web3_rpc"); + // TODO: this should probably be trace level + trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, // we need a timeout for the incoming request so that retries don't run forever @@ -546,7 +539,8 @@ impl Web3ProxyApp { ), }; - debug!(?response, "Forwarding"); + // TODO: this should probably be trace level + trace!(?response, "Forwarding"); Ok(response) } @@ -593,10 +587,7 @@ impl Web3ProxyApp { // TODO: accept a block hash here also? min_block_needed: Option<&U64>, request: &JsonRpcRequest, - ) -> anyhow::Result<( - CacheKey, - Result, - )> { + ) -> anyhow::Result> { // TODO: inspect the request to pick the right cache // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py @@ -617,20 +608,20 @@ impl Web3ProxyApp { request.params.clone().map(|x| x.to_string()), ); - if let Some(response) = self.response_cache.read().get(&key) { + if let Some(response) = self.response_cache.get(&key) { // TODO: emit a stat trace!(?request.method, "cache hit!"); // TODO: can we make references work? maybe put them in an Arc? - return Ok((key, Ok(response.to_owned()))); - } else { - // TODO: emit a stat - trace!(?request.method, "cache miss!"); + return Ok(Ok(response.to_owned())); } - let cache = &self.response_cache; + // TODO: another lock here so that we don't send the same request to a backend more than onces xzs - Ok((key, Err(cache))) + // TODO: emit a stat + trace!(?request.method, "cache miss!"); + + Ok(Err(key)) } async fn proxy_web3_rpc_request( @@ -828,64 +819,21 @@ impl Web3ProxyApp { trace!(?min_block_needed, ?method); - // TODO: emit a stat on error. maybe with .map_err? - let (cache_key, cache_result) = + let cached_response_result = self.cached_response(min_block_needed, &request).await?; - let response_cache = match cache_result { - Ok(mut response) => { - let _ = self.active_requests.remove(&cache_key); + // TODO: emit a stat on error. maybe with .map_err? + let cache_key = match cached_response_result { + Ok(mut cache_result) => { + // put our request id on the cached response + // TODO: maybe only cache the inner result? + cache_result.id = request.id; - // TODO: if the response is cached, should it count less against the account's costs? - - // TODO: cache just the result part of the response? - response.id = request.id; - - return Ok(response); + return Ok(cache_result); } - Err(response_cache) => response_cache, + Err(cache_key) => cache_key, }; - // check if this request is already in flight - // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't) - let (incoming_tx, incoming_rx) = watch::channel(true); - let mut other_incoming_rx = None; - match self.active_requests.entry(cache_key.clone()) { - DashMapEntry::Occupied(entry) => { - other_incoming_rx = Some(entry.get().clone()); - } - DashMapEntry::Vacant(entry) => { - entry.insert(incoming_rx); - } - } - - if let Some(mut other_incoming_rx) = other_incoming_rx { - // wait for the other request to finish. it might have finished successfully or with an error - trace!("{:?} waiting on in-flight request", request); - - let _ = other_incoming_rx.changed().await; - - // now that we've waited, lets check the cache again - if let Some(cached) = response_cache.read().get(&cache_key) { - let _ = self.active_requests.remove(&cache_key); - let _ = incoming_tx.send(false); - - // TODO: emit a stat - trace!( - "{:?} cache hit after waiting for in-flight request!", - request - ); - - return Ok(cached.to_owned()); - } else { - // TODO: emit a stat - trace!( - "{:?} cache miss after waiting for in-flight request!", - request - ); - } - } - let response = match method { "temporarily disabled" => { // "eth_getTransactionByHash" | "eth_getTransactionReceipt" => { @@ -904,19 +852,15 @@ impl Web3ProxyApp { // TODO: move this caching outside this match and cache some of the other responses? // TODO: cache the warp::reply to save us serializing every time? + if self + .response_cache + .insert(cache_key.clone(), response.clone()) + .is_some() { - let mut response_cache = response_cache.write(); - - if response_cache.insert(cache_key.clone(), response.clone()) { - } else { - // TODO: emit a stat instead? what else should be in the log - trace!(?cache_key, "value too large for caching"); - } + // TODO: we had another lock to prevent this, but its not worth the extra locking + debug!("already cached") } - let _ = self.active_requests.remove(&cache_key); - let _ = incoming_tx.send(false); - return Ok(response); } }; diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs index 9f443c90..d8777c31 100644 --- a/web3_proxy/src/frontend/http.rs +++ b/web3_proxy/src/frontend/http.rs @@ -1,7 +1,7 @@ use crate::app::Web3ProxyApp; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use serde_json::json; -use std::sync::Arc; +use std::sync::{atomic::Ordering, Arc}; /// Health check page for load balancers to use pub async fn health(Extension(app): Extension>) -> impl IntoResponse { @@ -19,7 +19,7 @@ pub async fn status(Extension(app): Extension>) -> impl IntoRe let body = json!({ "balanced_rpcs": app.balanced_rpcs, "private_rpcs": app.private_rpcs, - "num_active_requests": app.active_requests.len(), + "num_active_requests": app.active_requests.load(Ordering::Acquire), "num_pending_transactions": app.pending_transactions.len(), }); diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index c4f2905a..bc69dba9 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -195,14 +195,14 @@ impl Web3ProxyApp { }; // save for the next run - self.user_cache.write().insert(user_key, user_data); + self.user_cache.insert(user_key, user_data); Ok(user_data) } pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result { // check the local cache - let user_data = if let Some(cached_user) = self.user_cache.read().get(&user_key) { + let user_data = if let Some(cached_user) = self.user_cache.get(&user_key) { // TODO: also include the time this value was last checked! otherwise we cache forever! if cached_user.expires_at < Instant::now() { // old record found diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 880b8f58..4ab00569 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -253,6 +253,7 @@ pub async fn post_login( } /// the JSON input to the `post_user` handler +/// This handles updating #[derive(Deserialize)] pub struct PostUser { primary_address: Address, @@ -262,7 +263,7 @@ pub struct PostUser { } #[debug_handler] -/// post to the user endpoint to modify your account +/// post to the user endpoint to modify your existing account pub async fn post_user( AuthBearer(bearer_token): AuthBearer, ClientIp(ip): ClientIp, @@ -271,10 +272,26 @@ pub async fn post_user( ) -> FrontendResult { let _ip = rate_limit_by_ip(&app, ip).await?; - ProtectedAction::PostUser + let user = ProtectedAction::PostUser .verify(app.as_ref(), bearer_token, &payload.primary_address) .await?; + let mut user: user::ActiveModel = user.into(); + + // TODO: rate limit by user, too? + + if let Some(x) = payload.email { + if x.is_empty() { + user.email = sea_orm::Set(None); + } else { + user.email = sea_orm::Set(Some(x)); + } + } + + let db = app.db_conn.as_ref().unwrap(); + + user.save(db).await?; + // let user = user::ActiveModel { // address: sea_orm::Set(payload.address.to_fixed_bytes().into()), // email: sea_orm::Set(payload.email), @@ -295,7 +312,7 @@ impl ProtectedAction { app: &Web3ProxyApp, bearer_token: String, primary_address: &Address, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { // get the attached address from redis for the given auth_token. let bearer_key = format!("bearer:{}", bearer_token); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 77f7802e..2ed74a09 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -17,7 +17,7 @@ use serde::Serialize; use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; pub type ArcBlock = Arc>; @@ -60,10 +60,10 @@ impl Web3Connections { Entry::Occupied(mut x) => { let old_hash = x.insert(*block_hash); - // if block_hash == &old_hash { - // // this block has already been saved - // return Ok(()); - // } + if block_hash == &old_hash { + // this block has already been saved + return Ok(()); + } // TODO: what should we do? // TODO: if old_hash's number is > block_num, we need to remove more entries @@ -284,19 +284,19 @@ impl Web3Connections { // iterate the known heads to find the highest_work_block let mut checked_heads = HashSet::new(); let mut highest_work_block: Option> = None; - for rpc_head_hash in connection_heads.values() { - if checked_heads.contains(rpc_head_hash) { + for conn_head_hash in connection_heads.values() { + if checked_heads.contains(conn_head_hash) { // we already checked this head from another rpc continue; } // don't check the same hash multiple times - checked_heads.insert(rpc_head_hash); + checked_heads.insert(conn_head_hash); - let rpc_head_block = if let Some(x) = self.block_hashes.get(rpc_head_hash) { + let rpc_head_block = if let Some(x) = self.block_hashes.get(conn_head_hash) { x } else { // TODO: why does this happen? - warn!(%rpc_head_hash, %rpc, "No block found"); + warn!(%conn_head_hash, %rpc, "No block found"); continue; }; @@ -404,6 +404,7 @@ impl Web3Connections { .synced_connections .swap(Arc::new(new_synced_connections)); + // TODO: add these to the log messages let num_connection_heads = connection_heads.len(); let total_conns = self.conns.len(); @@ -422,11 +423,10 @@ impl Web3Connections { // multiple blocks with the same fork! if heavy_block_id.hash == old_block_id.hash { // no change in hash. no need to use head_block_sender - debug!(head=%heavy_block_id, %rpc, "con block") + debug!(head=%heavy_block_id, old=%old_block_id, %rpc, "con block") } else { // hash changed - // TODO: better log - warn!(heavy=%heavy_block_id, %rpc, "fork detected"); + info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block"); // todo!("handle equal by updating the cannonical chain"); self.save_block(&rpc_head_block, Some(true))?; diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index d62ac4be..481d1d72 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -71,7 +71,6 @@ impl Web3Connections { } trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); - if self.pending_transactions.contains_key(&pending_tx_id) { // this transaction has already been processed return Ok(());