first pass at less sturcts
This commit is contained in:
parent
d1af945de2
commit
b47482298c
|
@ -48,12 +48,6 @@ version = "1.0.57"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
|
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "arc-swap"
|
|
||||||
version = "1.5.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "argh"
|
name = "argh"
|
||||||
version = "0.1.7"
|
version = "0.1.7"
|
||||||
|
@ -120,12 +114,6 @@ dependencies = [
|
||||||
"rustc_version",
|
"rustc_version",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "atomic-counter"
|
|
||||||
version = "1.0.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "atty"
|
name = "atty"
|
||||||
version = "0.2.14"
|
version = "0.2.14"
|
||||||
|
@ -2656,6 +2644,12 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustc-hash"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc-hex"
|
name = "rustc-hex"
|
||||||
version = "2.1.0"
|
version = "2.1.0"
|
||||||
|
@ -3756,16 +3750,16 @@ name = "web3-proxy"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arc-swap",
|
|
||||||
"argh",
|
"argh",
|
||||||
"atomic-counter",
|
|
||||||
"derive_more",
|
"derive_more",
|
||||||
"ethers",
|
"ethers",
|
||||||
"futures",
|
"futures",
|
||||||
|
"fxhash",
|
||||||
"governor",
|
"governor",
|
||||||
"parking_lot 0.12.0",
|
"parking_lot 0.12.0",
|
||||||
"regex",
|
"regex",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
"rustc-hash",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
@ -6,18 +6,18 @@ edition = "2021"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
arc-swap = "1.5.0"
|
|
||||||
argh = "0.1.7"
|
argh = "0.1.7"
|
||||||
anyhow = "1.0.57"
|
anyhow = "1.0.57"
|
||||||
atomic-counter = "1.0.1"
|
|
||||||
derive_more = "0.99.17"
|
derive_more = "0.99.17"
|
||||||
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
|
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
|
||||||
futures = { version = "0.3.21", features = ["thread-pool"] }
|
futures = { version = "0.3.21", features = ["thread-pool"] }
|
||||||
|
fxhash = "0.2.1"
|
||||||
governor = { version = "0.4.2", features = ["dashmap", "std"] }
|
governor = { version = "0.4.2", features = ["dashmap", "std"] }
|
||||||
tokio = { version = "1.18.0", features = ["full"] }
|
tokio = { version = "1.18.0", features = ["full"] }
|
||||||
parking_lot = "0.12.0"
|
parking_lot = { version = "0.12.0" }
|
||||||
regex = "1.5.5"
|
regex = "1.5.5"
|
||||||
reqwest = { version = "0.11.10", features = ["json", "rustls"] }
|
reqwest = { version = "0.11.10", features = ["json", "rustls"] }
|
||||||
|
rustc-hash = { version = "1.0" }
|
||||||
serde = { version = "1.0.136", features = [] }
|
serde = { version = "1.0.136", features = [] }
|
||||||
serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] }
|
serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] }
|
||||||
tracing = "0.1.34"
|
tracing = "0.1.34"
|
||||||
|
|
|
@ -1,204 +0,0 @@
|
||||||
///! Track the head block of all the web3 providers
|
|
||||||
use ethers::prelude::{Block, TxHash};
|
|
||||||
use std::cmp;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fmt;
|
|
||||||
use std::sync::atomic::{self, AtomicU64};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
|
||||||
use tokio::sync::{mpsc, watch, Mutex};
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
// TODO: what type for the Item? String url works, but i don't love it
|
|
||||||
pub type NewHead = (String, Block<TxHash>);
|
|
||||||
|
|
||||||
pub type BlockWatcherSender = mpsc::UnboundedSender<NewHead>;
|
|
||||||
pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<NewHead>;
|
|
||||||
|
|
||||||
// TODO: ethers has a similar SyncingStatus
|
|
||||||
#[derive(Eq)]
|
|
||||||
pub enum SyncStatus {
|
|
||||||
Synced(u64),
|
|
||||||
Behind(u64),
|
|
||||||
Unknown,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Ord for SyncStatus {
|
|
||||||
fn cmp(&self, other: &Self) -> cmp::Ordering {
|
|
||||||
match (self, other) {
|
|
||||||
(SyncStatus::Synced(a), SyncStatus::Synced(b)) => a.cmp(b),
|
|
||||||
(SyncStatus::Synced(_), SyncStatus::Behind(_)) => cmp::Ordering::Greater,
|
|
||||||
(SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
|
|
||||||
(SyncStatus::Behind(_), SyncStatus::Synced(_)) => cmp::Ordering::Less,
|
|
||||||
(SyncStatus::Behind(a), SyncStatus::Behind(b)) => a.cmp(b),
|
|
||||||
(SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
|
|
||||||
(SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less,
|
|
||||||
(SyncStatus::Unknown, SyncStatus::Behind(_)) => cmp::Ordering::Less,
|
|
||||||
(SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PartialOrd for SyncStatus {
|
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
|
|
||||||
Some(self.cmp(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PartialEq for SyncStatus {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.cmp(other) == cmp::Ordering::Equal
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct BlockWatcher {
|
|
||||||
sender: BlockWatcherSender,
|
|
||||||
/// this Mutex is locked over awaits, so we want an async lock
|
|
||||||
receiver: Mutex<BlockWatcherReceiver>,
|
|
||||||
// TODO: better key
|
|
||||||
block_numbers: HashMap<String, AtomicU64>,
|
|
||||||
head_block_number: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for BlockWatcher {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
|
||||||
write!(f, "BlockWatcher(...)")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BlockWatcher {
|
|
||||||
pub fn new(rpcs: Vec<String>) -> Self {
|
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let block_numbers = rpcs.into_iter().map(|rpc| (rpc, 0.into())).collect();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
sender,
|
|
||||||
receiver: Mutex::new(receiver),
|
|
||||||
block_numbers,
|
|
||||||
head_block_number: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn clone_sender(&self) -> BlockWatcherSender {
|
|
||||||
self.sender.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn sync_status(&self, rpc: &str, allowed_lag: u64) -> SyncStatus {
|
|
||||||
match (
|
|
||||||
self.head_block_number.load(atomic::Ordering::Acquire),
|
|
||||||
self.block_numbers.get(rpc),
|
|
||||||
) {
|
|
||||||
(0, _) => SyncStatus::Unknown,
|
|
||||||
(_, None) => SyncStatus::Unknown,
|
|
||||||
(head_block_number, Some(rpc_block_number)) => {
|
|
||||||
let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire);
|
|
||||||
|
|
||||||
match head_block_number.cmp(&rpc_block_number) {
|
|
||||||
cmp::Ordering::Equal => SyncStatus::Synced(0),
|
|
||||||
cmp::Ordering::Greater => {
|
|
||||||
// this probably won't happen, but it might if the block arrives at the exact wrong time
|
|
||||||
// TODO: should this be negative?
|
|
||||||
SyncStatus::Synced(0)
|
|
||||||
}
|
|
||||||
cmp::Ordering::Less => {
|
|
||||||
// allow being some behind
|
|
||||||
let lag = head_block_number - rpc_block_number;
|
|
||||||
|
|
||||||
if lag <= allowed_lag {
|
|
||||||
SyncStatus::Synced(lag)
|
|
||||||
} else {
|
|
||||||
SyncStatus::Behind(lag)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run(
|
|
||||||
self: Arc<Self>,
|
|
||||||
new_block_sender: watch::Sender<String>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut receiver = self.receiver.lock().await;
|
|
||||||
|
|
||||||
while let Some((rpc, new_block)) = receiver.recv().await {
|
|
||||||
let new_block_number = new_block.number.unwrap().as_u64();
|
|
||||||
|
|
||||||
{
|
|
||||||
if let Some(rpc_block_number) = self.block_numbers.get(&rpc) {
|
|
||||||
let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire);
|
|
||||||
|
|
||||||
// if we already have this block height
|
|
||||||
// this probably own't happen with websockets, but is likely with polling against http rpcs
|
|
||||||
// TODO: should we compare more than just height? hash too?
|
|
||||||
if rpc_block_number == new_block_number {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let now = SystemTime::now()
|
|
||||||
.duration_since(UNIX_EPOCH)
|
|
||||||
.expect("Time went backwards")
|
|
||||||
.as_secs() as i64;
|
|
||||||
|
|
||||||
// save the block for this rpc
|
|
||||||
// TODO: store the actual chain as a graph and then have self.blocks point to that?
|
|
||||||
self.block_numbers
|
|
||||||
.get(&rpc)
|
|
||||||
.unwrap()
|
|
||||||
.swap(new_block_number, atomic::Ordering::Release);
|
|
||||||
|
|
||||||
let head_number = self.head_block_number.load(atomic::Ordering::Acquire);
|
|
||||||
|
|
||||||
let label_slow_heads = if head_number == 0 {
|
|
||||||
// first block seen
|
|
||||||
self.head_block_number
|
|
||||||
.swap(new_block_number, atomic::Ordering::AcqRel);
|
|
||||||
", +".to_string()
|
|
||||||
} else {
|
|
||||||
// TODO: what if they have the same number but different hashes?
|
|
||||||
// TODO: alert if there is a large chain split?
|
|
||||||
match (new_block_number).cmp(&head_number) {
|
|
||||||
cmp::Ordering::Equal => {
|
|
||||||
// this block is already saved as the head
|
|
||||||
"".to_string()
|
|
||||||
}
|
|
||||||
cmp::Ordering::Greater => {
|
|
||||||
// new_block is the new head_block
|
|
||||||
self.head_block_number
|
|
||||||
.swap(new_block_number, atomic::Ordering::AcqRel);
|
|
||||||
", +".to_string()
|
|
||||||
}
|
|
||||||
cmp::Ordering::Less => {
|
|
||||||
// this rpc is behind
|
|
||||||
let lag = new_block_number as i64 - head_number as i64;
|
|
||||||
|
|
||||||
let mut s = ", ".to_string();
|
|
||||||
|
|
||||||
s.push_str(&lag.to_string());
|
|
||||||
|
|
||||||
s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// have the provider tiers update_synced_rpcs
|
|
||||||
new_block_sender.send(rpc.clone())?;
|
|
||||||
|
|
||||||
// TODO: include time since last update?
|
|
||||||
info!(
|
|
||||||
"{:?} = {}, {}, {} sec{}",
|
|
||||||
new_block.hash.unwrap(),
|
|
||||||
new_block.number.unwrap(),
|
|
||||||
rpc,
|
|
||||||
now - new_block.timestamp.as_u64() as i64,
|
|
||||||
label_slow_heads
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,6 +1,6 @@
|
||||||
///! Communicate with a web3 providers
|
///! Communicate with a web3 provider
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use ethers::prelude::{BlockNumber, Middleware};
|
use ethers::prelude::Middleware;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use governor::clock::{QuantaClock, QuantaInstant};
|
use governor::clock::{QuantaClock, QuantaInstant};
|
||||||
use governor::middleware::NoOpMiddleware;
|
use governor::middleware::NoOpMiddleware;
|
||||||
|
@ -10,13 +10,15 @@ use governor::RateLimiter;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::atomic::{self, AtomicU32};
|
use std::hash::{Hash, Hasher};
|
||||||
|
use std::num::NonZeroU32;
|
||||||
|
use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{cmp::Ordering, sync::Arc};
|
use std::{cmp::Ordering, sync::Arc};
|
||||||
use tokio::time::interval;
|
use tokio::time::interval;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use crate::block_watcher::BlockWatcherSender;
|
use crate::connections::Web3Connections;
|
||||||
|
|
||||||
type Web3RateLimiter =
|
type Web3RateLimiter =
|
||||||
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
|
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
|
||||||
|
@ -62,95 +64,58 @@ pub enum Web3Provider {
|
||||||
|
|
||||||
impl fmt::Debug for Web3Provider {
|
impl fmt::Debug for Web3Provider {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
|
||||||
f.debug_struct("Web3Provider").finish_non_exhaustive()
|
f.debug_struct("Web3Provider").finish_non_exhaustive()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Forward functions to the inner ethers::providers::Provider
|
|
||||||
impl Web3Provider {
|
|
||||||
/// Send a web3 request
|
|
||||||
pub async fn request(
|
|
||||||
&self,
|
|
||||||
method: &str,
|
|
||||||
params: Box<serde_json::value::RawValue>,
|
|
||||||
) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
|
|
||||||
match self {
|
|
||||||
Self::Http(provider) => provider.request(method, params).await,
|
|
||||||
Self::Ws(provider) => provider.request(method, params).await,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Subscribe to new blocks
|
|
||||||
pub async fn new_heads(
|
|
||||||
&self,
|
|
||||||
url: String,
|
|
||||||
block_watcher_sender: BlockWatcherSender,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
info!("Watching new_heads from {}", url);
|
|
||||||
|
|
||||||
match &self {
|
|
||||||
Web3Provider::Http(provider) => {
|
|
||||||
// TODO: there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
|
||||||
// TODO: what should this interval be?
|
|
||||||
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
|
||||||
let mut interval = interval(Duration::from_secs(2));
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// wait for 2 seconds
|
|
||||||
interval.tick().await;
|
|
||||||
|
|
||||||
match provider.get_block(BlockNumber::Latest).await {
|
|
||||||
Ok(Some(block)) => block_watcher_sender.send((url.clone(), block)).unwrap(),
|
|
||||||
Ok(None) => warn!("no black at {}", url),
|
|
||||||
Err(e) => warn!("getBlock at {} failed: {}", url, e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Web3Provider::Ws(provider) => {
|
|
||||||
// TODO: automatically reconnect?
|
|
||||||
let mut stream = provider.subscribe_blocks().await?;
|
|
||||||
while let Some(block) = stream.next().await {
|
|
||||||
block_watcher_sender.send((url.clone(), block)).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Done watching new_heads from {}", url);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An active connection to a Web3Rpc
|
/// An active connection to a Web3Rpc
|
||||||
pub struct Web3Connection {
|
pub struct Web3Connection {
|
||||||
|
/// TODO: can we get this from the provider? do we even need it?
|
||||||
|
url: String,
|
||||||
/// keep track of currently open requests. We sort on this
|
/// keep track of currently open requests. We sort on this
|
||||||
active_requests: AtomicU32,
|
active_requests: AtomicU32,
|
||||||
provider: Arc<Web3Provider>,
|
provider: Web3Provider,
|
||||||
ratelimiter: Option<Web3RateLimiter>,
|
ratelimiter: Option<Web3RateLimiter>,
|
||||||
/// used for load balancing to the least loaded server
|
/// used for load balancing to the least loaded server
|
||||||
soft_limit: f32,
|
soft_limit: u32,
|
||||||
|
head_block_number: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Hash for Web3Connection {
|
||||||
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||||
|
self.url.hash(state);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Web3Connection {
|
impl fmt::Debug for Web3Connection {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
f.debug_struct("Web3Connection").finish_non_exhaustive()
|
f.debug_struct("Web3Connection")
|
||||||
|
.field("url", &self.url)
|
||||||
|
.finish_non_exhaustive()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Web3Connection {
|
impl Web3Connection {
|
||||||
pub fn clone_provider(&self) -> Arc<Web3Provider> {
|
|
||||||
self.provider.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Connect to a web3 rpc and subscribe to new heads
|
/// Connect to a web3 rpc and subscribe to new heads
|
||||||
pub async fn try_new(
|
pub async fn try_new(
|
||||||
url_str: String,
|
url_str: String,
|
||||||
http_client: Option<reqwest::Client>,
|
http_client: Option<reqwest::Client>,
|
||||||
block_watcher_sender: BlockWatcherSender,
|
hard_rate_limit: Option<u32>,
|
||||||
hard_rate_limiter: Option<Web3RateLimiter>,
|
clock: Option<&QuantaClock>,
|
||||||
soft_limit: f32,
|
// TODO: think more about this type
|
||||||
|
soft_limit: u32,
|
||||||
) -> anyhow::Result<Web3Connection> {
|
) -> anyhow::Result<Web3Connection> {
|
||||||
|
let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
|
||||||
|
let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());
|
||||||
|
|
||||||
|
let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock.unwrap());
|
||||||
|
|
||||||
|
Some(rate_limiter)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let provider = if url_str.starts_with("http") {
|
let provider = if url_str.starts_with("http") {
|
||||||
let url: url::Url = url_str.parse()?;
|
let url: url::Url = url_str.parse()?;
|
||||||
|
|
||||||
|
@ -175,27 +140,104 @@ impl Web3Connection {
|
||||||
return Err(anyhow::anyhow!("only http and ws servers are supported"));
|
return Err(anyhow::anyhow!("only http and ws servers are supported"));
|
||||||
};
|
};
|
||||||
|
|
||||||
let provider = Arc::new(provider);
|
|
||||||
|
|
||||||
// subscribe to new heads in a spawned future
|
|
||||||
let provider_clone: Arc<Web3Provider> = Arc::clone(&provider);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
while let Err(e) = provider_clone
|
|
||||||
.new_heads(url_str.clone(), block_watcher_sender.clone())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
warn!("new_heads error for {}: {:?}", url_str, e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Web3Connection {
|
Ok(Web3Connection {
|
||||||
|
url: url_str.clone(),
|
||||||
active_requests: Default::default(),
|
active_requests: Default::default(),
|
||||||
provider,
|
provider,
|
||||||
ratelimiter: hard_rate_limiter,
|
ratelimiter: hard_rate_limiter,
|
||||||
soft_limit,
|
soft_limit,
|
||||||
|
head_block_number: 0.into(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn active_requests(&self) -> u32 {
|
||||||
|
self.active_requests.load(atomic::Ordering::Acquire)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribe to new blocks
|
||||||
|
// #[instrument]
|
||||||
|
pub async fn new_heads(
|
||||||
|
self: Arc<Self>,
|
||||||
|
connections: Option<Arc<Web3Connections>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
info!("Watching new_heads on {:?}", self);
|
||||||
|
|
||||||
|
match &self.provider {
|
||||||
|
Web3Provider::Http(provider) => {
|
||||||
|
// TODO: there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||||
|
// TODO: what should this interval be? probably some fraction of block time
|
||||||
|
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
||||||
|
let mut interval = interval(Duration::from_secs(2));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// wait for 2 seconds
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
match (
|
||||||
|
&connections,
|
||||||
|
provider.get_block_number().await.map(|x| x.as_u64()),
|
||||||
|
) {
|
||||||
|
(None, Ok(block_number)) => {
|
||||||
|
// TODO: only store if this isn't already stored?
|
||||||
|
// TODO: also send something to the provider_tier so it can sort?
|
||||||
|
self.head_block_number
|
||||||
|
.store(block_number, atomic::Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
(Some(connections), Ok(block_number)) => {
|
||||||
|
let old_block_number = self
|
||||||
|
.head_block_number
|
||||||
|
.swap(block_number, atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
|
if old_block_number != block_number {
|
||||||
|
connections.update_synced_rpcs(&self, block_number)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(_, Err(e)) => warn!("getBlockNumber failed: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Web3Provider::Ws(provider) => {
|
||||||
|
// TODO: automatically reconnect?
|
||||||
|
// TODO: it would be faster to get the block number, but subscriptions don't provide that
|
||||||
|
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
|
||||||
|
let mut stream = provider.subscribe_blocks().await?;
|
||||||
|
while let Some(block) = stream.next().await {
|
||||||
|
let block_number = block.number.unwrap().as_u64();
|
||||||
|
|
||||||
|
// TODO: only store if this isn't already stored?
|
||||||
|
// TODO: also send something to the provider_tier so it can sort?
|
||||||
|
let old_block_number = self
|
||||||
|
.head_block_number
|
||||||
|
.swap(block_number, atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
|
if old_block_number != block_number {
|
||||||
|
info!("new block on {:?}: {}", self, block_number);
|
||||||
|
|
||||||
|
if let Some(connections) = &connections {
|
||||||
|
connections.update_synced_rpcs(&self, block_number)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Done watching new_heads");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a web3 request
|
||||||
|
pub async fn request(
|
||||||
|
&self,
|
||||||
|
method: &str,
|
||||||
|
params: &serde_json::value::RawValue,
|
||||||
|
) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
|
||||||
|
match &self.provider {
|
||||||
|
Web3Provider::Http(provider) => provider.request(method, params).await,
|
||||||
|
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn try_inc_active_requests(&self) -> Result<(), NotUntil<QuantaInstant>> {
|
pub fn try_inc_active_requests(&self) -> Result<(), NotUntil<QuantaInstant>> {
|
||||||
// check rate limits
|
// check rate limits
|
||||||
if let Some(ratelimiter) = self.ratelimiter.as_ref() {
|
if let Some(ratelimiter) = self.ratelimiter.as_ref() {
|
||||||
|
@ -215,14 +257,14 @@ impl Web3Connection {
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: what ordering?!
|
// TODO: what ordering?!
|
||||||
self.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
|
self.active_requests.fetch_add(1, atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dec_active_requests(&self) {
|
pub fn dec_active_requests(&self) {
|
||||||
// TODO: what ordering?!
|
// TODO: what ordering?!
|
||||||
self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
|
self.active_requests.fetch_sub(1, atomic::Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,12 +273,12 @@ impl Eq for Web3Connection {}
|
||||||
impl Ord for Web3Connection {
|
impl Ord for Web3Connection {
|
||||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||||
// TODO: what atomic ordering?!
|
// TODO: what atomic ordering?!
|
||||||
let a = self.active_requests.load(atomic::Ordering::Acquire);
|
let a = self.active_requests.load(atomic::Ordering::SeqCst);
|
||||||
let b = other.active_requests.load(atomic::Ordering::Acquire);
|
let b = other.active_requests.load(atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
// TODO: how should we include the soft limit? floats are slower than integer math
|
// TODO: how should we include the soft limit? floats are slower than integer math
|
||||||
let a = a as f32 / self.soft_limit;
|
let a = a as f32 / self.soft_limit as f32;
|
||||||
let b = b as f32 / other.soft_limit;
|
let b = b as f32 / other.soft_limit as f32;
|
||||||
|
|
||||||
a.partial_cmp(&b).unwrap()
|
a.partial_cmp(&b).unwrap()
|
||||||
}
|
}
|
|
@ -0,0 +1,297 @@
|
||||||
|
///! Communicate with a group of web3 providers
|
||||||
|
use derive_more::From;
|
||||||
|
use futures::stream::FuturesUnordered;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use fxhash::FxHashMap;
|
||||||
|
use governor::clock::{QuantaClock, QuantaInstant};
|
||||||
|
use governor::NotUntil;
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use serde_json::value::RawValue;
|
||||||
|
use std::cmp;
|
||||||
|
use std::fmt;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
struct SyncedConnections {
|
||||||
|
head_block_number: u64,
|
||||||
|
inner: Vec<Arc<Web3Connection>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SyncedConnections {
|
||||||
|
fn new(max_connections: usize) -> Self {
|
||||||
|
let inner = Vec::with_capacity(max_connections);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
head_block_number: 0,
|
||||||
|
inner,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A collection of web3 connections. Sends requests either the current best server or all servers.
|
||||||
|
#[derive(From)]
|
||||||
|
pub struct Web3Connections {
|
||||||
|
inner: Vec<Arc<Web3Connection>>,
|
||||||
|
/// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
|
||||||
|
synced_connections: RwLock<SyncedConnections>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Web3Connections {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||||
|
f.debug_struct("Web3Connections")
|
||||||
|
.field("inner", &self.inner)
|
||||||
|
.finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Web3Connections {
|
||||||
|
pub async fn try_new(
|
||||||
|
// TODO: servers should be a Web3ConnectionBuilder struct
|
||||||
|
servers: Vec<(&str, u32, Option<u32>)>,
|
||||||
|
http_client: Option<reqwest::Client>,
|
||||||
|
clock: &QuantaClock,
|
||||||
|
) -> anyhow::Result<Arc<Self>> {
|
||||||
|
let mut connections = vec![];
|
||||||
|
|
||||||
|
let num_connections = servers.len();
|
||||||
|
|
||||||
|
for (s, soft_rate_limit, hard_rate_limit) in servers.into_iter() {
|
||||||
|
let connection = Web3Connection::try_new(
|
||||||
|
s.to_string(),
|
||||||
|
http_client.clone(),
|
||||||
|
hard_rate_limit,
|
||||||
|
Some(clock),
|
||||||
|
soft_rate_limit,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let connection = Arc::new(connection);
|
||||||
|
|
||||||
|
connections.push(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
let connections = Arc::new(Self {
|
||||||
|
inner: connections,
|
||||||
|
synced_connections: RwLock::new(SyncedConnections::new(num_connections)),
|
||||||
|
});
|
||||||
|
|
||||||
|
for connection in connections.inner.iter() {
|
||||||
|
// subscribe to new heads in a spawned future
|
||||||
|
let connection = Arc::clone(connection);
|
||||||
|
let connections = connections.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = connection.new_heads(Some(connections)).await {
|
||||||
|
warn!("new_heads error: {:?}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(connections)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn try_send_request(
|
||||||
|
&self,
|
||||||
|
connection: &Web3Connection,
|
||||||
|
method: &str,
|
||||||
|
params: &RawValue,
|
||||||
|
) -> anyhow::Result<JsonRpcForwardedResponse> {
|
||||||
|
// connection.in_active_requests was called when this rpc was selected
|
||||||
|
|
||||||
|
let response = connection.request(method, params).await;
|
||||||
|
|
||||||
|
connection.dec_active_requests();
|
||||||
|
|
||||||
|
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
|
||||||
|
|
||||||
|
response.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn try_send_requests(
|
||||||
|
self: Arc<Self>,
|
||||||
|
connections: Vec<Arc<Web3Connection>>,
|
||||||
|
method: String,
|
||||||
|
params: Box<RawValue>,
|
||||||
|
response_sender: mpsc::UnboundedSender<anyhow::Result<JsonRpcForwardedResponse>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut unordered_futures = FuturesUnordered::new();
|
||||||
|
|
||||||
|
for connection in connections {
|
||||||
|
// clone things so we can pass them to a future
|
||||||
|
let connections = self.clone();
|
||||||
|
let method = method.clone();
|
||||||
|
let params = params.clone();
|
||||||
|
let response_sender = response_sender.clone();
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
// get the client for this rpc server
|
||||||
|
let response = connections
|
||||||
|
.try_send_request(connection.as_ref(), &method, ¶ms)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// send the first good response to a one shot channel. that way we respond quickly
|
||||||
|
// drop the result because errors are expected after the first send
|
||||||
|
response_sender.send(Ok(response)).map_err(Into::into)
|
||||||
|
});
|
||||||
|
|
||||||
|
unordered_futures.push(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: use iterators instead of pushing into a vec
|
||||||
|
let mut errs = vec![];
|
||||||
|
if let Some(x) = unordered_futures.next().await {
|
||||||
|
match x.unwrap() {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
// TODO: better errors
|
||||||
|
warn!("Got an error sending request: {}", e);
|
||||||
|
errs.push(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the first error (if any)
|
||||||
|
// TODO: why collect multiple errors if we only pop one?
|
||||||
|
let e = if !errs.is_empty() {
|
||||||
|
Err(errs.pop().unwrap())
|
||||||
|
} else {
|
||||||
|
Err(anyhow::anyhow!("no successful responses"))
|
||||||
|
};
|
||||||
|
|
||||||
|
// send the error to the channel
|
||||||
|
if response_sender.send(e).is_ok() {
|
||||||
|
// if we were able to send an error, then we never sent a success
|
||||||
|
return Err(anyhow::anyhow!("no successful responses"));
|
||||||
|
} else {
|
||||||
|
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_synced_rpcs(
|
||||||
|
&self,
|
||||||
|
rpc: &Arc<Web3Connection>,
|
||||||
|
new_block: u64,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// TODO: is RwLock the best type for this?
|
||||||
|
// TODO: start with a read lock?
|
||||||
|
let mut synced_connections = self.synced_connections.write();
|
||||||
|
|
||||||
|
// should we load new_block here?
|
||||||
|
|
||||||
|
match synced_connections.head_block_number.cmp(&new_block) {
|
||||||
|
cmp::Ordering::Equal => {
|
||||||
|
// this rpc is synced, but it isn't the first to this block
|
||||||
|
}
|
||||||
|
cmp::Ordering::Less => {
|
||||||
|
// this is a new head block. clear the current synced connections
|
||||||
|
// TODO: this is too verbose with a bunch of tiers. include the tier
|
||||||
|
// info!("new head block from {:?}: {}", rpc, new_block);
|
||||||
|
|
||||||
|
synced_connections.inner.clear();
|
||||||
|
|
||||||
|
synced_connections.head_block_number = new_block;
|
||||||
|
}
|
||||||
|
cmp::Ordering::Greater => {
|
||||||
|
// not the latest block. return now
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let rpc = Arc::clone(rpc);
|
||||||
|
|
||||||
|
synced_connections.inner.push(rpc);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// get the best available rpc server
|
||||||
|
pub async fn next_upstream_server(
|
||||||
|
&self,
|
||||||
|
) -> Result<Arc<Web3Connection>, Option<NotUntil<QuantaInstant>>> {
|
||||||
|
let mut earliest_not_until = None;
|
||||||
|
|
||||||
|
// TODO: this clone is probably not the best way to do this
|
||||||
|
let mut synced_rpcs = self.synced_connections.read().inner.clone();
|
||||||
|
|
||||||
|
// i'm pretty sure i did this safely. Hash on Web3Connection just uses the url and not any of the atomics
|
||||||
|
#[allow(clippy::mutable_key_type)]
|
||||||
|
let cache: FxHashMap<Arc<Web3Connection>, u32> = synced_rpcs
|
||||||
|
.iter()
|
||||||
|
.map(|synced_rpc| (synced_rpc.clone(), synced_rpc.active_requests()))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// TODO: i think we might need to load active connections and then
|
||||||
|
synced_rpcs.sort_unstable_by(|a, b| {
|
||||||
|
let a = cache.get(a).unwrap();
|
||||||
|
let b = cache.get(b).unwrap();
|
||||||
|
|
||||||
|
a.cmp(b)
|
||||||
|
});
|
||||||
|
|
||||||
|
for selected_rpc in synced_rpcs.iter() {
|
||||||
|
// increment our connection counter
|
||||||
|
if let Err(not_until) = selected_rpc.try_inc_active_requests() {
|
||||||
|
earliest_possible(&mut earliest_not_until, not_until);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the selected RPC
|
||||||
|
return Ok(selected_rpc.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
// this might be None
|
||||||
|
Err(earliest_not_until)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// get all rpc servers that are not rate limited
|
||||||
|
/// even fetches if they aren't in sync. This is useful for broadcasting signed transactions
|
||||||
|
pub fn get_upstream_servers(
|
||||||
|
&self,
|
||||||
|
) -> Result<Vec<Arc<Web3Connection>>, Option<NotUntil<QuantaInstant>>> {
|
||||||
|
let mut earliest_not_until = None;
|
||||||
|
// TODO: with capacity?
|
||||||
|
let mut selected_rpcs = vec![];
|
||||||
|
|
||||||
|
for connection in self.inner.iter() {
|
||||||
|
// check rate limits and increment our connection counter
|
||||||
|
if let Err(not_until) = connection.try_inc_active_requests() {
|
||||||
|
earliest_possible(&mut earliest_not_until, not_until);
|
||||||
|
|
||||||
|
// this rpc is not available. skip it
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
selected_rpcs.push(connection.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
if !selected_rpcs.is_empty() {
|
||||||
|
return Ok(selected_rpcs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the earliest not_until (if no rpcs are synced, this will be None)
|
||||||
|
Err(earliest_not_until)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn earliest_possible(
|
||||||
|
earliest_not_until_option: &mut Option<NotUntil<QuantaInstant>>,
|
||||||
|
new_not_until: NotUntil<QuantaInstant>,
|
||||||
|
) {
|
||||||
|
match earliest_not_until_option.as_ref() {
|
||||||
|
None => *earliest_not_until_option = Some(new_not_until),
|
||||||
|
Some(earliest_not_until) => {
|
||||||
|
let earliest_possible = earliest_not_until.earliest_possible();
|
||||||
|
let new_earliest_possible = new_not_until.earliest_possible();
|
||||||
|
|
||||||
|
if earliest_possible > new_earliest_possible {
|
||||||
|
*earliest_not_until_option = Some(new_not_until);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
126
src/main.rs
126
src/main.rs
|
@ -1,23 +1,20 @@
|
||||||
mod block_watcher;
|
mod connection;
|
||||||
mod provider;
|
mod connections;
|
||||||
mod provider_tiers;
|
|
||||||
|
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use governor::clock::{Clock, QuantaClock};
|
use governor::clock::{Clock, QuantaClock};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::mpsc;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
use warp::Filter;
|
use warp::Filter;
|
||||||
use warp::Reply;
|
use warp::Reply;
|
||||||
|
|
||||||
use crate::block_watcher::BlockWatcher;
|
use crate::connection::JsonRpcRequest;
|
||||||
use crate::provider::JsonRpcRequest;
|
use crate::connections::Web3Connections;
|
||||||
use crate::provider_tiers::Web3ProviderTier;
|
|
||||||
|
|
||||||
static APP_USER_AGENT: &str = concat!(
|
static APP_USER_AGENT: &str = concat!(
|
||||||
"satoshiandkin/",
|
"satoshiandkin/",
|
||||||
|
@ -33,9 +30,9 @@ struct Web3ProxyApp {
|
||||||
/// TODO: use tokio's clock (will require a different ratelimiting crate)
|
/// TODO: use tokio's clock (will require a different ratelimiting crate)
|
||||||
clock: QuantaClock,
|
clock: QuantaClock,
|
||||||
/// Send requests to the best server available
|
/// Send requests to the best server available
|
||||||
balanced_rpc_tiers: Arc<Vec<Web3ProviderTier>>,
|
balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
|
||||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||||
private_rpcs: Option<Arc<Web3ProviderTier>>,
|
private_rpcs: Option<Arc<Web3Connections>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Web3ProxyApp {
|
impl fmt::Debug for Web3ProxyApp {
|
||||||
|
@ -47,7 +44,6 @@ impl fmt::Debug for Web3ProxyApp {
|
||||||
|
|
||||||
impl Web3ProxyApp {
|
impl Web3ProxyApp {
|
||||||
async fn try_new(
|
async fn try_new(
|
||||||
allowed_lag: u64,
|
|
||||||
balanced_rpc_tiers: Vec<Vec<(&str, u32, Option<u32>)>>,
|
balanced_rpc_tiers: Vec<Vec<(&str, u32, Option<u32>)>>,
|
||||||
private_rpcs: Vec<(&str, u32, Option<u32>)>,
|
private_rpcs: Vec<(&str, u32, Option<u32>)>,
|
||||||
) -> anyhow::Result<Web3ProxyApp> {
|
) -> anyhow::Result<Web3ProxyApp> {
|
||||||
|
@ -67,8 +63,6 @@ impl Web3ProxyApp {
|
||||||
rpcs.push(rpc);
|
rpcs.push(rpc);
|
||||||
}
|
}
|
||||||
|
|
||||||
let block_watcher = Arc::new(BlockWatcher::new(rpcs));
|
|
||||||
|
|
||||||
// make a http shared client
|
// make a http shared client
|
||||||
// TODO: how should we configure the connection pool?
|
// TODO: how should we configure the connection pool?
|
||||||
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
||||||
|
@ -78,84 +72,22 @@ impl Web3ProxyApp {
|
||||||
.user_agent(APP_USER_AGENT)
|
.user_agent(APP_USER_AGENT)
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
let balanced_rpc_tiers = Arc::new(
|
let balanced_rpc_tiers =
|
||||||
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
|
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
|
||||||
Web3ProviderTier::try_new(
|
Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock)
|
||||||
balanced_rpc_tier,
|
|
||||||
Some(http_client.clone()),
|
|
||||||
block_watcher.clone(),
|
|
||||||
&clock,
|
|
||||||
)
|
|
||||||
}))
|
}))
|
||||||
.await
|
.await
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect::<anyhow::Result<Vec<Web3ProviderTier>>>()?,
|
.collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
|
||||||
);
|
|
||||||
|
|
||||||
let private_rpcs = if private_rpcs.is_empty() {
|
let private_rpcs = if private_rpcs.is_empty() {
|
||||||
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
|
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
|
||||||
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
|
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(Arc::new(
|
Some(Web3Connections::try_new(private_rpcs, Some(http_client), &clock).await?)
|
||||||
Web3ProviderTier::try_new(
|
|
||||||
private_rpcs,
|
|
||||||
Some(http_client),
|
|
||||||
block_watcher.clone(),
|
|
||||||
&clock,
|
|
||||||
)
|
|
||||||
.await?,
|
|
||||||
))
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let (new_block_sender, mut new_block_receiver) = watch::channel::<String>("".to_string());
|
|
||||||
|
|
||||||
{
|
|
||||||
// TODO: spawn this later?
|
|
||||||
// spawn a future for the block_watcher
|
|
||||||
let block_watcher = block_watcher.clone();
|
|
||||||
tokio::spawn(async move { block_watcher.run(new_block_sender).await });
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// spawn a future for sorting our synced rpcs
|
|
||||||
// TODO: spawn this later?
|
|
||||||
let balanced_rpc_tiers = balanced_rpc_tiers.clone();
|
|
||||||
let private_rpcs = private_rpcs.clone();
|
|
||||||
let block_watcher = block_watcher.clone();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut tier_map = HashMap::new();
|
|
||||||
let mut private_map = HashMap::new();
|
|
||||||
|
|
||||||
for balanced_rpc_tier in balanced_rpc_tiers.iter() {
|
|
||||||
for rpc in balanced_rpc_tier.clone_rpcs() {
|
|
||||||
tier_map.insert(rpc, balanced_rpc_tier);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(private_rpcs) = private_rpcs {
|
|
||||||
for rpc in private_rpcs.clone_rpcs() {
|
|
||||||
private_map.insert(rpc, private_rpcs.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
while new_block_receiver.changed().await.is_ok() {
|
|
||||||
let updated_rpc = new_block_receiver.borrow().clone();
|
|
||||||
|
|
||||||
if let Some(tier) = tier_map.get(&updated_rpc) {
|
|
||||||
tier.update_synced_rpcs(block_watcher.clone(), allowed_lag)
|
|
||||||
.unwrap();
|
|
||||||
} else if let Some(tier) = private_map.get(&updated_rpc) {
|
|
||||||
tier.update_synced_rpcs(block_watcher.clone(), allowed_lag)
|
|
||||||
.unwrap();
|
|
||||||
} else {
|
|
||||||
panic!("howd this happen");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Web3ProxyApp {
|
Ok(Web3ProxyApp {
|
||||||
clock,
|
clock,
|
||||||
balanced_rpc_tiers,
|
balanced_rpc_tiers,
|
||||||
|
@ -175,11 +107,11 @@ impl Web3ProxyApp {
|
||||||
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
|
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
|
||||||
loop {
|
loop {
|
||||||
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
|
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
|
||||||
match private_rpcs.get_upstream_servers().await {
|
match private_rpcs.get_upstream_servers() {
|
||||||
Ok(upstream_servers) => {
|
Ok(upstream_servers) => {
|
||||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let connections = private_rpcs.clone_connections();
|
let connections = private_rpcs.clone();
|
||||||
let method = json_body.method.clone();
|
let method = json_body.method.clone();
|
||||||
let params = json_body.params.clone();
|
let params = json_body.params.clone();
|
||||||
|
|
||||||
|
@ -226,13 +158,11 @@ impl Web3ProxyApp {
|
||||||
// TODO: what allowed lag?
|
// TODO: what allowed lag?
|
||||||
match balanced_rpcs.next_upstream_server().await {
|
match balanced_rpcs.next_upstream_server().await {
|
||||||
Ok(upstream_server) => {
|
Ok(upstream_server) => {
|
||||||
let connections = balanced_rpcs.connections();
|
let response = balanced_rpcs
|
||||||
|
|
||||||
let response = connections
|
|
||||||
.try_send_request(
|
.try_send_request(
|
||||||
upstream_server,
|
&upstream_server,
|
||||||
json_body.method.clone(),
|
&json_body.method,
|
||||||
json_body.params.clone(),
|
&json_body.params,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
@ -307,15 +237,13 @@ async fn main() {
|
||||||
// TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
|
// TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
|
||||||
let listen_port = 8445;
|
let listen_port = 8445;
|
||||||
// TODO: what should this be? 0 will cause a thundering herd
|
// TODO: what should this be? 0 will cause a thundering herd
|
||||||
let allowed_lag = 0;
|
|
||||||
|
|
||||||
let state = Web3ProxyApp::try_new(
|
let state = Web3ProxyApp::try_new(
|
||||||
allowed_lag,
|
|
||||||
vec![
|
vec![
|
||||||
// local nodes
|
// local nodes
|
||||||
vec![
|
vec![
|
||||||
("ws://10.11.12.16:8545", 68_800, None),
|
("ws://127.0.0.1:8545", 68_800, None),
|
||||||
("ws://10.11.12.16:8946", 152_138, None),
|
("ws://127.0.0.1:8946", 152_138, None),
|
||||||
],
|
],
|
||||||
// paid nodes
|
// paid nodes
|
||||||
// TODO: add paid nodes (with rate limits)
|
// TODO: add paid nodes (with rate limits)
|
||||||
|
@ -324,16 +252,16 @@ async fn main() {
|
||||||
// // moralis free (25/sec rate limit)
|
// // moralis free (25/sec rate limit)
|
||||||
// ],
|
// ],
|
||||||
// free nodes
|
// free nodes
|
||||||
// vec![
|
vec![
|
||||||
// // ("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline
|
("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline
|
||||||
// ("https://rpc.ankr.com/eth", 23_967, None),
|
("https://rpc.ankr.com/eth", 23_967, None),
|
||||||
// ],
|
],
|
||||||
],
|
],
|
||||||
vec![
|
vec![
|
||||||
// ("https://api.edennetwork.io/v1/", 1_805, None),
|
("https://api.edennetwork.io/v1/", 1_805, None),
|
||||||
// ("https://api.edennetwork.io/v1/beta", 300, None),
|
("https://api.edennetwork.io/v1/beta", 300, None),
|
||||||
// ("https://rpc.ethermine.org/", 5_861, None),
|
("https://rpc.ethermine.org/", 5_861, None),
|
||||||
// ("https://rpc.flashbots.net", 7074, None),
|
("https://rpc.flashbots.net", 7074, None),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -1,303 +0,0 @@
|
||||||
///! Communicate with groups of web3 providers
|
|
||||||
use arc_swap::ArcSwap;
|
|
||||||
use derive_more::From;
|
|
||||||
use futures::stream::FuturesUnordered;
|
|
||||||
use futures::StreamExt;
|
|
||||||
use governor::clock::{QuantaClock, QuantaInstant};
|
|
||||||
use governor::NotUntil;
|
|
||||||
use serde_json::value::RawValue;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fmt;
|
|
||||||
use std::num::NonZeroU32;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tracing::warn;
|
|
||||||
|
|
||||||
use crate::block_watcher::{BlockWatcher, SyncStatus};
|
|
||||||
use crate::provider::{JsonRpcForwardedResponse, Web3Connection};
|
|
||||||
|
|
||||||
#[derive(From)]
|
|
||||||
pub struct Web3Connections(HashMap<String, Web3Connection>);
|
|
||||||
|
|
||||||
impl Web3Connections {
|
|
||||||
pub fn get(&self, rpc: &str) -> Option<&Web3Connection> {
|
|
||||||
self.0.get(rpc)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn try_send_request(
|
|
||||||
&self,
|
|
||||||
rpc: String,
|
|
||||||
method: String,
|
|
||||||
params: Box<RawValue>,
|
|
||||||
) -> anyhow::Result<JsonRpcForwardedResponse> {
|
|
||||||
let connection = self.get(&rpc).unwrap();
|
|
||||||
|
|
||||||
// TODO: do we need this clone or can we do a reference?
|
|
||||||
let provider = connection.clone_provider();
|
|
||||||
|
|
||||||
let response = provider.request(&method, params).await;
|
|
||||||
|
|
||||||
connection.dec_active_requests();
|
|
||||||
|
|
||||||
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
|
|
||||||
|
|
||||||
response.map_err(Into::into)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn try_send_requests(
|
|
||||||
self: Arc<Self>,
|
|
||||||
rpc_servers: Vec<String>,
|
|
||||||
method: String,
|
|
||||||
params: Box<RawValue>,
|
|
||||||
// TODO: i think this should actually be a oneshot
|
|
||||||
response_sender: mpsc::UnboundedSender<anyhow::Result<JsonRpcForwardedResponse>>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let method = Arc::new(method);
|
|
||||||
|
|
||||||
let mut unordered_futures = FuturesUnordered::new();
|
|
||||||
|
|
||||||
for rpc in rpc_servers {
|
|
||||||
let connections = self.clone();
|
|
||||||
let method = method.to_string();
|
|
||||||
let params = params.clone();
|
|
||||||
let response_sender = response_sender.clone();
|
|
||||||
|
|
||||||
let handle = tokio::spawn(async move {
|
|
||||||
// get the client for this rpc server
|
|
||||||
let response = connections.try_send_request(rpc, method, params).await?;
|
|
||||||
|
|
||||||
// send the first good response to a one shot channel. that way we respond quickly
|
|
||||||
// drop the result because errors are expected after the first send
|
|
||||||
response_sender.send(Ok(response)).map_err(Into::into)
|
|
||||||
});
|
|
||||||
|
|
||||||
unordered_futures.push(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: use iterators instead of pushing into a vec
|
|
||||||
let mut errs = vec![];
|
|
||||||
if let Some(x) = unordered_futures.next().await {
|
|
||||||
match x.unwrap() {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(e) => {
|
|
||||||
// TODO: better errors
|
|
||||||
warn!("Got an error sending request: {}", e);
|
|
||||||
errs.push(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the first error (if any)
|
|
||||||
let e = if !errs.is_empty() {
|
|
||||||
Err(errs.pop().unwrap())
|
|
||||||
} else {
|
|
||||||
Err(anyhow::anyhow!("no successful responses"))
|
|
||||||
};
|
|
||||||
|
|
||||||
// send the error to the channel
|
|
||||||
if response_sender.send(e).is_ok() {
|
|
||||||
// if we were able to send an error, then we never sent a success
|
|
||||||
return Err(anyhow::anyhow!("no successful responses"));
|
|
||||||
} else {
|
|
||||||
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Load balance to the rpc
|
|
||||||
pub struct Web3ProviderTier {
|
|
||||||
/// TODO: what type for the rpc? Vec<String> isn't great. i think we want this to be the key for the provider and not the provider itself
|
|
||||||
/// TODO: we probably want a better lock
|
|
||||||
synced_rpcs: ArcSwap<Vec<String>>,
|
|
||||||
rpcs: Vec<String>,
|
|
||||||
connections: Arc<Web3Connections>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Web3ProviderTier {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
|
||||||
f.debug_struct("Web3ProviderTier").finish_non_exhaustive()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Web3ProviderTier {
|
|
||||||
pub async fn try_new(
|
|
||||||
servers: Vec<(&str, u32, Option<u32>)>,
|
|
||||||
http_client: Option<reqwest::Client>,
|
|
||||||
block_watcher: Arc<BlockWatcher>,
|
|
||||||
clock: &QuantaClock,
|
|
||||||
) -> anyhow::Result<Web3ProviderTier> {
|
|
||||||
let mut rpcs: Vec<String> = vec![];
|
|
||||||
let mut connections = HashMap::new();
|
|
||||||
|
|
||||||
for (s, soft_limit, hard_limit) in servers.into_iter() {
|
|
||||||
rpcs.push(s.to_string());
|
|
||||||
|
|
||||||
let hard_rate_limiter = if let Some(hard_limit) = hard_limit {
|
|
||||||
let quota = governor::Quota::per_second(NonZeroU32::new(hard_limit).unwrap());
|
|
||||||
|
|
||||||
let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
|
|
||||||
|
|
||||||
Some(rate_limiter)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let connection = Web3Connection::try_new(
|
|
||||||
s.to_string(),
|
|
||||||
http_client.clone(),
|
|
||||||
block_watcher.clone_sender(),
|
|
||||||
hard_rate_limiter,
|
|
||||||
soft_limit as f32,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
connections.insert(s.to_string(), connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Web3ProviderTier {
|
|
||||||
synced_rpcs: ArcSwap::from(Arc::new(vec![])),
|
|
||||||
rpcs,
|
|
||||||
connections: Arc::new(connections.into()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn connections(&self) -> &Web3Connections {
|
|
||||||
&self.connections
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn clone_connections(&self) -> Arc<Web3Connections> {
|
|
||||||
self.connections.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn clone_rpcs(&self) -> Vec<String> {
|
|
||||||
self.rpcs.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update_synced_rpcs(
|
|
||||||
&self,
|
|
||||||
block_watcher: Arc<BlockWatcher>,
|
|
||||||
allowed_lag: u64,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut available_rpcs = self.rpcs.clone();
|
|
||||||
|
|
||||||
// collect sync status for all the rpcs
|
|
||||||
let sync_status: HashMap<String, SyncStatus> = available_rpcs
|
|
||||||
.clone()
|
|
||||||
.into_iter()
|
|
||||||
.map(|rpc| {
|
|
||||||
let status = block_watcher.sync_status(&rpc, allowed_lag);
|
|
||||||
(rpc, status)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// sort rpcs by their sync status
|
|
||||||
// TODO: if we only changed one entry, we don't need to sort the whole thing. we can do this better
|
|
||||||
available_rpcs.sort_unstable_by(|a, b| {
|
|
||||||
let a_synced = sync_status.get(a).unwrap();
|
|
||||||
let b_synced = sync_status.get(b).unwrap();
|
|
||||||
|
|
||||||
a_synced.cmp(b_synced)
|
|
||||||
});
|
|
||||||
|
|
||||||
// filter out unsynced rpcs
|
|
||||||
let synced_rpcs: Vec<String> = available_rpcs
|
|
||||||
.into_iter()
|
|
||||||
.take_while(|rpc| matches!(sync_status.get(rpc).unwrap(), SyncStatus::Synced(_)))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// TODO: is arcswap the best type for this?
|
|
||||||
self.synced_rpcs.swap(Arc::new(synced_rpcs));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// get the best available rpc server
|
|
||||||
pub async fn next_upstream_server(&self) -> Result<String, Option<NotUntil<QuantaInstant>>> {
|
|
||||||
let mut earliest_not_until = None;
|
|
||||||
|
|
||||||
// TODO: this clone is probably not the best way to do this
|
|
||||||
let mut synced_rpcs = Vec::clone(&*self.synced_rpcs.load());
|
|
||||||
|
|
||||||
// TODO: we don't want to sort on active connections. we want to sort on remaining capacity for connections. for example, geth can handle more than erigon
|
|
||||||
synced_rpcs.sort_unstable_by(|a, b| {
|
|
||||||
let a = self.connections.get(a).unwrap();
|
|
||||||
let b = self.connections.get(b).unwrap();
|
|
||||||
|
|
||||||
// sort on active connections
|
|
||||||
a.cmp(b)
|
|
||||||
});
|
|
||||||
|
|
||||||
for selected_rpc in synced_rpcs.iter() {
|
|
||||||
// increment our connection counter
|
|
||||||
if let Err(not_until) = self
|
|
||||||
.connections
|
|
||||||
.get(selected_rpc)
|
|
||||||
.unwrap()
|
|
||||||
.try_inc_active_requests()
|
|
||||||
{
|
|
||||||
// TODO: do this better
|
|
||||||
if earliest_not_until.is_none() {
|
|
||||||
earliest_not_until = Some(not_until);
|
|
||||||
} else {
|
|
||||||
let earliest_possible =
|
|
||||||
earliest_not_until.as_ref().unwrap().earliest_possible();
|
|
||||||
let new_earliest_possible = not_until.earliest_possible();
|
|
||||||
|
|
||||||
if earliest_possible > new_earliest_possible {
|
|
||||||
earliest_not_until = Some(not_until);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// return the selected RPC
|
|
||||||
return Ok(selected_rpc.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
// this might be None
|
|
||||||
Err(earliest_not_until)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// get all available rpc servers
|
|
||||||
pub async fn get_upstream_servers(
|
|
||||||
&self,
|
|
||||||
) -> Result<Vec<String>, Option<NotUntil<QuantaInstant>>> {
|
|
||||||
let mut earliest_not_until = None;
|
|
||||||
let mut selected_rpcs = vec![];
|
|
||||||
for selected_rpc in self.synced_rpcs.load().iter() {
|
|
||||||
// check rate limits and increment our connection counter
|
|
||||||
// TODO: share code with next_upstream_server
|
|
||||||
if let Err(not_until) = self
|
|
||||||
.connections
|
|
||||||
.get(selected_rpc)
|
|
||||||
.unwrap()
|
|
||||||
.try_inc_active_requests()
|
|
||||||
{
|
|
||||||
if earliest_not_until.is_none() {
|
|
||||||
earliest_not_until = Some(not_until);
|
|
||||||
} else {
|
|
||||||
let earliest_possible =
|
|
||||||
earliest_not_until.as_ref().unwrap().earliest_possible();
|
|
||||||
let new_earliest_possible = not_until.earliest_possible();
|
|
||||||
|
|
||||||
if earliest_possible > new_earliest_possible {
|
|
||||||
earliest_not_until = Some(not_until);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// this is rpc should work
|
|
||||||
selected_rpcs.push(selected_rpc.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
if !selected_rpcs.is_empty() {
|
|
||||||
return Ok(selected_rpcs);
|
|
||||||
}
|
|
||||||
|
|
||||||
// return the earliest not_until (if no rpcs are synced, this will be None)
|
|
||||||
Err(earliest_not_until)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue