diff --git a/Cargo.lock b/Cargo.lock
index a30936f2..0bdfe6a2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -48,12 +48,6 @@ version = "1.0.57"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
 
-[[package]]
-name = "arc-swap"
-version = "1.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
-
 [[package]]
 name = "argh"
 version = "0.1.7"
@@ -120,12 +114,6 @@ dependencies = [
  "rustc_version",
 ]
 
-[[package]]
-name = "atomic-counter"
-version = "1.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019"
-
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -2656,6 +2644,12 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "rustc-hash"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
+
 [[package]]
 name = "rustc-hex"
 version = "2.1.0"
@@ -3756,16 +3750,16 @@ name = "web3-proxy"
 version = "0.1.0"
 dependencies = [
  "anyhow",
- "arc-swap",
  "argh",
- "atomic-counter",
  "derive_more",
  "ethers",
  "futures",
+ "fxhash",
  "governor",
  "parking_lot 0.12.0",
  "regex",
  "reqwest",
+ "rustc-hash",
  "serde",
  "serde_json",
  "tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 338167d5..7868db99 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,18 +6,18 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-arc-swap = "1.5.0"
 argh = "0.1.7"
 anyhow = "1.0.57"
-atomic-counter = "1.0.1"
 derive_more = "0.99.17"
 ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
 futures = { version = "0.3.21", features = ["thread-pool"] }
+fxhash = "0.2.1"
 governor = { version = "0.4.2", features = ["dashmap", "std"] }
 tokio = { version = "1.18.0", features = ["full"] }
-parking_lot = "0.12.0"
+parking_lot = { version = "0.12.0" }
 regex = "1.5.5"
 reqwest = { version = "0.11.10", features = ["json", "rustls"] }
+rustc-hash = { version = "1.0" }
 serde = { version = "1.0.136", features = [] }
 serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] }
 tracing = "0.1.34"
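// A standalone sketch (not from this diff) of what the new `fxhash` dependency buys:
// FxHashMap is a drop-in HashMap with a much faster, non-DoS-resistant hasher. That
// tradeoff is fine here because the keys are our own connection handles, never user input.
use fxhash::FxHashMap;

fn main() {
    // illustrative key/value types; the real code below keys by Arc<Web3Connection>
    let mut active_requests: FxHashMap<String, u32> = FxHashMap::default();

    active_requests.insert("ws://127.0.0.1:8545".to_string(), 1);

    assert_eq!(active_requests["ws://127.0.0.1:8545"], 1);
}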
diff --git a/src/block_watcher.rs b/src/block_watcher.rs
deleted file mode 100644
index cfe9561e..00000000
--- a/src/block_watcher.rs
+++ /dev/null
@@ -1,204 +0,0 @@
-///! Track the head block of all the web3 providers
-use ethers::prelude::{Block, TxHash};
-use std::cmp;
-use std::collections::HashMap;
-use std::fmt;
-use std::sync::atomic::{self, AtomicU64};
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::sync::{mpsc, watch, Mutex};
-use tracing::info;
-
-// TODO: what type for the Item? String url works, but i don't love it
-pub type NewHead = (String, Block<TxHash>);
-
-pub type BlockWatcherSender = mpsc::UnboundedSender<NewHead>;
-pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<NewHead>;
-
-// TODO: ethers has a similar SyncingStatus
-#[derive(Eq)]
-pub enum SyncStatus {
-    Synced(u64),
-    Behind(u64),
-    Unknown,
-}
-
-impl Ord for SyncStatus {
-    fn cmp(&self, other: &Self) -> cmp::Ordering {
-        match (self, other) {
-            (SyncStatus::Synced(a), SyncStatus::Synced(b)) => a.cmp(b),
-            (SyncStatus::Synced(_), SyncStatus::Behind(_)) => cmp::Ordering::Greater,
-            (SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
-            (SyncStatus::Behind(_), SyncStatus::Synced(_)) => cmp::Ordering::Less,
-            (SyncStatus::Behind(a), SyncStatus::Behind(b)) => a.cmp(b),
-            (SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
-            (SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less,
-            (SyncStatus::Unknown, SyncStatus::Behind(_)) => cmp::Ordering::Less,
-            (SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal,
-        }
-    }
-}
-
-impl PartialOrd for SyncStatus {
-    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl PartialEq for SyncStatus {
-    fn eq(&self, other: &Self) -> bool {
-        self.cmp(other) == cmp::Ordering::Equal
-    }
-}
-
-pub struct BlockWatcher {
-    sender: BlockWatcherSender,
-    /// this Mutex is locked over awaits, so we want an async lock
-    receiver: Mutex<BlockWatcherReceiver>,
-    // TODO: better key
-    block_numbers: HashMap<String, AtomicU64>,
-    head_block_number: AtomicU64,
-}
-
-impl fmt::Debug for BlockWatcher {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // TODO: the default formatter takes forever to write. this is too quiet though
-        write!(f, "BlockWatcher(...)")
-    }
-}
-
-impl BlockWatcher {
-    pub fn new(rpcs: Vec<String>) -> Self {
-        let (sender, receiver) = mpsc::unbounded_channel();
-
-        let block_numbers = rpcs.into_iter().map(|rpc| (rpc, 0.into())).collect();
-
-        Self {
-            sender,
-            receiver: Mutex::new(receiver),
-            block_numbers,
-            head_block_number: Default::default(),
-        }
-    }
-
-    pub fn clone_sender(&self) -> BlockWatcherSender {
-        self.sender.clone()
-    }
-
-    pub fn sync_status(&self, rpc: &str, allowed_lag: u64) -> SyncStatus {
-        match (
-            self.head_block_number.load(atomic::Ordering::Acquire),
-            self.block_numbers.get(rpc),
-        ) {
-            (0, _) => SyncStatus::Unknown,
-            (_, None) => SyncStatus::Unknown,
-            (head_block_number, Some(rpc_block_number)) => {
-                let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire);
-
-                match head_block_number.cmp(&rpc_block_number) {
-                    cmp::Ordering::Equal => SyncStatus::Synced(0),
-                    cmp::Ordering::Greater => {
-                        // this probably won't happen, but it might if the block arrives at the exact wrong time
-                        // TODO: should this be negative?
-                        SyncStatus::Synced(0)
-                    }
-                    cmp::Ordering::Less => {
-                        // allow being some behind
-                        let lag = head_block_number - rpc_block_number;
-
-                        if lag <= allowed_lag {
-                            SyncStatus::Synced(lag)
-                        } else {
-                            SyncStatus::Behind(lag)
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    pub async fn run(
-        self: Arc<Self>,
-        new_block_sender: watch::Sender<String>,
-    ) -> anyhow::Result<()> {
-        let mut receiver = self.receiver.lock().await;
-
-        while let Some((rpc, new_block)) = receiver.recv().await {
-            let new_block_number = new_block.number.unwrap().as_u64();
-
-            {
-                if let Some(rpc_block_number) = self.block_numbers.get(&rpc) {
-                    let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire);
-
-                    // if we already have this block height
-                    // this probably own't happen with websockets, but is likely with polling against http rpcs
-                    // TODO: should we compare more than just height? hash too?
-                    if rpc_block_number == new_block_number {
-                        continue;
-                    }
-                }
-            }
-
-            let now = SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .expect("Time went backwards")
-                .as_secs() as i64;
-
-            // save the block for this rpc
-            // TODO: store the actual chain as a graph and then have self.blocks point to that?
-            self.block_numbers
-                .get(&rpc)
-                .unwrap()
-                .swap(new_block_number, atomic::Ordering::Release);
-
-            let head_number = self.head_block_number.load(atomic::Ordering::Acquire);
-
-            let label_slow_heads = if head_number == 0 {
-                // first block seen
-                self.head_block_number
-                    .swap(new_block_number, atomic::Ordering::AcqRel);
-                ", +".to_string()
-            } else {
-                // TODO: what if they have the same number but different hashes?
-                // TODO: alert if there is a large chain split?
-                match (new_block_number).cmp(&head_number) {
-                    cmp::Ordering::Equal => {
-                        // this block is already saved as the head
-                        "".to_string()
-                    }
-                    cmp::Ordering::Greater => {
-                        // new_block is the new head_block
-                        self.head_block_number
-                            .swap(new_block_number, atomic::Ordering::AcqRel);
-                        ", +".to_string()
-                    }
-                    cmp::Ordering::Less => {
-                        // this rpc is behind
-                        let lag = new_block_number as i64 - head_number as i64;
-
-                        let mut s = ", ".to_string();
-
-                        s.push_str(&lag.to_string());
-
-                        s
-                    }
-                }
-            };
-
-            // have the provider tiers update_synced_rpcs
-            new_block_sender.send(rpc.clone())?;
-
-            // TODO: include time since last update?
-            info!(
-                "{:?} = {}, {}, {} sec{}",
-                new_block.hash.unwrap(),
-                new_block.number.unwrap(),
-                rpc,
-                now - new_block.timestamp.as_u64() as i64,
-                label_slow_heads
-            );
-        }
-
-        Ok(())
-    }
-}
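// The deleted BlockWatcher tracked heads in a central HashMap. The replacement below is a
// per-connection AtomicU64; a standalone sketch of the swap-and-compare pattern it relies on:
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let head_block_number = AtomicU64::new(0);
    let new_block_number = 14_500_000u64; // made-up block height

    // swap returns the previous value, so a single atomic op both stores the new
    // head and tells us whether anything actually changed
    let old_block_number = head_block_number.swap(new_block_number, Ordering::SeqCst);

    if old_block_number != new_block_number {
        println!("new head block: {}", new_block_number);
    }
}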
diff --git a/src/provider.rs b/src/connection.rs
similarity index 59%
rename from src/provider.rs
rename to src/connection.rs
index 4956c102..debff9f9 100644
--- a/src/provider.rs
+++ b/src/connection.rs
@@ -1,6 +1,6 @@
-///! Communicate with a web3 providers
+///! Communicate with a web3 provider
 use derive_more::From;
-use ethers::prelude::{BlockNumber, Middleware};
+use ethers::prelude::Middleware;
 use futures::StreamExt;
 use governor::clock::{QuantaClock, QuantaInstant};
 use governor::middleware::NoOpMiddleware;
@@ -10,13 +10,15 @@ use governor::RateLimiter;
 use serde::{Deserialize, Serialize};
 use serde_json::value::RawValue;
 use std::fmt;
-use std::sync::atomic::{self, AtomicU32};
+use std::hash::{Hash, Hasher};
+use std::num::NonZeroU32;
+use std::sync::atomic::{self, AtomicU32, AtomicU64};
 use std::time::Duration;
 use std::{cmp::Ordering, sync::Arc};
 use tokio::time::interval;
 use tracing::{info, warn};
 
-use crate::block_watcher::BlockWatcherSender;
+use crate::connections::Web3Connections;
 
 type Web3RateLimiter = RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
@@ -62,95 +64,58 @@ pub enum Web3Provider {
 impl fmt::Debug for Web3Provider {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // TODO: the default formatter takes forever to write. this is too quiet though
+        // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
         f.debug_struct("Web3Provider").finish_non_exhaustive()
     }
 }
 
-/// Forward functions to the inner ethers::providers::Provider
-impl Web3Provider {
-    /// Send a web3 request
-    pub async fn request(
-        &self,
-        method: &str,
-        params: Box<RawValue>,
-    ) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
-        match self {
-            Self::Http(provider) => provider.request(method, params).await,
-            Self::Ws(provider) => provider.request(method, params).await,
-        }
-    }
-
-    /// Subscribe to new blocks
-    pub async fn new_heads(
-        &self,
-        url: String,
-        block_watcher_sender: BlockWatcherSender,
-    ) -> anyhow::Result<()> {
-        info!("Watching new_heads from {}", url);
-
-        match &self {
-            Web3Provider::Http(provider) => {
-                // TODO: there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
-                // TODO: what should this interval be?
-                // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
-                let mut interval = interval(Duration::from_secs(2));
-
-                loop {
-                    // wait for 2 seconds
-                    interval.tick().await;
-
-                    match provider.get_block(BlockNumber::Latest).await {
-                        Ok(Some(block)) => block_watcher_sender.send((url.clone(), block)).unwrap(),
-                        Ok(None) => warn!("no black at {}", url),
-                        Err(e) => warn!("getBlock at {} failed: {}", url, e),
-                    }
-                }
-            }
-            Web3Provider::Ws(provider) => {
-                // TODO: automatically reconnect?
-                let mut stream = provider.subscribe_blocks().await?;
-                while let Some(block) = stream.next().await {
-                    block_watcher_sender.send((url.clone(), block)).unwrap();
-                }
-            }
-        }
-
-        info!("Done watching new_heads from {}", url);
-
-        Ok(())
-    }
-}
-
 /// An active connection to a Web3Rpc
 pub struct Web3Connection {
+    /// TODO: can we get this from the provider? do we even need it?
+    url: String,
     /// keep track of currently open requests. We sort on this
     active_requests: AtomicU32,
-    provider: Arc<Web3Provider>,
+    provider: Web3Provider,
     ratelimiter: Option<Web3RateLimiter>,
     /// used for load balancing to the least loaded server
-    soft_limit: f32,
+    soft_limit: u32,
+    head_block_number: AtomicU64,
+}
+
+impl Hash for Web3Connection {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.url.hash(state);
+    }
 }
 
 impl fmt::Debug for Web3Connection {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Web3Connection").finish_non_exhaustive()
+        f.debug_struct("Web3Connection")
+            .field("url", &self.url)
+            .finish_non_exhaustive()
     }
 }
 
 impl Web3Connection {
-    pub fn clone_provider(&self) -> Arc<Web3Provider> {
-        self.provider.clone()
-    }
-
     /// Connect to a web3 rpc and subscribe to new heads
     pub async fn try_new(
         url_str: String,
         http_client: Option<reqwest::Client>,
-        block_watcher_sender: BlockWatcherSender,
-        hard_rate_limiter: Option<Web3RateLimiter>,
-        soft_limit: f32,
+        hard_rate_limit: Option<u32>,
+        clock: Option<&QuantaClock>,
+        // TODO: think more about this type
+        soft_limit: u32,
     ) -> anyhow::Result<Web3Connection> {
+        let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
+            let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());
+
+            let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock.unwrap());
+
+            Some(rate_limiter)
+        } else {
+            None
+        };
+
         let provider = if url_str.starts_with("http") {
             let url: url::Url = url_str.parse()?;
@@ -175,27 +140,104 @@ impl Web3Connection {
             return Err(anyhow::anyhow!("only http and ws servers are supported"));
         };
 
-        let provider = Arc::new(provider);
-
-        // subscribe to new heads in a spawned future
-        let provider_clone: Arc<Web3Provider> = Arc::clone(&provider);
-        tokio::spawn(async move {
-            while let Err(e) = provider_clone
-                .new_heads(url_str.clone(), block_watcher_sender.clone())
-                .await
-            {
-                warn!("new_heads error for {}: {:?}", url_str, e);
-            }
-        });
-
         Ok(Web3Connection {
+            url: url_str.clone(),
             active_requests: Default::default(),
             provider,
             ratelimiter: hard_rate_limiter,
             soft_limit,
+            head_block_number: 0.into(),
         })
     }
 
+    pub fn active_requests(&self) -> u32 {
+        self.active_requests.load(atomic::Ordering::Acquire)
+    }
+
+    /// Subscribe to new blocks
+    // #[instrument]
+    pub async fn new_heads(
+        self: Arc<Self>,
+        connections: Option<Arc<Web3Connections>>,
+    ) -> anyhow::Result<()> {
+        info!("Watching new_heads on {:?}", self);
+
+        match &self.provider {
+            Web3Provider::Http(provider) => {
+                // TODO: there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
+                // TODO: what should this interval be? probably some fraction of block time
+                // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
+                let mut interval = interval(Duration::from_secs(2));
+
+                loop {
+                    // wait for 2 seconds
+                    interval.tick().await;
+
+                    match (
+                        &connections,
+                        provider.get_block_number().await.map(|x| x.as_u64()),
+                    ) {
+                        (None, Ok(block_number)) => {
+                            // TODO: only store if this isn't already stored?
+                            // TODO: also send something to the provider_tier so it can sort?
+                            self.head_block_number
+                                .store(block_number, atomic::Ordering::SeqCst);
+                        }
+                        (Some(connections), Ok(block_number)) => {
+                            let old_block_number = self
+                                .head_block_number
+                                .swap(block_number, atomic::Ordering::SeqCst);
+
+                            if old_block_number != block_number {
+                                connections.update_synced_rpcs(&self, block_number)?;
+                            }
+                        }
+                        (_, Err(e)) => warn!("getBlockNumber failed: {}", e),
+                    }
+                }
+            }
+            Web3Provider::Ws(provider) => {
+                // TODO: automatically reconnect?
+                // TODO: it would be faster to get the block number, but subscriptions don't provide that
+                // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
+                let mut stream = provider.subscribe_blocks().await?;
+                while let Some(block) = stream.next().await {
+                    let block_number = block.number.unwrap().as_u64();
+
+                    // TODO: only store if this isn't already stored?
+                    // TODO: also send something to the provider_tier so it can sort?
+                    let old_block_number = self
+                        .head_block_number
+                        .swap(block_number, atomic::Ordering::SeqCst);
+
+                    if old_block_number != block_number {
+                        info!("new block on {:?}: {}", self, block_number);
+
+                        if let Some(connections) = &connections {
+                            connections.update_synced_rpcs(&self, block_number)?;
+                        }
+                    }
+                }
+            }
+        }
+
+        info!("Done watching new_heads");
+
+        Ok(())
+    }
+
+    /// Send a web3 request
+    pub async fn request(
+        &self,
+        method: &str,
+        params: &serde_json::value::RawValue,
+    ) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
+        match &self.provider {
+            Web3Provider::Http(provider) => provider.request(method, params).await,
+            Web3Provider::Ws(provider) => provider.request(method, params).await,
+        }
+    }
+
     pub fn try_inc_active_requests(&self) -> Result<(), NotUntil<QuantaInstant>> {
         // check rate limits
         if let Some(ratelimiter) = self.ratelimiter.as_ref() {
@@ -215,14 +257,14 @@ impl Web3Connection {
         };
 
         // TODO: what ordering?!
-        self.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
+        self.active_requests.fetch_add(1, atomic::Ordering::SeqCst);
 
         Ok(())
     }
 
     pub fn dec_active_requests(&self) {
         // TODO: what ordering?!
-        self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
+        self.active_requests.fetch_sub(1, atomic::Ordering::SeqCst);
     }
 }
 
 impl Eq for Web3Connection {}
 
 impl Ord for Web3Connection {
     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
         // TODO: what atomic ordering?!
-        let a = self.active_requests.load(atomic::Ordering::Acquire);
-        let b = other.active_requests.load(atomic::Ordering::Acquire);
+        let a = self.active_requests.load(atomic::Ordering::SeqCst);
+        let b = other.active_requests.load(atomic::Ordering::SeqCst);
 
         // TODO: how should we include the soft limit? floats are slower than integer math
-        let a = a as f32 / self.soft_limit;
-        let b = b as f32 / other.soft_limit;
+        let a = a as f32 / self.soft_limit as f32;
+        let b = b as f32 / other.soft_limit as f32;
 
         a.partial_cmp(&b).unwrap()
     }
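// Web3Connection now orders servers by utilization (active_requests / soft_limit) rather
// than raw request counts, so a server with a higher soft limit can hold more open requests
// before it ranks as "busier". A standalone sketch of that comparison, with made-up numbers:
fn utilization(active_requests: u32, soft_limit: u32) -> f32 {
    active_requests as f32 / soft_limit as f32
}

fn main() {
    // 10 open requests against a soft limit of 200, vs 5 open against a soft limit of 20
    let a = utilization(10, 200);
    let b = utilization(5, 20);

    // the first server is preferred despite having more open requests
    assert!(a < b);
}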
diff --git a/src/connections.rs b/src/connections.rs
new file mode 100644
index 00000000..ba617bce
--- /dev/null
+++ b/src/connections.rs
@@ -0,0 +1,297 @@
+///! Communicate with a group of web3 providers
+use derive_more::From;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use fxhash::FxHashMap;
+use governor::clock::{QuantaClock, QuantaInstant};
+use governor::NotUntil;
+use parking_lot::RwLock;
+use serde_json::value::RawValue;
+use std::cmp;
+use std::fmt;
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tracing::{info, warn};
+
+use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
+
+#[derive(Clone, Default)]
+struct SyncedConnections {
+    head_block_number: u64,
+    inner: Vec<Arc<Web3Connection>>,
+}
+
+impl SyncedConnections {
+    fn new(max_connections: usize) -> Self {
+        let inner = Vec::with_capacity(max_connections);
+
+        Self {
+            head_block_number: 0,
+            inner,
+        }
+    }
+}
+
+/// A collection of web3 connections. Sends requests to either the current best server or all servers.
+#[derive(From)]
+pub struct Web3Connections {
+    inner: Vec<Arc<Web3Connection>>,
+    /// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
+    synced_connections: RwLock<SyncedConnections>,
+}
+
+impl fmt::Debug for Web3Connections {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: the default formatter takes forever to write. this is too quiet though
+        f.debug_struct("Web3Connections")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
+}
+
+impl Web3Connections {
+    pub async fn try_new(
+        // TODO: servers should be a Web3ConnectionBuilder struct
+        servers: Vec<(&str, u32, Option<u32>)>,
+        http_client: Option<reqwest::Client>,
+        clock: &QuantaClock,
+    ) -> anyhow::Result<Arc<Self>> {
+        let mut connections = vec![];
+
+        let num_connections = servers.len();
+
+        for (s, soft_rate_limit, hard_rate_limit) in servers.into_iter() {
+            let connection = Web3Connection::try_new(
+                s.to_string(),
+                http_client.clone(),
+                hard_rate_limit,
+                Some(clock),
+                soft_rate_limit,
+            )
+            .await?;
+
+            let connection = Arc::new(connection);
+
+            connections.push(connection);
+        }
+
+        let connections = Arc::new(Self {
+            inner: connections,
+            synced_connections: RwLock::new(SyncedConnections::new(num_connections)),
+        });
+
+        for connection in connections.inner.iter() {
+            // subscribe to new heads in a spawned future
+            let connection = Arc::clone(connection);
+            let connections = connections.clone();
+            tokio::spawn(async move {
+                if let Err(e) = connection.new_heads(Some(connections)).await {
+                    warn!("new_heads error: {:?}", e);
+                }
+            });
+        }
+
+        Ok(connections)
+    }
+
+    pub async fn try_send_request(
+        &self,
+        connection: &Web3Connection,
+        method: &str,
+        params: &RawValue,
+    ) -> anyhow::Result<JsonRpcForwardedResponse> {
+        // connection.in_active_requests was called when this rpc was selected
+
+        let response = connection.request(method, params).await;
+
+        connection.dec_active_requests();
+
+        // TODO: if "no block with that header" or some other jsonrpc errors, skip this response
+
+        response.map_err(Into::into)
+    }
+
+    pub async fn try_send_requests(
+        self: Arc<Self>,
+        connections: Vec<Arc<Web3Connection>>,
+        method: String,
+        params: Box<RawValue>,
+        response_sender: mpsc::UnboundedSender<anyhow::Result<JsonRpcForwardedResponse>>,
+    ) -> anyhow::Result<()> {
+        let mut unordered_futures = FuturesUnordered::new();
+
+        for connection in connections {
+            // clone things so we can pass them to a future
+            let connections = self.clone();
+            let method = method.clone();
+            let params = params.clone();
+            let response_sender = response_sender.clone();
+
+            let handle = tokio::spawn(async move {
+                // get the client for this rpc server
+                let response = connections
+                    .try_send_request(connection.as_ref(), &method, &params)
+                    .await?;
+
+                // send the first good response to a one shot channel. that way we respond quickly
+                // drop the result because errors are expected after the first send
+                response_sender.send(Ok(response)).map_err(Into::into)
+            });
+
+            unordered_futures.push(handle);
+        }
+
+        // TODO: use iterators instead of pushing into a vec
+        let mut errs = vec![];
+        if let Some(x) = unordered_futures.next().await {
+            match x.unwrap() {
+                Ok(_) => {}
+                Err(e) => {
+                    // TODO: better errors
+                    warn!("Got an error sending request: {}", e);
+                    errs.push(e);
+                }
+            }
+        }
+
+        // get the first error (if any)
+        // TODO: why collect multiple errors if we only pop one?
+        let e = if !errs.is_empty() {
+            Err(errs.pop().unwrap())
+        } else {
+            Err(anyhow::anyhow!("no successful responses"))
+        };
+
+        // send the error to the channel
+        if response_sender.send(e).is_ok() {
+            // if we were able to send an error, then we never sent a success
+            return Err(anyhow::anyhow!("no successful responses"));
+        } else {
+            // if sending the error failed. the other side must be closed (which means we sent a success earlier)
+            Ok(())
+        }
+    }
+
+    pub fn update_synced_rpcs(
+        &self,
+        rpc: &Arc<Web3Connection>,
+        new_block: u64,
+    ) -> anyhow::Result<()> {
+        // TODO: is RwLock the best type for this?
+        // TODO: start with a read lock?
+        let mut synced_connections = self.synced_connections.write();
+
+        // should we load new_block here?
+
+        match synced_connections.head_block_number.cmp(&new_block) {
+            cmp::Ordering::Equal => {
+                // this rpc is synced, but it isn't the first to this block
+            }
+            cmp::Ordering::Less => {
+                // this is a new head block. clear the current synced connections
+                // TODO: this is too verbose with a bunch of tiers. include the tier
+                // info!("new head block from {:?}: {}", rpc, new_block);
+
+                synced_connections.inner.clear();
+
+                synced_connections.head_block_number = new_block;
+            }
+            cmp::Ordering::Greater => {
+                // not the latest block. return now
+                return Ok(());
+            }
+        }
+
+        let rpc = Arc::clone(rpc);
+
+        synced_connections.inner.push(rpc);
+
+        Ok(())
+    }
+
+    /// get the best available rpc server
+    pub async fn next_upstream_server(
+        &self,
+    ) -> Result<Arc<Web3Connection>, Option<NotUntil<QuantaInstant>>> {
+        let mut earliest_not_until = None;
+
+        // TODO: this clone is probably not the best way to do this
+        let mut synced_rpcs = self.synced_connections.read().inner.clone();
+
+        // i'm pretty sure i did this safely. Hash on Web3Connection just uses the url and not any of the atomics
+        #[allow(clippy::mutable_key_type)]
+        let cache: FxHashMap<Arc<Web3Connection>, u32> = synced_rpcs
+            .iter()
+            .map(|synced_rpc| (synced_rpc.clone(), synced_rpc.active_requests()))
+            .collect();
+
+        // TODO: i think we might need to load active connections and then
+        synced_rpcs.sort_unstable_by(|a, b| {
+            let a = cache.get(a).unwrap();
+            let b = cache.get(b).unwrap();
+
+            a.cmp(b)
+        });
+
+        for selected_rpc in synced_rpcs.iter() {
+            // increment our connection counter
+            if let Err(not_until) = selected_rpc.try_inc_active_requests() {
+                earliest_possible(&mut earliest_not_until, not_until);
+
+                continue;
+            }
+
+            // return the selected RPC
+            return Ok(selected_rpc.clone());
+        }
+
+        // this might be None
+        Err(earliest_not_until)
+    }
+
+    /// get all rpc servers that are not rate limited
+    /// returns them even if they aren't in sync. This is useful for broadcasting signed transactions
+    pub fn get_upstream_servers(
+        &self,
+    ) -> Result<Vec<Arc<Web3Connection>>, Option<NotUntil<QuantaInstant>>> {
+        let mut earliest_not_until = None;
+        // TODO: with capacity?
+        let mut selected_rpcs = vec![];
+
+        for connection in self.inner.iter() {
+            // check rate limits and increment our connection counter
+            if let Err(not_until) = connection.try_inc_active_requests() {
+                earliest_possible(&mut earliest_not_until, not_until);
+
+                // this rpc is not available. skip it
+                continue;
+            }
+
+            selected_rpcs.push(connection.clone());
+        }
+
+        if !selected_rpcs.is_empty() {
+            return Ok(selected_rpcs);
+        }
+
+        // return the earliest not_until (if no rpcs are synced, this will be None)
+        Err(earliest_not_until)
+    }
+}
+
+fn earliest_possible(
+    earliest_not_until_option: &mut Option<NotUntil<QuantaInstant>>,
+    new_not_until: NotUntil<QuantaInstant>,
+) {
+    match earliest_not_until_option.as_ref() {
+        None => *earliest_not_until_option = Some(new_not_until),
+        Some(earliest_not_until) => {
+            let earliest_possible = earliest_not_until.earliest_possible();
+            let new_earliest_possible = new_not_until.earliest_possible();
+
+            if earliest_possible > new_earliest_possible {
+                *earliest_not_until_option = Some(new_not_until);
+            }
+        }
+    }
+}
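// next_upstream_server / get_upstream_servers return Err(Option<NotUntil>) when every rpc
// is rate limited. A standalone sketch of where that NotUntil comes from in governor 0.4;
// the helper above keeps whichever NotUntil reports the soonest earliest_possible():
use governor::{Quota, RateLimiter};
use std::num::NonZeroU32;

fn main() {
    // a direct (unkeyed) limiter allowing 1 request per second
    let limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(1).unwrap()));

    assert!(limiter.check().is_ok()); // the first request passes

    if let Err(not_until) = limiter.check() {
        // the limiter reports when the next request could succeed
        println!("rate limited until {:?}", not_until.earliest_possible());
    }
}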
diff --git a/src/main.rs b/src/main.rs
index 888e1af5..3ca67a93 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,23 +1,20 @@
-mod block_watcher;
-mod provider;
-mod provider_tiers;
+mod connection;
+mod connections;
 
 use futures::future;
 use governor::clock::{Clock, QuantaClock};
 use serde_json::json;
-use std::collections::HashMap;
 use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::mpsc;
 use tokio::time::sleep;
 use tracing::warn;
 use warp::Filter;
 use warp::Reply;
 
-use crate::block_watcher::BlockWatcher;
-use crate::provider::JsonRpcRequest;
-use crate::provider_tiers::Web3ProviderTier;
+use crate::connection::JsonRpcRequest;
+use crate::connections::Web3Connections;
 
 static APP_USER_AGENT: &str = concat!(
     "satoshiandkin/",
@@ -33,9 +30,9 @@ struct Web3ProxyApp {
     /// TODO: use tokio's clock (will require a different ratelimiting crate)
     clock: QuantaClock,
     /// Send requests to the best server available
-    balanced_rpc_tiers: Arc<Vec<Web3ProviderTier>>,
+    balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
-    private_rpcs: Option<Arc<Web3ProviderTier>>,
+    private_rpcs: Option<Arc<Web3Connections>>,
 }
 
 impl fmt::Debug for Web3ProxyApp {
@@ -47,7 +44,6 @@ impl fmt::Debug for Web3ProxyApp {
 
 impl Web3ProxyApp {
     async fn try_new(
-        allowed_lag: u64,
        balanced_rpc_tiers: Vec<Vec<(&str, u32, Option<u32>)>>,
        private_rpcs: Vec<(&str, u32, Option<u32>)>,
     ) -> anyhow::Result<Web3ProxyApp> {
@@ -67,8 +63,6 @@ impl Web3ProxyApp {
             rpcs.push(rpc);
         }
 
-        let block_watcher = Arc::new(BlockWatcher::new(rpcs)));
-
         // make a http shared client
         // TODO: how should we configure the connection pool?
         // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
@@ -78,84 +72,22 @@ impl Web3ProxyApp {
             .user_agent(APP_USER_AGENT)
             .build()?;
 
-        let balanced_rpc_tiers = Arc::new(
+        let balanced_rpc_tiers =
             future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
-                Web3ProviderTier::try_new(
-                    balanced_rpc_tier,
-                    Some(http_client.clone()),
-                    block_watcher.clone(),
-                    &clock,
-                )
+                Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock)
             }))
             .await
            .into_iter()
-            .collect::<anyhow::Result<Vec<Web3ProviderTier>>>()?,
-        );
+            .collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
 
         let private_rpcs = if private_rpcs.is_empty() {
             warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
             // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
             None
         } else {
-            Some(Arc::new(
-                Web3ProviderTier::try_new(
-                    private_rpcs,
-                    Some(http_client),
-                    block_watcher.clone(),
-                    &clock,
-                )
-                .await?,
-            ))
+            Some(Web3Connections::try_new(private_rpcs, Some(http_client), &clock).await?)
         };
 
-        let (new_block_sender, mut new_block_receiver) = watch::channel::<String>("".to_string());
-
-        {
-            // TODO: spawn this later?
-            // spawn a future for the block_watcher
-            let block_watcher = block_watcher.clone();
-            tokio::spawn(async move { block_watcher.run(new_block_sender).await });
-        }
-
-        {
-            // spawn a future for sorting our synced rpcs
-            // TODO: spawn this later?
-            let balanced_rpc_tiers = balanced_rpc_tiers.clone();
-            let private_rpcs = private_rpcs.clone();
-            let block_watcher = block_watcher.clone();
-
-            tokio::spawn(async move {
-                let mut tier_map = HashMap::new();
-                let mut private_map = HashMap::new();
-
-                for balanced_rpc_tier in balanced_rpc_tiers.iter() {
-                    for rpc in balanced_rpc_tier.clone_rpcs() {
-                        tier_map.insert(rpc, balanced_rpc_tier);
-                    }
-                }
-
-                if let Some(private_rpcs) = private_rpcs {
-                    for rpc in private_rpcs.clone_rpcs() {
-                        private_map.insert(rpc, private_rpcs.clone());
-                    }
-                }
-
-                while new_block_receiver.changed().await.is_ok() {
-                    let updated_rpc = new_block_receiver.borrow().clone();
-
-                    if let Some(tier) = tier_map.get(&updated_rpc) {
-                        tier.update_synced_rpcs(block_watcher.clone(), allowed_lag)
-                            .unwrap();
-                    } else if let Some(tier) = private_map.get(&updated_rpc) {
-                        tier.update_synced_rpcs(block_watcher.clone(), allowed_lag)
-                            .unwrap();
-                    } else {
-                        panic!("howd this happen");
-                    }
-                }
-            });
-        }
-
         Ok(Web3ProxyApp {
             clock,
             balanced_rpc_tiers,
@@ -175,11 +107,11 @@ impl Web3ProxyApp {
             // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
             loop {
                 // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                match private_rpcs.get_upstream_servers().await {
+                match private_rpcs.get_upstream_servers() {
                     Ok(upstream_servers) => {
                         let (tx, mut rx) = mpsc::unbounded_channel();
 
-                        let connections = private_rpcs.clone_connections();
+                        let connections = private_rpcs.clone();
                         let method = json_body.method.clone();
                         let params = json_body.params.clone();
 
@@ -226,13 +158,11 @@ impl Web3ProxyApp {
             // TODO: what allowed lag?
             match balanced_rpcs.next_upstream_server().await {
                 Ok(upstream_server) => {
-                    let connections = balanced_rpcs.connections();
-
-                    let response = connections
+                    let response = balanced_rpcs
                         .try_send_request(
-                            upstream_server,
-                            json_body.method.clone(),
-                            json_body.params.clone(),
+                            &upstream_server,
+                            &json_body.method,
+                            &json_body.params,
                         )
                         .await;
@@ -307,15 +237,13 @@ async fn main() {
     // TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
     let listen_port = 8445;
 
     // TODO: what should this be? 0 will cause a thundering herd
-    let allowed_lag = 0;
 
     let state = Web3ProxyApp::try_new(
-        allowed_lag,
         vec![
             // local nodes
             vec![
-                ("ws://10.11.12.16:8545", 68_800, None),
-                ("ws://10.11.12.16:8946", 152_138, None),
+                ("ws://127.0.0.1:8545", 68_800, None),
+                ("ws://127.0.0.1:8946", 152_138, None),
             ],
             // paid nodes
             // TODO: add paid nodes (with rate limits)
             // vec![
             //     // moralis free (25/sec rate limit)
             // ],
             // free nodes
-            // vec![
-            //     // ("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline
-            //     ("https://rpc.ankr.com/eth", 23_967, None),
-            // ],
+            vec![
+                ("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline
+                ("https://rpc.ankr.com/eth", 23_967, None),
+            ],
         ],
         vec![
-            // ("https://api.edennetwork.io/v1/", 1_805, None),
-            // ("https://api.edennetwork.io/v1/beta", 300, None),
-            // ("https://rpc.ethermine.org/", 5_861, None),
-            // ("https://rpc.flashbots.net", 7074, None),
+            ("https://api.edennetwork.io/v1/", 1_805, None),
+            ("https://api.edennetwork.io/v1/beta", 300, None),
+            ("https://rpc.ethermine.org/", 5_861, None),
+            ("https://rpc.flashbots.net", 7074, None),
         ],
     )
     .await
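// SyncedConnections sits behind a parking_lot::RwLock because reads (every proxied request)
// vastly outnumber the bursty writes (once per new head block). A standalone sketch of that
// access pattern:
use parking_lot::RwLock;

fn main() {
    let synced: RwLock<Vec<&str>> = RwLock::new(vec![]);

    {
        // write burst: a new head block clears and repopulates the synced set
        let mut writer = synced.write();
        writer.clear();
        writer.push("ws://127.0.0.1:8545");
    }

    // hot path: cheap shared reads, and the lock is never held across an await
    assert_eq!(synced.read().len(), 1);
}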
diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs
deleted file mode 100644
index b82f7a51..00000000
--- a/src/provider_tiers.rs
+++ /dev/null
@@ -1,303 +0,0 @@
-///! Communicate with groups of web3 providers
-use arc_swap::ArcSwap;
-use derive_more::From;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use governor::clock::{QuantaClock, QuantaInstant};
-use governor::NotUntil;
-use serde_json::value::RawValue;
-use std::collections::HashMap;
-use std::fmt;
-use std::num::NonZeroU32;
-use std::sync::Arc;
-use tokio::sync::mpsc;
-use tracing::warn;
-
-use crate::block_watcher::{BlockWatcher, SyncStatus};
-use crate::provider::{JsonRpcForwardedResponse, Web3Connection};
-
-#[derive(From)]
-pub struct Web3Connections(HashMap<String, Web3Connection>);
-
-impl Web3Connections {
-    pub fn get(&self, rpc: &str) -> Option<&Web3Connection> {
-        self.0.get(rpc)
-    }
-
-    pub async fn try_send_request(
-        &self,
-        rpc: String,
-        method: String,
-        params: Box<RawValue>,
-    ) -> anyhow::Result<JsonRpcForwardedResponse> {
-        let connection = self.get(&rpc).unwrap();
-
-        // TODO: do we need this clone or can we do a reference?
-        let provider = connection.clone_provider();
-
-        let response = provider.request(&method, params).await;
-
-        connection.dec_active_requests();
-
-        // TODO: if "no block with that header" or some other jsonrpc errors, skip this response
-
-        response.map_err(Into::into)
-    }
-
-    pub async fn try_send_requests(
-        self: Arc<Self>,
-        rpc_servers: Vec<String>,
-        method: String,
-        params: Box<RawValue>,
-        // TODO: i think this should actually be a oneshot
-        response_sender: mpsc::UnboundedSender<anyhow::Result<JsonRpcForwardedResponse>>,
-    ) -> anyhow::Result<()> {
-        let method = Arc::new(method);
-
-        let mut unordered_futures = FuturesUnordered::new();
-
-        for rpc in rpc_servers {
-            let connections = self.clone();
-            let method = method.to_string();
-            let params = params.clone();
-            let response_sender = response_sender.clone();
-
-            let handle = tokio::spawn(async move {
-                // get the client for this rpc server
-                let response = connections.try_send_request(rpc, method, params).await?;
-
-                // send the first good response to a one shot channel. that way we respond quickly
-                // drop the result because errors are expected after the first send
-                response_sender.send(Ok(response)).map_err(Into::into)
-            });
-
-            unordered_futures.push(handle);
-        }
-
-        // TODO: use iterators instead of pushing into a vec
-        let mut errs = vec![];
-        if let Some(x) = unordered_futures.next().await {
-            match x.unwrap() {
-                Ok(_) => {}
-                Err(e) => {
-                    // TODO: better errors
-                    warn!("Got an error sending request: {}", e);
-                    errs.push(e);
-                }
-            }
-        }
-
-        // get the first error (if any)
-        let e = if !errs.is_empty() {
-            Err(errs.pop().unwrap())
-        } else {
-            Err(anyhow::anyhow!("no successful responses"))
-        };
-
-        // send the error to the channel
-        if response_sender.send(e).is_ok() {
-            // if we were able to send an error, then we never sent a success
-            return Err(anyhow::anyhow!("no successful responses"));
-        } else {
-            // if sending the error failed. the other side must be closed (which means we sent a success earlier)
-            Ok(())
-        }
-    }
-}
-
-/// Load balance to the rpc
-pub struct Web3ProviderTier {
-    /// TODO: what type for the rpc? Vec<String> isn't great. i think we want this to be the key for the provider and not the provider itself
-    /// TODO: we probably want a better lock
-    synced_rpcs: ArcSwap<Vec<String>>,
-    rpcs: Vec<String>,
-    connections: Arc<Web3Connections>,
-}
-
-impl fmt::Debug for Web3ProviderTier {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // TODO: the default formatter takes forever to write. this is too quiet though
-        f.debug_struct("Web3ProviderTier").finish_non_exhaustive()
-    }
-}
-
-impl Web3ProviderTier {
-    pub async fn try_new(
-        servers: Vec<(&str, u32, Option<u32>)>,
-        http_client: Option<reqwest::Client>,
-        block_watcher: Arc<BlockWatcher>,
-        clock: &QuantaClock,
-    ) -> anyhow::Result<Web3ProviderTier> {
-        let mut rpcs: Vec<String> = vec![];
-        let mut connections = HashMap::new();
-
-        for (s, soft_limit, hard_limit) in servers.into_iter() {
-            rpcs.push(s.to_string());
-
-            let hard_rate_limiter = if let Some(hard_limit) = hard_limit {
-                let quota = governor::Quota::per_second(NonZeroU32::new(hard_limit).unwrap());
-
-                let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
-
-                Some(rate_limiter)
-            } else {
-                None
-            };
-
-            let connection = Web3Connection::try_new(
-                s.to_string(),
-                http_client.clone(),
-                block_watcher.clone_sender(),
-                hard_rate_limiter,
-                soft_limit as f32,
-            )
-            .await?;
-
-            connections.insert(s.to_string(), connection);
-        }
-
-        Ok(Web3ProviderTier {
-            synced_rpcs: ArcSwap::from(Arc::new(vec![])),
-            rpcs,
-            connections: Arc::new(connections.into()),
-        })
-    }
-
-    pub fn connections(&self) -> &Web3Connections {
-        &self.connections
-    }
-
-    pub fn clone_connections(&self) -> Arc<Web3Connections> {
-        self.connections.clone()
-    }
-
-    pub fn clone_rpcs(&self) -> Vec<String> {
-        self.rpcs.clone()
-    }
-
-    pub fn update_synced_rpcs(
-        &self,
-        block_watcher: Arc<BlockWatcher>,
-        allowed_lag: u64,
-    ) -> anyhow::Result<()> {
-        let mut available_rpcs = self.rpcs.clone();
-
-        // collect sync status for all the rpcs
-        let sync_status: HashMap<String, SyncStatus> = available_rpcs
-            .clone()
-            .into_iter()
-            .map(|rpc| {
-                let status = block_watcher.sync_status(&rpc, allowed_lag);
-                (rpc, status)
-            })
-            .collect();
-
-        // sort rpcs by their sync status
-        // TODO: if we only changed one entry, we don't need to sort the whole thing. we can do this better
-        available_rpcs.sort_unstable_by(|a, b| {
-            let a_synced = sync_status.get(a).unwrap();
-            let b_synced = sync_status.get(b).unwrap();
-
-            a_synced.cmp(b_synced)
-        });
-
-        // filter out unsynced rpcs
-        let synced_rpcs: Vec<String> = available_rpcs
-            .into_iter()
-            .take_while(|rpc| matches!(sync_status.get(rpc).unwrap(), SyncStatus::Synced(_)))
-            .collect();
-
-        // TODO: is arcswap the best type for this?
-        self.synced_rpcs.swap(Arc::new(synced_rpcs));
-
-        Ok(())
-    }
-
-    /// get the best available rpc server
-    pub async fn next_upstream_server(&self) -> Result<String, Option<NotUntil<QuantaInstant>>> {
-        let mut earliest_not_until = None;
-
-        // TODO: this clone is probably not the best way to do this
-        let mut synced_rpcs = Vec::clone(&*self.synced_rpcs.load());
-
-        // TODO: we don't want to sort on active connections. we want to sort on remaining capacity for connections. for example, geth can handle more than erigon
-        synced_rpcs.sort_unstable_by(|a, b| {
-            let a = self.connections.get(a).unwrap();
-            let b = self.connections.get(b).unwrap();
-
-            // sort on active connections
-            a.cmp(b)
-        });
-
-        for selected_rpc in synced_rpcs.iter() {
-            // increment our connection counter
-            if let Err(not_until) = self
-                .connections
-                .get(selected_rpc)
-                .unwrap()
-                .try_inc_active_requests()
-            {
-                // TODO: do this better
-                if earliest_not_until.is_none() {
-                    earliest_not_until = Some(not_until);
-                } else {
-                    let earliest_possible =
-                        earliest_not_until.as_ref().unwrap().earliest_possible();
-                    let new_earliest_possible = not_until.earliest_possible();
-
-                    if earliest_possible > new_earliest_possible {
-                        earliest_not_until = Some(not_until);
-                    }
-                }
-                continue;
-            }
-
-            // return the selected RPC
-            return Ok(selected_rpc.clone());
-        }
-
-        // this might be None
-        Err(earliest_not_until)
-    }
-
-    /// get all available rpc servers
-    pub async fn get_upstream_servers(
-        &self,
-    ) -> Result<Vec<String>, Option<NotUntil<QuantaInstant>>> {
-        let mut earliest_not_until = None;
-        let mut selected_rpcs = vec![];
-        for selected_rpc in self.synced_rpcs.load().iter() {
-            // check rate limits and increment our connection counter
-            // TODO: share code with next_upstream_server
-            if let Err(not_until) = self
-                .connections
-                .get(selected_rpc)
-                .unwrap()
-                .try_inc_active_requests()
-            {
-                if earliest_not_until.is_none() {
-                    earliest_not_until = Some(not_until);
-                } else {
-                    let earliest_possible =
-                        earliest_not_until.as_ref().unwrap().earliest_possible();
-                    let new_earliest_possible = not_until.earliest_possible();
-
-                    if earliest_possible > new_earliest_possible {
-                        earliest_not_until = Some(not_until);
-                    }
-                }
-                continue;
-            }
-
-            // this is rpc should work
-            selected_rpcs.push(selected_rpc.clone());
-        }
-
-        if !selected_rpcs.is_empty() {
-            return Ok(selected_rpcs);
-        }
-
-        // return the earliest not_until (if no rpcs are synced, this will be None)
-        Err(earliest_not_until)
-    }
-}
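// try_send_requests fans a private transaction out to every relay and the proxy answers
// with the first success. A standalone sketch of that race with hypothetical relay names:
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<anyhow::Result<String>>();

    for relay in ["relay-a", "relay-b", "relay-c"] {
        let tx = tx.clone();
        tokio::spawn(async move {
            // send failures after the first answer are expected; ignore them
            let _ = tx.send(Ok(format!("response from {}", relay)));
        });
    }
    drop(tx);

    // take the first response; later ones are dropped along with the receiver
    if let Some(Ok(first)) = rx.recv().await {
        println!("{}", first);
    }
}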