web3-proxy/web3_proxy/src/rpcs/connection.rs

1072 lines
42 KiB
Rust
Raw Normal View History

2022-08-27 02:44:25 +03:00
///! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlockHashesCache, BlockId};
use super::provider::Web3Provider;
2022-09-09 06:53:16 +03:00
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use crate::frontend::authorization::Authorization;
2022-06-16 05:53:37 +03:00
use anyhow::Context;
2022-11-07 00:05:03 +03:00
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
2022-06-16 05:53:37 +03:00
use futures::future::try_join_all;
2022-05-05 22:07:09 +03:00
use futures::StreamExt;
use log::{debug, error, info, trace, warn, Level};
2022-11-14 21:24:52 +03:00
use migration::sea_orm::DatabaseConnection;
2022-08-10 05:37:34 +03:00
use parking_lot::RwLock;
2022-09-15 20:57:24 +03:00
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
2022-05-21 01:16:15 +03:00
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
2022-09-14 04:43:09 +03:00
use std::cmp::min;
2022-05-05 22:07:09 +03:00
use std::fmt;
use std::hash::{Hash, Hasher};
2022-07-19 04:31:12 +03:00
use std::sync::atomic::{self, AtomicU32, AtomicU64};
2022-05-05 22:07:09 +03:00
use std::{cmp::Ordering, sync::Arc};
2022-11-12 09:11:58 +03:00
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::broadcast;
2022-08-10 05:37:34 +03:00
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
2022-06-14 08:43:28 +03:00
2022-09-15 20:57:24 +03:00
/// An active connection to a Web3 RPC server like geth or erigon.
2022-08-24 02:13:56 +03:00
pub struct Web3Connection {
2022-08-24 03:11:49 +03:00
pub name: String,
2022-11-14 00:05:37 +03:00
pub display_name: Option<String>,
2022-08-24 02:13:56 +03:00
/// TODO: can we get this from the provider? do we even need it?
pub(super) url: String,
2022-09-15 20:57:24 +03:00
/// Some connections use an http_client. we keep a clone for reconnecting
pub(super) http_client: Option<reqwest::Client>,
2022-08-24 02:13:56 +03:00
/// keep track of currently open requests. We sort on this
2022-08-24 03:11:49 +03:00
pub(super) active_requests: AtomicU32,
/// keep track of total requests from the frontend
pub(super) frontend_requests: AtomicU64,
/// keep track of total requests from web3-proxy itself
pub(super) internal_requests: AtomicU64,
2022-08-24 02:13:56 +03:00
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
2022-08-24 03:11:49 +03:00
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
2022-08-24 02:13:56 +03:00
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
2022-09-15 20:57:24 +03:00
/// We do not use the deferred rate limiter because going over limits would cause errors
pub(super) hard_limit: Option<RedisRateLimiter>,
2022-08-24 02:13:56 +03:00
/// used for load balancing to the least loaded server
2022-08-26 20:26:17 +03:00
pub(super) soft_limit: u32,
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
2022-11-12 09:11:58 +03:00
/// Lower weight are higher priority when sending requests. 0 to 99.
pub(super) weight: f64,
2022-11-22 23:23:08 +03:00
/// TODO: should this be an AsyncRwLock?
2022-09-06 06:26:23 +03:00
pub(super) head_block_id: RwLock<Option<BlockId>>,
2022-09-09 06:53:16 +03:00
pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
2022-08-24 02:13:56 +03:00
}
2022-05-05 22:07:09 +03:00
impl Web3Connection {
/// Connect to a web3 rpc
// TODO: have this take a builder (which will have channels attached)
2022-06-14 08:43:28 +03:00
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
2022-08-10 08:56:09 +03:00
name: String,
2022-11-14 00:05:37 +03:00
display_name: Option<String>,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
2022-05-05 22:07:09 +03:00
url_str: String,
2022-05-15 04:51:24 +03:00
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<reqwest::Client>,
2022-06-29 22:15:05 +03:00
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
// TODO: have a builder struct for this.
hard_limit: Option<(u64, RedisPool)>,
2022-05-05 22:07:09 +03:00
// TODO: think more about this type
soft_limit: u32,
block_data_limit: Option<u64>,
block_map: BlockHashesCache,
2022-07-22 08:11:26 +03:00
block_sender: Option<flume::Sender<BlockAndRpc>>,
2022-06-14 08:43:28 +03:00
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
2022-08-08 22:57:54 +03:00
weight: u32,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
2022-06-14 08:43:28 +03:00
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
2022-09-15 20:57:24 +03:00
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
// TODO: is cache size 1 okay? i think we need
RedisRateLimiter::new(
2022-08-06 08:46:33 +03:00
"web3_proxy",
2022-11-12 09:11:58 +03:00
&format!("{}:{}", chain_id, name),
2022-05-22 02:34:05 +03:00
hard_rate_limit,
2022-08-30 23:01:42 +03:00
60.0,
2022-09-15 20:57:24 +03:00
redis_pool,
2022-05-22 02:34:05 +03:00
)
});
2022-05-05 22:07:09 +03:00
2022-11-12 09:11:58 +03:00
// turn weight 0 into 100% and weight 100 into 0%
let weight = (100 - weight) as f64 / 100.0;
let block_data_limit = block_data_limit.unwrap_or_default().into();
2022-07-09 07:25:59 +03:00
let new_connection = Self {
2022-08-10 08:56:09 +03:00
name,
2022-11-14 00:05:37 +03:00
display_name,
2022-09-14 05:11:48 +03:00
http_client,
2022-11-12 09:11:58 +03:00
url: url_str,
2022-05-15 04:51:24 +03:00
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
2022-09-14 04:43:09 +03:00
provider: AsyncRwLock::new(None),
2022-05-22 02:34:05 +03:00
hard_limit,
2022-05-05 22:07:09 +03:00
soft_limit,
block_data_limit,
head_block_id: RwLock::new(Default::default()),
2022-08-08 22:57:54 +03:00
weight,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics,
2022-05-12 21:49:57 +03:00
};
2022-07-09 07:25:59 +03:00
let new_connection = Arc::new(new_connection);
2022-05-12 21:49:57 +03:00
2022-09-14 04:43:09 +03:00
// connect to the server (with retries)
new_connection
2022-09-14 05:11:48 +03:00
.retrying_reconnect(block_sender.as_ref(), false)
2022-09-14 04:43:09 +03:00
.await?;
let authorization = Arc::new(Authorization::internal(db_conn)?);
2022-05-15 04:51:24 +03:00
// check the server's chain_id here
2022-07-19 04:31:12 +03:00
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
2022-05-15 04:51:24 +03:00
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be?
let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle(&authorization, Duration::from_secs(30))
2022-08-06 08:26:43 +03:00
.await?
.request(
"eth_chainId",
&json!(Option::None::<()>),
2022-11-12 11:24:32 +03:00
Level::Error.into(),
)
2022-05-13 00:20:33 +03:00
.await;
match found_chain_id {
Ok(found_chain_id) => {
2022-06-29 22:15:05 +03:00
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
2022-05-13 00:20:33 +03:00
return Err(anyhow::anyhow!(
2022-10-20 09:54:45 +03:00
"incorrect chain id! Config has {}, but RPC has {}",
2022-05-13 00:20:33 +03:00
chain_id,
found_chain_id
)
2022-07-19 09:41:04 +03:00
.context(format!("failed @ {}", new_connection)));
2022-05-13 00:20:33 +03:00
}
}
Err(e) => {
2022-07-19 09:41:04 +03:00
let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection));
2022-05-13 00:20:33 +03:00
return Err(e);
}
2022-05-12 21:49:57 +03:00
}
// TODO: should we do this even if block_sender is None? then we would know limits on private relays
let check_block_limit_needed = (new_connection
.block_data_limit
.load(atomic::Ordering::Acquire)
== 0)
&& block_sender.is_some();
2022-07-19 04:31:12 +03:00
// subscribe to new blocks and new transactions
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
2022-06-14 08:43:28 +03:00
let handle = {
2022-07-09 07:25:59 +03:00
let new_connection = new_connection.clone();
let authorization = authorization.clone();
2022-06-14 08:43:28 +03:00
tokio::spawn(async move {
2022-07-09 07:25:59 +03:00
new_connection
2022-08-26 20:26:17 +03:00
.subscribe(
&authorization,
2022-08-26 20:26:17 +03:00
http_interval_sender,
block_map,
block_sender,
tx_id_sender,
reconnect,
)
2022-06-14 08:43:28 +03:00
.await
})
};
2022-07-19 04:31:12 +03:00
// we could take "archive" as a parameter, but we would want a safety check on it regardless
2022-07-19 07:22:02 +03:00
// check common archive thresholds
2022-07-23 03:19:13 +03:00
// TODO: would be great if rpcs exposed this
// TODO: move this to a helper function so we can recheck on errors or as the chain grows
2022-08-03 03:27:26 +03:00
// TODO: move this to a helper function that checks
if check_block_limit_needed {
// TODO: make sure the server isn't still syncing
2022-07-19 04:31:12 +03:00
// TODO: don't sleep. wait for new heads subscription instead
// TODO: i think instead of atomics, we could maybe use a watch channel
sleep(Duration::from_millis(250)).await;
2022-07-19 04:31:12 +03:00
new_connection
.check_block_data_limit(&authorization)
.await?;
2022-08-27 05:13:36 +03:00
}
2022-07-19 04:31:12 +03:00
2022-08-27 05:13:36 +03:00
Ok((new_connection, handle))
}
2022-07-19 04:31:12 +03:00
async fn check_block_data_limit(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
) -> anyhow::Result<Option<u64>> {
2022-08-27 05:13:36 +03:00
let mut limit = None;
// TODO: binary search between 90k and max?
// TODO: start at 0 or 1
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
2022-09-06 06:26:23 +03:00
let mut head_block_id = self.head_block_id.read().clone();
2022-09-14 05:11:48 +03:00
// TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender?
2022-09-06 06:26:23 +03:00
while head_block_id.is_none() {
2022-11-12 11:24:32 +03:00
warn!("no head block yet. retrying rpc {}", self);
2022-07-19 04:31:12 +03:00
// TODO: sleep for the block time, or maybe subscribe to a channel instead of this simple pull
2022-09-14 05:11:48 +03:00
sleep(Duration::from_secs(13)).await;
2022-09-06 06:26:23 +03:00
head_block_id = self.head_block_id.read().clone();
2022-08-27 05:13:36 +03:00
}
2022-09-06 06:26:23 +03:00
let head_block_num = head_block_id.expect("is_none was checked above").num;
2022-08-27 05:13:36 +03:00
// TODO: subtract 1 from block_data_limit for safety?
let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());
2022-08-27 05:13:36 +03:00
2022-09-06 06:26:23 +03:00
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
2022-08-27 05:13:36 +03:00
let archive_result: Result<Bytes, _> = self
.wait_for_request_handle(authorization, Duration::from_secs(30))
2022-08-27 05:13:36 +03:00
.await?
.request(
"eth_getCode",
&json!((
2022-08-27 05:13:36 +03:00
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
)),
// error here are expected, so keep the level low
Level::Trace.into(),
2022-08-27 05:13:36 +03:00
)
.await;
trace!(
"archive_result on {} for {}: {:?}",
block_data_limit,
self.name,
archive_result
);
2022-08-27 05:13:36 +03:00
if archive_result.is_err() {
2022-08-27 05:13:36 +03:00
break;
2022-07-19 04:31:12 +03:00
}
limit = Some(block_data_limit);
2022-07-19 04:31:12 +03:00
}
2022-08-27 05:13:36 +03:00
if let Some(limit) = limit {
self.block_data_limit
.store(limit, atomic::Ordering::Release);
}
2022-07-19 04:31:12 +03:00
debug!("block data limit on {}: {:?}", self.name, limit);
2022-08-27 05:13:36 +03:00
Ok(limit)
2022-07-09 07:25:59 +03:00
}
2022-11-04 01:16:27 +03:00
/// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range
2022-07-25 03:27:00 +03:00
pub fn block_data_limit(&self) -> U64 {
2022-11-04 01:16:27 +03:00
self.block_data_limit.load(atomic::Ordering::Relaxed).into()
2022-07-19 04:31:12 +03:00
}
2022-07-22 22:30:39 +03:00
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
2022-11-22 23:23:08 +03:00
let head_block_num = match self.head_block_id.read().clone() {
2022-09-06 06:26:23 +03:00
None => return false,
Some(x) => x.num,
};
2022-07-19 04:31:12 +03:00
2022-11-20 01:05:51 +03:00
// this rpc doesn't have that block yet. still syncing
2022-11-22 23:23:08 +03:00
if needed_block_num > &head_block_num {
2022-11-04 01:16:27 +03:00
return false;
}
// if this is a pruning node, we might not actually have the block
let block_data_limit: U64 = self.block_data_limit();
2022-11-22 23:23:08 +03:00
let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
2022-07-19 04:31:12 +03:00
2022-11-04 01:16:27 +03:00
needed_block_num >= &oldest_block_num
2022-05-05 22:07:09 +03:00
}
2022-09-14 04:43:09 +03:00
/// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
2022-10-25 07:12:24 +03:00
/// We use the "Decorrelated" jitter from <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
2022-09-14 04:43:09 +03:00
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
pub async fn retrying_reconnect(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
2022-09-14 05:11:48 +03:00
initial_sleep: bool,
2022-09-14 04:43:09 +03:00
) -> anyhow::Result<()> {
// there are several crates that have retry helpers, but they all seem more complex than necessary
2022-11-14 21:24:52 +03:00
// TODO: move this backoff logic into a helper function so we can use it when doing database locking
2022-09-14 04:43:09 +03:00
let base_ms = 500;
let cap_ms = 30_000;
let range_multiplier = 3;
// sleep once before the initial retry attempt
2022-09-14 05:11:48 +03:00
// TODO: now that we use this method for our initial connection, do we still want this sleep?
let mut sleep_ms = if initial_sleep {
let first_sleep_ms = min(
cap_ms,
2022-11-12 09:11:58 +03:00
thread_fast_rng().gen_range(base_ms..(base_ms * range_multiplier)),
2022-09-14 05:11:48 +03:00
);
let reconnect_in = Duration::from_millis(first_sleep_ms);
2022-11-12 11:24:32 +03:00
warn!("Reconnect to {} in {}ms", self, reconnect_in.as_millis());
sleep(reconnect_in).await;
2022-09-14 05:11:48 +03:00
first_sleep_ms
} else {
base_ms
};
2022-09-14 04:43:09 +03:00
// retry until we succeed
while let Err(err) = self.reconnect(block_sender).await {
2022-11-12 09:11:58 +03:00
// thread_rng is crytographically secure. we don't need that, but we don't need this super efficient so its fine
2022-09-14 04:43:09 +03:00
sleep_ms = min(
cap_ms,
2022-11-12 09:11:58 +03:00
thread_fast_rng().gen_range(base_ms..(sleep_ms * range_multiplier)),
2022-09-14 04:43:09 +03:00
);
let retry_in = Duration::from_millis(sleep_ms);
2022-11-12 11:24:32 +03:00
warn!(
"Failed reconnect to {}! Retry in {}ms. err={:?}",
self,
retry_in.as_millis(),
err,
);
2022-09-14 04:43:09 +03:00
sleep(retry_in).await;
}
Ok(())
}
/// reconnect a websocket provider
pub async fn reconnect(
self: &Arc<Self>,
// websocket doesn't need the http client
2022-09-14 04:43:09 +03:00
block_sender: Option<&flume::Sender<BlockAndRpc>>,
) -> anyhow::Result<()> {
// since this lock is held open over an await, we use tokio's locking
2022-07-19 04:31:12 +03:00
// TODO: timeout on this lock. if its slow, something is wrong
let mut provider_option = self.provider.write().await;
if let Some(provider) = &*provider_option {
match provider.as_ref() {
Web3Provider::Http(_) => {
// http clients don't need to do anything for reconnecting
// they *do* need to run this function to setup the first
return Ok(());
}
Web3Provider::Ws(_) => {}
Web3Provider::Mock => return Ok(()),
}
2022-11-12 11:24:32 +03:00
info!("Reconnecting to {}", self);
2022-09-13 02:00:10 +03:00
// disconnect the current provider
*provider_option = None;
// reset sync status
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = &block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
}
} else {
2022-11-12 11:24:32 +03:00
info!("connecting to {}", self);
2022-06-16 05:53:37 +03:00
}
2022-09-14 04:43:09 +03:00
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
2022-09-14 05:11:48 +03:00
let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
2022-09-14 04:43:09 +03:00
*provider_option = Some(Arc::new(new_provider));
2022-09-14 04:43:09 +03:00
2022-11-12 11:24:32 +03:00
info!("successfully connected to {}", self);
2022-09-14 04:43:09 +03:00
Ok(())
}
2022-05-06 09:07:01 +03:00
#[inline]
2022-05-05 22:07:09 +03:00
pub fn active_requests(&self) -> u32 {
self.active_requests.load(atomic::Ordering::Acquire)
}
2022-06-04 01:22:55 +03:00
#[inline]
pub async fn has_provider(&self) -> bool {
self.provider.read().await.is_some()
}
2022-08-26 20:26:17 +03:00
async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
2022-07-22 08:11:26 +03:00
block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache,
2022-05-30 07:30:13 +03:00
) -> anyhow::Result<()> {
2022-08-26 20:26:17 +03:00
match new_head_block {
Ok(None) => {
2022-09-13 02:00:10 +03:00
// TODO: i think this should clear the local block and then update over the block sender
2022-11-06 23:52:11 +03:00
warn!("unsynced server {}", self);
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}
block_sender
.send_async((None, self.clone()))
.await
.context("clearing block_sender")?;
}
2022-11-06 23:52:11 +03:00
Ok(Some(new_head_block)) => {
2022-08-26 20:26:17 +03:00
// TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block.hash.unwrap_or_default();
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
2022-11-06 23:52:11 +03:00
let new_head_block = block_map
2022-11-03 02:14:16 +03:00
.get_with(new_hash, async move { new_head_block })
.await;
2022-08-30 23:01:42 +03:00
2022-08-26 20:26:17 +03:00
let new_num = new_head_block.number.unwrap_or_default();
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query
{
2022-09-06 06:26:23 +03:00
let mut head_block_id = self.head_block_id.write();
if head_block_id.is_none() {
*head_block_id = Some(BlockId {
hash: new_hash,
num: new_num,
});
} else {
head_block_id.as_mut().map(|x| {
x.hash = new_hash;
x.num = new_num;
x
});
}
2022-07-19 04:31:12 +03:00
}
2022-11-14 00:25:58 +03:00
// send the block off to be saved
2022-06-16 05:53:37 +03:00
block_sender
.send_async((Some(new_head_block), self.clone()))
2022-06-16 05:53:37 +03:00
.await
.context("block_sender")?;
}
2022-11-06 23:52:11 +03:00
Err(err) => {
2022-11-12 11:24:32 +03:00
warn!("unable to get block from {}. err={:?}", self, err);
2022-11-06 23:52:11 +03:00
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}
2022-08-07 09:48:57 +03:00
// send an empty block to take this server out of rotation
2022-11-06 23:52:11 +03:00
// TODO: this is NOT working!!!!
2022-08-07 09:48:57 +03:00
block_sender
.send_async((None, self.clone()))
2022-08-07 09:48:57 +03:00
.await
.context("block_sender")?;
}
}
2022-05-30 07:30:13 +03:00
Ok(())
}
2022-09-14 04:43:09 +03:00
/// subscribe to blocks and transactions with automatic reconnects
2022-06-14 08:43:28 +03:00
async fn subscribe(
self: Arc<Self>,
authorization: &Arc<Authorization>,
2022-06-29 22:15:05 +03:00
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_map: BlockHashesCache,
2022-07-22 08:11:26 +03:00
block_sender: Option<flume::Sender<BlockAndRpc>>,
2022-06-14 08:43:28 +03:00
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<()> {
2022-06-16 05:53:37 +03:00
loop {
2022-11-12 11:24:32 +03:00
debug!("subscribing to {}", self);
2022-07-19 07:24:16 +03:00
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
2022-06-29 22:15:05 +03:00
2022-06-16 05:53:37 +03:00
let mut futures = vec![];
if let Some(block_sender) = &block_sender {
2022-08-26 20:26:17 +03:00
let f = self.clone().subscribe_new_heads(
authorization.clone(),
2022-08-26 20:26:17 +03:00
http_interval_receiver,
block_sender.clone(),
block_map.clone(),
);
2022-06-16 05:53:37 +03:00
futures.push(flatten_handle(tokio::spawn(f)));
}
if let Some(tx_id_sender) = &tx_id_sender {
let f = self
.clone()
.subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone());
2022-06-16 05:53:37 +03:00
futures.push(flatten_handle(tokio::spawn(f)));
}
{
// TODO: move this into a proper function
let conn = self.clone();
// health check
let f = async move {
loop {
if let Some(provider) = conn.provider.read().await.as_ref() {
if provider.ready() {
2022-11-12 11:24:32 +03:00
// // trace!(rpc=%conn, "provider is ready");
} else {
2022-11-12 11:24:32 +03:00
warn!("rpc {} is NOT ready", conn);
// returning error will trigger a reconnect
// TODO: what if we just happened to have this check line up with another restart?
return Err(anyhow::anyhow!("provider is not ready"));
}
}
// TODO: how often?
// TODO: also check that the head block has changed recently
sleep(Duration::from_secs(10)).await;
}
};
futures.push(flatten_handle(tokio::spawn(f)));
}
2022-06-16 05:53:37 +03:00
match try_join_all(futures).await {
2022-09-14 04:43:09 +03:00
Ok(_) => {
// futures all exited without error. break instead of restarting subscriptions
break;
}
2022-06-16 05:53:37 +03:00
Err(err) => {
if reconnect {
2022-11-12 11:24:32 +03:00
warn!("{} subscription exited. err={:?}", self, err);
2022-06-16 05:53:37 +03:00
2022-09-14 05:11:48 +03:00
self.retrying_reconnect(block_sender.as_ref(), true).await?;
2022-06-16 05:53:37 +03:00
} else {
2022-11-12 11:24:32 +03:00
error!("{} subscription exited. err={:?}", self, err);
2022-06-16 05:53:37 +03:00
return Err(err);
}
2022-06-14 08:43:28 +03:00
}
}
}
2022-11-12 11:24:32 +03:00
info!("all subscriptions on {} completed", self);
Ok(())
}
2022-05-17 05:26:47 +03:00
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
async fn subscribe_new_heads(
2022-05-05 22:07:09 +03:00
self: Arc<Self>,
authorization: Arc<Authorization>,
2022-06-29 22:15:05 +03:00
http_interval_receiver: Option<broadcast::Receiver<()>>,
2022-07-22 08:11:26 +03:00
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache,
2022-05-05 22:07:09 +03:00
) -> anyhow::Result<()> {
2022-11-12 11:24:32 +03:00
info!("watching new heads on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
2022-06-29 22:15:05 +03:00
// TODO: try watch_blocks and fall back to this?
let mut http_interval_receiver = http_interval_receiver.unwrap();
2022-07-19 04:31:12 +03:00
let mut last_hash = H256::zero();
loop {
// TODO: what should the max_wait be?
2022-09-23 00:03:37 +03:00
match self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
2022-09-23 00:03:37 +03:00
.await
{
2022-08-26 20:26:17 +03:00
Ok(active_request_handle) => {
2022-11-06 23:52:11 +03:00
let block: Result<Option<ArcBlock>, _> = active_request_handle
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
2022-11-12 11:24:32 +03:00
Level::Error.into(),
)
.await;
2022-08-26 20:26:17 +03:00
match block {
2022-11-06 23:52:11 +03:00
Ok(None) => {
warn!("no head block on {}", self);
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await?;
}
Ok(Some(block)) => {
2022-08-26 20:26:17 +03:00
// don't send repeat blocks
let new_hash = block
.hash
.expect("blocks here should always have hashes");
if new_hash != last_hash {
// new hash!
last_hash = new_hash;
self.send_head_block_result(
2022-11-06 23:52:11 +03:00
Ok(Some(block)),
2022-08-26 20:26:17 +03:00
&block_sender,
block_map.clone(),
)
2022-08-07 09:48:57 +03:00
.await?;
2022-08-26 20:26:17 +03:00
}
}
Err(err) => {
// we did not get a block back. something is up with the server. take it out of rotation
self.send_head_block_result(
Err(err),
&block_sender,
block_map.clone(),
)
.await?;
2022-07-19 07:31:30 +03:00
}
2022-05-16 22:15:40 +03:00
}
}
2022-07-09 01:14:45 +03:00
Err(err) => {
2022-11-12 11:24:32 +03:00
warn!("Internal error on latest block from {}. {:?}", self, err);
2022-11-06 23:52:11 +03:00
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await?;
2022-08-11 00:29:50 +03:00
// TODO: what should we do? sleep? extra time?
2022-05-16 22:15:40 +03:00
}
}
2022-07-19 04:31:12 +03:00
2022-08-26 20:26:17 +03:00
// wait for the next interval
2022-07-19 04:31:12 +03:00
// TODO: if error or rate limit, increase interval?
while let Err(err) = http_interval_receiver.recv().await {
match err {
broadcast::error::RecvError::Closed => {
2022-08-26 20:26:17 +03:00
// channel is closed! that's not good. bubble the error up
2022-07-19 04:31:12 +03:00
return Err(err.into());
}
broadcast::error::RecvError::Lagged(lagged) => {
2022-08-26 20:26:17 +03:00
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
2022-11-12 11:24:32 +03:00
warn!("http interval on {} lagging by {}!", self, lagged);
2022-07-19 04:31:12 +03:00
}
}
}
2022-11-12 11:24:32 +03:00
// // trace!(rpc=%self, "ok http interval");
2022-05-15 22:28:22 +03:00
}
}
Web3Provider::Ws(provider) => {
2022-08-30 23:01:42 +03:00
// todo: move subscribe_blocks onto the request handle?
2022-09-23 00:03:37 +03:00
let active_request_handle = self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
2022-09-23 00:03:37 +03:00
.await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
2022-11-06 23:52:11 +03:00
// TODO: how does this get wrapped in an arc? does ethers handle that?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
2022-08-06 08:26:43 +03:00
.await?
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
2022-11-12 11:24:32 +03:00
Level::Error.into(),
)
2022-11-06 23:52:11 +03:00
.await;
let mut last_hash = match &block {
Ok(Some(new_block)) => new_block
.hash
.expect("blocks should always have a hash here"),
_ => H256::zero(),
};
2022-08-26 20:26:17 +03:00
self.send_head_block_result(block, &block_sender, block_map.clone())
.await?;
2022-07-09 01:14:45 +03:00
while let Some(new_block) = stream.next().await {
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
.hash
.expect("blocks should always have a hash here");
if new_hash == last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue;
} else {
last_hash = new_hash;
}
2022-08-26 20:26:17 +03:00
self.send_head_block_result(
Ok(Some(Arc::new(new_block))),
2022-08-26 20:26:17 +03:00
&block_sender,
block_map.clone(),
)
.await?;
2022-05-17 05:26:47 +03:00
}
2022-07-09 01:14:45 +03:00
2022-11-06 23:52:11 +03:00
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
.await?;
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
2022-11-12 11:24:32 +03:00
warn!("new_heads subscription to {} ended", self);
return Err(anyhow::anyhow!("new_heads subscription ended"));
2022-05-05 22:07:09 +03:00
}
2022-05-17 05:26:47 +03:00
}
}
2022-05-17 03:56:56 +03:00
Ok(())
}
2022-05-17 05:26:47 +03:00
async fn subscribe_pending_transactions(
self: Arc<Self>,
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
2022-11-14 00:05:37 +03:00
info!("watching pending transactions on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Mock => unimplemented!(),
2022-06-18 10:06:54 +03:00
Web3Provider::Http(provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
2022-06-14 08:43:28 +03:00
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
2022-06-14 08:43:28 +03:00
// TODO: actually do something here
/*
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: check the filter
2022-07-22 22:30:39 +03:00
todo!("actually send a request");
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
2022-06-14 08:43:28 +03:00
*/
2022-07-07 06:22:09 +03:00
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
}
}
Web3Provider::Ws(provider) => {
2022-07-09 02:02:32 +03:00
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
2022-09-23 00:03:37 +03:00
let active_request_handle = self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
2022-09-23 00:03:37 +03:00
.await;
let mut stream = provider.subscribe_pending_txs().await?;
drop(active_request_handle);
2022-07-08 21:27:06 +03:00
while let Some(pending_tx_id) = stream.next().await {
tx_id_sender
.send_async((pending_tx_id, self.clone()))
.await
.context("tx_id_sender")?;
2022-08-11 00:29:50 +03:00
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
}
2022-07-08 21:27:06 +03:00
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
2022-11-14 00:05:37 +03:00
warn!("pending_transactions subscription ended on {}", self);
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
}
2022-05-05 22:07:09 +03:00
}
}
Ok(())
}
2022-08-30 23:01:42 +03:00
/// be careful with this; it might wait forever!
2022-11-12 11:24:32 +03:00
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
max_wait: Duration,
) -> anyhow::Result<OpenRequestHandle> {
let max_wait = Instant::now() + max_wait;
2022-05-16 22:15:40 +03:00
2022-06-17 01:23:41 +03:00
loop {
let x = self.try_request_handle(authorization).await;
2022-08-30 23:01:42 +03:00
2022-11-12 11:24:32 +03:00
// // trace!(?x, "try_request_handle");
2022-08-30 23:01:42 +03:00
match x {
2022-08-24 03:59:05 +03:00
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
2022-08-24 03:14:49 +03:00
Ok(OpenRequestResult::RetryAt(retry_at)) => {
2022-08-07 09:48:57 +03:00
// TODO: emit a stat?
2022-11-12 11:24:32 +03:00
// // trace!(?retry_at);
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
return Err(anyhow::anyhow!("timeout waiting for request handle"));
}
2022-08-07 09:48:57 +03:00
sleep_until(retry_at).await;
}
2022-08-30 23:01:42 +03:00
Ok(OpenRequestResult::RetryNever) => {
2022-08-24 03:59:05 +03:00
// TODO: when can this happen? log? emit a stat?
// TODO: subscribe to the head block on this
2022-08-07 09:48:57 +03:00
// TODO: sleep how long? maybe just error?
return Err(anyhow::anyhow!("unable to retry for request handle"));
}
2022-08-07 09:48:57 +03:00
Err(err) => return Err(err),
}
2022-05-05 22:07:09 +03:00
}
}
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
) -> anyhow::Result<OpenRequestResult> {
2022-06-04 01:22:55 +03:00
// check that we are connected
if !self.has_provider().await {
2022-08-07 09:48:57 +03:00
// TODO: emit a stat?
2022-09-23 00:03:37 +03:00
// TODO: wait until we have a provider?
2022-08-30 23:01:42 +03:00
return Ok(OpenRequestResult::RetryNever);
2022-06-04 01:22:55 +03:00
}
2022-05-05 22:07:09 +03:00
// check rate limits
2022-05-22 02:34:05 +03:00
if let Some(ratelimiter) = self.hard_limit.as_ref() {
2022-09-20 09:56:24 +03:00
// TODO: how should we know if we should set expire or not?
2022-09-24 06:59:21 +03:00
match ratelimiter.throttle().await? {
2022-09-15 20:57:24 +03:00
RedisRateLimitResult::Allowed(_) => {
2022-11-12 11:24:32 +03:00
// // trace!("rate limit succeeded")
2022-05-05 22:07:09 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult::RetryAt(retry_at, _) => {
2022-05-05 22:07:09 +03:00
// rate limit failed
2022-05-22 02:34:05 +03:00
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
2022-05-05 22:07:09 +03:00
// TODO: use tracing better
2022-06-25 06:33:49 +03:00
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
2022-11-14 00:05:37 +03:00
warn!("Exhausted rate limit on {}. Retry at {:?}", self, retry_at);
2022-05-05 22:07:09 +03:00
return Ok(OpenRequestResult::RetryAt(retry_at));
2022-08-07 09:48:57 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult::RetryNever => {
return Ok(OpenRequestResult::RetryNever);
2022-05-05 22:07:09 +03:00
}
}
};
let handle = OpenRequestHandle::new(authorization.clone(), self.clone());
2022-08-24 03:11:49 +03:00
2022-08-24 03:59:05 +03:00
Ok(OpenRequestResult::Handle(handle))
2022-08-24 03:11:49 +03:00
}
}
2022-08-07 09:48:57 +03:00
2022-08-24 03:11:49 +03:00
impl fmt::Debug for Web3Provider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
f.debug_struct("Web3Provider").finish_non_exhaustive()
}
}
impl Hash for Web3Connection {
fn hash<H: Hasher>(&self, state: &mut H) {
// TODO: is this enough?
self.name.hash(state);
}
}
2022-05-05 22:07:09 +03:00
impl Eq for Web3Connection {}
impl Ord for Web3Connection {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.name.cmp(&other.name)
2022-05-05 22:07:09 +03:00
}
}
impl PartialOrd for Web3Connection {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for Web3Connection {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
2022-05-05 22:07:09 +03:00
}
}
2022-08-10 08:56:09 +03:00
impl Serialize for Web3Connection {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// 3 is the number of fields in the struct.
2022-11-14 00:05:37 +03:00
let mut state = serializer.serialize_struct("Web3Connection", 8)?;
2022-08-10 08:56:09 +03:00
2022-11-14 00:05:37 +03:00
// the url is excluded because it likely includes private information. just show the name that we use in keys
2022-08-10 08:56:09 +03:00
state.serialize_field("name", &self.name)?;
2022-11-14 00:05:37 +03:00
// a longer name for display to users
state.serialize_field("display_name", &self.display_name)?;
2022-08-10 08:56:09 +03:00
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
2022-09-07 06:54:16 +03:00
state.serialize_field("block_data_limit", &None::<()>)?;
} else {
state.serialize_field("block_data_limit", &block_data_limit)?;
}
2022-08-10 08:56:09 +03:00
state.serialize_field("weight", &self.weight)?;
2022-08-10 08:56:09 +03:00
state.serialize_field("soft_limit", &self.soft_limit)?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field(
"total_requests",
&self.frontend_requests.load(atomic::Ordering::Relaxed),
)?;
let head_block_id = &*self.head_block_id.read();
state.serialize_field("head_block_id", head_block_id)?;
2022-08-10 08:56:09 +03:00
state.end()
}
}
impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut f = f.debug_struct("Web3Connection");
f.field("name", &self.name);
2022-08-10 08:56:09 +03:00
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
2022-09-07 06:54:16 +03:00
f.field("blocks", &"all");
2022-08-10 08:56:09 +03:00
} else {
2022-09-07 06:54:16 +03:00
f.field("blocks", &block_data_limit);
2022-08-10 08:56:09 +03:00
}
f.finish_non_exhaustive()
}
}
impl fmt::Display for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: filter basic auth and api keys
write!(f, "{}", &self.name)
2022-08-10 08:56:09 +03:00
}
}
2022-11-22 23:23:08 +03:00
mod tests {
use super::*;
#[test]
fn test_archive_node_has_block_data() {
let head_block = BlockId {
hash: H256::random(),
num: 1_000_000.into(),
};
2022-11-22 23:44:23 +03:00
let block_data_limit = u64::MAX;
2022-11-22 23:23:08 +03:00
let metrics = OpenRequestHandleMetrics::default();
let x = Web3Connection {
name: "name".to_string(),
display_name: None,
url: "ws://example.com".to_string(),
http_client: None,
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
2022-11-22 23:23:08 +03:00
provider: AsyncRwLock::new(None),
hard_limit: None,
soft_limit: 1_000,
2022-11-22 23:44:23 +03:00
block_data_limit: block_data_limit.into(),
2022-11-22 23:23:08 +03:00
weight: 100.0,
head_block_id: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};
assert!(x.has_block_data(&0.into()));
assert!(x.has_block_data(&1.into()));
assert!(x.has_block_data(&head_block.num));
assert!(!x.has_block_data(&(head_block.num + 1)));
assert!(!x.has_block_data(&(head_block.num + 1000)));
}
2022-11-22 23:44:23 +03:00
#[test]
fn test_pruned_node_has_block_data() {
let head_block = BlockId {
hash: H256::random(),
num: 1_000_000.into(),
};
let block_data_limit = 64;
let metrics = OpenRequestHandleMetrics::default();
// TODO: this is getting long. have a `impl Default`
2022-11-22 23:44:23 +03:00
let x = Web3Connection {
name: "name".to_string(),
display_name: None,
url: "ws://example.com".to_string(),
http_client: None,
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
2022-11-22 23:44:23 +03:00
provider: AsyncRwLock::new(None),
hard_limit: None,
soft_limit: 1_000,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block_id: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};
assert!(!x.has_block_data(&0.into()));
assert!(!x.has_block_data(&1.into()));
assert!(!x.has_block_data(&(head_block.num - block_data_limit - 1)));
assert!(x.has_block_data(&(head_block.num - block_data_limit)));
assert!(x.has_block_data(&head_block.num));
assert!(!x.has_block_data(&(head_block.num + 1)));
assert!(!x.has_block_data(&(head_block.num + 1000)));
}
2022-11-22 23:23:08 +03:00
}