check block hash and store block number in fewer places

This commit is contained in:
Bryan Stitt 2022-05-15 06:27:13 +00:00
parent dfb98ede20
commit 0041709f3e
3 changed files with 101 additions and 151 deletions
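The diff below replaces the per-connection `head_block_number` atomic with a single flume channel: every connection's subscription task sends `(block_number, block_hash, Arc<Web3Connection>)` to one consumer that owns the synced-RPC bookkeeping. A minimal sketch of that channel pattern, using hypothetical stand-in types (`Conn`, `BlockHash`) instead of the real `Web3Connection` and `H256`:

```rust
use std::sync::Arc;

// hypothetical stand-ins for Web3Connection and ethers' H256
type BlockHash = [u8; 32];
struct Conn {
    url: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // one channel shared by every connection's subscription task
    let (block_sender, block_receiver) = flume::unbounded::<(u64, BlockHash, Arc<Conn>)>();

    // producer side: each connection sends (number, hash, itself) as heads arrive
    let conn = Arc::new(Conn { url: "ws://example".into() });
    {
        let block_sender = block_sender.clone();
        let conn = conn.clone();
        tokio::spawn(async move {
            // in the real code this is driven by eth_subscribe or HTTP polling
            block_sender.send((1, [0u8; 32], conn)).ok();
        });
    }
    drop(block_sender); // demo only: close the channel so the loop below ends

    // consumer side: a single task updates the set of synced servers
    while let Ok((num, _hash, rpc)) = block_receiver.recv_async().await {
        println!("new head block {} from {}", num, rpc.url);
    }
    Ok(())
}
```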

@@ -1,5 +1,7 @@
# Todo
- [ ] eth_sendRawTransaction should return the most common result, not the first
- [ ] if chain split detected, don't send transactions
- [ ] endpoint for health checks. if no synced servers, give a 502 error (see the sketch after this list)
- [ ] some production configs are occasionally stuck waiting at 100% cpu
- looks like it's getting stuck on `futex(0x7fc15067b478, FUTEX_WAIT_PRIVATE, 1, NULL`
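For the health-check item, a minimal sketch of the intended behavior; the `health_status` helper and the bare `u16` status code are hypothetical, not part of this codebase:

```rust
// hypothetical helper: whatever HTTP layer the proxy ends up using would map
// this to a real response; only the status-code decision is sketched here
fn health_status(synced_server_count: usize) -> u16 {
    if synced_server_count == 0 {
        502 // no upstream is at the head block: report Bad Gateway
    } else {
        200 // at least one synced server can take traffic
    }
}
```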

@@ -1,6 +1,6 @@
///! Rate-limited communication with a web3 provider
use derive_more::From;
use ethers::prelude::Middleware;
use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256};
use futures::StreamExt;
use governor::clock::{Clock, QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
@@ -9,14 +9,12 @@ use governor::NotUntil;
use governor::RateLimiter;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::sync::atomic::{self, AtomicU32};
use std::time::Duration;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{info, trace, warn};
use crate::connections::Web3Connections;
type Web3RateLimiter =
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
@@ -44,7 +42,6 @@ pub struct Web3Connection {
ratelimiter: Option<Web3RateLimiter>,
/// used for load balancing to the least loaded server
soft_limit: u32,
head_block_number: AtomicU64,
/// the same clock that is used by the rate limiter
clock: QuantaClock,
}
@@ -117,7 +114,6 @@ impl Web3Connection {
provider,
ratelimiter: hard_rate_limiter,
soft_limit,
head_block_number: 0.into(),
};
let connection = Arc::new(connection);
@@ -158,11 +154,6 @@ impl Web3Connection {
self.active_requests.load(atomic::Ordering::Acquire)
}
#[inline]
pub fn head_block_number(&self) -> u64 {
self.head_block_number.load(atomic::Ordering::Acquire)
}
#[inline]
pub fn soft_limit(&self) -> u32 {
self.soft_limit
@@ -173,11 +164,32 @@ impl Web3Connection {
&self.url
}
fn send_block(
self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(u64, H256, Arc<Self>)>,
) {
match block {
Ok(block) => {
let block_number = block.number.unwrap().as_u64();
let block_hash = block.hash.unwrap();
// TODO: i'm pretty sure we don't need send_async, but double check
block_sender
.send((block_number, block_hash, self.clone()))
.unwrap();
}
Err(e) => {
warn!("unable to get block from {}: {}", self, e);
}
}
}
/// Subscribe to new blocks
// #[instrument]
pub async fn new_heads(
pub async fn subscribe_new_heads(
self: Arc<Self>,
connections: Option<Arc<Web3Connections>>,
block_sender: flume::Sender<(u64, H256, Arc<Self>)>,
) -> anyhow::Result<()> {
info!("Watching new_heads on {}", self);
@@ -196,23 +208,14 @@ impl Web3Connection {
let active_request_handle = self.wait_for_request_handle().await;
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = provider
.request("eth_getBlockByNumber", ("latest", false))
.await;
drop(active_request_handle);
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
let old_block_number = self
.head_block_number
.swap(block_number, atomic::Ordering::AcqRel);
if old_block_number != block_number {
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self).await?;
} else {
info!("new block on {}: {}", self, block_number);
}
}
self.send_block(block, &block_sender);
}
}
Web3Provider::Ws(provider) => {
@@ -231,32 +234,16 @@ impl Web3Connection {
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: rate limit!
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
let block: Result<Block<TxHash>, _> = provider
.request("eth_getBlockByNumber", ("latest", false))
.await;
drop(active_request_handle);
// TODO: swap and check the result?
self.head_block_number
.store(block_number, atomic::Ordering::Release);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self).await?;
} else {
info!("new head block {} from {}", block_number, self);
}
self.send_block(block, &block_sender);
while let Some(new_block) = stream.next().await {
let new_block_number = new_block.number.unwrap().as_u64();
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
// TODO: do we need this old block number check? its helpful on http, but here it shouldn't dupe except maybe on the first run
self.head_block_number
.fetch_max(new_block_number, atomic::Ordering::AcqRel);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self).await?;
}
self.send_block(Ok(new_block), &block_sender);
}
}
}

@@ -1,5 +1,6 @@
///! Load balanced communication with a group of web3 providers
use derive_more::From;
use ethers::prelude::H256;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
@@ -11,7 +12,7 @@ use std::fmt;
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, instrument, trace, warn};
use tracing::{debug, info, trace, warn};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
@@ -19,7 +20,8 @@ use crate::connection::{ActiveRequestHandle, Web3Connection};
#[derive(Clone, Default)]
struct SyncedConnections {
head_block_number: u64,
inner: Vec<usize>,
head_block_hash: H256,
inner: Vec<Arc<Web3Connection>>,
}
impl fmt::Debug for SyncedConnections {
@@ -35,6 +37,7 @@ impl SyncedConnections {
Self {
head_block_number: 0,
head_block_hash: Default::default(),
inner,
}
}
@@ -91,16 +94,23 @@ impl Web3Connections {
});
if subscribe_heads {
let (block_sender, block_receiver) = flume::unbounded();
{
let connections = Arc::clone(&connections);
tokio::spawn(async move { connections.update_synced_rpcs(block_receiver).await });
}
for connection in connections.inner.iter() {
// subscribe to new heads in a spawned future
// TODO: channel instead. then we can have one future with write access to a left-right?
let connection = Arc::clone(connection);
let connections = connections.clone();
let block_sender = block_sender.clone();
tokio::spawn(async move {
let url = connection.url().to_string();
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection.new_heads(Some(connections)).await {
if let Err(e) = connection.subscribe_new_heads(block_sender).await {
warn!("new_heads error on {}: {:?}", url, e);
}
});
@@ -175,106 +185,56 @@ impl Web3Connections {
}
/// TODO: possible dead lock here. investigate more. probably refactor
#[instrument]
pub async fn update_synced_rpcs(&self, rpc: &Arc<Web3Connection>) -> anyhow::Result<()> {
info!("Locking synced_connections");
let mut synced_connections = self.synced_connections.write().await;
info!("Locked synced_connections");
// #[instrument]
pub async fn update_synced_rpcs(
&self,
block_receiver: flume::Receiver<(u64, H256, Arc<Web3Connection>)>,
) -> anyhow::Result<()> {
while let Ok((new_block_num, new_block_hash, rpc)) = block_receiver.recv_async().await {
if new_block_num == 0 {
warn!("{} is still syncing", rpc);
continue;
}
let new_block = rpc.head_block_number();
// TODO: experiment with different locks and such here
let mut synced_connections = self.synced_connections.write().await;
if new_block == 0 {
warn!("{} is still syncing", rpc);
return Ok(());
}
// TODO: double check this logic
match new_block_num.cmp(&synced_connections.head_block_number) {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
info!("new head block {} from {}", new_block_num, rpc);
let current_best_block_number = synced_connections.head_block_number;
synced_connections.inner.clear();
synced_connections.inner.push(rpc);
let overall_best_head_block = self.head_block_number();
// TODO: double check this logic
match (
new_block.cmp(&overall_best_head_block),
new_block.cmp(&current_best_block_number),
) {
(cmp::Ordering::Greater, cmp::Ordering::Greater) => {
// the rpc's newest block is the new overall best block
synced_connections.inner.clear();
synced_connections.head_block_number = new_block;
// TODO: what ordering?
match self.best_head_block_number.compare_exchange(
overall_best_head_block,
new_block,
atomic::Ordering::AcqRel,
atomic::Ordering::Acquire,
) {
Ok(_) => {
info!("new head block {} from {}", new_block, rpc);
}
Err(current_best_block_number) => {
// actually, there was a race and this ended up not being the latest block. return now without adding this rpc to the synced list
debug!(
"behind {} on {:?}: {}",
current_best_block_number, rpc, new_block
synced_connections.head_block_number = new_block_num;
synced_connections.head_block_hash = new_block_hash;
}
cmp::Ordering::Equal => {
if new_block_hash != synced_connections.head_block_hash {
// same height, but different chain
warn!(
"chain is forked! {} has {}. First #{} was {}",
rpc, new_block_hash, new_block_num, synced_connections.head_block_hash
);
return Ok(());
continue;
}
// do not clear synced_connections.
// we just want to add this rpc to the end
synced_connections.inner.push(rpc);
}
cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything
continue;
}
}
(cmp::Ordering::Equal, cmp::Ordering::Less) => {
// no need to do anything
return Ok(());
}
(cmp::Ordering::Greater, cmp::Ordering::Less) => {
// this isn't the best block in the tier. don't do anything
return Ok(());
}
(cmp::Ordering::Equal, cmp::Ordering::Equal) => {
// this rpc tier is synced, and it isn't the first to this block
}
(cmp::Ordering::Less, cmp::Ordering::Less) => {
// this rpc is behind the best and the tier. don't do anything
return Ok(());
}
(cmp::Ordering::Less, cmp::Ordering::Equal) => {
// this rpc is behind the best. but is an improvement for the tier
return Ok(());
}
(cmp::Ordering::Less, cmp::Ordering::Greater) => {
// this rpc is behind the best, but it is catching up
synced_connections.inner.clear();
synced_connections.head_block_number = new_block;
// return now because this isn't actually synced
return Ok(());
}
(cmp::Ordering::Equal, cmp::Ordering::Greater) => {
// we caught up to another tier
synced_connections.inner.clear();
synced_connections.head_block_number = new_block;
}
(cmp::Ordering::Greater, cmp::Ordering::Equal) => {
// TODO: what should we do? i think we got here because we aren't using atomics properly
// the overall block hasn't yet updated, but our internal block has
// TODO: maybe we should
}
// TODO: better log
trace!("Now synced: {:?}", synced_connections.inner);
}
let rpc_index = self
.inner
.iter()
.position(|x| x.url() == rpc.url())
.unwrap();
// TODO: hopefully nothing ends up in here twice. Greater+Equal might do that to us
synced_connections.inner.push(rpc_index);
info!("Now synced {:?}: {:?}", self, synced_connections.inner);
Ok(())
}
@@ -291,13 +251,12 @@ impl Web3Connections {
// let a = a as f32 / self.soft_limit as f32;
// let b = b as f32 / other.soft_limit as f32;
let sort_cache: HashMap<usize, (f32, u32)> = synced_rpc_indexes
// TODO: better key!
let sort_cache: HashMap<String, (f32, u32)> = synced_rpc_indexes
.iter()
.map(|synced_index| {
let key = *synced_index;
let connection = self.inner.get(*synced_index).unwrap();
.map(|connection| {
// TODO: better key!
let key = format!("{}", connection);
let active_requests = connection.active_requests();
let soft_limit = connection.soft_limit();
@@ -309,8 +268,12 @@ impl Web3Connections {
// TODO: i think we might need to load active connections and then
synced_rpc_indexes.sort_unstable_by(|a, b| {
let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap();
// TODO: better keys
let a_key = format!("{}", a);
let b_key = format!("{}", b);
let (a_utilization, a_soft_limit) = sort_cache.get(&a_key).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(&b_key).unwrap();
// TODO: i'm comparing floats. crap
match a_utilization
@@ -323,8 +286,6 @@ impl Web3Connections {
});
for selected_rpc in synced_rpc_indexes.into_iter() {
let selected_rpc = self.inner.get(selected_rpc).unwrap();
// increment our connection counter
match selected_rpc.try_request_handle() {
Err(not_until) => {