web3-proxy/web3-proxy/src/connection.rs

353 lines
13 KiB
Rust
Raw Normal View History

2022-05-12 02:50:52 +03:00
//! Rate-limited communication with a web3 provider
2022-05-05 22:07:09 +03:00
use derive_more::From;
use ethers::prelude::Middleware;
use futures::StreamExt;
use governor::clock::{Clock, QuantaClock, QuantaInstant};
2022-05-05 22:07:09 +03:00
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use serde_json::value::RawValue;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::time::Duration;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::{interval, sleep, MissedTickBehavior};
2022-05-07 09:18:01 +03:00
use tracing::{info, trace, warn};
2022-05-05 22:07:09 +03:00
use crate::connections::Web3Connections;
/// The rate limiter type used for a connection's hard limit: an un-keyed, in-memory
/// `governor` limiter driven by a `QuantaClock`, with no middleware.
type Web3RateLimiter =
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
/// The transport-specific ethers provider that backs a connection.
///
/// The `From` derive lets either concrete provider be converted with `.into()`.
///
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
pub enum Web3Provider {
/// A provider using the ethers `Http` transport.
Http(ethers::providers::Provider<ethers::providers::Http>),
/// A provider using the ethers `Ws` transport.
Ws(ethers::providers::Provider<ethers::providers::Ws>),
}
impl fmt::Debug for Web3Provider {
    /// Deliberately terse: prints only the type name followed by `..`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
        let mut builder = formatter.debug_struct("Web3Provider");
        builder.finish_non_exhaustive()
    }
}
/// An active connection to a Web3Rpc
pub struct Web3Connection {
    /// The url this connection was created from. Used for `Debug` and `Display` output.
    /// TODO: can we get this from the provider? do we even need it?
    url: String,
    /// keep track of currently open requests. We sort on this
    active_requests: AtomicU32,
    /// The http or websocket provider that requests are sent through.
    provider: Web3Provider,
    /// Optional hard rate limit. When `None`, request handles are never rate limited.
    ratelimiter: Option<Web3RateLimiter>,
    /// used for load balancing to the least loaded server
    soft_limit: u32,
    /// The most recent head block number seen from this server (0 until the first block arrives).
    head_block_number: AtomicU64,
    /// Clock used to turn a rate limiter's "not until" instant into a wait duration.
    clock: QuantaClock,
}
impl fmt::Debug for Web3Connection {
    /// Debug output shows only the url; every other field is elided with `..`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("Web3Connection");
        builder.field("url", &self.url);
        builder.finish_non_exhaustive()
    }
}
impl fmt::Display for Web3Connection {
    /// A connection displays as its url.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(self.url.as_str())
    }
}
impl Web3Connection {
/// Connect to a web3 rpc.
///
/// * `url_str` - must start with `http` or `ws`; anything else is an error.
/// * `http_client` - required for http urls; unused for websocket urls.
/// * `hard_rate_limit` - optional requests-per-second limit enforced before every request.
///   Must be greater than zero when set.
/// * `clock` - clock shared with the rate limiter; a clone is stored on the connection.
/// * `soft_limit` - relative capacity used when sorting connections by load.
///
/// # Errors
///
/// Returns an error if `hard_rate_limit` is `Some(0)`, if the url fails to parse, if an http
/// url is given without an `http_client`, if the websocket connection fails, or if the url
/// is neither http nor ws.
pub async fn try_new(
    url_str: String,
    http_client: Option<reqwest::Client>,
    hard_rate_limit: Option<u32>,
    clock: &QuantaClock,
    // TODO: think more about this type
    soft_limit: u32,
) -> anyhow::Result<Web3Connection> {
    let hard_rate_limiter = match hard_rate_limit {
        Some(hard_rate_limit) => {
            // a zero quota cannot be represented; report it instead of panicking
            let per_second = NonZeroU32::new(hard_rate_limit)
                .ok_or_else(|| anyhow::anyhow!("hard_rate_limit must be greater than 0"))?;

            let quota = governor::Quota::per_second(per_second);

            Some(governor::RateLimiter::direct_with_clock(quota, clock))
        }
        None => None,
    };

    let provider = if url_str.starts_with("http") {
        let url: url::Url = url_str.parse()?;

        let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;

        let provider = ethers::providers::Http::new_with_client(url, http_client);

        // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
        ethers::providers::Provider::new(provider)
            .interval(Duration::from_secs(1))
            .into()
    } else if url_str.starts_with("ws") {
        // TODO: wrapper automatically reconnect
        let provider = ethers::providers::Ws::connect(url_str.clone()).await?;

        // TODO: make sure this automatically reconnects

        // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
        ethers::providers::Provider::new(provider)
            .interval(Duration::from_secs(1))
            .into()
    } else {
        return Err(anyhow::anyhow!("only http and ws servers are supported"));
    };

    Ok(Web3Connection {
        clock: clock.clone(),
        // last use of url_str, so it can be moved instead of cloned
        url: url_str,
        active_requests: Default::default(),
        provider,
        ratelimiter: hard_rate_limiter,
        soft_limit,
        head_block_number: 0.into(),
    })
}
2022-05-06 09:07:01 +03:00
#[inline]
2022-05-05 22:07:09 +03:00
pub fn active_requests(&self) -> u32 {
self.active_requests.load(atomic::Ordering::Acquire)
}
2022-05-06 09:07:01 +03:00
#[inline]
pub fn head_block_number(&self) -> u64 {
self.head_block_number.load(atomic::Ordering::Acquire)
}
2022-05-06 23:44:12 +03:00
/// The soft limit this connection was created with.
#[inline]
pub fn soft_limit(&self) -> u32 {
self.soft_limit
}
2022-05-06 09:07:01 +03:00
#[inline]
2022-05-05 22:07:09 +03:00
pub fn url(&self) -> &str {
&self.url
}
/// Subscribe to new blocks
// #[instrument]
pub async fn new_heads(
self: Arc<Self>,
connections: Option<Arc<Web3Connections>>,
) -> anyhow::Result<()> {
info!("Watching new_heads on {}", self);
match &self.provider {
Web3Provider::Http(provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
2022-05-06 08:44:30 +03:00
// TODO: what should this interval be? probably some fraction of block time. set automatically?
2022-05-05 22:07:09 +03:00
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
2022-05-05 22:07:09 +03:00
loop {
// wait for the interval
2022-05-06 00:38:15 +03:00
// TODO: if error or rate limit, increase interval?
2022-05-05 22:07:09 +03:00
interval.tick().await;
let active_request_handle = self.wait_for_request_handle().await;
2022-05-05 22:07:09 +03:00
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
drop(active_request_handle);
2022-05-05 22:07:09 +03:00
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
let old_block_number = self
.head_block_number
.swap(block_number, atomic::Ordering::AcqRel);
if old_block_number != block_number {
if let Some(connections) = &connections {
2022-05-06 09:07:01 +03:00
connections.update_synced_rpcs(&self)?;
2022-05-06 23:44:12 +03:00
} else {
info!("new block on {}: {}", self, block_number);
2022-05-05 22:07:09 +03:00
}
}
}
}
Web3Provider::Ws(provider) => {
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
2022-05-05 22:07:09 +03:00
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
let active_request_handle = self.wait_for_request_handle().await;
2022-05-05 22:07:09 +03:00
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
2022-05-06 01:21:27 +03:00
// TODO: rate limit!
2022-05-05 22:07:09 +03:00
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
drop(active_request_handle);
2022-05-06 23:44:12 +03:00
// TODO: swap and check the result?
self.head_block_number
.store(block_number, atomic::Ordering::Release);
2022-05-05 22:07:09 +03:00
if let Some(connections) = &connections {
2022-05-06 09:07:01 +03:00
connections.update_synced_rpcs(&self)?;
2022-05-06 23:44:12 +03:00
} else {
2022-05-07 07:42:47 +03:00
info!("new head block {} from {}", block_number, self);
2022-05-05 22:07:09 +03:00
}
2022-05-06 23:44:12 +03:00
while let Some(new_block) = stream.next().await {
let new_block_number = new_block.number.unwrap().as_u64();
2022-05-05 22:07:09 +03:00
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
// TODO: do we need this old block number check? its helpful on http, but here it shouldn't dupe except maybe on the first run
self.head_block_number
2022-05-06 23:44:12 +03:00
.fetch_max(new_block_number, atomic::Ordering::AcqRel);
2022-05-05 22:07:09 +03:00
if let Some(connections) = &connections {
2022-05-06 09:07:01 +03:00
connections.update_synced_rpcs(&self)?;
2022-05-05 22:07:09 +03:00
}
}
}
}
info!("Done watching new_heads on {}", self);
Ok(())
}
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
// rate limits
loop {
match self.try_request_handle() {
Ok(pending_request_handle) => return pending_request_handle,
Err(not_until) => {
let deadline = not_until.wait_time_from(self.clock.now());
sleep(deadline).await;
}
}
2022-05-05 22:07:09 +03:00
}
// TODO: return a thing that when we drop it decrements?
2022-05-05 22:07:09 +03:00
}
pub fn try_request_handle(
self: &Arc<Self>,
) -> Result<ActiveRequestHandle, NotUntil<QuantaInstant>> {
2022-05-05 22:07:09 +03:00
// check rate limits
if let Some(ratelimiter) = self.ratelimiter.as_ref() {
match ratelimiter.check() {
Ok(_) => {
// rate limit succeeded
return Ok(ActiveRequestHandle(self.clone()));
2022-05-05 22:07:09 +03:00
}
Err(not_until) => {
// rate limit failed
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
// TODO: use tracing better
warn!("Exhausted rate limit on {:?}: {}", self, not_until);
return Err(not_until);
}
}
};
Ok(ActiveRequestHandle::new(self.clone()))
}
}
2022-05-12 02:50:52 +03:00
/// A handle for one in-flight request on a connection.
///
/// Drop this once the request completes; dropping it decrements the connection's
/// `active_requests` counter.
pub struct ActiveRequestHandle(Arc<Web3Connection>);
impl ActiveRequestHandle {
fn new(connection: Arc<Web3Connection>) -> Self {
2022-05-07 09:13:57 +03:00
// TODO: attach the incoming id to this? will be helpful for logging
2022-05-05 22:07:09 +03:00
// TODO: what ordering?!
connection
.active_requests
.fetch_add(1, atomic::Ordering::AcqRel);
2022-05-05 22:07:09 +03:00
Self(connection)
2022-05-05 22:07:09 +03:00
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
pub async fn request(
self,
method: &str,
params: &serde_json::value::RawValue,
2022-05-06 08:44:30 +03:00
) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
2022-05-07 07:42:47 +03:00
// TODO: this should probably be trace level and use a span
// TODO: it would be nice to have the request id on this
2022-05-07 09:13:57 +03:00
trace!("Sending {}({}) to {}", method, params.to_string(), self.0);
2022-05-07 07:42:47 +03:00
let response = match &self.0.provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
2022-05-07 07:42:47 +03:00
};
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: at least instrument this with more useful information
2022-05-07 09:13:57 +03:00
trace!("Response from {}: {:?}", self.0, response);
2022-05-07 07:42:47 +03:00
response
}
}
impl Drop for ActiveRequestHandle {
fn drop(&mut self) {
self.0
.active_requests
.fetch_sub(1, atomic::Ordering::AcqRel);
2022-05-05 22:07:09 +03:00
}
}
impl Eq for Web3Connection {}

