web3-proxy/web3-proxy/src/connection.rs

633 lines
24 KiB
Rust
Raw Normal View History

2022-05-12 02:50:52 +03:00
//! Rate-limited communication with a web3 provider
2022-05-05 22:07:09 +03:00
use derive_more::From;
2022-05-30 04:28:22 +03:00
use ethers::prelude::{Block, Middleware, ProviderError, TxHash};
2022-05-05 22:07:09 +03:00
use futures::StreamExt;
2022-05-22 21:39:06 +03:00
use redis_cell_client::RedisCellClient;
2022-05-21 01:16:15 +03:00
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
2022-05-05 22:07:09 +03:00
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32};
2022-05-05 22:07:09 +03:00
use std::{cmp::Ordering, sync::Arc};
2022-06-14 08:43:28 +03:00
use tokio::sync::oneshot;
2022-05-17 05:26:47 +03:00
use tokio::sync::RwLock;
2022-05-17 19:23:27 +03:00
use tokio::task;
2022-05-19 06:00:54 +03:00
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
2022-06-14 08:43:28 +03:00
use tracing::{error, info, instrument, trace, warn};
use crate::app::AnyhowJoinHandle;
2022-05-05 22:07:09 +03:00
/// A web3 provider that is reached over either HTTP or WebSocket.
///
/// `#[derive(From)]` comes from `derive_more`; `Web3Provider::from_str` relies on it to turn
/// an `ethers` provider into this enum with `.into()`.
///
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
pub enum Web3Provider {
/// Provider backed by the `ethers` HTTP transport.
Http(ethers::providers::Provider<ethers::providers::Http>),
/// Provider backed by the `ethers` WebSocket transport.
Ws(ethers::providers::Provider<ethers::providers::Ws>),
}
2022-05-17 05:26:47 +03:00
impl Web3Provider {
#[instrument]
2022-05-22 02:34:05 +03:00
async fn from_str(
url_str: &str,
http_client: Option<&reqwest::Client>,
) -> anyhow::Result<Self> {
2022-05-17 05:26:47 +03:00
let provider = if url_str.starts_with("http") {
let url: url::Url = url_str.parse()?;
2022-05-22 02:34:05 +03:00
let http_client = http_client
.ok_or_else(|| anyhow::anyhow!("no http_client"))?
.clone();
2022-05-17 05:26:47 +03:00
let provider = ethers::providers::Http::new_with_client(url, http_client);
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else if url_str.starts_with("ws") {
// TODO: wrapper automatically reconnect
let provider = ethers::providers::Ws::connect(url_str).await?;
// TODO: make sure this automatically reconnects
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
Ok(provider)
}
}
2022-05-05 22:07:09 +03:00
impl fmt::Debug for Web3Provider {
    /// Prints only the type name; none of the provider's fields are shown.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
        let mut builder = formatter.debug_struct("Web3Provider");
        builder.finish_non_exhaustive()
    }
}
/// An active connection to a Web3Rpc
pub struct Web3Connection {
/// TODO: can we get this from the provider? do we even need it?
url: String,
/// keep track of currently open requests. We sort on this
active_requests: AtomicU32,
/// provider is in a RwLock so that we can replace it if re-connecting
2022-06-04 01:22:55 +03:00
provider: RwLock<Option<Arc<Web3Provider>>>,
2022-05-22 02:34:05 +03:00
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
2022-05-22 21:39:06 +03:00
hard_limit: Option<redis_cell_client::RedisCellClient>,
2022-05-05 22:07:09 +03:00
/// used for load balancing to the least loaded server
soft_limit: u32,
2022-05-21 01:16:15 +03:00
// TODO: track total number of requests?
2022-05-05 22:07:09 +03:00
}
2022-05-21 01:16:15 +03:00
impl Serialize for Web3Connection {
    /// Serialize the connection as a struct with the fields `url`, `soft_limit` and
    /// `active_requests` (the counter is read at the time of the call).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // The length passed here must match the number of `serialize_field` calls below.
        // Formats that rely on the declared length would otherwise produce bad output.
        let mut state = serializer.serialize_struct("Web3Connection", 3)?;

        // TODO: sanitize any credentials in the url
        state.serialize_field("url", &self.url)?;

