almost done with heaviest chain tracking

This commit is contained in:
Bryan Stitt 2022-09-02 05:40:56 +00:00
parent 6450a4cd42
commit 2606844c61
8 changed files with 235 additions and 179 deletions

4
Cargo.lock generated

@ -4403,9 +4403,9 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]] [[package]]
name = "siwe" name = "siwe"
version = "0.4.1" version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a35b706108fb328661325c3b6882f6b1d62da47d533da0b87fca36ac769877db" checksum = "f24fe2b646c33a670e7d79a232bffb41821fed28b1870a8bd1a47e6ae686ace6"
dependencies = [ dependencies = [
"hex", "hex",
"http", "http",

@ -48,7 +48,7 @@ regex = "1.6.0"
reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] }
handlebars = "4.3.3" handlebars = "4.3.3"
rustc-hash = "1.1.0" rustc-hash = "1.1.0"
siwe = "0.4.1" siwe = "0.4.2"
sea-orm = { version = "0.9.2", features = ["macros"] } sea-orm = { version = "0.9.2", features = ["macros"] }
serde = { version = "1.0.144", features = [] } serde = { version = "1.0.144", features = [] }
serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] } serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] }

@ -146,18 +146,6 @@ pub async fn get_migrated_db(
} }
impl Web3ProxyApp { impl Web3ProxyApp {
pub async fn redis_conn(&self) -> anyhow::Result<PooledConnection<RedisConnectionManager>> {
match self.redis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),
Some(redis_pool) => {
let redis_conn = redis_pool.get().await?;
Ok(redis_conn)
}
}
}
// TODO: should we just take the rpc config as the only arg instead?
pub async fn spawn( pub async fn spawn(
app_stats: AppStats, app_stats: AppStats,
top_config: TopConfig, top_config: TopConfig,
@ -553,7 +541,6 @@ impl Web3ProxyApp {
Ok(response) Ok(response)
} }
// #[instrument(skip_all)]
async fn proxy_web3_rpc_requests( async fn proxy_web3_rpc_requests(
&self, &self,
requests: Vec<JsonRpcRequest>, requests: Vec<JsonRpcRequest>,
@ -580,6 +567,17 @@ impl Web3ProxyApp {
Ok(collected) Ok(collected)
} }
pub async fn redis_conn(&self) -> anyhow::Result<PooledConnection<RedisConnectionManager>> {
match self.redis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),
Some(redis_pool) => {
let redis_conn = redis_pool.get().await?;
Ok(redis_conn)
}
}
}
async fn cached_response( async fn cached_response(
&self, &self,
// TODO: accept a block hash here also? // TODO: accept a block hash here also?
@ -620,13 +618,11 @@ impl Web3ProxyApp {
trace!(?request.method, "cache miss!"); trace!(?request.method, "cache miss!");
} }
// TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?)
let cache = &self.response_cache; let cache = &self.response_cache;
Ok((key, Err(cache))) Ok((key, Err(cache)))
} }
// #[instrument(skip_all)]
async fn proxy_web3_rpc_request( async fn proxy_web3_rpc_request(
&self, &self,
mut request: JsonRpcRequest, mut request: JsonRpcRequest,

@ -65,8 +65,6 @@ fn run(
// start tokio's async runtime // start tokio's async runtime
let rt = rt_builder.build()?; let rt = rt_builder.build()?;
// we use this worker count to also set our redis connection pool size
// TODO: think about this more
let num_workers = rt.metrics().num_workers(); let num_workers = rt.metrics().num_workers();
debug!(?num_workers); debug!(?num_workers);

@ -51,7 +51,7 @@ pub struct AppConfig {
#[serde(default = "default_min_sum_soft_limit")] #[serde(default = "default_min_sum_soft_limit")]
pub min_sum_soft_limit: u32, pub min_sum_soft_limit: u32,
#[serde(default = "default_min_synced_rpcs")] #[serde(default = "default_min_synced_rpcs")]
pub min_synced_rpcs: u32, pub min_synced_rpcs: usize,
pub redis_url: Option<String>, pub redis_url: Option<String>,
#[serde(default = "default_public_rate_limit_per_minute")] #[serde(default = "default_public_rate_limit_per_minute")]
pub public_rate_limit_per_minute: u64, pub public_rate_limit_per_minute: u64,
@ -72,7 +72,7 @@ fn default_min_sum_soft_limit() -> u32 {
1 1
} }
fn default_min_synced_rpcs() -> u32 { fn default_min_synced_rpcs() -> usize {
1 1
} }

@ -85,7 +85,7 @@ pub async fn get_login(
// TODO: if no redis server, store in local cache? // TODO: if no redis server, store in local cache?
// the address isn't enough. we need to save the actual message so we can read the nonce // the address isn't enough. we need to save the actual message so we can read the nonce
// TODO: what message format is the most efficient to store in redis? probably eip191_string // TODO: what message format is the most efficient to store in redis? probably eip191_bytes
// we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time // we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time
app.redis_conn() app.redis_conn()
.await? .await?
@ -100,7 +100,7 @@ pub async fn get_login(
let message: String = match message_eip.as_str() { let message: String = match message_eip.as_str() {
"eip4361" => message.to_string(), "eip4361" => message.to_string(),
// https://github.com/spruceid/siwe/issues/98 // https://github.com/spruceid/siwe/issues/98
"eip191_string" => Bytes::from(message.eip191_string().unwrap()).to_string(), "eip191_bytes" => Bytes::from(message.eip191_bytes().unwrap()).to_string(),
"eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(), "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(),
_ => return Err(anyhow::anyhow!("invalid message eip given").into()), _ => return Err(anyhow::anyhow!("invalid message eip given").into()),
}; };

@ -13,7 +13,6 @@ use dashmap::{
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64}; use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
use petgraph::algo::all_simple_paths;
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc}; use std::{cmp::Ordering, fmt::Display, sync::Arc};
@ -39,7 +38,7 @@ impl Display for BlockId {
impl Web3Connections { impl Web3Connections {
/// add a block to our map and it's hash to our graphmap of the blockchain /// add a block to our map and it's hash to our graphmap of the blockchain
pub fn save_block(&self, block: &ArcBlock) -> anyhow::Result<()> { pub fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
let block_hash = block.hash.as_ref().context("no block hash")?; let block_hash = block.hash.as_ref().context("no block hash")?;
let block_num = block.number.as_ref().context("no block num")?; let block_num = block.number.as_ref().context("no block num")?;
let _block_td = block let _block_td = block
@ -47,6 +46,22 @@ impl Web3Connections {
.as_ref() .as_ref()
.context("no block total difficulty")?; .context("no block total difficulty")?;
if heaviest_chain {
match self.block_numbers.entry(*block_num) {
Entry::Occupied(mut x) => {
let old = x.insert(*block_hash);
// TODO: what should we do?
warn!(
"do something with the old hash. we may need to update a bunch more block numbers"
)
}
Entry::Vacant(x) => {
x.insert(*block_hash);
}
}
}
if self.block_hashes.contains_key(block_hash) { if self.block_hashes.contains_key(block_hash) {
// this block is already included. no need to continue // this block is already included. no need to continue
return Ok(()); return Ok(());
@ -68,19 +83,6 @@ impl Web3Connections {
return Ok(()); return Ok(());
} }
match self.block_numbers.entry(*block_num) {
Entry::Occupied(mut x) => {
let old = x.insert(*block_hash);
todo!(
"do something with the old hash. we need to update a bunch more block numbers"
)
}
Entry::Vacant(x) => {
x.insert(*block_hash);
}
}
// TODO: prettier log? or probably move the log somewhere else // TODO: prettier log? or probably move the log somewhere else
trace!(%block_hash, "new block"); trace!(%block_hash, "new block");
@ -110,13 +112,7 @@ impl Web3Connections {
} }
// block not in cache. we need to ask an rpc for it // block not in cache. we need to ask an rpc for it
// TODO: helper for method+params => JsonRpcRequest
// TODO: get block with the transactions?
// TODO: does this id matter?
let request_params = (hash, false); let request_params = (hash, false);
// TODO: if error, retry? // TODO: if error, retry?
let block: Block<TxHash> = match rpc { let block: Block<TxHash> = match rpc {
Some(rpc) => { Some(rpc) => {
@ -126,6 +122,8 @@ impl Web3Connections {
.await? .await?
} }
None => { None => {
// TODO: helper for method+params => JsonRpcRequest
// TODO: does this id matter?
let request = let request =
json!({ "id": "1", "method": "eth_getBlockByHash", "params": request_params }); json!({ "id": "1", "method": "eth_getBlockByHash", "params": request_params });
let request: JsonRpcRequest = serde_json::from_value(request)?; let request: JsonRpcRequest = serde_json::from_value(request)?;
@ -141,7 +139,10 @@ impl Web3Connections {
let block = Arc::new(block); let block = Arc::new(block);
// the block was fetched using eth_getBlockByHash, so it should have all fields // the block was fetched using eth_getBlockByHash, so it should have all fields
self.save_block(&block)?; // TODO: how should we set this? all_simple_paths on the map?
let heaviest_chain = false;
self.save_block(&block, heaviest_chain)?;
Ok(block) Ok(block)
} }
@ -202,8 +203,10 @@ impl Web3Connections {
let block = Arc::new(block); let block = Arc::new(block);
// the block was fetched using eth_getBlockByNumber, so it should have all fields // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
self.save_block(&block)?; let heaviest_chain = true;
self.save_block(&block, heaviest_chain)?;
Ok(block) Ok(block)
} }
@ -259,7 +262,8 @@ impl Web3Connections {
} else { } else {
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
self.save_block(&rpc_head_block)?; // we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block, false)?;
Some(BlockId { Some(BlockId {
hash: rpc_head_hash, hash: rpc_head_hash,
@ -277,27 +281,27 @@ impl Web3Connections {
} }
}; };
// iterate the rpc_map to find the highest_work_block // iterate the known heads to find the highest_work_block
let mut checked_heads = HashSet::new(); let mut checked_heads = HashSet::new();
let mut highest_work_block: Option<Ref<H256, ArcBlock>> = None; let mut highest_work_block: Option<Ref<H256, ArcBlock>> = None;
for rpc_head_hash in connection_heads.values() { for rpc_head_hash in connection_heads.values() {
if checked_heads.contains(rpc_head_hash) { if checked_heads.contains(rpc_head_hash) {
// we already checked this head from another rpc
continue; continue;
} }
// don't check the same hash multiple times
checked_heads.insert(rpc_head_hash); checked_heads.insert(rpc_head_hash);
let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap(); let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap();
match &rpc_head_block.total_difficulty { match &rpc_head_block.total_difficulty {
None => { None => {
// no total difficulty // no total difficulty. this is a bug
// TODO: should we fetch the block here? I think this shouldn't happen unimplemented!("block is missing total difficulty");
warn!(?rpc, %rpc_head_hash, "block is missing total difficulty");
continue;
} }
Some(td) => { Some(td) => {
// if this is the first block we've tried
// or if this rpc's newest block has a higher total difficulty
if highest_work_block.is_none() if highest_work_block.is_none()
|| td || td
> highest_work_block > highest_work_block
@ -313,100 +317,149 @@ impl Web3Connections {
} }
} }
// clone to release the read lock // clone to release the read lock on self.block_hashes
let highest_work_block = highest_work_block.map(|x| x.clone()); if let Some(mut maybe_head_block) = highest_work_block.map(|x| x.clone()) {
// track rpcs on this heaviest chain so we can build a new SyncedConnections
let mut highest_work_block = match highest_work_block {
None => todo!("no servers are in sync"),
Some(highest_work_block) => highest_work_block,
};
// track names so we don't check the same node multiple times
let mut heavy_names: HashSet<&String> = HashSet::new();
// track rpcs so we can build a new SyncedConnections
let mut heavy_rpcs: Vec<&Arc<Web3Connection>> = vec![]; let mut heavy_rpcs: Vec<&Arc<Web3Connection>> = vec![];
// a running total of the soft limits covered by the rpcs // a running total of the soft limits covered by the heavy rpcs
let mut heavy_sum_soft_limit: u32 = 0; let mut heavy_sum_soft_limit: u32 = 0;
// TODO: also track heavy_sum_hard_limit?
// check the highest work block and its parents for a set of rpcs that can serve our request load // check the highest work block for a set of rpcs that can serve our request load
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind // if it doesn't have enough rpcs for our request load, check the parent block
let blockchain_guard = self.blockchain_graphmap.read(); // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0..3 { for _ in 0..3 {
let highest_work_hash = highest_work_block.hash.as_ref().unwrap(); let maybe_head_hash = maybe_head_block
.hash
.as_ref()
.expect("blocks here always need hashes");
for (rpc_name, rpc_head_hash) in connection_heads.iter() { // find all rpcs with maybe_head_block as their current head
if heavy_names.contains(rpc_name) { for (conn_name, conn_head_hash) in connection_heads.iter() {
// this block is already included if conn_head_hash != maybe_head_hash {
continue; continue;
} }
// TODO: does all_simple_paths make this check? if let Some(rpc) = self.conns.get(conn_name) {
if rpc_head_hash == highest_work_hash {
if let Some(rpc) = self.conns.get(rpc_name) {
heavy_names.insert(rpc_name);
heavy_rpcs.push(rpc); heavy_rpcs.push(rpc);
heavy_sum_soft_limit += rpc.soft_limit; heavy_sum_soft_limit += rpc.soft_limit;
} else {
warn!("connection missing")
} }
}
if heavy_sum_soft_limit < self.min_sum_soft_limit
|| heavy_rpcs.len() < self.min_synced_rpcs
{
// not enough rpcs yet. check the parent
if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash)
{
trace!(
child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
);
maybe_head_block = parent_block.clone();
continue; continue;
} } else {
// TODO: cache all_simple_paths. there should be a high hit rate
// TODO: use an algo that saves scratch space?
// TODO: how slow is this?
let is_connected = all_simple_paths::<Vec<H256>, _>(
&*blockchain_guard,
*highest_work_hash,
*rpc_head_hash,
0,
// TODO: what should this max be? probably configurable per chain
Some(10),
)
.next()
.is_some();
if is_connected {
if let Some(rpc) = self.conns.get(rpc_name) {
heavy_rpcs.push(rpc);
heavy_sum_soft_limit += rpc.soft_limit;
}
}
}
// TODO: min_sum_soft_limit as a percentage of total_soft_limit?
// let min_sum_soft_limit = total_soft_limit / self.min_sum_soft_limit;
if heavy_sum_soft_limit >= self.min_sum_soft_limit {
// success! this block has enough nodes on it
break;
}
// else, we need to try the parent block
trace!(%heavy_sum_soft_limit, ?highest_work_hash, "avoiding thundering herd");
// // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here
// highest_work_block = self
// .block(&highest_work_block.parent_hash, Some(&rpc))
// .await?;
// we give known stale data just because we don't have enough capacity to serve the latest.
// TODO: maybe we should delay serving requests if this happens.
// TODO: don't unwrap. break if none?
match self.block_hashes.get(&highest_work_block.parent_hash) {
None => {
warn!( warn!(
"ran out of parents to check. soft limit only {}/{}: {}%", "no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%",
heavy_sum_soft_limit, heavy_sum_soft_limit,
self.min_sum_soft_limit, self.min_sum_soft_limit,
heavy_rpcs.len(),
self.min_synced_rpcs,
heavy_sum_soft_limit * 100 / self.min_sum_soft_limit heavy_sum_soft_limit * 100 / self.min_sum_soft_limit
); );
break; break;
} }
Some(parent_block) => {
highest_work_block = parent_block.clone();
} }
}
}
// unlock self.blockchain_graphmap
drop(blockchain_guard);
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns = heavy_rpcs.into_iter().cloned().collect();
let heavy_block = maybe_head_block;
let heavy_hash = heavy_block.hash.expect("head blocks always have hashes");
let heavy_num = heavy_block.number.expect("head blocks always have numbers");
debug_assert_ne!(heavy_num, U64::zero());
let heavy_block_id = BlockId {
hash: heavy_hash,
num: heavy_num,
};
let new_synced_connections = SyncedConnections {
head_block_id: Some(heavy_block_id.clone()),
conns,
};
let old_synced_connections = self
.synced_connections
.swap(Arc::new(new_synced_connections));
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
// TODO: if the rpc_head_block != heavy, log something somewhere in here
match &old_synced_connections.head_block_id {
None => {
debug!(block=%heavy_block_id, %rpc, "first consensus head");
head_block_sender.send(heavy_block)?;
}
Some(old_block_id) => {
match heavy_block_id.num.cmp(&old_block_id.num) {
Ordering::Equal => {
// multiple blocks with the same fork!
if heavy_block_id.hash == old_block_id.hash {
// no change in hash. no need to use head_block_sender
debug!(heavy=%heavy_block_id, %rpc, "consensus head block")
} else {
// hash changed
// TODO: better log
warn!(heavy=%heavy_block_id, %rpc, "fork detected");
// todo!("handle equal by updating the cannonical chain");
head_block_sender.send(heavy_block)?;
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
debug!("chain rolled back");
// todo!("handle less by removing higher blocks from the cannonical chain");
head_block_sender.send(heavy_block)?;
}
Ordering::Greater => {
debug!(heavy=%heavy_block_id, %rpc, "new head block");
// todo!("handle greater by adding this block to and any missing parents to the cannonical chain");
head_block_sender.send(heavy_block)?;
}
}
}
}
return Ok(());
}
// if we get here, something is wrong. clear synced connections
let empty_synced_connections = SyncedConnections::default();
let old_synced_connections = self
.synced_connections
.swap(Arc::new(empty_synced_connections));
// TODO: log different things depending on old_synced_connections
}
return Ok(());
todo!("double check everything under this");
/*
let soft_limit_met = heavy_sum_soft_limit >= self.min_sum_soft_limit; let soft_limit_met = heavy_sum_soft_limit >= self.min_sum_soft_limit;
let num_synced_rpcs = heavy_rpcs.len() as u32; let num_synced_rpcs = heavy_rpcs.len() as u32;
@ -419,11 +472,10 @@ impl Web3Connections {
// TODO: warn is too loud. if we are first starting, this is expected to happen // TODO: warn is too loud. if we are first starting, this is expected to happen
warn!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance"); warn!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance");
SyncedConnections::default() None
} else { } else {
// TODO: wait until at least most of the rpcs have given their initial block? // TODO: wait until at least most of the rpcs have given their initial block?
// otherwise, if there is a syncing node that is fast, our first head block might not be good // otherwise, if there is a syncing node that is fast, our first head block might not be good
// TODO: have a configurable "minimum rpcs" number that we can set
// TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine? // TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine?
let conns = heavy_rpcs.into_iter().cloned().collect(); let conns = heavy_rpcs.into_iter().cloned().collect();
@ -433,27 +485,30 @@ impl Web3Connections {
num: head_block_num, num: head_block_num,
}; };
SyncedConnections { let new_synced_connections = SyncedConnections {
head_block_id: Some(head_block_id), head_block_id: Some(head_block_id),
conns, conns,
} };
Some(new_synced_connections)
} }
} else { } else {
// failure even after checking parent heads! // failure even after checking parent heads!
// not enough servers are in sync to server traffic // not enough servers are in sync to server traffic
// TODO: at startup this is fine, but later its a problem // TODO: at startup this is fine, but later its a problem
warn!("empty SyncedConnections"); None
SyncedConnections::default()
}; };
if let Some(new_synced_connections) = new_synced_connections {
let heavy_block_id = new_synced_connections.head_block_id.clone(); let heavy_block_id = new_synced_connections.head_block_id.clone();
let new_synced_connections = Arc::new(new_synced_connections); let new_synced_connections = Arc::new(new_synced_connections);
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
let old_synced_connections = self.synced_connections.swap(new_synced_connections); let old_synced_connections = self.synced_connections.swap(new_synced_connections);
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
match (&old_synced_connections.head_block_id, &heavy_block_id) { match (&old_synced_connections.head_block_id, &heavy_block_id) {
(None, None) => warn!("no servers synced"), (None, None) => warn!("no servers synced"),
(None, Some(heavy_block_id)) => { (None, Some(heavy_block_id)) => {
@ -461,19 +516,31 @@ impl Web3Connections {
} }
(Some(_), None) => warn!("no longer synced!"), (Some(_), None) => warn!("no longer synced!"),
(Some(old_block_id), Some(heavy_block_id)) => { (Some(old_block_id), Some(heavy_block_id)) => {
debug_assert_ne!(heavy_block_id.num, U64::zero());
match heavy_block_id.num.cmp(&old_block_id.num) { match heavy_block_id.num.cmp(&old_block_id.num) {
Ordering::Equal => { Ordering::Equal => {
todo!("handle equal") // multiple blocks with the same fork!
debug!("fork detected");
todo!("handle equal");
} }
Ordering::Less => { Ordering::Less => {
todo!("handle less") // this seems unlikely
warn!("chain rolled back");
todo!("handle less");
} }
Ordering::Greater => { Ordering::Greater => {
todo!("handle greater") info!(heavy=%heavy_block_id, %rpc, "new head block");
todo!("handle greater");
} }
} }
} }
} }
} else {
todo!()
}
*/
/* /*
if old_synced_connections.head_block_id.is_none() && rpc_head_block.hash.is_some() { if old_synced_connections.head_block_id.is_none() && rpc_head_block.hash.is_some() {
// this is fine. we have our first hash // this is fine. we have our first hash
@ -509,10 +576,5 @@ impl Web3Connections {
warn!(?soft_limit_met, %heavy_block_id, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) warn!(?soft_limit_met, %heavy_block_id, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
} }
*/ */
// TODO: the head hash changed. forward to any subscribers
head_block_sender.send(highest_work_block)?;
Ok(())
} }
} }

@ -34,7 +34,7 @@ use tracing::{error, info, instrument, trace, warn};
/// A collection of web3 connections. Sends requests either the current best server or all servers. /// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)] #[derive(From)]
pub struct Web3Connections { pub struct Web3Connections {
pub(super) inner: HashMap<String, Arc<Web3Connection>>, pub(super) conns: HashMap<String, Arc<Web3Connection>>,
/// any requests will be forwarded to one (or more) of these connections /// any requests will be forwarded to one (or more) of these connections
pub(super) synced_connections: ArcSwap<SyncedConnections>, pub(super) synced_connections: ArcSwap<SyncedConnections>,
pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>, pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
@ -46,7 +46,7 @@ pub struct Web3Connections {
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// TODO: what should we use for edges? /// TODO: what should we use for edges?
pub(super) blockchain_graphmap: RwLock<DiGraphMap<H256, u32>>, pub(super) blockchain_graphmap: RwLock<DiGraphMap<H256, u32>>,
pub(super) min_synced_rpcs: u32, pub(super) min_synced_rpcs: usize,
pub(super) min_sum_soft_limit: u32, pub(super) min_sum_soft_limit: u32,
} }
@ -61,7 +61,7 @@ impl Web3Connections {
block_map: BlockHashesMap, block_map: BlockHashesMap,
head_block_sender: Option<watch::Sender<ArcBlock>>, head_block_sender: Option<watch::Sender<ArcBlock>>,
min_sum_soft_limit: u32, min_sum_soft_limit: u32,
min_synced_rpcs: u32, min_synced_rpcs: usize,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>, pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
pending_transactions: Arc<DashMap<TxHash, TxStatus>>, pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
@ -169,7 +169,7 @@ impl Web3Connections {
let synced_connections = SyncedConnections::default(); let synced_connections = SyncedConnections::default();
let connections = Arc::new(Self { let connections = Arc::new(Self {
inner: connections, conns: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)), synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions, pending_transactions,
block_hashes: Default::default(), block_hashes: Default::default(),
@ -199,7 +199,7 @@ impl Web3Connections {
} }
pub fn get(&self, conn_name: &str) -> Option<&Arc<Web3Connection>> { pub fn get(&self, conn_name: &str) -> Option<&Arc<Web3Connection>> {
self.inner.get(conn_name) self.conns.get(conn_name)
} }
/// subscribe to blocks and transactions from all the backend rpcs. /// subscribe to blocks and transactions from all the backend rpcs.
@ -350,7 +350,7 @@ impl Web3Connections {
// TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest" // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
let mut synced_rpcs: Vec<Arc<Web3Connection>> = let mut synced_rpcs: Vec<Arc<Web3Connection>> =
if let Some(min_block_needed) = min_block_needed { if let Some(min_block_needed) = min_block_needed {
self.inner self.conns
.values() .values()
.filter(|x| !skip.contains(x)) .filter(|x| !skip.contains(x))
.filter(|x| x.has_block_data(min_block_needed)) .filter(|x| x.has_block_data(min_block_needed))
@ -436,7 +436,7 @@ impl Web3Connections {
// TODO: with capacity? // TODO: with capacity?
let mut selected_rpcs = vec![]; let mut selected_rpcs = vec![];
for connection in self.inner.values() { for connection in self.conns.values() {
if let Some(min_block_needed) = min_block_needed { if let Some(min_block_needed) = min_block_needed {
if !connection.has_block_data(min_block_needed) { if !connection.has_block_data(min_block_needed) {
continue; continue;
@ -477,7 +477,7 @@ impl Web3Connections {
// TODO: maximum retries? // TODO: maximum retries?
loop { loop {
if skip_rpcs.len() == self.inner.len() { if skip_rpcs.len() == self.conns.len() {
break; break;
} }
match self match self
@ -624,7 +624,7 @@ impl fmt::Debug for Web3Connections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though // TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3Connections") f.debug_struct("Web3Connections")
.field("conns", &self.inner) .field("conns", &self.conns)
.finish_non_exhaustive() .finish_non_exhaustive()
} }
} }
@ -634,7 +634,7 @@ impl Serialize for Web3Connections {
where where
S: Serializer, S: Serializer,
{ {
let conns: Vec<&Web3Connection> = self.inner.iter().map(|x| x.1.as_ref()).collect(); let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect();
let mut state = serializer.serialize_struct("Web3Connections", 2)?; let mut state = serializer.serialize_struct("Web3Connections", 2)?;
state.serialize_field("conns", &conns)?; state.serialize_field("conns", &conns)?;