diff --git a/Cargo.lock b/Cargo.lock index c11e3906..0f2d1221 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,21 @@ dependencies = [ "regex", ] +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aes" version = "0.7.5" @@ -154,6 +169,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base16ct" version = "0.1.1" @@ -1470,6 +1500,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "glob" version = "0.3.0" @@ -2017,6 +2053,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "miniz_oxide" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.2" @@ -2152,6 +2197,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "object" +version = "0.28.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e42c982f2d955fac81dd7e1d0e1426a7d702acd9c98d19ab01083a6a0328c424" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.10.0" @@ -2249,10 +2303,13 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" dependencies = [ + "backtrace", "cfg-if", "libc", + "petgraph", "redox_syscall", "smallvec", + "thread-id", "windows-sys", ] @@ -2795,6 +2852,12 @@ dependencies = [ "syn", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -3333,6 +3396,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thread-id" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f" +dependencies = [ + "libc", + "redox_syscall", + "winapi", +] + [[package]] name = "thread_local" version = "1.1.4" diff --git a/Cargo.toml b/Cargo.toml index addd8055..810e197d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,6 @@ members = [ ] # TODO: enable these once rapid development is done -#[profile.release] +[profile.release] #lto = true -#panic = "abort" +panic = "abort" diff --git a/config/example.toml b/config/example.toml index 093f1188..7acc7902 100644 --- a/config/example.toml +++ b/config/example.toml @@ -35,5 +35,5 @@ chain_id = 1 soft_limit = 7074 [private_rpcs.securerpc] - url = "wss://gibson.securerpc.com/v1" + url = "https://gibson.securerpc.com/v1" soft_limit = 4560 diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 022f7f3f..31330e58 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -13,11 +13,11 @@ derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } flume = "0.10.12" futures = { version = "0.3.21", features = ["thread-pool"] } -governor = { version = "0.4.2", features = ["std"] } +governor = { version = "0.4.2", features = ["dashmap", "std"] } hashbrown = "0.12.1" left-right = "0.11.4" linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } -parking_lot = "0.12.0" +parking_lot = { version = "0.12.0", features = ["deadlock_detection"] } proctitle = "0.1.1" regex = "1.5.5" reqwest = { version = "0.11.10", default-features = false, features = ["json", "rustls"] } diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 3606e9e6..2761abcc 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -212,7 +212,6 @@ impl Web3ProxyApp { // TODO: think more about this loop. loop { // todo: bring back this caching - /* let best_block_hash = self .balanced_rpcs .get_synced_rpcs() @@ -233,7 +232,6 @@ impl Web3ProxyApp { // TODO: return a reference in the other places so that this works without a clone? return Ok(cached.to_owned()); } - */ match self.balanced_rpcs.next_upstream_server().await { Ok(active_request_handle) => { @@ -254,7 +252,6 @@ impl Web3ProxyApp { error: None, }; - /* // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache let mut response_cache = self.response_cache.write(); @@ -266,7 +263,6 @@ impl Web3ProxyApp { } drop(response_cache); - */ response } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index aa13aae5..6ffff20e 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -164,7 +164,7 @@ impl Web3Connection { &self.url } - fn send_block( + async fn send_block( self: &Arc, block: Result, ProviderError>, block_sender: &flume::Sender<(u64, H256, Arc)>, @@ -176,7 +176,8 @@ impl Web3Connection { // TODO: i'm pretty sure we don't need send_async, but double check block_sender - .send((block_number, block_hash, self.clone())) + .send_async((block_number, block_hash, self.clone())) + .await .unwrap(); } Err(e) => { @@ -228,7 +229,7 @@ impl Web3Connection { last_hash = new_hash; } - self.send_block(block, &block_sender); + self.send_block(block, &block_sender).await; } } Web3Provider::Ws(provider) => { @@ -253,10 +254,10 @@ impl Web3Connection { drop(active_request_handle); - self.send_block(block, &block_sender); + self.send_block(block, &block_sender).await; while let Some(new_block) = stream.next().await { - self.send_block(Ok(new_block), &block_sender); + self.send_block(Ok(new_block), &block_sender).await; } } } diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 42ad4fba..be39503d 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -245,7 +245,10 @@ impl Web3Connections { // send the first good response to a one shot channel. that way we respond quickly // drop the result because errors are expected after the first send - response_sender.send(Ok(response)).map_err(Into::into) + response_sender + .send_async(Ok(response)) + .await + .map_err(Into::into) }); unordered_futures.push(handle); @@ -273,7 +276,7 @@ impl Web3Connections { }; // send the error to the channel - if response_sender.send(e).is_ok() { + if response_sender.send_async(e).await.is_ok() { // if we were able to send an error, then we never sent a success return Err(anyhow::anyhow!("no successful responses")); } else { diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 71aa8f3f..7add1723 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -5,10 +5,13 @@ mod connections; mod jsonrpc; use jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse}; +use parking_lot::deadlock; use serde_json::value::RawValue; use std::fs; use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; +use std::thread; +use std::time::Duration; use tokio::runtime; use tracing::info; use warp::Filter; @@ -43,6 +46,24 @@ fn main() -> anyhow::Result<()> { }) .build()?; + // spawn a thread for deadlock detection + thread::spawn(move || loop { + thread::sleep(Duration::from_secs(10)); + let deadlocks = deadlock::check_deadlock(); + if deadlocks.is_empty() { + continue; + } + + println!("{} deadlocks detected", deadlocks.len()); + for (i, threads) in deadlocks.iter().enumerate() { + println!("Deadlock #{}", i); + for t in threads { + println!("Thread Id {:#?}", t.thread_id()); + println!("{:#?}", t.backtrace()); + } + } + }); + // spawn the root task rt.block_on(async { let listen_port = cli_config.listen_port;