use redis-cell instead of governor
This commit is contained in:
parent
5e14333e61
commit
2b4a2c3cad
67
Cargo.lock
generated
67
Cargo.lock
generated
@ -1598,23 +1598,6 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
|
||||
|
||||
[[package]]
|
||||
name = "governor"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19775995ee20209163239355bc3ad2f33f83da35d9ef72dea26e5af753552c87"
|
||||
dependencies = [
|
||||
"dashmap",
|
||||
"futures",
|
||||
"futures-timer",
|
||||
"no-std-compat",
|
||||
"nonzero_ext",
|
||||
"parking_lot 0.12.0",
|
||||
"quanta",
|
||||
"rand",
|
||||
"smallvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "group"
|
||||
version = "0.11.0"
|
||||
@ -2044,15 +2027,6 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mach"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.1.0"
|
||||
@ -2151,18 +2125,6 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54"
|
||||
|
||||
[[package]]
|
||||
name = "no-std-compat"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
|
||||
|
||||
[[package]]
|
||||
name = "nonzero_ext"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
version = "0.3.7"
|
||||
@ -2608,22 +2570,6 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quanta"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"libc",
|
||||
"mach",
|
||||
"once_cell",
|
||||
"raw-cpuid",
|
||||
"wasi 0.10.2+wasi-snapshot-preview1",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.18"
|
||||
@ -2675,15 +2621,6 @@ dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "raw-cpuid"
|
||||
version = "10.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.5.3"
|
||||
@ -2729,7 +2666,7 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis-ratelimit-client"
|
||||
name = "redis-cell-client"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
@ -3987,11 +3924,11 @@ dependencies = [
|
||||
"fdlimit",
|
||||
"flume",
|
||||
"futures",
|
||||
"governor",
|
||||
"hashbrown 0.12.1",
|
||||
"linkedhashmap",
|
||||
"parking_lot 0.12.0",
|
||||
"proctitle",
|
||||
"redis-cell-client",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"rustc-hash",
|
||||
|
@ -1,6 +1,6 @@
|
||||
[shared]
|
||||
chain_id = 1
|
||||
rate_limit_redis = "redis:6379"
|
||||
rate_limit_redis = "redis://redis:6379/"
|
||||
|
||||
[balanced_rpcs]
|
||||
|
||||
@ -17,6 +17,7 @@ rate_limit_redis = "redis:6379"
|
||||
[balanced_rpcs.geth]
|
||||
url = "http://10.11.12.16:8545"
|
||||
soft_limit = 200_000
|
||||
hard_limit = 1_000
|
||||
|
||||
[balanced_rpcs.geth_ws]
|
||||
url = "ws://10.11.12.16:8546"
|
||||
|
@ -1,5 +1,5 @@
|
||||
[package]
|
||||
name = "redis-ratelimit-client"
|
||||
name = "redis-cell-client"
|
||||
version = "0.2.0"
|
||||
authors = ["Bryan Stitt <bryan@stitthappens.com>"]
|
||||
edition = "2018"
|
||||
|
@ -1,9 +1,8 @@
|
||||
use std::time;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::Duration;
|
||||
|
||||
use redis::aio::MultiplexedConnection;
|
||||
use redis::AsyncCommands;
|
||||
|
||||
// TODO: take this as an argument to open?
|
||||
const KEY_PREFIX: &str = "rate-limit";
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -20,5 +19,96 @@ impl RedisCellClient {
|
||||
Ok(Self { conn })
|
||||
}
|
||||
|
||||
// CL.THROTTLE <key> <max_burst> <count per period> <period> [<quantity>]
|
||||
/*
|
||||
|
||||
|
||||
0. Whether the action was limited:
|
||||
0 indicates the action is allowed.
|
||||
1 indicates that the action was limited/blocked.
|
||||
1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header.
|
||||
2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining.
|
||||
3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After.
|
||||
4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset.
|
||||
|
||||
*/
|
||||
pub async fn throttle(
|
||||
&self,
|
||||
key: &str,
|
||||
max_burst: u32,
|
||||
count_per_period: u32,
|
||||
period: u32,
|
||||
quantity: u32,
|
||||
) -> Result<(), Duration> {
|
||||
// TODO: should we return more error info?
|
||||
// https://github.com/brandur/redis-cell#response
|
||||
|
||||
let mut conn = self.conn.clone();
|
||||
|
||||
// TODO: don't unwrap. maybe return Option<Duration>
|
||||
let x: Vec<isize> = redis::cmd("CL.THROTTLE")
|
||||
.arg(&(key, max_burst, count_per_period, period, quantity))
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(x.len(), 5);
|
||||
|
||||
let retry_after = *x.get(3).unwrap();
|
||||
|
||||
if retry_after == -1 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Duration::from_secs(retry_after as u64))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: what else?
|
||||
}
|
||||
|
||||
pub struct RedisCellKey {
|
||||
client: RedisCellClient,
|
||||
key: String,
|
||||
max_burst: u32,
|
||||
count_per_period: u32,
|
||||
period: u32,
|
||||
}
|
||||
|
||||
impl RedisCellKey {
|
||||
// todo: seems like this could be derived
|
||||
pub fn new(
|
||||
client: &RedisCellClient,
|
||||
key: String,
|
||||
max_burst: u32,
|
||||
count_per_period: u32,
|
||||
period: u32,
|
||||
) -> Self {
|
||||
let key = format!("{}:{}", KEY_PREFIX, key);
|
||||
|
||||
Self {
|
||||
client: client.clone(),
|
||||
key,
|
||||
max_burst,
|
||||
count_per_period,
|
||||
period,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn throttle(&self) -> Result<(), Duration> {
|
||||
self.throttle_quantity(1).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Duration> {
|
||||
self.client
|
||||
.throttle(
|
||||
&self.key,
|
||||
self.max_burst,
|
||||
self.count_per_period,
|
||||
self.period,
|
||||
quantity,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,3 @@
|
||||
mod client;
|
||||
|
||||
pub use client::RedisCellClient;
|
||||
pub use client::{RedisCellClient, RedisCellKey};
|
||||
|
@ -16,10 +16,9 @@ ethers = { git = "https://github.com/SatoshiAndKin/ethers-rs", features = ["rust
|
||||
fdlimit = "0.2.1"
|
||||
flume = "0.10.12"
|
||||
futures = { version = "0.3.21", features = ["thread-pool"] }
|
||||
# TODO: governor has a "futures" and "futures-timer" feature. do we want those?
|
||||
governor = { version = "0.4.2", features = ["dashmap", "std"] }
|
||||
hashbrown = "0.12.1"
|
||||
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
|
||||
redis-cell-client = { path = "../redis-cell-client" }
|
||||
# TODO: parking_lot has an "arc_lock" feature that we might want to use
|
||||
parking_lot = { version = "0.12.0", features = ["deadlock_detection"] }
|
||||
proctitle = "0.1.1"
|
||||
|
@ -8,16 +8,16 @@ use crate::jsonrpc::JsonRpcRequestEnum;
|
||||
use dashmap::DashMap;
|
||||
use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256};
|
||||
use futures::future::join_all;
|
||||
use governor::clock::{Clock, QuantaClock};
|
||||
use linkedhashmap::LinkedHashMap;
|
||||
use parking_lot::RwLock;
|
||||
use redis_cell_client::RedisCellClient;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, instrument, trace, warn};
|
||||
use tracing::{debug, info, instrument, trace, warn};
|
||||
|
||||
static APP_USER_AGENT: &str = concat!(
|
||||
"satoshiandkin/",
|
||||
@ -40,9 +40,6 @@ type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
|
||||
// TODO: this debug impl is way too verbose. make something smaller
|
||||
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
|
||||
pub struct Web3ProxyApp {
|
||||
/// clock used for rate limiting
|
||||
/// TODO: use tokio's clock? (will require a different ratelimiting crate)
|
||||
clock: QuantaClock,
|
||||
/// Send requests to the best server available
|
||||
balanced_rpcs: Arc<Web3Connections>,
|
||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||
@ -62,24 +59,37 @@ impl Web3ProxyApp {
|
||||
// #[instrument(name = "try_new_Web3ProxyApp", skip_all)]
|
||||
pub async fn try_new(
|
||||
chain_id: usize,
|
||||
redis_address: Option<String>,
|
||||
balanced_rpcs: Vec<Web3ConnectionConfig>,
|
||||
private_rpcs: Vec<Web3ConnectionConfig>,
|
||||
) -> anyhow::Result<Web3ProxyApp> {
|
||||
let clock = QuantaClock::default();
|
||||
|
||||
// make a http shared client
|
||||
// TODO: how should we configure the connection pool?
|
||||
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.connect_timeout(Duration::from_secs(5))
|
||||
.timeout(Duration::from_secs(60))
|
||||
.user_agent(APP_USER_AGENT)
|
||||
.build()?;
|
||||
let http_client = Some(
|
||||
reqwest::ClientBuilder::new()
|
||||
.connect_timeout(Duration::from_secs(5))
|
||||
.timeout(Duration::from_secs(60))
|
||||
.user_agent(APP_USER_AGENT)
|
||||
.build()?,
|
||||
);
|
||||
|
||||
let rate_limiter = match redis_address {
|
||||
Some(redis_address) => {
|
||||
info!("Conneting to redis on {}", redis_address);
|
||||
Some(RedisCellClient::open(&redis_address).await.unwrap())
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
// TODO: attach context to this error
|
||||
let balanced_rpcs =
|
||||
Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone()), &clock)
|
||||
.await?;
|
||||
let balanced_rpcs = Web3Connections::try_new(
|
||||
chain_id,
|
||||
balanced_rpcs,
|
||||
http_client.as_ref(),
|
||||
rate_limiter.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// TODO: do this separately instead of during try_new
|
||||
{
|
||||
@ -94,11 +104,16 @@ impl Web3ProxyApp {
|
||||
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
|
||||
balanced_rpcs.clone()
|
||||
} else {
|
||||
Web3Connections::try_new(chain_id, private_rpcs, Some(http_client), &clock).await?
|
||||
Web3Connections::try_new(
|
||||
chain_id,
|
||||
private_rpcs,
|
||||
http_client.as_ref(),
|
||||
rate_limiter.as_ref(),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
Ok(Web3ProxyApp {
|
||||
clock,
|
||||
balanced_rpcs,
|
||||
private_rpcs,
|
||||
active_requests: Default::default(),
|
||||
@ -180,7 +195,7 @@ impl Web3ProxyApp {
|
||||
if request.method == "eth_sendRawTransaction" {
|
||||
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
|
||||
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
|
||||
match self.private_rpcs.get_upstream_servers() {
|
||||
match self.private_rpcs.get_upstream_servers().await {
|
||||
Ok(active_request_handles) => {
|
||||
let (tx, rx) = flume::unbounded();
|
||||
|
||||
@ -222,15 +237,11 @@ impl Web3ProxyApp {
|
||||
// TODO: return a 502?
|
||||
return Err(anyhow::anyhow!("no private rpcs!"));
|
||||
}
|
||||
Err(Some(not_until)) => {
|
||||
Err(Some(retry_after)) => {
|
||||
// TODO: move this to a helper function
|
||||
// sleep (TODO: with a lock?) until our rate limits should be available
|
||||
// TODO: if a server catches up sync while we are waiting, we could stop waiting
|
||||
let deadline = not_until.wait_time_from(self.clock.now());
|
||||
|
||||
let deadline = deadline.min(Duration::from_millis(200));
|
||||
|
||||
sleep(deadline).await;
|
||||
sleep(retry_after).await;
|
||||
|
||||
warn!("All rate limits exceeded. Sleeping");
|
||||
}
|
||||
@ -437,18 +448,14 @@ impl Web3ProxyApp {
|
||||
|
||||
return Err(anyhow::anyhow!("no servers in sync"));
|
||||
}
|
||||
Err(Some(not_until)) => {
|
||||
Err(Some(retry_after)) => {
|
||||
// TODO: move this to a helper function
|
||||
// sleep (TODO: with a lock?) until our rate limits should be available
|
||||
// TODO: if a server catches up sync while we are waiting, we could stop waiting
|
||||
let deadline = not_until.wait_time_from(self.clock.now());
|
||||
|
||||
let deadline = deadline.min(Duration::from_millis(200));
|
||||
|
||||
sleep(deadline).await;
|
||||
|
||||
warn!("All rate limits exceeded. Sleeping");
|
||||
|
||||
sleep(retry_after).await;
|
||||
|
||||
// TODO: needing to remove manually here makes me think we should do this differently
|
||||
let _ = self.active_requests.remove(&cache_key);
|
||||
let _ = in_flight_tx.send(false);
|
||||
|
@ -1,14 +1,13 @@
|
||||
use argh::FromArgs;
|
||||
use governor::clock::QuantaClock;
|
||||
use redis_cell_client::RedisCellClient;
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
// use tracing::instrument;
|
||||
|
||||
use crate::connection::Web3Connection;
|
||||
use crate::Web3ProxyApp;
|
||||
|
||||
#[derive(FromArgs)]
|
||||
#[derive(Debug, FromArgs)]
|
||||
/// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
|
||||
pub struct CliConfig {
|
||||
/// what port the proxy should listen on
|
||||
@ -24,7 +23,7 @@ pub struct CliConfig {
|
||||
pub config: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct RpcConfig {
|
||||
pub shared: RpcSharedConfig,
|
||||
pub balanced_rpcs: HashMap<String, Web3ConnectionConfig>,
|
||||
@ -32,13 +31,14 @@ pub struct RpcConfig {
|
||||
}
|
||||
|
||||
/// shared configuration between Web3Connections
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct RpcSharedConfig {
|
||||
/// TODO: what type for chain_id? TODO: this isn't at the right level. this is inside a "Config"
|
||||
pub chain_id: usize,
|
||||
pub rate_limit_redis: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Web3ConnectionConfig {
|
||||
url: String,
|
||||
soft_limit: u32,
|
||||
@ -57,7 +57,13 @@ impl RpcConfig {
|
||||
vec![]
|
||||
};
|
||||
|
||||
Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpcs, private_rpcs).await
|
||||
Web3ProxyApp::try_new(
|
||||
self.shared.chain_id,
|
||||
self.shared.rate_limit_redis,
|
||||
balanced_rpcs,
|
||||
private_rpcs,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,16 +72,17 @@ impl Web3ConnectionConfig {
|
||||
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
|
||||
pub async fn try_build(
|
||||
self,
|
||||
clock: &QuantaClock,
|
||||
redis_ratelimiter: Option<&RedisCellClient>,
|
||||
chain_id: usize,
|
||||
http_client: Option<reqwest::Client>,
|
||||
http_client: Option<&reqwest::Client>,
|
||||
) -> anyhow::Result<Arc<Web3Connection>> {
|
||||
let hard_rate_limit = self.hard_limit.map(|x| (x, redis_ratelimiter.unwrap()));
|
||||
|
||||
Web3Connection::try_new(
|
||||
chain_id,
|
||||
self.url,
|
||||
http_client,
|
||||
self.hard_limit,
|
||||
clock,
|
||||
hard_rate_limit,
|
||||
self.soft_limit,
|
||||
)
|
||||
.await
|
||||
|
@ -2,15 +2,10 @@
|
||||
use derive_more::From;
|
||||
use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256};
|
||||
use futures::StreamExt;
|
||||
use governor::clock::{Clock, QuantaClock, QuantaInstant};
|
||||
use governor::middleware::NoOpMiddleware;
|
||||
use governor::state::{InMemoryState, NotKeyed};
|
||||
use governor::NotUntil;
|
||||
use governor::RateLimiter;
|
||||
use redis_cell_client::{RedisCellClient, RedisCellKey};
|
||||
use serde::ser::{SerializeStruct, Serializer};
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU32;
|
||||
use std::sync::atomic::{self, AtomicU32};
|
||||
use std::{cmp::Ordering, sync::Arc};
|
||||
use tokio::sync::RwLock;
|
||||
@ -18,9 +13,6 @@ use tokio::task;
|
||||
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
|
||||
use tracing::{info, instrument, trace, warn};
|
||||
|
||||
type Web3RateLimiter =
|
||||
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
|
||||
|
||||
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
|
||||
#[derive(From)]
|
||||
pub enum Web3Provider {
|
||||
@ -30,11 +22,16 @@ pub enum Web3Provider {
|
||||
|
||||
impl Web3Provider {
|
||||
#[instrument]
|
||||
async fn from_str(url_str: &str, http_client: Option<reqwest::Client>) -> anyhow::Result<Self> {
|
||||
async fn from_str(
|
||||
url_str: &str,
|
||||
http_client: Option<&reqwest::Client>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let provider = if url_str.starts_with("http") {
|
||||
let url: url::Url = url_str.parse()?;
|
||||
|
||||
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
|
||||
let http_client = http_client
|
||||
.ok_or_else(|| anyhow::anyhow!("no http_client"))?
|
||||
.clone();
|
||||
|
||||
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
||||
|
||||
@ -75,12 +72,10 @@ pub struct Web3Connection {
|
||||
active_requests: AtomicU32,
|
||||
/// provider is in a RwLock so that we can replace it if re-connecting
|
||||
provider: RwLock<Arc<Web3Provider>>,
|
||||
/// this should store rate limits in redis/memcache/etc so that restarts and multiple proxies don't block eachother
|
||||
ratelimiter: Option<Web3RateLimiter>,
|
||||
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
||||
hard_limit: Option<RedisCellKey>,
|
||||
/// used for load balancing to the least loaded server
|
||||
soft_limit: u32,
|
||||
/// the same clock that is used by the rate limiter
|
||||
clock: QuantaClock,
|
||||
// TODO: track total number of requests?
|
||||
}
|
||||
|
||||
@ -143,35 +138,35 @@ impl Web3Connection {
|
||||
}
|
||||
|
||||
/// Connect to a web3 rpc and subscribe to new heads
|
||||
#[instrument(name = "try_new_Web3Connection", skip(clock, http_client))]
|
||||
#[instrument(name = "try_new_Web3Connection", skip(hard_limit, http_client))]
|
||||
pub async fn try_new(
|
||||
chain_id: usize,
|
||||
url_str: String,
|
||||
// optional because this is only used for http providers. websocket providers don't use it
|
||||
http_client: Option<reqwest::Client>,
|
||||
hard_rate_limit: Option<u32>,
|
||||
clock: &QuantaClock,
|
||||
http_client: Option<&reqwest::Client>,
|
||||
hard_limit: Option<(u32, &RedisCellClient)>,
|
||||
// TODO: think more about this type
|
||||
soft_limit: u32,
|
||||
) -> anyhow::Result<Arc<Web3Connection>> {
|
||||
let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
|
||||
let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());
|
||||
|
||||
let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
|
||||
|
||||
Some(rate_limiter)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let hard_limit = hard_limit.map(|(hard_rate_limit, hard_rate_limiter)| {
|
||||
// TODO: allow different max_burst and count_per_period and period
|
||||
let period = 1;
|
||||
RedisCellKey::new(
|
||||
hard_rate_limiter,
|
||||
format!("{},{}", chain_id, url_str),
|
||||
hard_rate_limit,
|
||||
hard_rate_limit,
|
||||
period,
|
||||
)
|
||||
});
|
||||
|
||||
let provider = Web3Provider::from_str(&url_str, http_client).await?;
|
||||
|
||||
let connection = Web3Connection {
|
||||
clock: clock.clone(),
|
||||
url: url_str.clone(),
|
||||
active_requests: 0.into(),
|
||||
provider: RwLock::new(Arc::new(provider)),
|
||||
ratelimiter: hard_rate_limiter,
|
||||
hard_limit,
|
||||
soft_limit,
|
||||
};
|
||||
|
||||
@ -273,7 +268,7 @@ impl Web3Connection {
|
||||
// TODO: if error or rate limit, increase interval?
|
||||
interval.tick().await;
|
||||
|
||||
match self.try_request_handle() {
|
||||
match self.try_request_handle().await {
|
||||
Ok(active_request_handle) => {
|
||||
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
|
||||
let block: Result<Block<TxHash>, _> = provider
|
||||
@ -364,12 +359,10 @@ impl Web3Connection {
|
||||
// TODO: maximum wait time
|
||||
|
||||
for _ in 0..10 {
|
||||
match self.try_request_handle() {
|
||||
match self.try_request_handle().await {
|
||||
Ok(pending_request_handle) => return pending_request_handle,
|
||||
Err(not_until) => {
|
||||
let deadline = not_until.wait_time_from(self.clock.now());
|
||||
|
||||
sleep(deadline).await;
|
||||
Err(retry_after) => {
|
||||
sleep(retry_after).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -378,23 +371,21 @@ impl Web3Connection {
|
||||
panic!("no request handle after 10 tries");
|
||||
}
|
||||
|
||||
pub fn try_request_handle(
|
||||
self: &Arc<Self>,
|
||||
) -> Result<ActiveRequestHandle, NotUntil<QuantaInstant>> {
|
||||
pub async fn try_request_handle(self: &Arc<Self>) -> Result<ActiveRequestHandle, Duration> {
|
||||
// check rate limits
|
||||
if let Some(ratelimiter) = self.ratelimiter.as_ref() {
|
||||
match ratelimiter.check() {
|
||||
if let Some(ratelimiter) = self.hard_limit.as_ref() {
|
||||
match ratelimiter.throttle().await {
|
||||
Ok(_) => {
|
||||
// rate limit succeeded
|
||||
return Ok(ActiveRequestHandle::new(self.clone()));
|
||||
}
|
||||
Err(not_until) => {
|
||||
Err(retry_after) => {
|
||||
// rate limit failed
|
||||
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
|
||||
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
|
||||
// TODO: use tracing better
|
||||
warn!("Exhausted rate limit on {:?}: {}", self, not_until);
|
||||
warn!("Exhausted rate limit on {:?}: {:?}", self, retry_after);
|
||||
|
||||
return Err(not_until);
|
||||
return Err(retry_after);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -5,9 +5,8 @@ use ethers::prelude::H256;
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use governor::clock::{QuantaClock, QuantaInstant};
|
||||
use governor::NotUntil;
|
||||
use hashbrown::HashMap;
|
||||
use redis_cell_client::RedisCellClient;
|
||||
use serde::ser::{SerializeStruct, Serializer};
|
||||
use serde::Serialize;
|
||||
use serde_json::value::RawValue;
|
||||
@ -15,6 +14,7 @@ use std::cmp;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::task;
|
||||
use tracing::Instrument;
|
||||
use tracing::{info, info_span, instrument, trace, warn};
|
||||
@ -79,8 +79,8 @@ impl Web3Connections {
|
||||
pub async fn try_new(
|
||||
chain_id: usize,
|
||||
servers: Vec<Web3ConnectionConfig>,
|
||||
http_client: Option<reqwest::Client>,
|
||||
clock: &QuantaClock,
|
||||
http_client: Option<&reqwest::Client>,
|
||||
rate_limiter: Option<&RedisCellClient>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
let num_connections = servers.len();
|
||||
|
||||
@ -88,7 +88,7 @@ impl Web3Connections {
|
||||
let mut connections = Vec::with_capacity(num_connections);
|
||||
for server_config in servers.into_iter() {
|
||||
match server_config
|
||||
.try_build(clock, chain_id, http_client.clone())
|
||||
.try_build(rate_limiter, chain_id, http_client)
|
||||
.await
|
||||
{
|
||||
Ok(connection) => connections.push(connection),
|
||||
@ -351,10 +351,8 @@ impl Web3Connections {
|
||||
|
||||
/// get the best available rpc server
|
||||
#[instrument(skip_all)]
|
||||
pub async fn next_upstream_server(
|
||||
&self,
|
||||
) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
|
||||
let mut earliest_not_until = None;
|
||||
pub async fn next_upstream_server(&self) -> Result<ActiveRequestHandle, Option<Duration>> {
|
||||
let mut earliest_retry_after = None;
|
||||
|
||||
let mut synced_rpc_ids: Vec<usize> = self
|
||||
.synced_connections
|
||||
@ -397,9 +395,9 @@ impl Web3Connections {
|
||||
let rpc = self.inner.get(rpc_id).unwrap();
|
||||
|
||||
// increment our connection counter
|
||||
match rpc.try_request_handle() {
|
||||
Err(not_until) => {
|
||||
earliest_possible(&mut earliest_not_until, not_until);
|
||||
match rpc.try_request_handle().await {
|
||||
Err(retry_after) => {
|
||||
earliest_retry_after = earliest_retry_after.min(Some(retry_after));
|
||||
}
|
||||
Ok(handle) => {
|
||||
trace!("next server on {:?}: {:?}", self, rpc_id);
|
||||
@ -408,26 +406,24 @@ impl Web3Connections {
|
||||
}
|
||||
}
|
||||
|
||||
warn!("no servers on {:?}! {:?}", self, earliest_not_until);
|
||||
warn!("no servers on {:?}! {:?}", self, earliest_retry_after);
|
||||
|
||||
// this might be None
|
||||
Err(earliest_not_until)
|
||||
Err(earliest_retry_after)
|
||||
}
|
||||
|
||||
/// get all rpc servers that are not rate limited
|
||||
/// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
|
||||
pub fn get_upstream_servers(
|
||||
&self,
|
||||
) -> Result<Vec<ActiveRequestHandle>, Option<NotUntil<QuantaInstant>>> {
|
||||
let mut earliest_not_until = None;
|
||||
pub async fn get_upstream_servers(&self) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> {
|
||||
let mut earliest_retry_after = None;
|
||||
// TODO: with capacity?
|
||||
let mut selected_rpcs = vec![];
|
||||
|
||||
for connection in self.inner.iter() {
|
||||
// check rate limits and increment our connection counter
|
||||
match connection.try_request_handle() {
|
||||
Err(not_until) => {
|
||||
earliest_possible(&mut earliest_not_until, not_until);
|
||||
match connection.try_request_handle().await {
|
||||
Err(retry_after) => {
|
||||
earliest_retry_after = earliest_retry_after.min(Some(retry_after));
|
||||
// this rpc is not available. skip it
|
||||
}
|
||||
Ok(handle) => selected_rpcs.push(handle),
|
||||
@ -438,24 +434,7 @@ impl Web3Connections {
|
||||
return Ok(selected_rpcs);
|
||||
}
|
||||
|
||||
// return the earliest not_until (if no rpcs are synced, this will be None)
|
||||
Err(earliest_not_until)
|
||||
}
|
||||
}
|
||||
|
||||
fn earliest_possible(
|
||||
earliest_not_until_option: &mut Option<NotUntil<QuantaInstant>>,
|
||||
new_not_until: NotUntil<QuantaInstant>,
|
||||
) {
|
||||
match earliest_not_until_option.as_ref() {
|
||||
None => *earliest_not_until_option = Some(new_not_until),
|
||||
Some(earliest_not_until) => {
|
||||
let earliest_possible = earliest_not_until.earliest_possible();
|
||||
let new_earliest_possible = new_not_until.earliest_possible();
|
||||
|
||||
if earliest_possible > new_earliest_possible {
|
||||
*earliest_not_until_option = Some(new_not_until);
|
||||
}
|
||||
}
|
||||
// return the earliest retry_after (if no rpcs are synced, this will be None)
|
||||
Err(earliest_retry_after)
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tokio::runtime;
|
||||
use tracing::info;
|
||||
use tracing::{info, trace};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
use crate::app::Web3ProxyApp;
|
||||
@ -36,6 +36,8 @@ fn main() -> anyhow::Result<()> {
|
||||
let rpc_config: String = fs::read_to_string(cli_config.config)?;
|
||||
let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
|
||||
|
||||
trace!("rpc_config: {:?}", rpc_config);
|
||||
|
||||
// TODO: this doesn't seem to do anything
|
||||
proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user