minimal app served its purpose

This commit is contained in:
Bryan Stitt 2022-05-20 02:01:02 +00:00
parent c3d1d14f16
commit 7c7b25d654
12 changed files with 48 additions and 1055 deletions

3
.vscode/settings.json vendored Normal file

@ -0,0 +1,3 @@
{
"rust-analyzer.cargo.features": "all"
}

316
Cargo.lock generated

@ -1155,82 +1155,41 @@ dependencies = [
"uint",
]
[[package]]
name = "ethers"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"ethers-addressbook 0.1.0 (git+https://github.com/gakonst/ethers-rs)",
"ethers-contract 0.6.0",
"ethers-core 0.6.0",
"ethers-etherscan 0.2.0",
"ethers-middleware 0.6.0",
"ethers-providers 0.6.0",
"ethers-signers 0.6.0",
"ethers-solc 0.3.0 (git+https://github.com/gakonst/ethers-rs)",
]
[[package]]
name = "ethers"
version = "0.6.2"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"ethers-addressbook 0.1.0",
"ethers-contract 0.6.2",
"ethers-core 0.6.3",
"ethers-etherscan 0.2.2",
"ethers-middleware 0.6.2",
"ethers-providers 0.6.2",
"ethers-signers 0.6.2",
"ethers-solc 0.3.0",
"ethers-addressbook",
"ethers-contract",
"ethers-core",
"ethers-etherscan",
"ethers-middleware",
"ethers-providers",
"ethers-signers",
"ethers-solc",
]
[[package]]
name = "ethers-addressbook"
version = "0.1.0"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"ethers-core 0.6.3",
"once_cell",
"serde",
"serde_json",
]
[[package]]
name = "ethers-addressbook"
version = "0.1.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"ethers-core 0.6.0",
"ethers-core",
"once_cell",
"serde",
"serde_json",
]
[[package]]
name = "ethers-contract"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"ethers-contract-abigen 0.6.0",
"ethers-contract-derive 0.6.0",
"ethers-core 0.6.0",
"ethers-providers 0.6.0",
"futures-util",
"hex",
"once_cell",
"pin-project",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "ethers-contract"
version = "0.6.2"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"ethers-contract-abigen 0.6.3",
"ethers-contract-derive 0.6.3",
"ethers-core 0.6.3",
"ethers-providers 0.6.2",
"ethers-contract-abigen",
"ethers-contract-derive",
"ethers-core",
"ethers-providers",
"futures-util",
"hex",
"once_cell",
@ -1240,36 +1199,15 @@ dependencies = [
"thiserror",
]
[[package]]
name = "ethers-contract-abigen"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"Inflector",
"cfg-if",
"dunce",
"ethers-core 0.6.0",
"eyre",
"getrandom",
"hex",
"proc-macro2",
"quote",
"reqwest",
"serde",
"serde_json",
"syn",
"url",
"walkdir",
]
[[package]]
name = "ethers-contract-abigen"
version = "0.6.3"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"Inflector",
"cfg-if",
"dunce",
"ethers-core 0.6.3",
"ethers-core",
"eyre",
"getrandom",
"hex",
@ -1283,26 +1221,13 @@ dependencies = [
"walkdir",
]
[[package]]
name = "ethers-contract-derive"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"ethers-contract-abigen 0.6.0",
"ethers-core 0.6.0",
"hex",
"proc-macro2",
"quote",
"serde_json",
"syn",
]
[[package]]
name = "ethers-contract-derive"
version = "0.6.3"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"ethers-contract-abigen 0.6.3",
"ethers-core 0.6.3",
"ethers-contract-abigen",
"ethers-core",
"hex",
"proc-macro2",
"quote",
@ -1310,35 +1235,10 @@ dependencies = [
"syn",
]
[[package]]
name = "ethers-core"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"arrayvec",
"bytes",
"cargo_metadata",
"convert_case 0.5.0",
"elliptic-curve",
"ethabi",
"generic-array 0.14.5",
"hex",
"k256",
"once_cell",
"proc-macro2",
"rand",
"rlp",
"rlp-derive",
"serde",
"serde_json",
"syn",
"thiserror",
"tiny-keccak",
]
[[package]]
name = "ethers-core"
version = "0.6.3"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"arrayvec",
"bytes",
@ -1364,27 +1264,13 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "ethers-etherscan"
version = "0.2.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"ethers-core 0.6.0",
"ethers-solc 0.3.0 (git+https://github.com/gakonst/ethers-rs)",
"reqwest",
"serde",
"serde-aux",
"serde_json",
"thiserror",
"tracing",
]
[[package]]
name = "ethers-etherscan"
version = "0.2.2"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"ethers-core 0.6.3",
"ethers-solc 0.3.0",
"ethers-core",
"ethers-solc",
"reqwest",
"semver",
"serde",
@ -1394,39 +1280,17 @@ dependencies = [
"tracing",
]
[[package]]
name = "ethers-middleware"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"async-trait",
"ethers-contract 0.6.0",
"ethers-core 0.6.0",
"ethers-etherscan 0.2.0",
"ethers-providers 0.6.0",
"ethers-signers 0.6.0",
"futures-util",
"instant",
"reqwest",
"serde",
"serde_json",
"thiserror",
"tokio",
"tracing",
"tracing-futures",
"url",
]
[[package]]
name = "ethers-middleware"
version = "0.6.2"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"async-trait",
"ethers-contract 0.6.2",
"ethers-core 0.6.3",
"ethers-etherscan 0.2.2",
"ethers-providers 0.6.2",
"ethers-signers 0.6.2",
"ethers-contract",
"ethers-core",
"ethers-etherscan",
"ethers-providers",
"ethers-signers",
"futures-locks",
"futures-util",
"instant",
@ -1440,49 +1304,15 @@ dependencies = [
"url",
]
[[package]]
name = "ethers-providers"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"async-trait",
"auto_impl",
"base64 0.13.0",
"ethers-core 0.6.0",
"futures-channel",
"futures-core",
"futures-timer",
"futures-util",
"hashers",
"hex",
"http",
"once_cell",
"parking_lot 0.11.2",
"pin-project",
"reqwest",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-tungstenite 0.17.1",
"tracing",
"tracing-futures",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-timer",
"web-sys",
"ws_stream_wasm",
]
[[package]]
name = "ethers-providers"
version = "0.6.2"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"async-trait",
"auto_impl",
"base64 0.13.0",
"ethers-core 0.6.3",
"ethers-core",
"futures-channel",
"futures-core",
"futures-timer",
@ -1509,33 +1339,17 @@ dependencies = [
"ws_stream_wasm",
]
[[package]]
name = "ethers-signers"
version = "0.6.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"async-trait",
"coins-bip32",
"coins-bip39",
"elliptic-curve",
"eth-keystore",
"ethers-core 0.6.0",
"hex",
"rand",
"sha2 0.9.9",
"thiserror",
]
[[package]]
name = "ethers-signers"
version = "0.6.2"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"async-trait",
"coins-bip32",
"coins-bip39",
"elliptic-curve",
"eth-keystore",
"ethers-core 0.6.3",
"ethers-core",
"hex",
"rand",
"sha2 0.9.9",
@ -1545,11 +1359,12 @@ dependencies = [
[[package]]
name = "ethers-solc"
version = "0.3.0"
source = "git+https://github.com/SatoshiAndKin/ethers-rs#7f6e02f06c52fe3ec6219aa0882ef01d1691003a"
dependencies = [
"cfg-if",
"colored",
"dunce",
"ethers-core 0.6.3",
"ethers-core",
"getrandom",
"glob",
"hex",
@ -1572,35 +1387,6 @@ dependencies = [
"walkdir",
]
[[package]]
name = "ethers-solc"
version = "0.3.0"
source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb"
dependencies = [
"colored",
"dunce",
"ethers-core 0.6.0",
"getrandom",
"glob",
"hex",
"home",
"md-5",
"num_cpus",
"once_cell",
"path-slash",
"rayon",
"regex",
"semver",
"serde",
"serde_json",
"solang-parser",
"svm-rs",
"thiserror",
"tiny-keccak",
"tracing",
"walkdir",
]
[[package]]
name = "eyre"
version = "0.6.8"
@ -4522,7 +4308,7 @@ dependencies = [
"console-subscriber",
"dashmap",
"derive_more",
"ethers 0.6.0",
"ethers",
"fdlimit",
"flume",
"futures",
@ -4558,32 +4344,6 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "web3-proxy-minimal"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"argh",
"console-subscriber",
"derive_more",
"ethers 0.6.2",
"fdlimit",
"flume",
"futures",
"hashbrown 0.12.1",
"parking_lot 0.12.0",
"regex",
"reqwest",
"rustc-hash",
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "webpki"
version = "0.22.0"

@ -3,7 +3,6 @@ members = [
"linkedhashmap",
"web3-proxy",
"web3-proxy-frontend",
"web3-proxy-minimal",
]
# TODO: enable these once rapid development is done

@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum = "*"
axum = "0.5.6"
console-subscriber = { version = "0.1.5", features = ["parking_lot"] }
hashbrown = "0.12.1"
serde = { version = "1.0.137", features = [] }

@ -1,33 +0,0 @@
[package]
name = "web3-proxy-minimal"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.57"
arc-swap = "1.5.0"
argh = "0.1.7"
# axum = "*" # TODO: use this instead of warp?
console-subscriber = { version = "0.1.5", features = ["parking_lot"] }
derive_more = "0.99.17"
ethers = { path = "../../ethers-rs", features = ["rustls", "ws"] }
fdlimit = "0.2.1"
flume = "0.10.12"
futures = { version = "0.3.21", features = ["thread-pool"] }
# TODO: governor has a "futures" and "futures-timer" feature. do we want those?
hashbrown = "0.12.1"
# TODO: parking_lot has an "arc_lock" feature that we might want to use
parking_lot = { version = "0.12.0", features = ["deadlock_detection"] }
# TODO: regex has several "perf" features that we might want to use
regex = "1.5.5"
reqwest = { version = "0.11.10", default-features = false, features = ["json", "tokio-rustls"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.18.2", features = ["full", "tracing"] }
toml = "0.5.9"
tracing = "0.1.34"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
tracing-subscriber = { version = "0.3.11", features = ["parking_lot"] }

@ -1,53 +0,0 @@
use crate::connections::Web3Connections;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
/// User-Agent string given to the shared HTTP client.
///
/// Assembled at compile time as `satoshiandkin/<CARGO_PKG_NAME>/<CARGO_PKG_VERSION>`.
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
);
/// The application.
///
/// Holds the pool of upstream rpc connections. Built by `Web3ProxyApp::try_new`
/// and driven by `Web3ProxyApp::run`.
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
pub struct Web3ProxyApp {
/// Send requests to the best server available.
/// In this file the pool is only used by `run`, which subscribes to new heads on it.
balanced_rpcs: Arc<Web3Connections>,
}
impl fmt::Debug for Web3ProxyApp {
    /// Writes an opaque `Web3ProxyApp { .. }` without listing any fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        let mut opaque = f.debug_struct("Web3ProxyApp");
        opaque.finish_non_exhaustive()
    }
}
impl Web3ProxyApp {
    /// Build the application for `chain_id`, connecting to every url in `balanced_rpcs`.
    ///
    /// A single `reqwest` client (5 second connect timeout, 60 second total timeout,
    /// `APP_USER_AGENT` as the user agent) is created and handed to the connection pool.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be built or if
    /// `Web3Connections::try_new` fails.
    // #[instrument(name = "try_new_Web3ProxyApp", skip_all)]
    pub async fn try_new(
        chain_id: u64,
        balanced_rpcs: Vec<String>,
    ) -> anyhow::Result<Web3ProxyApp> {
        // make a http shared client
        // TODO: how should we configure the connection pool?
        // TODO: is a 60 second total timeout long enough? unlimited is a bad idea if something is wrong with the remote server
        let http_client = reqwest::ClientBuilder::new()
            .connect_timeout(Duration::from_secs(5))
            .timeout(Duration::from_secs(60))
            .user_agent(APP_USER_AGENT)
            .build()?;

        // the client is not needed here afterwards, so it is moved instead of cloned.
        // `anyhow::Error::context` is an inherent method, so no extra import is needed.
        let balanced_rpcs = Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client))
            .await
            .map_err(|err| err.context("unable to set up the balanced rpc connections"))?;

        Ok(Web3ProxyApp { balanced_rpcs })
    }

    /// Run the application by subscribing to new heads on every connection.
    ///
    /// Returns whatever `Web3Connections::subscribe_heads` returns.
    pub async fn run(&self) -> anyhow::Result<()> {
        self.balanced_rpcs.subscribe_heads().await
    }
}

@ -1,342 +0,0 @@
//! Rate-limited communication with a web3 provider
use derive_more::From;
use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256};
use futures::StreamExt;
use std::fmt;
use std::sync::atomic::{self, AtomicU32};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::RwLock;
use tokio::task;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tracing::{info, instrument, trace, warn};
/// An ethers provider over one of the two supported transports.
///
/// `#[derive(From)]` lets either concrete provider be turned into this enum with `.into()`.
///
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
pub enum Web3Provider {
/// Provider used for urls that start with `http`.
Http(ethers::providers::Provider<ethers::providers::Http>),
/// Provider used for urls that start with `ws`.
Ws(ethers::providers::Provider<ethers::providers::Ws>),
}
impl Web3Provider {
    /// Create a provider for `url_str`, choosing the transport from the url prefix.
    ///
    /// * urls starting with `http` are parsed and require `http_client`;
    /// * urls starting with `ws` open a new websocket connection and ignore `http_client`;
    /// * anything else is rejected with an error.
    ///
    /// Both kinds of provider are given a 1 second interval.
    #[instrument]
    async fn from_str(url_str: &str, http_client: Option<reqwest::Client>) -> anyhow::Result<Self> {
        // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
        let provider_interval = Duration::from_secs(1);

        if url_str.starts_with("http") {
            // parse first so that a bad url is reported before a missing client
            let parsed_url: reqwest::Url = url_str.parse()?;
            let client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;

            let transport = ethers::providers::Http::new_with_client(parsed_url, client);

            return Ok(Self::Http(
                ethers::providers::Provider::new(transport).interval(provider_interval),
            ));
        }

        if url_str.starts_with("ws") {
            // TODO: wrapper automatically reconnect
            // TODO: make sure this automatically reconnects
            let transport = ethers::providers::Ws::connect(url_str).await?;

            return Ok(Self::Ws(
                ethers::providers::Provider::new(transport).interval(provider_interval),
            ));
        }

        Err(anyhow::anyhow!("only http and ws servers are supported"))
    }
}
impl fmt::Debug for Web3Provider {
    /// Writes an opaque `Web3Provider { .. }`; neither the variant nor the url is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
        let mut opaque = f.debug_struct("Web3Provider");
        opaque.finish_non_exhaustive()
    }
}
/// An active connection to a Web3Rpc
pub struct Web3Connection {
/// The url the provider was created from. Used for logging, for `Display`, and to
/// build a fresh provider in `reconnect`.
/// TODO: can we get this from the provider? do we even need it?
url: String,
/// keep track of currently open requests. We sort on this
/// (the `Ord`/`PartialEq` impls compare only this value).
/// NOTE(review): nothing in this file increments it after it is set to 0 in `try_new` — confirm.
active_requests: AtomicU32,
/// this is in a RwLock so that we can replace it if re-connecting
provider: RwLock<Arc<Web3Provider>>,
/// The chain id that `check_chain_id` expects the server to report.
chain_id: u64,
}
impl fmt::Debug for Web3Connection {
    /// Shows only the `url` field; every other field is elided as `..`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut partial = f.debug_struct("Web3Connection");
        partial.field("url", &self.url);
        partial.finish_non_exhaustive()
    }
}
impl fmt::Display for Web3Connection {
    /// A connection is displayed as its url.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.url)
    }
}
impl Web3Connection {
/// Replace the provider with a newly created one for the same url.
///
/// Before connecting, `(0, H256::zero(), rpc_id)` is sent on `block_sender` so the
/// receiver knows this rpc has no head right now.
///
/// No http client is passed to `Web3Provider::from_str`, so this can only succeed
/// for urls that do not need one.
#[instrument(skip_all)]
pub async fn reconnect(
    self: &Arc<Self>,
    block_sender: &flume::Sender<(u64, H256, usize)>,
    rpc_id: usize,
) -> anyhow::Result<()> {
    // since this lock is held open over an await, we use tokio's locking
    let mut provider_slot = self.provider.write().await;

    // tell the block subscriber that we are at 0
    let no_head = (0, H256::zero(), rpc_id);
    block_sender.send_async(no_head).await?;

    // websocket doesn't need the http client
    let replacement = Web3Provider::from_str(&self.url, None).await?;
    *provider_slot = Arc::new(replacement);

    Ok(())
}
/// Connect to a web3 rpc and subscribe to new heads
#[instrument(name = "try_new_Web3Connection", skip(http_client))]
pub async fn try_new(
chain_id: u64,
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Arc<Web3Connection>> {
let provider = Web3Provider::from_str(&url_str, http_client).await?;
let connection = Web3Connection {
url: url_str.clone(),
active_requests: 0.into(),
provider: RwLock::new(Arc::new(provider)),
chain_id,
};
Ok(Arc::new(connection))
}
#[instrument]
pub async fn check_chain_id(&self) -> anyhow::Result<()> {
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
let found_chain_id: Result<String, _> =
self.request("eth_chainId", Option::None::<()>).await;
match found_chain_id {
Ok(found_chain_id) => {
let found_chain_id =
u64::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap();
if self.chain_id != found_chain_id {
return Err(anyhow::anyhow!(
"incorrect chain id! Expected {}. Found {}",
self.chain_id,
found_chain_id
));
}
}
Err(e) => {
let e = anyhow::Error::from(e).context(format!("{}", self));
return Err(e);
}
}
info!("Successful connection");
Ok(())
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
#[instrument(skip(params))]
pub async fn request<T, R>(
&self,
method: &str,
params: T,
) -> Result<R, ethers::prelude::ProviderError>
where
T: fmt::Debug + serde::Serialize + Send + Sync,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{
// TODO: use tracing spans properly
// TODO: it would be nice to have the request id on this
// TODO: including params in this is way too verbose
trace!("Sending {} to {}", method, self.url);
let provider = self.provider.read().await.clone();
let response = match &*provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
};
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: at least instrument this with more useful information
trace!("Reply from {}", self.url);
response
}
/// Forward a block's `(number, hash, rpc_id)` to `block_sender`.
///
/// A failed block request is logged and dropped. A block without a number or without a
/// hash is also logged and dropped instead of panicking: both fields are optional in the
/// response type (presumably absent for pending blocks — see the ethers `Block` docs).
///
/// # Panics
///
/// Panics if the receiving side of `block_sender` has been dropped.
#[instrument(skip_all)]
async fn send_block(
    self: &Arc<Self>,
    block: Result<Block<TxHash>, ProviderError>,
    block_sender: &flume::Sender<(u64, H256, usize)>,
    rpc_id: usize,
) {
    let block = match block {
        Ok(block) => block,
        Err(e) => {
            warn!("unable to get block from {}: {}", self, e);
            return;
        }
    };

    // this data comes from a remote server. don't trust it to be complete
    let (block_number, block_hash) = match (block.number, block.hash) {
        (Some(number), Some(hash)) => (number.as_u64(), hash),
        _ => {
            warn!("block from {} is missing its number or hash", self);
            return;
        }
    };

    // TODO: i'm pretty sure we don't need send_async, but double check
    block_sender
        .send_async((block_number, block_hash, rpc_id))
        .await
        .expect("block receiver was dropped");
}
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
///
/// * Http providers are polled every 2 seconds with `eth_getBlockByNumber("latest")`;
///   a block whose hash equals the previously seen hash is not sent again. This polling
///   loop never exits, so the reconnect logic below is only reached for websockets.
/// * Ws providers subscribe to blocks, send the current latest block once, and then
///   forward every block from the subscription until it ends.
///
/// Blocks are forwarded through `send_block` together with `rpc_id`.
///
/// # Errors
///
/// Returns an error if the websocket subscription cannot be created or if
/// `reconnect` fails.
///
/// TODO: instrument with the url
#[instrument(skip_all)]
pub async fn subscribe_new_heads(
    self: Arc<Self>,
    rpc_id: usize,
    block_sender: flume::Sender<(u64, H256, usize)>,
    reconnect: bool,
) -> anyhow::Result<()> {
    loop {
        info!("Watching new_heads on {}", self);

        // TODO: is a RwLock of Arc the right thing here?
        let provider = self.provider.read().await.clone();

        match &*provider {
            Web3Provider::Http(provider) => {
                // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
                // TODO: what should this interval be? probably some fraction of block time. set automatically?
                // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
                // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
                let mut poll_timer = interval(Duration::from_secs(2));
                poll_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

                let mut last_hash: H256 = Default::default();

                loop {
                    // wait for the interval
                    // TODO: if error or rate limit, increase interval?
                    poll_timer.tick().await;

                    // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
                    let block: Result<Block<TxHash>, _> = provider
                        .request("eth_getBlockByNumber", ("latest", false))
                        .await;

                    if let Ok(block) = &block {
                        match block.hash {
                            // don't send repeat blocks
                            Some(new_hash) if new_hash == last_hash => continue,
                            Some(new_hash) => last_hash = new_hash,
                            // the hash is optional in the response. skip instead of panicking
                            None => {
                                warn!("latest block from {} has no hash", self);
                                continue;
                            }
                        }
                    }

                    self.send_block(block, &block_sender, rpc_id).await;
                }
            }
            Web3Provider::Ws(provider) => {
                // TODO: automatically reconnect?
                // TODO: it would be faster to get the block number, but subscriptions don't provide that
                // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
                let mut stream = provider.subscribe_blocks().await?;

                // query the block once since the subscription doesn't send the current block
                // there is a very small race condition here where the stream could send us a new block right now
                // all it does is print "new block" for the same block as current block
                // TODO: rate limit!
                let block: Result<Block<TxHash>, _> = provider
                    .request("eth_getBlockByNumber", ("latest", false))
                    .await;

                self.send_block(block, &block_sender, rpc_id).await;

                // TODO: should the stream have a timeout on it here?
                // TODO: although reconnects will make this less of an issue
                while let Some(new_block) = stream.next().await {
                    self.send_block(Ok(new_block), &block_sender, rpc_id).await;

                    // TODO: really not sure about this
                    task::yield_now().await;
                }

                warn!("subscription ended");
            }
        }

        if !reconnect {
            break;
        }

        // release our handle on the old provider before it is replaced
        drop(provider);

        // TODO: exponential backoff
        warn!("new heads subscription exited. reconnecting in 10 seconds...");
        sleep(Duration::from_secs(10)).await;

        self.reconnect(&block_sender, rpc_id).await?;
    }

    info!("Done watching new_heads on {}", self);

    Ok(())
}
}
/// Marker impl required by `Ord`. Equality itself comes from the `PartialEq` impl,
/// which compares only `active_requests`.
impl Eq for Web3Connection {}
impl Ord for Web3Connection {
    /// Orders connections by their current number of active requests (fewest first).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // TODO: what atomic ordering?!
        let open_requests = |conn: &Self| conn.active_requests.load(atomic::Ordering::Acquire);

