use a request handle for ratelimit handling

This commit is contained in:
Bryan Stitt 2022-05-06 04:29:25 +00:00
parent 79ae74e151
commit d3859b463e
4 changed files with 98 additions and 50 deletions

@ -16,7 +16,7 @@ pub struct CliConfig {
/// what port the proxy should listen on
// TODO: use flags for the config path "./data/config/example.toml"
#[argh(option, default = "\"./data/config/example.toml\".to_string()")]
#[argh(option, default = "\"./config/example.toml\".to_string()")]
pub rpc_config_path: String,
}
@ -62,7 +62,7 @@ impl Web3ConnectionConfig {
self.url,
http_client,
self.hard_limit,
Some(clock),
clock,
self.soft_limit,
)
.await

@ -2,7 +2,7 @@
use derive_more::From;
use ethers::prelude::Middleware;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::clock::{Clock, QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
@ -14,7 +14,7 @@ use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::time::Duration;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::interval;
use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{info, warn};
use crate::connections::Web3Connections;
@ -47,6 +47,7 @@ pub struct Web3Connection {
/// used for load balancing to the least loaded server
soft_limit: u32,
head_block_number: AtomicU64,
clock: QuantaClock,
}
impl fmt::Debug for Web3Connection {
@ -69,14 +70,14 @@ impl Web3Connection {
url_str: String,
http_client: Option<reqwest::Client>,
hard_rate_limit: Option<u32>,
clock: Option<&QuantaClock>,
clock: &QuantaClock,
// TODO: think more about this type
soft_limit: u32,
) -> anyhow::Result<Web3Connection> {
let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());
let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock.unwrap());
let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
Some(rate_limiter)
} else {
@ -109,6 +110,7 @@ impl Web3Connection {
};
Ok(Web3Connection {
clock: clock.clone(),
url: url_str.clone(),
active_requests: Default::default(),
provider,
@ -140,15 +142,20 @@ impl Web3Connection {
// TODO: what should this interval be? probably some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
// TODO: rate limit!
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
drop(active_request_handle);
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
let old_block_number = self
@ -165,17 +172,25 @@ impl Web3Connection {
}
}
Web3Provider::Ws(provider) => {
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
let active_request_handle = self.wait_for_request_handle().await;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: rate limit!
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
drop(active_request_handle);
info!("current block on {}: {}", self, block_number);
self.head_block_number
@ -208,24 +223,31 @@ impl Web3Connection {
Ok(())
}
/// Send a web3 request
pub async fn request(
&self,
method: &str,
params: &serde_json::value::RawValue,
) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
match &self.provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
// rate limits
loop {
match self.try_request_handle() {
Ok(pending_request_handle) => return pending_request_handle,
Err(not_until) => {
let deadline = not_until.wait_time_from(self.clock.now());
sleep(deadline).await;
}
}
}
// TODO: return a thing that when we drop it decrements?
}
pub fn try_inc_active_requests(&self) -> Result<(), NotUntil<QuantaInstant>> {
pub fn try_request_handle(
self: &Arc<Self>,
) -> Result<ActiveRequestHandle, NotUntil<QuantaInstant>> {
// check rate limits
if let Some(ratelimiter) = self.ratelimiter.as_ref() {
match ratelimiter.check() {
Ok(_) => {
// rate limit succeeded
return Ok(ActiveRequestHandle(self.clone()));
}
Err(not_until) => {
// rate limit failed
@ -238,15 +260,43 @@ impl Web3Connection {
}
};
// TODO: what ordering?!
self.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
Ok(ActiveRequestHandle::new(self.clone()))
}
}
Ok(())
/// Drop this once a connection completes
pub struct ActiveRequestHandle(Arc<Web3Connection>);
impl ActiveRequestHandle {
fn new(connection: Arc<Web3Connection>) -> Self {
// TODO: what ordering?!
connection
.active_requests
.fetch_add(1, atomic::Ordering::AcqRel);
Self(connection)
}
pub fn dec_active_requests(&self) {
// TODO: what ordering?!
self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
pub async fn request(
self,
method: &str,
params: &serde_json::value::RawValue,
) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
match &self.0.provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
}
}
}
impl Drop for ActiveRequestHandle {
fn drop(&mut self) {
self.0
.active_requests
.fetch_sub(1, atomic::Ordering::AcqRel);
}
}

@ -13,7 +13,7 @@ use std::sync::Arc;
use tracing::warn;
use crate::config::Web3ConnectionConfig;
use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
use crate::connection::{ActiveRequestHandle, JsonRpcForwardedResponse, Web3Connection};
#[derive(Clone, Default)]
struct SyncedConnections {
@ -104,17 +104,15 @@ impl Web3Connections {
self.synced_connections.load().head_block_number
}
pub async fn try_send_request(
pub async fn try_send_request<'a>(
&self,
connection: &Web3Connection,
connection_handle: ActiveRequestHandle,
method: &str,
params: &RawValue,
) -> anyhow::Result<JsonRpcForwardedResponse> {
// connection.in_active_requests was called when this rpc was selected
let response = connection.request(method, params).await;
connection.dec_active_requests();
let response = connection_handle.request(method, params).await;
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
@ -123,7 +121,7 @@ impl Web3Connections {
pub async fn try_send_requests(
self: Arc<Self>,
connections: Vec<Arc<Web3Connection>>,
connections: Vec<ActiveRequestHandle>,
method: String,
params: Box<RawValue>,
response_sender: flume::Sender<anyhow::Result<JsonRpcForwardedResponse>>,
@ -140,7 +138,7 @@ impl Web3Connections {
let handle = tokio::spawn(async move {
// get the client for this rpc server
let response = connections
.try_send_request(connection.as_ref(), &method, &params)
.try_send_request(connection, &method, &params)
.await?;
// send the first good response to a one shot channel. that way we respond quickly
@ -234,7 +232,7 @@ impl Web3Connections {
/// get the best available rpc server
pub async fn next_upstream_server(
&self,
) -> Result<Arc<Web3Connection>, Option<NotUntil<QuantaInstant>>> {
) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
// TODO: this clone is probably not the best way to do this
@ -255,6 +253,9 @@ impl Web3Connections {
let a = cache.get(a).unwrap();
let b = cache.get(b).unwrap();
// TODO: don't just sort by active requests. sort by active requests as a percentage of soft limit
// TODO: if those are equal, sort on soft limit
a.cmp(b)
});
@ -262,14 +263,12 @@ impl Web3Connections {
let selected_rpc = self.inner.get(selected_rpc).unwrap();
// increment our connection counter
if let Err(not_until) = selected_rpc.try_inc_active_requests() {
earliest_possible(&mut earliest_not_until, not_until);
continue;
match selected_rpc.try_request_handle() {
Err(not_until) => {
earliest_possible(&mut earliest_not_until, not_until);
}
Ok(handle) => return Ok(handle),
}
// return the selected RPC
return Ok(selected_rpc.clone());
}
// this might be None
@ -280,21 +279,20 @@ impl Web3Connections {
/// even fetches if they aren't in sync. This is useful for broadcasting signed transactions
pub fn get_upstream_servers(
&self,
) -> Result<Vec<Arc<Web3Connection>>, Option<NotUntil<QuantaInstant>>> {
) -> Result<Vec<ActiveRequestHandle>, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
// TODO: with capacity?
let mut selected_rpcs = vec![];
for connection in self.inner.iter() {
// check rate limits and increment our connection counter
if let Err(not_until) = connection.try_inc_active_requests() {
earliest_possible(&mut earliest_not_until, not_until);
// this rpc is not available. skip it
continue;
match connection.try_request_handle() {
Err(not_until) => {
earliest_possible(&mut earliest_not_until, not_until);
// this rpc is not available. skip it
}
Ok(handle) => selected_rpcs.push(handle),
}
selected_rpcs.push(connection.clone());
}
if !selected_rpcs.is_empty() {

@ -67,7 +67,7 @@ impl Web3ProxyApp {
.user_agent(APP_USER_AGENT)
.build()?;
// TODO: attach context to this error?
// TODO: attach context to this error
let balanced_rpc_tiers =
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock)
@ -76,7 +76,7 @@ impl Web3ProxyApp {
.into_iter()
.collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
// TODO: attach context to this error?
// TODO: attach context to this error
let private_rpcs = if private_rpcs.is_empty() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
@ -170,7 +170,7 @@ impl Web3ProxyApp {
Ok(upstream_server) => {
let response = balanced_rpcs
.try_send_request(
&upstream_server,
upstream_server,
&json_body.method,
&json_body.params,
)