impl Ord for Web3Connection {
    /// Order connections by relative load: `active_requests / soft_limit`, least loaded first.
    ///
    /// The ratios are compared exactly by cross-multiplying in `u64` (a product of two `u32`
    /// values cannot overflow a `u64`), so there is no float rounding and no NaN to panic on.
    /// A connection with a `soft_limit` of 0 has no capacity and sorts after every connection
    /// that has a non-zero limit.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // TODO: what atomic ordering?!
        let a = u64::from(self.active_requests.load(atomic::Ordering::Acquire));
        let b = u64::from(other.active_requests.load(atomic::Ordering::Acquire));

        match (self.soft_limit, other.soft_limit) {
            (0, 0) => std::cmp::Ordering::Equal,
            (0, _) => std::cmp::Ordering::Greater,
            (_, 0) => std::cmp::Ordering::Less,
            (self_limit, other_limit) => {
                // a / self_limit <=> b / other_limit, without the division
                let lhs = a * u64::from(other_limit);
                let rhs = b * u64::from(self_limit);

                lhs.cmp(&rhs)
            }
        }
    }
}

impl PartialOrd for Web3Connection {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// note that this is just comparing relative load (the same key as `Ord`). two providers with different rpc urls can be equal!
impl PartialEq for Web3Connection {
    fn eq(&self, other: &Self) -> bool {
        // derived from `cmp` so that `==` and `Ord` always agree
        self.cmp(other) == Ordering::Equal
    }
}