        open_requests(self).cmp(&open_requests(other))
    }
}
impl PartialOrd for Web3Connection {
    /// Always `Some`: delegates to the total order defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
impl PartialEq for Web3Connection {
    /// True when both connections currently report the same number of active requests.
    fn eq(&self, other: &Self) -> bool {
        // TODO: what ordering?!
        let ours = self.active_requests.load(atomic::Ordering::Acquire);
        let theirs = other.active_requests.load(atomic::Ordering::Acquire);

        ours == theirs
    }
}

@ -1,245 +0,0 @@
//! Load balanced communication with a group of web3 providers
use arc_swap::ArcSwap;
use derive_more::From;
use ethers::prelude::H256;
use futures::future::join_all;
use hashbrown::HashMap;
use std::cmp;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;
use tokio::task;
use tracing::Instrument;
use tracing::{info, info_span, instrument, warn};
use crate::connection::Web3Connection;
/// The best known head block and the rpcs that are currently on it.
#[derive(Clone, Debug, Default)]
struct SyncedConnections {
/// Number of the best known head block.
head_block_num: u64,
/// Hash of the best known head block.
head_block_hash: H256,
/// Ids of the rpcs on the head block. An id is the rpc's index in `Web3Connections::inner`.
inner: BTreeSet<usize>,
}
/// A collection of web3 connections. Sends requests to either the current best server or all servers.
///
/// NOTE(review): the part of this file visible here only subscribes to new heads; no
/// request forwarding is implemented on this type — confirm.
#[derive(From)]
pub struct Web3Connections {
/// Every connection that `try_new` managed to create. An rpc's id is its index in this list.
inner: Vec<Arc<Web3Connection>>,
/// The latest snapshot published by `update_synced_rpcs` (replaced with `ArcSwap::swap`).
synced_connections: ArcSwap<SyncedConnections>,
}
impl fmt::Debug for Web3Connections {
    /// Shows only the `inner` connection list; the synced state is elided as `..`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        let mut partial = f.debug_struct("Web3Connections");
        partial.field("inner", &self.inner);
        partial.finish_non_exhaustive()
    }
}
impl Web3Connections {
/// Connect to every url in `servers`, one after another, and collect the connections
/// that succeed. A server that fails to connect is logged and skipped.
///
/// # Errors
///
/// Returns an error if fewer than 2 servers could be connected.
// #[instrument(name = "try_new_Web3Connections", skip_all)]
pub async fn try_new(
    chain_id: u64,
    servers: Vec<String>,
    http_client: Option<reqwest::Client>,
) -> anyhow::Result<Arc<Self>> {
    // turn configs into connections
    let mut connected = Vec::with_capacity(servers.len());

    for rpc_url in servers {
        let attempt = Web3Connection::try_new(chain_id, rpc_url, http_client.clone()).await;

        match attempt {
            Err(e) => warn!("Unable to connect to a server! {:?}", e),
            Ok(connection) => connected.push(connection),
        }
    }

    // TODO: less than 3? what should we do here?
    if connected.len() < 2 {
        return Err(anyhow::anyhow!(
            "need at least 2 connections when subscribing to heads!"
        ));
    }

    Ok(Arc::new(Self {
        inner: connected,
        synced_connections: ArcSwap::new(Arc::new(SyncedConnections::default())),
    }))
}
/// Spawn one `subscribe_new_heads` task per connection plus one `update_synced_rpcs`
/// task that consumes their block updates, then wait for all of them.
///
/// Each connection task first runs `check_chain_id`. After every task has finished,
/// the first failure found (task error or join error), in spawn order, is returned.
pub async fn subscribe_heads(self: &Arc<Self>) -> anyhow::Result<()> {
    let (block_sender, block_receiver) = flume::unbounded();

    // subscribe to new heads in a spawned future, one per connection
    // TODO: channel instead. then we can have one future with write access to a left-right?
    let mut handles: Vec<_> = self
        .inner
        .iter()
        .enumerate()
        .map(|(rpc_id, connection)| {
            let connection = Arc::clone(connection);
            let block_sender = block_sender.clone();

            // let url = connection.url().to_string();
            task::Builder::default()
                .name("subscribe_new_heads")
                .spawn(async move {
                    // loop to automatically reconnect
                    // TODO: make this cancellable?
                    // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
                    // TODO: proper span
                    connection.check_chain_id().await?;

                    connection
                        .subscribe_new_heads(rpc_id, block_sender.clone(), true)
                        .instrument(tracing::info_span!("url"))
                        .await
                })
        })
        .collect();

    let connections = Arc::clone(self);
    handles.push(
        task::Builder::default()
            .name("update_synced_rpcs")
            .spawn(async move { connections.update_synced_rpcs(block_receiver).await }),
    );

    for joined in join_all(handles).await {
        // the first `?` surfaces a join error, the second the task's own error
        joined??;
    }

    Ok(())
}
/// Consume `(block number, block hash, rpc id)` updates from `block_receiver` and keep
/// `self.synced_connections` pointing at the best known head and the rpcs that are on it.
///
/// Loops until `recv_async` returns an error and then returns `Err`; it never returns `Ok`.
///
/// TODO: move parts of this onto SyncedConnections?
#[instrument(skip_all)]
async fn update_synced_rpcs(
&self,
block_receiver: flume::Receiver<(u64, H256, usize)>,
) -> anyhow::Result<()> {
let max_connections = self.inner.len();
// the most recent (block number, block hash) reported by each rpc
let mut connection_states: HashMap<usize, (u64, H256)> =
HashMap::with_capacity(max_connections);
// working copy of the synced state. a clone of it is published at the end of each iteration
let mut pending_synced_connections = SyncedConnections::default();
while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await {
// TODO: span with rpc in it, too
// TODO: make sure i'm doing this span right
let span = info_span!("block_receiver", rpc_id, new_block_num);
let _enter = span.enter();
// NOTE: `Web3Connection::reconnect` also reports block 0 right before it reconnects
if new_block_num == 0 {
// TODO: show the actual rpc url?
warn!("rpc #{} is still syncing", rpc_id);
}
connection_states.insert(rpc_id, (new_block_num, new_block_hash));
// TODO: do something to update the synced blocks
match new_block_num.cmp(&pending_synced_connections.head_block_num) {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
info!("new head: {:?}", new_block_hash);
// only the rpc that reported the new head is known to be synced to it
pending_synced_connections.inner.clear();
pending_synced_connections.inner.insert(rpc_id);
pending_synced_connections.head_block_num = new_block_num;
// TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash;
}
cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash {
// this rpc has caught up with the best known head
// do not clear synced_connections.
// we just want to add this rpc to the set (a BTreeSet, so inserting twice is harmless)
pending_synced_connections.inner.insert(rpc_id);
} else {
// same height, but different chain
// check connection_states to see which head block is more popular!
let mut rpc_ids_by_block: BTreeMap<H256, Vec<usize>> = BTreeMap::new();
// number of rpcs whose latest block is at this height, whatever its hash
let mut synced_rpcs = 0;
// TODO: what order should we iterate in? track last update time, too?
for (rpc_id, (block_num, block_hash)) in connection_states.iter() {
if *block_num != new_block_num {
// this connection isn't synced. we don't care what hash it has
continue;
}
synced_rpcs += 1;
let count = rpc_ids_by_block
.entry(*block_hash)
.or_insert_with(|| Vec::with_capacity(max_connections - 1));
count.push(*rpc_id);
}
// cannot be empty: this rpc's own state was inserted above with this block number
let most_common_head_hash = rpc_ids_by_block
.iter()
.max_by(|a, b| a.1.len().cmp(&b.1.len()))
.map(|(k, _v)| k)
.unwrap();
warn!(
"chain is forked! {} possible heads. {}/{}/{} rpcs have {}",
rpc_ids_by_block.len(),
rpc_ids_by_block.get(most_common_head_hash).unwrap().len(),
synced_rpcs,
max_connections,
most_common_head_hash
);
// this isn't the best block in the tier. make sure this rpc isn't included
if new_block_hash != *most_common_head_hash {
pending_synced_connections.inner.remove(&rpc_id);
}
// NOTE(review): when this rpc IS on the most common hash, neither the head hash nor
// the synced set is changed here, even though the hash differs from
// `head_block_hash` — confirm this is intended.
// TODO: if pending_synced_connections hasn't changed. continue
}
}
cmp::Ordering::Less => {
// this rpc is behind the best known head. make sure it is not listed as synced
if !pending_synced_connections.inner.remove(&rpc_id) {
// we didn't remove anything. nothing more to do
// the rpc must be behind by more than 1 block
continue;
}
// we removed. don't continue so that we update self.synced_connections
}
}
// every path that reaches this point publishes a fresh snapshot, even if nothing changed
// the synced connections have changed
let synced_connections = Arc::new(pending_synced_connections.clone());
info!(
"new synced_connections for {}: {:?}",
synced_connections.head_block_hash, synced_connections.inner
);
// TODO: only do this if there are 2 nodes synced to this block?
// do the arcswap
self.synced_connections.swap(synced_connections);
}
// TODO: if there was an error, we should return it
Err(anyhow::anyhow!("block_receiver exited!"))
}
}

