more refactoring

Bryan Stitt 2022-04-26 23:53:58 +00:00
parent bbc2e8d3d7
commit 83a460af82
4 changed files with 260 additions and 280 deletions

src/block_watcher.rs

@@ -1,115 +1,86 @@
+///! Track the head block of all the web3 providers
 use ethers::prelude::{Block, TxHash};
-use governor::clock::{Clock, QuantaClock, QuantaInstant};
+use std::cmp::Ordering;
 use std::collections::HashMap;
+use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
-use tokio::time::{interval, Duration};
-use tracing::{info, warn};
+use tracing::info;
 
 // TODO: what type for the Item? String url works, but i don't love it
-// TODO: think about boxing this
-#[derive(Debug)]
-pub enum BlockWatcherItem {
-    NewHead(Box<(String, Block<TxHash>)>),
-    SubscribeHttp(String),
-    Interval,
-}
+pub type NewHead = (String, Block<TxHash>);
 
-pub type BlockWatcherSender = mpsc::UnboundedSender<BlockWatcherItem>;
-pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<BlockWatcherItem>;
+pub type BlockWatcherSender = mpsc::UnboundedSender<NewHead>;
+pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<NewHead>;
 
 pub struct BlockWatcher {
-    clock: QuantaClock,
-    sender: BlockWatcherSender,
     receiver: BlockWatcherReceiver,
-    last_poll: QuantaInstant,
     /// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map
     blocks: HashMap<String, Block<TxHash>>,
+    latest_block: Option<Block<TxHash>>,
 }
 
 impl BlockWatcher {
-    pub fn new(clock: QuantaClock) -> (BlockWatcher, BlockWatcherSender) {
+    pub fn new() -> (BlockWatcher, BlockWatcherSender) {
         // TODO: this also needs to return a reader for blocks
         let (sender, receiver) = mpsc::unbounded_channel();
 
-        let last_poll = clock.now();
-
         let watcher = Self {
-            clock,
-            last_poll,
-            sender: sender.clone(),
             receiver,
             blocks: Default::default(),
+            latest_block: None,
         };
 
         (watcher, sender)
     }
 
     pub async fn run(&mut self) -> anyhow::Result<()> {
-        // TODO: we should probably set this to something like blocktime / 3
-        let mut poll_interval = interval(Duration::from_secs(2));
-        // TODO: think more about what missed tick behavior we want
-        // interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
-
-        // TODO: only do this if we have http providers to watch
-        let interval_sender = self.sender.clone();
-        tokio::spawn(async move {
-            loop {
-                poll_interval.tick().await;
-                interval_sender
-                    .send(BlockWatcherItem::Interval)
-                    .expect("sending BlockWatcherItem::Interval failed");
-            }
-        });
-
-        // don't poll faster than every second
-        let min_wait_nanos = 1_000_000_000.into();
-
-        while let Some(x) = self.receiver.recv().await {
-            match x {
-                BlockWatcherItem::Interval => {
-                    if self.clock.now() < self.last_poll + min_wait_nanos {
-                        // we already updated recently
-                        continue;
-                    }
-
-                    // TODO: we got an interval. if we haven't updated the blocks recently,
-                    info!("TODO: query all the subscribed http providers")
-                }
-                BlockWatcherItem::NewHead(new_head) => {
-                    let (rpc, block) = *new_head;
-                    info!(
-                        "{:?} = {} Ts: {:?}, block number: {}",
-                        block.hash.unwrap(),
-                        rpc,
-                        block.timestamp,
-                        block.number.unwrap(),
-                    );
-                    self.blocks.insert(rpc, block);
-
-                    self.sender
-                        .send(BlockWatcherItem::Interval)
-                        .expect("sending BlockWatcherItem::Interval failed");
-                }
-                BlockWatcherItem::SubscribeHttp(rpc) => {
-                    warn!("subscribing to {} is not yet supported", rpc);
-                }
-            }
+        while let Some((rpc, block)) = self.receiver.recv().await {
+            let now = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .expect("Time went backwards")
+                .as_secs() as i64;
+
+            let current_block = self.blocks.get(&rpc);
+
+            if current_block == Some(&block) {
+                // we already have this block
+                continue;
+            }
+
+            let label_slow_blocks = if self.latest_block.is_none() {
+                self.latest_block = Some(block.clone());
+                "+"
+            } else {
+                let latest_block = self.latest_block.as_ref().unwrap();
+
+                // TODO: what if they have the same number but different hashes? or aren't on the same chain?
+                match block.number.cmp(&latest_block.number) {
+                    Ordering::Equal => "",
+                    Ordering::Greater => {
+                        self.latest_block = Some(block.clone());
+                        "+"
+                    }
+                    Ordering::Less => {
+                        // TODO: include how many blocks behind?
+                        "-"
+                    }
+                }
+            };
+
+            // TODO: include time since last update?
+            info!(
+                "{:?} = {} Ts: {:?}, block number: {}, age: {}s {}",
+                block.hash.unwrap(),
+                rpc,
+                // TODO: human readable time?
+                block.timestamp,
+                block.number.unwrap(),
+                now - block.timestamp.as_u64() as i64,
+                label_slow_blocks
+            );
+            self.blocks.insert(rpc, block);
         }
 
         Ok(())
     }
 }
-
-/*
-    pub async fn save_block(
-        &self,
-        url: String,
-        block_watcher_sender: BlockWatcherSender,
-        block: Block<TxHash>,
-    ) -> anyhow::Result<()> {
-        // TODO: include the block age (compared to local time) in this, too
-        // TODO: use tracing properly
-    }
-*/
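
A minimal wiring sketch (not from this commit) of how the simplified NewHead channel is used: BlockWatcher::new() hands back the sender, run() just drains (rpc url, block) tuples, and closing the channel lets the loop end. It assumes code living inside this crate (so crate::block_watcher resolves); the demo function, the ws:// URL, and the hand-built block are illustrative only.

use ethers::prelude::{Block, TxHash, H256, U64};

use crate::block_watcher::BlockWatcher;

async fn demo() -> anyhow::Result<()> {
    let (mut block_watcher, block_watcher_sender) = BlockWatcher::new();

    // run() now just drains (rpc_url, block) tuples; the old interval machinery is gone
    let watcher = tokio::spawn(async move { block_watcher.run().await });

    // a Web3Connection's websocket task would send real heads like this
    let head: Block<TxHash> = Block {
        hash: Some(H256::zero()),
        number: Some(U64::from(1u64)),
        ..Default::default()
    };
    block_watcher_sender
        .send(("ws://127.0.0.1:8546".to_string(), head))
        .expect("BlockWatcher dropped its receiver");

    // dropping the last sender closes the channel and lets run() return Ok(())
    drop(block_watcher_sender);
    watcher.await?
}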

src/main.rs

@@ -1,15 +1,9 @@
-// TODO: don't use RwLock<HashMap>. i think we need a concurrent hashmap or we will hit all sorts of deadlocks
 mod block_watcher;
 mod provider;
+mod provider_tiers;
 
 use futures::future;
-use governor::clock::{Clock, QuantaClock, QuantaInstant};
-use governor::middleware::NoOpMiddleware;
-use governor::state::{InMemoryState, NotKeyed};
-use governor::NotUntil;
-use governor::RateLimiter;
-use std::collections::HashMap;
-use std::num::NonZeroU32;
+use governor::clock::{Clock, QuantaClock};
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, RwLock};
@@ -18,8 +12,8 @@ use tracing::log::warn;
 use warp::Filter;
 
 // use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
-use crate::block_watcher::{BlockWatcher, BlockWatcherSender};
-use crate::provider::Web3Connection;
+use crate::block_watcher::BlockWatcher;
+use crate::provider_tiers::{Web3ConnectionMap, Web3ProviderTier};
 
 static APP_USER_AGENT: &str = concat!(
     "satoshiandkin/",
@@ -28,192 +22,15 @@ static APP_USER_AGENT: &str = concat!(
     env!("CARGO_PKG_VERSION"),
 );
 
-type RpcRateLimiter =
-    RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
-
-type RpcRateLimiterMap = RwLock<HashMap<String, RpcRateLimiter>>;
-type ConnectionsMap = RwLock<HashMap<String, Web3Connection>>;
-
-/// Load balance to the rpc
-/// TODO: i'm not sure about having 3 locks here. can we share them?
-struct RpcTier {
-    /// RPC urls sorted by active requests
-    /// TODO: what type for the rpc?
-    rpcs: RwLock<Vec<String>>,
-    connections: Arc<ConnectionsMap>,
-    ratelimits: RpcRateLimiterMap,
-}
-
-impl RpcTier {
-    async fn try_new(
-        servers: Vec<(&str, u32)>,
-        http_client: Option<reqwest::Client>,
-        block_watcher_sender: BlockWatcherSender,
-        clock: &QuantaClock,
-    ) -> anyhow::Result<RpcTier> {
-        let mut rpcs: Vec<String> = vec![];
-        let mut connections = HashMap::new();
-        let mut ratelimits = HashMap::new();
-
-        for (s, limit) in servers.into_iter() {
-            rpcs.push(s.to_string());
-
-            let connection = Web3Connection::try_new(
-                s.to_string(),
-                http_client.clone(),
-                block_watcher_sender.clone(),
-            )
-            .await?;
-
-            connections.insert(s.to_string(), connection);
-
-            if limit > 0 {
-                let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap());
-
-                let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
-
-                ratelimits.insert(s.to_string(), rate_limiter);
-            }
-        }
-
-        Ok(RpcTier {
-            rpcs: RwLock::new(rpcs),
-            connections: Arc::new(RwLock::new(connections)),
-            ratelimits: RwLock::new(ratelimits),
-        })
-    }
-
-    /// get the best available rpc server
-    async fn next_upstream_server(&self) -> Result<String, NotUntil<QuantaInstant>> {
-        let mut balanced_rpcs = self.rpcs.write().await;
-
-        // sort rpcs by their active connections
-        let connections = self.connections.read().await;
-        balanced_rpcs
-            .sort_unstable_by(|a, b| connections.get(a).unwrap().cmp(connections.get(b).unwrap()));
-
-        let mut earliest_not_until = None;
-
-        for selected_rpc in balanced_rpcs.iter() {
-            // TODO: check current block number. if behind, make our own NotUntil here
-            let ratelimits = self.ratelimits.write().await;
-
-            // check rate limits
-            match ratelimits.get(selected_rpc).unwrap().check() {
-                Ok(_) => {
-                    // rate limit succeeded
-                }
-                Err(not_until) => {
-                    // rate limit failed
-                    // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
-                    if earliest_not_until.is_none() {
-                        earliest_not_until = Some(not_until);
-                    } else {
-                        let earliest_possible =
-                            earliest_not_until.as_ref().unwrap().earliest_possible();
-                        let new_earliest_possible = not_until.earliest_possible();
-
-                        if earliest_possible > new_earliest_possible {
-                            earliest_not_until = Some(not_until);
-                        }
-                    }
-                    continue;
-                }
-            };
-
-            // increment our connection counter
-            self.connections
-                .write()
-                .await
-                .get_mut(selected_rpc)
-                .unwrap()
-                .inc_active_requests();
-
-            // return the selected RPC
-            return Ok(selected_rpc.clone());
-        }
-
-        // return the smallest not_until
-        if let Some(not_until) = earliest_not_until {
-            Err(not_until)
-        } else {
-            unimplemented!();
-        }
-    }
-
-    /// get all available rpc servers
-    async fn get_upstream_servers(&self) -> Result<Vec<String>, NotUntil<QuantaInstant>> {
-        let mut earliest_not_until = None;
-
-        let mut selected_rpcs = vec![];
-        for selected_rpc in self.rpcs.read().await.iter() {
-            // check rate limits
-            match self
-                .ratelimits
-                .write()
-                .await
-                .get(selected_rpc)
-                .unwrap()
-                .check()
-            {
-                Ok(_) => {
-                    // rate limit succeeded
-                }
-                Err(not_until) => {
-                    // rate limit failed
-                    // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
-                    if earliest_not_until.is_none() {
-                        earliest_not_until = Some(not_until);
-                    } else {
-                        let earliest_possible =
-                            earliest_not_until.as_ref().unwrap().earliest_possible();
-                        let new_earliest_possible = not_until.earliest_possible();
-
-                        if earliest_possible > new_earliest_possible {
-                            earliest_not_until = Some(not_until);
-                        }
-                    }
-                    continue;
-                }
-            };
-
-            // increment our connection counter
-            self.connections
-                .write()
-                .await
-                .get_mut(selected_rpc)
-                .unwrap()
-                .inc_active_requests();
-
-            // this is rpc should work
-            selected_rpcs.push(selected_rpc.clone());
-        }
-
-        if !selected_rpcs.is_empty() {
-            return Ok(selected_rpcs);
-        }
-
-        // return the earliest not_until
-        if let Some(not_until) = earliest_not_until {
-            Err(not_until)
-        } else {
-            // TODO: is this right?
-            Ok(vec![])
-        }
-    }
-}
-
 /// The application
 struct Web3ProxyApp {
     /// clock used for rate limiting
     /// TODO: use tokio's clock (will require a different ratelimiting crate)
     clock: QuantaClock,
     /// Send requests to the best server available
-    balanced_rpc_tiers: Arc<Vec<RpcTier>>,
+    balanced_rpc_tiers: Arc<Vec<Web3ProviderTier>>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
-    private_rpcs: Option<Arc<RpcTier>>,
+    private_rpcs: Option<Arc<Web3ProviderTier>>,
     /// write lock on these when all rate limits are hit
     balanced_rpc_ratelimiter_lock: RwLock<()>,
     private_rpcs_ratelimiter_lock: RwLock<()>,
@@ -226,7 +43,7 @@ impl Web3ProxyApp {
     ) -> anyhow::Result<Web3ProxyApp> {
         let clock = QuantaClock::default();
 
-        let (mut block_watcher, block_watcher_sender) = BlockWatcher::new(clock.clone());
+        let (mut block_watcher, block_watcher_sender) = BlockWatcher::new();
 
         // make a http shared client
         // TODO: how should we configure the connection pool?
@@ -241,7 +58,7 @@ impl Web3ProxyApp {
         let balanced_rpc_tiers = Arc::new(
             future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
-                RpcTier::try_new(
+                Web3ProviderTier::try_new(
                     balanced_rpc_tier,
                     Some(http_client.clone()),
                     block_watcher_sender.clone(),
@@ -250,14 +67,14 @@ impl Web3ProxyApp {
             }))
             .await
             .into_iter()
-            .collect::<anyhow::Result<Vec<RpcTier>>>()?,
+            .collect::<anyhow::Result<Vec<Web3ProviderTier>>>()?,
         );
 
         let private_rpcs = if private_rpcs.is_empty() {
             None
         } else {
             Some(Arc::new(
-                RpcTier::try_new(
+                Web3ProviderTier::try_new(
                     private_rpcs,
                     Some(http_client),
                     block_watcher_sender,
@@ -300,7 +117,7 @@ impl Web3ProxyApp {
                 mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
 
             let clone = self.clone();
-            let connections = private_rpcs.connections.clone();
+            let connections = private_rpcs.clone_connections();
             let json_body = json_body.clone();
 
             tokio::spawn(async move {
@@ -349,7 +166,7 @@ impl Web3ProxyApp {
                 mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
 
             let clone = self.clone();
-            let connections = balanced_rpcs.connections.clone();
+            let connections = balanced_rpcs.clone_connections();
             let json_body = json_body.clone();
 
             tokio::spawn(async move {
@@ -413,7 +230,7 @@ impl Web3ProxyApp {
     async fn try_send_requests(
         &self,
         rpc_servers: Vec<String>,
-        connections: Arc<ConnectionsMap>,
+        connections: Arc<Web3ConnectionMap>,
         json_request_body: serde_json::Value,
         tx: mpsc::UnboundedSender<anyhow::Result<serde_json::Value>>,
     ) -> anyhow::Result<()> {

src/provider.rs

@@ -1,3 +1,4 @@
+///! Communicate with a web3 providers
 use derive_more::From;
 use ethers::prelude::Middleware;
 use futures::StreamExt;
@@ -5,7 +6,7 @@ use std::time::Duration;
 use std::{cmp::Ordering, sync::Arc};
 use tracing::{info, warn};
 
-use crate::block_watcher::{BlockWatcherItem, BlockWatcherSender};
+use crate::block_watcher::BlockWatcherSender;
 
 // TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
 
 #[derive(From)]
@@ -49,16 +50,12 @@ impl Web3Provider {
                     .unwrap();
                 }
                 */
-                block_watcher_sender
-                    .send(BlockWatcherItem::SubscribeHttp(url.clone()))
-                    .unwrap();
+                info!("work in progress");
            }
            Web3Provider::Ws(provider) => {
                let mut stream = provider.subscribe_blocks().await?;
 
                while let Some(block) = stream.next().await {
-                    block_watcher_sender
-                        .send(BlockWatcherItem::NewHead(Box::new((url.clone(), block))))
-                        .unwrap();
+                    block_watcher_sender.send((url.clone(), block)).unwrap();
                }
            }
        }
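
With the Http arm reduced to info!("work in progress"), only websocket providers feed the watcher after this commit. Purely as a hedged sketch of one possible follow-up (nothing in this commit does this), an HTTP provider could poll the latest head and reuse the same NewHead channel; poll_http_heads, the 2-second interval, and the polling approach itself are assumptions.

use ethers::prelude::{BlockNumber, Http, Middleware, Provider};
use tokio::time::{interval, Duration};

use crate::block_watcher::BlockWatcherSender;

async fn poll_http_heads(
    url: String,
    provider: Provider<Http>,
    block_watcher_sender: BlockWatcherSender,
) -> anyhow::Result<()> {
    // poll no faster than a fraction of the expected block time
    let mut ticker = interval(Duration::from_secs(2));

    loop {
        ticker.tick().await;

        if let Some(block) = provider.get_block(BlockNumber::Latest).await? {
            // BlockWatcher skips blocks it has already seen, so resending the same head is harmless
            block_watcher_sender
                .send((url.clone(), block))
                .expect("BlockWatcher went away");
        }
    }
}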

src/provider_tiers.rs (new file, +195 lines)

@@ -0,0 +1,195 @@
///! Communicate with groups of web3 providers
use governor::clock::{QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::block_watcher::BlockWatcherSender;
use crate::provider::Web3Connection;

type Web3RateLimiter =
    RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;

type Web3RateLimiterMap = RwLock<HashMap<String, Web3RateLimiter>>;

pub type Web3ConnectionMap = RwLock<HashMap<String, Web3Connection>>;

/// Load balance to the rpc
/// TODO: i'm not sure about having 3 locks here. can we share them?
pub struct Web3ProviderTier {
    /// RPC urls sorted by active requests
    /// TODO: what type for the rpc?
    rpcs: RwLock<Vec<String>>,
    connections: Arc<Web3ConnectionMap>,
    ratelimits: Web3RateLimiterMap,
}

impl Web3ProviderTier {
    pub async fn try_new(
        servers: Vec<(&str, u32)>,
        http_client: Option<reqwest::Client>,
        block_watcher_sender: BlockWatcherSender,
        clock: &QuantaClock,
    ) -> anyhow::Result<Web3ProviderTier> {
        let mut rpcs: Vec<String> = vec![];
        let mut connections = HashMap::new();
        let mut ratelimits = HashMap::new();

        for (s, limit) in servers.into_iter() {
            rpcs.push(s.to_string());

            let connection = Web3Connection::try_new(
                s.to_string(),
                http_client.clone(),
                block_watcher_sender.clone(),
            )
            .await?;

            connections.insert(s.to_string(), connection);

            if limit > 0 {
                let quota = governor::Quota::per_second(NonZeroU32::new(limit).unwrap());

                let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);

                ratelimits.insert(s.to_string(), rate_limiter);
            }
        }

        Ok(Web3ProviderTier {
            rpcs: RwLock::new(rpcs),
            connections: Arc::new(RwLock::new(connections)),
            ratelimits: RwLock::new(ratelimits),
        })
    }

    pub fn clone_connections(&self) -> Arc<Web3ConnectionMap> {
        self.connections.clone()
    }

    /// get the best available rpc server
    pub async fn next_upstream_server(&self) -> Result<String, NotUntil<QuantaInstant>> {
        let mut balanced_rpcs = self.rpcs.write().await;

        // sort rpcs by their active connections
        let connections = self.connections.read().await;
        balanced_rpcs
            .sort_unstable_by(|a, b| connections.get(a).unwrap().cmp(connections.get(b).unwrap()));

        let mut earliest_not_until = None;

        for selected_rpc in balanced_rpcs.iter() {
            // TODO: check current block number. if behind, make our own NotUntil here
            let ratelimits = self.ratelimits.write().await;

            // check rate limits
            match ratelimits.get(selected_rpc).unwrap().check() {
                Ok(_) => {
                    // rate limit succeeded
                }
                Err(not_until) => {
                    // rate limit failed
                    // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
                    if earliest_not_until.is_none() {
                        earliest_not_until = Some(not_until);
                    } else {
                        let earliest_possible =
                            earliest_not_until.as_ref().unwrap().earliest_possible();
                        let new_earliest_possible = not_until.earliest_possible();

                        if earliest_possible > new_earliest_possible {
                            earliest_not_until = Some(not_until);
                        }
                    }
                    continue;
                }
            };

            // increment our connection counter
            self.connections
                .write()
                .await
                .get_mut(selected_rpc)
                .unwrap()
                .inc_active_requests();

            // return the selected RPC
            return Ok(selected_rpc.clone());
        }

        // return the smallest not_until
        if let Some(not_until) = earliest_not_until {
            Err(not_until)
        } else {
            unimplemented!();
        }
    }

    /// get all available rpc servers
    pub async fn get_upstream_servers(&self) -> Result<Vec<String>, NotUntil<QuantaInstant>> {
        let mut earliest_not_until = None;

        let mut selected_rpcs = vec![];
        for selected_rpc in self.rpcs.read().await.iter() {
            // check rate limits
            match self
                .ratelimits
                .write()
                .await
                .get(selected_rpc)
                .unwrap()
                .check()
            {
                Ok(_) => {
                    // rate limit succeeded
                }
                Err(not_until) => {
                    // rate limit failed
                    // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
                    if earliest_not_until.is_none() {
                        earliest_not_until = Some(not_until);
                    } else {
                        let earliest_possible =
                            earliest_not_until.as_ref().unwrap().earliest_possible();
                        let new_earliest_possible = not_until.earliest_possible();

                        if earliest_possible > new_earliest_possible {
                            earliest_not_until = Some(not_until);
                        }
                    }
                    continue;
                }
            };

            // increment our connection counter
            self.connections
                .write()
                .await
                .get_mut(selected_rpc)
                .unwrap()
                .inc_active_requests();

            // this is rpc should work
            selected_rpcs.push(selected_rpc.clone());
        }

        if !selected_rpcs.is_empty() {
            return Ok(selected_rpcs);
        }

        // return the earliest not_until
        if let Some(not_until) = earliest_not_until {
            Err(not_until)
        } else {
            // TODO: is this right?
            Ok(vec![])
        }
    }
}
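
A caller-side sketch (not from this commit) of consuming a tier: retry next_upstream_server until some rate limiter admits a request, sleeping until the earliest NotUntil otherwise. pick_server is a hypothetical helper rather than crate API, and it assumes governor's wait_time_from for the backoff computation.

use governor::clock::{Clock, QuantaClock};

use crate::provider_tiers::Web3ProviderTier;

async fn pick_server(tier: &Web3ProviderTier, clock: &QuantaClock) -> String {
    loop {
        match tier.next_upstream_server().await {
            // success also bumped the selected connection's active-request counter
            Ok(url) => return url,
            Err(not_until) => {
                // every server is rate limited; sleep until the soonest one frees up
                let wait = not_until.wait_time_from(clock.now());
                tokio::time::sleep(wait).await;
            }
        }
    }
}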