no more dashmap. all atomics

This commit is contained in:
Bryan Stitt 2022-04-28 22:40:29 +00:00
parent 63428cad6b
commit 2fef0e6acb
6 changed files with 68 additions and 39 deletions

1
Cargo.lock generated
View File

@ -3759,7 +3759,6 @@ dependencies = [
"arc-swap", "arc-swap",
"argh", "argh",
"atomic-counter", "atomic-counter",
"dashmap",
"derive_more", "derive_more",
"ethers", "ethers",
"futures", "futures",

View File

@ -10,7 +10,6 @@ arc-swap = "1.5.0"
argh = "0.1.7" argh = "0.1.7"
anyhow = "1.0.57" anyhow = "1.0.57"
atomic-counter = "1.0.1" atomic-counter = "1.0.1"
dashmap = "5.2.0"
derive_more = "0.99" derive_more = "0.99"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
futures = { version = "0.3.21", features = ["thread-pool"] } futures = { version = "0.3.21", features = ["thread-pool"] }

View File

@ -1,7 +1,7 @@
///! Track the head block of all the web3 providers ///! Track the head block of all the web3 providers
use dashmap::DashMap;
use ethers::prelude::{Block, TxHash}; use ethers::prelude::{Block, TxHash};
use std::cmp; use std::cmp;
use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::sync::atomic::{self, AtomicU64}; use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc; use std::sync::Arc;
@ -26,7 +26,8 @@ pub struct BlockWatcher {
sender: BlockWatcherSender, sender: BlockWatcherSender,
/// this Mutex is locked over awaits, so we want an async lock /// this Mutex is locked over awaits, so we want an async lock
receiver: Mutex<BlockWatcherReceiver>, receiver: Mutex<BlockWatcherReceiver>,
block_numbers: DashMap<String, u64>, // TODO: better key
block_numbers: HashMap<String, AtomicU64>,
head_block_number: AtomicU64, head_block_number: AtomicU64,
} }
@ -38,13 +39,15 @@ impl fmt::Debug for BlockWatcher {
} }
impl BlockWatcher { impl BlockWatcher {
pub fn new() -> Self { pub fn new(rpcs: Vec<String>) -> Self {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = mpsc::unbounded_channel();
let block_numbers = rpcs.into_iter().map(|rpc| (rpc, 0.into())).collect();
Self { Self {
sender, sender,
receiver: Mutex::new(receiver), receiver: Mutex::new(receiver),
block_numbers: Default::default(), block_numbers,
head_block_number: Default::default(), head_block_number: Default::default(),
} }
} }
@ -55,12 +58,14 @@ impl BlockWatcher {
pub fn sync_status(&self, rpc: &str, allowed_lag: u64) -> SyncStatus { pub fn sync_status(&self, rpc: &str, allowed_lag: u64) -> SyncStatus {
match ( match (
self.head_block_number.load(atomic::Ordering::SeqCst), self.head_block_number.load(atomic::Ordering::Acquire),
self.block_numbers.get(rpc), self.block_numbers.get(rpc),
) { ) {
(0, _) => SyncStatus::Unknown, (0, _) => SyncStatus::Unknown,
(_, None) => SyncStatus::Unknown, (_, None) => SyncStatus::Unknown,
(head_block_number, Some(rpc_block_number)) => { (head_block_number, Some(rpc_block_number)) => {
let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire);
match head_block_number.cmp(&rpc_block_number) { match head_block_number.cmp(&rpc_block_number) {
cmp::Ordering::Equal => SyncStatus::Synced(0), cmp::Ordering::Equal => SyncStatus::Synced(0),
cmp::Ordering::Greater => { cmp::Ordering::Greater => {
@ -70,7 +75,7 @@ impl BlockWatcher {
} }
cmp::Ordering::Less => { cmp::Ordering::Less => {
// allow being some behind // allow being some behind
let lag = head_block_number - *rpc_block_number; let lag = head_block_number - rpc_block_number;
if lag <= allowed_lag { if lag <= allowed_lag {
SyncStatus::Synced(lag) SyncStatus::Synced(lag)
@ -94,10 +99,12 @@ impl BlockWatcher {
{ {
if let Some(rpc_block_number) = self.block_numbers.get(&rpc) { if let Some(rpc_block_number) = self.block_numbers.get(&rpc) {
let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire);
// if we already have this block height // if we already have this block height
// this probably own't happen with websockets, but is likely with polling against http rpcs // this probably own't happen with websockets, but is likely with polling against http rpcs
// TODO: should we compare more than just height? hash too? // TODO: should we compare more than just height? hash too?
if *rpc_block_number == new_block_number { if rpc_block_number == new_block_number {
continue; continue;
} }
} }
@ -110,14 +117,17 @@ impl BlockWatcher {
// save the block for this rpc // save the block for this rpc
// TODO: store the actual chain as a graph and then have self.blocks point to that? // TODO: store the actual chain as a graph and then have self.blocks point to that?
self.block_numbers.insert(rpc.clone(), new_block_number); self.block_numbers
.get(&rpc)
.unwrap()
.swap(new_block_number, atomic::Ordering::Release);
let head_number = self.head_block_number.load(atomic::Ordering::SeqCst); let head_number = self.head_block_number.load(atomic::Ordering::Acquire);
let label_slow_heads = if head_number == 0 { let label_slow_heads = if head_number == 0 {
// first block seen // first block seen
self.head_block_number self.head_block_number
.swap(new_block_number, atomic::Ordering::SeqCst); .swap(new_block_number, atomic::Ordering::AcqRel);
", +".to_string() ", +".to_string()
} else { } else {
// TODO: what if they have the same number but different hashes? // TODO: what if they have the same number but different hashes?
@ -130,7 +140,7 @@ impl BlockWatcher {
cmp::Ordering::Greater => { cmp::Ordering::Greater => {
// new_block is the new head_block // new_block is the new head_block
self.head_block_number self.head_block_number
.swap(new_block_number, atomic::Ordering::SeqCst); .swap(new_block_number, atomic::Ordering::AcqRel);
", +".to_string() ", +".to_string()
} }
cmp::Ordering::Less => { cmp::Ordering::Less => {

View File

@ -57,6 +57,22 @@ impl Web3ProxyApp {
) -> anyhow::Result<Web3ProxyApp> { ) -> anyhow::Result<Web3ProxyApp> {
let clock = QuantaClock::default(); let clock = QuantaClock::default();
let mut rpcs = vec![];
for balanced_rpc_tier in balanced_rpc_tiers.iter() {
for rpc_data in balanced_rpc_tier {
let rpc = rpc_data.0.to_string();
rpcs.push(rpc);
}
}
for rpc_data in private_rpcs.iter() {
let rpc = rpc_data.0.to_string();
rpcs.push(rpc);
}
let block_watcher = Arc::new(BlockWatcher::new(rpcs));
// make a http shared client // make a http shared client
// TODO: how should we configure the connection pool? // TODO: how should we configure the connection pool?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
@ -65,17 +81,6 @@ impl Web3ProxyApp {
.user_agent(APP_USER_AGENT) .user_agent(APP_USER_AGENT)
.build()?; .build()?;
let block_watcher = Arc::new(BlockWatcher::new());
let (new_block_sender, mut new_block_receiver) = watch::channel::<String>("".to_string());
{
// TODO: spawn this later?
// spawn a future for the block_watcher
let block_watcher = block_watcher.clone();
tokio::spawn(async move { block_watcher.run(new_block_sender).await });
}
let balanced_rpc_tiers = Arc::new( let balanced_rpc_tiers = Arc::new(
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| { future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
Web3ProviderTier::try_new( Web3ProviderTier::try_new(
@ -105,6 +110,15 @@ impl Web3ProxyApp {
)) ))
}; };
let (new_block_sender, mut new_block_receiver) = watch::channel::<String>("".to_string());
{
// TODO: spawn this later?
// spawn a future for the block_watcher
let block_watcher = block_watcher.clone();
tokio::spawn(async move { block_watcher.run(new_block_sender).await });
}
{ {
// spawn a future for sorting our synced rpcs // spawn a future for sorting our synced rpcs
// TODO: spawn this later? // TODO: spawn this later?
@ -368,7 +382,7 @@ impl Web3ProxyApp {
let response = provider.request(&method, params).await; let response = provider.request(&method, params).await;
connections.get_mut(&rpc).unwrap().dec_active_requests(); connections.get(&rpc).unwrap().dec_active_requests();
let response = response?; let response = response?;

View File

@ -8,6 +8,7 @@ use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil; use governor::NotUntil;
use governor::RateLimiter; use governor::RateLimiter;
use std::fmt; use std::fmt;
use std::sync::atomic::{self, AtomicUsize};
use std::time::Duration; use std::time::Duration;
use std::{cmp::Ordering, sync::Arc}; use std::{cmp::Ordering, sync::Arc};
use tokio::time::interval; use tokio::time::interval;
@ -91,7 +92,7 @@ impl Web3Provider {
#[derive(Debug)] #[derive(Debug)]
pub struct Web3Connection { pub struct Web3Connection {
/// keep track of currently open requests. We sort on this /// keep track of currently open requests. We sort on this
active_requests: u32, active_requests: AtomicUsize,
provider: Arc<Web3Provider>, provider: Arc<Web3Provider>,
ratelimiter: Option<Web3RateLimiter>, ratelimiter: Option<Web3RateLimiter>,
} }
@ -146,13 +147,13 @@ impl Web3Connection {
}); });
Ok(Web3Connection { Ok(Web3Connection {
active_requests: 0, active_requests: Default::default(),
provider, provider,
ratelimiter, ratelimiter,
}) })
} }
pub fn try_inc_active_requests(&mut self) -> Result<(), NotUntil<QuantaInstant>> { pub fn try_inc_active_requests(&self) -> Result<(), NotUntil<QuantaInstant>> {
// check rate limits // check rate limits
if let Some(ratelimiter) = self.ratelimiter.as_ref() { if let Some(ratelimiter) = self.ratelimiter.as_ref() {
match ratelimiter.check() { match ratelimiter.check() {
@ -170,13 +171,15 @@ impl Web3Connection {
} }
}; };
self.active_requests += 1; // TODO: what ordering?!
self.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
Ok(()) Ok(())
} }
pub fn dec_active_requests(&mut self) { pub fn dec_active_requests(&self) {
self.active_requests -= 1; // TODO: what ordering?!
self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
} }
} }
@ -184,7 +187,10 @@ impl Eq for Web3Connection {}
impl Ord for Web3Connection { impl Ord for Web3Connection {
fn cmp(&self, other: &Self) -> std::cmp::Ordering { fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.active_requests.cmp(&other.active_requests) // TODO: what atomic ordering?!
self.active_requests
.load(atomic::Ordering::Acquire)
.cmp(&other.active_requests.load(atomic::Ordering::Acquire))
} }
} }
@ -197,6 +203,8 @@ impl PartialOrd for Web3Connection {
/// note that this is just comparing the active requests. two providers with different rpc urls are equal! /// note that this is just comparing the active requests. two providers with different rpc urls are equal!
impl PartialEq for Web3Connection { impl PartialEq for Web3Connection {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.active_requests == other.active_requests // TODO: what ordering?!
self.active_requests.load(atomic::Ordering::Acquire)
== other.active_requests.load(atomic::Ordering::Acquire)
} }
} }

View File

@ -1,6 +1,5 @@
///! Communicate with groups of web3 providers ///! Communicate with groups of web3 providers
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use dashmap::DashMap;
use governor::clock::{QuantaClock, QuantaInstant}; use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil; use governor::NotUntil;
use std::cmp; use std::cmp;
@ -13,7 +12,7 @@ use crate::block_watcher::{BlockWatcher, SyncStatus};
use crate::provider::Web3Connection; use crate::provider::Web3Connection;
// TODO: move the rate limiter into the connection // TODO: move the rate limiter into the connection
pub type Web3ConnectionMap = DashMap<String, Web3Connection>; pub type Web3ConnectionMap = HashMap<String, Web3Connection>;
/// Load balance to the rpc /// Load balance to the rpc
pub struct Web3ProviderTier { pub struct Web3ProviderTier {
@ -39,7 +38,7 @@ impl Web3ProviderTier {
clock: &QuantaClock, clock: &QuantaClock,
) -> anyhow::Result<Web3ProviderTier> { ) -> anyhow::Result<Web3ProviderTier> {
let mut rpcs: Vec<String> = vec![]; let mut rpcs: Vec<String> = vec![];
let connections = DashMap::new(); let mut connections = HashMap::new();
for (s, limit) in servers.into_iter() { for (s, limit) in servers.into_iter() {
rpcs.push(s.to_string()); rpcs.push(s.to_string());
@ -144,7 +143,7 @@ impl Web3ProviderTier {
self.connections self.connections
.get(a) .get(a)
.unwrap() .unwrap()
.cmp(&self.connections.get(b).unwrap()) .cmp(self.connections.get(b).unwrap())
}); });
// filter out // filter out
@ -166,7 +165,7 @@ impl Web3ProviderTier {
// increment our connection counter // increment our connection counter
if let Err(not_until) = self if let Err(not_until) = self
.connections .connections
.get_mut(selected_rpc) .get(selected_rpc)
.unwrap() .unwrap()
.try_inc_active_requests() .try_inc_active_requests()
{ {
@ -204,7 +203,7 @@ impl Web3ProviderTier {
// TODO: share code with next_upstream_server // TODO: share code with next_upstream_server
if let Err(not_until) = self if let Err(not_until) = self
.connections .connections
.get_mut(selected_rpc) .get(selected_rpc)
.unwrap() .unwrap()
.try_inc_active_requests() .try_inc_active_requests()
{ {