//! Rate-limited communication with a web3 provider
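//!
//! A minimal usage sketch (illustrative only: the URL, `http_client`, and `clock` are
//! placeholders supplied by the caller, and error handling is elided):
//!
//! ```ignore
//! let conn = Arc::new(
//!     Web3Connection::try_new(
//!         "https://example.invalid/rpc".to_string(),
//!         Some(http_client),
//!         Some(100), // hard rate limit, in requests per second
//!         &clock,
//!         1_000, // soft limit used for load balancing
//!     )
//!     .await?,
//! );
//!
//! // wait for the rate limiter, then send a raw JSON-RPC request
//! let handle = conn.wait_for_request_handle().await;
//! let latest_block = handle.request("eth_blockNumber", &None).await?;
//! ```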

use derive_more::From;
use ethers::prelude::Middleware;
use futures::StreamExt;
use governor::clock::{Clock, QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use serde_json::value::RawValue;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::time::Duration;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{info, trace, warn};

use crate::connections::Web3Connections;

type Web3RateLimiter =
    RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;

/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
pub enum Web3Provider {
    Http(ethers::providers::Provider<ethers::providers::Http>),
    Ws(ethers::providers::Provider<ethers::providers::Ws>),
}

impl fmt::Debug for Web3Provider {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
        f.debug_struct("Web3Provider").finish_non_exhaustive()
    }
}

/// An active connection to a Web3Rpc
pub struct Web3Connection {
    /// TODO: can we get this from the provider? do we even need it?
    url: String,
    /// keep track of currently open requests. We sort on this
    active_requests: AtomicU32,
    provider: Web3Provider,
    ratelimiter: Option<Web3RateLimiter>,
    /// used for load balancing to the least loaded server
    soft_limit: u32,
    head_block_number: AtomicU64,
    clock: QuantaClock,
}

impl fmt::Debug for Web3Connection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Web3Connection")
            .field("url", &self.url)
            .finish_non_exhaustive()
    }
}

impl fmt::Display for Web3Connection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", &self.url)
    }
}

impl Web3Connection {
    /// Connect to a web3 rpc and subscribe to new heads
    pub async fn try_new(
        url_str: String,
        http_client: Option<reqwest::Client>,
        hard_rate_limit: Option<u32>,
        clock: &QuantaClock,
        // TODO: think more about this type
        soft_limit: u32,
    ) -> anyhow::Result<Web3Connection> {
        let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
            let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());

            let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);

            Some(rate_limiter)
        } else {
            None
        };

        let provider = if url_str.starts_with("http") {
            let url: url::Url = url_str.parse()?;

            let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;

            let provider = ethers::providers::Http::new_with_client(url, http_client);

            // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
            ethers::providers::Provider::new(provider)
                .interval(Duration::from_secs(1))
                .into()
        } else if url_str.starts_with("ws") {
            // TODO: wrap this so that it automatically reconnects
            let provider = ethers::providers::Ws::connect(url_str.clone()).await?;

            // TODO: make sure this automatically reconnects

            // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
            ethers::providers::Provider::new(provider)
                .interval(Duration::from_secs(1))
                .into()
        } else {
            return Err(anyhow::anyhow!("only http and ws servers are supported"));
        };

        Ok(Web3Connection {
            clock: clock.clone(),
            url: url_str.clone(),
            active_requests: Default::default(),
            provider,
            ratelimiter: hard_rate_limiter,
            soft_limit,
            head_block_number: 0.into(),
        })
    }
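
    // A small illustrative sketch (not part of this impl) of how the hard rate limit built
    // in `try_new` behaves; the names below are assumptions for the example only. With
    // `hard_rate_limit = 2`, the third back-to-back `check()` is rejected with a `NotUntil`
    // describing when capacity becomes available again:
    //
    //     let clock = QuantaClock::default();
    //     let quota = governor::Quota::per_second(NonZeroU32::new(2).unwrap());
    //     let limiter = governor::RateLimiter::direct_with_clock(quota, &clock);
    //     assert!(limiter.check().is_ok());
    //     assert!(limiter.check().is_ok());
    //     assert!(limiter.check().is_err()); // rate limited until the quota replenishes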

    #[inline]
    pub fn active_requests(&self) -> u32 {
        self.active_requests.load(atomic::Ordering::Acquire)
    }

    #[inline]
    pub fn head_block_number(&self) -> u64 {
        self.head_block_number.load(atomic::Ordering::Acquire)
    }

    #[inline]
    pub fn soft_limit(&self) -> u32 {
        self.soft_limit
    }

    #[inline]
    pub fn url(&self) -> &str {
        &self.url
    }

    /// Subscribe to new blocks
    // #[instrument]
    pub async fn new_heads(
        self: Arc<Self>,
        connections: Option<Arc<Web3Connections>>,
    ) -> anyhow::Result<()> {
        info!("Watching new_heads on {}", self);

        match &self.provider {
            Web3Provider::Http(provider) => {
                // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
                // TODO: what should this interval be? probably some fraction of block time. set automatically?
                // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
                let mut interval = interval(Duration::from_secs(2));
                interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

                loop {
                    // wait for the interval
                    // TODO: if error or rate limit, increase interval?
                    interval.tick().await;

                    let active_request_handle = self.wait_for_request_handle().await;

                    let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;

                    drop(active_request_handle);

                    // TODO: only store if this isn't already stored?
                    // TODO: also send something to the provider_tier so it can sort?
                    let old_block_number = self
                        .head_block_number
                        .swap(block_number, atomic::Ordering::AcqRel);

                    if old_block_number != block_number {
                        if let Some(connections) = &connections {
                            connections.update_synced_rpcs(&self)?;
                        } else {
                            info!("new block on {}: {}", self, block_number);
                        }
                    }
                }
            }
            Web3Provider::Ws(provider) => {
                // rate limits
                let active_request_handle = self.wait_for_request_handle().await;

                // TODO: automatically reconnect?
                // TODO: it would be faster to get the block number, but subscriptions don't provide that
                // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
                let mut stream = provider.subscribe_blocks().await?;

                drop(active_request_handle);

                let active_request_handle = self.wait_for_request_handle().await;

                // query the block once since the subscription doesn't send the current block
                // there is a very small race condition here where the stream could send us a new block right now
                // but all that would do is print "new block" for the same block as the current block
                // TODO: rate limit!
                let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;

                drop(active_request_handle);

                // TODO: swap and check the result?
                self.head_block_number
                    .store(block_number, atomic::Ordering::Release);

                if let Some(connections) = &connections {
                    connections.update_synced_rpcs(&self)?;
                } else {
                    info!("new head block {} from {}", block_number, self);
                }

                while let Some(new_block) = stream.next().await {
                    let new_block_number = new_block.number.unwrap().as_u64();

                    // TODO: only store if this isn't already stored?
                    // TODO: also send something to the provider_tier so it can sort?
                    // TODO: do we need this old block number check? it's helpful on http, but here it shouldn't dupe except maybe on the first run
                    self.head_block_number
                        .fetch_max(new_block_number, atomic::Ordering::AcqRel);

                    if let Some(connections) = &connections {
                        connections.update_synced_rpcs(&self)?;
                    }
                }
            }
        }

        info!("Done watching new_heads on {}", self);

        Ok(())
    }
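
    // A hedged sketch of how a caller might drive `new_heads` in the background. The
    // variable names (`connection`, `connections`) are assumptions for the example:
    //
    //     let connection: Arc<Web3Connection> = /* from try_new */;
    //     let handle = tokio::spawn(connection.clone().new_heads(Some(connections.clone())));
    //     // ... later, surface both the join error and the subscription error:
    //     // handle.await??;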

    pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
        // rate limits
        loop {
            match self.try_request_handle() {
                Ok(pending_request_handle) => return pending_request_handle,
                Err(not_until) => {
                    let deadline = not_until.wait_time_from(self.clock.now());

                    sleep(deadline).await;
                }
            }
        }

        // TODO: return a thing that when we drop it decrements?
    }
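
    // Illustrative only (assuming `conn: Arc<Web3Connection>` and no other requests in
    // flight): the returned handle keeps `active_requests` incremented until it is dropped,
    // so hold it only for the duration of a single request.
    //
    //     let handle = conn.wait_for_request_handle().await;
    //     assert_eq!(conn.active_requests(), 1);
    //     drop(handle);
    //     assert_eq!(conn.active_requests(), 0);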

    pub fn try_request_handle(
        self: &Arc<Self>,
    ) -> Result<ActiveRequestHandle, NotUntil<QuantaInstant>> {
        // check rate limits
        if let Some(ratelimiter) = self.ratelimiter.as_ref() {
            match ratelimiter.check() {
                Ok(_) => {
                    // rate limit succeeded
                    // go through the constructor so the active_requests counter is incremented
                    return Ok(ActiveRequestHandle::new(self.clone()));
                }
                Err(not_until) => {
                    // rate limit failed
                    // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
                    // TODO: use tracing better
                    warn!("Exhausted rate limit on {:?}: {}", self, not_until);

                    return Err(not_until);
                }
            }
        };

        Ok(ActiveRequestHandle::new(self.clone()))
    }
}

/// Drop this once a request completes
pub struct ActiveRequestHandle(Arc<Web3Connection>);

impl ActiveRequestHandle {
    fn new(connection: Arc<Web3Connection>) -> Self {
        // TODO: attach the incoming id to this? will be helpful for logging
        // TODO: what ordering?!
        connection
            .active_requests
            .fetch_add(1, atomic::Ordering::AcqRel);

        Self(connection)
    }

    /// Send a web3 request
    /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
    /// By taking self here, we ensure that this is dropped after the request is complete
    pub async fn request(
        self,
        method: &str,
        params: &Option<Box<serde_json::value::RawValue>>,
    ) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
        // TODO: this should probably be trace level and use a span
        // TODO: it would be nice to have the request id on this
        trace!("Sending {}({:?}) to {}", method, params, self.0);

        let response = match &self.0.provider {
            Web3Provider::Http(provider) => provider.request(method, params).await,
            Web3Provider::Ws(provider) => provider.request(method, params).await,
        };

        // TODO: i think ethers already has trace logging (and does it much more fancy)
        // TODO: at least instrument this with more useful information
        trace!("Response from {}: {:?}", self.0, response);

        response
    }
}
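
// Illustrative only: building `params` as a raw JSON array for a call that takes arguments.
// The address and variable names are placeholders, and errors are simply propagated:
//
//     let params = serde_json::value::RawValue::from_string(
//         r#"["0x0000000000000000000000000000000000000000", "latest"]"#.to_string(),
//     )?;
//     let handle = conn.wait_for_request_handle().await;
//     let balance = handle.request("eth_getBalance", &Some(params)).await?;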

impl Drop for ActiveRequestHandle {
    fn drop(&mut self) {
        self.0
            .active_requests
            .fetch_sub(1, atomic::Ordering::AcqRel);
    }
}

impl Eq for Web3Connection {}

impl Ord for Web3Connection {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // TODO: what atomic ordering?!
        let a = self.active_requests.load(atomic::Ordering::Acquire);
        let b = other.active_requests.load(atomic::Ordering::Acquire);

        // TODO: how should we include the soft limit? floats are slower than integer math
        let a = a as f32 / self.soft_limit as f32;
        let b = b as f32 / other.soft_limit as f32;

        a.partial_cmp(&b).unwrap()
    }
}
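
// Worked example of the ordering above (numbers are illustrative): a server with 10 active
// requests and a soft limit of 200 has a utilization of 10 / 200 = 0.05, while one with
// 5 active requests and a soft limit of 50 has 5 / 50 = 0.10, so the first sorts as
// "less loaded" even though it has more requests in flight.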

impl PartialOrd for Web3Connection {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
impl PartialEq for Web3Connection {
    fn eq(&self, other: &Self) -> bool {
        // TODO: what ordering?!
        self.active_requests.load(atomic::Ordering::Acquire)
            == other.active_requests.load(atomic::Ordering::Acquire)
    }
}