@ -1,88 +0,0 @@
mod app;
mod connection;
mod connections;
use parking_lot::deadlock;
use std::env;
use std::sync::atomic::{self, AtomicUsize};
use std::thread;
use std::time::Duration;
use tokio::runtime;
use crate::app::Web3ProxyApp;
/// Entry point of the minimal app.
///
/// Sets a default `RUST_LOG`, installs the console subscriber, raises the fd limit,
/// builds a multi-threaded tokio runtime with named worker threads, starts a background
/// thread that checks for `parking_lot` deadlocks every 10 seconds, and then runs
/// `Web3ProxyApp` against a hard-coded list of rpc urls.
fn main() -> anyhow::Result<()> {
    // hands out sequential ids for the runtime's thread names
    // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
    static NEXT_WORKER_ID: AtomicUsize = AtomicUsize::new(0);

    // TODO: is there a better way to do this?
    if env::var("RUST_LOG").is_err() {
        env::set_var("RUST_LOG", "web3_proxy_minimal=debug");
    }

    // install global collector configured based on RUST_LOG env var.
    // tracing_subscriber::fmt().init();
    console_subscriber::init();

    fdlimit::raise_fd_limit();

    let chain_id = 1;
    let workers = 4;

    let mut rt_builder = runtime::Builder::new_multi_thread();

    rt_builder.enable_all().thread_name_fn(move || {
        let worker_id = NEXT_WORKER_ID.fetch_add(1, atomic::Ordering::SeqCst);

        // TODO: i think these max at 15 characters
        format!("web3-{}-{}", chain_id, worker_id)
    });

    if workers > 0 {
        rt_builder.worker_threads(workers);
    }

    let rt = rt_builder.build()?;

    // spawn a thread for deadlock detection
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(10));

        let deadlocks = deadlock::check_deadlock();

        if !deadlocks.is_empty() {
            println!("{} deadlocks detected", deadlocks.len());

            for (i, threads) in deadlocks.iter().enumerate() {
                println!("Deadlock #{}", i);

                for t in threads {
                    println!("Thread Id {:#?}", t.thread_id());
                    println!("{:#?}", t.backtrace());
                }
            }
        }
    });

    // spawn the root task
    rt.block_on(async {
        // NOTE(review): several of these urls appear to embed provider api keys — confirm
        // they are meant to be committed.
        let rpc_urls = vec![
            "http://127.0.0.1:8545",
            "ws://127.0.0.1:8546",
            "http://127.0.0.1:8549",
            "ws://127.0.0.1:8549",
            "https://api.edennetwork.io/v1/",
            "https://api.edennetwork.io/v1/beta",
            "https://rpc.ethermine.org",
            "https://rpc.flashbots.net",
            "https://gibson.securerpc.com/v1",
            "wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7",
            "wss://mainnet.infura.io/ws/v3/c6fa1b6f17124b44ae71b2b25601aee0",
            "wss://ecfa2710350449f490725c4525cba584.eth.ws.rivet.cloud/",
            "wss://speedy-nodes-nyc.moralis.io/3587198387de4b2d711f6999/eth/mainnet/archive/ws",
        ];

        let balanced_rpcs: Vec<String> = rpc_urls.into_iter().map(String::from).collect();

        let app = Web3ProxyApp::try_new(chain_id, balanced_rpcs).await?;

        app.run().await
    })
}

