From 1e9284d5e878f3d89d32bab2864c3751d760bb63 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 19 May 2022 03:00:54 +0000 Subject: [PATCH] check to see if this gets stuck --- TODO.md | 9 +-- web3-proxy/src/app.rs | 44 +++++++------- web3-proxy/src/connection.rs | 22 +++---- web3-proxy/src/connections.rs | 106 ++++++++++++++++++++++++---------- web3-proxy/src/main.rs | 4 +- 5 files changed, 113 insertions(+), 72 deletions(-) diff --git a/TODO.md b/TODO.md index 15ee496f..e1301094 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,10 @@ # Todo +- [ ] some production configs are occassionally stuck waiting at 100% cpu + - they stop processing new blocks. i'm guessing 2 blocks arrive at the same time, but i thought our locks would handle that + - even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce. i just let it run for awhile and it happens. + - running gdb shows the thread at tokio tungstenite thread is spinning near 100% cpu and none of the rest of the program is proceeding +- [ ] should we use ethers-rs' quorum provider for the private rpcs? i think it would work well, but won't work with our current reconnect logic - [ ] improve caching - [ ] if the eth_call (or similar) params include a block, we can cache for longer - [ ] if the call is something simple like "symbol" or "decimals", cache that too @@ -9,10 +14,6 @@ - [ ] if a rpc fails to connect at start, retry later instead of skipping it forever - [ ] endpoint for health checks. if no synced servers, give a 502 error - [ ] move from warp to auxm? -- [ ] some production configs are occassionally stuck waiting at 100% cpu - - they stop processing new blocks. i'm guessing 2 blocks arrive at the same time, but i thought our locks would handle that - - even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce. i just let it run for awhile and it happens. - - running gdb shows the thread at tokio tungstenite thread is spinning near 100% cpu and none of the rest of the program is proceeding - [ ] proper logging with useful instrumentation - [ ] handle websocket disconnect and reconnect - [ ] warning if no blocks for too long. maybe reconnect automatically? diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 6704efb0..78618d73 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -111,6 +111,7 @@ impl Web3ProxyApp { self: Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { + // TODO: i feel like i don't see this log when i should (even though i do see the response) debug!("Received request: {:?}", request); let response = match request { @@ -122,7 +123,7 @@ impl Web3ProxyApp { } }; - // TODO: i don't seem to ever see this log. why? + // TODO: i feel like i don't see this log when i should (even though i do see the response) debug!("Forwarding response: {:?}", response); Ok(warp::reply::json(&response)) @@ -223,12 +224,13 @@ impl Web3ProxyApp { } else { // this is not a private transaction (or no private relays are configured) // TODO: how much should we retry? - for i in 0..10 { + for i in 0..10usize { // TODO: think more about this loop. - // TODO: add more to this span - let span = info_span!("i", i); - let _enter = span.enter(); + // // TODO: add more to this span. and do it properly + // let span = info_span!("i", ?i); + // let _enter = span.enter(); + /* // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches // TODO: i think we are maybe getting stuck on this lock. 
maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed trace!("{:?} waiting for head_block_hash", request); @@ -295,6 +297,7 @@ impl Web3ProxyApp { ); } } + */ match self.balanced_rpcs.next_upstream_server().await { Ok(active_request_handle) => { @@ -307,14 +310,15 @@ impl Web3ProxyApp { // TODO: trace here was really slow with millions of requests. // trace!("forwarding request from {}", upstream_server); - let response = JsonRpcForwardedResponse { + JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), id: request.id, // TODO: since we only use the result here, should that be all we return from try_send_request? result: Some(partial_response), error: None, - }; + } + /* // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache let mut response_cache = self.response_cache.write(); @@ -330,12 +334,13 @@ impl Web3ProxyApp { // TODO: needing to remove manually here makes me think we should do this differently let _ = self.active_requests.remove(&cache_key); let _ = in_flight_tx.send(false); + */ - response + // response } Err(e) => { - // send now since we aren't going to cache an error response - let _ = in_flight_tx.send(false); + // // send now since we aren't going to cache an error response + // let _ = in_flight_tx.send(false); // TODO: move this to a helper function? let code; @@ -397,8 +402,8 @@ impl Web3ProxyApp { } }; - // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); + // // TODO: needing to remove manually here makes me think we should do this differently + // let _ = self.active_requests.remove(&cache_key); if response.error.is_some() { trace!("Sending error reply: {:?}", response); @@ -407,7 +412,7 @@ impl Web3ProxyApp { } else { trace!("Sending reply: {:?}", response); - let _ = in_flight_tx.send(false); + // let _ = in_flight_tx.send(false); } return Ok(response); @@ -416,9 +421,9 @@ impl Web3ProxyApp { // TODO: this is too verbose. if there are other servers in other tiers, we use those! warn!("No servers in sync!"); - // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); + // // TODO: needing to remove manually here makes me think we should do this differently + // let _ = self.active_requests.remove(&cache_key); + // let _ = in_flight_tx.send(false); return Err(anyhow::anyhow!("no servers in sync")); } @@ -434,11 +439,10 @@ impl Web3ProxyApp { warn!("All rate limits exceeded. 
Sleeping"); - // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); + // // TODO: needing to remove manually here makes me think we should do this differently + // let _ = self.active_requests.remove(&cache_key); + // let _ = in_flight_tx.send(false); - // continue continue; } } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 79fc6e60..a7b922f4 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -13,7 +13,7 @@ use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::RwLock; use tokio::task; -use tokio::time::{interval, sleep, timeout_at, Duration, Instant, MissedTickBehavior}; +use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; use tracing::{info, instrument, trace, warn}; type Web3RateLimiter = @@ -107,10 +107,8 @@ impl Web3Connection { // since this lock is held open over an await, we use tokio's locking let mut provider = self.provider.write().await; - // TODO: tell the block subscriber that we are at 0 - block_sender - .send_async((0, H256::default(), rpc_id)) - .await?; + // tell the block subscriber that we are at 0 + block_sender.send_async((0, H256::zero(), rpc_id)).await?; let new_provider = Web3Provider::from_str(&self.url, http_client).await?; @@ -300,26 +298,20 @@ impl Web3Connection { self.send_block(block, &block_sender, rpc_id).await; - // TODO: what should this timeout be? needs to be larger than worst case block time + // TODO: should the stream have a timeout on it here? // TODO: although reconnects will make this less of an issue loop { - match timeout_at(Instant::now() + Duration::from_secs(300), stream.next()) - .await - { - Ok(Some(new_block)) => { + match stream.next().await { + Some(new_block) => { self.send_block(Ok(new_block), &block_sender, rpc_id).await; // TODO: really not sure about this task::yield_now().await; } - Ok(None) => { + None => { warn!("subscription ended"); break; } - Err(e) => { - warn!("subscription ended with an error: {:?}", e); - break; - } } } } diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index d1071140..33e97fa3 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -7,9 +7,10 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use serde_json::value::RawValue; use std::cmp; +use std::collections::BTreeMap; use std::fmt; use std::sync::Arc; use tokio::task; @@ -23,7 +24,7 @@ use crate::connection::{ActiveRequestHandle, Web3Connection}; struct SyncedConnections { head_block_num: u64, head_block_hash: H256, - inner: Vec, + inner: HashSet, } impl fmt::Debug for SyncedConnections { @@ -38,7 +39,7 @@ impl SyncedConnections { Self { head_block_num: 0, head_block_hash: Default::default(), - inner: Vec::with_capacity(max_connections), + inner: HashSet::with_capacity(max_connections), } } @@ -232,6 +233,7 @@ impl Web3Connections { } // TODO: span with rpc in it, too + // TODO: make sure i'm doing this span right let span = info_span!("new_block", new_block_num); let _enter = span.enter(); @@ -241,39 +243,76 @@ impl Web3Connections { match new_block_num.cmp(&pending_synced_connections.head_block_num) { cmp::Ordering::Greater => { // the rpc's newest block is the new overall best block - info!("new head from #{}", 
rpc_id); + info!(rpc_id, "new head"); pending_synced_connections.inner.clear(); - pending_synced_connections.inner.push(rpc_id); + pending_synced_connections.inner.insert(rpc_id); pending_synced_connections.head_block_num = new_block_num; + + // TODO: if the parent hash isn't our previous best block, ignore it pending_synced_connections.head_block_hash = new_block_hash; } cmp::Ordering::Equal => { - if new_block_hash != pending_synced_connections.head_block_hash { + if new_block_hash == pending_synced_connections.head_block_hash { + // this rpc has caught up with the best known head + // do not clear synced_connections. + // we just want to add this rpc to the end + // TODO: HashSet here? i think we get dupes if we don't + pending_synced_connections.inner.insert(rpc_id); + } else { // same height, but different chain - // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions? - // TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one - warn!( - "chain is forked at #{}! #{} has {}. {} rpcs have {}", - new_block_num, - rpc_id, - new_block_hash, - pending_synced_connections.inner.len(), - pending_synced_connections.head_block_hash - ); - // TODO: don't continue. check connection_states to see which head block is more popular! - continue; - } - // do not clear synced_connections. - // we just want to add this rpc to the end - // TODO: HashSet here? i think we get dupes if we don't - pending_synced_connections.inner.push(rpc_id); + // check connection_states to see which head block is more popular! + let mut rpc_ids_by_block: BTreeMap> = BTreeMap::new(); + + let mut synced_rpcs = 0; + + for (rpc_id, (block_num, block_hash)) in connection_states.iter() { + if *block_num != new_block_num { + // this connection isn't synced. we don't care what hash it has + continue; + } + + synced_rpcs += 1; + + let count = rpc_ids_by_block + .entry(*block_hash) + .or_insert_with(|| Vec::with_capacity(max_connections - 1)); + + count.push(*rpc_id); + } + + let most_common_head_hash = rpc_ids_by_block + .iter() + .max_by(|a, b| a.1.len().cmp(&b.1.len())) + .map(|(k, _v)| k) + .unwrap(); + + warn!( + "chain is forked! {} possible heads. {}/{}/{} rpcs have {}", + rpc_ids_by_block.len(), + rpc_ids_by_block.get(most_common_head_hash).unwrap().len(), + synced_rpcs, + max_connections, + most_common_head_hash + ); + + // this isn't the best block in the tier. don't do anything + if !pending_synced_connections.inner.remove(&rpc_id) { + // we didn't remove anything. nothing more to do + continue; + } + // we removed. don't continue so that we update self.synced_connections + } } cmp::Ordering::Less => { // this isn't the best block in the tier. don't do anything - continue; + if !pending_synced_connections.inner.remove(&rpc_id) { + // we didn't remove anything. nothing more to do + continue; + } + // we removed. 
don't continue so that we update self.synced_connections } } @@ -298,9 +337,15 @@ impl Web3Connections { ) -> Result>> { let mut earliest_not_until = None; - let mut synced_rpc_indexes = self.synced_connections.load().inner.clone(); + let mut synced_rpc_ids: Vec = self + .synced_connections + .load() + .inner + .iter() + .cloned() + .collect(); - let sort_cache: HashMap = synced_rpc_indexes + let sort_cache: HashMap = synced_rpc_ids .iter() .map(|rpc_id| { let rpc = self.inner.get(*rpc_id).unwrap(); @@ -308,14 +353,13 @@ impl Web3Connections { let active_requests = rpc.active_requests(); let soft_limit = rpc.soft_limit(); - // TODO: how should we include the soft limit? floats are slower than integer math let utilization = active_requests as f32 / soft_limit as f32; (*rpc_id, (utilization, soft_limit)) }) .collect(); - synced_rpc_indexes.sort_unstable_by(|a, b| { + synced_rpc_ids.sort_unstable_by(|a, b| { let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap(); let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap(); @@ -329,7 +373,8 @@ impl Web3Connections { } }); - for rpc_id in synced_rpc_indexes.into_iter() { + // now that the rpcs are sorted, try to get an active request handle for one of them + for rpc_id in synced_rpc_ids.into_iter() { let rpc = self.inner.get(rpc_id).unwrap(); // increment our connection counter @@ -344,8 +389,7 @@ impl Web3Connections { } } - // TODO: this is too verbose - // warn!("no servers on {:?}! {:?}", self, earliest_not_until); + warn!("no servers on {:?}! {:?}", self, earliest_not_until); // this might be None Err(earliest_not_until) diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 536df925..a83ec7ab 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -28,6 +28,7 @@ fn main() -> anyhow::Result<()> { } // install global collector configured based on RUST_LOG env var. + // tracing_subscriber::fmt().init(); console_subscriber::init(); fdlimit::raise_fd_limit(); @@ -43,8 +44,7 @@ fn main() -> anyhow::Result<()> { let chain_id = rpc_config.shared.chain_id; - // TODO: multithreaded runtime once i'm done debugging - let mut rt_builder = runtime::Builder::new_current_thread(); + let mut rt_builder = runtime::Builder::new_multi_thread(); rt_builder.enable_all().thread_name_fn(move || { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
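
The reworked cmp::Ordering::Equal branch in connections.rs no longer just warns on a fork; it tallies connection_states to find the head hash that the most synced rpcs agree on at the new height. Below is a standalone sketch of that tally, not part of the diff: the helper name most_common_head is made up, and ethers' H256 is swapped for a generic hash type so the example compiles with no dependencies; the real branch does this inline over connection_states.

use std::collections::{BTreeMap, HashMap};

// Count which head hash is reported by the most rpcs at a given height.
// `connection_states` maps rpc_id -> (head_block_num, head_block_hash).
// Returns the winning hash plus how many rpcs are at that height at all.
fn most_common_head<H: Ord + Copy>(
    connection_states: &HashMap<usize, (u64, H)>,
    new_block_num: u64,
) -> Option<(H, usize)> {
    // hash -> rpc ids that report it as their head at this height
    let mut rpc_ids_by_block: BTreeMap<H, Vec<usize>> = BTreeMap::new();
    let mut synced_rpcs = 0;

    for (rpc_id, (block_num, block_hash)) in connection_states {
        if *block_num != new_block_num {
            // this connection isn't at the height we care about
            continue;
        }
        synced_rpcs += 1;
        rpc_ids_by_block.entry(*block_hash).or_default().push(*rpc_id);
    }

    rpc_ids_by_block
        .into_iter()
        .max_by_key(|(_hash, rpc_ids)| rpc_ids.len())
        .map(|(hash, _rpc_ids)| (hash, synced_rpcs))
}

fn main() {
    let mut states = HashMap::new();
    states.insert(0, (100, "0xaaa"));
    states.insert(1, (100, "0xaaa"));
    states.insert(2, (100, "0xbbb")); // forked at the same height
    states.insert(3, (99, "0xccc")); // still catching up; ignored

    // 3 rpcs are at height 100 and two of them agree on "0xaaa"
    assert_eq!(most_common_head(&states, 100), Some(("0xaaa", 3)));
}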
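
next_upstream_server now copies the synced rpc ids out of the HashSet into a Vec and sorts them so the least utilized server is tried first, using a sort_cache of (active_requests / soft_limit, soft_limit) per rpc. The comparison body sits outside the hunks shown, so the sketch below assumes ascending utilization with the larger soft limit winning ties; the tuple entries stand in for rpc.active_requests() and rpc.soft_limit().

use std::cmp::Ordering;

// each entry stands in for (rpc_id, rpc.active_requests(), rpc.soft_limit())
fn sort_by_utilization(rpcs: &mut Vec<(usize, u32, u32)>) {
    rpcs.sort_unstable_by(|a, b| {
        // floats are fine here; utilization is only used for ordering
        let a_utilization = a.1 as f32 / a.2 as f32;
        let b_utilization = b.1 as f32 / b.2 as f32;

        a_utilization
            .partial_cmp(&b_utilization)
            .unwrap_or(Ordering::Equal)
            // assumed tie-break: prefer the rpc with the larger soft limit
            .then_with(|| b.2.cmp(&a.2))
    });
}

fn main() {
    // (rpc_id, active_requests, soft_limit)
    let mut rpcs = vec![(0, 10, 100), (1, 1, 100), (2, 10, 1_000)];

    sort_by_utilization(&mut rpcs);

    // rpc 1 and rpc 2 are both at 1% utilization; rpc 2 wins on soft limit
    let order: Vec<usize> = rpcs.iter().map(|r| r.0).collect();
    assert_eq!(order, vec![2, 1, 0]);
}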
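
main.rs switches back from the current-thread runtime (used while debugging) to runtime::Builder::new_multi_thread(). The hunk is cut off inside thread_name_fn, so the closure body below is only the usual atomic-counter naming pattern and the "web3-proxy" prefix is invented for the example; it is not the commit's actual code.

use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::runtime;

fn build_runtime() -> std::io::Result<runtime::Runtime> {
    // illustrative only: number worker threads with an atomic counter;
    // the real closure body is outside this hunk
    static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);

    runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_name_fn(|| {
            let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
            format!("web3-proxy-{}", id) // hypothetical name prefix
        })
        .build()
}

fn main() -> std::io::Result<()> {
    let rt = build_runtime()?;

    rt.block_on(async {
        // block_on runs the future on the calling thread; worker threads get
        // the generated names when tasks are spawned onto the runtime
        println!("running on {:?}", std::thread::current().name());
    });

    Ok(())
}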