deadlock detection

This commit is contained in:
Bryan Stitt 2022-05-16 05:16:32 +00:00
parent c3f97b06f4
commit 5fcd01065e
8 changed files with 111 additions and 16 deletions

74
Cargo.lock generated

@ -12,6 +12,21 @@ dependencies = [
"regex",
]
[[package]]
name = "addr2line"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aes"
version = "0.7.5"
@ -154,6 +169,21 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "base16ct"
version = "0.1.1"
@ -1470,6 +1500,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "gimli"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
[[package]]
name = "glob"
version = "0.3.0"
@ -2017,6 +2053,15 @@ dependencies = [
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.2"
@ -2152,6 +2197,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.28.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e42c982f2d955fac81dd7e1d0e1426a7d702acd9c98d19ab01083a6a0328c424"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.10.0"
@ -2249,10 +2303,13 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [
"backtrace",
"cfg-if",
"libc",
"petgraph",
"redox_syscall",
"smallvec",
"thread-id",
"windows-sys",
]
@ -2795,6 +2852,12 @@ dependencies = [
"syn",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc-hash"
version = "1.1.0"
@ -3333,6 +3396,17 @@ dependencies = [
"syn",
]
[[package]]
name = "thread-id"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f"
dependencies = [
"libc",
"redox_syscall",
"winapi",
]
[[package]]
name = "thread_local"
version = "1.1.4"

@ -5,6 +5,6 @@ members = [
]
# TODO: enable these once rapid development is done
#[profile.release]
[profile.release]
#lto = true
#panic = "abort"
panic = "abort"

@ -35,5 +35,5 @@ chain_id = 1
soft_limit = 7074
[private_rpcs.securerpc]
url = "wss://gibson.securerpc.com/v1"
url = "https://gibson.securerpc.com/v1"
soft_limit = 4560

@ -13,11 +13,11 @@ derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
flume = "0.10.12"
futures = { version = "0.3.21", features = ["thread-pool"] }
governor = { version = "0.4.2", features = ["std"] }
governor = { version = "0.4.2", features = ["dashmap", "std"] }
hashbrown = "0.12.1"
left-right = "0.11.4"
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
parking_lot = "0.12.0"
parking_lot = { version = "0.12.0", features = ["deadlock_detection"] }
proctitle = "0.1.1"
regex = "1.5.5"
reqwest = { version = "0.11.10", default-features = false, features = ["json", "rustls"] }

@ -212,7 +212,6 @@ impl Web3ProxyApp {
// TODO: think more about this loop.
loop {
// todo: bring back this caching
/*
let best_block_hash = self
.balanced_rpcs
.get_synced_rpcs()
@ -233,7 +232,6 @@ impl Web3ProxyApp {
// TODO: return a reference in the other places so that this works without a clone?
return Ok(cached.to_owned());
}
*/
match self.balanced_rpcs.next_upstream_server().await {
Ok(active_request_handle) => {
@ -254,7 +252,6 @@ impl Web3ProxyApp {
error: None,
};
/*
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = self.response_cache.write();
@ -266,7 +263,6 @@ impl Web3ProxyApp {
}
drop(response_cache);
*/
response
}

@ -164,7 +164,7 @@ impl Web3Connection {
&self.url
}
fn send_block(
async fn send_block(
self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(u64, H256, Arc<Self>)>,
@ -176,7 +176,8 @@ impl Web3Connection {
// TODO: i'm pretty sure we don't need send_async, but double check
block_sender
.send((block_number, block_hash, self.clone()))
.send_async((block_number, block_hash, self.clone()))
.await
.unwrap();
}
Err(e) => {
@ -228,7 +229,7 @@ impl Web3Connection {
last_hash = new_hash;
}
self.send_block(block, &block_sender);
self.send_block(block, &block_sender).await;
}
}
Web3Provider::Ws(provider) => {
@ -253,10 +254,10 @@ impl Web3Connection {
drop(active_request_handle);
self.send_block(block, &block_sender);
self.send_block(block, &block_sender).await;
while let Some(new_block) = stream.next().await {
self.send_block(Ok(new_block), &block_sender);
self.send_block(Ok(new_block), &block_sender).await;
}
}
}

@ -245,7 +245,10 @@ impl Web3Connections {
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
response_sender.send(Ok(response)).map_err(Into::into)
response_sender
.send_async(Ok(response))
.await
.map_err(Into::into)
});
unordered_futures.push(handle);
@ -273,7 +276,7 @@ impl Web3Connections {
};
// send the error to the channel
if response_sender.send(e).is_ok() {
if response_sender.send_async(e).await.is_ok() {
// if we were able to send an error, then we never sent a success
return Err(anyhow::anyhow!("no successful responses"));
} else {

@ -5,10 +5,13 @@ mod connections;
mod jsonrpc;
use jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse};
use parking_lot::deadlock;
use serde_json::value::RawValue;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::runtime;
use tracing::info;
use warp::Filter;
@ -43,6 +46,24 @@ fn main() -> anyhow::Result<()> {
})
.build()?;
// spawn a thread for deadlock detection
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(10));
let deadlocks = deadlock::check_deadlock();
if deadlocks.is_empty() {
continue;
}
println!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
println!("Deadlock #{}", i);
for t in threads {
println!("Thread Id {:#?}", t.thread_id());
println!("{:#?}", t.backtrace());
}
}
});
// spawn the root task
rt.block_on(async {
let listen_port = cli_config.listen_port;