refactor send_head_block_result

This commit is contained in:
Bryan Stitt 2023-06-13 10:00:08 -07:00
parent 1473ccb085
commit f5a1ac274a
4 changed files with 36 additions and 35 deletions

@ -388,6 +388,7 @@ impl Web3ProxyApp {
top_config top_config
.app .app
.influxdb_bucket .influxdb_bucket
.as_ref()
.expect("influxdb_bucket needed when influxdb_host is set"); .expect("influxdb_bucket needed when influxdb_host is set");
let influxdb_client = let influxdb_client =

@ -60,6 +60,7 @@ impl MigrateStatsToV2 {
top_config top_config
.app .app
.influxdb_bucket .influxdb_bucket
.as_ref()
.expect("influxdb_token needed when influxdb_host is set"); .expect("influxdb_token needed when influxdb_host is set");
let influxdb_client = let influxdb_client =

@ -222,15 +222,9 @@ impl Web3Rpcs {
self.by_name.store(Arc::new(new_by_name)); self.by_name.store(Arc::new(new_by_name));
if let Some(old_rpc) = old_rpc { if let Some(old_rpc) = old_rpc {
if old_rpc if old_rpc.head_block.as_ref().unwrap().borrow().is_some() {
.head_block_sender
.as_ref()
.unwrap()
.borrow()
.is_some()
{
let mut new_head_receiver = let mut new_head_receiver =
rpc.head_block_sender.as_ref().unwrap().subscribe(); rpc.head_block.as_ref().unwrap().subscribe();
debug!("waiting for new {} to sync", rpc); debug!("waiting for new {} to sync", rpc);
// TODO: maximum wait time or this could block things for too long // TODO: maximum wait time or this could block things for too long
@ -1373,42 +1367,42 @@ mod tests {
Web3Rpc { Web3Rpc {
name: "a".to_string(), name: "a".to_string(),
tier: 0.into(), tier: 0.into(),
head_block_sender: Some(tx_a), head_block: Some(tx_a),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}, },
Web3Rpc { Web3Rpc {
name: "b".to_string(), name: "b".to_string(),
tier: 0.into(), tier: 0.into(),
head_block_sender: Some(tx_b), head_block: Some(tx_b),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}, },
Web3Rpc { Web3Rpc {
name: "c".to_string(), name: "c".to_string(),
tier: 0.into(), tier: 0.into(),
head_block_sender: Some(tx_c), head_block: Some(tx_c),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}, },
Web3Rpc { Web3Rpc {
name: "d".to_string(), name: "d".to_string(),
tier: 1.into(), tier: 1.into(),
head_block_sender: Some(tx_d), head_block: Some(tx_d),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}, },
Web3Rpc { Web3Rpc {
name: "e".to_string(), name: "e".to_string(),
tier: 1.into(), tier: 1.into(),
head_block_sender: Some(tx_e), head_block: Some(tx_e),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}, },
Web3Rpc { Web3Rpc {
name: "f".to_string(), name: "f".to_string(),
tier: 1.into(), tier: 1.into(),
head_block_sender: Some(tx_f), head_block: Some(tx_f),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}, },
@ -1464,7 +1458,7 @@ mod tests {
backup: false, backup: false,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
// tier: 0, // tier: 0,
head_block_sender: Some(tx_synced), head_block: Some(tx_synced),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}; };
@ -1478,7 +1472,7 @@ mod tests {
backup: false, backup: false,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
// tier: 0, // tier: 0,
head_block_sender: Some(tx_lagged), head_block: Some(tx_lagged),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}; };
@ -1748,7 +1742,7 @@ mod tests {
backup: false, backup: false,
block_data_limit: 64.into(), block_data_limit: 64.into(),
// tier: 1, // tier: 1,
head_block_sender: Some(tx_pruned), head_block: Some(tx_pruned),
..Default::default() ..Default::default()
}; };
@ -1761,7 +1755,7 @@ mod tests {
backup: false, backup: false,
block_data_limit: u64::MAX.into(), block_data_limit: u64::MAX.into(),
// tier: 2, // tier: 2,
head_block_sender: Some(tx_archive), head_block: Some(tx_archive),
..Default::default() ..Default::default()
}; };
@ -1930,7 +1924,7 @@ mod tests {
backup: false, backup: false,
block_data_limit: 64.into(), block_data_limit: 64.into(),
// tier: 0, // tier: 0,
head_block_sender: Some(tx_mock_geth), head_block: Some(tx_mock_geth),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}; };
@ -1942,7 +1936,7 @@ mod tests {
backup: false, backup: false,
block_data_limit: u64::MAX.into(), block_data_limit: u64::MAX.into(),
// tier: 1, // tier: 1,
head_block_sender: Some(tx_mock_erigon_archive), head_block: Some(tx_mock_erigon_archive),
peak_latency: Some(new_peak_latency()), peak_latency: Some(new_peak_latency()),
..Default::default() ..Default::default()
}; };

@ -62,7 +62,7 @@ pub struct Web3Rpc {
/// TODO: have an enum for this so that "no limit" prints pretty? /// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64, pub(super) block_data_limit: AtomicU64,
/// head_block is only inside an Option so that the "Default" derive works. it will always be set. /// head_block is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) head_block_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>, pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// Track head block latency /// Track head block latency
pub(super) head_latency: RwLock<EwmaLatency>, pub(super) head_latency: RwLock<EwmaLatency>,
/// Track peak request latency /// Track peak request latency
@ -197,7 +197,7 @@ impl Web3Rpc {
display_name: config.display_name, display_name: config.display_name,
hard_limit, hard_limit,
hard_limit_until: Some(hard_limit_until), hard_limit_until: Some(hard_limit_until),
head_block_sender: Some(head_block), head_block: Some(head_block),
http_provider, http_provider,
name, name,
peak_latency: Some(peak_latency), peak_latency: Some(peak_latency),
@ -234,7 +234,7 @@ impl Web3Rpc {
/// TODO: should this return a struct that implements sorting traits? /// TODO: should this return a struct that implements sorting traits?
fn sort_on(&self, max_block: Option<U64>) -> (bool, u32, Reverse<U64>) { fn sort_on(&self, max_block: Option<U64>) -> (bool, u32, Reverse<U64>) {
let mut head_block = self let mut head_block = self
.head_block_sender .head_block
.as_ref() .as_ref()
.and_then(|x| x.borrow().as_ref().map(|x| *x.number())) .and_then(|x| x.borrow().as_ref().map(|x| *x.number()))
.unwrap_or_default(); .unwrap_or_default();
@ -380,7 +380,7 @@ impl Web3Rpc {
/// TODO: get rid of this now that consensus rpcs does it /// TODO: get rid of this now that consensus rpcs does it
pub fn has_block_data(&self, needed_block_num: &U64) -> bool { pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
let head_block_num = match self.head_block_sender.as_ref().unwrap().borrow().as_ref() { let head_block_num = match self.head_block.as_ref().unwrap().borrow().as_ref() {
None => return false, None => return false,
Some(x) => *x.number(), Some(x) => *x.number(),
}; };
@ -449,10 +449,10 @@ impl Web3Rpc {
pub(crate) async fn send_head_block_result( pub(crate) async fn send_head_block_result(
self: &Arc<Self>, self: &Arc<Self>,
new_head_block: Web3ProxyResult<Option<ArcBlock>>, new_head_block: Web3ProxyResult<Option<ArcBlock>>,
block_sender: &flume::Sender<BlockAndRpc>, block_and_rpc_sender: &flume::Sender<BlockAndRpc>,
block_map: &BlocksByHashCache, block_map: &BlocksByHashCache,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
let head_block_sender = self.head_block_sender.as_ref().unwrap(); let head_block_sender = self.head_block.as_ref().unwrap();
let new_head_block = match new_head_block { let new_head_block = match new_head_block {
Ok(x) => { Ok(x) => {
@ -469,8 +469,11 @@ impl Web3Rpc {
debug!("clearing head block on {} ({}ms old)!", self, age); debug!("clearing head block on {} ({}ms old)!", self, age);
// send an empty block to take this server out of rotation
head_block_sender.send_replace(None); head_block_sender.send_replace(None);
// TODO: clear self.block_data_limit?
None None
} }
Some(new_head_block) => { Some(new_head_block) => {
@ -481,8 +484,7 @@ impl Web3Rpc {
.get_with_by_ref(&new_hash, async move { new_head_block }) .get_with_by_ref(&new_hash, async move { new_head_block })
.await; .await;
// save the block so we don't send the same one multiple times // we are synced! yey!
// also save so that archive checks can know how far back to query
head_block_sender.send_replace(Some(new_head_block.clone())); head_block_sender.send_replace(Some(new_head_block.clone()));
if self.block_data_limit() == U64::zero() { if self.block_data_limit() == U64::zero() {
@ -501,17 +503,20 @@ impl Web3Rpc {
Err(err) => { Err(err) => {
warn!("unable to get block from {}. err={:?}", self, err); warn!("unable to get block from {}. err={:?}", self, err);
// send an empty block to take this server out of rotation
head_block_sender.send_replace(None); head_block_sender.send_replace(None);
// TODO: clear self.block_data_limit?
None None
} }
}; };
// send an empty block to take this server out of rotation // tell web3rpcs about this rpc having this block
block_sender block_and_rpc_sender
.send_async((new_head_block, self.clone())) .send_async((new_head_block, self.clone()))
.await .await
.context("block_sender")?; .context("block_and_rpc_sender failed sending")?;
Ok(()) Ok(())
} }
@ -524,7 +529,7 @@ impl Web3Rpc {
self: &Arc<Self>, self: &Arc<Self>,
error_handler: Option<RequestErrorHandler>, error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone(); let head_block = self.head_block.as_ref().unwrap().borrow().clone();
if let Some(head_block) = head_block { if let Some(head_block) = head_block {
let head_block = head_block.block; let head_block = head_block.block;
@ -1110,7 +1115,7 @@ impl Serialize for Web3Rpc {
// TODO: maybe this is too much data. serialize less? // TODO: maybe this is too much data. serialize less?
{ {
let head_block = self.head_block_sender.as_ref().unwrap(); let head_block = self.head_block.as_ref().unwrap();
let head_block = head_block.borrow(); let head_block = head_block.borrow();
let head_block = head_block.as_ref(); let head_block = head_block.as_ref();
state.serialize_field("head_block", &head_block)?; state.serialize_field("head_block", &head_block)?;
@ -1192,7 +1197,7 @@ mod tests {
automatic_block_limit: false, automatic_block_limit: false,
backup: false, backup: false,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
head_block_sender: Some(tx), head_block: Some(tx),
..Default::default() ..Default::default()
}; };
@ -1226,7 +1231,7 @@ mod tests {
automatic_block_limit: false, automatic_block_limit: false,
backup: false, backup: false,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
head_block_sender: Some(tx), head_block: Some(tx),
..Default::default() ..Default::default()
}; };