        state.serialize_field("soft_limit", &self.soft_limit)?;

        state.serialize_field(
            "active_requests",
            &self.active_requests.load(atomic::Ordering::Acquire),
        )?;

        state.end()
    }
}
2022-05-05 22:07:09 +03:00
impl fmt::Debug for Web3Connection {
    /// Prints the url and marks every other field as omitted.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("Web3Connection");
        builder.field("url", &self.url);
        builder.finish_non_exhaustive()
    }
}
impl fmt::Display for Web3Connection {
    /// A connection is displayed as its url.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(&self.url)
    }
}
impl Web3Connection {
/// Connect to a web3 rpc
2022-06-14 08:43:28 +03:00
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
2022-05-12 21:49:57 +03:00
chain_id: usize,
2022-05-05 22:07:09 +03:00
url_str: String,
2022-05-15 04:51:24 +03:00
// optional because this is only used for http providers. websocket providers don't use it
2022-05-22 02:34:05 +03:00
http_client: Option<&reqwest::Client>,
2022-05-22 21:39:06 +03:00
hard_limit: Option<(u32, &redis_cell_client::MultiplexedConnection)>,
2022-05-05 22:07:09 +03:00
// TODO: think more about this type
soft_limit: u32,
2022-06-14 08:43:28 +03:00
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
2022-05-22 21:39:06 +03:00
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
2022-05-22 02:34:05 +03:00
// TODO: allow different max_burst and count_per_period and period
let period = 1;
2022-05-22 21:39:06 +03:00
RedisCellClient::new(
redis_conection.clone(),
2022-05-22 02:34:05 +03:00
format!("{},{}", chain_id, url_str),
hard_rate_limit,
hard_rate_limit,
period,
)
});
2022-05-05 22:07:09 +03:00
2022-05-17 05:26:47 +03:00
let provider = Web3Provider::from_str(&url_str, http_client).await?;
2022-05-05 22:07:09 +03:00
2022-05-12 21:49:57 +03:00
let connection = Web3Connection {
2022-05-05 22:07:09 +03:00
url: url_str.clone(),
2022-05-15 04:51:24 +03:00
active_requests: 0.into(),
2022-06-04 01:22:55 +03:00
provider: RwLock::new(Some(Arc::new(provider))),
2022-05-22 02:34:05 +03:00
hard_limit,
2022-05-05 22:07:09 +03:00
soft_limit,
2022-05-12 21:49:57 +03:00
};
let connection = Arc::new(connection);
2022-05-15 04:51:24 +03:00
// check the server's chain_id here
2022-05-27 19:34:42 +03:00
// TODO: move this outside the `new` function and into a `start` function or something
2022-05-15 04:51:24 +03:00
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
let found_chain_id: Result<String, _> = connection
.wait_for_request_handle()
.await
2022-05-15 04:51:24 +03:00
.request("eth_chainId", Option::None::<()>)
2022-05-13 00:20:33 +03:00
.await;
match found_chain_id {
Ok(found_chain_id) => {
let found_chain_id =
usize::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap();
if chain_id != found_chain_id {
return Err(anyhow::anyhow!(
"incorrect chain id! Expected {}. Found {}",
chain_id,
found_chain_id
));
2022-05-29 06:03:15 +03:00
} else {
2022-06-14 08:43:28 +03:00
info!(?connection, "success");
2022-05-13 00:20:33 +03:00
}
}
Err(e) => {
2022-05-13 00:29:33 +03:00
let e = anyhow::Error::from(e).context(format!("{}", connection));
2022-05-13 00:20:33 +03:00
return Err(e);
}
2022-05-12 21:49:57 +03:00
}
2022-06-14 08:43:28 +03:00
let handle = {
let connection = connection.clone();
tokio::spawn(async move {
connection
.subscribe(block_sender, tx_id_sender, reconnect)
.await
})
};
Ok((connection, handle))
2022-05-05 22:07:09 +03:00
}
/// Replace the current provider with a freshly created one.
///
/// The write lock is held for the whole call. While it is held the provider is cleared,
/// a default (empty) block is published on `block_sender`, and a new provider is built
/// from the stored url without an http client.
///
/// # Errors
///
/// Fails if the block cannot be sent or the new provider cannot be created. In both
/// cases the provider stays unset.
#[instrument(skip_all)]
pub async fn reconnect(
    self: &Arc<Self>,
    block_sender: &flume::Sender<(Block<TxHash>, Arc<Self>)>,
) -> anyhow::Result<()> {
    // since this lock is held open over an await, we use tokio's locking
    let mut provider_slot = self.provider.write().await;

    *provider_slot = None;

    // tell the block subscriber that we are at 0
    let empty_block = Block::default();
    block_sender
        .send_async((empty_block, Arc::clone(self)))
        .await?;

    // websocket doesn't need the http client, so none is passed
    let replacement = Web3Provider::from_str(&self.url, None).await?;

    *provider_slot = Some(Arc::new(replacement));

    Ok(())
}
2022-05-06 09:07:01 +03:00
#[inline]
2022-05-05 22:07:09 +03:00
pub fn active_requests(&self) -> u32 {
self.active_requests.load(atomic::Ordering::Acquire)
}
2022-05-06 23:44:12 +03:00
/// Returns the soft limit this connection was created with.
#[inline]
pub fn soft_limit(&self) -> u32 {
self.soft_limit
}
2022-06-04 01:22:55 +03:00
/// Returns `true` if a provider is currently set.
///
/// This waits for the provider read lock.
#[inline]
pub async fn has_provider(&self) -> bool {
    let guard = self.provider.read().await;
    guard.is_some()
}
2022-05-17 03:56:56 +03:00
#[instrument(skip_all)]
2022-05-16 08:16:32 +03:00
async fn send_block(
self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(Block<TxHash>, Arc<Self>)>,
2022-05-30 07:30:13 +03:00
) -> anyhow::Result<()> {
match block {
Ok(block) => {
// TODO: i'm pretty sure we don't need send_async, but double check
block_sender.send_async((block, self.clone())).await?;
}
Err(e) => {
warn!("unable to get block from {}: {}", self, e);
}
}
2022-05-30 07:30:13 +03:00
Ok(())
}
2022-06-14 08:43:28 +03:00
async fn subscribe(
self: Arc<Self>,
2022-06-14 08:43:28 +03:00
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<()> {
2022-06-14 08:43:28 +03:00
match (block_sender, tx_id_sender) {
(None, None) => {
// TODO: is there a better way to make a channel that is never ready?
let (tx, rx) = oneshot::channel::<()>();
rx.await?;
drop(tx);
}
2022-06-14 08:43:28 +03:00
(Some(block_sender), Some(tx_id_sender)) => {
// TODO: make these abortable so that if one fails the other can be cancelled?
loop {
let new_heads = {
let clone = self.clone();
let block_sender = block_sender.clone();
clone.subscribe_new_heads(block_sender)
};
let pending_txs = {
let clone = self.clone();
let tx_id_sender = tx_id_sender.clone();
clone.subscribe_pending_transactions(tx_id_sender)
};
match tokio::try_join!(new_heads, pending_txs) {
Ok(_) => break,
Err(err) => {
if reconnect {
// TODO: exponential backoff
// TODO: share code with new heads subscription
warn!(
"subscription exited. Attempting to reconnect in 1 second. {:?}", err
);
sleep(Duration::from_secs(1)).await;
// TODO: loop on reconnecting! do not return with a "?" here
// TODO: this isn't going to work. it will get in a loop with newHeads
self.reconnect(&block_sender).await?;
} else {
error!("subscription exited. {:?}", err);
break;
}
}
};
}
}
2022-06-14 08:43:28 +03:00
_ => panic!(),
}
Ok(())
}
2022-05-17 05:26:47 +03:00
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
2022-05-17 07:24:13 +03:00
/// TODO: instrument with the url
2022-05-17 03:56:56 +03:00
#[instrument(skip_all)]
async fn subscribe_new_heads(
2022-05-05 22:07:09 +03:00
self: Arc<Self>,
block_sender: flume::Sender<(Block<TxHash>, Arc<Self>)>,
2022-05-05 22:07:09 +03:00
) -> anyhow::Result<()> {
2022-06-14 08:43:28 +03:00
info!("watching {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Http(_provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time. set automatically?
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut last_hash = Default::default();
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
// don't send repeat blocks
if let Ok(block) = &block {
let new_hash = block.hash.unwrap();
if new_hash == last_hash {
continue;
2022-05-17 05:26:47 +03:00
}
last_hash = new_hash;
2022-05-16 22:15:40 +03:00
}
self.send_block(block, &block_sender).await?;
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
2022-05-16 22:15:40 +03:00
}
}
2022-05-15 22:28:22 +03:00
}
}
Web3Provider::Ws(provider) => {
let active_request_handle = self.wait_for_request_handle().await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: subscribe to Block<TransactionReceipt> instead?
let block: Result<Block<TxHash>, _> = self
.wait_for_request_handle()
.await
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block(block, &block_sender).await?;
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
loop {
match stream.next().await {
Some(new_block) => {
self.send_block(Ok(new_block), &block_sender).await?;
// TODO: really not sure about this
task::yield_now().await;
}
None => {
warn!("subscription ended");
break;
2022-05-17 07:24:13 +03:00
}
}
2022-05-17 05:26:47 +03:00
}
2022-05-05 22:07:09 +03:00
}
2022-05-17 05:26:47 +03:00
}
}
2022-05-17 03:56:56 +03:00
Ok(())
}
2022-05-17 05:26:47 +03:00
#[instrument(skip_all)]
async fn subscribe_pending_transactions(
self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
2022-06-14 08:43:28 +03:00
info!("watching {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Http(_provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
2022-06-14 08:43:28 +03:00
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// TODO: create a filter
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
2022-06-14 08:43:28 +03:00
// TODO: actually do something here
/*
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: check the filter
unimplemented!("actually send a request");
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
2022-06-14 08:43:28 +03:00
*/
}
}
Web3Provider::Ws(provider) => {
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_pending_txs().await?;
drop(active_request_handle);
// TODO: query existing pending txs?
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
loop {
match stream.next().await {
Some(pending_tx_id) => {
tx_id_sender
.send_async((pending_tx_id, self.clone()))
.await?;
}
None => {
warn!("subscription ended");
break;
}
}
}
}
2022-05-05 22:07:09 +03:00
}
}
Ok(())
}
2022-05-17 03:56:56 +03:00
#[instrument(skip_all)]
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
2022-05-13 23:50:11 +03:00
// TODO: maximum wait time
2022-05-16 22:15:40 +03:00
for _ in 0..10 {
2022-05-22 02:34:05 +03:00
match self.try_request_handle().await {
Ok(pending_request_handle) => return pending_request_handle,
2022-05-22 02:34:05 +03:00
Err(retry_after) => {
sleep(retry_after).await;
}
}
2022-05-05 22:07:09 +03:00
}
2022-05-16 22:15:40 +03:00
2022-06-14 08:43:28 +03:00
// TODO: what should we do? panic isn't ever what we want
2022-05-16 22:15:40 +03:00
panic!("no request handle after 10 tries");
2022-05-05 22:07:09 +03:00
}
2022-05-22 02:34:05 +03:00
pub async fn try_request_handle(self: &Arc<Self>) -> Result<ActiveRequestHandle, Duration> {
2022-06-04 01:22:55 +03:00
// check that we are connected
if !self.has_provider().await {
// TODO: how long? use the same amount as the exponential backoff on retry
return Err(Duration::from_secs(1));
}
2022-05-05 22:07:09 +03:00
// check rate limits
2022-05-22 02:34:05 +03:00
if let Some(ratelimiter) = self.hard_limit.as_ref() {
match ratelimiter.throttle().await {
2022-05-05 22:07:09 +03:00
Ok(_) => {
// rate limit succeeded
2022-05-13 23:50:11 +03:00
return Ok(ActiveRequestHandle::new(self.clone()));
2022-05-05 22:07:09 +03:00
}
2022-05-22 02:34:05 +03:00
Err(retry_after) => {
2022-05-05 22:07:09 +03:00
// rate limit failed
2022-05-22 02:34:05 +03:00
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
2022-05-05 22:07:09 +03:00
// TODO: use tracing better
2022-05-22 02:34:05 +03:00
warn!("Exhausted rate limit on {:?}: {:?}", self, retry_after);
2022-05-05 22:07:09 +03:00
2022-05-22 02:34:05 +03:00
return Err(retry_after);
2022-05-05 22:07:09 +03:00
}
}
};
Ok(ActiveRequestHandle::new(self.clone()))
}
}
/// Connections are hashed by their url only.
// NOTE(review): `PartialEq` in this file compares `active_requests`, not the url, so
// equal values are not guaranteed to hash equally. Confirm before using this as a map key.
impl Hash for Web3Connection {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.url, state);
    }
}
2022-05-12 02:50:52 +03:00
/// Drop this once a connection completes
///
/// Creating a handle increments the connection's `active_requests` counter and dropping
/// the handle decrements it again.
pub struct ActiveRequestHandle(Arc<Web3Connection>);
impl ActiveRequestHandle {
fn new(connection: Arc<Web3Connection>) -> Self {
2022-05-12 22:58:26 +03:00
// TODO: attach a unique id to this
2022-05-05 22:07:09 +03:00
// TODO: what ordering?!
connection
.active_requests
.fetch_add(1, atomic::Ordering::AcqRel);
2022-05-05 22:07:09 +03:00
Self(connection)
2022-05-05 22:07:09 +03:00
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
2022-05-17 03:56:56 +03:00
#[instrument(skip_all)]
2022-05-12 21:49:57 +03:00
pub async fn request<T, R>(
&self,
method: &str,
2022-05-12 21:49:57 +03:00
params: T,
) -> Result<R, ethers::prelude::ProviderError>
where
T: fmt::Debug + serde::Serialize + Send + Sync,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{
2022-05-18 19:35:06 +03:00
// TODO: use tracing spans properly
2022-05-07 07:42:47 +03:00
// TODO: it would be nice to have the request id on this
2022-05-18 19:35:06 +03:00
// TODO: including params in this is way too verbose
trace!("Sending {} to {}", method, self.0);
2022-05-07 07:42:47 +03:00
2022-06-04 01:22:55 +03:00
let mut provider = None;
while provider.is_none() {
// TODO: if no provider, don't unwrap. wait until there is one.
match self.0.provider.read().await.as_ref() {
None => {}
Some(found_provider) => provider = Some(found_provider.clone()),
}
}
2022-05-17 05:26:47 +03:00
2022-06-04 01:22:55 +03:00
let response = match &*provider.unwrap() {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
2022-05-07 07:42:47 +03:00
};
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: at least instrument this with more useful information
2022-05-18 19:35:06 +03:00
// trace!("Reply from {}: {:?}", self.0, response);
trace!("Reply from {}", self.0);
2022-05-07 07:42:47 +03:00
response
}
}
impl Drop for ActiveRequestHandle {
fn drop(&mut self) {
self.0
.active_requests
.fetch_sub(1, atomic::Ordering::AcqRel);
2022-05-05 22:07:09 +03:00
}
}
// Marker impl; `Ord` requires `Eq`. Equality itself is defined by the `PartialEq` impl in this file.
impl Eq for Web3Connection {}
impl Ord for Web3Connection {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// TODO: what atomic ordering?!
let a = self.active_requests.load(atomic::Ordering::Acquire);
let b = other.active_requests.load(atomic::Ordering::Acquire);
// TODO: how should we include the soft limit? floats are slower than integer math
2022-05-27 19:34:42 +03:00
let a = (a + 1) as f32 / self.soft_limit as f32;
let b = (b + 1) as f32 / other.soft_limit as f32;
2022-05-05 22:07:09 +03:00
a.partial_cmp(&b).unwrap()
}
}
// Delegates to `Ord::cmp`, so the partial order is always total.
impl PartialOrd for Web3Connection {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
impl PartialEq for Web3Connection {
    /// Two connections are equal when their active request counts match at the time of
    /// the call.
    fn eq(&self, other: &Self) -> bool {
        // TODO: what ordering?!
        let ours = self.active_requests.load(atomic::Ordering::Acquire);
        let theirs = other.active_requests.load(atomic::Ordering::Acquire);

        ours == theirs
    }
}