send_head_block_result more places

This commit is contained in:
Bryan Stitt 2022-11-06 20:52:11 +00:00
parent 2e7e005ec5
commit 058dfa6d8e
3 changed files with 65 additions and 22 deletions

@ -318,12 +318,12 @@ impl Web3ProxyApp {
// keep 1GB of blocks in the cache
// TODO: limits from config
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher better? this is going to be slow!
// TODO: how can we do the weigher better?
let block_map = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|_k, v: &Arc<Block<TxHash>>| {
.weigher(|_k, v: &ArcBlock| {
// TODO: is this good enough?
v.transactions.len().try_into().unwrap_or(u32::MAX)
1 + v.transactions.len().try_into().unwrap_or(u32::MAX)
})
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());

@ -312,13 +312,15 @@ mod tests {
// TODO: do something to the node. query latest block, mine another block, query again
let proxy_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
let anvil_result: Block<TxHash> = anvil_provider
.request("eth_getBlockByNumber", ("latest", true))
let anvil_result = anvil_provider
.request::<_, Option<Block<TxHash>>>("eth_getBlockByNumber", ("latest", true))
.await
.unwrap()
.unwrap();
let proxy_result: Block<TxHash> = proxy_provider
.request("eth_getBlockByNumber", ("latest", true))
let proxy_result = proxy_provider
.request::<_, Option<Block<TxHash>>>("eth_getBlockByNumber", ("latest", true))
.await
.unwrap()
.unwrap();
assert_eq!(anvil_result, proxy_result);
@ -330,13 +332,15 @@ mod tests {
.await
.unwrap();
let anvil_result: Block<TxHash> = anvil_provider
.request("eth_getBlockByNumber", ("latest", true))
let anvil_result = anvil_provider
.request::<_, Option<Block<TxHash>>>("eth_getBlockByNumber", ("latest", true))
.await
.unwrap()
.unwrap();
let proxy_result: Block<TxHash> = proxy_provider
.request("eth_getBlockByNumber", ("latest", true))
let proxy_result = proxy_provider
.request::<_, Option<Block<TxHash>>>("eth_getBlockByNumber", ("latest", true))
.await
.unwrap()
.unwrap();
assert_eq!(anvil_result, proxy_result);

@ -381,14 +381,25 @@ impl Web3Connection {
match new_head_block {
Ok(None) => {
// TODO: i think this should clear the local block and then update over the block sender
todo!("handle no block")
warn!("unsynced server {}", self);
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}
block_sender
.send_async((None, self.clone()))
.await
.context("clearing block_sender")?;
}
Ok(Some(mut new_head_block)) => {
Ok(Some(new_head_block)) => {
// TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block.hash.unwrap_or_default();
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
new_head_block = block_map
let new_head_block = block_map
.get_with(new_hash, async move { new_head_block })
.await;
@ -418,11 +429,17 @@ impl Web3Connection {
.await
.context("block_sender")?;
}
Err(e) => {
warn!("unable to get block from {}: {}", self, e);
// TODO: do something to rpc_chain?
Err(err) => {
warn!(?err, "unable to get block from {}", self);
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}
// send an empty block to take this server out of rotation
// TODO: this is NOT working!!!!
block_sender
.send_async((None, self.clone()))
.await
@ -547,7 +564,7 @@ impl Web3Connection {
.await
{
Ok(active_request_handle) => {
let block: Result<Block<TxHash>, _> = active_request_handle
let block: Result<Option<ArcBlock>, _> = active_request_handle
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
@ -556,7 +573,17 @@ impl Web3Connection {
.await;
match block {
Ok(block) => {
Ok(None) => {
warn!("no head block on {}", self);
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await?;
}
Ok(Some(block)) => {
// don't send repeat blocks
let new_hash = block
.hash
@ -567,7 +594,7 @@ impl Web3Connection {
last_hash = new_hash;
self.send_head_block_result(
Ok(Some(Arc::new(block))),
Ok(Some(block)),
&block_sender,
block_map.clone(),
)
@ -587,6 +614,14 @@ impl Web3Connection {
}
Err(err) => {
warn!(?err, "Internal error on latest block from {}", self);
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await?;
// TODO: what should we do? sleep? extra time?
}
}
@ -621,6 +656,7 @@ impl Web3Connection {
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(None, Duration::from_secs(30))
.await?
@ -629,8 +665,7 @@ impl Web3Connection {
&json!(("latest", false)),
tracing::Level::ERROR.into(),
)
.await
.map(|x| Some(Arc::new(x)));
.await;
let mut last_hash = match &block {
Ok(Some(new_block)) => new_block
@ -663,6 +698,10 @@ impl Web3Connection {
.await?;
}
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
.await?;
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!(rpc=%self, "new_heads subscription ended");