small cleanup
This commit is contained in:
parent
6e12edd555
commit
a7cb3d00df
@ -39,7 +39,7 @@ use tokio::sync::{broadcast, watch};
|
|||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tokio::time::{timeout, Instant};
|
use tokio::time::{timeout, Instant};
|
||||||
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
|
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
|
||||||
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
|
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
// TODO: make this customizable?
|
// TODO: make this customizable?
|
||||||
@ -590,7 +590,7 @@ impl Web3ProxyApp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn cached_response(
|
async fn cached_response_or_key(
|
||||||
&self,
|
&self,
|
||||||
// TODO: accept a block hash here also?
|
// TODO: accept a block hash here also?
|
||||||
min_block_needed: Option<&U64>,
|
min_block_needed: Option<&U64>,
|
||||||
@ -829,7 +829,10 @@ impl Web3ProxyApp {
|
|||||||
trace!(?min_block_needed, ?method);
|
trace!(?min_block_needed, ?method);
|
||||||
|
|
||||||
// TODO: emit a stat on error. maybe with .map_err?
|
// TODO: emit a stat on error. maybe with .map_err?
|
||||||
let cache_key = match self.cached_response(min_block_needed, &request).await? {
|
let cache_key = match self
|
||||||
|
.cached_response_or_key(min_block_needed, &request)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
Ok(mut cache_result) => {
|
Ok(mut cache_result) => {
|
||||||
// we got a cache hit! no need to do any backend requests.
|
// we got a cache hit! no need to do any backend requests.
|
||||||
|
|
||||||
|
@ -36,11 +36,7 @@ impl Display for BlockId {
|
|||||||
|
|
||||||
impl Web3Connections {
|
impl Web3Connections {
|
||||||
/// add a block to our map and it's hash to our graphmap of the blockchain
|
/// add a block to our map and it's hash to our graphmap of the blockchain
|
||||||
pub async fn save_block(
|
pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
|
||||||
&self,
|
|
||||||
block: &ArcBlock,
|
|
||||||
heaviest_chain: Option<bool>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
// TODO: i think we can rearrange this function to make it faster on the hot path
|
// TODO: i think we can rearrange this function to make it faster on the hot path
|
||||||
let block_hash = block.hash.as_ref().context("no block hash")?;
|
let block_hash = block.hash.as_ref().context("no block hash")?;
|
||||||
let block_num = block.number.as_ref().context("no block num")?;
|
let block_num = block.number.as_ref().context("no block num")?;
|
||||||
@ -57,30 +53,34 @@ impl Web3Connections {
|
|||||||
let mut blockchain = self.blockchain_graphmap.write().await;
|
let mut blockchain = self.blockchain_graphmap.write().await;
|
||||||
|
|
||||||
// TODO: think more about heaviest_chain
|
// TODO: think more about heaviest_chain
|
||||||
if heaviest_chain.unwrap_or(true) {
|
if heaviest_chain {
|
||||||
// this is the only place that writes to block_numbers, and its inside a write lock on blockchain_graphmap, so i think there is no race
|
// this is the only place that writes to block_numbers
|
||||||
|
// its inside a write lock on blockchain_graphmap, so i think there is no race
|
||||||
if let Some(old_hash) = self.block_numbers.get(block_num) {
|
if let Some(old_hash) = self.block_numbers.get(block_num) {
|
||||||
if block_hash == &old_hash {
|
if block_hash == &old_hash {
|
||||||
// this block has already been saved
|
// this block has already been saved
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// i think a race here isn't that big of a problem. just 2 inserts
|
// i think a race here isn't that big of a problem. just 2 inserts
|
||||||
self.block_numbers.insert(*block_num, *block_hash).await;
|
self.block_numbers.insert(*block_num, *block_hash).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if blockchain.contains_node(*block_hash) {
|
if blockchain.contains_node(*block_hash) {
|
||||||
// // this hash is already included. we must have hit that race condition
|
// this hash is already included
|
||||||
// // return now since this work was already done.
|
// return now since this work was already done.
|
||||||
// return Ok(());
|
return Ok(());
|
||||||
// }
|
}
|
||||||
|
|
||||||
// TODO: theres a small race between contains_key and insert
|
|
||||||
self.block_hashes.insert(*block_hash, block.clone()).await;
|
|
||||||
|
|
||||||
// TODO: prettier log? or probably move the log somewhere else
|
// TODO: prettier log? or probably move the log somewhere else
|
||||||
trace!(%block_hash, "new block");
|
trace!(%block_hash, "new block");
|
||||||
|
|
||||||
|
// TODO: theres a small race between contains_key and insert
|
||||||
|
self.block_hashes
|
||||||
|
.insert(*block_hash, block.to_owned())
|
||||||
|
.await;
|
||||||
|
|
||||||
blockchain.add_node(*block_hash);
|
blockchain.add_node(*block_hash);
|
||||||
|
|
||||||
// what should edge weight be? and should the nodes be the blocks instead?
|
// what should edge weight be? and should the nodes be the blocks instead?
|
||||||
@ -134,7 +134,7 @@ impl Web3Connections {
|
|||||||
let block = Arc::new(block);
|
let block = Arc::new(block);
|
||||||
|
|
||||||
// the block was fetched using eth_getBlockByHash, so it should have all fields
|
// the block was fetched using eth_getBlockByHash, so it should have all fields
|
||||||
self.save_block(&block, None).await?;
|
self.save_block(&block, true).await?;
|
||||||
|
|
||||||
Ok(block)
|
Ok(block)
|
||||||
}
|
}
|
||||||
@ -192,7 +192,7 @@ impl Web3Connections {
|
|||||||
let block = Arc::new(block);
|
let block = Arc::new(block);
|
||||||
|
|
||||||
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
|
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
|
||||||
self.save_block(&block, Some(true)).await?;
|
self.save_block(&block, true).await?;
|
||||||
|
|
||||||
Ok(block)
|
Ok(block)
|
||||||
}
|
}
|
||||||
@ -247,10 +247,10 @@ impl Web3Connections {
|
|||||||
|
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
|
|
||||||
|
|
||||||
// we don't know if its on the heaviest chain yet
|
// we don't know if its on the heaviest chain yet
|
||||||
self.save_block(&rpc_head_block, Some(false)).await?;
|
self.save_block(&rpc_head_block, false).await?;
|
||||||
|
|
||||||
|
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
|
||||||
|
|
||||||
Some(BlockId {
|
Some(BlockId {
|
||||||
hash: rpc_head_hash,
|
hash: rpc_head_hash,
|
||||||
@ -401,7 +401,7 @@ impl Web3Connections {
|
|||||||
None => {
|
None => {
|
||||||
debug!(block=%heavy_block_id, %rpc, "first consensus head");
|
debug!(block=%heavy_block_id, %rpc, "first consensus head");
|
||||||
|
|
||||||
self.save_block(&rpc_head_block, Some(true)).await?;
|
self.save_block(&rpc_head_block, true).await?;
|
||||||
|
|
||||||
head_block_sender.send(heavy_block)?;
|
head_block_sender.send(heavy_block)?;
|
||||||
}
|
}
|
||||||
@ -417,7 +417,7 @@ impl Web3Connections {
|
|||||||
info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block");
|
info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block");
|
||||||
|
|
||||||
// todo!("handle equal by updating the cannonical chain");
|
// todo!("handle equal by updating the cannonical chain");
|
||||||
self.save_block(&rpc_head_block, Some(true)).await?;
|
self.save_block(&rpc_head_block, true).await?;
|
||||||
|
|
||||||
head_block_sender.send(heavy_block)?;
|
head_block_sender.send(heavy_block)?;
|
||||||
}
|
}
|
||||||
@ -427,7 +427,7 @@ impl Web3Connections {
|
|||||||
// TODO: better log
|
// TODO: better log
|
||||||
warn!(head=%heavy_block_id, %rpc, "chain rolled back");
|
warn!(head=%heavy_block_id, %rpc, "chain rolled back");
|
||||||
|
|
||||||
self.save_block(&rpc_head_block, Some(true)).await?;
|
self.save_block(&rpc_head_block, true).await?;
|
||||||
|
|
||||||
// todo!("handle less by removing higher blocks from the cannonical chain");
|
// todo!("handle less by removing higher blocks from the cannonical chain");
|
||||||
head_block_sender.send(heavy_block)?;
|
head_block_sender.send(heavy_block)?;
|
||||||
@ -437,7 +437,7 @@ impl Web3Connections {
|
|||||||
|
|
||||||
// todo!("handle greater by adding this block to and any missing parents to the cannonical chain");
|
// todo!("handle greater by adding this block to and any missing parents to the cannonical chain");
|
||||||
|
|
||||||
self.save_block(&rpc_head_block, Some(true)).await?;
|
self.save_block(&rpc_head_block, true).await?;
|
||||||
|
|
||||||
head_block_sender.send(heavy_block)?;
|
head_block_sender.send(heavy_block)?;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user