From d3859b463ee83125adcb01f7e6d42d8e1b5d1359 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 6 May 2022 04:29:25 +0000
Subject: [PATCH] use a request handle for ratelimit handling

---
 web3-proxy/src/config.rs      |  4 +-
 web3-proxy/src/connection.rs  | 92 +++++++++++++++++++++++++++--------
 web3-proxy/src/connections.rs | 46 +++++++++---------
 web3-proxy/src/main.rs        |  6 +--
 4 files changed, 98 insertions(+), 50 deletions(-)

diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs
index a3c5bc50..3a725fc3 100644
--- a/web3-proxy/src/config.rs
+++ b/web3-proxy/src/config.rs
@@ -16,7 +16,7 @@ pub struct CliConfig {
     /// what port the proxy should listen on
     // TODO: use flags for the config path "./data/config/example.toml"
-    #[argh(option, default = "\"./data/config/example.toml\".to_string()")]
+    #[argh(option, default = "\"./config/example.toml\".to_string()")]
     pub rpc_config_path: String,
 }
 
@@ -62,7 +62,7 @@ impl Web3ConnectionConfig {
             self.url,
             http_client,
             self.hard_limit,
-            Some(clock),
+            clock,
             self.soft_limit,
         )
         .await
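A note on the default-path hunk above, since argh's `default` attribute is easy to misread: it takes a string containing a Rust expression, and that expression is evaluated whenever the flag is omitted. Below is a minimal standalone sketch of the same attribute; this trimmed-down CliConfig is illustrative, not the real struct, and only the `argh` crate is assumed.

use argh::FromArgs;

#[derive(FromArgs)]
/// a trimmed-down stand-in for the real CliConfig
struct CliConfig {
    /// path to a toml of rpc servers
    #[argh(option, default = "\"./config/example.toml\".to_string()")]
    rpc_config_path: String,
}

fn main() {
    let config: CliConfig = argh::from_env();

    // with no --rpc-config-path flag, the default expression above runs
    println!("loading rpc config from {}", config.rpc_config_path);
}

So after this change, starting the proxy without --rpc-config-path looks for ./config/example.toml instead of ./data/config/example.toml.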
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index 4c6877df..f72a09ec 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -2,7 +2,7 @@
 use derive_more::From;
 use ethers::prelude::Middleware;
 use futures::StreamExt;
-use governor::clock::{QuantaClock, QuantaInstant};
+use governor::clock::{Clock, QuantaClock, QuantaInstant};
 use governor::middleware::NoOpMiddleware;
 use governor::state::{InMemoryState, NotKeyed};
 use governor::NotUntil;
@@ -14,7 +14,7 @@
 use std::num::NonZeroU32;
 use std::sync::atomic::{self, AtomicU32, AtomicU64};
 use std::time::Duration;
 use std::{cmp::Ordering, sync::Arc};
-use tokio::time::interval;
+use tokio::time::{interval, sleep, MissedTickBehavior};
 use tracing::{info, warn};
 
 use crate::connections::Web3Connections;
@@ -47,6 +47,7 @@ pub struct Web3Connection {
     /// used for load balancing to the least loaded server
     soft_limit: u32,
     head_block_number: AtomicU64,
+    clock: QuantaClock,
 }
 
 impl fmt::Debug for Web3Connection {
@@ -69,14 +70,14 @@ impl Web3Connection {
         url_str: String,
         http_client: Option<reqwest::Client>,
         hard_rate_limit: Option<u32>,
-        clock: Option<&QuantaClock>,
+        clock: &QuantaClock,
         // TODO: think more about this type
         soft_limit: u32,
     ) -> anyhow::Result<Web3Connection> {
         let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
             let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());
 
-            let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock.unwrap());
+            let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
 
             Some(rate_limiter)
         } else {
@@ -109,6 +110,7 @@ impl Web3Connection {
         };
 
         Ok(Web3Connection {
+            clock: clock.clone(),
             url: url_str.clone(),
             active_requests: Default::default(),
             provider,
@@ -140,15 +142,20 @@ impl Web3Connection {
                 // TODO: what should this interval be? probably some fraction of block time
                 // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
                 let mut interval = interval(Duration::from_secs(2));
+                interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
                 loop {
                     // wait for the interval
                     // TODO: if error or rate limit, increase interval?
                     interval.tick().await;
 
-                    // TODO: rate limit!
+                    // rate limits
+                    let active_request_handle = self.wait_for_request_handle().await;
+
                     let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
 
+                    drop(active_request_handle);
+
                     // TODO: only store if this isn't already stored?
                     // TODO: also send something to the provider_tier so it can sort?
                     let old_block_number = self
@@ -165,17 +172,25 @@ impl Web3Connection {
                 }
             }
             Web3Provider::Ws(provider) => {
+                // rate limits
+                let active_request_handle = self.wait_for_request_handle().await;
+
                 // TODO: automatically reconnect?
                 // TODO: it would be faster to get the block number, but subscriptions don't provide that
                 // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
                 let mut stream = provider.subscribe_blocks().await?;
 
+                drop(active_request_handle);
+                let active_request_handle = self.wait_for_request_handle().await;
+
                 // query the block once since the subscription doesn't send the current block
                 // there is a very small race condition here where the stream could send us a new block right now
                 // all it does is print "new block" for the same block as current block
-                // TODO: rate limit!
                 let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
 
+                drop(active_request_handle);
+
                 info!("current block on {}: {}", self, block_number);
 
                 self.head_block_number
@@ -208,24 +223,31 @@ impl Web3Connection {
         Ok(())
     }
 
-    /// Send a web3 request
-    pub async fn request(
-        &self,
-        method: &str,
-        params: &serde_json::value::RawValue,
-    ) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
-        match &self.provider {
-            Web3Provider::Http(provider) => provider.request(method, params).await,
-            Web3Provider::Ws(provider) => provider.request(method, params).await,
+    pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
+        // rate limits
+        loop {
+            match self.try_request_handle() {
+                Ok(pending_request_handle) => return pending_request_handle,
+                Err(not_until) => {
+                    let deadline = not_until.wait_time_from(self.clock.now());
+
+                    sleep(deadline).await;
+                }
+            }
         }
+
+        // TODO: return a thing that when we drop it decrements?
     }
 
-    pub fn try_inc_active_requests(&self) -> Result<(), NotUntil<QuantaInstant>> {
+    pub fn try_request_handle(
+        self: &Arc<Self>,
+    ) -> Result<ActiveRequestHandle, NotUntil<QuantaInstant>> {
         // check rate limits
         if let Some(ratelimiter) = self.ratelimiter.as_ref() {
             match ratelimiter.check() {
                 Ok(_) => {
                     // rate limit succeeded
+                    return Ok(ActiveRequestHandle::new(self.clone()));
                 }
                 Err(not_until) => {
                     // rate limit failed
@@ -238,15 +260,43 @@ impl Web3Connection {
             }
         };
 
-        // TODO: what ordering?!
-        self.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
+        Ok(ActiveRequestHandle::new(self.clone()))
+    }
+}
 
-        Ok(())
+/// Drop this once the request completes
+pub struct ActiveRequestHandle(Arc<Web3Connection>);
+
+impl ActiveRequestHandle {
+    fn new(connection: Arc<Web3Connection>) -> Self {
+        // TODO: what ordering?!
+        connection
+            .active_requests
+            .fetch_add(1, atomic::Ordering::AcqRel);
+
+        Self(connection)
     }
 
-    pub fn dec_active_requests(&self) {
-        // TODO: what ordering?!
-        self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
+    /// Send a web3 request
+    /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
+    /// By taking self here, we ensure that this is dropped after the request is complete
+    pub async fn request(
+        self,
+        method: &str,
+        params: &serde_json::value::RawValue,
+    ) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
+        match &self.0.provider {
+            Web3Provider::Http(provider) => provider.request(method, params).await,
+            Web3Provider::Ws(provider) => provider.request(method, params).await,
+        }
+    }
+}
+
+impl Drop for ActiveRequestHandle {
+    fn drop(&mut self) {
+        self.0
+            .active_requests
+            .fetch_sub(1, atomic::Ordering::AcqRel);
     }
 }
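The new ActiveRequestHandle combines two ideas: an RAII guard that keeps active_requests accurate however a request ends, and a hard rate limit that must be passed before a handle is handed out. Below is a minimal standalone sketch of the same pattern. The names (Server, RequestGuard, Limiter) are hypothetical stand-ins for Web3Connection and ActiveRequestHandle; the governor calls mirror the ones the patch itself uses, and tokio is assumed for sleep and the async runtime.

use std::num::NonZeroU32;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use governor::clock::{Clock, QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{NotUntil, Quota, RateLimiter};
use tokio::time::sleep;

type Limiter = RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;

struct Server {
    active_requests: AtomicU32,
    ratelimiter: Limiter,
    clock: QuantaClock,
}

/// counts as one active request for as long as it is alive
struct RequestGuard(Arc<Server>);

impl RequestGuard {
    fn new(server: Arc<Server>) -> Self {
        // incremented when the guard is created...
        server.active_requests.fetch_add(1, Ordering::AcqRel);
        Self(server)
    }
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        // ...and decremented when it is dropped, even on an early return
        // or a `?` error path, so the counter cannot leak
        self.0.active_requests.fetch_sub(1, Ordering::AcqRel);
    }
}

impl Server {
    /// reserve a request slot, or report when the rate limiter will allow one
    fn try_request_handle(self: &Arc<Self>) -> Result<RequestGuard, NotUntil<QuantaInstant>> {
        self.ratelimiter.check()?;
        Ok(RequestGuard::new(self.clone()))
    }

    /// sleep until the hard rate limit lets a request through
    async fn wait_for_request_handle(self: &Arc<Self>) -> RequestGuard {
        loop {
            match self.try_request_handle() {
                Ok(handle) => return handle,
                Err(not_until) => sleep(not_until.wait_time_from(self.clock.now())).await,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let clock = QuantaClock::default();
    let server = Arc::new(Server {
        active_requests: AtomicU32::new(0),
        ratelimiter: RateLimiter::direct_with_clock(
            Quota::per_second(NonZeroU32::new(10).unwrap()),
            &clock,
        ),
        clock,
    });

    let handle = server.wait_for_request_handle().await;
    assert_eq!(server.active_requests.load(Ordering::Acquire), 1);
    drop(handle); // the Drop impl decrements the counter here
    assert_eq!(server.active_requests.load(Ordering::Acquire), 0);
}

The payoff is that every code path that obtains a guard increments the counter exactly once, and the decrement can no longer be forgotten the way a manual dec_active_requests call could.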
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 0f3ff62d..1fd3ef2b 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -13,7 +13,7 @@
 use std::sync::Arc;
 use tracing::warn;
 
 use crate::config::Web3ConnectionConfig;
-use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
+use crate::connection::{ActiveRequestHandle, JsonRpcForwardedResponse, Web3Connection};
 
 #[derive(Clone, Default)]
 struct SyncedConnections {
@@ -104,17 +104,15 @@ impl Web3Connections {
         self.synced_connections.load().head_block_number
     }
 
-    pub async fn try_send_request(
+    pub async fn try_send_request<'a>(
         &self,
-        connection: &Web3Connection,
+        connection_handle: ActiveRequestHandle,
         method: &str,
         params: &RawValue,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         // connection.in_active_requests was called when this rpc was selected
-        let response = connection.request(method, params).await;
-
-        connection.dec_active_requests();
+        let response = connection_handle.request(method, params).await;
 
         // TODO: if "no block with that header" or some other jsonrpc errors, skip this response
 
@@ -123,7 +121,7 @@ impl Web3Connections {
     pub async fn try_send_requests(
         self: Arc<Self>,
-        connections: Vec<Arc<Web3Connection>>,
+        connections: Vec<ActiveRequestHandle>,
         method: String,
         params: Box<RawValue>,
         response_sender: flume::Sender<anyhow::Result<Box<RawValue>>>,
@@ -140,7 +138,7 @@ impl Web3Connections {
             let handle = tokio::spawn(async move {
                 // get the client for this rpc server
                 let response = connections
-                    .try_send_request(connection.as_ref(), &method, &params)
+                    .try_send_request(connection, &method, &params)
                     .await?;
 
                 // send the first good response to a one shot channel. that way we respond quickly
@@ -234,7 +232,7 @@ impl Web3Connections {
     /// get the best available rpc server
     pub async fn next_upstream_server(
         &self,
-    ) -> Result<Arc<Web3Connection>, Option<NotUntil<QuantaInstant>>> {
+    ) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
         let mut earliest_not_until = None;
 
         // TODO: this clone is probably not the best way to do this
@@ -255,6 +253,9 @@ impl Web3Connections {
             let a = cache.get(a).unwrap();
             let b = cache.get(b).unwrap();
 
+            // TODO: don't just sort by active requests. sort by active requests as a percentage of soft limit
+            // TODO: if those are equal, sort on soft limit
+
             a.cmp(b)
         });
 
@@ -262,14 +263,12 @@ impl Web3Connections {
             let selected_rpc = self.inner.get(selected_rpc).unwrap();
 
             // increment our connection counter
-            if let Err(not_until) = selected_rpc.try_inc_active_requests() {
-                earliest_possible(&mut earliest_not_until, not_until);
-
-                continue;
+            match selected_rpc.try_request_handle() {
+                Err(not_until) => {
+                    earliest_possible(&mut earliest_not_until, not_until);
+                }
+                Ok(handle) => return Ok(handle),
             }
-
-            // return the selected RPC
-            return Ok(selected_rpc.clone());
         }
 
         // this might be None
@@ -280,21 +279,20 @@ impl Web3Connections {
     /// even fetches if they aren't in sync. This is useful for broadcasting signed transactions
     pub fn get_upstream_servers(
         &self,
-    ) -> Result<Vec<Arc<Web3Connection>>, Option<NotUntil<QuantaInstant>>> {
+    ) -> Result<Vec<ActiveRequestHandle>, Option<NotUntil<QuantaInstant>>> {
         let mut earliest_not_until = None;
         // TODO: with capacity?
         let mut selected_rpcs = vec![];
 
         for connection in self.inner.iter() {
             // check rate limits and increment our connection counter
-            if let Err(not_until) = connection.try_inc_active_requests() {
-                earliest_possible(&mut earliest_not_until, not_until);
-
-                // this rpc is not available. skip it
-                continue;
+            match connection.try_request_handle() {
+                Err(not_until) => {
+                    earliest_possible(&mut earliest_not_until, not_until);
+                    // this rpc is not available. skip it
+                }
+                Ok(handle) => selected_rpcs.push(handle),
             }
-
-            selected_rpcs.push(connection.clone());
         }
 
         if !selected_rpcs.is_empty() {
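On the two new sort TODOs in next_upstream_server: sorting on raw active_requests counts treats a large server with 10 requests in flight as busier than a small server with 5, even when the small one is far closer to its soft limit. Here is a sketch of the utilization-based ranking those TODOs describe; the Candidate type is hypothetical, and the tie-break direction (larger soft limit first) is an assumption, not something the patch specifies.

struct Candidate {
    name: &'static str,
    active_requests: u32,
    soft_limit: u32,
}

fn main() {
    let mut candidates = vec![
        Candidate { name: "big", active_requests: 10, soft_limit: 100 },
        Candidate { name: "small", active_requests: 5, soft_limit: 10 },
    ];

    // sorting on raw counts would prefer "small" (5 < 10), even though it
    // is at 50% of its soft limit while "big" is only at 10%
    candidates.sort_by(|a, b| {
        let a_util = a.active_requests as f64 / a.soft_limit as f64;
        let b_util = b.active_requests as f64 / b.soft_limit as f64;

        // utilization first, then soft limit as the tie breaker
        // (assumption: prefer the larger server when utilization is equal)
        a_util
            .partial_cmp(&b_util)
            .unwrap()
            .then(b.soft_limit.cmp(&a.soft_limit))
    });

    assert_eq!(candidates[0].name, "big");
}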
diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs
index af85c291..eff525d1 100644
--- a/web3-proxy/src/main.rs
+++ b/web3-proxy/src/main.rs
@@ -67,7 +67,7 @@ impl Web3ProxyApp {
             .user_agent(APP_USER_AGENT)
             .build()?;
 
-        // TODO: attach context to this error?
+        // TODO: attach context to this error
         let balanced_rpc_tiers =
             future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
                 Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock)
             }))
             .await
             .into_iter()
             .collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
 
-        // TODO: attach context to this error?
+        // TODO: attach context to this error
         let private_rpcs = if private_rpcs.is_empty() {
             warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
             // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
@@ -170,7 +170,7 @@ impl Web3ProxyApp {
             Ok(upstream_server) => {
                 let response = balanced_rpcs
                     .try_send_request(
-                        upstream_server,
+                        upstream_server,
                         &json_body.method,
                         &json_body.params,
                     )
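Finally, on the API shape that makes the call sites above work: ActiveRequestHandle::request takes self by value, so the handle is consumed by the call, dropped when the request finishes, and impossible to reuse afterward. A minimal sketch of that move semantics, with a placeholder Handle type and method body, and tokio assumed for the runtime:

struct Handle;

impl Handle {
    /// `self` is taken by value, so the handle is consumed by the call
    /// and dropped as soon as the future completes, success or error
    async fn request(self, method: &str) -> String {
        format!("sent {}", method)
    }
}

#[tokio::main]
async fn main() {
    let handle = Handle;

    let response = handle.request("eth_getBlockByNumber").await;
    println!("{}", response);

    // a second call is a compile error: `handle` was moved by the first
    // handle.request("eth_blockNumber").await;
}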