diff --git a/src/block_watcher.rs b/src/block_watcher.rs index 40342b6a..e408ab56 100644 --- a/src/block_watcher.rs +++ b/src/block_watcher.rs @@ -1,115 +1,86 @@ +///! Track the head block of all the web3 providers use ethers::prelude::{Block, TxHash}; -use governor::clock::{Clock, QuantaClock, QuantaInstant}; +use std::cmp::Ordering; use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; -use tokio::time::{interval, Duration}; -use tracing::{info, warn}; +use tracing::info; // TODO: what type for the Item? String url works, but i don't love it -// TODO: think about boxing this -#[derive(Debug)] -pub enum BlockWatcherItem { - NewHead(Box<(String, Block)>), - SubscribeHttp(String), - Interval, -} +pub type NewHead = (String, Block); -pub type BlockWatcherSender = mpsc::UnboundedSender; -pub type BlockWatcherReceiver = mpsc::UnboundedReceiver; +pub type BlockWatcherSender = mpsc::UnboundedSender; +pub type BlockWatcherReceiver = mpsc::UnboundedReceiver; pub struct BlockWatcher { - clock: QuantaClock, - sender: BlockWatcherSender, receiver: BlockWatcherReceiver, - last_poll: QuantaInstant, /// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map blocks: HashMap>, + latest_block: Option>, } impl BlockWatcher { - pub fn new(clock: QuantaClock) -> (BlockWatcher, BlockWatcherSender) { + pub fn new() -> (BlockWatcher, BlockWatcherSender) { // TODO: this also needs to return a reader for blocks let (sender, receiver) = mpsc::unbounded_channel(); - let last_poll = clock.now(); - let watcher = Self { - clock, - last_poll, - sender: sender.clone(), receiver, blocks: Default::default(), + latest_block: None, }; (watcher, sender) } pub async fn run(&mut self) -> anyhow::Result<()> { - // TODO: we should probably set this to something like blocktime / 3 - let mut poll_interval = interval(Duration::from_secs(2)); - // TODO: think more about what missed tick behavior we want - // interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + while let Some((rpc, block)) = self.receiver.recv().await { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; - // TODO: only do this if we have http providers to watch - let interval_sender = self.sender.clone(); - tokio::spawn(async move { - loop { - poll_interval.tick().await; + let current_block = self.blocks.get(&rpc); - interval_sender - .send(BlockWatcherItem::Interval) - .expect("sending BlockWatcherItem::Interval failed"); + if current_block == Some(&block) { + // we already have this block + continue; } - }); - // don't poll faster than every second - let min_wait_nanos = 1_000_000_000.into(); + let label_slow_blocks = if self.latest_block.is_none() { + self.latest_block = Some(block.clone()); + "+" + } else { + let latest_block = self.latest_block.as_ref().unwrap(); - while let Some(x) = self.receiver.recv().await { - match x { - BlockWatcherItem::Interval => { - if self.clock.now() < self.last_poll + min_wait_nanos { - // we already updated recently - continue; + // TODO: what if they have the same number but different hashes? or aren't on the same chain? + match block.number.cmp(&latest_block.number) { + Ordering::Equal => "", + Ordering::Greater => { + self.latest_block = Some(block.clone()); + "+" } + Ordering::Less => { + // TODO: include how many blocks behind? + "-" + } + } + }; - // TODO: we got an interval. if we haven't updated the blocks recently, - info!("TODO: query all the subscribed http providers") - } - BlockWatcherItem::NewHead(new_head) => { - let (rpc, block) = *new_head; - info!( - "{:?} = {} Ts: {:?}, block number: {}", - block.hash.unwrap(), - rpc, - block.timestamp, - block.number.unwrap(), - ); - self.blocks.insert(rpc, block); - - self.sender - .send(BlockWatcherItem::Interval) - .expect("sending BlockWatcherItem::Interval failed"); - } - BlockWatcherItem::SubscribeHttp(rpc) => { - warn!("subscribing to {} is not yet supported", rpc); - } - } + // TODO: include time since last update? + info!( + "{:?} = {} Ts: {:?}, block number: {}, age: {}s {}", + block.hash.unwrap(), + rpc, + // TODO: human readable time? + block.timestamp, + block.number.unwrap(), + now - block.timestamp.as_u64() as i64, + label_slow_blocks + ); + self.blocks.insert(rpc, block); } Ok(()) } } - -/* -pub async fn save_block( - &self, - url: String, - block_watcher_sender: BlockWatcherSender, - block: Block, -) -> anyhow::Result<()> { - // TODO: include the block age (compared to local time) in this, too - // TODO: use tracing properly - -} -*/ diff --git a/src/main.rs b/src/main.rs index 6c16947c..908cd7d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,9 @@ -// TODO: don't use RwLock. i think we need a concurrent hashmap or we will hit all sorts of deadlocks mod block_watcher; mod provider; +mod provider_tiers; use futures::future; -use governor::clock::{Clock, QuantaClock, QuantaInstant}; -use governor::middleware::NoOpMiddleware; -use governor::state::{InMemoryState, NotKeyed}; -use governor::NotUntil; -use governor::RateLimiter; -use std::collections::HashMap; -use std::num::NonZeroU32; +use governor::clock::{Clock, QuantaClock}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, RwLock}; @@ -18,8 +12,8 @@ use tracing::log::warn; use warp::Filter; // use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap}; -use crate::block_watcher::{BlockWatcher, BlockWatcherSender}; -use crate::provider::Web3Connection; +use crate::block_watcher::BlockWatcher; +use crate::provider_tiers::{Web3ConnectionMap, Web3ProviderTier}; static APP_USER_AGENT: &str = concat!( "satoshiandkin/", @@ -28,192 +22,15 @@ static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); -type RpcRateLimiter = - RateLimiter>; - -type RpcRateLimiterMap = RwLock>; -type ConnectionsMap = RwLock>; - -/// Load balance to the rpc -/// TODO: i'm not sure about having 3 locks here. can we share them? -struct RpcTier { - /// RPC urls sorted by active requests - /// TODO: what type for the rpc? - rpcs: RwLock>, - connections: Arc, - ratelimits: RpcRateLimiterMap, -} - -impl RpcTier { - async fn try_new( - servers: Vec<(&str, u32)>, - http_client: Option, - block_watcher_sender: BlockWatcherSender, - clock: &QuantaClock, - ) -> anyhow::Result { - let mut rpcs: Vec = vec![]; - let mut connections = HashMap::new(); - let mut ratelimits = HashMap::new(); - - for (s, limit) in servers.into_iter() { - rpcs.push(s.to_string()); - - let connection = Web3Connection::try_new( - s.to_string(), - http_client.clone(), - block_watcher_sender.clone(), - ) - .await?; - - connections.insert(s.to_string(), connection); - - if limit > 0 { - let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap()); - - let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock); - - ratelimits.insert(s.to_string(), rate_limiter); - } - } - - Ok(RpcTier { - rpcs: RwLock::new(rpcs), - connections: Arc::new(RwLock::new(connections)), - ratelimits: RwLock::new(ratelimits), - }) - } - - /// get the best available rpc server - async fn next_upstream_server(&self) -> Result> { - let mut balanced_rpcs = self.rpcs.write().await; - - // sort rpcs by their active connections - let connections = self.connections.read().await; - - balanced_rpcs - .sort_unstable_by(|a, b| connections.get(a).unwrap().cmp(connections.get(b).unwrap())); - - let mut earliest_not_until = None; - - for selected_rpc in balanced_rpcs.iter() { - // TODO: check current block number. if behind, make our own NotUntil here - let ratelimits = self.ratelimits.write().await; - - // check rate limits - match ratelimits.get(selected_rpc).unwrap().check() { - Ok(_) => { - // rate limit succeeded - } - Err(not_until) => { - // rate limit failed - // save the smallest not_until. if nothing succeeds, return an Err with not_until in it - if earliest_not_until.is_none() { - earliest_not_until = Some(not_until); - } else { - let earliest_possible = - earliest_not_until.as_ref().unwrap().earliest_possible(); - let new_earliest_possible = not_until.earliest_possible(); - - if earliest_possible > new_earliest_possible { - earliest_not_until = Some(not_until); - } - } - continue; - } - }; - - // increment our connection counter - self.connections - .write() - .await - .get_mut(selected_rpc) - .unwrap() - .inc_active_requests(); - - // return the selected RPC - return Ok(selected_rpc.clone()); - } - - // return the smallest not_until - if let Some(not_until) = earliest_not_until { - Err(not_until) - } else { - unimplemented!(); - } - } - - /// get all available rpc servers - async fn get_upstream_servers(&self) -> Result, NotUntil> { - let mut earliest_not_until = None; - - let mut selected_rpcs = vec![]; - - for selected_rpc in self.rpcs.read().await.iter() { - // check rate limits - match self - .ratelimits - .write() - .await - .get(selected_rpc) - .unwrap() - .check() - { - Ok(_) => { - // rate limit succeeded - } - Err(not_until) => { - // rate limit failed - // save the smallest not_until. if nothing succeeds, return an Err with not_until in it - if earliest_not_until.is_none() { - earliest_not_until = Some(not_until); - } else { - let earliest_possible = - earliest_not_until.as_ref().unwrap().earliest_possible(); - let new_earliest_possible = not_until.earliest_possible(); - - if earliest_possible > new_earliest_possible { - earliest_not_until = Some(not_until); - } - } - continue; - } - }; - - // increment our connection counter - self.connections - .write() - .await - .get_mut(selected_rpc) - .unwrap() - .inc_active_requests(); - - // this is rpc should work - selected_rpcs.push(selected_rpc.clone()); - } - - if !selected_rpcs.is_empty() { - return Ok(selected_rpcs); - } - - // return the earliest not_until - if let Some(not_until) = earliest_not_until { - Err(not_until) - } else { - // TODO: is this right? - Ok(vec![]) - } - } -} - /// The application struct Web3ProxyApp { /// clock used for rate limiting /// TODO: use tokio's clock (will require a different ratelimiting crate) clock: QuantaClock, /// Send requests to the best server available - balanced_rpc_tiers: Arc>, + balanced_rpc_tiers: Arc>, /// Send private requests (like eth_sendRawTransaction) to all these servers - private_rpcs: Option>, + private_rpcs: Option>, /// write lock on these when all rate limits are hit balanced_rpc_ratelimiter_lock: RwLock<()>, private_rpcs_ratelimiter_lock: RwLock<()>, @@ -226,7 +43,7 @@ impl Web3ProxyApp { ) -> anyhow::Result { let clock = QuantaClock::default(); - let (mut block_watcher, block_watcher_sender) = BlockWatcher::new(clock.clone()); + let (mut block_watcher, block_watcher_sender) = BlockWatcher::new(); // make a http shared client // TODO: how should we configure the connection pool? @@ -241,7 +58,7 @@ impl Web3ProxyApp { let balanced_rpc_tiers = Arc::new( future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| { - RpcTier::try_new( + Web3ProviderTier::try_new( balanced_rpc_tier, Some(http_client.clone()), block_watcher_sender.clone(), @@ -250,14 +67,14 @@ impl Web3ProxyApp { })) .await .into_iter() - .collect::>>()?, + .collect::>>()?, ); let private_rpcs = if private_rpcs.is_empty() { None } else { Some(Arc::new( - RpcTier::try_new( + Web3ProviderTier::try_new( private_rpcs, Some(http_client), block_watcher_sender, @@ -300,7 +117,7 @@ impl Web3ProxyApp { mpsc::unbounded_channel::>(); let clone = self.clone(); - let connections = private_rpcs.connections.clone(); + let connections = private_rpcs.clone_connections(); let json_body = json_body.clone(); tokio::spawn(async move { @@ -349,7 +166,7 @@ impl Web3ProxyApp { mpsc::unbounded_channel::>(); let clone = self.clone(); - let connections = balanced_rpcs.connections.clone(); + let connections = balanced_rpcs.clone_connections(); let json_body = json_body.clone(); tokio::spawn(async move { @@ -413,7 +230,7 @@ impl Web3ProxyApp { async fn try_send_requests( &self, rpc_servers: Vec, - connections: Arc, + connections: Arc, json_request_body: serde_json::Value, tx: mpsc::UnboundedSender>, ) -> anyhow::Result<()> { diff --git a/src/provider.rs b/src/provider.rs index 3668bf1b..8003d1af 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -1,3 +1,4 @@ +///! Communicate with a web3 providers use derive_more::From; use ethers::prelude::Middleware; use futures::StreamExt; @@ -5,7 +6,7 @@ use std::time::Duration; use std::{cmp::Ordering, sync::Arc}; use tracing::{info, warn}; -use crate::block_watcher::{BlockWatcherItem, BlockWatcherSender}; +use crate::block_watcher::BlockWatcherSender; // TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] @@ -49,16 +50,12 @@ impl Web3Provider { .unwrap(); } */ - block_watcher_sender - .send(BlockWatcherItem::SubscribeHttp(url.clone())) - .unwrap(); + info!("work in progress"); } Web3Provider::Ws(provider) => { let mut stream = provider.subscribe_blocks().await?; while let Some(block) = stream.next().await { - block_watcher_sender - .send(BlockWatcherItem::NewHead(Box::new((url.clone(), block)))) - .unwrap(); + block_watcher_sender.send((url.clone(), block)).unwrap(); } } } diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs new file mode 100644 index 00000000..eb281fa8 --- /dev/null +++ b/src/provider_tiers.rs @@ -0,0 +1,195 @@ +///! Communicate with groups of web3 providers +use governor::clock::{QuantaClock, QuantaInstant}; +use governor::middleware::NoOpMiddleware; +use governor::state::{InMemoryState, NotKeyed}; +use governor::NotUntil; +use governor::RateLimiter; +use std::collections::HashMap; +use std::num::NonZeroU32; +use std::sync::Arc; +use tokio::sync::RwLock; + +use crate::block_watcher::BlockWatcherSender; +use crate::provider::Web3Connection; + +type Web3RateLimiter = + RateLimiter>; + +type Web3RateLimiterMap = RwLock>; + +pub type Web3ConnectionMap = RwLock>; + +/// Load balance to the rpc +/// TODO: i'm not sure about having 3 locks here. can we share them? +pub struct Web3ProviderTier { + /// RPC urls sorted by active requests + /// TODO: what type for the rpc? + rpcs: RwLock>, + connections: Arc, + ratelimits: Web3RateLimiterMap, +} + +impl Web3ProviderTier { + pub async fn try_new( + servers: Vec<(&str, u32)>, + http_client: Option, + block_watcher_sender: BlockWatcherSender, + clock: &QuantaClock, + ) -> anyhow::Result { + let mut rpcs: Vec = vec![]; + let mut connections = HashMap::new(); + let mut ratelimits = HashMap::new(); + + for (s, limit) in servers.into_iter() { + rpcs.push(s.to_string()); + + let connection = Web3Connection::try_new( + s.to_string(), + http_client.clone(), + block_watcher_sender.clone(), + ) + .await?; + + connections.insert(s.to_string(), connection); + + if limit > 0 { + let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap()); + + let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock); + + ratelimits.insert(s.to_string(), rate_limiter); + } + } + + Ok(Web3ProviderTier { + rpcs: RwLock::new(rpcs), + connections: Arc::new(RwLock::new(connections)), + ratelimits: RwLock::new(ratelimits), + }) + } + + pub fn clone_connections(&self) -> Arc { + self.connections.clone() + } + + /// get the best available rpc server + pub async fn next_upstream_server(&self) -> Result> { + let mut balanced_rpcs = self.rpcs.write().await; + + // sort rpcs by their active connections + let connections = self.connections.read().await; + + balanced_rpcs + .sort_unstable_by(|a, b| connections.get(a).unwrap().cmp(connections.get(b).unwrap())); + + let mut earliest_not_until = None; + + for selected_rpc in balanced_rpcs.iter() { + // TODO: check current block number. if behind, make our own NotUntil here + let ratelimits = self.ratelimits.write().await; + + // check rate limits + match ratelimits.get(selected_rpc).unwrap().check() { + Ok(_) => { + // rate limit succeeded + } + Err(not_until) => { + // rate limit failed + // save the smallest not_until. if nothing succeeds, return an Err with not_until in it + if earliest_not_until.is_none() { + earliest_not_until = Some(not_until); + } else { + let earliest_possible = + earliest_not_until.as_ref().unwrap().earliest_possible(); + let new_earliest_possible = not_until.earliest_possible(); + + if earliest_possible > new_earliest_possible { + earliest_not_until = Some(not_until); + } + } + continue; + } + }; + + // increment our connection counter + self.connections + .write() + .await + .get_mut(selected_rpc) + .unwrap() + .inc_active_requests(); + + // return the selected RPC + return Ok(selected_rpc.clone()); + } + + // return the smallest not_until + if let Some(not_until) = earliest_not_until { + Err(not_until) + } else { + unimplemented!(); + } + } + + /// get all available rpc servers + pub async fn get_upstream_servers(&self) -> Result, NotUntil> { + let mut earliest_not_until = None; + + let mut selected_rpcs = vec![]; + + for selected_rpc in self.rpcs.read().await.iter() { + // check rate limits + match self + .ratelimits + .write() + .await + .get(selected_rpc) + .unwrap() + .check() + { + Ok(_) => { + // rate limit succeeded + } + Err(not_until) => { + // rate limit failed + // save the smallest not_until. if nothing succeeds, return an Err with not_until in it + if earliest_not_until.is_none() { + earliest_not_until = Some(not_until); + } else { + let earliest_possible = + earliest_not_until.as_ref().unwrap().earliest_possible(); + let new_earliest_possible = not_until.earliest_possible(); + + if earliest_possible > new_earliest_possible { + earliest_not_until = Some(not_until); + } + } + continue; + } + }; + + // increment our connection counter + self.connections + .write() + .await + .get_mut(selected_rpc) + .unwrap() + .inc_active_requests(); + + // this is rpc should work + selected_rpcs.push(selected_rpc.clone()); + } + + if !selected_rpcs.is_empty() { + return Ok(selected_rpcs); + } + + // return the earliest not_until + if let Some(not_until) = earliest_not_until { + Err(not_until) + } else { + // TODO: is this right? + Ok(vec![]) + } + } +}