diff --git a/TODO.md b/TODO.md index 735c9a82..b02e8fce 100644 --- a/TODO.md +++ b/TODO.md @@ -206,3 +206,5 @@ in another repo: event subscriber - [ ] better error handling. we warn too often for validation errors and use the same error code for most every request - [ ] use &str more instead of String. lifetime annotations get really annoying though - [ ] tarpit instead of reject requests (unless theres a lot) +- [ ] tune database connection pool size. i think a single web3-proxy currently maxes out our server +- [ ] subscribing to transactions should be configurable per server. listening to paid servers can get expensive diff --git a/config/example.toml b/config/example.toml index 888c4d1d..000d88cc 100644 --- a/config/example.toml +++ b/config/example.toml @@ -2,7 +2,7 @@ chain_id = 1 db_url = "mysql://root:dev_web3_proxy@dev-mysql:3306/dev_web3_proxy" redis_url = "redis://dev-redis:6379/" -public_rate_limit_per_minute = 60_000 +public_rate_limit_per_minute = 0 # 1GB of cache response_cache_max_bytes = 10000000000 diff --git a/redis-cell-client/src/lib.rs b/redis-cell-client/src/lib.rs index 2ecf571c..71fb09e8 100644 --- a/redis-cell-client/src/lib.rs +++ b/redis-cell-client/src/lib.rs @@ -48,11 +48,16 @@ impl RedisCellClient { count_per_period: Option, period: Option, quantity: u32, - ) -> Result<(), Duration> { + ) -> Result<(), Option> { let mut conn = self.pool.get().await.unwrap(); - let max_burst = max_burst.unwrap_or(self.default_max_burst); let count_per_period = count_per_period.unwrap_or(self.default_count_per_period); + + if count_per_period == 0 { + return Err(None); + } + + let max_burst = max_burst.unwrap_or(self.default_max_burst); let period = period.unwrap_or(self.default_period); /* @@ -85,24 +90,31 @@ impl RedisCellClient { if retry_after == -1 { Ok(()) } else { - Err(Duration::from_secs(retry_after as u64)) + Err(Some(Duration::from_secs(retry_after as u64))) } } #[inline] - pub async fn throttle(&self) -> Result<(), Duration> { + pub async fn throttle(&self) -> Result<(), Option> { self._throttle(&self.key, None, None, None, 1).await } #[inline] - pub async fn throttle_key(&self, key: &str) -> Result<(), Duration> { + pub async fn throttle_key( + &self, + key: &str, + max_burst: Option, + count_per_period: Option, + period: Option, + ) -> Result<(), Option> { let key = format!("{}:{}", self.key, key); - self._throttle(key.as_ref(), None, None, None, 1).await + self._throttle(key.as_ref(), max_burst, count_per_period, period, 1) + .await } #[inline] - pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Duration> { + pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Option> { self._throttle(&self.key, None, None, None, quantity).await } } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index cc5dcc2d..5d80cf94 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -435,20 +435,16 @@ impl Web3ProxyApp { // TODO: how much should we allow? let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3; - let public_rate_limiter = if app_config.shared.public_rate_limit_per_minute == 0 { - None - } else { - redis_client_pool.as_ref().map(|redis_client_pool| { - RedisCellClient::new( - redis_client_pool.clone(), - "web3-proxy", - "ip", - public_max_burst, - app_config.shared.public_rate_limit_per_minute, - 60, - ) - }) - }; + let frontend_rate_limiter = redis_client_pool.as_ref().map(|redis_client_pool| { + RedisCellClient::new( + redis_client_pool.clone(), + "web3-proxy", + "frontend", + public_max_burst, + app_config.shared.public_rate_limit_per_minute, + 60, + ) + }); let app = Self { balanced_rpcs, @@ -459,7 +455,7 @@ impl Web3ProxyApp { head_block_receiver, pending_tx_sender, pending_transactions, - rate_limiter: public_rate_limiter, + rate_limiter: frontend_rate_limiter, db_conn, }; diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs index 8f319e79..fadb191a 100644 --- a/web3_proxy/src/connection.rs +++ b/web3_proxy/src/connection.rs @@ -179,7 +179,7 @@ impl Web3Connection { // TODO: this will wait forever. do we want that? let found_chain_id: Result = new_connection .wait_for_request_handle() - .await + .await? .request("eth_chainId", Option::None::<()>) .await; @@ -243,7 +243,7 @@ impl Web3Connection { let archive_result: Result = new_connection .wait_for_request_handle() - .await + .await? .request( "eth_getCode", ( @@ -503,7 +503,7 @@ impl Web3Connection { // all it does is print "new block" for the same block as current block let block: Result, _> = self .wait_for_request_handle() - .await + .await? .request("eth_getBlockByNumber", ("latest", false)) .await; @@ -583,24 +583,27 @@ impl Web3Connection { /// be careful with this; it will wait forever! #[instrument(skip_all)] - pub async fn wait_for_request_handle(self: &Arc) -> ActiveRequestHandle { + pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { // TODO: maximum wait time? i think timeouts in other parts of the code are probably best loop { match self.try_request_handle().await { - Ok(pending_request_handle) => return pending_request_handle, - Err(retry_after) => { + Ok(pending_request_handle) => return Ok(pending_request_handle), + Err(Some(retry_after)) => { sleep(retry_after).await; } + Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")), } } } - pub async fn try_request_handle(self: &Arc) -> Result { + pub async fn try_request_handle( + self: &Arc, + ) -> Result> { // check that we are connected if !self.has_provider().await { // TODO: how long? use the same amount as the exponential backoff on retry - return Err(Duration::from_secs(1)); + return Err(Some(Duration::from_secs(1))); } // check rate limits diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index b0be195d..3d5c4448 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -954,7 +954,7 @@ impl Web3Connections { // increment our connection counter match rpc.try_request_handle().await { Err(retry_after) => { - earliest_retry_after = earliest_retry_after.min(Some(retry_after)); + earliest_retry_after = earliest_retry_after.min(retry_after); } Ok(handle) => { trace!("next server on {:?}: {:?}", self, rpc); @@ -990,7 +990,7 @@ impl Web3Connections { match connection.try_request_handle().await { Err(retry_after) => { // this rpc is not available. skip it - earliest_retry_after = earliest_retry_after.min(Some(retry_after)); + earliest_retry_after = earliest_retry_after.min(retry_after); } Ok(handle) => selected_rpcs.push(handle), } diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 730bea35..b9ef5e4f 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -13,7 +13,7 @@ pub async fn handler_404() -> impl IntoResponse { /// TODO: use this. i can't get to work /// TODO: i think we want a custom result type instead. put the anyhow result inside. then `impl IntoResponse for CustomResult` pub async fn handle_anyhow_error( - code: Option, + http_code: Option, id: Option>, err: anyhow::Error, ) -> impl IntoResponse { @@ -25,7 +25,7 @@ pub async fn handle_anyhow_error( // TODO: logs here are too verbose. emit a stat // warn!("Responding with error: {:?}", err); - let code = code.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let code = http_code.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); (code, Json(err)) } diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 6f6ac43c..5afb881e 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -30,14 +30,18 @@ pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), imp // TODO: dry this up with rate_limit_by_key if let Some(rate_limiter) = app.rate_limiter() { - if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() { + if rate_limiter + .throttle_key(&rate_limiter_key, None, None, None) + .await + .is_err() + { // TODO: set headers so they know when they can retry // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good // TODO: use their id if possible return Err(handle_anyhow_error( Some(StatusCode::TOO_MANY_REQUESTS), None, - anyhow::anyhow!("too many requests"), + anyhow::anyhow!("too many requests from this ip"), ) .await .into_response()); @@ -76,8 +80,19 @@ pub async fn rate_limit_by_key( Ok::, _>(Some(_)) => { // user key is valid if let Some(rate_limiter) = app.rate_limiter() { + // TODO: check the db for this? maybe add to the find above with a join? + let user_count_per_period = 100_000; + // TODO: how does max burst actually work? what should it be? + let user_max_burst = user_count_per_period; + let user_period = 1; + if rate_limiter - .throttle_key(&user_key.to_string()) + .throttle_key( + &user_key.to_string(), + Some(user_max_burst), + Some(user_count_per_period), + Some(user_period), + ) .await .is_err() { @@ -87,7 +102,7 @@ pub async fn rate_limit_by_key( return Err(handle_anyhow_error( Some(StatusCode::TOO_MANY_REQUESTS), None, - anyhow::anyhow!("too many requests"), + anyhow::anyhow!("too many requests from this key"), ) .await .into_response());