diff --git a/config/example.toml b/config/example.toml index eacd59ee..79dc8f1b 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,7 +1,7 @@ [shared] chain_id = 1 db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy" -min_synced_soft_limit = 2000 +min_sum_soft_limit = 2000 min_synced_rpcs = 2 redis_url = "redis://dev-redis:6379/" redirect_public_url = "https://llamanodes.com/free-rpc-stats" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 7d62c439..715f57d1 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -255,6 +255,7 @@ impl Web3ProxyApp { redis_pool.clone(), block_map.clone(), Some(head_block_sender), + top_config.app.min_sum_soft_limit, top_config.app.min_synced_rpcs, Some(pending_tx_sender.clone()), pending_transactions.clone(), @@ -279,6 +280,7 @@ impl Web3ProxyApp { None, // minimum doesn't really matter on private rpcs 1, + 1, // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits None, pending_transactions.clone(), diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 1bb57f47..3bb197b2 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -217,6 +217,7 @@ mod tests { db_url: None, invite_code: None, redis_url: None, + min_sum_soft_limit: 1, min_synced_rpcs: 1, public_rate_limit_per_minute: 0, response_cache_max_bytes: 10_usize.pow(7), diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index d85fbf39..975821a8 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -45,8 +45,10 @@ pub struct AppConfig { pub chain_id: u64, pub db_url: Option, pub invite_code: Option, + #[serde(default = "default_min_sum_soft_limit")] + pub min_sum_soft_limit: u32, #[serde(default = "default_min_synced_rpcs")] - pub min_synced_rpcs: usize, + pub min_synced_rpcs: u32, pub redis_url: Option, #[serde(default = "default_public_rate_limit_per_minute")] pub public_rate_limit_per_minute: u64, @@ -58,10 +60,15 @@ pub struct AppConfig { pub redirect_user_url: String, } -fn default_min_synced_rpcs() -> usize { +fn default_min_sum_soft_limit() -> u32 { 1 } +fn default_min_synced_rpcs() -> u32 { + 1 +} + +/// 0 blocks public requests by default. fn default_public_rate_limit_per_minute() -> u64 { 0 } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ae73d537..138af61d 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -7,7 +7,7 @@ use crate::{ }; use dashmap::mapref::one::Ref; use derive_more::From; -use ethers::prelude::{Block, TxHash, H256, U256, U64}; +use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; use petgraph::algo::all_simple_paths; use serde_json::json; @@ -15,46 +15,13 @@ use std::sync::Arc; use tokio::sync::{broadcast, watch}; use tracing::{debug, info, trace, warn}; +/// A block's hash and number. #[derive(Default, From)] pub struct BlockId { pub(super) hash: H256, pub(super) num: U64, } -/// TODO: do we need this? probably big refactor still to do -pub(super) struct BlockMetadata<'a> { - pub(super) block: &'a Arc>, - pub(super) rpc_names: Vec<&'a str>, - pub(super) sum_soft_limit: u32, -} - -impl<'a> BlockMetadata<'a> { - // TODO: there are sortable traits, but this seems simpler - /// sort the blocks in descending height - pub fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) { - // trace!(?self.block, ?self.conns); - - // first we care about the block number - let block_num = self.block.number.as_ref().unwrap(); - - // if block_num ties, the block with the highest total difficulty *should* be the winner - // TODO: sometimes i see a block with no total difficulty. websocket subscription doesn't get everything - // let total_difficulty = self.block.total_difficulty.as_ref().expect("wat"); - - // all the nodes should already be doing this fork priority logic themselves - // so, it should be safe to just look at whatever our node majority thinks and go with that - let sum_soft_limit = &self.sum_soft_limit; - - let difficulty = &self.block.difficulty; - - // if we are still tied (unlikely). this will definitely break the tie - // TODO: what does geth do? - let block_hash = self.block.hash.as_ref().unwrap(); - - (block_num, sum_soft_limit, difficulty, block_hash) - } -} - impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain pub fn save_block(&self, block: &Arc>) -> anyhow::Result<()> { @@ -231,26 +198,26 @@ impl Web3Connections { async fn process_block_from_rpc( &self, connection_heads: &mut HashMap, - new_block: Arc>, + rpc_head_block: Arc>, rpc: Arc, head_block_sender: &watch::Sender>>, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // add the block to connection_heads - match (new_block.hash, new_block.number) { - (Some(hash), Some(num)) => { - if num == U64::zero() { + match (rpc_head_block.hash, rpc_head_block.number) { + (Some(rpc_head_hash), Some(rpc_head_num)) => { + if rpc_head_num == U64::zero() { debug!(%rpc, "still syncing"); connection_heads.remove(&rpc.name); } else { - connection_heads.insert(rpc.name.clone(), hash); + connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); - self.save_block(&new_block)?; + self.save_block(&rpc_head_block)?; } } _ => { - warn!(%rpc, ?new_block, "Block without number or hash!"); + warn!(%rpc, ?rpc_head_block, "Block without number or hash!"); connection_heads.remove(&rpc.name); @@ -282,11 +249,6 @@ impl Web3Connections { // clone to release the read lock let highest_work_block = highest_work_block.map(|x| x.clone()); - // TODO: default min_soft_limit? without, we start serving traffic at the start too quickly - // let min_sum_soft_limit = total_soft_limit / 2; - // TODO: this should be configurable - let min_sum_soft_limit = 1; - let mut highest_work_block = match highest_work_block { None => todo!("no servers are in sync"), Some(highest_work_block) => highest_work_block, @@ -297,7 +259,7 @@ impl Web3Connections { // track rpcs so we can build a new SyncedConnections let mut consensus_rpcs: Vec<&Arc> = vec![]; // a running total of the soft limits covered by the rpcs - let mut consensus_soft_limit = 0; + let mut consensus_sum_soft_limit: u32 = 0; // check the highest work block and its parents for a set of rpcs that can serve our request load // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind @@ -316,7 +278,7 @@ impl Web3Connections { if let Some(rpc) = self.conns.get(rpc_name) { consensus_names.insert(rpc_name); consensus_rpcs.push(rpc); - consensus_soft_limit += rpc.soft_limit; + consensus_sum_soft_limit += rpc.soft_limit; } continue; } @@ -338,18 +300,20 @@ impl Web3Connections { if is_connected { if let Some(rpc) = self.conns.get(rpc_name) { consensus_rpcs.push(rpc); - consensus_soft_limit += rpc.soft_limit; + consensus_sum_soft_limit += rpc.soft_limit; } } } - if consensus_soft_limit >= min_sum_soft_limit { + // TODO: min_sum_soft_limit as a percentage of total_soft_limit? + // let min_sum_soft_limit = total_soft_limit / self.min_sum_soft_limit; + if consensus_sum_soft_limit >= self.min_sum_soft_limit { // success! this block has enough nodes on it break; } // else, we need to try the parent block - trace!(%consensus_soft_limit, ?highest_work_hash, "avoiding thundering herd"); + trace!(%consensus_sum_soft_limit, ?highest_work_hash, "avoiding thundering herd"); // // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here // highest_work_block = self @@ -362,9 +326,9 @@ impl Web3Connections { None => { warn!( "ran out of parents to check. soft limit only {}/{}: {}%", - consensus_soft_limit, - min_sum_soft_limit, - consensus_soft_limit * 100 / min_sum_soft_limit + consensus_sum_soft_limit, + self.min_sum_soft_limit, + consensus_sum_soft_limit * 100 / self.min_sum_soft_limit ); break; } @@ -376,8 +340,8 @@ impl Web3Connections { // unlock self.blockchain_graphmap drop(blockchain_guard); - let soft_limit_met = consensus_soft_limit >= min_sum_soft_limit; - let num_synced_rpcs = consensus_rpcs.len(); + let soft_limit_met = consensus_sum_soft_limit >= self.min_sum_soft_limit; + let num_synced_rpcs = consensus_rpcs.len() as u32; let new_synced_connections = if soft_limit_met { // we have a consensus large enough to serve traffic @@ -411,27 +375,31 @@ impl Web3Connections { SyncedConnections::default() }; - let new_head_hash = new_synced_connections.head_block_hash; - let new_head_num = new_synced_connections.head_block_num; + let consensus_block_hash = new_synced_connections.head_block_hash; + let consensus_block_num = new_synced_connections.head_block_num; let new_synced_connections = Arc::new(new_synced_connections); let num_connection_heads = connection_heads.len(); + let total_rpcs = self.conns.len(); let old_synced_connections = self.synced_connections.swap(new_synced_connections); let old_head_hash = old_synced_connections.head_block_hash; - let total_rpcs = self.conns.len(); - if new_head_hash == old_head_hash { - trace!(hash=%new_head_hash, num=%new_head_num, limit=%consensus_soft_limit, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); + if Some(consensus_block_hash) != rpc_head_block.hash { + info!("non consensus block") + } + + if consensus_block_hash == old_head_hash { + debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); } else if soft_limit_met { // TODO: if new's parent is not old, warn? - debug!(hash=%new_head_hash, num=%new_head_num, limit=%consensus_soft_limit, "new consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); + debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "new consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); // the head hash changed. forward to any subscribers head_block_sender.send(highest_work_block)?; } else { - warn!(?soft_limit_met, %new_head_hash, %old_head_hash, "no consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) + warn!(?soft_limit_met, %consensus_block_hash, %old_head_hash, "no consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) } Ok(()) diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index ab853365..1dc65722 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -45,7 +45,8 @@ pub struct Web3Connections { /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? pub(super) blockchain_graphmap: RwLock>, - pub(super) min_synced_rpcs: usize, + pub(super) min_synced_rpcs: u32, + pub(super) min_sum_soft_limit: u32, } impl Web3Connections { @@ -58,7 +59,8 @@ impl Web3Connections { redis_client_pool: Option, block_map: BlockMap, head_block_sender: Option>>>, - min_synced_rpcs: usize, + min_sum_soft_limit: u32, + min_synced_rpcs: u32, pending_tx_sender: Option>, pending_transactions: Arc>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { @@ -171,6 +173,7 @@ impl Web3Connections { pending_transactions, block_map: Default::default(), blockchain_graphmap: Default::default(), + min_sum_soft_limit, min_synced_rpcs, });