diff --git a/Cargo.lock b/Cargo.lock
index b563d087..a2e7fb41 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4297,6 +4297,32 @@ dependencies = [
  "warp",
 ]

+[[package]]
+name = "web3-proxy-minimal"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "arc-swap",
+ "argh",
+ "console-subscriber",
+ "derive_more",
+ "ethers",
+ "fdlimit",
+ "flume",
+ "futures",
+ "hashbrown 0.12.1",
+ "parking_lot 0.12.0",
+ "regex",
+ "reqwest",
+ "rustc-hash",
+ "serde",
+ "serde_json",
+ "tokio",
+ "toml",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "webpki"
 version = "0.22.0"
diff --git a/Cargo.toml b/Cargo.toml
index 8ae9edfa..28fbf3d4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
 members = [
     "linkedhashmap",
     "web3-proxy",
+    "web3-proxy-minimal",
 ]

 # TODO: enable these once rapid development is done
diff --git a/config/example.bac b/config/example.bac
new file mode 100644
index 00000000..0a0ad1a9
--- /dev/null
+++ b/config/example.bac
@@ -0,0 +1,35 @@
+[shared]
+chain_id = 1
+
+[balanced_rpcs]
+
+    [balanced_rpcs.erigon_archive]
+    url = "http://127.0.0.1:8549"
+    # TODO: double check soft_limit on erigon
+    soft_limit = 100_000
+
+    [balanced_rpcs.geth]
+    url = "http://127.0.0.1:8545"
+    soft_limit = 200_000
+
+[private_rpcs]
+
+    [private_rpcs.eden]
+    url = "https://api.edennetwork.io/v1/"
+    soft_limit = 1_805
+
+    [private_rpcs.eden_beta]
+    url = "https://api.edennetwork.io/v1/beta"
+    soft_limit = 5_861
+
+    [private_rpcs.ethermine]
+    url = "https://rpc.ethermine.org"
+    soft_limit = 5_861
+
+    [private_rpcs.flashbots]
+    url = "https://rpc.flashbots.net"
+    soft_limit = 7074
+
+    [private_rpcs.securerpc]
+    url = "https://gibson.securerpc.com/v1"
+    soft_limit = 4560
diff --git a/web3-proxy-minimal/Cargo.toml b/web3-proxy-minimal/Cargo.toml
new file mode 100644
index 00000000..368ab7e2
--- /dev/null
+++ b/web3-proxy-minimal/Cargo.toml
@@ -0,0 +1,33 @@
+[package]
+name = "web3-proxy-minimal"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.57"
+arc-swap = "1.5.0"
+argh = "0.1.7"
+# axum = "*" # TODO: use this instead of warp?
+console-subscriber = { version = "0.1.5", features = ["parking_lot"] }
+derive_more = "0.99.17"
+ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
+fdlimit = "0.2.1"
+flume = "0.10.12"
+futures = { version = "0.3.21", features = ["thread-pool"] }
+# TODO: governor has a "futures" and "futures-timer" feature. do we want those?
+hashbrown = "0.12.1"
+# TODO: parking_lot has an "arc_lock" feature that we might want to use
+parking_lot = { version = "0.12.0", features = ["deadlock_detection"] }
+# TODO: regex has several "perf" features that we might want to use
+regex = "1.5.5"
+reqwest = { version = "0.11.10", default-features = false, features = ["json", "tokio-rustls"] }
+rustc-hash = "1.1.0"
+serde = { version = "1.0.137", features = [] }
+serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] }
+tokio = { version = "1.18.2", features = ["full", "tracing"] }
+toml = "0.5.9"
+tracing = "0.1.34"
+# TODO: tracing-subscriber has serde and serde_json features that we might want to use
+tracing-subscriber = { version = "0.3.11", features = ["parking_lot"] }
diff --git a/web3-proxy-minimal/src/app.rs b/web3-proxy-minimal/src/app.rs
new file mode 100644
index 00000000..eebe3519
--- /dev/null
+++ b/web3-proxy-minimal/src/app.rs
@@ -0,0 +1,53 @@
+use crate::connections::Web3Connections;
+use std::fmt;
+use std::sync::Arc;
+use std::time::Duration;
+
+static APP_USER_AGENT: &str = concat!(
+    "satoshiandkin/",
+    env!("CARGO_PKG_NAME"),
+    "/",
+    env!("CARGO_PKG_VERSION"),
+);
+
+/// The application
+// TODO: this debug impl is way too verbose. make something smaller
+// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
+pub struct Web3ProxyApp {
+    /// Send requests to the best server available
+    balanced_rpcs: Arc<Web3Connections>,
+}
+
+impl fmt::Debug for Web3ProxyApp {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: the default formatter takes forever to write. this is too quiet though
+        f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
+    }
+}
+
+impl Web3ProxyApp {
+    // #[instrument(name = "try_new_Web3ProxyApp", skip_all)]
+    pub async fn try_new(
+        chain_id: u64,
+        balanced_rpcs: Vec<String>,
+    ) -> anyhow::Result<Web3ProxyApp> {
+        // make a http shared client
+        // TODO: how should we configure the connection pool?
+        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
+        let http_client = reqwest::ClientBuilder::new()
+            .connect_timeout(Duration::from_secs(5))
+            .timeout(Duration::from_secs(60))
+            .user_agent(APP_USER_AGENT)
+            .build()?;
+
+        // TODO: attach context to this error
+        let balanced_rpcs =
+            Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone())).await?;
+
+        Ok(Web3ProxyApp { balanced_rpcs })
+    }
+
+    pub async fn run(&self) -> anyhow::Result<()> {
+        self.balanced_rpcs.subscribe_heads().await
+    }
+}
diff --git a/web3-proxy-minimal/src/connection.rs b/web3-proxy-minimal/src/connection.rs
new file mode 100644
index 00000000..caee7203
--- /dev/null
+++ b/web3-proxy-minimal/src/connection.rs
@@ -0,0 +1,342 @@
+///!
Rate-limited communication with a web3 provider +use derive_more::From; +use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256}; +use futures::StreamExt; +use std::fmt; +use std::sync::atomic::{self, AtomicU32}; +use std::{cmp::Ordering, sync::Arc}; +use tokio::sync::RwLock; +use tokio::task; +use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; +use tracing::{info, instrument, trace, warn}; + +/// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 +#[derive(From)] +pub enum Web3Provider { + Http(ethers::providers::Provider), + Ws(ethers::providers::Provider), +} + +impl Web3Provider { + #[instrument] + async fn from_str(url_str: &str, http_client: Option) -> anyhow::Result { + let provider = if url_str.starts_with("http") { + let url: reqwest::Url = url_str.parse()?; + + let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; + + let provider = ethers::providers::Http::new_with_client(url, http_client); + + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) + ethers::providers::Provider::new(provider) + .interval(Duration::from_secs(1)) + .into() + } else if url_str.starts_with("ws") { + // TODO: wrapper automatically reconnect + let provider = ethers::providers::Ws::connect(url_str).await?; + + // TODO: make sure this automatically reconnects + + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) + ethers::providers::Provider::new(provider) + .interval(Duration::from_secs(1)) + .into() + } else { + return Err(anyhow::anyhow!("only http and ws servers are supported")); + }; + + Ok(provider) + } +} + +impl fmt::Debug for Web3Provider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url + f.debug_struct("Web3Provider").finish_non_exhaustive() + } +} + +/// An active connection to a Web3Rpc +pub struct Web3Connection { + /// TODO: can we get this from the provider? do we even need it? + url: String, + /// keep track of currently open requests. We sort on this + active_requests: AtomicU32, + /// this in a RwLock so that we can replace it if re-connecting + provider: RwLock>, + chain_id: u64, +} + +impl fmt::Debug for Web3Connection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Web3Connection") + .field("url", &self.url) + .finish_non_exhaustive() + } +} + +impl fmt::Display for Web3Connection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", &self.url) + } +} + +impl Web3Connection { + #[instrument(skip_all)] + pub async fn reconnect( + self: &Arc, + block_sender: &flume::Sender<(u64, H256, usize)>, + rpc_id: usize, + ) -> anyhow::Result<()> { + // websocket doesn't need the http client + let http_client = None; + + // since this lock is held open over an await, we use tokio's locking + let mut provider = self.provider.write().await; + + // tell the block subscriber that we are at 0 + block_sender.send_async((0, H256::zero(), rpc_id)).await?; + + let new_provider = Web3Provider::from_str(&self.url, http_client).await?; + + *provider = Arc::new(new_provider); + + Ok(()) + } + + /// Connect to a web3 rpc and subscribe to new heads + #[instrument(name = "try_new_Web3Connection", skip(http_client))] + pub async fn try_new( + chain_id: u64, + url_str: String, + // optional because this is only used for http providers. 
websocket providers don't use it + http_client: Option, + ) -> anyhow::Result> { + let provider = Web3Provider::from_str(&url_str, http_client).await?; + + let connection = Web3Connection { + url: url_str.clone(), + active_requests: 0.into(), + provider: RwLock::new(Arc::new(provider)), + chain_id, + }; + + Ok(Arc::new(connection)) + } + + #[instrument] + pub async fn check_chain_id(&self) -> anyhow::Result<()> { + // check the server's chain_id here + // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error + let found_chain_id: Result = + self.request("eth_chainId", Option::None::<()>).await; + + match found_chain_id { + Ok(found_chain_id) => { + let found_chain_id = + u64::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap(); + + if self.chain_id != found_chain_id { + return Err(anyhow::anyhow!( + "incorrect chain id! Expected {}. Found {}", + self.chain_id, + found_chain_id + )); + } + } + Err(e) => { + let e = anyhow::Error::from(e).context(format!("{}", self)); + return Err(e); + } + } + + info!("Successful connection"); + + Ok(()) + } + + /// Send a web3 request + /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented + /// By taking self here, we ensure that this is dropped after the request is complete + #[instrument(skip(params))] + pub async fn request( + &self, + method: &str, + params: T, + ) -> Result + where + T: fmt::Debug + serde::Serialize + Send + Sync, + R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, + { + // TODO: use tracing spans properly + // TODO: it would be nice to have the request id on this + // TODO: including params in this is way too verbose + trace!("Sending {} to {}", method, self.url); + + let provider = self.provider.read().await.clone(); + + let response = match &*provider { + Web3Provider::Http(provider) => provider.request(method, params).await, + Web3Provider::Ws(provider) => provider.request(method, params).await, + }; + + // TODO: i think ethers already has trace logging (and does it much more fancy) + // TODO: at least instrument this with more useful information + trace!("Reply from {}", self.url); + + response + } + + #[instrument(skip_all)] + async fn send_block( + self: &Arc, + block: Result, ProviderError>, + block_sender: &flume::Sender<(u64, H256, usize)>, + rpc_id: usize, + ) { + match block { + Ok(block) => { + let block_number = block.number.unwrap().as_u64(); + let block_hash = block.hash.unwrap(); + + // TODO: i'm pretty sure we don't need send_async, but double check + block_sender + .send_async((block_number, block_hash, rpc_id)) + .await + .unwrap(); + } + Err(e) => { + warn!("unable to get block from {}: {}", self, e); + } + } + } + + /// Subscribe to new blocks. If `reconnect` is true, this runs forever. + /// TODO: instrument with the url + #[instrument(skip_all)] + pub async fn subscribe_new_heads( + self: Arc, + rpc_id: usize, + block_sender: flume::Sender<(u64, H256, usize)>, + reconnect: bool, + ) -> anyhow::Result<()> { + loop { + info!("Watching new_heads on {}", self); + + // TODO: is a RwLock of Arc the right thing here? + let provider = self.provider.read().await.clone(); + + match &*provider { + Web3Provider::Http(provider) => { + // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: what should this interval be? probably some fraction of block time. set automatically? 
+ // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now + // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though + let mut interval = interval(Duration::from_secs(2)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let mut last_hash = Default::default(); + + loop { + // wait for the interval + // TODO: if error or rate limit, increase interval? + interval.tick().await; + + // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" + let block: Result, _> = provider + .request("eth_getBlockByNumber", ("latest", false)) + .await; + + // don't send repeat blocks + if let Ok(block) = &block { + let new_hash = block.hash.unwrap(); + + if new_hash == last_hash { + continue; + } + + last_hash = new_hash; + } + + self.send_block(block, &block_sender, rpc_id).await; + } + } + Web3Provider::Ws(provider) => { + // TODO: automatically reconnect? + // TODO: it would be faster to get the block number, but subscriptions don't provide that + // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? + let mut stream = provider.subscribe_blocks().await?; + + // query the block once since the subscription doesn't send the current block + // there is a very small race condition here where the stream could send us a new block right now + // all it does is print "new block" for the same block as current block + // TODO: rate limit! + let block: Result, _> = provider + .request("eth_getBlockByNumber", ("latest", false)) + .await; + + self.send_block(block, &block_sender, rpc_id).await; + + // TODO: should the stream have a timeout on it here? + // TODO: although reconnects will make this less of an issue + loop { + match stream.next().await { + Some(new_block) => { + self.send_block(Ok(new_block), &block_sender, rpc_id).await; + + // TODO: really not sure about this + task::yield_now().await; + } + None => { + warn!("subscription ended"); + break; + } + } + } + } + } + + if reconnect { + drop(provider); + + // TODO: exponential backoff + warn!("new heads subscription exited. reconnecting in 10 seconds..."); + sleep(Duration::from_secs(10)).await; + + self.reconnect(&block_sender, rpc_id).await?; + } else { + break; + } + } + + info!("Done watching new_heads on {}", self); + Ok(()) + } +} + +impl Eq for Web3Connection {} + +impl Ord for Web3Connection { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // TODO: what atomic ordering?! + let a = self.active_requests.load(atomic::Ordering::Acquire); + let b = other.active_requests.load(atomic::Ordering::Acquire); + + a.cmp(&b) + } +} + +impl PartialOrd for Web3Connection { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// note that this is just comparing the active requests. two providers with different rpc urls are equal! +impl PartialEq for Web3Connection { + fn eq(&self, other: &Self) -> bool { + // TODO: what ordering?! + self.active_requests.load(atomic::Ordering::Acquire) + == other.active_requests.load(atomic::Ordering::Acquire) + } +} diff --git a/web3-proxy-minimal/src/connections.rs b/web3-proxy-minimal/src/connections.rs new file mode 100644 index 00000000..5173c059 --- /dev/null +++ b/web3-proxy-minimal/src/connections.rs @@ -0,0 +1,251 @@ +///! 
Load balanced communication with a group of web3 providers +use arc_swap::ArcSwap; +use derive_more::From; +use ethers::prelude::H256; +use futures::future::join_all; +use hashbrown::{HashMap, HashSet}; +use std::cmp; +use std::collections::BTreeMap; +use std::fmt; +use std::sync::Arc; +use tokio::task; +use tracing::Instrument; +use tracing::{info, info_span, instrument, warn}; + +use crate::connection::Web3Connection; + +#[derive(Clone, Debug)] +struct SyncedConnections { + head_block_num: u64, + head_block_hash: H256, + inner: HashSet, +} + +impl SyncedConnections { + fn new(max_connections: usize) -> Self { + Self { + head_block_num: 0, + head_block_hash: Default::default(), + inner: HashSet::with_capacity(max_connections), + } + } +} + +/// A collection of web3 connections. Sends requests either the current best server or all servers. +#[derive(From)] +pub struct Web3Connections { + inner: Vec>, + synced_connections: ArcSwap, +} + +impl fmt::Debug for Web3Connections { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + f.debug_struct("Web3Connections") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl Web3Connections { + // #[instrument(name = "try_new_Web3Connections", skip_all)] + pub async fn try_new( + chain_id: u64, + servers: Vec, + http_client: Option, + ) -> anyhow::Result> { + let num_connections = servers.len(); + + // turn configs into connections + let mut connections = Vec::with_capacity(num_connections); + for rpc_url in servers.into_iter() { + match Web3Connection::try_new(chain_id, rpc_url, http_client.clone()).await { + Ok(connection) => connections.push(connection), + Err(e) => warn!("Unable to connect to a server! {:?}", e), + } + } + + if connections.len() < 2 { + // TODO: less than 3? what should we do here? + return Err(anyhow::anyhow!( + "need at least 2 connections when subscribing to heads!" + )); + } + + let synced_connections = SyncedConnections::new(num_connections); + + let connections = Arc::new(Self { + inner: connections, + synced_connections: ArcSwap::new(Arc::new(synced_connections)), + }); + + Ok(connections) + } + + pub async fn subscribe_heads(self: &Arc) -> anyhow::Result<()> { + let (block_sender, block_receiver) = flume::unbounded(); + + let mut handles = vec![]; + + for (rpc_id, connection) in self.inner.iter().enumerate() { + // subscribe to new heads in a spawned future + // TODO: channel instead. then we can have one future with write access to a left-right? + let connection = Arc::clone(connection); + let block_sender = block_sender.clone(); + + // let url = connection.url().to_string(); + + let handle = task::Builder::default() + .name("subscribe_new_heads") + .spawn(async move { + // loop to automatically reconnect + // TODO: make this cancellable? + // TODO: instead of passing Some(connections), pass Some(channel_sender). 
Then listen on the receiver below to keep local heads up-to-date + // TODO: proper span + connection.check_chain_id().await?; + + connection + .subscribe_new_heads(rpc_id, block_sender.clone(), true) + .instrument(tracing::info_span!("url")) + .await + }); + + handles.push(handle); + } + + let connections = Arc::clone(self); + let handle = task::Builder::default() + .name("update_synced_rpcs") + .spawn(async move { connections.update_synced_rpcs(block_receiver).await }); + + handles.push(handle); + + for x in join_all(handles).await { + match x { + Ok(Ok(_)) => {} + Ok(Err(e)) => return Err(e), + Err(e) => return Err(e.into()), + } + } + + Ok(()) + } + + /// TODO: move parts of this onto SyncedConnections? + #[instrument(skip_all)] + async fn update_synced_rpcs( + &self, + block_receiver: flume::Receiver<(u64, H256, usize)>, + ) -> anyhow::Result<()> { + let max_connections = self.inner.len(); + + let mut connection_states: HashMap = + HashMap::with_capacity(max_connections); + + let mut pending_synced_connections = SyncedConnections::new(max_connections); + + while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await { + if new_block_num == 0 { + // TODO: show the actual rpc url? + warn!("rpc #{} is still syncing", rpc_id); + } + + // TODO: span with rpc in it, too + // TODO: make sure i'm doing this span right + let span = info_span!("new_block", new_block_num); + let _enter = span.enter(); + + connection_states.insert(rpc_id, (new_block_num, new_block_hash)); + + // TODO: do something to update the synced blocks + match new_block_num.cmp(&pending_synced_connections.head_block_num) { + cmp::Ordering::Greater => { + // the rpc's newest block is the new overall best block + info!(rpc_id, "new head"); + + pending_synced_connections.inner.clear(); + pending_synced_connections.inner.insert(rpc_id); + + pending_synced_connections.head_block_num = new_block_num; + + // TODO: if the parent hash isn't our previous best block, ignore it + pending_synced_connections.head_block_hash = new_block_hash; + } + cmp::Ordering::Equal => { + if new_block_hash == pending_synced_connections.head_block_hash { + // this rpc has caught up with the best known head + // do not clear synced_connections. + // we just want to add this rpc to the end + // TODO: HashSet here? i think we get dupes if we don't + pending_synced_connections.inner.insert(rpc_id); + } else { + // same height, but different chain + + // check connection_states to see which head block is more popular! + let mut rpc_ids_by_block: BTreeMap> = BTreeMap::new(); + + let mut synced_rpcs = 0; + + // TODO: what order should we iterate in? track last update time, too? + for (rpc_id, (block_num, block_hash)) in connection_states.iter() { + if *block_num != new_block_num { + // this connection isn't synced. we don't care what hash it has + continue; + } + + synced_rpcs += 1; + + let count = rpc_ids_by_block + .entry(*block_hash) + .or_insert_with(|| Vec::with_capacity(max_connections - 1)); + + count.push(*rpc_id); + } + + let most_common_head_hash = rpc_ids_by_block + .iter() + .max_by(|a, b| a.1.len().cmp(&b.1.len())) + .map(|(k, _v)| k) + .unwrap(); + + warn!( + "chain is forked! {} possible heads. {}/{}/{} rpcs have {}", + rpc_ids_by_block.len(), + rpc_ids_by_block.get(most_common_head_hash).unwrap().len(), + synced_rpcs, + max_connections, + most_common_head_hash + ); + + // this isn't the best block in the tier. 
make sure this rpc isn't included + if new_block_hash != *most_common_head_hash { + pending_synced_connections.inner.remove(&rpc_id); + } + + // TODO: if pending_synced_connections hasn't changed. continue + } + } + cmp::Ordering::Less => { + // this isn't the best block in the tier. don't do anything + if !pending_synced_connections.inner.remove(&rpc_id) { + // we didn't remove anything. nothing more to do + continue; + } + // we removed. don't continue so that we update self.synced_connections + } + } + + // the synced connections have changed + let synced_connections = Arc::new(pending_synced_connections.clone()); + + info!("new synced_connections: {:?}", synced_connections); + + // TODO: only do this if there are 2 nodes synced to this block? + // do the arcswap + self.synced_connections.swap(synced_connections); + } + + // TODO: if there was an error, we should return it + Err(anyhow::anyhow!("block_receiver exited!")) + } +} diff --git a/web3-proxy-minimal/src/main.rs b/web3-proxy-minimal/src/main.rs new file mode 100644 index 00000000..d2fc98b3 --- /dev/null +++ b/web3-proxy-minimal/src/main.rs @@ -0,0 +1,88 @@ +mod app; +mod connection; +mod connections; + +use parking_lot::deadlock; +use std::env; +use std::sync::atomic::{self, AtomicUsize}; +use std::thread; +use std::time::Duration; +use tokio::runtime; + +use crate::app::Web3ProxyApp; + +fn main() -> anyhow::Result<()> { + // TODO: is there a better way to do this? + if env::var("RUST_LOG").is_err() { + env::set_var("RUST_LOG", "web3_proxy_minimal=debug"); + } + + // install global collector configured based on RUST_LOG env var. + // tracing_subscriber::fmt().init(); + console_subscriber::init(); + + fdlimit::raise_fd_limit(); + + let chain_id = 1; + let workers = 4; + + let mut rt_builder = runtime::Builder::new_multi_thread(); + + rt_builder.enable_all().thread_name_fn(move || { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + // TODO: what ordering? 
i think we want seqcst so that these all happen in order, but that might be stricter than we really need + let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst); + // TODO: i think these max at 15 characters + format!("web3-{}-{}", chain_id, worker_id) + }); + + if workers > 0 { + rt_builder.worker_threads(workers); + } + + let rt = rt_builder.build()?; + + // spawn a thread for deadlock detection + thread::spawn(move || loop { + thread::sleep(Duration::from_secs(10)); + let deadlocks = deadlock::check_deadlock(); + if deadlocks.is_empty() { + continue; + } + + println!("{} deadlocks detected", deadlocks.len()); + for (i, threads) in deadlocks.iter().enumerate() { + println!("Deadlock #{}", i); + for t in threads { + println!("Thread Id {:#?}", t.thread_id()); + println!("{:#?}", t.backtrace()); + } + } + }); + + // spawn the root task + rt.block_on(async { + let balanced_rpcs = vec![ + "http://127.0.0.1:8545", + "ws://127.0.0.1:8546", + "http://127.0.0.1:8549", + "ws://127.0.0.1:8549", + "https://api.edennetwork.io/v1/", + "https://api.edennetwork.io/v1/beta", + "https://rpc.ethermine.org", + "https://rpc.flashbots.net", + "https://gibson.securerpc.com/v1", + "wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7", + "wss://mainnet.infura.io/ws/v3/c6fa1b6f17124b44ae71b2b25601aee0", + "wss://ecfa2710350449f490725c4525cba584.eth.ws.rivet.cloud/", + "wss://speedy-nodes-nyc.moralis.io/3587198387de4b2d711f6999/eth/mainnet/archive/ws", + ] + .into_iter() + .map(|x| x.to_string()) + .collect(); + + let app = Web3ProxyApp::try_new(chain_id, balanced_rpcs).await?; + + app.run().await + }) +} diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 78618d73..98b1091b 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -17,7 +17,6 @@ use std::time::Duration; use tokio::sync::watch; use tokio::task; use tokio::time::sleep; -use tracing::info_span; use tracing::{debug, instrument, trace, warn}; static APP_USER_AGENT: &str = concat!( @@ -224,13 +223,12 @@ impl Web3ProxyApp { } else { // this is not a private transaction (or no private relays are configured) // TODO: how much should we retry? - for i in 0..10usize { + for _i in 0..10usize { // TODO: think more about this loop. // // TODO: add more to this span. and do it properly // let span = info_span!("i", ?i); // let _enter = span.enter(); - /* // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches // TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed trace!("{:?} waiting for head_block_hash", request); @@ -297,7 +295,6 @@ impl Web3ProxyApp { ); } } - */ match self.balanced_rpcs.next_upstream_server().await { Ok(active_request_handle) => { @@ -310,15 +307,14 @@ impl Web3ProxyApp { // TODO: trace here was really slow with millions of requests. // trace!("forwarding request from {}", upstream_server); - JsonRpcForwardedResponse { + let response = JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), id: request.id, // TODO: since we only use the result here, should that be all we return from try_send_request? result: Some(partial_response), error: None, - } + }; - /* // TODO: small race condidition here. 
parallel requests with the same query will both be saved to the cache let mut response_cache = self.response_cache.write(); @@ -334,13 +330,12 @@ impl Web3ProxyApp { // TODO: needing to remove manually here makes me think we should do this differently let _ = self.active_requests.remove(&cache_key); let _ = in_flight_tx.send(false); - */ - // response + response } Err(e) => { - // // send now since we aren't going to cache an error response - // let _ = in_flight_tx.send(false); + // send now since we aren't going to cache an error response + let _ = in_flight_tx.send(false); // TODO: move this to a helper function? let code; @@ -402,8 +397,8 @@ impl Web3ProxyApp { } }; - // // TODO: needing to remove manually here makes me think we should do this differently - // let _ = self.active_requests.remove(&cache_key); + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); if response.error.is_some() { trace!("Sending error reply: {:?}", response); @@ -412,7 +407,7 @@ impl Web3ProxyApp { } else { trace!("Sending reply: {:?}", response); - // let _ = in_flight_tx.send(false); + let _ = in_flight_tx.send(false); } return Ok(response); @@ -421,9 +416,9 @@ impl Web3ProxyApp { // TODO: this is too verbose. if there are other servers in other tiers, we use those! warn!("No servers in sync!"); - // // TODO: needing to remove manually here makes me think we should do this differently - // let _ = self.active_requests.remove(&cache_key); - // let _ = in_flight_tx.send(false); + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); return Err(anyhow::anyhow!("no servers in sync")); } @@ -439,9 +434,9 @@ impl Web3ProxyApp { warn!("All rate limits exceeded. Sleeping"); - // // TODO: needing to remove manually here makes me think we should do this differently - // let _ = self.active_requests.remove(&cache_key); - // let _ = in_flight_tx.send(false); + // TODO: needing to remove manually here makes me think we should do this differently + let _ = self.active_requests.remove(&cache_key); + let _ = in_flight_tx.send(false); continue; }
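
A note on the head-consensus step in connections.rs: when two rpcs report the same block number but different hashes, update_synced_rpcs counts how many synced rpcs are on each hash and treats the most common one as the head. Stripped of the logging and the pending_synced_connections bookkeeping, that vote boils down to something like the sketch below (most_common_head and the generic hash parameter H are illustrative names, not part of the patch):

    use std::collections::HashMap;
    use std::hash::Hash;

    // Each rpc's latest state is (block_number, block_hash), keyed by rpc_id.
    // Only rpcs that are at `head_num` get a vote; the hash with the most votes wins.
    fn most_common_head<H: Copy + Eq + Hash>(
        connection_states: &HashMap<usize, (u64, H)>,
        head_num: u64,
    ) -> Option<(H, usize)> {
        let mut votes: HashMap<H, usize> = HashMap::new();

        for (block_num, block_hash) in connection_states.values() {
            if *block_num == head_num {
                *votes.entry(*block_hash).or_insert(0) += 1;
            }
        }

        votes.into_iter().max_by_key(|(_hash, count)| *count)
    }

The patch keeps a BTreeMap of hash -> Vec of rpc_ids instead of bare counts so the fork warning can also say which rpcs voted for which head; in both versions ties are broken arbitrarily.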
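
The synced_connections field is an ArcSwap rather than a lock so that readers (eventually the request-routing path) can take a snapshot of the current head set without blocking the single writer in update_synced_rpcs. A minimal standalone sketch of that pattern; Synced here is a stand-in for the patch's SyncedConnections, not its real definition:

    use arc_swap::ArcSwap;
    use std::sync::Arc;

    #[derive(Clone, Debug, Default)]
    struct Synced {
        head_block_num: u64,
        rpc_ids: Vec<usize>,
    }

    fn main() {
        let synced = ArcSwap::from_pointee(Synced::default());

        // readers take a cheap snapshot; no lock is held while they use it
        assert_eq!(synced.load().head_block_num, 0);

        // the writer builds a fresh value and swaps it in atomically
        synced.swap(Arc::new(Synced {
            head_block_num: 14_000_000,
            rpc_ids: vec![0, 1],
        }));

        // loads that happen after the swap see the new head
        assert_eq!(synced.load().head_block_num, 14_000_000);
    }

Readers still holding an old snapshot keep the previous head until they call load() again, which is the behaviour the per-update clone of pending_synced_connections is paying for.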
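
Web3Connection's Ord and PartialEq impls compare nothing but active_requests; the point is to let a server-selection step pick the least-loaded synced connection (the sibling web3-proxy crate's next_upstream_server leans on that kind of ordering, while this minimal crate only subscribes to heads so far). A self-contained sketch of that selection, with Conn and least_busy as illustrative names:

    use std::sync::atomic::{AtomicU32, Ordering};

    struct Conn {
        active_requests: AtomicU32,
    }

    // order candidates by in-flight request count and take the smallest,
    // mirroring the Acquire loads in Web3Connection's Ord impl
    fn least_busy(conns: &[Conn]) -> Option<&Conn> {
        conns
            .iter()
            .min_by_key(|c| c.active_requests.load(Ordering::Acquire))
    }

Because the counters are live atomics, the count can change between loads, so this is a load-spreading heuristic rather than a strict total order; that is also why the "two providers with different rpc urls are equal!" caveat on PartialEq matters.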
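
One robustness note on check_chain_id: the hex parse of the eth_chainId result uses unwrap, so a server that answers with something that is not valid hex would panic the spawned subscriber task instead of flowing through the existing error branch. A hedged alternative that keeps the same trim-and-parse behaviour but returns an error (parse_chain_id is a hypothetical helper, not in the patch):

    fn parse_chain_id(hex: &str) -> anyhow::Result<u64> {
        let digits = hex.trim_start_matches("0x");
        u64::from_str_radix(digits, 16)
            .map_err(|e| anyhow::anyhow!("bad eth_chainId response {:?}: {}", hex, e))
    }

check_chain_id could then bubble that error up with the same format!("{}", self) context it already attaches to request failures.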