diff --git a/Cargo.lock b/Cargo.lock index 3cf79aeb..a30936f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -927,7 +927,7 @@ dependencies = [ [[package]] name = "ethers" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "ethers-addressbook", "ethers-contract", @@ -942,7 +942,7 @@ dependencies = [ [[package]] name = "ethers-addressbook" version = "0.1.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "ethers-core", "once_cell", @@ -953,7 +953,7 @@ dependencies = [ [[package]] name = "ethers-contract" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "ethers-contract-abigen", "ethers-contract-derive", @@ -971,7 +971,7 @@ dependencies = [ [[package]] name = "ethers-contract-abigen" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "Inflector", "cfg-if", @@ -993,7 +993,7 @@ dependencies = [ [[package]] name = "ethers-contract-derive" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "ethers-contract-abigen", "ethers-core", @@ -1007,7 +1007,7 @@ dependencies = [ [[package]] name = "ethers-core" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "arrayvec", "bytes", @@ -1033,7 +1033,7 @@ dependencies = [ [[package]] name = "ethers-etherscan" version = "0.2.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "ethers-core", "ethers-solc", @@ -1048,7 +1048,7 @@ dependencies = [ [[package]] name = "ethers-middleware" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "async-trait", "ethers-contract", @@ -1071,7 +1071,7 @@ dependencies = [ [[package]] name = "ethers-providers" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "async-trait", "auto_impl", @@ -1106,7 +1106,7 @@ dependencies = [ [[package]] name = "ethers-signers" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "async-trait", "coins-bip32", @@ -1123,7 +1123,7 @@ dependencies = [ [[package]] name = "ethers-solc" version = "0.3.0" -source = "git+https://github.com/gakonst/ethers-rs#a866cd57260b37fd9be5aa3ba33083b9daa5405d" +source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" dependencies = [ "colored", "dunce", @@ -1539,9 +1539,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" dependencies = [ "bytes", "fnv", @@ -1561,9 +1561,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" [[package]] name = "httpdate" @@ -2304,9 +2304,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" [[package]] name = "pin-utils" @@ -3213,9 +3213,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" dependencies = [ "tinyvec_macros", ] @@ -3228,9 +3228,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.17.0" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "0f48b6d60512a392e34dbf7fd456249fd2de3c83669ab642e021903f4015185b" dependencies = [ "bytes", "libc", @@ -3376,9 +3376,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.20" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" dependencies = [ "proc-macro2", "quote", @@ -3514,9 +3514,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" +checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-normalization" diff --git a/Cargo.toml b/Cargo.toml index 79ad55bc..0e43dc5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,17 +10,17 @@ arc-swap = "1.5.0" argh = "0.1.7" anyhow = "1.0.57" atomic-counter = "1.0.1" -derive_more = "0.99" +derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } futures = { version = "0.3.21", features = ["thread-pool"] } governor = { version = "0.4.2", features = ["dashmap", "std"] } -tokio = { version = "1.17.0", features = ["full"] } -parking_lot = "0.12" +tokio = { version = "1.18.0", features = ["full"] } +parking_lot = "0.12.0" regex = "1.5.5" -reqwest = { version = "0.11.10", features = ["json"] } -serde = {version = "1.0"} +reqwest = { version = "0.11.10", features = ["json", "rustls"] } +serde = "1.0.136" serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] } -tracing = "0.1" -tracing-subscriber = "0.3" +tracing = "0.1.34" +tracing-subscriber = "0.3.11" url = "2.2.2" warp = "0.3.2" diff --git a/src/main.rs b/src/main.rs index 028f6ad4..1d85b78f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,8 @@ mod provider; mod provider_tiers; use futures::future; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use governor::clock::{Clock, QuantaClock}; use serde_json::json; use std::collections::HashMap; @@ -77,6 +79,7 @@ impl Web3ProxyApp { // TODO: how should we configure the connection pool? // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server let http_client = reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(300)) .user_agent(APP_USER_AGENT) .build()?; @@ -97,6 +100,7 @@ impl Web3ProxyApp { let private_rpcs = if private_rpcs.is_empty() { warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); + // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly None } else { Some(Arc::new( @@ -369,14 +373,18 @@ impl Web3ProxyApp { .ok_or_else(|| anyhow::anyhow!("no params"))? .to_owned(); - // send the query to all the servers - let bodies = future::join_all(rpc_servers.into_iter().map(|rpc| { + // TODO: lets just use a usize index or something + let method = Arc::new(method); + + let mut unordered_futures = FuturesUnordered::new(); + + for rpc in rpc_servers { let connections = connections.clone(); let method = method.clone(); let params = params.clone(); let tx = tx.clone(); - async move { + let handle = tokio::spawn(async move { // get the client for this rpc server let provider = connections.get(&rpc).unwrap().clone_provider(); @@ -393,14 +401,15 @@ impl Web3ProxyApp { let _ = tx.send(Ok(response)); Ok::<(), anyhow::Error>(()) - } - })) - .await; + }); + + unordered_futures.push(handle); + } // TODO: use iterators instead of pushing into a vec let mut errs = vec![]; - for x in bodies { - match x { + if let Some(x) = unordered_futures.next().await { + match x.unwrap() { Ok(_) => {} Err(e) => { // TODO: better errors @@ -437,9 +446,11 @@ async fn main() { // TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else // TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable let listen_port = 8445; + // TODO: what should this be? 0 will cause a thundering herd + let allowed_lag = 0; let state = Web3ProxyApp::try_new( - 1, + allowed_lag, vec![ // local nodes vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)], @@ -471,15 +482,13 @@ async fn main() { let proxy_rpc_filter = warp::any() .and(warp::post()) .and(warp::body::json()) - .then(move |json_body| state.clone().proxy_web3_rpc(json_body)) - .map(handle_anyhow_errors); + .then(move |json_body| state.clone().proxy_web3_rpc(json_body)); - // TODO: filter for displaying connections - // TODO: filter for displaying + // TODO: filter for displaying connections and their block heights // TODO: warp trace is super verbose. how do we make this more readable? // let routes = proxy_rpc_filter.with(warp::trace::request()); - let routes = proxy_rpc_filter; + let routes = proxy_rpc_filter.map(handle_anyhow_errors); warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await; }