From 8225285bb8bbbb926a87d412604d046b7f85f44e Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Sat, 3 Sep 2022 02:59:30 +0000
Subject: [PATCH] bug fix and better logs

---
 config/example.toml                   |  3 +++
 redis-rate-limit/src/lib.rs           |  4 ++++
 web3_proxy/src/app.rs                 | 15 ++++++++-------
 web3_proxy/src/frontend/rate_limit.rs | 18 +++++++++++-------
 web3_proxy/src/rpcs/blockchain.rs     |  4 ++--
 web3_proxy/src/rpcs/connection.rs     |  6 +++---
 6 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/config/example.toml b/config/example.toml
index 096d16ee..7de66214 100644
--- a/config/example.toml
+++ b/config/example.toml
@@ -1,10 +1,13 @@
 [shared]
 chain_id = 1
 db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
+# TODO: how do we find the optimal db_max_connections? too high actually ends up being slower
 db_max_connections = 99
 min_sum_soft_limit = 2000
 min_synced_rpcs = 2
 redis_url = "redis://dev-redis:6379/"
+# TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower
+redis_max_connections = 99
 redirect_public_url = "https://llamanodes.com/free-rpc-stats"
 redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
 public_rate_limit_per_minute = 0
diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limit/src/lib.rs
index 45dd3d43..132d7717 100644
--- a/redis-rate-limit/src/lib.rs
+++ b/redis-rate-limit/src/lib.rs
@@ -103,4 +103,8 @@ impl RedisRateLimit {
     pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
         self.throttle_label("", None, 1).await
     }
+
+    pub fn max_requests_per_period(&self) -> u64 {
+        self.max_requests_per_period
+    }
 }
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index 3cfc6529..0ec1d36c 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -42,7 +42,7 @@ use tokio::sync::{broadcast, watch};
 use tokio::task::JoinHandle;
 use tokio::time::{timeout, Instant};
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
-use tracing::{info, info_span, instrument, trace, warn, Instrument};
+use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
 use uuid::Uuid;
 
 // TODO: make this customizable?
@@ -322,7 +322,7 @@ impl Web3ProxyApp {
             redis_pool,
             stats: app_stats,
             // TODO: make the size configurable
-            // TODO: why does this need to be async but the other one doesn't?
+            // TODO: better type for this?
             user_cache: RwLock::new(FifoCountMap::new(1_000)),
         };
 
@@ -530,15 +530,13 @@ impl Web3ProxyApp {
         &self,
         request: JsonRpcRequestEnum,
     ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
-        trace!(?request, "proxy_web3_rpc");
+        debug!(?request, "proxy_web3_rpc");
 
         // even though we have timeouts on the requests to our backend providers,
         // we need a timeout for the incoming request so that retries don't run forever
         // TODO: take this as an optional argument. per user max? expiration time instead of duration?
         let max_time = Duration::from_secs(120);
 
-        // TODO: instrument this with a unique id
-
         let response = match request {
             JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
                 timeout(max_time, self.proxy_web3_rpc_request(request)).await??,
@@ -548,7 +546,7 @@ impl Web3ProxyApp {
             ),
         };
 
-        trace!(?response, "Forwarding");
+        debug!(?response, "Forwarding");
 
         Ok(response)
     }
@@ -835,11 +833,14 @@ impl Web3ProxyApp {
             self.cached_response(min_block_needed, &request).await?;
 
         let response_cache = match cache_result {
-            Ok(response) => {
+            Ok(mut response) => {
                 let _ = self.active_requests.remove(&cache_key);
 
                 // TODO: if the response is cached, should it count less against the account's costs?
+                // TODO: cache just the result part of the response?
+                response.id = request.id;
+
                 return Ok(response);
             }
             Err(response_cache) => response_cache,
diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs
index 182ff3e4..2b568a3a 100644
--- a/web3_proxy/src/frontend/rate_limit.rs
+++ b/web3_proxy/src/frontend/rate_limit.rs
@@ -10,7 +10,7 @@ use sea_orm::{
 };
 use std::{net::IpAddr, time::Duration};
 use tokio::time::Instant;
-use tracing::{debug, warn};
+use tracing::debug;
 use uuid::Uuid;
 
 pub enum RateLimitResult {
@@ -117,10 +117,11 @@ impl TryFrom for RequestFrom {
 
 impl Web3ProxyApp {
     pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
-        let rate_limiter_label = format!("ip-{}", ip);
-
         // TODO: dry this up with rate_limit_by_key
         if let Some(rate_limiter) = &self.rate_limiter {
+            let rate_limiter_label = format!("ip-{}", ip);
+
+            // TODO: query redis in the background so that users don't have to wait on this network request
             match rate_limiter
                 .throttle_label(&rate_limiter_label, None, 1)
                 .await
@@ -133,7 +134,8 @@ impl Web3ProxyApp {
                     return Ok(RateLimitResult::IpRateLimitExceeded(ip));
                 }
                 Ok(ThrottleResult::RetryNever) => {
-                    return Err(anyhow::anyhow!("blocked by rate limiter"))
+                    // TODO: prettier error for the user
+                    return Err(anyhow::anyhow!("blocked by rate limiter"));
                 }
                 Err(err) => {
                     // internal error, not rate limit being hit
@@ -142,8 +144,8 @@ impl Web3ProxyApp {
                 }
             }
         } else {
-            // TODO: if no redis, rate limit with a local cache?
-            warn!("no rate limiter!");
+            // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
+            todo!("no rate limiter");
         }
 
         Ok(RateLimitResult::AllowedIp(ip))
@@ -197,6 +199,7 @@ impl Web3ProxyApp {
                 ))
             }
             None => {
+                // TODO: think about this more
                 UserCacheValue::from((
                     // TODO: how long should this cache last? get this from config
                     Instant::now() + Duration::from_secs(60),
@@ -225,6 +228,7 @@ impl Web3ProxyApp {
 
         // user key is valid. now check rate limits
         if let Some(rate_limiter) = &self.rate_limiter {
+            // TODO: query redis in the background so that users don't have to wait on this network request
             if rate_limiter
                 .throttle_label(
                     &user_key.to_string(),
@@ -242,7 +246,7 @@ impl Web3ProxyApp {
             }
         } else {
             // TODO: if no redis, rate limit with a local cache?
-            unimplemented!("no redis. cannot rate limit")
+            todo!("no redis. cannot rate limit")
         }
 
         Ok(RateLimitResult::AllowedUser(user_data.user_id))
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 53da5eec..77f7802e 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -17,7 +17,7 @@ use serde::Serialize;
 use serde_json::json;
 use std::{cmp::Ordering, fmt::Display, sync::Arc};
 use tokio::sync::{broadcast, watch};
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, trace, warn};
 
 pub type ArcBlock = Arc<Block<TxHash>>;
 
@@ -272,7 +272,7 @@ impl Web3Connections {
             }
             _ => {
                 // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
-                info!(%rpc, "Block without number or hash!");
+                trace!(%rpc, "Block without number or hash!");
 
                 connection_heads.remove(&rpc.name);
 
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index da31e895..660723d8 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -233,7 +233,7 @@ impl Web3Connection {
         // websocket doesn't need the http client
         let http_client = None;
 
-        info!(?self, "reconnecting");
+        info!(%self, "reconnecting");
 
         // since this lock is held open over an await, we use tokio's locking
         // TODO: timeout on this lock. if its slow, something is wrong
@@ -437,7 +437,7 @@ impl Web3Connection {
         block_sender: flume::Sender,
         block_map: BlockHashesMap,
     ) -> anyhow::Result<()> {
-        info!(?self, "watching new heads");
+        info!(%self, "watching new heads");
 
         // TODO: is a RwLock of an Option the right thing here?
         if let Some(provider) = self.provider.read().await.clone() {
@@ -554,7 +554,7 @@ impl Web3Connection {
         self: Arc<Self>,
         tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
     ) -> anyhow::Result<()> {
-        info!(?self, "watching pending transactions");
+        info!(%self, "watching pending transactions");
 
         // TODO: is a RwLock of an Option the right thing here?
         if let Some(provider) = self.provider.read().await.clone() {
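
Note on the "?self" -> "%self" changes in connection.rs: in the tracing macros, a "?" sigil records a field with its Debug implementation, while "%" records it with Display, so these log lines now print the connection's short Display form instead of a verbose Debug dump of the whole struct. A minimal sketch of the difference, outside this patch, with a hypothetical Conn type standing in for Web3Connection and assuming the tracing and tracing-subscriber crates are available:

use tracing::info;

#[derive(Debug)]
struct Conn {
    name: &'static str,
}

impl std::fmt::Display for Conn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.name)
    }
}

fn main() {
    // print events to stdout
    tracing_subscriber::fmt::init();

    let conn = Conn { name: "eth-ws-1" };

    // "?conn" records the field via Debug: conn=Conn { name: "eth-ws-1" }
    info!(?conn, "reconnecting");

    // "%conn" records the field via Display: conn=eth-ws-1
    info!(%conn, "reconnecting");
}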