2023-05-31 02:32:34 +03:00
|
|
|
//! Rate-limited communication with a web3 provider.
|
2023-02-26 10:52:33 +03:00
|
|
|
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
|
2023-09-27 04:18:06 +03:00
|
|
|
use super::provider::{connect_ws, EthersWsProvider};
|
2023-02-06 05:16:09 +03:00
|
|
|
use super::request::{OpenRequestHandle, OpenRequestResult};
|
2023-05-24 00:40:34 +03:00
|
|
|
use crate::app::{flatten_handle, Web3ProxyJoinHandle};
|
2023-02-12 12:22:53 +03:00
|
|
|
use crate::config::{BlockAndRpc, Web3RpcConfig};
|
2023-06-29 07:34:51 +03:00
|
|
|
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
|
2023-10-03 23:46:27 +03:00
|
|
|
use crate::frontend::authorization::Web3Request;
|
2023-09-27 04:18:06 +03:00
|
|
|
use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData};
|
2023-04-06 01:34:28 +03:00
|
|
|
use crate::rpcs::request::RequestErrorHandler;
|
2023-02-12 12:22:53 +03:00
|
|
|
use anyhow::{anyhow, Context};
|
2023-06-13 04:58:51 +03:00
|
|
|
use arc_swap::ArcSwapOption;
|
2023-09-13 22:05:47 +03:00
|
|
|
use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64};
|
2023-07-11 06:59:03 +03:00
|
|
|
use futures::stream::FuturesUnordered;
|
2023-03-31 14:43:41 +03:00
|
|
|
use futures::StreamExt;
|
2023-06-18 19:47:40 +03:00
|
|
|
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
|
2022-11-14 21:24:52 +03:00
|
|
|
use migration::sea_orm::DatabaseConnection;
|
2023-10-06 04:57:04 +03:00
|
|
|
use nanorand::tls::TlsWyRand;
|
2023-06-13 05:13:06 +03:00
|
|
|
use nanorand::Rng;
|
2022-09-15 20:57:24 +03:00
|
|
|
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
|
2022-05-21 01:16:15 +03:00
|
|
|
use serde::ser::{SerializeStruct, Serializer};
|
|
|
|
use serde::Serialize;
|
2022-09-24 05:47:44 +03:00
|
|
|
use serde_json::json;
|
2023-06-09 23:09:58 +03:00
|
|
|
use std::cmp::Reverse;
|
2022-05-05 22:07:09 +03:00
|
|
|
use std::fmt;
|
2022-06-14 07:04:14 +03:00
|
|
|
use std::hash::{Hash, Hasher};
|
2023-06-13 06:44:52 +03:00
|
|
|
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
|
2022-05-05 22:07:09 +03:00
|
|
|
use std::{cmp::Ordering, sync::Arc};
|
2023-07-11 09:08:06 +03:00
|
|
|
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
|
2023-06-21 00:22:14 +03:00
|
|
|
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
2023-07-11 06:59:17 +03:00
|
|
|
use tracing::{debug, error, info, trace, warn, Level};
|
2023-05-11 23:07:31 +03:00
|
|
|
use url::Url;
|
2022-12-06 00:13:36 +03:00
|
|
|
|
2022-09-15 20:57:24 +03:00
|
|
|
/// An active connection to a Web3 RPC server like geth or erigon.
|
2023-10-06 04:57:04 +03:00
|
|
|
/// TODO: smarter Default derive or move the channels around so they aren't part of this at all
|
2023-02-03 01:48:23 +03:00
|
|
|
#[derive(Default)]
|
2023-02-06 20:55:27 +03:00
|
|
|
pub struct Web3Rpc {
|
2022-08-24 03:11:49 +03:00
|
|
|
pub name: String,
|
2023-10-03 23:46:27 +03:00
|
|
|
pub chain_id: u64,
|
2023-06-13 08:26:10 +03:00
|
|
|
pub block_interval: Duration,
|
2022-11-14 00:05:37 +03:00
|
|
|
pub display_name: Option<String>,
|
2023-01-03 22:54:24 +03:00
|
|
|
pub db_conn: Option<DatabaseConnection>,
|
2023-09-13 22:05:47 +03:00
|
|
|
pub subscribe_txs: bool,
|
2023-05-23 01:32:15 +03:00
|
|
|
/// most all requests prefer use the http_provider
|
2023-09-27 04:18:06 +03:00
|
|
|
pub(super) http_client: Option<reqwest::Client>,
|
|
|
|
pub(super) http_url: Option<Url>,
|
2023-06-13 04:58:51 +03:00
|
|
|
/// the websocket url is only used for subscriptions
|
|
|
|
pub(super) ws_url: Option<Url>,
|
2023-05-23 01:32:15 +03:00
|
|
|
/// the websocket provider is only used for subscriptions
|
2023-06-13 04:58:51 +03:00
|
|
|
pub(super) ws_provider: ArcSwapOption<EthersWsProvider>,
|
2023-05-13 01:15:32 +03:00
|
|
|
/// keep track of hard limits
|
2023-06-13 04:58:51 +03:00
|
|
|
/// hard_limit_until is only inside an Option so that the "Default" derive works. it will always be set.
|
2023-01-25 07:44:50 +03:00
|
|
|
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
|
2022-08-24 02:13:56 +03:00
|
|
|
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
2022-09-15 20:57:24 +03:00
|
|
|
/// We do not use the deferred rate limiter because going over limits would cause errors
|
2022-11-23 01:45:22 +03:00
|
|
|
pub(super) hard_limit: Option<RedisRateLimiter>,
|
2023-05-13 01:15:32 +03:00
|
|
|
/// used for ensuring enough requests are available before advancing the head block
|
2022-08-26 20:26:17 +03:00
|
|
|
pub(super) soft_limit: u32,
|
2022-12-06 00:13:36 +03:00
|
|
|
/// use web3 queries to find the block data limit for archive/pruned nodes
|
|
|
|
pub(super) automatic_block_limit: bool,
|
2023-01-19 13:13:00 +03:00
|
|
|
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
|
2023-03-01 22:23:59 +03:00
|
|
|
pub backup: bool,
|
2022-09-06 23:12:45 +03:00
|
|
|
/// TODO: have an enum for this so that "no limit" prints pretty?
|
2022-11-23 01:45:22 +03:00
|
|
|
pub(super) block_data_limit: AtomicU64,
|
2023-06-13 04:58:51 +03:00
|
|
|
/// head_block is only inside an Option so that the "Default" derive works. it will always be set.
|
2023-10-03 23:46:27 +03:00
|
|
|
pub(super) head_block_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
|
2023-06-18 19:47:40 +03:00
|
|
|
/// Track head block latency.
|
2023-07-10 21:05:07 +03:00
|
|
|
pub(super) head_delay: AsyncRwLock<EwmaLatency>,
|
2023-05-11 23:09:15 +03:00
|
|
|
/// Track peak request latency
|
2023-06-13 04:58:51 +03:00
|
|
|
/// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
|
2023-05-11 23:09:15 +03:00
|
|
|
pub(super) peak_latency: Option<PeakEwmaLatency>,
|
2023-06-09 22:21:50 +03:00
|
|
|
/// Automatically set priority
|
2023-06-13 06:44:52 +03:00
|
|
|
pub(super) tier: AtomicU32,
|
2023-06-18 19:47:40 +03:00
|
|
|
/// Track total internal requests served
|
2023-06-13 21:51:19 +03:00
|
|
|
pub(super) internal_requests: AtomicUsize,
|
2023-06-18 19:47:40 +03:00
|
|
|
/// Track total external requests served
|
2023-06-13 21:51:19 +03:00
|
|
|
pub(super) external_requests: AtomicUsize,
|
2023-07-11 08:17:15 +03:00
|
|
|
/// If the head block is too old, it is ignored.
|
|
|
|
pub(super) max_head_block_age: Duration,
|
2023-06-18 19:47:40 +03:00
|
|
|
/// Track time used by external requests served
|
|
|
|
/// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set.
|
2023-06-18 20:46:22 +03:00
|
|
|
pub(super) median_latency: Option<RollingQuantileLatency>,
|
2023-06-13 21:51:19 +03:00
|
|
|
/// Track in-flight requests
|
2023-02-16 11:26:58 +03:00
|
|
|
pub(super) active_requests: AtomicUsize,
|
2023-06-13 04:58:51 +03:00
|
|
|
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
|
2023-02-27 10:52:37 +03:00
|
|
|
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
|
2023-06-13 04:58:51 +03:00
|
|
|
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
|
2023-02-28 22:01:34 +03:00
|
|
|
pub(super) created_at: Option<Instant>,
|
2022-08-24 02:13:56 +03:00
|
|
|
}
|
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl Web3Rpc {
|
2022-06-14 07:04:14 +03:00
|
|
|
/// Connect to a web3 rpc
|
2023-01-04 09:37:51 +03:00
|
|
|
// TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
|
2022-06-14 08:43:28 +03:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2022-06-14 07:04:14 +03:00
|
|
|
pub async fn spawn(
|
2023-06-25 07:16:26 +03:00
|
|
|
config: Web3RpcConfig,
|
2022-08-10 08:56:09 +03:00
|
|
|
name: String,
|
2022-07-19 07:21:32 +03:00
|
|
|
chain_id: u64,
|
2023-07-13 20:58:22 +03:00
|
|
|
// optional because this is only used for http providers. websocket-only providers don't use it
|
2022-07-19 07:21:32 +03:00
|
|
|
http_client: Option<reqwest::Client>,
|
2023-02-12 12:22:53 +03:00
|
|
|
redis_pool: Option<RedisPool>,
|
2023-08-03 21:54:50 +03:00
|
|
|
server_id: i64,
|
2023-05-24 00:40:34 +03:00
|
|
|
block_interval: Duration,
|
2023-02-26 10:52:33 +03:00
|
|
|
block_map: BlocksByHashCache,
|
2023-07-11 09:08:06 +03:00
|
|
|
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
|
2023-09-13 22:05:47 +03:00
|
|
|
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
|
2023-07-11 08:17:15 +03:00
|
|
|
max_head_block_age: Duration,
|
2023-05-24 00:40:34 +03:00
|
|
|
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
|
2023-02-28 22:01:34 +03:00
|
|
|
let created_at = Instant::now();
|
|
|
|
|
2023-02-12 12:22:53 +03:00
|
|
|
let hard_limit = match (config.hard_limit, redis_pool) {
|
|
|
|
(None, None) => None,
|
|
|
|
(Some(hard_limit), Some(redis_pool)) => {
|
2023-08-03 21:54:50 +03:00
|
|
|
let label = if config.hard_limit_per_endpoint {
|
|
|
|
format!("{}:{}:{}", chain_id, "endpoint", name)
|
|
|
|
} else {
|
|
|
|
format!("{}:{}:{}", chain_id, server_id, name)
|
|
|
|
};
|
|
|
|
|
|
|
|
// TODO: in process rate limiter instead? or maybe deferred? or is this good enough?
|
2023-02-12 12:22:53 +03:00
|
|
|
let rrl = RedisRateLimiter::new(
|
|
|
|
"web3_proxy",
|
2023-08-03 21:54:50 +03:00
|
|
|
&label,
|
2023-02-12 12:22:53 +03:00
|
|
|
hard_limit,
|
2023-08-03 21:54:50 +03:00
|
|
|
config.hard_limit_period as f32,
|
2023-02-12 12:22:53 +03:00
|
|
|
redis_pool,
|
|
|
|
);
|
|
|
|
|
|
|
|
Some(rrl)
|
|
|
|
}
|
|
|
|
(None, Some(_)) => None,
|
|
|
|
(Some(_hard_limit), None) => {
|
|
|
|
return Err(anyhow::anyhow!(
|
|
|
|
"no redis client pool! needed for hard limit"
|
|
|
|
))
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let backup = config.backup;
|
|
|
|
|
2023-09-12 23:35:51 +03:00
|
|
|
let block_data_limit: AtomicU64 = config.block_data_limit.into();
|
2023-09-13 22:35:09 +03:00
|
|
|
let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Relaxed) == 0)
|
2023-06-16 10:46:27 +03:00
|
|
|
&& block_and_rpc_sender.is_some();
|
2022-11-25 03:45:13 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
// have a sender for tracking hard limit anywhere. we use this in case we
|
2023-01-25 07:44:50 +03:00
|
|
|
// and track on servers that have a configured hard limit
|
2023-05-13 01:15:32 +03:00
|
|
|
let (hard_limit_until, _) = watch::channel(Instant::now());
|
2023-01-25 07:44:50 +03:00
|
|
|
|
2023-02-12 12:22:53 +03:00
|
|
|
if config.ws_url.is_none() && config.http_url.is_none() {
|
2023-06-25 07:16:26 +03:00
|
|
|
return Err(anyhow!(
|
|
|
|
"either ws_url or http_url are required. it is best to set both. they must both point to the same server!"
|
|
|
|
));
|
2023-02-12 12:22:53 +03:00
|
|
|
}
|
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
let (head_block, _) = watch::channel(None);
|
|
|
|
|
2023-05-11 23:09:15 +03:00
|
|
|
// Spawn the task for calculting average peak latency
|
|
|
|
// TODO Should these defaults be in config
|
|
|
|
let peak_latency = PeakEwmaLatency::spawn(
|
|
|
|
// Decay over 15s
|
2023-05-23 01:43:39 +03:00
|
|
|
Duration::from_secs(15),
|
2023-05-11 23:09:15 +03:00
|
|
|
// Peak requests so far around 5k, we will use an order of magnitude
|
|
|
|
// more to be safe. Should only use about 50mb RAM
|
|
|
|
50_000,
|
|
|
|
// Start latency at 1 second
|
|
|
|
Duration::from_secs(1),
|
|
|
|
);
|
|
|
|
|
2023-06-18 20:46:22 +03:00
|
|
|
let median_request_latency = RollingQuantileLatency::spawn_median(1_000).await;
|
2023-06-18 19:47:40 +03:00
|
|
|
|
2023-09-27 04:18:06 +03:00
|
|
|
let (http_url, http_client) = if let Some(http_url) = config.http_url {
|
2023-05-23 01:32:15 +03:00
|
|
|
let http_url = http_url.parse::<Url>()?;
|
2023-09-27 04:18:06 +03:00
|
|
|
// TODO: double-check not missing anything from connect_http()
|
|
|
|
let http_client = http_client.unwrap_or_default();
|
|
|
|
(Some(http_url), Some(http_client))
|
2023-05-11 23:07:31 +03:00
|
|
|
} else {
|
2023-09-27 04:18:06 +03:00
|
|
|
(None, None)
|
2023-05-11 23:07:31 +03:00
|
|
|
};
|
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
let ws_url = if let Some(ws_url) = config.ws_url {
|
2023-05-23 01:32:15 +03:00
|
|
|
let ws_url = ws_url.parse::<Url>()?;
|
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
Some(ws_url)
|
2023-05-11 23:07:31 +03:00
|
|
|
} else {
|
|
|
|
None
|
|
|
|
};
|
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
let (disconnect_watch, _) = watch::channel(false);
|
|
|
|
|
2023-05-23 01:32:15 +03:00
|
|
|
let new_rpc = Self {
|
2022-12-06 00:13:36 +03:00
|
|
|
automatic_block_limit,
|
2023-01-19 13:13:00 +03:00
|
|
|
backup,
|
2022-11-25 03:45:13 +03:00
|
|
|
block_data_limit,
|
2023-06-13 08:26:10 +03:00
|
|
|
block_interval,
|
2023-02-28 22:01:34 +03:00
|
|
|
created_at: Some(created_at),
|
2023-05-23 01:32:15 +03:00
|
|
|
display_name: config.display_name,
|
|
|
|
hard_limit,
|
|
|
|
hard_limit_until: Some(hard_limit_until),
|
2023-10-03 23:46:27 +03:00
|
|
|
head_block_sender: Some(head_block),
|
2023-09-27 04:18:06 +03:00
|
|
|
http_url,
|
|
|
|
http_client,
|
2023-07-11 08:17:15 +03:00
|
|
|
max_head_block_age,
|
2023-05-23 01:32:15 +03:00
|
|
|
name,
|
2023-05-11 23:09:15 +03:00
|
|
|
peak_latency: Some(peak_latency),
|
2023-06-18 20:46:22 +03:00
|
|
|
median_latency: Some(median_request_latency),
|
2023-05-23 01:32:15 +03:00
|
|
|
soft_limit: config.soft_limit,
|
2023-09-13 22:05:47 +03:00
|
|
|
subscribe_txs: config.subscribe_txs,
|
2023-06-13 04:58:51 +03:00
|
|
|
ws_url,
|
2023-05-24 00:40:34 +03:00
|
|
|
disconnect_watch: Some(disconnect_watch),
|
2023-02-06 20:55:27 +03:00
|
|
|
..Default::default()
|
2022-05-12 21:49:57 +03:00
|
|
|
};
|
|
|
|
|
2023-05-23 01:32:15 +03:00
|
|
|
let new_connection = Arc::new(new_rpc);
|
2022-05-12 21:49:57 +03:00
|
|
|
|
2022-07-19 04:31:12 +03:00
|
|
|
// subscribe to new blocks and new transactions
|
2022-12-06 00:13:36 +03:00
|
|
|
// subscribing starts the connection (with retries)
|
2022-06-14 08:43:28 +03:00
|
|
|
let handle = {
|
2022-07-09 07:25:59 +03:00
|
|
|
let new_connection = new_connection.clone();
|
2022-06-14 08:43:28 +03:00
|
|
|
tokio::spawn(async move {
|
2022-07-09 07:25:59 +03:00
|
|
|
new_connection
|
2023-09-13 22:05:47 +03:00
|
|
|
.subscribe_with_reconnect(
|
|
|
|
block_map,
|
|
|
|
block_and_rpc_sender,
|
|
|
|
pending_txid_firehose_sender,
|
|
|
|
chain_id,
|
|
|
|
)
|
2022-06-14 08:43:28 +03:00
|
|
|
.await
|
|
|
|
})
|
|
|
|
};
|
|
|
|
|
2022-08-27 05:13:36 +03:00
|
|
|
Ok((new_connection, handle))
|
|
|
|
}
|
2022-07-19 04:31:12 +03:00
|
|
|
|
2023-10-06 04:57:04 +03:00
|
|
|
pub fn next_available(&self) -> Instant {
|
|
|
|
let hard_limit_until = *self.hard_limit_until.as_ref().unwrap().borrow();
|
|
|
|
|
|
|
|
hard_limit_until.max(Instant::now())
|
|
|
|
}
|
|
|
|
|
2023-06-09 23:09:58 +03:00
|
|
|
/// sort by...
|
2023-10-06 04:57:04 +03:00
|
|
|
/// - rate limit (ascending)
|
2023-06-18 20:09:21 +03:00
|
|
|
/// - backups last
|
|
|
|
/// - tier (ascending)
|
|
|
|
/// - block number (descending)
|
2023-06-09 23:09:58 +03:00
|
|
|
/// TODO: tests on this!
|
2023-06-17 22:15:40 +03:00
|
|
|
/// TODO: should tier or block number take priority?
|
2023-06-09 23:09:58 +03:00
|
|
|
/// TODO: should this return a struct that implements sorting traits?
|
2023-06-27 22:36:41 +03:00
|
|
|
/// TODO: move this to consensus.rs
|
2023-10-06 04:57:04 +03:00
|
|
|
fn sort_on(&self, max_block: Option<U64>) -> (Reverse<Instant>, bool, Reverse<U64>, u32) {
|
2023-06-09 23:09:58 +03:00
|
|
|
let mut head_block = self
|
2023-10-03 23:46:27 +03:00
|
|
|
.head_block_sender
|
2023-06-09 23:09:58 +03:00
|
|
|
.as_ref()
|
2023-10-03 23:46:27 +03:00
|
|
|
.and_then(|x| x.borrow().as_ref().map(|x| x.number()))
|
2023-06-09 23:09:58 +03:00
|
|
|
.unwrap_or_default();
|
|
|
|
|
|
|
|
if let Some(max_block) = max_block {
|
|
|
|
head_block = head_block.min(max_block);
|
|
|
|
}
|
|
|
|
|
|
|
|
let tier = self.tier.load(atomic::Ordering::Relaxed);
|
|
|
|
|
|
|
|
let backup = self.backup;
|
|
|
|
|
2023-10-06 04:57:04 +03:00
|
|
|
let rate_limit_until =
|
|
|
|
(*self.hard_limit_until.as_ref().unwrap().borrow()).max(Instant::now());
|
|
|
|
|
|
|
|
(
|
|
|
|
Reverse(rate_limit_until),
|
|
|
|
!backup,
|
|
|
|
Reverse(head_block),
|
|
|
|
tier,
|
|
|
|
)
|
2023-06-09 23:09:58 +03:00
|
|
|
}
|
|
|
|
|
2023-06-27 22:36:41 +03:00
|
|
|
/// TODO: move this to consensus.rs
|
2023-06-09 23:09:58 +03:00
|
|
|
pub fn sort_for_load_balancing_on(
|
|
|
|
&self,
|
|
|
|
max_block: Option<U64>,
|
2023-10-06 04:57:04 +03:00
|
|
|
) -> ((Reverse<Instant>, bool, Reverse<U64>, u32), Duration) {
|
2023-06-09 23:09:58 +03:00
|
|
|
let sort_on = self.sort_on(max_block);
|
|
|
|
|
2023-06-18 20:46:22 +03:00
|
|
|
let weighted_peak_latency = self.weighted_peak_latency();
|
2023-06-09 23:09:58 +03:00
|
|
|
|
2023-06-18 20:46:22 +03:00
|
|
|
let x = (sort_on, weighted_peak_latency);
|
2023-06-09 23:09:58 +03:00
|
|
|
|
|
|
|
trace!("sort_for_load_balancing {}: {:?}", self, x);
|
|
|
|
|
|
|
|
x
|
|
|
|
}
|
|
|
|
|
2023-06-18 20:46:22 +03:00
|
|
|
/// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency
|
2023-06-27 22:36:41 +03:00
|
|
|
/// TODO: move this to consensus.rs
|
2023-10-06 04:57:04 +03:00
|
|
|
/// TODO: this return type is too complex
|
2023-06-09 23:09:58 +03:00
|
|
|
pub fn shuffle_for_load_balancing_on(
|
|
|
|
&self,
|
2023-10-06 04:57:04 +03:00
|
|
|
rng: &mut TlsWyRand,
|
2023-06-09 23:09:58 +03:00
|
|
|
max_block: Option<U64>,
|
2023-10-06 04:57:04 +03:00
|
|
|
) -> ((Reverse<Instant>, bool, Reverse<U64>, u32), u8) {
|
2023-06-09 23:09:58 +03:00
|
|
|
let sort_on = self.sort_on(max_block);
|
|
|
|
|
2023-06-13 08:42:22 +03:00
|
|
|
let r = rng.generate::<u8>();
|
2023-06-09 23:09:58 +03:00
|
|
|
|
|
|
|
(sort_on, r)
|
|
|
|
}
|
|
|
|
|
2023-06-18 20:46:22 +03:00
|
|
|
pub fn weighted_peak_latency(&self) -> Duration {
|
2023-05-24 00:52:45 +03:00
|
|
|
let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
|
2023-06-18 20:46:22 +03:00
|
|
|
peak_latency.latency()
|
2023-05-24 00:52:45 +03:00
|
|
|
} else {
|
2023-06-18 20:46:22 +03:00
|
|
|
Duration::from_secs(1)
|
2023-05-24 00:52:45 +03:00
|
|
|
};
|
2023-02-16 11:26:58 +03:00
|
|
|
|
2023-09-13 22:35:09 +03:00
|
|
|
// TODO: what scaling?
|
|
|
|
// TODO: figure out how many requests add what level of latency
|
|
|
|
let request_scaling = 0.01;
|
2023-02-16 11:26:58 +03:00
|
|
|
// TODO: what ordering?
|
2023-09-13 22:35:09 +03:00
|
|
|
let active_requests =
|
|
|
|
self.active_requests.load(atomic::Ordering::Relaxed) as f32 * request_scaling + 1.0;
|
2023-02-16 11:26:58 +03:00
|
|
|
|
2023-06-18 20:46:22 +03:00
|
|
|
peak_latency.mul_f32(active_requests)
|
2023-02-16 11:26:58 +03:00
|
|
|
}
|
|
|
|
|
2022-12-28 05:17:11 +03:00
|
|
|
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
|
2023-05-31 02:32:34 +03:00
|
|
|
async fn check_block_data_limit(self: &Arc<Self>) -> anyhow::Result<Option<u64>> {
|
2022-12-06 00:13:36 +03:00
|
|
|
if !self.automatic_block_limit {
|
2022-12-28 05:17:11 +03:00
|
|
|
// TODO: is this a good thing to return?
|
2022-12-06 00:13:36 +03:00
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
|
2023-01-19 14:05:39 +03:00
|
|
|
// TODO: check eth_syncing. if it is not false, return Ok(None)
|
2022-12-06 00:13:36 +03:00
|
|
|
|
2022-12-28 05:17:11 +03:00
|
|
|
let mut limit = None;
|
|
|
|
|
2022-11-25 03:45:13 +03:00
|
|
|
// TODO: binary search between 90k and max?
|
2022-12-06 00:13:36 +03:00
|
|
|
// TODO: start at 0 or 1?
|
2022-11-25 03:45:13 +03:00
|
|
|
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
|
2023-06-21 00:22:14 +03:00
|
|
|
let head_block_num = self
|
|
|
|
.internal_request::<_, U256>(
|
|
|
|
"eth_blockNumber",
|
2023-06-29 09:00:34 +03:00
|
|
|
&[(); 0],
|
2023-06-21 00:22:14 +03:00
|
|
|
// error here are expected, so keep the level low
|
2023-06-24 02:28:45 +03:00
|
|
|
Some(Level::DEBUG.into()),
|
2023-06-21 00:22:14 +03:00
|
|
|
Some(Duration::from_secs(5)),
|
|
|
|
)
|
2022-12-06 00:13:36 +03:00
|
|
|
.await
|
2023-06-21 00:22:14 +03:00
|
|
|
.context("head_block_num error during check_block_data_limit")?;
|
2022-09-06 06:26:23 +03:00
|
|
|
|
2022-11-25 03:45:13 +03:00
|
|
|
let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());
|
2022-08-27 05:13:36 +03:00
|
|
|
|
2022-12-06 00:13:36 +03:00
|
|
|
trace!(
|
|
|
|
"checking maybe_archive_block on {}: {}",
|
|
|
|
self,
|
|
|
|
maybe_archive_block
|
|
|
|
);
|
|
|
|
|
2022-09-06 06:26:23 +03:00
|
|
|
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
|
2022-09-20 09:00:27 +03:00
|
|
|
// TODO: what should the request be?
|
2023-05-24 00:40:34 +03:00
|
|
|
let archive_result: Result<Bytes, _> = self
|
2023-05-31 02:32:34 +03:00
|
|
|
.internal_request(
|
2022-08-27 05:13:36 +03:00
|
|
|
"eth_getCode",
|
2022-09-24 05:47:44 +03:00
|
|
|
&json!((
|
2022-08-27 05:13:36 +03:00
|
|
|
"0xdead00000000000000000000000000000000beef",
|
|
|
|
maybe_archive_block,
|
2022-09-24 05:47:44 +03:00
|
|
|
)),
|
2022-09-21 07:48:21 +03:00
|
|
|
// error here are expected, so keep the level low
|
2023-06-24 02:28:45 +03:00
|
|
|
Some(Level::TRACE.into()),
|
2023-06-21 00:22:14 +03:00
|
|
|
Some(Duration::from_secs(5)),
|
2022-08-27 05:13:36 +03:00
|
|
|
)
|
|
|
|
.await;
|
|
|
|
|
2022-11-25 03:45:13 +03:00
|
|
|
trace!(
|
2022-12-06 00:13:36 +03:00
|
|
|
"archive_result on {} for {} ({}): {:?}",
|
|
|
|
self,
|
2022-11-25 03:45:13 +03:00
|
|
|
block_data_limit,
|
2022-12-06 00:13:36 +03:00
|
|
|
maybe_archive_block,
|
2022-11-25 03:45:13 +03:00
|
|
|
archive_result
|
|
|
|
);
|
2022-08-27 05:13:36 +03:00
|
|
|
|
2022-11-25 03:45:13 +03:00
|
|
|
if archive_result.is_err() {
|
2022-08-27 05:13:36 +03:00
|
|
|
break;
|
2022-07-19 04:31:12 +03:00
|
|
|
}
|
2022-11-25 03:45:13 +03:00
|
|
|
|
|
|
|
limit = Some(block_data_limit);
|
2022-07-19 04:31:12 +03:00
|
|
|
}
|
|
|
|
|
2022-08-27 05:13:36 +03:00
|
|
|
if let Some(limit) = limit {
|
2023-01-03 22:54:24 +03:00
|
|
|
if limit == 0 {
|
|
|
|
warn!("{} is unable to serve requests", self);
|
|
|
|
}
|
|
|
|
|
2022-08-27 05:13:36 +03:00
|
|
|
self.block_data_limit
|
2023-09-13 22:35:09 +03:00
|
|
|
.store(limit, atomic::Ordering::Relaxed);
|
2022-08-27 05:13:36 +03:00
|
|
|
}
|
2022-07-19 04:31:12 +03:00
|
|
|
|
2023-02-22 08:14:49 +03:00
|
|
|
if limit == Some(u64::MAX) {
|
|
|
|
info!("block data limit on {}: archive", self);
|
|
|
|
} else {
|
|
|
|
info!("block data limit on {}: {:?}", self, limit);
|
|
|
|
}
|
2022-11-25 03:45:13 +03:00
|
|
|
|
2022-08-27 05:13:36 +03:00
|
|
|
Ok(limit)
|
2022-07-09 07:25:59 +03:00
|
|
|
}
|
|
|
|
|
2022-11-04 01:16:27 +03:00
|
|
|
/// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range
|
2022-07-25 03:27:00 +03:00
|
|
|
pub fn block_data_limit(&self) -> U64 {
|
2023-01-03 22:54:24 +03:00
|
|
|
self.block_data_limit.load(atomic::Ordering::Acquire).into()
|
2022-07-19 04:31:12 +03:00
|
|
|
}
|
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
/// TODO: get rid of this now that consensus rpcs does it
|
2023-10-03 23:46:27 +03:00
|
|
|
pub fn has_block_data(&self, needed_block_num: U64) -> bool {
|
|
|
|
if let Some(head_block_sender) = self.head_block_sender.as_ref() {
|
|
|
|
// TODO: this needs a max of our overall head block number
|
|
|
|
let head_block_num = match head_block_sender.borrow().as_ref() {
|
|
|
|
None => return false,
|
|
|
|
Some(x) => x.number(),
|
|
|
|
};
|
2022-07-19 04:31:12 +03:00
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
// this rpc doesn't have that block yet. still syncing
|
|
|
|
if needed_block_num > head_block_num {
|
|
|
|
trace!(
|
|
|
|
"{} has head {} but needs {}",
|
|
|
|
self,
|
|
|
|
head_block_num,
|
|
|
|
needed_block_num,
|
|
|
|
);
|
|
|
|
return false;
|
|
|
|
}
|
2022-11-04 01:16:27 +03:00
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
// if this is a pruning node, we might not actually have the block
|
|
|
|
let block_data_limit: U64 = self.block_data_limit();
|
2022-11-04 01:16:27 +03:00
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
|
2022-07-19 04:31:12 +03:00
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
if needed_block_num < oldest_block_num {
|
|
|
|
trace!(
|
|
|
|
"{} needs {} but the oldest available is {}",
|
|
|
|
self,
|
|
|
|
needed_block_num,
|
|
|
|
oldest_block_num
|
|
|
|
);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
true
|
|
|
|
} else {
|
|
|
|
false
|
2023-01-26 08:24:09 +03:00
|
|
|
}
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
|
|
|
|
2023-05-23 01:32:15 +03:00
|
|
|
/// query the web3 provider to confirm it is on the expected chain with the expected data available
|
2023-06-13 04:58:51 +03:00
|
|
|
/// TODO: this currently checks only the http if both http and ws are set. it should check both and make sure they match
|
2023-05-24 00:40:34 +03:00
|
|
|
async fn check_provider(self: &Arc<Self>, chain_id: u64) -> Web3ProxyResult<()> {
|
2023-05-23 01:32:15 +03:00
|
|
|
// check the server's chain_id here
|
|
|
|
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
|
|
|
|
// TODO: what should the timeout be? should there be a request timeout?
|
|
|
|
// trace!("waiting on chain id for {}", self);
|
2023-05-31 02:32:34 +03:00
|
|
|
let found_chain_id: U64 = self
|
2023-06-21 00:22:14 +03:00
|
|
|
.internal_request(
|
|
|
|
"eth_chainId",
|
2023-06-29 09:00:34 +03:00
|
|
|
&[(); 0],
|
2023-06-24 02:28:45 +03:00
|
|
|
Some(Level::TRACE.into()),
|
2023-06-21 00:22:14 +03:00
|
|
|
Some(Duration::from_secs(5)),
|
|
|
|
)
|
2023-05-31 02:32:34 +03:00
|
|
|
.await?;
|
|
|
|
|
2023-05-23 01:32:15 +03:00
|
|
|
trace!("found_chain_id: {:#?}", found_chain_id);
|
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
if chain_id != found_chain_id.as_u64() {
|
|
|
|
return Err(anyhow::anyhow!(
|
|
|
|
"incorrect chain id! Config has {}, but RPC has {}",
|
|
|
|
chain_id,
|
|
|
|
found_chain_id
|
|
|
|
)
|
|
|
|
.context(format!("failed @ {}", self))
|
|
|
|
.into());
|
2023-05-23 01:32:15 +03:00
|
|
|
}
|
2022-12-06 00:13:36 +03:00
|
|
|
|
2023-06-27 22:36:41 +03:00
|
|
|
// TODO: only do this for balanced_rpcs. this errors on 4337 rpcs
|
2023-05-31 02:32:34 +03:00
|
|
|
self.check_block_data_limit()
|
2023-05-23 01:32:15 +03:00
|
|
|
.await
|
|
|
|
.context(format!("unable to check_block_data_limit of {}", self))?;
|
2023-02-28 00:29:07 +03:00
|
|
|
|
2023-05-23 01:32:15 +03:00
|
|
|
info!("successfully connected to {}", self);
|
2023-02-28 00:29:07 +03:00
|
|
|
|
2023-02-27 10:52:37 +03:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
pub(crate) async fn send_head_block_result(
|
2022-08-26 20:26:17 +03:00
|
|
|
self: &Arc<Self>,
|
2023-05-24 00:40:34 +03:00
|
|
|
new_head_block: Web3ProxyResult<Option<ArcBlock>>,
|
2023-07-11 09:08:06 +03:00
|
|
|
block_and_rpc_sender: &mpsc::UnboundedSender<BlockAndRpc>,
|
2023-05-24 00:40:34 +03:00
|
|
|
block_map: &BlocksByHashCache,
|
|
|
|
) -> Web3ProxyResult<()> {
|
2023-10-03 23:46:27 +03:00
|
|
|
let head_block_sender = self.head_block_sender.as_ref().unwrap();
|
2023-02-28 22:01:34 +03:00
|
|
|
|
2023-06-13 19:31:22 +03:00
|
|
|
let new_head_block = match new_head_block {
|
|
|
|
Ok(x) => {
|
|
|
|
let x = x.and_then(Web3ProxyBlock::try_new);
|
|
|
|
|
|
|
|
match x {
|
|
|
|
None => {
|
|
|
|
if head_block_sender.borrow().is_none() {
|
|
|
|
// we previously sent a None. return early
|
|
|
|
return Ok(());
|
|
|
|
}
|
2022-11-06 23:52:11 +03:00
|
|
|
|
2023-06-13 19:31:22 +03:00
|
|
|
let age = self.created_at.unwrap().elapsed().as_millis();
|
2022-11-06 23:52:11 +03:00
|
|
|
|
2023-07-15 04:30:01 +03:00
|
|
|
trace!("clearing head block on {} ({}ms old)!", self, age);
|
2023-02-14 23:14:50 +03:00
|
|
|
|
2023-06-13 20:00:08 +03:00
|
|
|
// send an empty block to take this server out of rotation
|
2023-06-13 19:31:22 +03:00
|
|
|
head_block_sender.send_replace(None);
|
2022-08-26 20:26:17 +03:00
|
|
|
|
2023-06-13 20:00:08 +03:00
|
|
|
// TODO: clear self.block_data_limit?
|
|
|
|
|
2023-06-13 19:31:22 +03:00
|
|
|
None
|
|
|
|
}
|
|
|
|
Some(new_head_block) => {
|
|
|
|
let new_hash = *new_head_block.hash();
|
|
|
|
|
|
|
|
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
|
|
|
|
let new_head_block = block_map
|
|
|
|
.get_with_by_ref(&new_hash, async move { new_head_block })
|
|
|
|
.await;
|
|
|
|
|
2023-06-13 20:00:08 +03:00
|
|
|
// we are synced! yey!
|
2023-06-13 19:31:22 +03:00
|
|
|
head_block_sender.send_replace(Some(new_head_block.clone()));
|
|
|
|
|
|
|
|
if self.block_data_limit() == U64::zero() {
|
|
|
|
if let Err(err) = self.check_block_data_limit().await {
|
|
|
|
warn!(
|
|
|
|
"failed checking block limit after {} finished syncing. {:?}",
|
|
|
|
self, err
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
2022-07-19 04:31:12 +03:00
|
|
|
|
2023-06-13 19:31:22 +03:00
|
|
|
Some(new_head_block)
|
2023-01-03 22:54:24 +03:00
|
|
|
}
|
|
|
|
}
|
2022-05-15 09:27:13 +03:00
|
|
|
}
|
2022-11-06 23:52:11 +03:00
|
|
|
Err(err) => {
|
2023-06-29 07:08:43 +03:00
|
|
|
warn!(?err, "unable to get block from {}", self);
|
2022-11-06 23:52:11 +03:00
|
|
|
|
2023-06-13 20:00:08 +03:00
|
|
|
// send an empty block to take this server out of rotation
|
2023-06-13 19:31:22 +03:00
|
|
|
head_block_sender.send_replace(None);
|
2022-08-07 09:48:57 +03:00
|
|
|
|
2023-06-13 20:00:08 +03:00
|
|
|
// TODO: clear self.block_data_limit?
|
|
|
|
|
2022-12-03 08:31:03 +03:00
|
|
|
None
|
2022-05-15 09:27:13 +03:00
|
|
|
}
|
2022-12-03 08:31:03 +03:00
|
|
|
};
|
|
|
|
|
2023-06-13 20:00:08 +03:00
|
|
|
// tell web3rpcs about this rpc having this block
|
|
|
|
block_and_rpc_sender
|
2023-07-11 09:08:06 +03:00
|
|
|
.send((new_head_block, self.clone()))
|
2023-06-13 20:00:08 +03:00
|
|
|
.context("block_and_rpc_sender failed sending")?;
|
2022-05-30 07:30:13 +03:00
|
|
|
|
|
|
|
Ok(())
|
2022-05-15 09:27:13 +03:00
|
|
|
}
|
|
|
|
|
2023-03-02 21:27:32 +03:00
|
|
|
fn should_disconnect(&self) -> bool {
|
|
|
|
*self.disconnect_watch.as_ref().unwrap().borrow()
|
|
|
|
}
|
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
async fn healthcheck(
|
|
|
|
self: &Arc<Self>,
|
2023-05-31 02:32:34 +03:00
|
|
|
error_handler: Option<RequestErrorHandler>,
|
2023-05-24 00:40:34 +03:00
|
|
|
) -> Web3ProxyResult<()> {
|
2023-10-03 23:46:27 +03:00
|
|
|
let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone();
|
2023-05-24 00:40:34 +03:00
|
|
|
|
|
|
|
if let Some(head_block) = head_block {
|
|
|
|
// TODO: if head block is very old and not expected to be syncing, emit warning
|
2023-07-11 08:17:15 +03:00
|
|
|
if head_block.age() > self.max_head_block_age {
|
|
|
|
return Err(anyhow::anyhow!("head_block is too old!").into());
|
|
|
|
}
|
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
let block_number = head_block.number();
|
2023-05-24 00:40:34 +03:00
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
let to = if let Some(txid) = head_block.transactions().last().cloned() {
|
2023-05-24 00:40:34 +03:00
|
|
|
let tx = self
|
2023-05-31 02:32:34 +03:00
|
|
|
.internal_request::<_, Option<Transaction>>(
|
2023-05-24 00:40:34 +03:00
|
|
|
"eth_getTransactionByHash",
|
|
|
|
&(txid,),
|
|
|
|
error_handler,
|
2023-06-21 00:22:14 +03:00
|
|
|
Some(Duration::from_secs(5)),
|
2023-05-24 00:40:34 +03:00
|
|
|
)
|
|
|
|
.await?
|
|
|
|
.context("no transaction")?;
|
|
|
|
|
|
|
|
// TODO: what default? something real?
|
|
|
|
tx.to.unwrap_or_else(|| {
|
|
|
|
"0xdead00000000000000000000000000000000beef"
|
|
|
|
.parse::<Address>()
|
|
|
|
.expect("deafbeef")
|
|
|
|
})
|
|
|
|
} else {
|
|
|
|
"0xdead00000000000000000000000000000000beef"
|
|
|
|
.parse::<Address>()
|
|
|
|
.expect("deafbeef")
|
|
|
|
};
|
|
|
|
|
|
|
|
let _code = self
|
2023-05-31 02:32:34 +03:00
|
|
|
.internal_request::<_, Option<Bytes>>(
|
2023-05-24 00:40:34 +03:00
|
|
|
"eth_getCode",
|
|
|
|
&(to, block_number),
|
|
|
|
error_handler,
|
2023-06-21 00:22:14 +03:00
|
|
|
Some(Duration::from_secs(5)),
|
2023-05-24 00:40:34 +03:00
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
} else {
|
|
|
|
// TODO: if head block is none for too long, give an error
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-07-13 20:58:22 +03:00
|
|
|
/// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff
|
2023-06-13 04:58:51 +03:00
|
|
|
async fn subscribe_with_reconnect(
|
|
|
|
self: Arc<Self>,
|
|
|
|
block_map: BlocksByHashCache,
|
2023-07-11 09:08:06 +03:00
|
|
|
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
|
2023-09-13 22:05:47 +03:00
|
|
|
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
|
2023-06-13 04:58:51 +03:00
|
|
|
chain_id: u64,
|
|
|
|
) -> Web3ProxyResult<()> {
|
|
|
|
loop {
|
|
|
|
if let Err(err) = self
|
|
|
|
.clone()
|
2023-09-13 22:05:47 +03:00
|
|
|
.subscribe(
|
|
|
|
block_map.clone(),
|
|
|
|
block_and_rpc_sender.clone(),
|
|
|
|
pending_txid_firehose_sender.clone(),
|
|
|
|
chain_id,
|
|
|
|
)
|
2023-06-13 04:58:51 +03:00
|
|
|
.await
|
|
|
|
{
|
|
|
|
if self.should_disconnect() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2023-06-29 07:34:51 +03:00
|
|
|
warn!(?err, "subscribe err on {}", self);
|
2023-06-13 04:58:51 +03:00
|
|
|
} else if self.should_disconnect() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.backup {
|
|
|
|
debug!("reconnecting to {} in 30 seconds", self);
|
|
|
|
} else {
|
|
|
|
info!("reconnecting to {} in 30 seconds", self);
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: exponential backoff with jitter
|
|
|
|
sleep(Duration::from_secs(30)).await;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-05-23 01:32:15 +03:00
|
|
|
/// subscribe to blocks and transactions
|
2022-12-06 00:13:36 +03:00
|
|
|
/// This should only exit when the program is exiting.
|
2023-05-24 00:40:34 +03:00
|
|
|
/// TODO: should more of these args be on self? chain_id for sure
|
2022-06-14 08:43:28 +03:00
|
|
|
async fn subscribe(
|
2022-06-14 07:04:14 +03:00
|
|
|
self: Arc<Self>,
|
2023-02-26 10:52:33 +03:00
|
|
|
block_map: BlocksByHashCache,
|
2023-07-11 09:08:06 +03:00
|
|
|
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
|
2023-09-13 22:05:47 +03:00
|
|
|
pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
|
2022-12-06 00:13:36 +03:00
|
|
|
chain_id: u64,
|
2023-05-24 00:40:34 +03:00
|
|
|
) -> Web3ProxyResult<()> {
|
2023-01-26 08:24:09 +03:00
|
|
|
let error_handler = if self.backup {
|
2023-05-31 02:32:34 +03:00
|
|
|
Some(RequestErrorHandler::DebugLevel)
|
2023-02-16 02:31:59 +03:00
|
|
|
} else {
|
2023-06-29 07:08:43 +03:00
|
|
|
// TODO: info level?
|
|
|
|
Some(RequestErrorHandler::InfoLevel)
|
2023-02-16 02:31:59 +03:00
|
|
|
};
|
2023-03-31 14:43:41 +03:00
|
|
|
|
2023-06-16 10:46:27 +03:00
|
|
|
if self.should_disconnect() {
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
if let Some(url) = self.ws_url.clone() {
|
2023-06-17 09:46:20 +03:00
|
|
|
trace!("starting websocket provider on {}", self);
|
2023-06-13 04:58:51 +03:00
|
|
|
|
|
|
|
let x = connect_ws(url, usize::MAX).await?;
|
|
|
|
|
|
|
|
let x = Arc::new(x);
|
|
|
|
|
|
|
|
self.ws_provider.store(Some(x));
|
|
|
|
}
|
|
|
|
|
2023-06-16 10:46:27 +03:00
|
|
|
if self.should_disconnect() {
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
2023-06-17 09:46:20 +03:00
|
|
|
trace!("starting subscriptions on {}", self);
|
2022-06-16 05:53:37 +03:00
|
|
|
|
2023-06-29 07:34:51 +03:00
|
|
|
self.check_provider(chain_id)
|
|
|
|
.await
|
|
|
|
.web3_context("failed check_provider")?;
|
2023-03-01 23:56:00 +03:00
|
|
|
|
2023-07-11 06:59:03 +03:00
|
|
|
let mut futures = FuturesUnordered::new();
|
2023-02-28 00:29:07 +03:00
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
// TODO: use this channel instead of self.disconnect_watch
|
2023-06-16 10:46:27 +03:00
|
|
|
let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false);
|
2023-06-13 04:58:51 +03:00
|
|
|
|
2023-06-16 10:46:27 +03:00
|
|
|
// subscribe to the disconnect watch. the app uses this when shutting down or when configs change
|
2023-06-13 04:58:51 +03:00
|
|
|
if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
|
2023-06-29 04:36:17 +03:00
|
|
|
let rpc = self.clone();
|
2023-06-13 04:58:51 +03:00
|
|
|
let mut disconnect_watch_rx = disconnect_watch_tx.subscribe();
|
|
|
|
|
|
|
|
let f = async move {
|
2023-06-16 10:46:27 +03:00
|
|
|
loop {
|
|
|
|
if *disconnect_watch_rx.borrow_and_update() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
disconnect_watch_rx.changed().await?;
|
2023-06-13 04:58:51 +03:00
|
|
|
}
|
2023-07-15 04:30:01 +03:00
|
|
|
trace!("disconnect triggered on {}", rpc);
|
2023-06-13 04:58:51 +03:00
|
|
|
Ok(())
|
|
|
|
};
|
|
|
|
|
|
|
|
futures.push(flatten_handle(tokio::spawn(f)));
|
2023-07-21 23:28:00 +03:00
|
|
|
} else {
|
|
|
|
unimplemented!("there should always be a disconnect watch!");
|
2023-06-13 04:58:51 +03:00
|
|
|
}
|
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
// health check that runs if there haven't been any recent requests
|
2023-07-21 23:28:00 +03:00
|
|
|
if block_and_rpc_sender.is_some() {
|
2023-05-24 00:40:34 +03:00
|
|
|
// TODO: move this into a proper function
|
|
|
|
let rpc = self.clone();
|
|
|
|
|
|
|
|
// TODO: how often? different depending on the chain?
|
2023-06-18 20:46:22 +03:00
|
|
|
// TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
|
2023-06-09 22:21:50 +03:00
|
|
|
let health_sleep_seconds = 5;
|
2023-05-24 00:40:34 +03:00
|
|
|
|
|
|
|
// health check loop
|
|
|
|
let f = async move {
|
|
|
|
// TODO: benchmark this and lock contention
|
|
|
|
let mut old_total_requests = 0;
|
|
|
|
let mut new_total_requests;
|
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
// errors here should not cause the loop to exit!
|
|
|
|
while !(*subscribe_stop_rx.borrow()) {
|
2023-06-13 21:51:19 +03:00
|
|
|
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
|
|
|
|
+ rpc.external_requests.load(atomic::Ordering::Relaxed);
|
2023-05-24 00:40:34 +03:00
|
|
|
|
2023-06-09 22:21:50 +03:00
|
|
|
if new_total_requests - old_total_requests < 5 {
|
2023-05-24 00:40:34 +03:00
|
|
|
// TODO: if this fails too many times, reset the connection
|
|
|
|
// TODO: move this into a function and the chaining should be easier
|
2023-05-31 02:32:34 +03:00
|
|
|
if let Err(err) = rpc.healthcheck(error_handler).await {
|
2023-05-24 00:40:34 +03:00
|
|
|
// TODO: different level depending on the error handler
|
2023-06-29 04:36:17 +03:00
|
|
|
// TODO: if rate limit error, set "retry_at"
|
2023-07-11 06:59:17 +03:00
|
|
|
if rpc.backup {
|
|
|
|
warn!(?err, "health check on {} failed", rpc);
|
|
|
|
} else {
|
|
|
|
error!(?err, "health check on {} failed", rpc);
|
|
|
|
}
|
2022-12-06 00:13:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
// TODO: should we count the requests done inside this health check
|
|
|
|
old_total_requests = new_total_requests;
|
2022-12-06 00:13:36 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
2022-09-14 04:43:09 +03:00
|
|
|
}
|
2023-03-23 04:43:13 +03:00
|
|
|
|
2023-07-15 04:30:01 +03:00
|
|
|
trace!("healthcheck loop on {} exited", rpc);
|
2023-03-23 04:43:13 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
Ok(())
|
|
|
|
};
|
2023-03-23 04:43:13 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
futures.push(flatten_handle(tokio::spawn(f)));
|
|
|
|
}
|
2023-03-23 04:43:13 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
// subscribe to new heads
|
2023-06-16 10:46:27 +03:00
|
|
|
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
2023-06-13 04:58:51 +03:00
|
|
|
let clone = self.clone();
|
|
|
|
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
2023-07-11 06:59:03 +03:00
|
|
|
let block_map = block_map.clone();
|
2023-06-13 04:58:51 +03:00
|
|
|
|
|
|
|
let f = async move {
|
|
|
|
clone
|
2023-07-11 06:59:03 +03:00
|
|
|
.subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx)
|
|
|
|
.await
|
2023-06-13 04:58:51 +03:00
|
|
|
};
|
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
futures.push(flatten_handle(tokio::spawn(f)));
|
|
|
|
}
|
2023-03-23 04:43:13 +03:00
|
|
|
|
2023-09-13 22:05:47 +03:00
|
|
|
// subscribe to new transactions
|
|
|
|
if let Some(pending_txid_firehose) = pending_txid_firehose_sender.clone() {
|
|
|
|
let clone = self.clone();
|
|
|
|
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
|
|
|
|
|
|
|
let f = async move {
|
|
|
|
clone
|
|
|
|
.subscribe_new_transactions(pending_txid_firehose, subscribe_stop_rx)
|
|
|
|
.await
|
|
|
|
};
|
|
|
|
|
|
|
|
futures.push(flatten_handle(tokio::spawn(f)));
|
|
|
|
}
|
|
|
|
|
2023-07-11 06:59:03 +03:00
|
|
|
// exit if any of the futures exit
|
2023-07-15 04:30:01 +03:00
|
|
|
// TODO: have an enum for which one exited?
|
2023-07-11 06:59:03 +03:00
|
|
|
let first_exit = futures.next().await;
|
2023-07-11 05:41:14 +03:00
|
|
|
|
2023-07-11 06:59:03 +03:00
|
|
|
debug!(?first_exit, "subscriptions on {} exited", self);
|
|
|
|
|
|
|
|
// clear the head block
|
|
|
|
if let Some(block_and_rpc_sender) = block_and_rpc_sender {
|
|
|
|
self.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
|
|
|
|
.await?
|
|
|
|
};
|
2023-05-23 01:32:15 +03:00
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
subscribe_stop_tx.send_replace(true);
|
|
|
|
|
|
|
|
// TODO: wait for all of the futures to exit?
|
2022-09-06 16:14:15 +03:00
|
|
|
|
2023-06-16 10:46:27 +03:00
|
|
|
// TODO: tell ethers to disconnect?
|
|
|
|
self.ws_provider.store(None);
|
|
|
|
|
2022-06-14 07:04:14 +03:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-09-13 22:05:47 +03:00
|
|
|
async fn subscribe_new_transactions(
|
|
|
|
self: &Arc<Self>,
|
|
|
|
pending_txid_firehose: mpsc::Sender<TxHash>,
|
|
|
|
mut subscribe_stop_rx: watch::Receiver<bool>,
|
|
|
|
) -> Web3ProxyResult<()> {
|
|
|
|
trace!("subscribing to new transactions on {}", self);
|
|
|
|
|
|
|
|
// rpcs opt-into subscribing to transactions. its a lot of bandwidth
|
|
|
|
if !self.subscribe_txs {
|
|
|
|
loop {
|
2023-09-19 09:27:44 +03:00
|
|
|
if *subscribe_stop_rx.borrow_and_update() {
|
2023-09-13 22:05:47 +03:00
|
|
|
trace!("stopping ws block subscription on {}", self);
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
subscribe_stop_rx.changed().await?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
|
|
|
|
// todo: move subscribe_blocks onto the request handle
|
|
|
|
let authorization = Default::default();
|
|
|
|
|
|
|
|
let error_handler = Some(Level::ERROR.into());
|
|
|
|
|
|
|
|
let active_request_handle = self
|
|
|
|
.wait_for_request_handle(&authorization, None, error_handler)
|
|
|
|
.await;
|
|
|
|
|
|
|
|
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
|
|
|
|
|
|
|
|
drop(active_request_handle);
|
|
|
|
|
|
|
|
while let Some(x) = pending_txs_sub.next().await {
|
2023-09-19 09:27:44 +03:00
|
|
|
if *subscribe_stop_rx.borrow_and_update() {
|
2023-09-13 22:05:47 +03:00
|
|
|
// TODO: this is checking way too often. have this on a timer instead
|
|
|
|
trace!("stopping ws block subscription on {}", self);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
// this should always work
|
|
|
|
if let Err(err) = pending_txid_firehose.try_send(x) {
|
|
|
|
error!(
|
|
|
|
?err,
|
|
|
|
"pending_txid_firehose failed sending. it must be full"
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
2023-09-14 22:51:40 +03:00
|
|
|
// TODO: what should we do here?
|
|
|
|
loop {
|
2023-09-19 09:27:44 +03:00
|
|
|
if *subscribe_stop_rx.borrow_and_update() {
|
2023-09-14 22:51:40 +03:00
|
|
|
trace!("stopping ws block subscription on {}", self);
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
subscribe_stop_rx.changed().await?;
|
|
|
|
}
|
2023-09-13 22:05:47 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-03-02 21:14:17 +03:00
|
|
|
/// Subscribe to new blocks.
|
|
|
|
async fn subscribe_new_heads(
|
2023-06-13 04:58:51 +03:00
|
|
|
self: &Arc<Self>,
|
2023-07-11 09:08:06 +03:00
|
|
|
block_sender: mpsc::UnboundedSender<BlockAndRpc>,
|
2023-03-02 21:14:17 +03:00
|
|
|
block_map: BlocksByHashCache,
|
2023-09-19 09:27:44 +03:00
|
|
|
mut subscribe_stop_rx: watch::Receiver<bool>,
|
2023-05-24 00:40:34 +03:00
|
|
|
) -> Web3ProxyResult<()> {
|
2023-06-17 09:46:20 +03:00
|
|
|
trace!("subscribing to new heads on {}", self);
|
2023-03-02 21:14:17 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
// TODO: different handler depending on backup or not
|
2023-07-13 20:58:22 +03:00
|
|
|
let error_handler = if self.backup {
|
|
|
|
Some(Level::DEBUG.into())
|
|
|
|
} else {
|
|
|
|
Some(Level::ERROR.into())
|
|
|
|
};
|
2023-05-31 02:32:34 +03:00
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
|
2023-05-24 00:40:34 +03:00
|
|
|
// todo: move subscribe_blocks onto the request handle
|
2023-07-13 20:58:22 +03:00
|
|
|
let authorization = Default::default();
|
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
let active_request_handle = self
|
|
|
|
.wait_for_request_handle(&authorization, None, error_handler)
|
|
|
|
.await;
|
2023-05-24 00:40:34 +03:00
|
|
|
let mut blocks = ws_provider.subscribe_blocks().await?;
|
|
|
|
drop(active_request_handle);
|
|
|
|
|
|
|
|
// query the block once since the subscription doesn't send the current block
|
|
|
|
// there is a very small race condition here where the stream could send us a new block right now
|
|
|
|
// but all seeing the same block twice won't break anything
|
|
|
|
// TODO: how does this get wrapped in an arc? does ethers handle that?
|
2023-06-13 04:58:51 +03:00
|
|
|
// TODO: send this request to the ws_provider instead of the http_provider
|
2023-05-24 00:40:34 +03:00
|
|
|
let latest_block: Result<Option<ArcBlock>, _> = self
|
2023-07-13 20:58:22 +03:00
|
|
|
.internal_request(
|
2023-05-24 00:40:34 +03:00
|
|
|
"eth_getBlockByNumber",
|
2023-05-31 02:32:34 +03:00
|
|
|
&("latest", false),
|
2023-07-13 20:58:22 +03:00
|
|
|
error_handler,
|
2023-06-21 00:22:14 +03:00
|
|
|
Some(Duration::from_secs(5)),
|
2023-05-24 00:40:34 +03:00
|
|
|
)
|
|
|
|
.await;
|
2022-11-06 23:52:11 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
self.send_head_block_result(latest_block, &block_sender, &block_map)
|
|
|
|
.await?;
|
2022-07-19 04:31:12 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
while let Some(block) = blocks.next().await {
|
2023-06-13 04:58:51 +03:00
|
|
|
if *subscribe_stop_rx.borrow() {
|
2023-06-16 10:46:27 +03:00
|
|
|
trace!("stopping ws block subscription on {}", self);
|
2023-05-24 00:40:34 +03:00
|
|
|
break;
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
2023-01-03 18:51:18 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
let block = Arc::new(block);
|
2022-07-09 01:14:45 +03:00
|
|
|
|
2023-05-24 00:40:34 +03:00
|
|
|
self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
|
2023-02-12 12:22:53 +03:00
|
|
|
.await?;
|
2023-05-24 00:40:34 +03:00
|
|
|
}
|
2023-09-27 04:18:06 +03:00
|
|
|
} else if self.http_client.is_some() {
|
2023-06-21 00:22:14 +03:00
|
|
|
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
|
2023-06-13 08:26:10 +03:00
|
|
|
// TODO: is 1/2 the block time okay?
|
|
|
|
let mut i = interval(self.block_interval / 2);
|
|
|
|
i.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
2023-02-12 12:22:53 +03:00
|
|
|
|
2023-06-13 08:26:10 +03:00
|
|
|
loop {
|
2023-09-19 09:27:44 +03:00
|
|
|
if *subscribe_stop_rx.borrow_and_update() {
|
2023-07-11 06:59:03 +03:00
|
|
|
trace!(%self, "stopping http block subscription");
|
2023-05-24 00:40:34 +03:00
|
|
|
break;
|
|
|
|
}
|
2022-11-06 23:52:11 +03:00
|
|
|
|
2023-06-13 08:26:10 +03:00
|
|
|
let block_result = self
|
2023-07-13 20:58:22 +03:00
|
|
|
.internal_request::<_, Option<ArcBlock>>(
|
2023-06-13 08:26:10 +03:00
|
|
|
"eth_getBlockByNumber",
|
|
|
|
&("latest", false),
|
2023-07-13 20:58:22 +03:00
|
|
|
error_handler,
|
2023-06-21 00:22:14 +03:00
|
|
|
Some(Duration::from_secs(5)),
|
2023-06-13 08:26:10 +03:00
|
|
|
)
|
|
|
|
.await;
|
2023-02-12 12:22:53 +03:00
|
|
|
|
2023-06-13 08:26:10 +03:00
|
|
|
self.send_head_block_result(block_result, &block_sender, &block_map)
|
2023-02-12 12:22:53 +03:00
|
|
|
.await?;
|
2023-06-13 08:26:10 +03:00
|
|
|
|
|
|
|
i.tick().await;
|
2022-05-17 05:26:47 +03:00
|
|
|
}
|
2023-05-24 00:40:34 +03:00
|
|
|
} else {
|
2023-10-03 23:46:27 +03:00
|
|
|
return Err(anyhow!("no ws or http provider!").into());
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
2023-02-27 10:52:37 +03:00
|
|
|
|
|
|
|
// clear the head block. this might not be needed, but it won't hurt
|
2023-05-24 00:40:34 +03:00
|
|
|
self.send_head_block_result(Ok(None), &block_sender, &block_map)
|
2023-02-27 10:52:37 +03:00
|
|
|
.await?;
|
|
|
|
|
2023-06-13 04:58:51 +03:00
|
|
|
if *subscribe_stop_rx.borrow() {
|
2023-07-15 04:30:01 +03:00
|
|
|
trace!(%self, "new heads subscription exited");
|
2023-03-23 02:16:15 +03:00
|
|
|
Ok(())
|
|
|
|
} else {
|
2023-05-24 00:40:34 +03:00
|
|
|
Err(anyhow!("new_heads subscription exited. reconnect needed").into())
|
2023-03-23 02:16:15 +03:00
|
|
|
}
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
2022-05-17 05:26:47 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
pub async fn wait_for_request_handle(
|
|
|
|
self: &Arc<Self>,
|
2023-10-03 23:46:27 +03:00
|
|
|
web3_request: &Arc<Web3Request>,
|
2023-01-25 09:45:20 +03:00
|
|
|
max_wait: Option<Duration>,
|
2023-05-31 02:32:34 +03:00
|
|
|
error_handler: Option<RequestErrorHandler>,
|
2023-03-20 04:52:28 +03:00
|
|
|
) -> Web3ProxyResult<OpenRequestHandle> {
|
2023-07-08 01:56:46 +03:00
|
|
|
// TODO: what should the default be?
|
|
|
|
// TODO: split max_wait_connect (which might wait if a rate limit is pending) and max_wait_request
|
2023-05-31 02:32:34 +03:00
|
|
|
let max_wait_until = max_wait.map(|x| Instant::now() + x);
|
2022-05-16 22:15:40 +03:00
|
|
|
|
2022-06-17 01:23:41 +03:00
|
|
|
loop {
|
2023-10-03 23:46:27 +03:00
|
|
|
match self.try_request_handle(web3_request, error_handler).await {
|
2022-08-24 03:59:05 +03:00
|
|
|
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
|
2022-08-24 03:14:49 +03:00
|
|
|
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
2022-08-07 09:48:57 +03:00
|
|
|
// TODO: emit a stat?
|
2023-01-25 09:45:20 +03:00
|
|
|
let wait = retry_at.duration_since(Instant::now());
|
|
|
|
|
|
|
|
trace!(
|
|
|
|
"waiting {} millis for request handle on {}",
|
|
|
|
wait.as_millis(),
|
|
|
|
self
|
|
|
|
);
|
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
if let Some(max_wait_until) = max_wait_until {
|
|
|
|
if retry_at > max_wait_until {
|
2023-01-25 09:45:20 +03:00
|
|
|
// break now since we will wait past our maximum wait time
|
2023-03-20 04:52:28 +03:00
|
|
|
return Err(Web3ProxyError::Timeout(None));
|
2023-01-25 09:45:20 +03:00
|
|
|
}
|
2022-09-20 09:00:27 +03:00
|
|
|
}
|
2023-01-25 07:44:50 +03:00
|
|
|
|
2022-08-07 09:48:57 +03:00
|
|
|
sleep_until(retry_at).await;
|
|
|
|
}
|
2023-02-15 04:41:40 +03:00
|
|
|
Ok(OpenRequestResult::NotReady) => {
|
2022-08-24 03:59:05 +03:00
|
|
|
// TODO: when can this happen? log? emit a stat?
|
2023-01-25 07:44:50 +03:00
|
|
|
trace!("{} has no handle ready", self);
|
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
if let Some(max_wait_until) = max_wait_until {
|
|
|
|
if Instant::now() > max_wait_until {
|
2023-03-20 04:52:28 +03:00
|
|
|
return Err(Web3ProxyError::NoHandleReady);
|
2023-01-25 09:45:20 +03:00
|
|
|
}
|
2023-01-25 07:44:50 +03:00
|
|
|
}
|
|
|
|
|
2022-08-07 09:48:57 +03:00
|
|
|
// TODO: sleep how long? maybe just error?
|
2023-05-24 00:40:34 +03:00
|
|
|
// TODO: instead of an arbitrary sleep, subscribe to the head block on this?
|
2023-01-25 07:44:50 +03:00
|
|
|
sleep(Duration::from_millis(10)).await;
|
2022-05-06 07:29:25 +03:00
|
|
|
}
|
2022-08-07 09:48:57 +03:00
|
|
|
Err(err) => return Err(err),
|
2022-05-06 07:29:25 +03:00
|
|
|
}
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-22 23:27:14 +03:00
|
|
|
pub async fn try_request_handle(
|
|
|
|
self: &Arc<Self>,
|
2023-10-03 23:46:27 +03:00
|
|
|
web3_request: &Arc<Web3Request>,
|
2023-05-31 02:32:34 +03:00
|
|
|
error_handler: Option<RequestErrorHandler>,
|
2023-03-20 04:52:28 +03:00
|
|
|
) -> Web3ProxyResult<OpenRequestResult> {
|
2023-05-23 01:32:15 +03:00
|
|
|
// TODO: if websocket is reconnecting, return an error?
|
2022-12-08 09:54:38 +03:00
|
|
|
|
2023-05-19 01:11:29 +03:00
|
|
|
// check cached rate limits
|
2023-01-25 07:44:50 +03:00
|
|
|
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
|
2023-02-27 09:44:09 +03:00
|
|
|
let hard_limit_ready = *hard_limit_until.borrow();
|
2023-01-25 07:44:50 +03:00
|
|
|
let now = Instant::now();
|
|
|
|
if now < hard_limit_ready {
|
|
|
|
return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-05-19 01:11:29 +03:00
|
|
|
// check shared rate limits
|
2022-05-22 02:34:05 +03:00
|
|
|
if let Some(ratelimiter) = self.hard_limit.as_ref() {
|
2022-09-20 09:56:24 +03:00
|
|
|
// TODO: how should we know if we should set expire or not?
|
2023-01-25 09:45:20 +03:00
|
|
|
match ratelimiter
|
|
|
|
.throttle()
|
|
|
|
.await
|
|
|
|
.context(format!("attempting to throttle {}", self))?
|
|
|
|
{
|
2022-09-15 20:57:24 +03:00
|
|
|
RedisRateLimitResult::Allowed(_) => {
|
2023-01-25 07:44:50 +03:00
|
|
|
// trace!("rate limit succeeded")
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
2022-09-15 20:57:24 +03:00
|
|
|
RedisRateLimitResult::RetryAt(retry_at, _) => {
|
2023-01-25 09:45:20 +03:00
|
|
|
// rate limit gave us a wait time
|
2023-05-19 01:11:29 +03:00
|
|
|
// if not a backup server, warn. backups hit rate limits often
|
2023-01-25 09:45:20 +03:00
|
|
|
if !self.backup {
|
|
|
|
let when = retry_at.duration_since(Instant::now());
|
|
|
|
warn!(
|
2023-06-29 04:44:54 +03:00
|
|
|
retry_ms=%when.as_millis(),
|
|
|
|
"Exhausted rate limit on {}",
|
2023-01-25 09:45:20 +03:00
|
|
|
self,
|
|
|
|
);
|
|
|
|
}
|
2022-05-05 22:07:09 +03:00
|
|
|
|
2023-01-25 07:44:50 +03:00
|
|
|
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
|
2023-02-27 09:44:09 +03:00
|
|
|
hard_limit_until.send_replace(retry_at);
|
2023-01-25 07:44:50 +03:00
|
|
|
}
|
|
|
|
|
2022-09-10 03:12:14 +03:00
|
|
|
return Ok(OpenRequestResult::RetryAt(retry_at));
|
2022-08-07 09:48:57 +03:00
|
|
|
}
|
2022-09-15 20:57:24 +03:00
|
|
|
RedisRateLimitResult::RetryNever => {
|
2023-05-19 01:11:29 +03:00
|
|
|
warn!("how did retry never on {} happen?", self);
|
2023-02-15 04:41:40 +03:00
|
|
|
return Ok(OpenRequestResult::NotReady);
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
let handle =
|
2023-10-03 23:46:27 +03:00
|
|
|
OpenRequestHandle::new(web3_request.clone(), self.clone(), error_handler).await;
|
2022-08-24 03:11:49 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
Ok(handle.into())
|
2022-08-24 03:11:49 +03:00
|
|
|
}
|
2023-02-16 11:26:58 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
|
2023-02-16 11:26:58 +03:00
|
|
|
self: &Arc<Self>,
|
|
|
|
method: &str,
|
|
|
|
params: &P,
|
2023-05-31 02:32:34 +03:00
|
|
|
error_handler: Option<RequestErrorHandler>,
|
2023-06-21 00:22:14 +03:00
|
|
|
max_wait: Option<Duration>,
|
2023-05-31 02:32:34 +03:00
|
|
|
) -> Web3ProxyResult<R> {
|
2023-10-07 00:51:21 +03:00
|
|
|
// TODO: think about this more. its hard to do this without being self-referenctial!
|
|
|
|
let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?;
|
2023-05-31 02:32:34 +03:00
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
self.authorized_request(&web3_request, error_handler, max_wait)
|
2023-08-03 19:15:31 +03:00
|
|
|
.await
|
2023-05-31 02:32:34 +03:00
|
|
|
}
|
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
pub async fn authorized_request<R: JsonRpcResultData>(
|
2023-05-31 02:32:34 +03:00
|
|
|
self: &Arc<Self>,
|
2023-10-03 23:46:27 +03:00
|
|
|
web3_request: &Arc<Web3Request>,
|
2023-05-31 02:32:34 +03:00
|
|
|
error_handler: Option<RequestErrorHandler>,
|
2023-06-21 00:22:14 +03:00
|
|
|
max_wait: Option<Duration>,
|
2023-05-31 02:32:34 +03:00
|
|
|
) -> Web3ProxyResult<R> {
|
2023-08-03 19:15:31 +03:00
|
|
|
let handle = self
|
2023-10-03 23:46:27 +03:00
|
|
|
.wait_for_request_handle(web3_request, max_wait, error_handler)
|
2023-08-03 19:15:31 +03:00
|
|
|
.await?;
|
2023-08-03 10:22:45 +03:00
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
let response = handle.request().await?;
|
2023-09-27 04:18:06 +03:00
|
|
|
let parsed = response.parsed().await?;
|
|
|
|
match parsed.payload {
|
|
|
|
jsonrpc::Payload::Success { result } => Ok(result),
|
|
|
|
jsonrpc::Payload::Error { error } => Err(error.into()),
|
|
|
|
}
|
2023-02-16 11:26:58 +03:00
|
|
|
}
|
2022-08-24 03:11:49 +03:00
|
|
|
}
|
2022-08-07 09:48:57 +03:00
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl Hash for Web3Rpc {
|
2022-06-14 07:04:14 +03:00
|
|
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
2023-06-09 22:21:50 +03:00
|
|
|
// do not include automatic block limit because it can change
|
|
|
|
// do not include tier because it can change
|
|
|
|
self.backup.hash(state);
|
|
|
|
self.created_at.hash(state);
|
2023-02-28 22:01:34 +03:00
|
|
|
self.display_name.hash(state);
|
2023-06-09 22:21:50 +03:00
|
|
|
self.name.hash(state);
|
|
|
|
|
2023-05-23 01:32:15 +03:00
|
|
|
// TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else
|
2023-09-27 04:18:06 +03:00
|
|
|
self.http_url.hash(state);
|
2023-06-09 22:21:50 +03:00
|
|
|
// TODO: figure out how to get the url for the ws provider
|
2023-05-23 01:32:15 +03:00
|
|
|
// self.ws_provider.map(|x| x.url()).hash(state);
|
2023-06-09 22:21:50 +03:00
|
|
|
|
2023-05-19 01:11:29 +03:00
|
|
|
// TODO: don't include soft_limit if we change them to be dynamic
|
2023-02-28 22:01:34 +03:00
|
|
|
self.soft_limit.hash(state);
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
// marker impl. the comparison itself lives in `PartialEq for Web3Rpc`, which compares `name`
impl Eq for Web3Rpc {}
|
2022-05-05 22:07:09 +03:00
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl Ord for Web3Rpc {
|
2022-05-05 22:07:09 +03:00
|
|
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
2022-08-24 03:32:16 +03:00
|
|
|
self.name.cmp(&other.name)
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl PartialOrd for Web3Rpc {
|
2022-05-05 22:07:09 +03:00
|
|
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
|
|
|
Some(self.cmp(other))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl PartialEq for Web3Rpc {
|
2022-05-05 22:07:09 +03:00
|
|
|
fn eq(&self, other: &Self) -> bool {
|
2022-08-24 03:32:16 +03:00
|
|
|
self.name == other.name
|
2022-05-05 22:07:09 +03:00
|
|
|
}
|
|
|
|
}
|
2022-08-10 08:56:09 +03:00
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl Serialize for Web3Rpc {
|
2022-08-10 08:56:09 +03:00
|
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
|
|
where
|
|
|
|
S: Serializer,
|
|
|
|
{
|
2023-07-10 21:05:07 +03:00
|
|
|
// 14 if we bring head_delay back
|
|
|
|
let mut state = serializer.serialize_struct("Web3Rpc", 13)?;
|
2022-08-10 08:56:09 +03:00
|
|
|
|
2022-11-14 00:05:37 +03:00
|
|
|
// the url is excluded because it likely includes private information. just show the name that we use in keys
|
2022-08-10 08:56:09 +03:00
|
|
|
state.serialize_field("name", &self.name)?;
|
2022-11-14 00:05:37 +03:00
|
|
|
// a longer name for display to users
|
|
|
|
state.serialize_field("display_name", &self.display_name)?;
|
2022-08-10 08:56:09 +03:00
|
|
|
|
2023-02-02 19:00:59 +03:00
|
|
|
state.serialize_field("backup", &self.backup)?;
|
|
|
|
|
2023-05-13 09:00:03 +03:00
|
|
|
match self.block_data_limit.load(atomic::Ordering::Acquire) {
|
2022-12-28 05:17:11 +03:00
|
|
|
u64::MAX => {
|
|
|
|
state.serialize_field("block_data_limit", &None::<()>)?;
|
|
|
|
}
|
|
|
|
block_data_limit => {
|
|
|
|
state.serialize_field("block_data_limit", &block_data_limit)?;
|
|
|
|
}
|
2022-09-06 23:12:45 +03:00
|
|
|
}
|
2022-08-10 08:56:09 +03:00
|
|
|
|
2023-01-04 09:37:51 +03:00
|
|
|
state.serialize_field("tier", &self.tier)?;
|
2023-01-05 01:33:39 +03:00
|
|
|
|
2022-08-10 08:56:09 +03:00
|
|
|
state.serialize_field("soft_limit", &self.soft_limit)?;
|
|
|
|
|
2023-02-15 23:33:43 +03:00
|
|
|
// TODO: maybe this is too much data. serialize less?
|
2023-05-13 01:15:32 +03:00
|
|
|
{
|
2023-10-03 23:46:27 +03:00
|
|
|
let head_block = self.head_block_sender.as_ref().unwrap();
|
2023-05-13 01:15:32 +03:00
|
|
|
let head_block = head_block.borrow();
|
|
|
|
let head_block = head_block.as_ref();
|
|
|
|
state.serialize_field("head_block", &head_block)?;
|
|
|
|
}
|
2022-09-06 23:12:45 +03:00
|
|
|
|
2023-05-13 09:00:03 +03:00
|
|
|
state.serialize_field(
|
2023-06-13 21:51:19 +03:00
|
|
|
"external_requests",
|
|
|
|
&self.external_requests.load(atomic::Ordering::Relaxed),
|
|
|
|
)?;
|
|
|
|
|
|
|
|
state.serialize_field(
|
|
|
|
"internal_requests",
|
|
|
|
&self.internal_requests.load(atomic::Ordering::Relaxed),
|
2023-05-13 09:00:03 +03:00
|
|
|
)?;
|
|
|
|
|
|
|
|
state.serialize_field(
|
|
|
|
"active_requests",
|
2023-05-13 21:13:02 +03:00
|
|
|
&self.active_requests.load(atomic::Ordering::Relaxed),
|
2023-05-13 09:00:03 +03:00
|
|
|
)?;
|
|
|
|
|
2023-07-10 21:05:07 +03:00
|
|
|
// {
|
|
|
|
// let head_delay_ms = self.head_delay.read().await.latency().as_secs_f32() * 1000.0;
|
|
|
|
// state.serialize_field("head_delay_ms", &(head_delay_ms))?;
|
|
|
|
// }
|
2023-06-18 19:47:40 +03:00
|
|
|
|
2023-06-18 20:46:22 +03:00
|
|
|
{
|
|
|
|
let median_latency_ms = self
|
|
|
|
.median_latency
|
2023-06-18 19:47:40 +03:00
|
|
|
.as_ref()
|
|
|
|
.unwrap()
|
2023-06-18 20:46:22 +03:00
|
|
|
.latency()
|
|
|
|
.as_secs_f32()
|
|
|
|
* 1000.0;
|
|
|
|
state.serialize_field("median_latency_ms", &(median_latency_ms))?;
|
|
|
|
}
|
2022-09-05 19:39:46 +03:00
|
|
|
|
2023-06-13 21:51:19 +03:00
|
|
|
{
|
2023-06-18 20:46:22 +03:00
|
|
|
let peak_latency_ms =
|
|
|
|
self.peak_latency.as_ref().unwrap().latency().as_secs_f32() * 1000.0;
|
|
|
|
state.serialize_field("peak_latency_ms", &peak_latency_ms)?;
|
|
|
|
}
|
|
|
|
{
|
|
|
|
let weighted_latency_ms = self.weighted_peak_latency().as_secs_f32() * 1000.0;
|
|
|
|
state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
|
2023-06-13 21:51:19 +03:00
|
|
|
}
|
2023-05-13 21:13:02 +03:00
|
|
|
|
2022-08-10 08:56:09 +03:00
|
|
|
state.end()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl fmt::Debug for Web3Rpc {
|
2022-08-10 08:56:09 +03:00
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
2023-02-06 20:55:27 +03:00
|
|
|
let mut f = f.debug_struct("Web3Rpc");
|
2022-08-10 08:56:09 +03:00
|
|
|
|
2022-08-24 03:32:16 +03:00
|
|
|
f.field("name", &self.name);
|
2022-08-10 08:56:09 +03:00
|
|
|
|
2023-05-13 09:00:03 +03:00
|
|
|
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire);
|
2022-08-10 08:56:09 +03:00
|
|
|
if block_data_limit == u64::MAX {
|
2022-09-07 06:54:16 +03:00
|
|
|
f.field("blocks", &"all");
|
2022-08-10 08:56:09 +03:00
|
|
|
} else {
|
2022-09-07 06:54:16 +03:00
|
|
|
f.field("blocks", &block_data_limit);
|
2022-08-10 08:56:09 +03:00
|
|
|
}
|
|
|
|
|
2023-07-12 10:35:07 +03:00
|
|
|
f.field("backup", &self.backup);
|
|
|
|
|
|
|
|
f.field("tier", &self.tier.load(atomic::Ordering::Relaxed));
|
|
|
|
|
|
|
|
f.field("weighted_ms", &self.weighted_peak_latency().as_millis());
|
|
|
|
|
2023-10-03 23:46:27 +03:00
|
|
|
if let Some(head_block_watch) = self.head_block_sender.as_ref() {
|
2023-07-12 10:35:07 +03:00
|
|
|
if let Some(head_block) = head_block_watch.borrow().as_ref() {
|
2023-10-03 23:46:27 +03:00
|
|
|
f.field("head_num", &head_block.number());
|
2023-07-12 10:35:07 +03:00
|
|
|
f.field("head_hash", head_block.hash());
|
|
|
|
} else {
|
|
|
|
f.field("head_num", &None::<()>);
|
|
|
|
f.field("head_hash", &None::<()>);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-08-10 08:56:09 +03:00
|
|
|
f.finish_non_exhaustive()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-06 20:55:27 +03:00
|
|
|
impl fmt::Display for Web3Rpc {
|
2022-08-10 08:56:09 +03:00
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
2022-08-24 03:32:16 +03:00
|
|
|
write!(f, "{}", &self.name)
|
2022-08-10 08:56:09 +03:00
|
|
|
}
|
|
|
|
}
|
2022-11-22 23:23:08 +03:00
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    //! Unit tests for `Web3Rpc::has_block_data`.
    //!
    //! The module is gated behind `cfg(test)` so none of it (including its
    //! imports) is compiled into non-test builds.

    use super::*;
    use ethers::types::{Block, H256, U256};

    /// With `block_data_limit == u64::MAX`, every block from 0 up to and
    /// including the head is reported as available; blocks past the head are not.
    #[test]
    fn test_archive_node_has_block_data() {
        let now: U256 = chrono::Utc::now().timestamp().into();

        let random_block = Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now,
            ..Default::default()
        };

        let random_block = Arc::new(random_block);

        let head_block = Web3ProxyBlock::try_new(random_block).unwrap();
        let block_data_limit = u64::MAX;

        // hold on to the receiver for the duration of the test, same as the pruned-node test
        let (tx, _rx) = watch::channel(Some(head_block.clone()));

        let x = Web3Rpc {
            name: "name".to_string(),
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            head_block_sender: Some(tx),
            ..Default::default()
        };

        assert!(x.has_block_data(0.into()));
        assert!(x.has_block_data(1.into()));
        assert!(x.has_block_data(head_block.number()));
        assert!(!x.has_block_data(head_block.number() + 1));
        assert!(!x.has_block_data(head_block.number() + 1000));
    }

    /// With `block_data_limit == 64`, only the blocks in
    /// `head - 64 ..= head` are reported as available.
    #[test]
    fn test_pruned_node_has_block_data() {
        let now: U256 = chrono::Utc::now().timestamp().into();

        let head_block: Web3ProxyBlock = Arc::new(Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now,
            ..Default::default()
        })
        .try_into()
        .unwrap();

        let block_data_limit = 64;

        let (tx, _rx) = watch::channel(Some(head_block.clone()));

        let x = Web3Rpc {
            name: "name".to_string(),
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            head_block_sender: Some(tx),
            ..Default::default()
        };

        assert!(!x.has_block_data(0.into()));
        assert!(!x.has_block_data(1.into()));
        // one block older than the limit is out of range...
        assert!(!x.has_block_data(head_block.number() - block_data_limit - 1));
        // ...while the block exactly `block_data_limit` behind the head is still in range
        assert!(x.has_block_data(head_block.number() - block_data_limit));
        assert!(x.has_block_data(head_block.number()));
        assert!(!x.has_block_data(head_block.number() + 1));
        assert!(!x.has_block_data(head_block.number() + 1000));
    }

    /*
    // TODO: think about how to bring the concept of a "lagged" node back
    // NOTE(review): this disabled test was written against an older `Web3Rpc` layout
    // (it sets `head_block` and `frontend_requests`, while the live tests above use
    // `head_block_sender`). It must be updated before it can be re-enabled.
    #[test]
    fn test_lagged_node_not_has_block_data() {
        let now = chrono::Utc::now().timestamp().into();

        // head block is an hour old
        let head_block = Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now - 3600,
            ..Default::default()
        };

        let head_block = Arc::new(head_block);

        let head_block = Web3ProxyBlock::new(head_block);
        let block_data_limit = u64::MAX;

        let metrics = OpenRequestHandleMetrics::default();

        let x = Web3Rpc {
            name: "name".to_string(),
            db_conn: None,
            display_name: None,
            url: "ws://example.com".to_string(),
            http_client: None,
            active_requests: 0.into(),
            frontend_requests: 0.into(),
            internal_requests: 0.into(),
            provider_state: AsyncRwLock::new(ProviderState::None),
            hard_limit: None,
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            tier: 0,
            head_block: AsyncRwLock::new(Some(head_block.clone())),
        };

        assert!(!x.has_block_data(0.into()));
        assert!(!x.has_block_data(1.into()));
        assert!(!x.has_block_data(head_block.number()));
        assert!(!x.has_block_data(head_block.number() + 1));
        assert!(!x.has_block_data(head_block.number() + 1000));
    }
    */
}
|