flatten cache code

This commit is contained in:
Bryan Stitt 2022-05-22 18:39:06 +00:00
parent 8aacd84955
commit 6e5b11be8f
11 changed files with 60 additions and 110 deletions

@ -1,6 +1,7 @@
[shared]
chain_id = 1
rate_limit_redis = "redis://redis:6379/"
# in prod, do `rate_limit_redis = "redis://redis:6379/"`
rate_limit_redis = "redis://dev-redis:6379/"
[balanced_rpcs]

@ -5,4 +5,4 @@ services:
restart: unless-stopped
command: --config /config.toml --workers 8
environment:
RUST_LOG: "web3_proxy=debug"
RUST_LOG: "info,web3_proxy=debug"

@ -3,10 +3,10 @@
version: "3.4"
services:
redis:
dev-redis:
build: ./redis-cell-server/
eth:
dev-eth:
extends:
file: docker-compose.common.yml
service: base

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2020 Tin Rabzelj
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -1,2 +0,0 @@
Based on https://github.com/tinrab/rusty-redis-rate-limiting/blob/46d95040708df92d46cef22d319d0182e12a2c0c/src/rate_limiter.rs
Upgraded

@ -5,79 +5,18 @@ use redis::aio::MultiplexedConnection;
// TODO: take this as an argument to open?
const KEY_PREFIX: &str = "rate-limit";
#[derive(Clone)]
pub struct RedisCellClient {
conn: MultiplexedConnection,
}
impl RedisCellClient {
pub async fn open(redis_address: &str) -> anyhow::Result<Self> {
let client = redis::Client::open(redis_address)?;
let conn = client.get_multiplexed_tokio_connection().await?;
Ok(Self { conn })
}
// CL.THROTTLE <key> <max_burst> <count per period> <period> [<quantity>]
/*
0. Whether the action was limited:
0 indicates the action is allowed.
1 indicates that the action was limited/blocked.
1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header.
2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining.
3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After.
4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset.
*/
pub async fn throttle(
&self,
key: &str,
max_burst: u32,
count_per_period: u32,
period: u32,
quantity: u32,
) -> Result<(), Duration> {
// TODO: should we return more error info?
// https://github.com/brandur/redis-cell#response
let mut conn = self.conn.clone();
// TODO: don't unwrap. maybe return Option<Duration>
let x: Vec<isize> = redis::cmd("CL.THROTTLE")
.arg(&(key, max_burst, count_per_period, period, quantity))
.query_async(&mut conn)
.await
.unwrap();
assert_eq!(x.len(), 5);
let retry_after = *x.get(3).unwrap();
if retry_after == -1 {
Ok(())
} else {
Err(Duration::from_secs(retry_after as u64))
}
}
// TODO: what else?
}
pub struct RedisCellKey {
client: RedisCellClient,
key: String,
max_burst: u32,
count_per_period: u32,
period: u32,
}
impl RedisCellKey {
impl RedisCellClient {
// todo: seems like this could be derived
pub fn new(
client: &RedisCellClient,
conn: MultiplexedConnection,
key: String,
max_burst: u32,
count_per_period: u32,
@ -86,7 +25,7 @@ impl RedisCellKey {
let key = format!("{}:{}", KEY_PREFIX, key);
Self {
client: client.clone(),
conn,
key,
max_burst,
count_per_period,
@ -101,14 +40,41 @@ impl RedisCellKey {
#[inline]
pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Duration> {
self.client
.throttle(
/*
https://github.com/brandur/redis-cell#response
CL.THROTTLE <key> <max_burst> <count per period> <period> [<quantity>]
0. Whether the action was limited:
0 indicates the action is allowed.
1 indicates that the action was limited/blocked.
1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header.
2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining.
3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After.
4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset.
*/
// TODO: don't unwrap. maybe return anyhow::Result<Result<(), Duration>>
// TODO: should we return more error info?
let x: Vec<isize> = redis::cmd("CL.THROTTLE")
.arg(&(
&self.key,
self.max_burst,
self.count_per_period,
self.period,
quantity,
)
))
.query_async(&mut self.conn.clone())
.await
.unwrap();
assert_eq!(x.len(), 5);
let retry_after = *x.get(3).unwrap();
if retry_after == -1 {
Ok(())
} else {
Err(Duration::from_secs(retry_after as u64))
}
}
}

@ -1,3 +1,5 @@
mod client;
pub use client::{RedisCellClient, RedisCellKey};
pub use client::RedisCellClient;
pub use redis::aio::MultiplexedConnection;
pub use redis::Client;

@ -9,7 +9,6 @@ use ethers::prelude::H256;
use futures::future::join_all;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
use redis_cell_client::RedisCellClient;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
@ -74,10 +73,17 @@ impl Web3ProxyApp {
let rate_limiter = match redis_address {
Some(redis_address) => {
info!("Conneting to redis on {}", redis_address);
Some(RedisCellClient::open(&redis_address).await.unwrap())
info!("Connecting to redis on {}", redis_address);
let redis_client = redis_cell_client::Client::open(redis_address)?;
let redis_conn = redis_client.get_multiplexed_tokio_connection().await?;
Some(redis_conn)
}
None => {
info!("No redis address");
None
}
None => None,
};
// TODO: attach context to this error

@ -1,5 +1,4 @@
use argh::FromArgs;
use redis_cell_client::RedisCellClient;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
@ -72,11 +71,11 @@ impl Web3ConnectionConfig {
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
pub async fn try_build(
self,
redis_ratelimiter: Option<&RedisCellClient>,
redis_conn: Option<&redis_cell_client::MultiplexedConnection>,
chain_id: usize,
http_client: Option<&reqwest::Client>,
) -> anyhow::Result<Arc<Web3Connection>> {
let hard_rate_limit = self.hard_limit.map(|x| (x, redis_ratelimiter.unwrap()));
let hard_rate_limit = self.hard_limit.map(|x| (x, redis_conn.unwrap()));
Web3Connection::try_new(
chain_id,

@ -2,7 +2,7 @@
use derive_more::From;
use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256};
use futures::StreamExt;
use redis_cell_client::{RedisCellClient, RedisCellKey};
use redis_cell_client::RedisCellClient;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use std::fmt;
@ -73,7 +73,7 @@ pub struct Web3Connection {
/// provider is in a RwLock so that we can replace it if re-connecting
provider: RwLock<Arc<Web3Provider>>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
hard_limit: Option<RedisCellKey>,
hard_limit: Option<redis_cell_client::RedisCellClient>,
/// used for load balancing to the least loaded server
soft_limit: u32,
// TODO: track total number of requests?
@ -144,15 +144,15 @@ impl Web3Connection {
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<&reqwest::Client>,
hard_limit: Option<(u32, &RedisCellClient)>,
hard_limit: Option<(u32, &redis_cell_client::MultiplexedConnection)>,
// TODO: think more about this type
soft_limit: u32,
) -> anyhow::Result<Arc<Web3Connection>> {
let hard_limit = hard_limit.map(|(hard_rate_limit, hard_rate_limiter)| {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
// TODO: allow different max_burst and count_per_period and period
let period = 1;
RedisCellKey::new(
hard_rate_limiter,
RedisCellClient::new(
redis_conection.clone(),
format!("{},{}", chain_id, url_str),
hard_rate_limit,
hard_rate_limit,

@ -6,7 +6,6 @@ use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use redis_cell_client::RedisCellClient;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::value::RawValue;
@ -80,7 +79,7 @@ impl Web3Connections {
chain_id: usize,
servers: Vec<Web3ConnectionConfig>,
http_client: Option<&reqwest::Client>,
rate_limiter: Option<&RedisCellClient>,
rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
) -> anyhow::Result<Arc<Self>> {
let num_connections = servers.len();