From 058dfa6d8eeacece5ec8dec05e51f3383468fca8 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 6 Nov 2022 20:52:11 +0000 Subject: [PATCH] send_head_block_result more places --- web3_proxy/src/app.rs | 6 +-- web3_proxy/src/bin/web3_proxy.rs | 20 ++++++---- web3_proxy/src/rpcs/connection.rs | 61 +++++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 22 deletions(-) diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index fa71cd76..47e9ba2a 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -318,12 +318,12 @@ impl Web3ProxyApp { // keep 1GB of blocks in the cache // TODO: limits from config // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes - // TODO: how can we do the weigher better? this is going to be slow! + // TODO: how can we do the weigher better? let block_map = Cache::builder() .max_capacity(1024 * 1024 * 1024) - .weigher(|_k, v: &Arc>| { + .weigher(|_k, v: &ArcBlock| { // TODO: is this good enough? - v.transactions.len().try_into().unwrap_or(u32::MAX) + 1 + v.transactions.len().try_into().unwrap_or(u32::MAX) }) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 614c89ad..60e96646 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -312,13 +312,15 @@ mod tests { // TODO: do something to the node. query latest block, mine another block, query again let proxy_provider = Provider::::try_from(anvil.endpoint()).unwrap(); - let anvil_result: Block = anvil_provider - .request("eth_getBlockByNumber", ("latest", true)) + let anvil_result = anvil_provider + .request::<_, Option>>("eth_getBlockByNumber", ("latest", true)) .await + .unwrap() .unwrap(); - let proxy_result: Block = proxy_provider - .request("eth_getBlockByNumber", ("latest", true)) + let proxy_result = proxy_provider + .request::<_, Option>>("eth_getBlockByNumber", ("latest", true)) .await + .unwrap() .unwrap(); assert_eq!(anvil_result, proxy_result); @@ -330,13 +332,15 @@ mod tests { .await .unwrap(); - let anvil_result: Block = anvil_provider - .request("eth_getBlockByNumber", ("latest", true)) + let anvil_result = anvil_provider + .request::<_, Option>>("eth_getBlockByNumber", ("latest", true)) .await + .unwrap() .unwrap(); - let proxy_result: Block = proxy_provider - .request("eth_getBlockByNumber", ("latest", true)) + let proxy_result = proxy_provider + .request::<_, Option>>("eth_getBlockByNumber", ("latest", true)) .await + .unwrap() .unwrap(); assert_eq!(anvil_result, proxy_result); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 3d806a6d..9eee22d3 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -381,14 +381,25 @@ impl Web3Connection { match new_head_block { Ok(None) => { // TODO: i think this should clear the local block and then update over the block sender - todo!("handle no block") + warn!("unsynced server {}", self); + + { + let mut head_block_id = self.head_block_id.write(); + + *head_block_id = None; + } + + block_sender + .send_async((None, self.clone())) + .await + .context("clearing block_sender")?; } - Ok(Some(mut new_head_block)) => { + Ok(Some(new_head_block)) => { // TODO: is unwrap_or_default ok? we might have an empty block let new_hash = new_head_block.hash.unwrap_or_default(); // if we already have this block saved, set new_head_block to that arc. otherwise store this copy - new_head_block = block_map + let new_head_block = block_map .get_with(new_hash, async move { new_head_block }) .await; @@ -418,11 +429,17 @@ impl Web3Connection { .await .context("block_sender")?; } - Err(e) => { - warn!("unable to get block from {}: {}", self, e); - // TODO: do something to rpc_chain? + Err(err) => { + warn!(?err, "unable to get block from {}", self); + + { + let mut head_block_id = self.head_block_id.write(); + + *head_block_id = None; + } // send an empty block to take this server out of rotation + // TODO: this is NOT working!!!! block_sender .send_async((None, self.clone())) .await @@ -547,7 +564,7 @@ impl Web3Connection { .await { Ok(active_request_handle) => { - let block: Result, _> = active_request_handle + let block: Result, _> = active_request_handle .request( "eth_getBlockByNumber", &json!(("latest", false)), @@ -556,7 +573,17 @@ impl Web3Connection { .await; match block { - Ok(block) => { + Ok(None) => { + warn!("no head block on {}", self); + + self.send_head_block_result( + Ok(None), + &block_sender, + block_map.clone(), + ) + .await?; + } + Ok(Some(block)) => { // don't send repeat blocks let new_hash = block .hash @@ -567,7 +594,7 @@ impl Web3Connection { last_hash = new_hash; self.send_head_block_result( - Ok(Some(Arc::new(block))), + Ok(Some(block)), &block_sender, block_map.clone(), ) @@ -587,6 +614,14 @@ impl Web3Connection { } Err(err) => { warn!(?err, "Internal error on latest block from {}", self); + + self.send_head_block_result( + Ok(None), + &block_sender, + block_map.clone(), + ) + .await?; + // TODO: what should we do? sleep? extra time? } } @@ -621,6 +656,7 @@ impl Web3Connection { // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block + // TODO: how does this get wrapped in an arc? does ethers handle that? let block: Result, _> = self .wait_for_request_handle(None, Duration::from_secs(30)) .await? @@ -629,8 +665,7 @@ impl Web3Connection { &json!(("latest", false)), tracing::Level::ERROR.into(), ) - .await - .map(|x| Some(Arc::new(x))); + .await; let mut last_hash = match &block { Ok(Some(new_block)) => new_block @@ -663,6 +698,10 @@ impl Web3Connection { .await?; } + // clear the head block. this might not be needed, but it won't hurt + self.send_head_block_result(Ok(None), &block_sender, block_map) + .await?; + // TODO: is this always an error? // TODO: we probably don't want a warn and to return error warn!(rpc=%self, "new_heads subscription ended");