From 7c7b25d6541facde21a28cb1a7a29e43e5df0d7d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 20 May 2022 02:01:02 +0000 Subject: [PATCH] minimal app served its purpose --- .vscode/settings.json | 3 + Cargo.lock | 316 +++--------------------- Cargo.toml | 1 - web3-proxy-frontend/Cargo.toml | 2 +- web3-proxy-minimal/Cargo.toml | 33 --- web3-proxy-minimal/src/app.rs | 53 ---- web3-proxy-minimal/src/connection.rs | 342 -------------------------- web3-proxy-minimal/src/connections.rs | 245 ------------------ web3-proxy-minimal/src/main.rs | 88 ------- web3-proxy/Cargo.toml | 2 +- web3-proxy/src/app.rs | 3 +- web3-proxy/src/connections.rs | 15 +- 12 files changed, 48 insertions(+), 1055 deletions(-) create mode 100644 .vscode/settings.json delete mode 100644 web3-proxy-minimal/Cargo.toml delete mode 100644 web3-proxy-minimal/src/app.rs delete mode 100644 web3-proxy-minimal/src/connection.rs delete mode 100644 web3-proxy-minimal/src/connections.rs delete mode 100644 web3-proxy-minimal/src/main.rs diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..f5ab95b6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.cargo.features": "all" +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 893c3832..2e15490c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,82 +1155,41 @@ dependencies = [ "uint", ] -[[package]] -name = "ethers" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "ethers-addressbook 0.1.0 (git+https://github.com/gakonst/ethers-rs)", - "ethers-contract 0.6.0", - "ethers-core 0.6.0", - "ethers-etherscan 0.2.0", - "ethers-middleware 0.6.0", - "ethers-providers 0.6.0", - "ethers-signers 0.6.0", - "ethers-solc 0.3.0 (git+https://github.com/gakonst/ethers-rs)", -] - [[package]] name = "ethers" version = "0.6.2" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ - "ethers-addressbook 0.1.0", - "ethers-contract 0.6.2", - "ethers-core 0.6.3", - "ethers-etherscan 0.2.2", - "ethers-middleware 0.6.2", - "ethers-providers 0.6.2", - "ethers-signers 0.6.2", - "ethers-solc 0.3.0", + "ethers-addressbook", + "ethers-contract", + "ethers-core", + "ethers-etherscan", + "ethers-middleware", + "ethers-providers", + "ethers-signers", + "ethers-solc", ] [[package]] name = "ethers-addressbook" version = "0.1.0" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ - "ethers-core 0.6.3", - "once_cell", - "serde", - "serde_json", -] - -[[package]] -name = "ethers-addressbook" -version = "0.1.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "ethers-core 0.6.0", + "ethers-core", "once_cell", "serde", "serde_json", ] -[[package]] -name = "ethers-contract" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "ethers-contract-abigen 0.6.0", - "ethers-contract-derive 0.6.0", - "ethers-core 0.6.0", - "ethers-providers 0.6.0", - "futures-util", - "hex", - "once_cell", - "pin-project", - "serde", - "serde_json", - "thiserror", -] - [[package]] name = "ethers-contract" version = "0.6.2" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ - "ethers-contract-abigen 0.6.3", - "ethers-contract-derive 0.6.3", - "ethers-core 0.6.3", - "ethers-providers 0.6.2", + "ethers-contract-abigen", + "ethers-contract-derive", + "ethers-core", + "ethers-providers", "futures-util", "hex", "once_cell", @@ -1240,36 +1199,15 @@ dependencies = [ "thiserror", ] -[[package]] -name = "ethers-contract-abigen" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "Inflector", - "cfg-if", - "dunce", - "ethers-core 0.6.0", - "eyre", - "getrandom", - "hex", - "proc-macro2", - "quote", - "reqwest", - "serde", - "serde_json", - "syn", - "url", - "walkdir", -] - [[package]] name = "ethers-contract-abigen" version = "0.6.3" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ "Inflector", "cfg-if", "dunce", - "ethers-core 0.6.3", + "ethers-core", "eyre", "getrandom", "hex", @@ -1283,26 +1221,13 @@ dependencies = [ "walkdir", ] -[[package]] -name = "ethers-contract-derive" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "ethers-contract-abigen 0.6.0", - "ethers-core 0.6.0", - "hex", - "proc-macro2", - "quote", - "serde_json", - "syn", -] - [[package]] name = "ethers-contract-derive" version = "0.6.3" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ - "ethers-contract-abigen 0.6.3", - "ethers-core 0.6.3", + "ethers-contract-abigen", + "ethers-core", "hex", "proc-macro2", "quote", @@ -1310,35 +1235,10 @@ dependencies = [ "syn", ] -[[package]] -name = "ethers-core" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "arrayvec", - "bytes", - "cargo_metadata", - "convert_case 0.5.0", - "elliptic-curve", - "ethabi", - "generic-array 0.14.5", - "hex", - "k256", - "once_cell", - "proc-macro2", - "rand", - "rlp", - "rlp-derive", - "serde", - "serde_json", - "syn", - "thiserror", - "tiny-keccak", -] - [[package]] name = "ethers-core" version = "0.6.3" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ "arrayvec", "bytes", @@ -1364,27 +1264,13 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "ethers-etherscan" -version = "0.2.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "ethers-core 0.6.0", - "ethers-solc 0.3.0 (git+https://github.com/gakonst/ethers-rs)", - "reqwest", - "serde", - "serde-aux", - "serde_json", - "thiserror", - "tracing", -] - [[package]] name = "ethers-etherscan" version = "0.2.2" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ - "ethers-core 0.6.3", - "ethers-solc 0.3.0", + "ethers-core", + "ethers-solc", "reqwest", "semver", "serde", @@ -1394,39 +1280,17 @@ dependencies = [ "tracing", ] -[[package]] -name = "ethers-middleware" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "async-trait", - "ethers-contract 0.6.0", - "ethers-core 0.6.0", - "ethers-etherscan 0.2.0", - "ethers-providers 0.6.0", - "ethers-signers 0.6.0", - "futures-util", - "instant", - "reqwest", - "serde", - "serde_json", - "thiserror", - "tokio", - "tracing", - "tracing-futures", - "url", -] - [[package]] name = "ethers-middleware" version = "0.6.2" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ "async-trait", - "ethers-contract 0.6.2", - "ethers-core 0.6.3", - "ethers-etherscan 0.2.2", - "ethers-providers 0.6.2", - "ethers-signers 0.6.2", + "ethers-contract", + "ethers-core", + "ethers-etherscan", + "ethers-providers", + "ethers-signers", "futures-locks", "futures-util", "instant", @@ -1440,49 +1304,15 @@ dependencies = [ "url", ] -[[package]] -name = "ethers-providers" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "async-trait", - "auto_impl", - "base64 0.13.0", - "ethers-core 0.6.0", - "futures-channel", - "futures-core", - "futures-timer", - "futures-util", - "hashers", - "hex", - "http", - "once_cell", - "parking_lot 0.11.2", - "pin-project", - "reqwest", - "serde", - "serde_json", - "thiserror", - "tokio", - "tokio-tungstenite 0.17.1", - "tracing", - "tracing-futures", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "wasm-timer", - "web-sys", - "ws_stream_wasm", -] - [[package]] name = "ethers-providers" version = "0.6.2" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ "async-trait", "auto_impl", "base64 0.13.0", - "ethers-core 0.6.3", + "ethers-core", "futures-channel", "futures-core", "futures-timer", @@ -1509,33 +1339,17 @@ dependencies = [ "ws_stream_wasm", ] -[[package]] -name = "ethers-signers" -version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "async-trait", - "coins-bip32", - "coins-bip39", - "elliptic-curve", - "eth-keystore", - "ethers-core 0.6.0", - "hex", - "rand", - "sha2 0.9.9", - "thiserror", -] - [[package]] name = "ethers-signers" version = "0.6.2" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ "async-trait", "coins-bip32", "coins-bip39", "elliptic-curve", "eth-keystore", - "ethers-core 0.6.3", + "ethers-core", "hex", "rand", "sha2 0.9.9", @@ -1545,11 +1359,12 @@ dependencies = [ [[package]] name = "ethers-solc" version = "0.3.0" +source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a" dependencies = [ "cfg-if", "colored", "dunce", - "ethers-core 0.6.3", + "ethers-core", "getrandom", "glob", "hex", @@ -1572,35 +1387,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "ethers-solc" -version = "0.3.0" -source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" -dependencies = [ - "colored", - "dunce", - "ethers-core 0.6.0", - "getrandom", - "glob", - "hex", - "home", - "md-5", - "num_cpus", - "once_cell", - "path-slash", - "rayon", - "regex", - "semver", - "serde", - "serde_json", - "solang-parser", - "svm-rs", - "thiserror", - "tiny-keccak", - "tracing", - "walkdir", -] - [[package]] name = "eyre" version = "0.6.8" @@ -4522,7 +4308,7 @@ dependencies = [ "console-subscriber", "dashmap", "derive_more", - "ethers 0.6.0", + "ethers", "fdlimit", "flume", "futures", @@ -4558,32 +4344,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "web3-proxy-minimal" -version = "0.1.0" -dependencies = [ - "anyhow", - "arc-swap", - "argh", - "console-subscriber", - "derive_more", - "ethers 0.6.2", - "fdlimit", - "flume", - "futures", - "hashbrown 0.12.1", - "parking_lot 0.12.0", - "regex", - "reqwest", - "rustc-hash", - "serde", - "serde_json", - "tokio", - "toml", - "tracing", - "tracing-subscriber", -] - [[package]] name = "webpki" version = "0.22.0" diff --git a/Cargo.toml b/Cargo.toml index de3ff7b9..b5964cab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = [ "linkedhashmap", "web3-proxy", "web3-proxy-frontend", - "web3-proxy-minimal", ] # TODO: enable these once rapid development is done diff --git a/web3-proxy-frontend/Cargo.toml b/web3-proxy-frontend/Cargo.toml index 977fef46..69dae5df 100644 --- a/web3-proxy-frontend/Cargo.toml +++ b/web3-proxy-frontend/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -axum = "*" +axum = "0.5.6" console-subscriber = { version = "0.1.5", features = ["parking_lot"] } hashbrown = "0.12.1" serde = { version = "1.0.137", features = [] } diff --git a/web3-proxy-minimal/Cargo.toml b/web3-proxy-minimal/Cargo.toml deleted file mode 100644 index 63cb0182..00000000 --- a/web3-proxy-minimal/Cargo.toml +++ /dev/null @@ -1,33 +0,0 @@ -[package] -name = "web3-proxy-minimal" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0.57" -arc-swap = "1.5.0" -argh = "0.1.7" -# axum = "*" # TODO: use this instead of warp? -console-subscriber = { version = "0.1.5", features = ["parking_lot"] } -derive_more = "0.99.17" -ethers = { path = "../../ethers-rs", features = ["rustls", "ws"] } -fdlimit = "0.2.1" -flume = "0.10.12" -futures = { version = "0.3.21", features = ["thread-pool"] } -# TODO: governor has a "futures" and "futures-timer" feature. do we want those? -hashbrown = "0.12.1" -# TODO: parking_lot has an "arc_lock" feature that we might want to use -parking_lot = { version = "0.12.0", features = ["deadlock_detection"] } -# TODO: regex has several "perf" features that we might want to use -regex = "1.5.5" -reqwest = { version = "0.11.10", default-features = false, features = ["json", "tokio-rustls"] } -rustc-hash = "1.1.0" -serde = { version = "1.0.137", features = [] } -serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] } -tokio = { version = "1.18.2", features = ["full", "tracing"] } -toml = "0.5.9" -tracing = "0.1.34" -# TODO: tracing-subscriber has serde and serde_json features that we might want to use -tracing-subscriber = { version = "0.3.11", features = ["parking_lot"] } diff --git a/web3-proxy-minimal/src/app.rs b/web3-proxy-minimal/src/app.rs deleted file mode 100644 index eebe3519..00000000 --- a/web3-proxy-minimal/src/app.rs +++ /dev/null @@ -1,53 +0,0 @@ -use crate::connections::Web3Connections; -use std::fmt; -use std::sync::Arc; -use std::time::Duration; - -static APP_USER_AGENT: &str = concat!( - "satoshiandkin/", - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION"), -); - -/// The application -// TODO: this debug impl is way too verbose. make something smaller -// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs -pub struct Web3ProxyApp { - /// Send requests to the best server available - balanced_rpcs: Arc, -} - -impl fmt::Debug for Web3ProxyApp { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3ProxyApp").finish_non_exhaustive() - } -} - -impl Web3ProxyApp { - // #[instrument(name = "try_new_Web3ProxyApp", skip_all)] - pub async fn try_new( - chain_id: u64, - balanced_rpcs: Vec, - ) -> anyhow::Result { - // make a http shared client - // TODO: how should we configure the connection pool? - // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server - let http_client = reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .timeout(Duration::from_secs(60)) - .user_agent(APP_USER_AGENT) - .build()?; - - // TODO: attach context to this error - let balanced_rpcs = - Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone())).await?; - - Ok(Web3ProxyApp { balanced_rpcs }) - } - - pub async fn run(&self) -> anyhow::Result<()> { - self.balanced_rpcs.subscribe_heads().await - } -} diff --git a/web3-proxy-minimal/src/connection.rs b/web3-proxy-minimal/src/connection.rs deleted file mode 100644 index caee7203..00000000 --- a/web3-proxy-minimal/src/connection.rs +++ /dev/null @@ -1,342 +0,0 @@ -///! Rate-limited communication with a web3 provider -use derive_more::From; -use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256}; -use futures::StreamExt; -use std::fmt; -use std::sync::atomic::{self, AtomicU32}; -use std::{cmp::Ordering, sync::Arc}; -use tokio::sync::RwLock; -use tokio::task; -use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; -use tracing::{info, instrument, trace, warn}; - -/// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 -#[derive(From)] -pub enum Web3Provider { - Http(ethers::providers::Provider), - Ws(ethers::providers::Provider), -} - -impl Web3Provider { - #[instrument] - async fn from_str(url_str: &str, http_client: Option) -> anyhow::Result { - let provider = if url_str.starts_with("http") { - let url: reqwest::Url = url_str.parse()?; - - let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; - - let provider = ethers::providers::Http::new_with_client(url, http_client); - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(1)) - .into() - } else if url_str.starts_with("ws") { - // TODO: wrapper automatically reconnect - let provider = ethers::providers::Ws::connect(url_str).await?; - - // TODO: make sure this automatically reconnects - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(1)) - .into() - } else { - return Err(anyhow::anyhow!("only http and ws servers are supported")); - }; - - Ok(provider) - } -} - -impl fmt::Debug for Web3Provider { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url - f.debug_struct("Web3Provider").finish_non_exhaustive() - } -} - -/// An active connection to a Web3Rpc -pub struct Web3Connection { - /// TODO: can we get this from the provider? do we even need it? - url: String, - /// keep track of currently open requests. We sort on this - active_requests: AtomicU32, - /// this in a RwLock so that we can replace it if re-connecting - provider: RwLock>, - chain_id: u64, -} - -impl fmt::Debug for Web3Connection { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Web3Connection") - .field("url", &self.url) - .finish_non_exhaustive() - } -} - -impl fmt::Display for Web3Connection { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", &self.url) - } -} - -impl Web3Connection { - #[instrument(skip_all)] - pub async fn reconnect( - self: &Arc, - block_sender: &flume::Sender<(u64, H256, usize)>, - rpc_id: usize, - ) -> anyhow::Result<()> { - // websocket doesn't need the http client - let http_client = None; - - // since this lock is held open over an await, we use tokio's locking - let mut provider = self.provider.write().await; - - // tell the block subscriber that we are at 0 - block_sender.send_async((0, H256::zero(), rpc_id)).await?; - - let new_provider = Web3Provider::from_str(&self.url, http_client).await?; - - *provider = Arc::new(new_provider); - - Ok(()) - } - - /// Connect to a web3 rpc and subscribe to new heads - #[instrument(name = "try_new_Web3Connection", skip(http_client))] - pub async fn try_new( - chain_id: u64, - url_str: String, - // optional because this is only used for http providers. websocket providers don't use it - http_client: Option, - ) -> anyhow::Result> { - let provider = Web3Provider::from_str(&url_str, http_client).await?; - - let connection = Web3Connection { - url: url_str.clone(), - active_requests: 0.into(), - provider: RwLock::new(Arc::new(provider)), - chain_id, - }; - - Ok(Arc::new(connection)) - } - - #[instrument] - pub async fn check_chain_id(&self) -> anyhow::Result<()> { - // check the server's chain_id here - // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error - let found_chain_id: Result = - self.request("eth_chainId", Option::None::<()>).await; - - match found_chain_id { - Ok(found_chain_id) => { - let found_chain_id = - u64::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap(); - - if self.chain_id != found_chain_id { - return Err(anyhow::anyhow!( - "incorrect chain id! Expected {}. Found {}", - self.chain_id, - found_chain_id - )); - } - } - Err(e) => { - let e = anyhow::Error::from(e).context(format!("{}", self)); - return Err(e); - } - } - - info!("Successful connection"); - - Ok(()) - } - - /// Send a web3 request - /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented - /// By taking self here, we ensure that this is dropped after the request is complete - #[instrument(skip(params))] - pub async fn request( - &self, - method: &str, - params: T, - ) -> Result - where - T: fmt::Debug + serde::Serialize + Send + Sync, - R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, - { - // TODO: use tracing spans properly - // TODO: it would be nice to have the request id on this - // TODO: including params in this is way too verbose - trace!("Sending {} to {}", method, self.url); - - let provider = self.provider.read().await.clone(); - - let response = match &*provider { - Web3Provider::Http(provider) => provider.request(method, params).await, - Web3Provider::Ws(provider) => provider.request(method, params).await, - }; - - // TODO: i think ethers already has trace logging (and does it much more fancy) - // TODO: at least instrument this with more useful information - trace!("Reply from {}", self.url); - - response - } - - #[instrument(skip_all)] - async fn send_block( - self: &Arc, - block: Result, ProviderError>, - block_sender: &flume::Sender<(u64, H256, usize)>, - rpc_id: usize, - ) { - match block { - Ok(block) => { - let block_number = block.number.unwrap().as_u64(); - let block_hash = block.hash.unwrap(); - - // TODO: i'm pretty sure we don't need send_async, but double check - block_sender - .send_async((block_number, block_hash, rpc_id)) - .await - .unwrap(); - } - Err(e) => { - warn!("unable to get block from {}: {}", self, e); - } - } - } - - /// Subscribe to new blocks. If `reconnect` is true, this runs forever. - /// TODO: instrument with the url - #[instrument(skip_all)] - pub async fn subscribe_new_heads( - self: Arc, - rpc_id: usize, - block_sender: flume::Sender<(u64, H256, usize)>, - reconnect: bool, - ) -> anyhow::Result<()> { - loop { - info!("Watching new_heads on {}", self); - - // TODO: is a RwLock of Arc the right thing here? - let provider = self.provider.read().await.clone(); - - match &*provider { - Web3Provider::Http(provider) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably some fraction of block time. set automatically? - // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now - // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - let mut interval = interval(Duration::from_secs(2)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - let mut last_hash = Default::default(); - - loop { - // wait for the interval - // TODO: if error or rate limit, increase interval? - interval.tick().await; - - // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" - let block: Result, _> = provider - .request("eth_getBlockByNumber", ("latest", false)) - .await; - - // don't send repeat blocks - if let Ok(block) = &block { - let new_hash = block.hash.unwrap(); - - if new_hash == last_hash { - continue; - } - - last_hash = new_hash; - } - - self.send_block(block, &block_sender, rpc_id).await; - } - } - Web3Provider::Ws(provider) => { - // TODO: automatically reconnect? - // TODO: it would be faster to get the block number, but subscriptions don't provide that - // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? - let mut stream = provider.subscribe_blocks().await?; - - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // all it does is print "new block" for the same block as current block - // TODO: rate limit! - let block: Result, _> = provider - .request("eth_getBlockByNumber", ("latest", false)) - .await; - - self.send_block(block, &block_sender, rpc_id).await; - - // TODO: should the stream have a timeout on it here? - // TODO: although reconnects will make this less of an issue - loop { - match stream.next().await { - Some(new_block) => { - self.send_block(Ok(new_block), &block_sender, rpc_id).await; - - // TODO: really not sure about this - task::yield_now().await; - } - None => { - warn!("subscription ended"); - break; - } - } - } - } - } - - if reconnect { - drop(provider); - - // TODO: exponential backoff - warn!("new heads subscription exited. reconnecting in 10 seconds..."); - sleep(Duration::from_secs(10)).await; - - self.reconnect(&block_sender, rpc_id).await?; - } else { - break; - } - } - - info!("Done watching new_heads on {}", self); - Ok(()) - } -} - -impl Eq for Web3Connection {} - -impl Ord for Web3Connection { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // TODO: what atomic ordering?! - let a = self.active_requests.load(atomic::Ordering::Acquire); - let b = other.active_requests.load(atomic::Ordering::Acquire); - - a.cmp(&b) - } -} - -impl PartialOrd for Web3Connection { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// note that this is just comparing the active requests. two providers with different rpc urls are equal! -impl PartialEq for Web3Connection { - fn eq(&self, other: &Self) -> bool { - // TODO: what ordering?! - self.active_requests.load(atomic::Ordering::Acquire) - == other.active_requests.load(atomic::Ordering::Acquire) - } -} diff --git a/web3-proxy-minimal/src/connections.rs b/web3-proxy-minimal/src/connections.rs deleted file mode 100644 index 363124a4..00000000 --- a/web3-proxy-minimal/src/connections.rs +++ /dev/null @@ -1,245 +0,0 @@ -///! Load balanced communication with a group of web3 providers -use arc_swap::ArcSwap; -use derive_more::From; -use ethers::prelude::H256; -use futures::future::join_all; -use hashbrown::HashMap; -use std::cmp; -use std::collections::{BTreeMap, BTreeSet}; -use std::fmt; -use std::sync::Arc; -use tokio::task; -use tracing::Instrument; -use tracing::{info, info_span, instrument, warn}; - -use crate::connection::Web3Connection; - -#[derive(Clone, Debug, Default)] -struct SyncedConnections { - head_block_num: u64, - head_block_hash: H256, - inner: BTreeSet, -} - -/// A collection of web3 connections. Sends requests either the current best server or all servers. -#[derive(From)] -pub struct Web3Connections { - inner: Vec>, - synced_connections: ArcSwap, -} - -impl fmt::Debug for Web3Connections { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3Connections") - .field("inner", &self.inner) - .finish_non_exhaustive() - } -} - -impl Web3Connections { - // #[instrument(name = "try_new_Web3Connections", skip_all)] - pub async fn try_new( - chain_id: u64, - servers: Vec, - http_client: Option, - ) -> anyhow::Result> { - let num_connections = servers.len(); - - // turn configs into connections - let mut connections = Vec::with_capacity(num_connections); - for rpc_url in servers.into_iter() { - match Web3Connection::try_new(chain_id, rpc_url, http_client.clone()).await { - Ok(connection) => connections.push(connection), - Err(e) => warn!("Unable to connect to a server! {:?}", e), - } - } - - if connections.len() < 2 { - // TODO: less than 3? what should we do here? - return Err(anyhow::anyhow!( - "need at least 2 connections when subscribing to heads!" - )); - } - - let synced_connections = SyncedConnections::default(); - - let connections = Arc::new(Self { - inner: connections, - synced_connections: ArcSwap::new(Arc::new(synced_connections)), - }); - - Ok(connections) - } - - pub async fn subscribe_heads(self: &Arc) -> anyhow::Result<()> { - let (block_sender, block_receiver) = flume::unbounded(); - - let mut handles = vec![]; - - for (rpc_id, connection) in self.inner.iter().enumerate() { - // subscribe to new heads in a spawned future - // TODO: channel instead. then we can have one future with write access to a left-right? - let connection = Arc::clone(connection); - let block_sender = block_sender.clone(); - - // let url = connection.url().to_string(); - - let handle = task::Builder::default() - .name("subscribe_new_heads") - .spawn(async move { - // loop to automatically reconnect - // TODO: make this cancellable? - // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - // TODO: proper span - connection.check_chain_id().await?; - - connection - .subscribe_new_heads(rpc_id, block_sender.clone(), true) - .instrument(tracing::info_span!("url")) - .await - }); - - handles.push(handle); - } - - let connections = Arc::clone(self); - let handle = task::Builder::default() - .name("update_synced_rpcs") - .spawn(async move { connections.update_synced_rpcs(block_receiver).await }); - - handles.push(handle); - - for x in join_all(handles).await { - match x { - Ok(Ok(_)) => {} - Ok(Err(e)) => return Err(e), - Err(e) => return Err(e.into()), - } - } - - Ok(()) - } - - /// TODO: move parts of this onto SyncedConnections? - #[instrument(skip_all)] - async fn update_synced_rpcs( - &self, - block_receiver: flume::Receiver<(u64, H256, usize)>, - ) -> anyhow::Result<()> { - let max_connections = self.inner.len(); - - let mut connection_states: HashMap = - HashMap::with_capacity(max_connections); - - let mut pending_synced_connections = SyncedConnections::default(); - - while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await { - // TODO: span with rpc in it, too - // TODO: make sure i'm doing this span right - let span = info_span!("block_receiver", rpc_id, new_block_num); - let _enter = span.enter(); - - if new_block_num == 0 { - // TODO: show the actual rpc url? - warn!("rpc #{} is still syncing", rpc_id); - } - - connection_states.insert(rpc_id, (new_block_num, new_block_hash)); - - // TODO: do something to update the synced blocks - match new_block_num.cmp(&pending_synced_connections.head_block_num) { - cmp::Ordering::Greater => { - // the rpc's newest block is the new overall best block - info!("new head: {:?}", new_block_hash); - - pending_synced_connections.inner.clear(); - pending_synced_connections.inner.insert(rpc_id); - - pending_synced_connections.head_block_num = new_block_num; - - // TODO: if the parent hash isn't our previous best block, ignore it - pending_synced_connections.head_block_hash = new_block_hash; - } - cmp::Ordering::Equal => { - if new_block_hash == pending_synced_connections.head_block_hash { - // this rpc has caught up with the best known head - // do not clear synced_connections. - // we just want to add this rpc to the end - // TODO: HashSet here? i think we get dupes if we don't - pending_synced_connections.inner.insert(rpc_id); - } else { - // same height, but different chain - - // check connection_states to see which head block is more popular! - let mut rpc_ids_by_block: BTreeMap> = BTreeMap::new(); - - let mut synced_rpcs = 0; - - // TODO: what order should we iterate in? track last update time, too? - for (rpc_id, (block_num, block_hash)) in connection_states.iter() { - if *block_num != new_block_num { - // this connection isn't synced. we don't care what hash it has - continue; - } - - synced_rpcs += 1; - - let count = rpc_ids_by_block - .entry(*block_hash) - .or_insert_with(|| Vec::with_capacity(max_connections - 1)); - - count.push(*rpc_id); - } - - let most_common_head_hash = rpc_ids_by_block - .iter() - .max_by(|a, b| a.1.len().cmp(&b.1.len())) - .map(|(k, _v)| k) - .unwrap(); - - warn!( - "chain is forked! {} possible heads. {}/{}/{} rpcs have {}", - rpc_ids_by_block.len(), - rpc_ids_by_block.get(most_common_head_hash).unwrap().len(), - synced_rpcs, - max_connections, - most_common_head_hash - ); - - // this isn't the best block in the tier. make sure this rpc isn't included - if new_block_hash != *most_common_head_hash { - pending_synced_connections.inner.remove(&rpc_id); - } - - // TODO: if pending_synced_connections hasn't changed. continue - } - } - cmp::Ordering::Less => { - // this isn't the best block in the tier. don't do anything - if !pending_synced_connections.inner.remove(&rpc_id) { - // we didn't remove anything. nothing more to do - // the rpc must be behind by more than 1 block - continue; - } - // we removed. don't continue so that we update self.synced_connections - } - } - - // the synced connections have changed - let synced_connections = Arc::new(pending_synced_connections.clone()); - - info!( - "new synced_connections for {}: {:?}", - synced_connections.head_block_hash, synced_connections.inner - ); - - // TODO: only do this if there are 2 nodes synced to this block? - // do the arcswap - self.synced_connections.swap(synced_connections); - } - - // TODO: if there was an error, we should return it - Err(anyhow::anyhow!("block_receiver exited!")) - } -} diff --git a/web3-proxy-minimal/src/main.rs b/web3-proxy-minimal/src/main.rs deleted file mode 100644 index d2fc98b3..00000000 --- a/web3-proxy-minimal/src/main.rs +++ /dev/null @@ -1,88 +0,0 @@ -mod app; -mod connection; -mod connections; - -use parking_lot::deadlock; -use std::env; -use std::sync::atomic::{self, AtomicUsize}; -use std::thread; -use std::time::Duration; -use tokio::runtime; - -use crate::app::Web3ProxyApp; - -fn main() -> anyhow::Result<()> { - // TODO: is there a better way to do this? - if env::var("RUST_LOG").is_err() { - env::set_var("RUST_LOG", "web3_proxy_minimal=debug"); - } - - // install global collector configured based on RUST_LOG env var. - // tracing_subscriber::fmt().init(); - console_subscriber::init(); - - fdlimit::raise_fd_limit(); - - let chain_id = 1; - let workers = 4; - - let mut rt_builder = runtime::Builder::new_multi_thread(); - - rt_builder.enable_all().thread_name_fn(move || { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need - let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst); - // TODO: i think these max at 15 characters - format!("web3-{}-{}", chain_id, worker_id) - }); - - if workers > 0 { - rt_builder.worker_threads(workers); - } - - let rt = rt_builder.build()?; - - // spawn a thread for deadlock detection - thread::spawn(move || loop { - thread::sleep(Duration::from_secs(10)); - let deadlocks = deadlock::check_deadlock(); - if deadlocks.is_empty() { - continue; - } - - println!("{} deadlocks detected", deadlocks.len()); - for (i, threads) in deadlocks.iter().enumerate() { - println!("Deadlock #{}", i); - for t in threads { - println!("Thread Id {:#?}", t.thread_id()); - println!("{:#?}", t.backtrace()); - } - } - }); - - // spawn the root task - rt.block_on(async { - let balanced_rpcs = vec![ - "http://127.0.0.1:8545", - "ws://127.0.0.1:8546", - "http://127.0.0.1:8549", - "ws://127.0.0.1:8549", - "https://api.edennetwork.io/v1/", - "https://api.edennetwork.io/v1/beta", - "https://rpc.ethermine.org", - "https://rpc.flashbots.net", - "https://gibson.securerpc.com/v1", - "wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7", - "wss://mainnet.infura.io/ws/v3/c6fa1b6f17124b44ae71b2b25601aee0", - "wss://ecfa2710350449f490725c4525cba584.eth.ws.rivet.cloud/", - "wss://speedy-nodes-nyc.moralis.io/3587198387de4b2d711f6999/eth/mainnet/archive/ws", - ] - .into_iter() - .map(|x| x.to_string()) - .collect(); - - let app = Web3ProxyApp::try_new(chain_id, balanced_rpcs).await?; - - app.run().await - }) -} diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 698fd267..ae78f0d2 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -13,7 +13,7 @@ argh = "0.1.7" console-subscriber = { version = "0.1.5", features = ["parking_lot"] } dashmap = "5.3.3" derive_more = "0.99.17" -ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } +ethers = { git = "https://github.com/SatoshiAndKin/ethers-rs", features = ["rustls", "ws"] } fdlimit = "0.2.1" flume = "0.10.12" futures = { version = "0.3.21", features = ["thread-pool"] } diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 98b1091b..be7c8e4e 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -79,6 +79,7 @@ impl Web3ProxyApp { Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone()), &clock) .await?; + // TODO: do this separately instead of during try_new { let balanced_rpcs = balanced_rpcs.clone(); task::spawn(async move { @@ -110,7 +111,6 @@ impl Web3ProxyApp { self: Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { - // TODO: i feel like i don't see this log when i should (even though i do see the response) debug!("Received request: {:?}", request); let response = match request { @@ -122,7 +122,6 @@ impl Web3ProxyApp { } }; - // TODO: i feel like i don't see this log when i should (even though i do see the response) debug!("Forwarding response: {:?}", response); Ok(warp::reply::json(&response)) diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 1bdb4462..19200a56 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -20,7 +20,7 @@ use tracing::{info, info_span, instrument, trace, warn}; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; -#[derive(Clone)] +#[derive(Clone, Default)] struct SyncedConnections { head_block_num: u64, head_block_hash: H256, @@ -35,14 +35,6 @@ impl fmt::Debug for SyncedConnections { } impl SyncedConnections { - fn new(max_connections: usize) -> Self { - Self { - head_block_num: 0, - head_block_hash: Default::default(), - inner: HashSet::with_capacity(max_connections), - } - } - pub fn get_head_block_hash(&self) -> &H256 { &self.head_block_hash } @@ -93,7 +85,7 @@ impl Web3Connections { )); } - let synced_connections = SyncedConnections::new(num_connections); + let synced_connections = SyncedConnections::default(); let connections = Arc::new(Self { inner: connections, @@ -139,6 +131,7 @@ impl Web3Connections { handles.push(handle); + // TODO: do something with join_all's result join_all(handles).await; } @@ -224,7 +217,7 @@ impl Web3Connections { let mut connection_states: HashMap = HashMap::with_capacity(max_connections); - let mut pending_synced_connections = SyncedConnections::new(max_connections); + let mut pending_synced_connections = SyncedConnections::default(); while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await { // TODO: span with more in it?