//! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock};
use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::U256;
use futures::future::try_join_all;
use futures::StreamExt;
use hdrhistogram::Histogram;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use parking_lot::RwLock;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use std::cmp::min;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};

// TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then
#[derive(Clone, Debug)]
pub enum ProviderState {
    None,
    Connecting(Arc<Web3Provider>),
    Connected(Arc<Web3Provider>),
}

impl Default for ProviderState {
    fn default() -> Self {
        Self::None
    }
}

impl ProviderState {
    pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc<Web3Provider>> {
        match self {
            ProviderState::None => None,
            ProviderState::Connecting(x) => {
                if allow_not_ready {
                    Some(x)
                } else {
                    // TODO: do a ready check here?
                    None
                }
            }
            ProviderState::Connected(x) => {
                if x.ready() {
                    Some(x)
                } else {
                    None
                }
            }
        }
    }
}

pub struct Web3RpcLatencies {
    /// Track how far behind the fastest node we are
    pub new_head: Histogram<u64>,
    /// exponentially weighted moving average of how far behind the fastest node we are
    pub new_head_ewma: u32,
    /// Track how long an rpc call takes on average
    pub request: Histogram<u64>,
    /// exponentially weighted moving average of how long an rpc call takes
    pub request_ewma: u32,
}

impl Default for Web3RpcLatencies {
    fn default() -> Self {
        Self {
            new_head: Histogram::new(3).unwrap(),
            new_head_ewma: 0,
            request: Histogram::new(3).unwrap(),
            request_ewma: 0,
        }
    }
}

/// An active connection to a Web3 RPC server like geth or erigon.
#[derive(Default)]
pub struct Web3Rpc {
    pub name: String,
    pub display_name: Option<String>,
    pub db_conn: Option<DatabaseConnection>,
    /// TODO: can we get this from the provider? do we even need it?
    pub(super) url: String,
    /// Some connections use an http_client. we keep a clone for reconnecting
    pub(super) http_client: Option<reqwest::Client>,
    /// keep track of currently open requests. We sort on this
    pub(super) active_requests: AtomicU32,
    /// keep track of total requests from the frontend
    pub(super) frontend_requests: AtomicU64,
    /// keep track of total requests from web3-proxy itself
    pub(super) internal_requests: AtomicU64,
    /// provider is in a RwLock so that we can replace it if re-connecting
    /// it is an async lock because we hold it open across awaits
    pub(super) provider_state: AsyncRwLock<ProviderState>,
    /// keep track of hard limits
    pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
    /// rate limits are stored in a central redis so that multiple proxies can share their rate limits
    /// We do not use the deferred rate limiter because going over limits would cause errors
    pub(super) hard_limit: Option<RedisRateLimiter>,
    /// used for load balancing to the least loaded server
    pub(super) soft_limit: u32,
    /// use web3 queries to find the block data limit for archive/pruned nodes
    pub(super) automatic_block_limit: bool,
    /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
    pub(super) backup: bool,
    /// TODO: have an enum for this so that "no limit" prints pretty?
    pub(super) block_data_limit: AtomicU64,
    /// Lower tiers are higher priority when sending requests
    pub(super) tier: u64,
    /// TODO: change this to a watch channel so that http providers can subscribe and take action on change
    pub(super) head_block: RwLock<Option<SavedBlock>>,
    /// Track how fast this RPC is
    pub(super) latency: Web3RpcLatencies,
}

impl Web3Rpc {
    /// Connect to a web3 rpc
    // TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
    #[allow(clippy::too_many_arguments)]
    pub async fn spawn(
        name: String,
        display_name: Option<String>,
        chain_id: u64,
        db_conn: Option<DatabaseConnection>,
        url_str: String,
        // optional because this is only used for http providers. websocket providers don't use it
        http_client: Option<reqwest::Client>,
        http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
        // TODO: have a builder struct for this.
        hard_limit: Option<(u64, RedisPool)>,
        // TODO: think more about this type
        soft_limit: u32,
        backup: bool,
        block_data_limit: Option<u64>,
        block_map: BlockHashesCache,
        block_sender: Option<flume::Sender<BlockAndRpc>>,
        tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
        reconnect: bool,
        tier: u64,
    ) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
        let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
            // TODO: is cache size 1 okay? i think we need
            RedisRateLimiter::new(
                "web3_proxy",
                &format!("{}:{}", chain_id, name),
                hard_rate_limit,
                60.0,
                redis_pool,
            )
        });

        // TODO: should we do this even if block_sender is None? then we would know limits on private relays
        let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into();
        let automatic_block_limit =
            (block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
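        // when the config does not pin a block_data_limit and a block_sender exists,
        // check_block_data_limit will probe the node for its real limit later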

        // track hard limit until on backup servers (which might surprise us with rate limit changes)
        // and track on servers that have a configured hard limit
        let hard_limit_until = if backup || hard_limit.is_some() {
            let (sender, _) = watch::channel(Instant::now());

            Some(sender)
        } else {
            None
        };

        let new_connection = Self {
            name,
            db_conn: db_conn.clone(),
            display_name,
            http_client,
            url: url_str,
            hard_limit,
            hard_limit_until,
            soft_limit,
            automatic_block_limit,
            backup,
            block_data_limit,
            tier,
            ..Default::default()
        };

        let new_connection = Arc::new(new_connection);

        // subscribe to new blocks and new transactions
        // subscribing starts the connection (with retries)
        // TODO: make transaction subscription optional (just pass None for tx_id_sender)
        let handle = {
            let new_connection = new_connection.clone();
            let authorization = Arc::new(Authorization::internal(db_conn)?);
            tokio::spawn(async move {
                new_connection
                    .subscribe(
                        &authorization,
                        block_map,
                        block_sender,
                        chain_id,
                        http_interval_sender,
                        reconnect,
                        tx_id_sender,
                    )
                    .await
            })
        };

        Ok((new_connection, handle))
    }

    // TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
    async fn check_block_data_limit(
        self: &Arc<Self>,
        authorization: &Arc<Authorization>,
    ) -> anyhow::Result<Option<u64>> {
        if !self.automatic_block_limit {
            // TODO: is this a good thing to return?
            return Ok(None);
        }

        // TODO: check eth_syncing. if it is not false, return Ok(None)

        let mut limit = None;

        // TODO: binary search between 90k and max?
        // TODO: start at 0 or 1?
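        // each pass asks for eth_getCode at a block `block_data_limit` blocks behind the
        // head. every distance that still answers becomes the new limit; the first
        // failure ends the search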
        for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
            let handle = self
                .wait_for_request_handle(authorization, None, true)
                .await?;

            let head_block_num_future = handle.request::<Option<()>, U256>(
                "eth_blockNumber",
                &None,
                // errors here are expected, so keep the level low
                Level::Debug.into(),
            );

            let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
                .await
                .context("timeout fetching eth_blockNumber")?
                .context("provider error")?;

            let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());

            trace!(
                "checking maybe_archive_block on {}: {}",
                self,
                maybe_archive_block
            );

            // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
            // TODO: what should the request be?
            let handle = self
                .wait_for_request_handle(authorization, None, true)
                .await?;

            let archive_result: Result<Bytes, _> = handle
                .request(
                    "eth_getCode",
                    &json!((
                        "0xdead00000000000000000000000000000000beef",
                        maybe_archive_block,
                    )),
                    // errors here are expected, so keep the level low
                    Level::Trace.into(),
                )
                .await;

            trace!(
                "archive_result on {} for {} ({}): {:?}",
                self,
                block_data_limit,
                maybe_archive_block,
                archive_result
            );

            if archive_result.is_err() {
                break;
            }

            limit = Some(block_data_limit);
        }

        if let Some(limit) = limit {
            if limit == 0 {
                warn!("{} is unable to serve requests", self);
            }

            self.block_data_limit
                .store(limit, atomic::Ordering::Release);
        }

        info!("block data limit on {}: {:?}", self, limit);

        Ok(limit)
    }

    /// TODO: this might be too simple. different nodes can prune differently. it's possible we will have a block range
    pub fn block_data_limit(&self) -> U64 {
        self.block_data_limit.load(atomic::Ordering::Acquire).into()
    }

    pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
        let head_block_num = match self.head_block.read().clone() {
            None => return false,
            Some(x) => x.number(),
        };

        // this rpc doesn't have that block yet. still syncing
        if needed_block_num > &head_block_num {
            return false;
        }

        // if this is a pruning node, we might not actually have the block
        let block_data_limit: U64 = self.block_data_limit();
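
        // example: with head_block_num = 1_000_000 and block_data_limit = 64, the oldest
        // servable block is 999_936; anything older than that returns false below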
        let oldest_block_num = head_block_num.saturating_sub(block_data_limit);

        *needed_block_num >= oldest_block_num
    }

    /// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
    /// We use the "Decorrelated" jitter from <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
    /// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
    pub async fn retrying_connect(
        self: &Arc<Self>,
        block_sender: Option<&flume::Sender<BlockAndRpc>>,
        chain_id: u64,
        db_conn: Option<&DatabaseConnection>,
        delay_start: bool,
    ) -> anyhow::Result<()> {
        // there are several crates that have retry helpers, but they all seem more complex than necessary
        // TODO: move this backoff logic into a helper function so we can use it when doing database locking
        let base_ms = 500;
        let cap_ms = 30_000;
        let range_multiplier = 3;

        // sleep once before the initial retry attempt
        // TODO: now that we use this method for our initial connection, do we still want this sleep?
        let mut sleep_ms = if delay_start {
            let first_sleep_ms = min(
                cap_ms,
                thread_fast_rng().gen_range(base_ms..(base_ms * range_multiplier)),
            );
            let reconnect_in = Duration::from_millis(first_sleep_ms);

            info!("Reconnect to {} in {}ms", self, reconnect_in.as_millis());

            sleep(reconnect_in).await;

            first_sleep_ms
        } else {
            base_ms
        };

        // retry until we succeed
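        // decorrelated jitter: each retry sleeps a random duration between base_ms and
        // the previous sleep multiplied by range_multiplier, capped at cap_ms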
        while let Err(err) = self.connect(block_sender, chain_id, db_conn).await {
            // thread_rng is cryptographically secure. we don't need that here
            sleep_ms = min(
                cap_ms,
                thread_fast_rng().gen_range(base_ms..(sleep_ms * range_multiplier)),
            );

            let retry_in = Duration::from_millis(sleep_ms);

            let error_level = if self.backup {
                log::Level::Debug
            } else {
                log::Level::Info
            };

            log::log!(
                error_level,
                "Failed reconnect to {}! Retry in {}ms. err={:?}",
                self,
                retry_in.as_millis(),
                err,
            );

            sleep(retry_in).await;
        }

        Ok(())
    }

    /// connect to the web3 provider
    async fn connect(
        self: &Arc<Self>,
        block_sender: Option<&flume::Sender<BlockAndRpc>>,
        chain_id: u64,
        db_conn: Option<&DatabaseConnection>,
    ) -> anyhow::Result<()> {
        // trace!("provider_state {} locking...", self);
        let mut provider_state = self
            .provider_state
            .try_write()
            .context("locking provider for write")?;
        // trace!("provider_state {} locked: {:?}", self, provider_state);

        match &*provider_state {
            ProviderState::None => {
                info!("connecting to {}", self);
            }
            ProviderState::Connecting(provider) | ProviderState::Connected(provider) => {
                // disconnect the current provider
                if let Web3Provider::Mock = provider.as_ref() {
                    return Ok(());
                }

                debug!("reconnecting to {}", self);

                // disconnect the current provider
                *provider_state = ProviderState::None;

                // reset sync status
                // trace!("locking head block on {}", self);
                {
                    let mut head_block = self.head_block.write();
                    *head_block = None;
                }
                // trace!("done with head block on {}", self);

                // tell the block subscriber that we don't have any blocks
                if let Some(block_sender) = block_sender {
                    block_sender
                        .send_async((None, self.clone()))
                        .await
                        .context("block_sender during connect")?;
                }
            }
        }

        // trace!("Creating new Web3Provider on {}", self);
        // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
        let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;

        // trace!("saving provider state as NotReady on {}", self);
        *provider_state = ProviderState::Connecting(Arc::new(new_provider));

        // drop the lock so that we can get a request handle
        // trace!("provider_state {} unlocked", self);
        drop(provider_state);

        let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);

        // check the server's chain_id here
        // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
        // TODO: what should the timeout be? should there be a request timeout?
        // trace!("waiting on chain id for {}", self);
        let found_chain_id: Result<U64, _> = self
            .wait_for_request_handle(&authorization, None, true)
            .await?
            .request(
                "eth_chainId",
                &json!(Option::None::<()>),
                Level::Trace.into(),
            )
            .await;
        // trace!("found_chain_id: {:?}", found_chain_id);

        match found_chain_id {
            Ok(found_chain_id) => {
                // TODO: there has to be a cleaner way to do this
                if chain_id != found_chain_id.as_u64() {
                    return Err(anyhow::anyhow!(
                        "incorrect chain id! Config has {}, but RPC has {}",
                        chain_id,
                        found_chain_id
                    )
                    .context(format!("failed @ {}", self)));
                }
            }
            Err(e) => {
                return Err(anyhow::Error::from(e));
            }
        }

        self.check_block_data_limit(&authorization).await?;

        {
            // trace!("locking for ready...");
            let mut provider_state = self.provider_state.write().await;
            // trace!("locked for ready...");

            // TODO: do this without a clone
            let ready_provider = provider_state
                .provider(true)
                .await
                .context("provider missing")?
                .clone();

            *provider_state = ProviderState::Connected(ready_provider);
            // trace!("unlocked for ready...");
        }

        info!("successfully connected to {}", self);

        Ok(())
    }

    #[inline]
    pub fn active_requests(&self) -> u32 {
        self.active_requests.load(atomic::Ordering::Acquire)
    }

    async fn send_head_block_result(
        self: &Arc<Self>,
        new_head_block: Result<Option<ArcBlock>, ProviderError>,
        block_sender: &flume::Sender<BlockAndRpc>,
        block_map: BlockHashesCache,
    ) -> anyhow::Result<()> {
        let new_head_block = match new_head_block {
            Ok(None) => {
                {
                    let mut head_block = self.head_block.write();

                    if head_block.is_none() {
                        // we previously sent a None. return early
                        return Ok(());
                    }

                    warn!("clearing head block on {}!", self);

                    *head_block = None;
                }

                None
            }
            Ok(Some(new_head_block)) => {
                let new_hash = new_head_block
                    .hash
                    .context("sending block to connections")?;

                // if we already have this block saved, set new_head_block to that arc. otherwise store this copy
                let new_head_block = block_map
                    .get_with(new_hash, async move { new_head_block })
                    .await;

                // save the block so we don't send the same one multiple times
                // also save so that archive checks can know how far back to query
                {
                    let mut head_block = self.head_block.write();

                    let _ = head_block.insert(new_head_block.clone().into());
                }

                if self.block_data_limit() == U64::zero() {
                    let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
                    if let Err(err) = self.check_block_data_limit(&authorization).await {
                        warn!(
                            "failed checking block limit after {} finished syncing. {:?}",
                            self, err
                        );
                    }
                }

                Some(new_head_block)
            }
            Err(err) => {
                warn!("unable to get block from {}. err={:?}", self, err);

                {
                    let mut head_block = self.head_block.write();

                    *head_block = None;
                }

                None
            }
        };

        // send an empty block to take this server out of rotation
        block_sender
            .send_async((new_head_block, self.clone()))
            .await
            .context("block_sender")?;

        Ok(())
    }

    /// subscribe to blocks and transactions with automatic reconnects
    /// This should only exit when the program is exiting.
    /// TODO: should more of these args be on self?
    #[allow(clippy::too_many_arguments)]
    async fn subscribe(
        self: Arc<Self>,
        authorization: &Arc<Authorization>,
        block_map: BlockHashesCache,
        block_sender: Option<flume::Sender<BlockAndRpc>>,
        chain_id: u64,
        http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
        reconnect: bool,
        tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
    ) -> anyhow::Result<()> {
        loop {
            let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());

            let mut futures = vec![];
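            // the health check, new-heads, and pending-transaction tasks are collected
            // here. if any of them exits with an error, try_join_all below returns and
            // (when reconnect is set) the connection is re-established and the loop
            // restarts the subscriptions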

            {
                // health check
                // TODO: move this into a proper function
                let authorization = authorization.clone();
                let block_sender = block_sender.clone();
                let conn = self.clone();
                let (ready_tx, ready_rx) = oneshot::channel();
                let f = async move {
                    // initial sleep to allow for the initial connection
                    conn.retrying_connect(
                        block_sender.as_ref(),
                        chain_id,
                        authorization.db_conn.as_ref(),
                        false,
                    )
                    .await?;

                    // provider is ready
                    ready_tx.send(()).unwrap();

                    // wait before doing the initial health check
                    // TODO: how often?
                    // TODO: subscribe to self.head_block
                    let health_sleep_seconds = 10;
                    sleep(Duration::from_secs(health_sleep_seconds)).await;

                    loop {
                        // TODO: what if we just happened to have this check line up with another restart?
                        // TODO: think more about this
                        // trace!("health check on {}. locking...", conn);
                        if conn
                            .provider_state
                            .read()
                            .await
                            .provider(false)
                            .await
                            .is_none()
                        {
                            // trace!("health check unlocked with error on {}", conn);
                            // returning error will trigger a reconnect
                            return Err(anyhow::anyhow!("{} is not ready", conn));
                        }
                        // trace!("health check on {}. unlocked", conn);

                        sleep(Duration::from_secs(health_sleep_seconds)).await;
                    }
                };

                futures.push(flatten_handle(tokio::spawn(f)));

                // wait on the initial connection
                ready_rx.await?;
            }

            if let Some(block_sender) = &block_sender {
                let f = self.clone().subscribe_new_heads(
                    authorization.clone(),
                    http_interval_receiver,
                    block_sender.clone(),
                    block_map.clone(),
                );

                futures.push(flatten_handle(tokio::spawn(f)));
            }

            if let Some(tx_id_sender) = &tx_id_sender {
                let f = self
                    .clone()
                    .subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone());

                futures.push(flatten_handle(tokio::spawn(f)));
            }

            match try_join_all(futures).await {
                Ok(_) => {
                    // futures all exited without error. break instead of restarting subscriptions
                    break;
                }
                Err(err) => {
                    if reconnect {
                        warn!("{} connection ended. err={:?}", self, err);

                        self.clone()
                            .retrying_connect(
                                block_sender.as_ref(),
                                chain_id,
                                authorization.db_conn.as_ref(),
                                true,
                            )
                            .await?;
                    } else {
                        error!("{} subscription exited. err={:?}", self, err);
                        return Err(err);
                    }
                }
            }
        }

        info!("all subscriptions on {} completed", self);

        Ok(())
    }

    /// Subscribe to new blocks. If `reconnect` is true, this runs forever.
    async fn subscribe_new_heads(
        self: Arc<Self>,
        authorization: Arc<Authorization>,
        http_interval_receiver: Option<broadcast::Receiver<()>>,
        block_sender: flume::Sender<BlockAndRpc>,
        block_map: BlockHashesCache,
    ) -> anyhow::Result<()> {
        trace!("watching new heads on {}", self);

        // trace!("locking on new heads");
        let provider_state = self
            .provider_state
            .try_read()
            .context("subscribe_new_heads")?
            .clone();
        // trace!("unlocked on new heads");

        // TODO: need a timeout
        if let ProviderState::Connected(provider) = provider_state {
            match provider.as_ref() {
                Web3Provider::Mock => unimplemented!(),
                Web3Provider::Http(_provider) => {
                    // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
                    // TODO: try watch_blocks and fall back to this?

                    let mut http_interval_receiver = http_interval_receiver.unwrap();

                    let mut last_hash = H256::zero();

                    loop {
                        // TODO: what should the max_wait be?
                        match self
                            .wait_for_request_handle(&authorization, None, false)
                            .await
                        {
                            Ok(active_request_handle) => {
                                let block: Result<Option<ArcBlock>, _> = active_request_handle
                                    .request(
                                        "eth_getBlockByNumber",
                                        &json!(("latest", false)),
                                        Level::Warn.into(),
                                    )
                                    .await;

                                match block {
                                    Ok(None) => {
                                        warn!("no head block on {}", self);

                                        self.send_head_block_result(
                                            Ok(None),
                                            &block_sender,
                                            block_map.clone(),
                                        )
                                        .await?;
                                    }
                                    Ok(Some(block)) => {
                                        // don't send repeat blocks
                                        let new_hash = block
                                            .hash
                                            .expect("blocks here should always have hashes");

                                        if new_hash != last_hash {
                                            // new hash!
                                            last_hash = new_hash;

                                            self.send_head_block_result(
                                                Ok(Some(block)),
                                                &block_sender,
                                                block_map.clone(),
                                            )
                                            .await?;
                                        }
                                    }
                                    Err(err) => {
                                        // we did not get a block back. something is up with the server. take it out of rotation
                                        self.send_head_block_result(
                                            Err(err),
                                            &block_sender,
                                            block_map.clone(),
                                        )
                                        .await?;
                                    }
                                }
                            }
                            Err(err) => {
                                warn!("Internal error on latest block from {}. {:?}", self, err);

                                self.send_head_block_result(
                                    Ok(None),
                                    &block_sender,
                                    block_map.clone(),
                                )
                                .await?;

                                // TODO: what should we do? sleep? extra time?
                            }
                        }

                        // wait for the next interval
                        // TODO: if error or rate limit, increase interval?
                        while let Err(err) = http_interval_receiver.recv().await {
                            match err {
                                broadcast::error::RecvError::Closed => {
                                    // channel is closed! that's not good. bubble the error up
                                    return Err(err.into());
                                }
                                broadcast::error::RecvError::Lagged(lagged) => {
                                    // querying the block was delayed
                                    // this can happen if tokio is very busy or waiting for requests limits took too long
                                    warn!("http interval on {} lagging by {}!", self, lagged);
                                }
                            }
                        }
                    }
                }
                Web3Provider::Ws(provider) => {
                    // todo: move subscribe_blocks onto the request handle?
                    let active_request_handle = self
                        .wait_for_request_handle(&authorization, None, false)
                        .await;
                    let mut stream = provider.subscribe_blocks().await?;
                    drop(active_request_handle);

                    // query the block once since the subscription doesn't send the current block
                    // there is a very small race condition here where the stream could send us a new block right now
                    // all it does is print "new block" for the same block as current block
                    // TODO: how does this get wrapped in an arc? does ethers handle that?
                    let block: Result<Option<ArcBlock>, _> = self
                        .wait_for_request_handle(&authorization, None, false)
                        .await?
                        .request(
                            "eth_getBlockByNumber",
                            &json!(("latest", false)),
                            Level::Warn.into(),
                        )
                        .await;

                    let mut last_hash = match &block {
                        Ok(Some(new_block)) => new_block
                            .hash
                            .expect("blocks should always have a hash here"),
                        _ => H256::zero(),
                    };

                    self.send_head_block_result(block, &block_sender, block_map.clone())
                        .await?;

                    while let Some(new_block) = stream.next().await {
                        // TODO: check the new block's hash to be sure we don't send dupes
                        let new_hash = new_block
                            .hash
                            .expect("blocks should always have a hash here");

                        if new_hash == last_hash {
                            // some rpcs like to give us duplicates. don't waste our time on them
                            continue;
                        } else {
                            last_hash = new_hash;
                        }

                        self.send_head_block_result(
                            Ok(Some(Arc::new(new_block))),
                            &block_sender,
                            block_map.clone(),
                        )
                        .await?;
                    }

                    // clear the head block. this might not be needed, but it won't hurt
                    self.send_head_block_result(Ok(None), &block_sender, block_map)
                        .await?;

                    // TODO: is this always an error?
                    // TODO: we probably don't want a warn and to return error
                    warn!("new_heads subscription to {} ended", self);
                    Err(anyhow::anyhow!("new_heads subscription ended"))
                }
            }
        } else {
            Err(anyhow::anyhow!(
                "Provider not ready! Unable to subscribe to heads"
            ))
        }
    }
|
2022-06-14 07:04:14 +03:00
|
|
|
async fn subscribe_pending_transactions(
|
|
|
|
self: Arc<Self>,
|
2022-11-08 22:58:11 +03:00
|
|
|
authorization: Arc<Authorization>,
|
2022-06-14 07:04:14 +03:00
|
|
|
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
|
|
|
|
) -> anyhow::Result<()> {
|
2023-01-25 07:44:50 +03:00
|
|
|
if let ProviderState::Connected(provider) = self
|
2022-12-06 03:06:28 +03:00
|
|
|
.provider_state
|
|
|
|
.try_read()
|
|
|
|
.context("subscribe_pending_transactions")?
|
|
|
|
.clone()
|
|
|
|
{
|
2022-12-06 00:13:36 +03:00
|
|
|
trace!("watching pending transactions on {}", self);
|
2023-02-06 05:16:09 +03:00
|
|
|
// TODO: does this keep the lock open for too long?
|
2022-12-06 00:13:36 +03:00
|
|
|
match provider.as_ref() {
|
2022-11-23 01:45:22 +03:00
|
|
|
Web3Provider::Mock => unimplemented!(),
|
2022-06-18 10:06:54 +03:00
|
|
|
Web3Provider::Http(provider) => {
|
2022-06-14 07:04:14 +03:00
|
|
|
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
|
2023-02-06 05:16:09 +03:00
|
|
|
// TODO: maybe subscribe to self.head_block?
|
|
|
|
// TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client?
|
|
|
|
futures::future::pending::<()>().await;
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
|
|
|
Web3Provider::Ws(provider) => {
|
2022-07-09 02:02:32 +03:00
|
|
|
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
|
2022-09-23 00:03:37 +03:00
|
|
|
let active_request_handle = self
|
2023-01-25 09:45:20 +03:00
|
|
|
.wait_for_request_handle(&authorization, None, false)
|
|
|
|
.await?;
|
2022-06-14 07:04:14 +03:00
|
|
|
|
|
|
|
let mut stream = provider.subscribe_pending_txs().await?;
|
|
|
|
|
|
|
|
drop(active_request_handle);
|
|
|
|
|
2022-07-08 21:27:06 +03:00
|
|
|
while let Some(pending_tx_id) = stream.next().await {
|
|
|
|
tx_id_sender
|
|
|
|
.send_async((pending_tx_id, self.clone()))
|
|
|
|
.await
|
|
|
|
.context("tx_id_sender")?;
|
2022-08-11 00:29:50 +03:00
|
|
|
|
|
|
|
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
2022-07-08 21:27:06 +03:00
|
|
|
|
2022-09-06 19:49:07 +03:00
|
|
|
// TODO: is this always an error?
|
|
|
|
// TODO: we probably don't want a warn and to return error
|
2022-11-14 00:05:37 +03:00
|
|
|
warn!("pending_transactions subscription ended on {}", self);
|
2022-09-06 19:49:07 +03:00
|
|
|
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
2022-12-06 00:13:36 +03:00
|
|
|
} else {
|
|
|
|
warn!(
|
|
|
|
"Provider not ready! Unable to watch pending transactions on {}",
|
|
|
|
self
|
|
|
|
);
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|

    /// be careful with this; it might wait forever!
    /// `allow_not_ready` is only for use by health checks while starting the provider
    /// TODO: don't use anyhow. use specific error type
    pub async fn wait_for_request_handle(
        self: &Arc<Self>,
        authorization: &Arc<Authorization>,
        max_wait: Option<Duration>,
        allow_not_ready: bool,
    ) -> anyhow::Result<OpenRequestHandle> {
        let max_wait = max_wait.map(|x| Instant::now() + x);
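
        // max_wait is now an absolute deadline. loop until try_request_handle gives us a
        // handle or the deadline (if one was set) would be exceeded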
        loop {
            match self
                .try_request_handle(authorization, allow_not_ready)
                .await
            {
                Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
                Ok(OpenRequestResult::RetryAt(retry_at)) => {
                    // TODO: emit a stat?
                    let wait = retry_at.duration_since(Instant::now());

                    trace!(
                        "waiting {} millis for request handle on {}",
                        wait.as_millis(),
                        self
                    );

                    if let Some(max_wait) = max_wait {
                        if retry_at > max_wait {
                            // break now since we will wait past our maximum wait time
                            // TODO: don't use anyhow. use specific error type
                            return Err(anyhow::anyhow!("timeout waiting for request handle"));
                        }
                    }

                    sleep_until(retry_at).await;
                }
                Ok(OpenRequestResult::NotReady(_)) => {
                    // TODO: when can this happen? log? emit a stat?
                    trace!("{} has no handle ready", self);

                    if let Some(max_wait) = max_wait {
                        let now = Instant::now();

                        if now > max_wait {
                            return Err(anyhow::anyhow!("unable to retry for request handle"));
                        }
                    }

                    // TODO: sleep how long? maybe just error?
                    // TODO: instead of an arbitrary sleep, subscribe to the head block on this
                    sleep(Duration::from_millis(10)).await;
                }
                Err(err) => return Err(err),
            }
        }
    }

    pub async fn try_request_handle(
        self: &Arc<Self>,
        authorization: &Arc<Authorization>,
        // TODO? ready_provider: Option<&Arc<Web3Provider>>,
        allow_not_ready: bool,
    ) -> anyhow::Result<OpenRequestResult> {
        // TODO: think more about this read block
        if !allow_not_ready
            && self
                .provider_state
                .read()
                .await
                .provider(allow_not_ready)
                .await
                .is_none()
        {
            trace!("{} is not ready", self);
            return Ok(OpenRequestResult::NotReady(self.backup));
        }
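
        // if a previous request already learned when the hard limit resets, wait for
        // that instant instead of asking redis again below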
        if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
            let hard_limit_ready = hard_limit_until.borrow().clone();

            let now = Instant::now();

            if now < hard_limit_ready {
                return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
            }
        }

        // check rate limits
        if let Some(ratelimiter) = self.hard_limit.as_ref() {
            // TODO: how should we know if we should set expire or not?
            match ratelimiter
                .throttle()
                .await
                .context(format!("attempting to throttle {}", self))?
            {
                RedisRateLimitResult::Allowed(_) => {
                    // trace!("rate limit succeeded")
                }
                RedisRateLimitResult::RetryAt(retry_at, _) => {
                    // rate limit gave us a wait time
                    if !self.backup {
                        let when = retry_at.duration_since(Instant::now());
                        warn!(
                            "Exhausted rate limit on {}. Retry in {}ms",
                            self,
                            when.as_millis()
                        );
                    }

                    if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
                        hard_limit_until.send_replace(retry_at.clone());
                    }

                    return Ok(OpenRequestResult::RetryAt(retry_at));
                }
                RedisRateLimitResult::RetryNever => {
                    return Ok(OpenRequestResult::NotReady(self.backup));
                }
            }
        };

        let handle = OpenRequestHandle::new(authorization.clone(), self.clone()).await;

        Ok(OpenRequestResult::Handle(handle))
    }
}

impl fmt::Debug for Web3Provider {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
        f.debug_struct("Web3Provider").finish_non_exhaustive()
    }
}

impl Hash for Web3Rpc {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // TODO: is this enough?
        self.name.hash(state);
    }
}

impl Eq for Web3Rpc {}

impl Ord for Web3Rpc {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.name.cmp(&other.name)
    }
}

impl PartialOrd for Web3Rpc {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Web3Rpc {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}

impl Serialize for Web3Rpc {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // 9 is the number of fields in the struct.
        let mut state = serializer.serialize_struct("Web3Rpc", 9)?;

        // the url is excluded because it likely includes private information. just show the name that we use in keys
        state.serialize_field("name", &self.name)?;
        // a longer name for display to users
        state.serialize_field("display_name", &self.display_name)?;

        state.serialize_field("backup", &self.backup)?;

        match self.block_data_limit.load(atomic::Ordering::Relaxed) {
            u64::MAX => {
                state.serialize_field("block_data_limit", &None::<()>)?;
            }
            block_data_limit => {
                state.serialize_field("block_data_limit", &block_data_limit)?;
            }
        }

        state.serialize_field("tier", &self.tier)?;

        state.serialize_field("soft_limit", &self.soft_limit)?;

        state.serialize_field(
            "active_requests",
            &self.active_requests.load(atomic::Ordering::Relaxed),
        )?;

        state.serialize_field(
            "total_requests",
            &self.frontend_requests.load(atomic::Ordering::Relaxed),
        )?;

        {
            // TODO: maybe this is too much data. serialize less?
            let head_block = &*self.head_block.read();
            state.serialize_field("head_block", head_block)?;
        }

        state.end()
    }
}

impl fmt::Debug for Web3Rpc {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("Web3Rpc");

        f.field("name", &self.name);

        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
        if block_data_limit == u64::MAX {
            f.field("blocks", &"all");
        } else {
            f.field("blocks", &block_data_limit);
        }

        f.finish_non_exhaustive()
    }
}

impl fmt::Display for Web3Rpc {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: filter basic auth and api keys
        write!(f, "{}", &self.name)
    }
}

#[cfg(test)]
mod tests {
    #![allow(unused_imports)]
    use super::*;
    use ethers::types::{Block, U256};
    use std::time::{SystemTime, UNIX_EPOCH};

    #[test]
    fn test_archive_node_has_block_data() {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("cannot tell the time")
            .as_secs()
            .into();

        let random_block = Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now,
            ..Default::default()
        };

        let random_block = Arc::new(random_block);

        let head_block = SavedBlock::new(random_block);
        let block_data_limit = u64::MAX;

        let x = Web3Rpc {
            name: "name".to_string(),
            url: "ws://example.com".to_string(),
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            tier: 0,
            head_block: RwLock::new(Some(head_block.clone())),
            ..Default::default()
        };

        assert!(x.has_block_data(&0.into()));
        assert!(x.has_block_data(&1.into()));
        assert!(x.has_block_data(&head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }

    #[test]
    fn test_pruned_node_has_block_data() {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("cannot tell the time")
            .as_secs()
            .into();

        let head_block: SavedBlock = Arc::new(Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now,
            ..Default::default()
        })
        .into();

        let block_data_limit = 64;

        // TODO: this is getting long. have a `impl Default`
        let x = Web3Rpc {
            name: "name".to_string(),
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            tier: 0,
            head_block: RwLock::new(Some(head_block.clone())),
            ..Default::default()
        };

        assert!(!x.has_block_data(&0.into()));
        assert!(!x.has_block_data(&1.into()));
        assert!(!x.has_block_data(&(head_block.number() - block_data_limit - 1)));
        assert!(x.has_block_data(&(head_block.number() - block_data_limit)));
        assert!(x.has_block_data(&head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }

    /*
    // TODO: think about how to bring the concept of a "lagged" node back
    #[test]
    fn test_lagged_node_not_has_block_data() {
        let now: U256 = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("cannot tell the time")
            .as_secs()
            .into();

        // head block is an hour old
        let head_block = Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now - 3600,
            ..Default::default()
        };

        let head_block = Arc::new(head_block);

        let head_block = SavedBlock::new(head_block);
        let block_data_limit = u64::MAX;

        let metrics = OpenRequestHandleMetrics::default();

        let x = Web3Rpc {
            name: "name".to_string(),
            db_conn: None,
            display_name: None,
            url: "ws://example.com".to_string(),
            http_client: None,
            active_requests: 0.into(),
            frontend_requests: 0.into(),
            internal_requests: 0.into(),
            provider_state: AsyncRwLock::new(ProviderState::None),
            hard_limit: None,
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            tier: 0,
            head_block: RwLock::new(Some(head_block.clone())),
        };

        assert!(!x.has_block_data(&0.into()));
        assert!(!x.has_block_data(&1.into()));
        assert!(!x.has_block_data(&head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }
    */
}