From 6e5b11be8f4a6c86420905822d69a6c290f1a19a Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 22 May 2022 18:39:06 +0000 Subject: [PATCH] flatten cache code --- config/example.toml | 3 +- docker-compose.common.yml | 2 +- docker-compose.yml | 4 +- redis-cell-client/LICENSE | 21 ------- redis-cell-client/README.md | 2 - redis-cell-client/src/client.rs | 100 +++++++++++--------------------- redis-cell-client/src/lib.rs | 4 +- web3-proxy/src/app.rs | 14 +++-- web3-proxy/src/config.rs | 5 +- web3-proxy/src/connection.rs | 12 ++-- web3-proxy/src/connections.rs | 3 +- 11 files changed, 60 insertions(+), 110 deletions(-) delete mode 100644 redis-cell-client/LICENSE delete mode 100644 redis-cell-client/README.md diff --git a/config/example.toml b/config/example.toml index 3be09886..6c3b1766 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,6 +1,7 @@ [shared] chain_id = 1 -rate_limit_redis = "redis://redis:6379/" +# in prod, do `rate_limit_redis = "redis://redis:6379/"` +rate_limit_redis = "redis://dev-redis:6379/" [balanced_rpcs] diff --git a/docker-compose.common.yml b/docker-compose.common.yml index 5d7e9baa..e8f99b63 100644 --- a/docker-compose.common.yml +++ b/docker-compose.common.yml @@ -5,4 +5,4 @@ services: restart: unless-stopped command: --config /config.toml --workers 8 environment: - RUST_LOG: "web3_proxy=debug" + RUST_LOG: "info,web3_proxy=debug" diff --git a/docker-compose.yml b/docker-compose.yml index 2e5eb94f..e6ef53e0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,10 +3,10 @@ version: "3.4" services: - redis: + dev-redis: build: ./redis-cell-server/ - eth: + dev-eth: extends: file: docker-compose.common.yml service: base diff --git a/redis-cell-client/LICENSE b/redis-cell-client/LICENSE deleted file mode 100644 index 14faaa3b..00000000 --- a/redis-cell-client/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2020 Tin Rabzelj - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/redis-cell-client/README.md b/redis-cell-client/README.md deleted file mode 100644 index 0635a2c9..00000000 --- a/redis-cell-client/README.md +++ /dev/null @@ -1,2 +0,0 @@ -Based on https://github.com/tinrab/rusty-redis-rate-limiting/blob/46d95040708df92d46cef22d319d0182e12a2c0c/src/rate_limiter.rs -Upgraded \ No newline at end of file diff --git a/redis-cell-client/src/client.rs b/redis-cell-client/src/client.rs index c1cc0add..fce57c03 100644 --- a/redis-cell-client/src/client.rs +++ b/redis-cell-client/src/client.rs @@ -5,79 +5,18 @@ use redis::aio::MultiplexedConnection; // TODO: take this as an argument to open? const KEY_PREFIX: &str = "rate-limit"; -#[derive(Clone)] pub struct RedisCellClient { conn: MultiplexedConnection, -} - -impl RedisCellClient { - pub async fn open(redis_address: &str) -> anyhow::Result { - let client = redis::Client::open(redis_address)?; - - let conn = client.get_multiplexed_tokio_connection().await?; - - Ok(Self { conn }) - } - - // CL.THROTTLE [] - /* - - - 0. Whether the action was limited: - 0 indicates the action is allowed. - 1 indicates that the action was limited/blocked. - 1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header. - 2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining. - 3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After. - 4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset. - - */ - pub async fn throttle( - &self, - key: &str, - max_burst: u32, - count_per_period: u32, - period: u32, - quantity: u32, - ) -> Result<(), Duration> { - // TODO: should we return more error info? - // https://github.com/brandur/redis-cell#response - - let mut conn = self.conn.clone(); - - // TODO: don't unwrap. maybe return Option - let x: Vec = redis::cmd("CL.THROTTLE") - .arg(&(key, max_burst, count_per_period, period, quantity)) - .query_async(&mut conn) - .await - .unwrap(); - - assert_eq!(x.len(), 5); - - let retry_after = *x.get(3).unwrap(); - - if retry_after == -1 { - Ok(()) - } else { - Err(Duration::from_secs(retry_after as u64)) - } - } - - // TODO: what else? -} - -pub struct RedisCellKey { - client: RedisCellClient, key: String, max_burst: u32, count_per_period: u32, period: u32, } -impl RedisCellKey { +impl RedisCellClient { // todo: seems like this could be derived pub fn new( - client: &RedisCellClient, + conn: MultiplexedConnection, key: String, max_burst: u32, count_per_period: u32, @@ -86,7 +25,7 @@ impl RedisCellKey { let key = format!("{}:{}", KEY_PREFIX, key); Self { - client: client.clone(), + conn, key, max_burst, count_per_period, @@ -101,14 +40,41 @@ impl RedisCellKey { #[inline] pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Duration> { - self.client - .throttle( + /* + https://github.com/brandur/redis-cell#response + + CL.THROTTLE [] + + 0. Whether the action was limited: + 0 indicates the action is allowed. + 1 indicates that the action was limited/blocked. + 1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header. + 2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining. + 3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After. + 4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset. + */ + // TODO: don't unwrap. maybe return anyhow::Result> + // TODO: should we return more error info? + let x: Vec = redis::cmd("CL.THROTTLE") + .arg(&( &self.key, self.max_burst, self.count_per_period, self.period, quantity, - ) + )) + .query_async(&mut self.conn.clone()) .await + .unwrap(); + + assert_eq!(x.len(), 5); + + let retry_after = *x.get(3).unwrap(); + + if retry_after == -1 { + Ok(()) + } else { + Err(Duration::from_secs(retry_after as u64)) + } } } diff --git a/redis-cell-client/src/lib.rs b/redis-cell-client/src/lib.rs index c8573d8a..be0ef92d 100644 --- a/redis-cell-client/src/lib.rs +++ b/redis-cell-client/src/lib.rs @@ -1,3 +1,5 @@ mod client; -pub use client::{RedisCellClient, RedisCellKey}; +pub use client::RedisCellClient; +pub use redis::aio::MultiplexedConnection; +pub use redis::Client; diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 5cc9a8c1..44973f69 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -9,7 +9,6 @@ use ethers::prelude::H256; use futures::future::join_all; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; -use redis_cell_client::RedisCellClient; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -74,10 +73,17 @@ impl Web3ProxyApp { let rate_limiter = match redis_address { Some(redis_address) => { - info!("Conneting to redis on {}", redis_address); - Some(RedisCellClient::open(&redis_address).await.unwrap()) + info!("Connecting to redis on {}", redis_address); + let redis_client = redis_cell_client::Client::open(redis_address)?; + + let redis_conn = redis_client.get_multiplexed_tokio_connection().await?; + + Some(redis_conn) + } + None => { + info!("No redis address"); + None } - None => None, }; // TODO: attach context to this error diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index e7405c63..a2415d30 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -1,5 +1,4 @@ use argh::FromArgs; -use redis_cell_client::RedisCellClient; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; @@ -72,11 +71,11 @@ impl Web3ConnectionConfig { // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] pub async fn try_build( self, - redis_ratelimiter: Option<&RedisCellClient>, + redis_conn: Option<&redis_cell_client::MultiplexedConnection>, chain_id: usize, http_client: Option<&reqwest::Client>, ) -> anyhow::Result> { - let hard_rate_limit = self.hard_limit.map(|x| (x, redis_ratelimiter.unwrap())); + let hard_rate_limit = self.hard_limit.map(|x| (x, redis_conn.unwrap())); Web3Connection::try_new( chain_id, diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index c65afffe..5cd912a2 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -2,7 +2,7 @@ use derive_more::From; use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256}; use futures::StreamExt; -use redis_cell_client::{RedisCellClient, RedisCellKey}; +use redis_cell_client::RedisCellClient; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::fmt; @@ -73,7 +73,7 @@ pub struct Web3Connection { /// provider is in a RwLock so that we can replace it if re-connecting provider: RwLock>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits - hard_limit: Option, + hard_limit: Option, /// used for load balancing to the least loaded server soft_limit: u32, // TODO: track total number of requests? @@ -144,15 +144,15 @@ impl Web3Connection { url_str: String, // optional because this is only used for http providers. websocket providers don't use it http_client: Option<&reqwest::Client>, - hard_limit: Option<(u32, &RedisCellClient)>, + hard_limit: Option<(u32, &redis_cell_client::MultiplexedConnection)>, // TODO: think more about this type soft_limit: u32, ) -> anyhow::Result> { - let hard_limit = hard_limit.map(|(hard_rate_limit, hard_rate_limiter)| { + let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| { // TODO: allow different max_burst and count_per_period and period let period = 1; - RedisCellKey::new( - hard_rate_limiter, + RedisCellClient::new( + redis_conection.clone(), format!("{},{}", chain_id, url_str), hard_rate_limit, hard_rate_limit, diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 05ac3915..89c71e89 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -6,7 +6,6 @@ use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; -use redis_cell_client::RedisCellClient; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::value::RawValue; @@ -80,7 +79,7 @@ impl Web3Connections { chain_id: usize, servers: Vec, http_client: Option<&reqwest::Client>, - rate_limiter: Option<&RedisCellClient>, + rate_limiter: Option<&redis_cell_client::MultiplexedConnection>, ) -> anyhow::Result> { let num_connections = servers.len();