check to see if this gets stuck

This commit is contained in:
Bryan Stitt 2022-05-19 03:00:54 +00:00
parent 3d04273e2e
commit 1e9284d5e8
5 changed files with 113 additions and 72 deletions

@@ -1,5 +1,10 @@
# Todo
- [ ] some production configs are occasionally stuck waiting at 100% cpu
- they stop processing new blocks. i'm guessing 2 blocks arrive at the same time, but i thought our locks would handle that
- even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce it. i just let it run for a while and it happens.
- running gdb shows the tokio tungstenite thread spinning near 100% cpu while none of the rest of the program proceeds
- [ ] should we use ethers-rs' quorum provider for the private rpcs? i think it would work well, but it won't work with our current reconnect logic (see the sketch below)
- [ ] improve caching
- [ ] if the eth_call (or similar) params include a block, we can cache for longer
- [ ] if the call is something simple like "symbol" or "decimals", cache that too
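A rough sketch of the quorum-provider idea above, assuming the ethers-rs `QuorumProvider`/`WeightedProvider`/`Quorum` types and import paths; the helper name, the majority quorum, and wiring it to "the private rpcs" are illustrative, not part of this commit:

```rust
use std::str::FromStr;

use ethers::providers::{Http, Provider, Quorum, QuorumProvider, WeightedProvider};

// hypothetical helper: wrap the private rpc urls in a single majority-quorum provider.
// reconnect handling would still have to live outside of this, which is the open question above.
fn private_quorum(urls: &[&str]) -> anyhow::Result<Provider<QuorumProvider<Http>>> {
    let mut providers = Vec::with_capacity(urls.len());

    for url in urls {
        // every private rpc gets the default weight of 1 here
        providers.push(WeightedProvider::new(Http::from_str(url)?));
    }

    // require a simple majority of the private rpcs to agree on every response
    Ok(Provider::new(QuorumProvider::new(Quorum::Majority, providers)))
}
```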
@@ -9,10 +14,6 @@
- [ ] if an rpc fails to connect at start, retry later instead of skipping it forever
- [ ] endpoint for health checks. if no synced servers, give a 502 error (see the sketch below)
- [ ] move from warp to axum?
- [ ] some production configs are occasionally stuck waiting at 100% cpu
- they stop processing new blocks. i'm guessing 2 blocks arrive at the same time, but i thought our locks would handle that
- even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce it. i just let it run for a while and it happens.
- running gdb shows the tokio tungstenite thread spinning near 100% cpu while none of the rest of the program proceeds
- [ ] proper logging with useful instrumentation
- [ ] handle websocket disconnect and reconnect
- [ ] warning if no blocks for too long. maybe reconnect automatically?
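A minimal sketch of the health-check endpoint item above, using warp since that is what the proxy currently serves with; the handler name and the way the synced-server count is obtained are assumptions:

```rust
use warp::http::StatusCode;

// hypothetical handler: report healthy only while at least one upstream rpc is synced.
// `synced_server_count` would come from something like the proxy's synced_connections state.
fn health_reply(synced_server_count: usize) -> impl warp::Reply {
    if synced_server_count > 0 {
        warp::reply::with_status("OK", StatusCode::OK)
    } else {
        // no synced servers: surface a 502 so load balancers stop routing here
        warp::reply::with_status("no synced servers", StatusCode::BAD_GATEWAY)
    }
}
```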

@@ -111,6 +111,7 @@ impl Web3ProxyApp {
self: Arc<Web3ProxyApp>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<impl warp::Reply> {
// TODO: i feel like i don't see this log when i should (even though i do see the response)
debug!("Received request: {:?}", request);
let response = match request {
@@ -122,7 +123,7 @@ impl Web3ProxyApp {
}
};
// TODO: i don't seem to ever see this log. why?
// TODO: i feel like i don't see this log when i should (even though i do see the response)
debug!("Forwarding response: {:?}", response);
Ok(warp::reply::json(&response))
@@ -223,12 +224,13 @@ impl Web3ProxyApp {
} else {
// this is not a private transaction (or no private relays are configured)
// TODO: how much should we retry?
for i in 0..10 {
for i in 0..10usize {
// TODO: think more about this loop.
// TODO: add more to this span
let span = info_span!("i", i);
let _enter = span.enter();
// // TODO: add more to this span. and do it properly
// let span = info_span!("i", ?i);
// let _enter = span.enter();
/*
// todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
// TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed
trace!("{:?} waiting for head_block_hash", request);
@@ -295,6 +297,7 @@ impl Web3ProxyApp {
);
}
}
*/
match self.balanced_rpcs.next_upstream_server().await {
Ok(active_request_handle) => {
@@ -307,14 +310,15 @@ impl Web3ProxyApp {
// TODO: trace here was really slow with millions of requests.
// trace!("forwarding request from {}", upstream_server);
let response = JsonRpcForwardedResponse {
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
// TODO: since we only use the result here, should that be all we return from try_send_request?
result: Some(partial_response),
error: None,
};
}
/*
// TODO: small race condition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = self.response_cache.write();
@@ -330,12 +334,13 @@ impl Web3ProxyApp {
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
*/
response
// response
}
Err(e) => {
// send now since we aren't going to cache an error response
let _ = in_flight_tx.send(false);
// // send now since we aren't going to cache an error response
// let _ = in_flight_tx.send(false);
// TODO: move this to a helper function?
let code;
@@ -397,8 +402,8 @@ impl Web3ProxyApp {
}
};
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
// // TODO: needing to remove manually here makes me think we should do this differently
// let _ = self.active_requests.remove(&cache_key);
if response.error.is_some() {
trace!("Sending error reply: {:?}", response);
@@ -407,7 +412,7 @@ impl Web3ProxyApp {
} else {
trace!("Sending reply: {:?}", response);
let _ = in_flight_tx.send(false);
// let _ = in_flight_tx.send(false);
}
return Ok(response);
@@ -416,9 +421,9 @@ impl Web3ProxyApp {
// TODO: this is too verbose. if there are other servers in other tiers, we use those!
warn!("No servers in sync!");
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
// // TODO: needing to remove manually here makes me think we should do this differently
// let _ = self.active_requests.remove(&cache_key);
// let _ = in_flight_tx.send(false);
return Err(anyhow::anyhow!("no servers in sync"));
}
@@ -434,11 +439,10 @@ impl Web3ProxyApp {
warn!("All rate limits exceeded. Sleeping");
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
// // TODO: needing to remove manually here makes me think we should do this differently
// let _ = self.active_requests.remove(&cache_key);
// let _ = in_flight_tx.send(false);
// continue
continue;
}
}
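The commented-out caching code in this file mentions moving cache-key construction into a helper ("move getting a cache_key or the result into a helper function"). A sketch of what that could look like; the parameter types, the tuple shape, and the import paths are assumptions, since the original key-building code is elided in this hunk:

```rust
use ethers::types::H256;
use serde_json::value::RawValue;

// hypothetical helper: derive the response-cache key from the current head block and the request.
// keying on the head block hash means every new block naturally starts a fresh cache entry.
fn response_cache_key(
    head_block_hash: &H256,
    method: &str,
    params: Option<&RawValue>,
) -> (H256, String, String) {
    (
        *head_block_hash,
        method.to_string(),
        params.map(|p| p.get().to_string()).unwrap_or_default(),
    )
}
```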

@@ -13,7 +13,7 @@ use std::sync::atomic::{self, AtomicU32};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::RwLock;
use tokio::task;
use tokio::time::{interval, sleep, timeout_at, Duration, Instant, MissedTickBehavior};
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tracing::{info, instrument, trace, warn};
type Web3RateLimiter =
@@ -107,10 +107,8 @@ impl Web3Connection {
// since this lock is held open over an await, we use tokio's locking
let mut provider = self.provider.write().await;
// TODO: tell the block subscriber that we are at 0
block_sender
.send_async((0, H256::default(), rpc_id))
.await?;
// tell the block subscriber that we are at 0
block_sender.send_async((0, H256::zero(), rpc_id)).await?;
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
@@ -300,26 +298,20 @@ impl Web3Connection {
self.send_block(block, &block_sender, rpc_id).await;
// TODO: what should this timeout be? needs to be larger than worst case block time
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
loop {
match timeout_at(Instant::now() + Duration::from_secs(300), stream.next())
.await
{
Ok(Some(new_block)) => {
match stream.next().await {
Some(new_block) => {
self.send_block(Ok(new_block), &block_sender, rpc_id).await;
// TODO: really not sure about this
task::yield_now().await;
}
Ok(None) => {
None => {
warn!("subscription ended");
break;
}
Err(e) => {
warn!("subscription ended with an error: {:?}", e);
break;
}
}
}
}
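This hunk removes the 300-second `timeout_at` around `stream.next()`. If the stuck-at-100%-cpu issue turns out to involve a websocket that goes silent, a per-read deadline could be reintroduced with a plain `tokio::time::timeout`; a standalone sketch, where the stream item type, the hypothetical function, and the 300-second value are assumptions:

```rust
use futures::{Stream, StreamExt};
use tokio::time::{timeout, Duration};
use tracing::warn;

// sketch: drive a new-heads subscription, but never wait more than 300s for a single block.
// `B` stands in for whatever block type the real subscription stream yields.
async fn watch_new_heads<S, B>(mut stream: S, mut on_block: impl FnMut(B))
where
    S: Stream<Item = B> + Unpin,
{
    loop {
        match timeout(Duration::from_secs(300), stream.next()).await {
            Ok(Some(block)) => on_block(block),
            Ok(None) => {
                warn!("subscription ended");
                break;
            }
            Err(_elapsed) => {
                // no block for 300 seconds. this is where reconnect logic could hook in
                warn!("no new blocks for 300s");
                break;
            }
        }
    }
}
```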

@@ -7,9 +7,10 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil;
use hashbrown::HashMap;
use hashbrown::{HashMap, HashSet};
use serde_json::value::RawValue;
use std::cmp;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use tokio::task;
@@ -23,7 +24,7 @@ use crate::connection::{ActiveRequestHandle, Web3Connection};
struct SyncedConnections {
head_block_num: u64,
head_block_hash: H256,
inner: Vec<usize>,
inner: HashSet<usize>,
}
impl fmt::Debug for SyncedConnections {
@@ -38,7 +39,7 @@ impl SyncedConnections {
Self {
head_block_num: 0,
head_block_hash: Default::default(),
inner: Vec::with_capacity(max_connections),
inner: HashSet::with_capacity(max_connections),
}
}
@@ -232,6 +233,7 @@ impl Web3Connections {
}
// TODO: span with rpc in it, too
// TODO: make sure i'm doing this span right
let span = info_span!("new_block", new_block_num);
let _enter = span.enter();
@@ -241,39 +243,76 @@ impl Web3Connections {
match new_block_num.cmp(&pending_synced_connections.head_block_num) {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
info!("new head from #{}", rpc_id);
info!(rpc_id, "new head");
pending_synced_connections.inner.clear();
pending_synced_connections.inner.push(rpc_id);
pending_synced_connections.inner.insert(rpc_id);
pending_synced_connections.head_block_num = new_block_num;
// TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash;
}
cmp::Ordering::Equal => {
if new_block_hash != pending_synced_connections.head_block_hash {
if new_block_hash == pending_synced_connections.head_block_hash {
// this rpc has caught up with the best known head
// do not clear synced_connections.
// we just want to add this rpc to the end
// TODO: HashSet here? i think we get dupes if we don't
pending_synced_connections.inner.insert(rpc_id);
} else {
// same height, but different chain
// TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
// TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one
warn!(
"chain is forked at #{}! #{} has {}. {} rpcs have {}",
new_block_num,
rpc_id,
new_block_hash,
pending_synced_connections.inner.len(),
pending_synced_connections.head_block_hash
);
// TODO: don't continue. check connection_states to see which head block is more popular!
continue;
}
// do not clear synced_connections.
// we just want to add this rpc to the end
// TODO: HashSet here? i think we get dupes if we don't
pending_synced_connections.inner.push(rpc_id);
// check connection_states to see which head block is more popular!
let mut rpc_ids_by_block: BTreeMap<H256, Vec<usize>> = BTreeMap::new();
let mut synced_rpcs = 0;
for (rpc_id, (block_num, block_hash)) in connection_states.iter() {
if *block_num != new_block_num {
// this connection isn't synced. we don't care what hash it has
continue;
}
synced_rpcs += 1;
let count = rpc_ids_by_block
.entry(*block_hash)
.or_insert_with(|| Vec::with_capacity(max_connections - 1));
count.push(*rpc_id);
}
let most_common_head_hash = rpc_ids_by_block
.iter()
.max_by(|a, b| a.1.len().cmp(&b.1.len()))
.map(|(k, _v)| k)
.unwrap();
warn!(
"chain is forked! {} possible heads. {}/{}/{} rpcs have {}",
rpc_ids_by_block.len(),
rpc_ids_by_block.get(most_common_head_hash).unwrap().len(),
synced_rpcs,
max_connections,
most_common_head_hash
);
// this isn't the best block in the tier. don't do anything
if !pending_synced_connections.inner.remove(&rpc_id) {
// we didn't remove anything. nothing more to do
continue;
}
// we removed. don't continue so that we update self.synced_connections
}
}
cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything
continue;
if !pending_synced_connections.inner.remove(&rpc_id) {
// we didn't remove anything. nothing more to do
continue;
}
// we removed. don't continue so that we update self.synced_connections
}
}
@@ -298,9 +337,15 @@ impl Web3Connections {
) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
let mut synced_rpc_indexes = self.synced_connections.load().inner.clone();
let mut synced_rpc_ids: Vec<usize> = self
.synced_connections
.load()
.inner
.iter()
.cloned()
.collect();
let sort_cache: HashMap<usize, (f32, u32)> = synced_rpc_indexes
let sort_cache: HashMap<usize, (f32, u32)> = synced_rpc_ids
.iter()
.map(|rpc_id| {
let rpc = self.inner.get(*rpc_id).unwrap();
@@ -308,14 +353,13 @@ impl Web3Connections {
let active_requests = rpc.active_requests();
let soft_limit = rpc.soft_limit();
// TODO: how should we include the soft limit? floats are slower than integer math
let utilization = active_requests as f32 / soft_limit as f32;
(*rpc_id, (utilization, soft_limit))
})
.collect();
synced_rpc_indexes.sort_unstable_by(|a, b| {
synced_rpc_ids.sort_unstable_by(|a, b| {
let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap();
@@ -329,7 +373,8 @@ impl Web3Connections {
}
});
for rpc_id in synced_rpc_indexes.into_iter() {
// now that the rpcs are sorted, try to get an active request handle for one of them
for rpc_id in synced_rpc_ids.into_iter() {
let rpc = self.inner.get(rpc_id).unwrap();
// increment our connection counter
@@ -344,8 +389,7 @@ impl Web3Connections {
}
}
// TODO: this is too verbose
// warn!("no servers on {:?}! {:?}", self, earliest_not_until);
warn!("no servers on {:?}! {:?}", self, earliest_not_until);
// this might be None
Err(earliest_not_until)
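The fork handling added in this file builds a `BTreeMap` of head hashes and then picks the most common one. A sketch of that tally pulled into a standalone helper; the function, its signature, and the assumed `HashMap<usize, (u64, H256)>` shape of `connection_states` are illustrative, not part of the commit:

```rust
use std::collections::BTreeMap;

use ethers::types::H256;
use hashbrown::HashMap;

// sketch: count how many rpcs report each head hash at `head_block_num` and return the
// most popular hash together with its vote count. None if no rpc is at that height.
fn most_common_head_hash(
    connection_states: &HashMap<usize, (u64, H256)>,
    head_block_num: u64,
) -> Option<(H256, usize)> {
    let mut rpcs_by_hash: BTreeMap<H256, usize> = BTreeMap::new();

    for (_rpc_id, (block_num, block_hash)) in connection_states {
        if *block_num == head_block_num {
            *rpcs_by_hash.entry(*block_hash).or_insert(0) += 1;
        }
    }

    rpcs_by_hash
        .into_iter()
        .max_by_key(|(_hash, count)| *count)
}
```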

@@ -28,6 +28,7 @@ fn main() -> anyhow::Result<()> {
}
// install global collector configured based on RUST_LOG env var.
// tracing_subscriber::fmt().init();
console_subscriber::init();
fdlimit::raise_fd_limit();
@@ -43,8 +44,7 @@ fn main() -> anyhow::Result<()> {
let chain_id = rpc_config.shared.chain_id;
// TODO: multithreaded runtime once i'm done debugging
let mut rt_builder = runtime::Builder::new_current_thread();
let mut rt_builder = runtime::Builder::new_multi_thread();
rt_builder.enable_all().thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
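For reference, the multi-threaded runtime switch above written out as a self-contained sketch; the `build_runtime` wrapper and the "web3-proxy-worker" thread-name prefix are assumptions, since the rest of the closure is elided in this hunk:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use tokio::runtime;

// sketch: the same builder pattern as the hunk above, completed into a runnable function.
fn build_runtime() -> anyhow::Result<runtime::Runtime> {
    let mut rt_builder = runtime::Builder::new_multi_thread();

    rt_builder.enable_all().thread_name_fn(move || {
        // unique, recognizable worker names make gdb and tokio-console output easier to read
        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
        format!("web3-proxy-worker-{}", id)
    });

    Ok(rt_builder.build()?)
}
```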