rename Web3ProxyBlock to BlockHeader

This commit is contained in:
Bryan Stitt 2023-11-23 00:17:46 -04:00
parent ebc3a15579
commit 10dd616d31
10 changed files with 87 additions and 77 deletions

@ -11,7 +11,7 @@ use crate::jsonrpc::{
};
use crate::relational_db::{connect_db, migrate_db};
use crate::response_cache::{ForwardedResponse, JsonRpcResponseCache, JsonRpcResponseWeigher};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::blockchain::BlockHeader;
use crate::rpcs::consensus::RankedRpcs;
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
@ -85,7 +85,7 @@ pub struct App {
/// rpc clients that subscribe to newHeads use this channel
/// don't drop this or the sender will stop working
/// TODO: broadcast channel instead?
pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
pub watch_consensus_head_receiver: watch::Receiver<Option<BlockHeader>>,
/// rpc clients that subscribe to newPendingTransactions use this channel
pub pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
pub hostname: Option<String>,
@ -751,7 +751,7 @@ impl App {
Ok(())
}
pub fn head_block_receiver(&self) -> watch::Receiver<Option<Web3ProxyBlock>> {
pub fn head_block_receiver(&self) -> watch::Receiver<Option<BlockHeader>> {
self.watch_consensus_head_receiver.clone()
}
@ -1038,7 +1038,7 @@ impl App {
// get the head block now so that any requests that need it all use the same block
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block: Web3ProxyBlock = self
let head_block: BlockHeader = self
.balanced_rpcs
.head_block()
.ok_or(Web3ProxyError::NoServersSynced)?;
@ -1252,14 +1252,22 @@ impl App {
self: &Arc<Self>,
request: SingleRequest,
authorization: Arc<Authorization>,
head_block: Option<Web3ProxyBlock>,
head_block: Option<BlockHeader>,
request_id: Option<String>,
) -> (StatusCode, jsonrpc::SingleResponse, Vec<Arc<Web3Rpc>>) {
// TODO: this clone is only for an error response. refactor to not need it
let error_id = request.id.clone();
// TODO: think more about how to handle retries without hammering our servers with errors
let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe();
let mut ranked_rpcs_recv = self.balanced_rpcs.watch_ranked_rpcs.subscribe();
let ranked_rpcs = ranked_rpcs_recv.borrow_and_update().clone();
let head_block = if head_block.is_none() {
ranked_rpcs.and_then(|x| x.head_block.clone())
} else {
head_block
};
let web3_request = match ValidatedRequest::new_with_app(
self,
@ -1267,7 +1275,7 @@ impl App {
None,
None,
request.into(),
head_block.clone(),
head_block,
request_id,
)
.await
@ -1305,9 +1313,9 @@ impl App {
}
select! {
_ = ranked_rpcs.changed() => {
_ = ranked_rpcs_recv.changed() => {
// TODO: pass these RankedRpcs to ValidatedRequest::new_with_app
ranked_rpcs.borrow_and_update();
ranked_rpcs_recv.borrow_and_update();
}
_ = &mut latest_start => {
// do not retry if we've already been trying for 3 seconds

@ -3,7 +3,7 @@ use crate::app::App;
use crate::jsonrpc::SingleRequest;
use crate::{
errors::{Web3ProxyError, Web3ProxyResult},
rpcs::blockchain::Web3ProxyBlock,
rpcs::blockchain::BlockHeader,
};
use anyhow::Context;
use derive_more::From;
@ -57,8 +57,8 @@ impl BlockNumAndHash {
}
}
impl From<&Web3ProxyBlock> for BlockNumAndHash {
fn from(value: &Web3ProxyBlock) -> Self {
impl From<&BlockHeader> for BlockNumAndHash {
fn from(value: &BlockHeader) -> Self {
let n = value.number();
let h = *value.hash();
@ -71,7 +71,7 @@ impl From<&Web3ProxyBlock> for BlockNumAndHash {
pub async fn clean_block_number<'a>(
params: &'a mut serde_json::Value,
block_param_id: usize,
head_block: &'a Web3ProxyBlock,
head_block: &'a BlockHeader,
app: Option<&'a App>,
) -> Web3ProxyResult<BlockNumOrHash> {
match params.as_array_mut() {
@ -216,8 +216,8 @@ impl BlockNumOrHash {
}
}
impl From<&Web3ProxyBlock> for BlockNumOrHash {
fn from(value: &Web3ProxyBlock) -> Self {
impl From<&BlockHeader> for BlockNumOrHash {
fn from(value: &BlockHeader) -> Self {
Self::And(value.into())
}
}
@ -279,7 +279,7 @@ impl CacheMode {
/// returns None if this request should not be cached
pub async fn new<'a>(
request: &'a mut SingleRequest,
head_block: Option<&'a Web3ProxyBlock>,
head_block: Option<&'a BlockHeader>,
app: Option<&'a App>,
) -> Web3ProxyResult<Self> {
match Self::try_new(request, head_block, app).await {
@ -318,7 +318,7 @@ impl CacheMode {
pub async fn try_new(
request: &mut SingleRequest,
head_block: Option<&Web3ProxyBlock>,
head_block: Option<&BlockHeader>,
app: Option<&App>,
) -> Web3ProxyResult<Self> {
let params = &mut request.params;
@ -572,7 +572,7 @@ mod test {
use crate::{
errors::Web3ProxyError,
jsonrpc::{LooseId, SingleRequest},
rpcs::blockchain::Web3ProxyBlock,
rpcs::blockchain::BlockHeader,
};
use ethers::types::{Block, H256};
use serde_json::json;
@ -589,7 +589,7 @@ mod test {
..Default::default()
};
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let head_block = BlockHeader::try_new(Arc::new(head_block)).unwrap();
let id = LooseId::Number(9);
@ -625,7 +625,7 @@ mod test {
..Default::default()
};
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let head_block = BlockHeader::try_new(Arc::new(head_block)).unwrap();
let id = LooseId::Number(99);
@ -663,7 +663,7 @@ mod test {
..Default::default()
};
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let head_block = BlockHeader::try_new(Arc::new(head_block)).unwrap();
let mut request = SingleRequest::new(99.into(), method.into(), params).unwrap();

@ -1,6 +1,6 @@
use crate::app::Web3ProxyJoinHandle;
use crate::compute_units::default_usd_per_cu;
use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use crate::rpcs::blockchain::{BlockHeader, BlocksByHashCache};
use crate::rpcs::one::Web3Rpc;
use argh::FromArgs;
use deduped_broadcast::DedupedBroadcaster;
@ -18,7 +18,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use tracing::warn;
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
pub type BlockAndRpc = (Option<BlockHeader>, Arc<Web3Rpc>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
#[derive(Debug, FromArgs)]

@ -6,7 +6,7 @@ use crate::jsonrpc::{
self, JsonRpcErrorData, ParsedResponse, SingleRequest, StreamResponse, ValidatedRequest,
};
use crate::response_cache::ForwardedResponse;
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::blockchain::BlockHeader;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::provider::EthersHttpProvider;
use axum::extract::rejection::JsonRejection;
@ -144,7 +144,7 @@ pub enum Web3ProxyError {
#[error(ignore)]
#[from(ignore)]
#[display(fmt = "{} @ {}", _0, _1)]
OldHead(Arc<Web3Rpc>, Web3ProxyBlock),
OldHead(Arc<Web3Rpc>, BlockHeader),
OriginRequired,
#[error(ignore)]
#[from(ignore)]

@ -9,7 +9,7 @@ use crate::{
},
globals::APP,
response_cache::JsonRpcQueryCacheKey,
rpcs::{blockchain::Web3ProxyBlock, one::Web3Rpc},
rpcs::{blockchain::BlockHeader, one::Web3Rpc},
secrets::RpcSecretKey,
stats::AppStat,
};
@ -46,7 +46,7 @@ use {
pub struct RequestBuilder {
app: Option<Arc<App>>,
archive_request: bool,
head_block: Option<Web3ProxyBlock>,
head_block: Option<BlockHeader>,
authorization: Option<Arc<Authorization>>,
request_or_method: RequestOrMethod,
}
@ -244,7 +244,7 @@ pub struct ValidatedRequest {
/// TODO: this should probably be in a global config. although maybe if we run multiple chains in one process this will be useful
pub chain_id: u64,
pub head_block: Option<Web3ProxyBlock>,
pub head_block: Option<BlockHeader>,
/// TODO: this should be in a global config. not copied to every single request
pub usd_per_cu: Decimal,
@ -340,7 +340,7 @@ impl ValidatedRequest {
app: Option<&App>,
authorization: Arc<Authorization>,
chain_id: u64,
head_block: Option<Web3ProxyBlock>,
head_block: Option<BlockHeader>,
#[cfg(feature = "rdkafka")] kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
max_wait: Option<Duration>,
permit: Option<OwnedSemaphorePermit>,
@ -418,7 +418,7 @@ impl ValidatedRequest {
max_wait: Option<Duration>,
permit: Option<OwnedSemaphorePermit>,
request: RequestOrMethod,
head_block: Option<Web3ProxyBlock>,
head_block: Option<BlockHeader>,
request_id: Option<String>,
) -> Web3ProxyResult<Arc<Self>> {
#[cfg(feature = "rdkafka")]
@ -457,7 +457,7 @@ impl ValidatedRequest {
pub async fn new_internal<P: JsonRpcParams>(
method: Cow<'static, str>,
params: &P,
head_block: Option<Web3ProxyBlock>,
head_block: Option<BlockHeader>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<Arc<Self>> {
let authorization = Arc::new(Authorization::internal().unwrap());

@ -20,15 +20,16 @@ use tracing::{debug, error, warn};
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlocksByHashCache = Cache<H256, Web3ProxyBlock>;
pub type BlocksByHashCache = Cache<H256, BlockHeader>;
pub type BlocksByNumberCache = Cache<U64, H256>;
/// A block and its age with a less verbose serialized format
/// This does **not** implement Default. We rarely want a block with number 0 and hash 0.
/// TODO: make a newtype for this? it doesn't have all the same fields as a block
#[derive(Clone)]
pub struct Web3ProxyBlock(pub ArcBlock);
pub struct BlockHeader(pub ArcBlock);
impl Debug for Web3ProxyBlock {
impl Debug for BlockHeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Web3ProxyBlock")
.field("number", &self.number())
@ -37,7 +38,7 @@ impl Debug for Web3ProxyBlock {
}
}
impl Serialize for Web3ProxyBlock {
impl Serialize for BlockHeader {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
@ -60,7 +61,7 @@ impl Serialize for Web3ProxyBlock {
}
}
impl PartialEq for Web3ProxyBlock {
impl PartialEq for BlockHeader {
fn eq(&self, other: &Self) -> bool {
match (self.0.hash, other.0.hash) {
(None, None) => true,
@ -71,15 +72,15 @@ impl PartialEq for Web3ProxyBlock {
}
}
impl Eq for Web3ProxyBlock {}
impl Eq for BlockHeader {}
impl Hash for Web3ProxyBlock {
impl Hash for BlockHeader {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.hash.hash(state);
}
}
impl Web3ProxyBlock {
impl BlockHeader {
/// A new block has arrived over a subscription. skip it if its empty
pub fn try_new(block: ArcBlock) -> Option<Self> {
if block.number.is_none() || block.hash.is_none() {
@ -131,7 +132,7 @@ impl Web3ProxyBlock {
}
}
impl Display for Web3ProxyBlock {
impl Display for BlockHeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
@ -143,7 +144,7 @@ impl Display for Web3ProxyBlock {
}
}
impl TryFrom<ArcBlock> for Web3ProxyBlock {
impl TryFrom<ArcBlock> for BlockHeader {
type Error = Web3ProxyError;
fn try_from(block: ArcBlock) -> Result<Self, Self::Error> {
@ -153,11 +154,11 @@ impl TryFrom<ArcBlock> for Web3ProxyBlock {
impl Web3Rpcs {
/// add a block to our mappings and track the heaviest chain
pub async fn try_cache_block(
pub async fn try_cache_block_header(
&self,
block: Web3ProxyBlock,
block: BlockHeader,
consensus_head: bool,
) -> Web3ProxyResult<Web3ProxyBlock> {
) -> Web3ProxyResult<BlockHeader> {
let block_hash = *block.hash();
// TODO: i think we can rearrange this function to make it faster on the hot path

@ -1,4 +1,4 @@
use super::blockchain::Web3ProxyBlock;
use super::blockchain::BlockHeader;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::request::OpenRequestHandle;
@ -86,7 +86,7 @@ enum SortMethod {
/// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
#[derive(Clone, Debug, Serialize)]
pub struct RankedRpcs {
pub head_block: Option<Web3ProxyBlock>,
pub head_block: Option<BlockHeader>,
pub num_synced: usize,
pub backups_needed: bool,
pub check_block_data: bool,
@ -107,7 +107,7 @@ pub struct RpcsForRequest {
impl RankedRpcs {
pub fn from_rpcs(
rpcs: Vec<Arc<Web3Rpc>>,
head_block: Option<Web3ProxyBlock>,
head_block: Option<BlockHeader>,
check_block_data: bool,
) -> Self {
// we don't need to sort the rpcs now. we will sort them when a request neds them
@ -135,8 +135,8 @@ impl RankedRpcs {
min_synced_rpcs: usize,
min_sum_soft_limit: u32,
max_lag_block: U64,
votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)>,
heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
votes: HashMap<BlockHeader, (HashSet<&Arc<Web3Rpc>>, u32)>,
heads: HashMap<Arc<Web3Rpc>, BlockHeader>,
) -> Option<Self> {
// find the blocks that meets our min_sum_soft_limit and min_synced_rpcs
let mut votes: Vec<_> = votes
@ -319,7 +319,7 @@ impl RankedRpcs {
// TODO: move this to many.rs
impl Web3Rpcs {
#[inline]
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
pub fn head_block(&self) -> Option<BlockHeader> {
self.watch_head_block
.as_ref()
.and_then(|x| x.borrow().clone())
@ -357,7 +357,7 @@ type FirstSeenCache = Cache<H256, Instant>;
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
rpc_heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
rpc_heads: HashMap<Arc<Web3Rpc>, BlockHeader>,
/// no consensus if the best known block is too old
max_head_block_age: Option<Duration>,
/// no consensus if the best consensus block is too far behind the best known
@ -397,7 +397,7 @@ impl ConsensusFinder {
&mut self,
web3_rpcs: &Web3Rpcs,
rpc: Option<&Arc<Web3Rpc>>,
new_block: Option<Web3ProxyBlock>,
new_block: Option<BlockHeader>,
) -> Web3ProxyResult<bool> {
let rpc_block_sender = rpc.and_then(|x| x.head_block_sender.as_ref());
@ -477,7 +477,7 @@ impl ConsensusFinder {
let consensus_head_block = if let Some(consensus_head_block) = consensus_head_block
{
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.try_cache_block_header(consensus_head_block, true)
.await?;
Some(consensus_head_block)
@ -538,7 +538,7 @@ impl ConsensusFinder {
consensus_head_block
{
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.try_cache_block_header(consensus_head_block, true)
.await
.web3_context("save consensus_head_block as heaviest chain")?;
@ -578,7 +578,7 @@ impl ConsensusFinder {
let consensus_head_block =
if let Some(consensus_head_block) = consensus_head_block {
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.try_cache_block_header(consensus_head_block, true)
.await
.web3_context(
"save_block sending consensus_head_block as heaviest chain",
@ -617,7 +617,7 @@ impl ConsensusFinder {
if let Some(consensus_head_block) = consensus_head_block {
Some(
web3_rpcs
.try_cache_block(consensus_head_block, true)
.try_cache_block_header(consensus_head_block, true)
.await?,
)
} else {
@ -638,7 +638,7 @@ impl ConsensusFinder {
pub(super) async fn process_block_from_rpc(
&mut self,
web3_rpcs: &Web3Rpcs,
new_block: Option<Web3ProxyBlock>,
new_block: Option<BlockHeader>,
rpc: Arc<Web3Rpc>,
) -> Web3ProxyResult<bool> {
// TODO: how should we handle an error here?
@ -655,11 +655,11 @@ impl ConsensusFinder {
self.refresh(web3_rpcs, Some(&rpc), new_block).await
}
fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<BlockHeader> {
self.rpc_heads.remove(rpc)
}
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: BlockHeader) -> Option<BlockHeader> {
let first_seen = self
.first_seen
.get_with_by_ref(block.hash(), async { Instant::now() })
@ -678,7 +678,7 @@ impl ConsensusFinder {
/// Update our tracking of the rpc and return true if something changed
pub(crate) async fn update_rpc(
&mut self,
rpc_head_block: Option<Web3ProxyBlock>,
rpc_head_block: Option<BlockHeader>,
rpc: Arc<Web3Rpc>,
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around
web3_connections: &Web3Rpcs,
@ -688,7 +688,7 @@ impl ConsensusFinder {
Some(mut rpc_head_block) => {
// we don't know if its on the heaviest chain yet
rpc_head_block = web3_connections
.try_cache_block(rpc_head_block, false)
.try_cache_block_header(rpc_head_block, false)
.await
.web3_context("failed caching block")?;
@ -844,9 +844,9 @@ impl ConsensusFinder {
// TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
// TODO: struct for the value of the votes hashmap?
let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
let mut primary_votes: HashMap<BlockHeader, (HashSet<&Arc<Web3Rpc>>, u32)> =
HashMap::with_capacity(num_known);
let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
let mut backup_votes: HashMap<BlockHeader, (HashSet<&Arc<Web3Rpc>>, u32)> =
HashMap::with_capacity(num_known);
for (rpc, rpc_head) in self.rpc_heads.iter() {
@ -1049,7 +1049,7 @@ impl RpcsForRequest {
}
}
struct MaybeBlock<'a>(pub &'a Option<Web3ProxyBlock>);
struct MaybeBlock<'a>(pub &'a Option<BlockHeader>);
impl std::fmt::Display for MaybeBlock<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {

@ -1,5 +1,5 @@
//! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::blockchain::{BlockHeader, BlocksByHashCache, BlocksByNumberCache};
use super::consensus::{RankedRpcs, RpcsForRequest};
use super::one::Web3Rpc;
use crate::app::{App, Web3ProxyJoinHandle};
@ -35,7 +35,7 @@ pub struct Web3Rpcs {
pub(crate) name: Cow<'static, str>,
pub(crate) chain_id: u64,
/// if watch_head_block is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_and_rpc_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
pub(crate) block_and_rpc_sender: mpsc::UnboundedSender<(Option<BlockHeader>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
@ -46,7 +46,7 @@ pub struct Web3Rpcs {
pub(crate) watch_ranked_rpcs: watch::Sender<Option<Arc<RankedRpcs>>>,
/// this head receiver makes it easy to wait until there is a new block
/// this is None if none of the child Rpcs are subscribed to newHeads
pub(super) watch_head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) watch_head_block: Option<watch::Sender<Option<BlockHeader>>>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including uncles
/// TODO: i think uncles should be excluded
@ -102,7 +102,7 @@ impl Web3Rpcs {
min_head_rpcs: usize,
min_sum_soft_limit: u32,
name: Cow<'static, str>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
watch_consensus_head_sender: Option<watch::Sender<Option<BlockHeader>>>,
pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
) -> anyhow::Result<(
Arc<Self>,
@ -688,7 +688,7 @@ mod tests {
use super::*;
use crate::block_number::{BlockNumAndHash, CacheMode};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::blockchain::BlockHeader;
use crate::rpcs::consensus::ConsensusFinder;
use arc_swap::ArcSwap;
use ethers::types::H256;
@ -726,7 +726,7 @@ mod tests {
let blocks: Vec<_> = [block_0, block_1, block_2]
.into_iter()
.map(|x| Web3ProxyBlock::try_new(Arc::new(x)).unwrap())
.map(|x| BlockHeader::try_new(Arc::new(x)).unwrap())
.collect();
let (tx_a, _) = watch::channel(None);

@ -1,5 +1,5 @@
//! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::blockchain::{ArcBlock, BlockHeader, BlocksByHashCache};
use super::provider::{connect_ws, EthersWsProvider};
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::Web3ProxyJoinHandle;
@ -78,7 +78,7 @@ pub struct Web3Rpc {
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
/// head_block is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) head_block_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) head_block_sender: Option<watch::Sender<Option<BlockHeader>>>,
/// Track head block latency.
/// TODO: This is in a sync lock, but writes are infrequent and quick. Is this actually okay? Set from a spawned task and read an atomic instead?
pub(super) head_delay: RwLock<EwmaLatency>,
@ -575,7 +575,7 @@ impl Web3Rpc {
let new_head_block = match new_head_block {
Ok(x) => {
let x = x.and_then(Web3ProxyBlock::try_new);
let x = x.and_then(BlockHeader::try_new);
match x {
None => {
@ -1515,7 +1515,7 @@ mod tests {
let random_block = Arc::new(random_block);
let head_block = Web3ProxyBlock::try_new(random_block).unwrap();
let head_block = BlockHeader::try_new(random_block).unwrap();
let block_data_limit = u64::MAX;
let (tx, _) = watch::channel(Some(head_block.clone()));
@ -1541,7 +1541,7 @@ mod tests {
fn test_pruned_node_has_block_data() {
let now = chrono::Utc::now().timestamp().into();
let head_block: Web3ProxyBlock = Arc::new(Block {
let head_block: BlockHeader = Arc::new(Block {
hash: Some(H256::random()),
number: Some(1_000_000.into()),
timestamp: now,

@ -570,8 +570,9 @@ impl RpcQueryStats {
// force error_response to true
// this can happen when a try operator escapes and metadata.add_response() isn't called
warn!(
"no response known, but no errors logged. investigate. {:?}",
metadata
?response_lock,
?metadata,
"no response known, but no errors logged. investigate",
);
error_response = true;
}