From 2b4a2c3cadafe5202efb86b48c25545d1c997342 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 21 May 2022 23:34:05 +0000 Subject: [PATCH] use redis-cell instead of governor --- Cargo.lock | 67 +---------------------- config/example.toml | 3 +- redis-cell-client/Cargo.toml | 2 +- redis-cell-client/src/client.rs | 96 +++++++++++++++++++++++++++++++-- redis-cell-client/src/lib.rs | 2 +- web3-proxy/Cargo.toml | 3 +- web3-proxy/src/app.rs | 69 +++++++++++++----------- web3-proxy/src/config.rs | 29 ++++++---- web3-proxy/src/connection.rs | 81 +++++++++++++--------------- web3-proxy/src/connections.rs | 59 +++++++------------- web3-proxy/src/main.rs | 4 +- 11 files changed, 214 insertions(+), 201 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 90607052..596d772d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1598,23 +1598,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" -[[package]] -name = "governor" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19775995ee20209163239355bc3ad2f33f83da35d9ef72dea26e5af753552c87" -dependencies = [ - "dashmap", - "futures", - "futures-timer", - "no-std-compat", - "nonzero_ext", - "parking_lot 0.12.0", - "quanta", - "rand", - "smallvec", -] - [[package]] name = "group" version = "0.11.0" @@ -2044,15 +2027,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "mach" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" -dependencies = [ - "libc", -] - [[package]] name = "matchers" version = "0.1.0" @@ -2151,18 +2125,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" -[[package]] -name = "no-std-compat" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" - -[[package]] -name = "nonzero_ext" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" - [[package]] name = "ntapi" version = "0.3.7" @@ -2608,22 +2570,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "quanta" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8" -dependencies = [ - "crossbeam-utils", - "libc", - "mach", - "once_cell", - "raw-cpuid", - "wasi 0.10.2+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - [[package]] name = "quote" version = "1.0.18" @@ -2675,15 +2621,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "raw-cpuid" -version = "10.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12" -dependencies = [ - "bitflags", -] - [[package]] name = "rayon" version = "1.5.3" @@ -2729,7 +2666,7 @@ dependencies = [ ] [[package]] -name = "redis-ratelimit-client" +name = "redis-cell-client" version = "0.2.0" dependencies = [ "anyhow", @@ -3987,11 +3924,11 @@ dependencies = [ "fdlimit", "flume", "futures", - "governor", "hashbrown 0.12.1", "linkedhashmap", "parking_lot 0.12.0", "proctitle", + "redis-cell-client", "regex", "reqwest", "rustc-hash", diff --git a/config/example.toml b/config/example.toml index 67a061a9..3be09886 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,6 +1,6 @@ [shared] chain_id = 1 -rate_limit_redis = "redis:6379" +rate_limit_redis = "redis://redis:6379/" [balanced_rpcs] @@ -17,6 +17,7 @@ rate_limit_redis = "redis:6379" [balanced_rpcs.geth] url = "http://10.11.12.16:8545" soft_limit = 200_000 + hard_limit = 1_000 [balanced_rpcs.geth_ws] url = "ws://10.11.12.16:8546" diff --git a/redis-cell-client/Cargo.toml b/redis-cell-client/Cargo.toml index 15680f34..eabfaa73 100644 --- a/redis-cell-client/Cargo.toml +++ b/redis-cell-client/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "redis-ratelimit-client" +name = "redis-cell-client" version = "0.2.0" authors = ["Bryan Stitt "] edition = "2018" diff --git a/redis-cell-client/src/client.rs b/redis-cell-client/src/client.rs index 84461c30..c1cc0add 100644 --- a/redis-cell-client/src/client.rs +++ b/redis-cell-client/src/client.rs @@ -1,9 +1,8 @@ -use std::time; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use redis::aio::MultiplexedConnection; -use redis::AsyncCommands; +// TODO: take this as an argument to open? const KEY_PREFIX: &str = "rate-limit"; #[derive(Clone)] @@ -20,5 +19,96 @@ impl RedisCellClient { Ok(Self { conn }) } + // CL.THROTTLE [] + /* + + + 0. Whether the action was limited: + 0 indicates the action is allowed. + 1 indicates that the action was limited/blocked. + 1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header. + 2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining. + 3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After. + 4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset. + + */ + pub async fn throttle( + &self, + key: &str, + max_burst: u32, + count_per_period: u32, + period: u32, + quantity: u32, + ) -> Result<(), Duration> { + // TODO: should we return more error info? + // https://github.com/brandur/redis-cell#response + + let mut conn = self.conn.clone(); + + // TODO: don't unwrap. maybe return Option + let x: Vec = redis::cmd("CL.THROTTLE") + .arg(&(key, max_burst, count_per_period, period, quantity)) + .query_async(&mut conn) + .await + .unwrap(); + + assert_eq!(x.len(), 5); + + let retry_after = *x.get(3).unwrap(); + + if retry_after == -1 { + Ok(()) + } else { + Err(Duration::from_secs(retry_after as u64)) + } + } + // TODO: what else? } + +pub struct RedisCellKey { + client: RedisCellClient, + key: String, + max_burst: u32, + count_per_period: u32, + period: u32, +} + +impl RedisCellKey { + // todo: seems like this could be derived + pub fn new( + client: &RedisCellClient, + key: String, + max_burst: u32, + count_per_period: u32, + period: u32, + ) -> Self { + let key = format!("{}:{}", KEY_PREFIX, key); + + Self { + client: client.clone(), + key, + max_burst, + count_per_period, + period, + } + } + + #[inline] + pub async fn throttle(&self) -> Result<(), Duration> { + self.throttle_quantity(1).await + } + + #[inline] + pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Duration> { + self.client + .throttle( + &self.key, + self.max_burst, + self.count_per_period, + self.period, + quantity, + ) + .await + } +} diff --git a/redis-cell-client/src/lib.rs b/redis-cell-client/src/lib.rs index bdcb980f..c8573d8a 100644 --- a/redis-cell-client/src/lib.rs +++ b/redis-cell-client/src/lib.rs @@ -1,3 +1,3 @@ mod client; -pub use client::RedisCellClient; +pub use client::{RedisCellClient, RedisCellKey}; diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index efd513ee..1b6bc4f7 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -16,10 +16,9 @@ ethers = { git = "https://github.com/SatoshiAndKin/ethers-rs", features = ["rust fdlimit = "0.2.1" flume = "0.10.12" futures = { version = "0.3.21", features = ["thread-pool"] } -# TODO: governor has a "futures" and "futures-timer" feature. do we want those? -governor = { version = "0.4.2", features = ["dashmap", "std"] } hashbrown = "0.12.1" linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } +redis-cell-client = { path = "../redis-cell-client" } # TODO: parking_lot has an "arc_lock" feature that we might want to use parking_lot = { version = "0.12.0", features = ["deadlock_detection"] } proctitle = "0.1.1" diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 9d497f66..0e7a4176 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -8,16 +8,16 @@ use crate::jsonrpc::JsonRpcRequestEnum; use dashmap::DashMap; use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256}; use futures::future::join_all; -use governor::clock::{Clock, QuantaClock}; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; +use redis_cell_client::RedisCellClient; use std::fmt; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; use tokio::task; use tokio::time::sleep; -use tracing::{debug, instrument, trace, warn}; +use tracing::{debug, info, instrument, trace, warn}; static APP_USER_AGENT: &str = concat!( "satoshiandkin/", @@ -40,9 +40,6 @@ type ActiveRequestsMap = DashMap>; // TODO: this debug impl is way too verbose. make something smaller // TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs pub struct Web3ProxyApp { - /// clock used for rate limiting - /// TODO: use tokio's clock? (will require a different ratelimiting crate) - clock: QuantaClock, /// Send requests to the best server available balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers @@ -62,24 +59,37 @@ impl Web3ProxyApp { // #[instrument(name = "try_new_Web3ProxyApp", skip_all)] pub async fn try_new( chain_id: usize, + redis_address: Option, balanced_rpcs: Vec, private_rpcs: Vec, ) -> anyhow::Result { - let clock = QuantaClock::default(); - // make a http shared client // TODO: how should we configure the connection pool? // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server - let http_client = reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .timeout(Duration::from_secs(60)) - .user_agent(APP_USER_AGENT) - .build()?; + let http_client = Some( + reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(60)) + .user_agent(APP_USER_AGENT) + .build()?, + ); + + let rate_limiter = match redis_address { + Some(redis_address) => { + info!("Conneting to redis on {}", redis_address); + Some(RedisCellClient::open(&redis_address).await.unwrap()) + } + None => None, + }; // TODO: attach context to this error - let balanced_rpcs = - Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone()), &clock) - .await?; + let balanced_rpcs = Web3Connections::try_new( + chain_id, + balanced_rpcs, + http_client.as_ref(), + rate_limiter.as_ref(), + ) + .await?; // TODO: do this separately instead of during try_new { @@ -94,11 +104,16 @@ impl Web3ProxyApp { warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); balanced_rpcs.clone() } else { - Web3Connections::try_new(chain_id, private_rpcs, Some(http_client), &clock).await? + Web3Connections::try_new( + chain_id, + private_rpcs, + http_client.as_ref(), + rate_limiter.as_ref(), + ) + .await? }; Ok(Web3ProxyApp { - clock, balanced_rpcs, private_rpcs, active_requests: Default::default(), @@ -180,7 +195,7 @@ impl Web3ProxyApp { if request.method == "eth_sendRawTransaction" { // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit - match self.private_rpcs.get_upstream_servers() { + match self.private_rpcs.get_upstream_servers().await { Ok(active_request_handles) => { let (tx, rx) = flume::unbounded(); @@ -222,15 +237,11 @@ impl Web3ProxyApp { // TODO: return a 502? return Err(anyhow::anyhow!("no private rpcs!")); } - Err(Some(not_until)) => { + Err(Some(retry_after)) => { // TODO: move this to a helper function // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting - let deadline = not_until.wait_time_from(self.clock.now()); - - let deadline = deadline.min(Duration::from_millis(200)); - - sleep(deadline).await; + sleep(retry_after).await; warn!("All rate limits exceeded. Sleeping"); } @@ -437,18 +448,14 @@ impl Web3ProxyApp { return Err(anyhow::anyhow!("no servers in sync")); } - Err(Some(not_until)) => { + Err(Some(retry_after)) => { // TODO: move this to a helper function // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting - let deadline = not_until.wait_time_from(self.clock.now()); - - let deadline = deadline.min(Duration::from_millis(200)); - - sleep(deadline).await; - warn!("All rate limits exceeded. Sleeping"); + sleep(retry_after).await; + // TODO: needing to remove manually here makes me think we should do this differently let _ = self.active_requests.remove(&cache_key); let _ = in_flight_tx.send(false); diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 05248094..e7405c63 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -1,14 +1,13 @@ use argh::FromArgs; -use governor::clock::QuantaClock; +use redis_cell_client::RedisCellClient; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; -// use tracing::instrument; use crate::connection::Web3Connection; use crate::Web3ProxyApp; -#[derive(FromArgs)] +#[derive(Debug, FromArgs)] /// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. pub struct CliConfig { /// what port the proxy should listen on @@ -24,7 +23,7 @@ pub struct CliConfig { pub config: String, } -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] pub struct RpcConfig { pub shared: RpcSharedConfig, pub balanced_rpcs: HashMap, @@ -32,13 +31,14 @@ pub struct RpcConfig { } /// shared configuration between Web3Connections -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] pub struct RpcSharedConfig { /// TODO: what type for chain_id? TODO: this isn't at the right level. this is inside a "Config" pub chain_id: usize, + pub rate_limit_redis: Option, } -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] pub struct Web3ConnectionConfig { url: String, soft_limit: u32, @@ -57,7 +57,13 @@ impl RpcConfig { vec![] }; - Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpcs, private_rpcs).await + Web3ProxyApp::try_new( + self.shared.chain_id, + self.shared.rate_limit_redis, + balanced_rpcs, + private_rpcs, + ) + .await } } @@ -66,16 +72,17 @@ impl Web3ConnectionConfig { // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] pub async fn try_build( self, - clock: &QuantaClock, + redis_ratelimiter: Option<&RedisCellClient>, chain_id: usize, - http_client: Option, + http_client: Option<&reqwest::Client>, ) -> anyhow::Result> { + let hard_rate_limit = self.hard_limit.map(|x| (x, redis_ratelimiter.unwrap())); + Web3Connection::try_new( chain_id, self.url, http_client, - self.hard_limit, - clock, + hard_rate_limit, self.soft_limit, ) .await diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 9304c0f9..c65afffe 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -2,15 +2,10 @@ use derive_more::From; use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256}; use futures::StreamExt; -use governor::clock::{Clock, QuantaClock, QuantaInstant}; -use governor::middleware::NoOpMiddleware; -use governor::state::{InMemoryState, NotKeyed}; -use governor::NotUntil; -use governor::RateLimiter; +use redis_cell_client::{RedisCellClient, RedisCellKey}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::fmt; -use std::num::NonZeroU32; use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::RwLock; @@ -18,9 +13,6 @@ use tokio::task; use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; use tracing::{info, instrument, trace, warn}; -type Web3RateLimiter = - RateLimiter>; - /// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] pub enum Web3Provider { @@ -30,11 +22,16 @@ pub enum Web3Provider { impl Web3Provider { #[instrument] - async fn from_str(url_str: &str, http_client: Option) -> anyhow::Result { + async fn from_str( + url_str: &str, + http_client: Option<&reqwest::Client>, + ) -> anyhow::Result { let provider = if url_str.starts_with("http") { let url: url::Url = url_str.parse()?; - let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; + let http_client = http_client + .ok_or_else(|| anyhow::anyhow!("no http_client"))? + .clone(); let provider = ethers::providers::Http::new_with_client(url, http_client); @@ -75,12 +72,10 @@ pub struct Web3Connection { active_requests: AtomicU32, /// provider is in a RwLock so that we can replace it if re-connecting provider: RwLock>, - /// this should store rate limits in redis/memcache/etc so that restarts and multiple proxies don't block eachother - ratelimiter: Option, + /// rate limits are stored in a central redis so that multiple proxies can share their rate limits + hard_limit: Option, /// used for load balancing to the least loaded server soft_limit: u32, - /// the same clock that is used by the rate limiter - clock: QuantaClock, // TODO: track total number of requests? } @@ -143,35 +138,35 @@ impl Web3Connection { } /// Connect to a web3 rpc and subscribe to new heads - #[instrument(name = "try_new_Web3Connection", skip(clock, http_client))] + #[instrument(name = "try_new_Web3Connection", skip(hard_limit, http_client))] pub async fn try_new( chain_id: usize, url_str: String, // optional because this is only used for http providers. websocket providers don't use it - http_client: Option, - hard_rate_limit: Option, - clock: &QuantaClock, + http_client: Option<&reqwest::Client>, + hard_limit: Option<(u32, &RedisCellClient)>, // TODO: think more about this type soft_limit: u32, ) -> anyhow::Result> { - let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit { - let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap()); - - let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock); - - Some(rate_limiter) - } else { - None - }; + let hard_limit = hard_limit.map(|(hard_rate_limit, hard_rate_limiter)| { + // TODO: allow different max_burst and count_per_period and period + let period = 1; + RedisCellKey::new( + hard_rate_limiter, + format!("{},{}", chain_id, url_str), + hard_rate_limit, + hard_rate_limit, + period, + ) + }); let provider = Web3Provider::from_str(&url_str, http_client).await?; let connection = Web3Connection { - clock: clock.clone(), url: url_str.clone(), active_requests: 0.into(), provider: RwLock::new(Arc::new(provider)), - ratelimiter: hard_rate_limiter, + hard_limit, soft_limit, }; @@ -273,7 +268,7 @@ impl Web3Connection { // TODO: if error or rate limit, increase interval? interval.tick().await; - match self.try_request_handle() { + match self.try_request_handle().await { Ok(active_request_handle) => { // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" let block: Result, _> = provider @@ -364,12 +359,10 @@ impl Web3Connection { // TODO: maximum wait time for _ in 0..10 { - match self.try_request_handle() { + match self.try_request_handle().await { Ok(pending_request_handle) => return pending_request_handle, - Err(not_until) => { - let deadline = not_until.wait_time_from(self.clock.now()); - - sleep(deadline).await; + Err(retry_after) => { + sleep(retry_after).await; } } } @@ -378,23 +371,21 @@ impl Web3Connection { panic!("no request handle after 10 tries"); } - pub fn try_request_handle( - self: &Arc, - ) -> Result> { + pub async fn try_request_handle(self: &Arc) -> Result { // check rate limits - if let Some(ratelimiter) = self.ratelimiter.as_ref() { - match ratelimiter.check() { + if let Some(ratelimiter) = self.hard_limit.as_ref() { + match ratelimiter.throttle().await { Ok(_) => { // rate limit succeeded return Ok(ActiveRequestHandle::new(self.clone())); } - Err(not_until) => { + Err(retry_after) => { // rate limit failed - // save the smallest not_until. if nothing succeeds, return an Err with not_until in it + // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it // TODO: use tracing better - warn!("Exhausted rate limit on {:?}: {}", self, not_until); + warn!("Exhausted rate limit on {:?}: {:?}", self, retry_after); - return Err(not_until); + return Err(retry_after); } } }; diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 7671f9a6..8a6a1048 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -5,9 +5,8 @@ use ethers::prelude::H256; use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; -use governor::clock::{QuantaClock, QuantaInstant}; -use governor::NotUntil; use hashbrown::HashMap; +use redis_cell_client::RedisCellClient; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::value::RawValue; @@ -15,6 +14,7 @@ use std::cmp; use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::sync::Arc; +use std::time::Duration; use tokio::task; use tracing::Instrument; use tracing::{info, info_span, instrument, trace, warn}; @@ -79,8 +79,8 @@ impl Web3Connections { pub async fn try_new( chain_id: usize, servers: Vec, - http_client: Option, - clock: &QuantaClock, + http_client: Option<&reqwest::Client>, + rate_limiter: Option<&RedisCellClient>, ) -> anyhow::Result> { let num_connections = servers.len(); @@ -88,7 +88,7 @@ impl Web3Connections { let mut connections = Vec::with_capacity(num_connections); for server_config in servers.into_iter() { match server_config - .try_build(clock, chain_id, http_client.clone()) + .try_build(rate_limiter, chain_id, http_client) .await { Ok(connection) => connections.push(connection), @@ -351,10 +351,8 @@ impl Web3Connections { /// get the best available rpc server #[instrument(skip_all)] - pub async fn next_upstream_server( - &self, - ) -> Result>> { - let mut earliest_not_until = None; + pub async fn next_upstream_server(&self) -> Result> { + let mut earliest_retry_after = None; let mut synced_rpc_ids: Vec = self .synced_connections @@ -397,9 +395,9 @@ impl Web3Connections { let rpc = self.inner.get(rpc_id).unwrap(); // increment our connection counter - match rpc.try_request_handle() { - Err(not_until) => { - earliest_possible(&mut earliest_not_until, not_until); + match rpc.try_request_handle().await { + Err(retry_after) => { + earliest_retry_after = earliest_retry_after.min(Some(retry_after)); } Ok(handle) => { trace!("next server on {:?}: {:?}", self, rpc_id); @@ -408,26 +406,24 @@ impl Web3Connections { } } - warn!("no servers on {:?}! {:?}", self, earliest_not_until); + warn!("no servers on {:?}! {:?}", self, earliest_retry_after); // this might be None - Err(earliest_not_until) + Err(earliest_retry_after) } /// get all rpc servers that are not rate limited /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions - pub fn get_upstream_servers( - &self, - ) -> Result, Option>> { - let mut earliest_not_until = None; + pub async fn get_upstream_servers(&self) -> Result, Option> { + let mut earliest_retry_after = None; // TODO: with capacity? let mut selected_rpcs = vec![]; for connection in self.inner.iter() { // check rate limits and increment our connection counter - match connection.try_request_handle() { - Err(not_until) => { - earliest_possible(&mut earliest_not_until, not_until); + match connection.try_request_handle().await { + Err(retry_after) => { + earliest_retry_after = earliest_retry_after.min(Some(retry_after)); // this rpc is not available. skip it } Ok(handle) => selected_rpcs.push(handle), @@ -438,24 +434,7 @@ impl Web3Connections { return Ok(selected_rpcs); } - // return the earliest not_until (if no rpcs are synced, this will be None) - Err(earliest_not_until) - } -} - -fn earliest_possible( - earliest_not_until_option: &mut Option>, - new_not_until: NotUntil, -) { - match earliest_not_until_option.as_ref() { - None => *earliest_not_until_option = Some(new_not_until), - Some(earliest_not_until) => { - let earliest_possible = earliest_not_until.earliest_possible(); - let new_earliest_possible = new_not_until.earliest_possible(); - - if earliest_possible > new_earliest_possible { - *earliest_not_until_option = Some(new_not_until); - } - } + // return the earliest retry_after (if no rpcs are synced, this will be None) + Err(earliest_retry_after) } } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index e5ebfbae..4f874088 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use tokio::runtime; -use tracing::info; +use tracing::{info, trace}; use tracing_subscriber::EnvFilter; use crate::app::Web3ProxyApp; @@ -36,6 +36,8 @@ fn main() -> anyhow::Result<()> { let rpc_config: String = fs::read_to_string(cli_config.config)?; let rpc_config: RpcConfig = toml::from_str(&rpc_config)?; + trace!("rpc_config: {:?}", rpc_config); + // TODO: this doesn't seem to do anything proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));