From 5b1621ead4d3f2ef9afe658616727cb0d8d9a6ac Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 22 Nov 2022 22:45:22 +0000 Subject: [PATCH] fix bug with not using synced_connections correctly --- TODO.md | 5 +- web3_proxy/src/rpcs/blockchain.rs | 31 ++-- web3_proxy/src/rpcs/connection.rs | 11 +- web3_proxy/src/rpcs/connections.rs | 262 +++++++++++++++++++++++++++-- web3_proxy/src/rpcs/provider.rs | 3 + web3_proxy/src/rpcs/request.rs | 2 + 6 files changed, 270 insertions(+), 44 deletions(-) diff --git a/TODO.md b/TODO.md index 9b8ab3d7..ca3b4ce9 100644 --- a/TODO.md +++ b/TODO.md @@ -243,10 +243,11 @@ These are roughly in order of completition - [x] cache the status page for a second - [x] request accounting for websockets - [x] database merge scripts -- [ ] add block timestamp to the /status page +- [x] test that sets up a Web3Connection and asks "has_block" for old and new blocks +- [x] test that sets up Web3Connections with 2 nodes. one behind by several blocks. and see what the "next" server shows as- [ ] add block timestamp to the /status page - [ ] be sure to save the timestamp in a way that our request routing logic can make use of it - [ ] period_datetime should always be :00. right now it depends on start time -- [ ] two servers running will confuse rpc_accounting! + - [ ] two servers running will confuse rpc_accounting! - one option: we need the insert to be an upsert, but how do we merge historgrams? - [ ] change invite codes to set the user_tier - [ ] period_datetime should always be :00. right now it depends on start time diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 82ee1eb3..dccd18e2 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -246,7 +246,7 @@ impl Web3Connections { /// `connection_heads` is a mapping of rpc_names to head block hashes. /// self.blockchain_map is a mapping of hashes to the complete ArcBlock. /// TODO: return something? - async fn process_block_from_rpc( + pub(crate) async fn process_block_from_rpc( &self, authorization: &Arc, connection_heads: &mut HashMap, @@ -261,24 +261,15 @@ impl Web3Connections { let rpc_head_num = rpc_head_block.number.unwrap(); let rpc_head_hash = rpc_head_block.hash.unwrap(); - if rpc_head_num.is_zero() { - // TODO: i don't think we can get to this anymore now that we use Options - debug!("{} still syncing", rpc); + // we don't know if its on the heaviest chain yet + self.save_block(&rpc_head_block, false).await?; - connection_heads.remove(&rpc.name); + connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); - None - } else { - // we don't know if its on the heaviest chain yet - self.save_block(&rpc_head_block, false).await?; - - connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); - - Some(BlockId { - hash: rpc_head_hash, - num: rpc_head_num, - }) - } + Some(BlockId { + hash: rpc_head_hash, + num: rpc_head_num, + }) } None => { // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect @@ -384,7 +375,7 @@ impl Web3Connections { } if highest_rpcs_sum_soft_limit < self.min_sum_soft_limit - || highest_rpcs.len() < self.min_synced_rpcs + || highest_rpcs.len() < self.min_head_rpcs { // not enough rpcs yet. 
check the parent if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash) @@ -401,7 +392,7 @@ impl Web3Connections { highest_rpcs_sum_soft_limit, self.min_sum_soft_limit, highest_rpcs.len(), - self.min_synced_rpcs, + self.min_head_rpcs, highest_rpcs_sum_soft_limit * 100 / self.min_sum_soft_limit ); break; @@ -448,8 +439,6 @@ impl Web3Connections { .number .expect("head blocks always have numbers"); - debug_assert_ne!(consensus_head_num, U64::zero()); - let num_consensus_rpcs = conns.len(); let consensus_head_block_id = BlockId { diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index a9a283a6..26136186 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -32,9 +32,9 @@ pub struct Web3Connection { pub name: String, pub display_name: Option, /// TODO: can we get this from the provider? do we even need it? - url: String, + pub(super) url: String, /// Some connections use an http_client. we keep a clone for reconnecting - http_client: Option, + pub(super) http_client: Option, /// keep track of currently open requests. We sort on this pub(super) active_requests: AtomicU32, /// keep track of total requests @@ -46,11 +46,11 @@ pub struct Web3Connection { pub(super) provider: AsyncRwLock>>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits /// We do not use the deferred rate limiter because going over limits would cause errors - hard_limit: Option, + pub(super) hard_limit: Option, /// used for load balancing to the least loaded server pub(super) soft_limit: u32, /// TODO: have an enum for this so that "no limit" prints pretty? - block_data_limit: AtomicU64, + pub(super) block_data_limit: AtomicU64, /// Lower weight are higher priority when sending requests. 0 to 99. pub(super) weight: f64, /// TODO: should this be an AsyncRwLock? @@ -350,6 +350,7 @@ impl Web3Connection { return Ok(()); } Web3Provider::Ws(_) => {} + Web3Provider::Mock => return Ok(()), } info!("Reconnecting to {}", self); @@ -571,6 +572,7 @@ impl Web3Connection { // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { match &*provider { + Web3Provider::Mock => unimplemented!(), Web3Provider::Http(_provider) => { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints // TODO: try watch_blocks and fall back to this? @@ -745,6 +747,7 @@ impl Web3Connection { // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { match &*provider { + Web3Provider::Mock => unimplemented!(), Web3Provider::Http(provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints // TODO: what should this interval be? probably automatically set to some fraction of block time diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index bfe6a837..5034799c 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -52,7 +52,7 @@ pub struct Web3Connections { /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? 
pub(super) blockchain_graphmap: AsyncRwLock>, - pub(super) min_synced_rpcs: usize, + pub(super) min_head_rpcs: usize, pub(super) min_sum_soft_limit: u32, } @@ -68,7 +68,7 @@ impl Web3Connections { block_map: BlockHashesCache, head_block_sender: Option>, min_sum_soft_limit: u32, - min_synced_rpcs: usize, + min_head_rpcs: usize, pending_tx_sender: Option>, pending_transactions: Cache, open_request_handle_metrics: Arc, @@ -180,11 +180,11 @@ impl Web3Connections { } // TODO: now this errors for private rpcs when we disable all! - if connections.len() < min_synced_rpcs { + if connections.len() < min_head_rpcs { return Err(anyhow::anyhow!( - "Only {}/{} connections! Add more connections or reduce min_synced_rpcs.", + "Only {}/{} connections! Add more connections or reduce min_head_rpcs.", connections.len(), - min_synced_rpcs + min_head_rpcs )); } @@ -212,7 +212,7 @@ impl Web3Connections { block_numbers, blockchain_graphmap: Default::default(), min_sum_soft_limit, - min_synced_rpcs, + min_head_rpcs, }); let authorization = Arc::new(Authorization::local(db_conn.clone())?); @@ -382,7 +382,7 @@ impl Web3Connections { } /// get the best available rpc server - pub async fn next_upstream_server( + pub async fn best_synced_backend_connection( &self, authorization: &Arc, request_metadata: Option<&Arc>, @@ -394,20 +394,23 @@ impl Web3Connections { let min_block_needed = if let Some(min_block_needed) = min_block_needed { *min_block_needed } else { - self.head_block_num().context("not servers are synced")? + // TODO: error or OpenRequestResult::NotSynced? and then we turn that into a 502? + self.head_block_num().context("no servers are synced")? }; // filter the synced rpcs // TODO: we are going to be checking "has_block_data" a lot now - let synced_rpcs: Vec> = self + let head_rpcs: Vec> = self + .synced_connections + .load() .conns - .values() + .iter() .filter(|x| !skip.contains(x)) .filter(|x| x.has_block_data(&min_block_needed)) .cloned() .collect(); - if synced_rpcs.is_empty() { + if head_rpcs.is_empty() { // TODO: what should happen here? automatic retry? // TODO: more detailed error return Err(anyhow::anyhow!("no servers are synced")); @@ -416,7 +419,7 @@ impl Web3Connections { let mut minimum = 0.0; // we sort on a bunch of values. cache them here so that we don't do this math multiple times. - let weight_map: HashMap<_, f64> = synced_rpcs + let weight_map: HashMap<_, f64> = head_rpcs .iter() .map(|rpc| { // TODO: put this on the rpc object instead? @@ -457,8 +460,8 @@ impl Web3Connections { let sorted_rpcs = { let mut rng = thread_fast_rng::thread_fast_rng(); - synced_rpcs - .choose_multiple_weighted(&mut rng, synced_rpcs.len(), |rpc| { + head_rpcs + .choose_multiple_weighted(&mut rng, head_rpcs.len(), |rpc| { *weight_map .get(rpc) .expect("rpc should always be in the weight map") @@ -512,7 +515,7 @@ impl Web3Connections { /// get all rpc servers that are not rate limited /// returns servers even if they aren't in sync. 
This is useful for broadcasting signed transactions // TODO: better type on this that can return an anyhow::Result - pub async fn upstream_servers( + pub async fn all_backend_connections( &self, authorization: &Arc, block_needed: Option<&U64>, @@ -572,7 +575,7 @@ impl Web3Connections { break; } match self - .next_upstream_server( + .best_synced_backend_connection( authorization, request_metadata, &skip_rpcs, @@ -705,7 +708,10 @@ impl Web3Connections { block_needed: Option<&U64>, ) -> anyhow::Result { loop { - match self.upstream_servers(authorization, block_needed).await { + match self + .all_backend_connections(authorization, block_needed) + .await + { Ok(active_request_handles) => { // TODO: benchmark this compared to waiting on unbounded futures // TODO: do something with this handle? @@ -800,3 +806,225 @@ impl Serialize for Web3Connections { state.end() } } + +mod tests { + use super::*; + use crate::rpcs::{blockchain::BlockId, provider::Web3Provider}; + use ethers::types::Block; + use log::LevelFilter; + use parking_lot::RwLock; + + #[tokio::test] + async fn test_server_selection() { + // TODO: do this better. can test_env_logger and tokio test be stacked? + let _ = env_logger::builder() + .filter_level(LevelFilter::Error) + .filter_module("web3_proxy", LevelFilter::Trace) + .is_test(true) + .try_init(); + + let lagged_block = Block { + hash: Some(H256::random()), + number: Some(0.into()), + ..Default::default() + }; + + let head_block = Block { + hash: Some(H256::random()), + number: Some(1.into()), + parent_hash: lagged_block.hash.unwrap(), + ..Default::default() + }; + + // TODO: write a impl From for Block -> BlockId? + let lagged_block_id = BlockId { + hash: lagged_block.hash.unwrap(), + num: lagged_block.number.unwrap(), + }; + let head_block_id = BlockId { + hash: head_block.hash.unwrap(), + num: head_block.number.unwrap(), + }; + + let lagged_block = Arc::new(lagged_block); + let head_block = Arc::new(head_block); + + let block_data_limit = u64::MAX; + + let head_rpc = Web3Connection { + name: "synced".to_string(), + display_name: None, + url: "ws://example.com/synced".to_string(), + http_client: None, + active_requests: 0.into(), + total_requests: 0.into(), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + hard_limit: None, + soft_limit: 1_000, + block_data_limit: block_data_limit.into(), + weight: 100.0, + head_block_id: RwLock::new(Some(head_block_id)), + open_request_handle_metrics: Arc::new(Default::default()), + }; + + let lagged_rpc = Web3Connection { + name: "lagged".to_string(), + display_name: None, + url: "ws://example.com/lagged".to_string(), + http_client: None, + active_requests: 0.into(), + total_requests: 0.into(), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + hard_limit: None, + soft_limit: 1_000, + block_data_limit: block_data_limit.into(), + weight: 100.0, + head_block_id: RwLock::new(Some(lagged_block_id)), + open_request_handle_metrics: Arc::new(Default::default()), + }; + + assert!(head_rpc.has_block_data(&lagged_block.number.unwrap())); + assert!(head_rpc.has_block_data(&head_block.number.unwrap())); + + assert!(lagged_rpc.has_block_data(&lagged_block.number.unwrap())); + assert!(!lagged_rpc.has_block_data(&head_block.number.unwrap())); + + let head_rpc = Arc::new(head_rpc); + let lagged_rpc = Arc::new(lagged_rpc); + + let conns = HashMap::from([ + (head_rpc.name.clone(), head_rpc.clone()), + (lagged_rpc.name.clone(), lagged_rpc.clone()), + ]); + + let conns = Web3Connections { + conns, + synced_connections: 
Default::default(), + pending_transactions: Cache::builder() + .max_capacity(10_000) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + block_hashes: Cache::builder() + .max_capacity(10_000) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + block_numbers: Cache::builder() + .max_capacity(10_000) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + blockchain_graphmap: Default::default(), + min_head_rpcs: 1, + min_sum_soft_limit: 1, + }; + + let authorization = Arc::new(Authorization::local(None).unwrap()); + + let (head_block_sender, _head_block_receiver) = + watch::channel::(Default::default()); + let mut connection_heads = HashMap::new(); + + // process None so that + conns + .process_block_from_rpc( + &authorization, + &mut connection_heads, + None, + lagged_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .unwrap(); + conns + .process_block_from_rpc( + &authorization, + &mut connection_heads, + None, + head_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .unwrap(); + + // no head block because the rpcs haven't communicated through their channels + assert!(conns.head_block_hash().is_none()); + + // all_backend_connections gives everything regardless of sync status + assert_eq!( + conns + .all_backend_connections(&authorization, None) + .await + .unwrap() + .len(), + 2 + ); + + // best_synced_backend_connection requires servers to be synced with the head block + // TODO: should this be an error, or a OpenRequestResult::NotSynced? + assert!(conns + .best_synced_backend_connection(&authorization, None, &[], lagged_block.number.as_ref()) + .await + .is_err()); + + // add lagged blocks to the conns. both servers should be allowed + conns.save_block(&lagged_block, true).await.unwrap(); + + conns + .process_block_from_rpc( + &authorization, + &mut connection_heads, + Some(lagged_block.clone()), + lagged_rpc, + &head_block_sender, + &None, + ) + .await + .unwrap(); + conns + .process_block_from_rpc( + &authorization, + &mut connection_heads, + Some(lagged_block.clone()), + head_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .unwrap(); + + assert_eq!(conns.num_synced_rpcs(), 2); + + // add head block to the conns. lagged_rpc should not be available + conns.save_block(&head_block, true).await.unwrap(); + + conns + .process_block_from_rpc( + &authorization, + &mut connection_heads, + Some(head_block.clone()), + head_rpc, + &head_block_sender, + &None, + ) + .await + .unwrap(); + + assert_eq!(conns.num_synced_rpcs(), 1); + + // TODO: is_ok is too simple. make sure its head_rpc + assert!(conns + .best_synced_backend_connection(&authorization, None, &[], None) + .await + .is_ok()); + assert!(conns + .best_synced_backend_connection(&authorization, None, &[], Some(&0.into())) + .await + .is_ok()); + assert!(conns + .best_synced_backend_connection(&authorization, None, &[], Some(&1.into())) + .await + .is_ok()); + assert!(conns + .best_synced_backend_connection(&authorization, None, &[], Some(&2.into())) + .await + .is_err()); + } +} diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index c4850534..bcffec4e 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -8,12 +8,15 @@ use std::time::Duration; pub enum Web3Provider { Http(ethers::providers::Provider), Ws(ethers::providers::Provider), + // TODO: only include this for tests. 
+    Mock,
 }
 impl Web3Provider {
     pub fn ready(&self) -> bool {
         // TODO: i'm not sure if this is enough
         match self {
+            Self::Mock => true,
             Self::Http(_) => true,
             Self::Ws(provider) => provider.as_ref().ready(),
         }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 21c40b0d..e6e2cc30 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -200,6 +200,7 @@ impl OpenRequestHandle {
         // TODO: really sucks that we have to clone here
         let response = match provider {
+            Web3Provider::Mock => unimplemented!(),
             Web3Provider::Http(provider) => provider.request(method, params).await,
             Web3Provider::Ws(provider) => provider.request(method, params).await,
         };
@@ -242,6 +243,7 @@ impl OpenRequestHandle {
             let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
                 // Http and Ws errors are very similar, but different types
                 let msg = match provider {
+                    Web3Provider::Mock => unimplemented!(),
                     Web3Provider::Http(_) => {
                         if let Some(HttpClientError::JsonRpcError(err)) =
                             err.downcast_ref::<HttpClientError>()
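
Note on the main change: `best_synced_backend_connection` (formerly `next_upstream_server`) now takes its candidates from `self.synced_connections.load().conns` instead of `self.conns`, so only servers that are part of the current consensus head can be picked for a request. A minimal sketch of the difference, assuming the synced set sits behind an `arc_swap::ArcSwap` (which is what the `.load()` call in the diff suggests) and using plain strings in place of `Arc<Web3Connection>`:

use std::sync::Arc;

use arc_swap::ArcSwap;

/// the subset of servers currently agreeing on the consensus head block
struct SyncedConnections {
    conns: Vec<Arc<str>>, // stand-in for Arc<Web3Connection>
}

struct Connections {
    /// every configured server, in sync or not
    all: Vec<Arc<str>>,
    /// atomically swapped snapshot of the synced subset
    synced_connections: ArcSwap<SyncedConnections>,
}

impl Connections {
    /// the buggy shape: candidates drawn from every configured server
    fn candidates_before(&self) -> Vec<Arc<str>> {
        self.all.to_vec()
    }

    /// the fixed shape: candidates drawn only from the consensus set
    fn candidates_after(&self) -> Vec<Arc<str>> {
        // `.load()` is a cheap lock-free snapshot; lagging servers never show up here
        self.synced_connections.load().conns.to_vec()
    }
}

fn main() {
    let head: Arc<str> = Arc::from("synced");
    let lagged: Arc<str> = Arc::from("lagged");

    let conns = Connections {
        all: vec![head.clone(), lagged],
        synced_connections: ArcSwap::from_pointee(SyncedConnections {
            conns: vec![head],
        }),
    };

    // before the fix, a lagged server could be selected when no min_block_needed was given
    assert_eq!(conns.candidates_before().len(), 2);
    // after the fix, only servers at the consensus head are considered
    assert_eq!(conns.candidates_after().len(), 1);
}

The `has_block_data` filter still runs on top of this snapshot, which is why the new test only expects an error when asking for a block (`2`) that neither server has.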
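
The new `has_block_data` assertions encode the rule that a server can serve a block only when its number falls inside the window `[head - block_data_limit, head]`. A rough sketch of that check, with plain `u64`s standing in for `U64` and illustrative field names rather than the struct's real layout:

/// illustrative stand-in for the fields the check needs
struct Server {
    /// `None` while the server is still connecting or syncing
    head_block_num: Option<u64>,
    /// how far behind the head this server keeps state (u64::MAX for an archive node)
    block_data_limit: u64,
}

impl Server {
    fn has_block_data(&self, needed: u64) -> bool {
        let head = match self.head_block_num {
            Some(head) => head,
            None => return false,
        };
        let oldest = head.saturating_sub(self.block_data_limit);
        oldest <= needed && needed <= head
    }
}

fn main() {
    // mirrors the fixtures in test_server_selection: lagged head = 0, synced head = 1
    let lagged = Server { head_block_num: Some(0), block_data_limit: u64::MAX };
    let synced = Server { head_block_num: Some(1), block_data_limit: u64::MAX };

    assert!(synced.has_block_data(0));
    assert!(synced.has_block_data(1));
    assert!(lagged.has_block_data(0));
    assert!(!lagged.has_block_data(1));
}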
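
`Web3Provider::Mock` ships with a `TODO: only include this for tests.` One way that could be done later (an assumption, not something this patch implements) is to put the variant and every `Mock` match arm behind `cfg(test)` so release builds never compile it:

// hypothetical follow-up, not part of this patch
pub enum Web3Provider {
    Http(ethers::providers::Provider<ethers::providers::Http>),
    Ws(ethers::providers::Provider<ethers::providers::Ws>),
    /// only exists when the crate is built for its own unit tests
    #[cfg(test)]
    Mock,
}

impl Web3Provider {
    pub fn ready(&self) -> bool {
        match self {
            Self::Http(_) => true,
            Self::Ws(provider) => provider.as_ref().ready(),
            // the arm must be gated exactly like the variant
            #[cfg(test)]
            Self::Mock => true,
        }
    }
}

The same attribute would have to go on the `unimplemented!()` arms in connection.rs and request.rs, which is likely why the variant is left unconditional for now.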