bug fix and better logs

This commit is contained in:
Bryan Stitt 2022-09-03 02:59:30 +00:00
parent 6f0ae1ec35
commit 8225285bb8
6 changed files with 31 additions and 19 deletions

View File

@ -1,10 +1,13 @@
[shared] [shared]
chain_id = 1 chain_id = 1
db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy" db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
# TODO: how do we find the optimal db_max_connections? too high actually ends up being slower
db_max_connections = 99 db_max_connections = 99
min_sum_soft_limit = 2000 min_sum_soft_limit = 2000
min_synced_rpcs = 2 min_synced_rpcs = 2
redis_url = "redis://dev-redis:6379/" redis_url = "redis://dev-redis:6379/"
# TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower
redis_max_connections = 99
redirect_public_url = "https://llamanodes.com/free-rpc-stats" redirect_public_url = "https://llamanodes.com/free-rpc-stats"
redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}" redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
public_rate_limit_per_minute = 0 public_rate_limit_per_minute = 0

View File

@ -103,4 +103,8 @@ impl RedisRateLimit {
pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> { pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
self.throttle_label("", None, 1).await self.throttle_label("", None, 1).await
} }
pub fn max_requests_per_period(&self) -> u64 {
self.max_requests_per_period
}
} }

View File

@ -42,7 +42,7 @@ use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{timeout, Instant}; use tokio::time::{timeout, Instant};
use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{info, info_span, instrument, trace, warn, Instrument}; use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
use uuid::Uuid; use uuid::Uuid;
// TODO: make this customizable? // TODO: make this customizable?
@ -322,7 +322,7 @@ impl Web3ProxyApp {
redis_pool, redis_pool,
stats: app_stats, stats: app_stats,
// TODO: make the size configurable // TODO: make the size configurable
// TODO: why does this need to be async but the other one doesn't? // TODO: better type for this?
user_cache: RwLock::new(FifoCountMap::new(1_000)), user_cache: RwLock::new(FifoCountMap::new(1_000)),
}; };
@ -530,15 +530,13 @@ impl Web3ProxyApp {
&self, &self,
request: JsonRpcRequestEnum, request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> { ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
trace!(?request, "proxy_web3_rpc"); debug!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers, // even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever // we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. per user max? expiration time instead of duration? // TODO: take this as an optional argument. per user max? expiration time instead of duration?
let max_time = Duration::from_secs(120); let max_time = Duration::from_secs(120);
// TODO: instrument this with a unique id
let response = match request { let response = match request {
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout(max_time, self.proxy_web3_rpc_request(request)).await??, timeout(max_time, self.proxy_web3_rpc_request(request)).await??,
@ -548,7 +546,7 @@ impl Web3ProxyApp {
), ),
}; };
trace!(?response, "Forwarding"); debug!(?response, "Forwarding");
Ok(response) Ok(response)
} }
@ -835,11 +833,14 @@ impl Web3ProxyApp {
self.cached_response(min_block_needed, &request).await?; self.cached_response(min_block_needed, &request).await?;
let response_cache = match cache_result { let response_cache = match cache_result {
Ok(response) => { Ok(mut response) => {
let _ = self.active_requests.remove(&cache_key); let _ = self.active_requests.remove(&cache_key);
// TODO: if the response is cached, should it count less against the account's costs? // TODO: if the response is cached, should it count less against the account's costs?
// TODO: cache just the result part of the response?
response.id = request.id;
return Ok(response); return Ok(response);
} }
Err(response_cache) => response_cache, Err(response_cache) => response_cache,

View File

@ -10,7 +10,7 @@ use sea_orm::{
}; };
use std::{net::IpAddr, time::Duration}; use std::{net::IpAddr, time::Duration};
use tokio::time::Instant; use tokio::time::Instant;
use tracing::{debug, warn}; use tracing::debug;
use uuid::Uuid; use uuid::Uuid;
pub enum RateLimitResult { pub enum RateLimitResult {
@ -117,10 +117,11 @@ impl TryFrom<RateLimitResult> for RequestFrom {
impl Web3ProxyApp { impl Web3ProxyApp {
pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> { pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
let rate_limiter_label = format!("ip-{}", ip);
// TODO: dry this up with rate_limit_by_key // TODO: dry this up with rate_limit_by_key
if let Some(rate_limiter) = &self.rate_limiter { if let Some(rate_limiter) = &self.rate_limiter {
let rate_limiter_label = format!("ip-{}", ip);
// TODO: query redis in the background so that users don't have to wait on this network request
match rate_limiter match rate_limiter
.throttle_label(&rate_limiter_label, None, 1) .throttle_label(&rate_limiter_label, None, 1)
.await .await
@ -133,7 +134,8 @@ impl Web3ProxyApp {
return Ok(RateLimitResult::IpRateLimitExceeded(ip)); return Ok(RateLimitResult::IpRateLimitExceeded(ip));
} }
Ok(ThrottleResult::RetryNever) => { Ok(ThrottleResult::RetryNever) => {
return Err(anyhow::anyhow!("blocked by rate limiter")) // TODO: prettier error for the user
return Err(anyhow::anyhow!("blocked by rate limiter"));
} }
Err(err) => { Err(err) => {
// internal error, not rate limit being hit // internal error, not rate limit being hit
@ -142,8 +144,8 @@ impl Web3ProxyApp {
} }
} }
} else { } else {
// TODO: if no redis, rate limit with a local cache? // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
warn!("no rate limiter!"); todo!("no rate limiter");
} }
Ok(RateLimitResult::AllowedIp(ip)) Ok(RateLimitResult::AllowedIp(ip))
@ -197,6 +199,7 @@ impl Web3ProxyApp {
)) ))
} }
None => { None => {
// TODO: think about this more
UserCacheValue::from(( UserCacheValue::from((
// TODO: how long should this cache last? get this from config // TODO: how long should this cache last? get this from config
Instant::now() + Duration::from_secs(60), Instant::now() + Duration::from_secs(60),
@ -225,6 +228,7 @@ impl Web3ProxyApp {
// user key is valid. now check rate limits // user key is valid. now check rate limits
if let Some(rate_limiter) = &self.rate_limiter { if let Some(rate_limiter) = &self.rate_limiter {
// TODO: query redis in the background so that users don't have to wait on this network request
if rate_limiter if rate_limiter
.throttle_label( .throttle_label(
&user_key.to_string(), &user_key.to_string(),
@ -242,7 +246,7 @@ impl Web3ProxyApp {
} }
} else { } else {
// TODO: if no redis, rate limit with a local cache? // TODO: if no redis, rate limit with a local cache?
unimplemented!("no redis. cannot rate limit") todo!("no redis. cannot rate limit")
} }
Ok(RateLimitResult::AllowedUser(user_data.user_id)) Ok(RateLimitResult::AllowedUser(user_data.user_id))

View File

@ -17,7 +17,7 @@ use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc}; use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch}; use tokio::sync::{broadcast, watch};
use tracing::{debug, info, trace, warn}; use tracing::{debug, trace, warn};
pub type ArcBlock = Arc<Block<TxHash>>; pub type ArcBlock = Arc<Block<TxHash>>;
@ -272,7 +272,7 @@ impl Web3Connections {
} }
_ => { _ => {
// TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
info!(%rpc, "Block without number or hash!"); trace!(%rpc, "Block without number or hash!");
connection_heads.remove(&rpc.name); connection_heads.remove(&rpc.name);

View File

@ -233,7 +233,7 @@ impl Web3Connection {
// websocket doesn't need the http client // websocket doesn't need the http client
let http_client = None; let http_client = None;
info!(?self, "reconnecting"); info!(%self, "reconnecting");
// since this lock is held open over an await, we use tokio's locking // since this lock is held open over an await, we use tokio's locking
// TODO: timeout on this lock. if its slow, something is wrong // TODO: timeout on this lock. if its slow, something is wrong
@ -437,7 +437,7 @@ impl Web3Connection {
block_sender: flume::Sender<BlockAndRpc>, block_sender: flume::Sender<BlockAndRpc>,
block_map: BlockHashesMap, block_map: BlockHashesMap,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!(?self, "watching new heads"); info!(%self, "watching new heads");
// TODO: is a RwLock of an Option<Arc> the right thing here? // TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() { if let Some(provider) = self.provider.read().await.clone() {
@ -554,7 +554,7 @@ impl Web3Connection {
self: Arc<Self>, self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>, tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!(?self, "watching pending transactions"); info!(%self, "watching pending transactions");
// TODO: is a RwLock of an Option<Arc> the right thing here? // TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() { if let Some(provider) = self.provider.read().await.clone() {