@ -13,7 +13,7 @@ argh = "0.1.7"
console-subscriber = { version = "0.1.5", features = ["parking_lot"] }
dashmap = "5.3.3"
derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
ethers = { git = "https://github.com/SatoshiAndKin/ethers-rs", features = ["rustls", "ws"] }
fdlimit = "0.2.1"
flume = "0.10.12"
futures = { version = "0.3.21", features = ["thread-pool"] }

@ -79,6 +79,7 @@ impl Web3ProxyApp {
Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone()), &clock)
.await?;
// TODO: do this separately instead of during try_new
{
let balanced_rpcs = balanced_rpcs.clone();
task::spawn(async move {
@ -110,7 +111,6 @@ impl Web3ProxyApp {
self: Arc<Web3ProxyApp>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<impl warp::Reply> {
// TODO: i feel like i don't see this log when i should (even though i do see the response)
debug!("Received request: {:?}", request);
let response = match request {
@ -122,7 +122,6 @@ impl Web3ProxyApp {
}
};
// TODO: i feel like i don't see this log when i should (even though i do see the response)
debug!("Forwarding response: {:?}", response);
Ok(warp::reply::json(&response))

@ -20,7 +20,7 @@ use tracing::{info, info_span, instrument, trace, warn};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
#[derive(Clone)]
#[derive(Clone, Default)]
struct SyncedConnections {
head_block_num: u64,
head_block_hash: H256,
@ -35,14 +35,6 @@ impl fmt::Debug for SyncedConnections {
}
impl SyncedConnections {
fn new(max_connections: usize) -> Self {
Self {
head_block_num: 0,
head_block_hash: Default::default(),
inner: HashSet::with_capacity(max_connections),
}
}
pub fn get_head_block_hash(&self) -> &H256 {
&self.head_block_hash
}
@ -93,7 +85,7 @@ impl Web3Connections {
));
}
let synced_connections = SyncedConnections::new(num_connections);
let synced_connections = SyncedConnections::default();
let connections = Arc::new(Self {
inner: connections,
@ -139,6 +131,7 @@ impl Web3Connections {
handles.push(handle);
// TODO: do something with join_all's result
join_all(handles).await;
}
@ -224,7 +217,7 @@ impl Web3Connections {
let mut connection_states: HashMap<usize, (u64, H256)> =
HashMap::with_capacity(max_connections);
let mut pending_synced_connections = SyncedConnections::new(max_connections);
let mut pending_synced_connections = SyncedConnections::default();
while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await {
// TODO: span with more in it?