add minimal code that only watches blocks to see if it locks up

This commit is contained in:
Bryan Stitt 2022-05-19 21:21:41 +00:00
parent 1e9284d5e8
commit 36f2ca380b
9 changed files with 844 additions and 20 deletions

26
Cargo.lock generated

@ -4297,6 +4297,32 @@ dependencies = [
"warp",
]
[[package]]
name = "web3-proxy-minimal"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"argh",
"console-subscriber",
"derive_more",
"ethers",
"fdlimit",
"flume",
"futures",
"hashbrown 0.12.1",
"parking_lot 0.12.0",
"regex",
"reqwest",
"rustc-hash",
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "webpki"
version = "0.22.0"

@ -2,6 +2,7 @@
members = [
"linkedhashmap",
"web3-proxy",
"web3-proxy-minimal",
]
# TODO: enable these once rapid development is done

35
config/example.bac Normal file

@ -0,0 +1,35 @@
[shared]
chain_id = 1
[balanced_rpcs]
[balanced_rpcs.erigon_archive]
url = "http://127.0.0.1:8549"
# TODO: double check soft_limit on erigon
soft_limit = 100_000
[balanced_rpcs.geth]
url = "http://127.0.0.1:8545"
soft_limit = 200_000
[private_rpcs]
[private_rpcs.eden]
url = "https://api.edennetwork.io/v1/"
soft_limit = 1_805
[private_rpcs.eden_beta]
url = "https://api.edennetwork.io/v1/beta"
soft_limit = 5_861
[private_rpcs.ethermine]
url = "https://rpc.ethermine.org"
soft_limit = 5_861
[private_rpcs.flashbots]
url = "https://rpc.flashbots.net"
soft_limit = 7074
[private_rpcs.securerpc]
url = "https://gibson.securerpc.com/v1"
soft_limit = 4560

@ -0,0 +1,33 @@
[package]
name = "web3-proxy-minimal"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.57"
arc-swap = "1.5.0"
argh = "0.1.7"
# axum = "*" # TODO: use this instead of warp?
console-subscriber = { version = "0.1.5", features = ["parking_lot"] }
derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
fdlimit = "0.2.1"
flume = "0.10.12"
futures = { version = "0.3.21", features = ["thread-pool"] }
# TODO: governor has a "futures" and "futures-timer" feature. do we want those?
hashbrown = "0.12.1"
# TODO: parking_lot has an "arc_lock" feature that we might want to use
parking_lot = { version = "0.12.0", features = ["deadlock_detection"] }
# TODO: regex has several "perf" features that we might want to use
regex = "1.5.5"
reqwest = { version = "0.11.10", default-features = false, features = ["json", "tokio-rustls"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.18.2", features = ["full", "tracing"] }
toml = "0.5.9"
tracing = "0.1.34"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
tracing-subscriber = { version = "0.3.11", features = ["parking_lot"] }

@ -0,0 +1,53 @@
use crate::connections::Web3Connections;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
);
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
pub struct Web3ProxyApp {
/// Send requests to the best server available
balanced_rpcs: Arc<Web3Connections>,
}
impl fmt::Debug for Web3ProxyApp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
}
}
impl Web3ProxyApp {
// #[instrument(name = "try_new_Web3ProxyApp", skip_all)]
pub async fn try_new(
chain_id: u64,
balanced_rpcs: Vec<String>,
) -> anyhow::Result<Web3ProxyApp> {
// make a http shared client
// TODO: how should we configure the connection pool?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
let http_client = reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(60))
.user_agent(APP_USER_AGENT)
.build()?;
// TODO: attach context to this error
let balanced_rpcs =
Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone())).await?;
Ok(Web3ProxyApp { balanced_rpcs })
}
pub async fn run(&self) -> anyhow::Result<()> {
self.balanced_rpcs.subscribe_heads().await
}
}

@ -0,0 +1,342 @@
///! Rate-limited communication with a web3 provider
use derive_more::From;
use ethers::prelude::{Block, Middleware, ProviderError, TxHash, H256};
use futures::StreamExt;
use std::fmt;
use std::sync::atomic::{self, AtomicU32};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::RwLock;
use tokio::task;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tracing::{info, instrument, trace, warn};
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
pub enum Web3Provider {
Http(ethers::providers::Provider<ethers::providers::Http>),
Ws(ethers::providers::Provider<ethers::providers::Ws>),
}
impl Web3Provider {
#[instrument]
async fn from_str(url_str: &str, http_client: Option<reqwest::Client>) -> anyhow::Result<Self> {
let provider = if url_str.starts_with("http") {
let url: reqwest::Url = url_str.parse()?;
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
let provider = ethers::providers::Http::new_with_client(url, http_client);
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else if url_str.starts_with("ws") {
// TODO: wrapper automatically reconnect
let provider = ethers::providers::Ws::connect(url_str).await?;
// TODO: make sure this automatically reconnects
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
Ok(provider)
}
}
impl fmt::Debug for Web3Provider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
f.debug_struct("Web3Provider").finish_non_exhaustive()
}
}
/// An active connection to a Web3Rpc
pub struct Web3Connection {
/// TODO: can we get this from the provider? do we even need it?
url: String,
/// keep track of currently open requests. We sort on this
active_requests: AtomicU32,
/// this in a RwLock so that we can replace it if re-connecting
provider: RwLock<Arc<Web3Provider>>,
chain_id: u64,
}
impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Web3Connection")
.field("url", &self.url)
.finish_non_exhaustive()
}
}
impl fmt::Display for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", &self.url)
}
}
impl Web3Connection {
#[instrument(skip_all)]
pub async fn reconnect(
self: &Arc<Self>,
block_sender: &flume::Sender<(u64, H256, usize)>,
rpc_id: usize,
) -> anyhow::Result<()> {
// websocket doesn't need the http client
let http_client = None;
// since this lock is held open over an await, we use tokio's locking
let mut provider = self.provider.write().await;
// tell the block subscriber that we are at 0
block_sender.send_async((0, H256::zero(), rpc_id)).await?;
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
*provider = Arc::new(new_provider);
Ok(())
}
/// Connect to a web3 rpc and subscribe to new heads
#[instrument(name = "try_new_Web3Connection", skip(http_client))]
pub async fn try_new(
chain_id: u64,
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Arc<Web3Connection>> {
let provider = Web3Provider::from_str(&url_str, http_client).await?;
let connection = Web3Connection {
url: url_str.clone(),
active_requests: 0.into(),
provider: RwLock::new(Arc::new(provider)),
chain_id,
};
Ok(Arc::new(connection))
}
#[instrument]
pub async fn check_chain_id(&self) -> anyhow::Result<()> {
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
let found_chain_id: Result<String, _> =
self.request("eth_chainId", Option::None::<()>).await;
match found_chain_id {
Ok(found_chain_id) => {
let found_chain_id =
u64::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap();
if self.chain_id != found_chain_id {
return Err(anyhow::anyhow!(
"incorrect chain id! Expected {}. Found {}",
self.chain_id,
found_chain_id
));
}
}
Err(e) => {
let e = anyhow::Error::from(e).context(format!("{}", self));
return Err(e);
}
}
info!("Successful connection");
Ok(())
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
#[instrument(skip(params))]
pub async fn request<T, R>(
&self,
method: &str,
params: T,
) -> Result<R, ethers::prelude::ProviderError>
where
T: fmt::Debug + serde::Serialize + Send + Sync,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{
// TODO: use tracing spans properly
// TODO: it would be nice to have the request id on this
// TODO: including params in this is way too verbose
trace!("Sending {} to {}", method, self.url);
let provider = self.provider.read().await.clone();
let response = match &*provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
};
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: at least instrument this with more useful information
trace!("Reply from {}", self.url);
response
}
#[instrument(skip_all)]
async fn send_block(
self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(u64, H256, usize)>,
rpc_id: usize,
) {
match block {
Ok(block) => {
let block_number = block.number.unwrap().as_u64();
let block_hash = block.hash.unwrap();
// TODO: i'm pretty sure we don't need send_async, but double check
block_sender
.send_async((block_number, block_hash, rpc_id))
.await
.unwrap();
}
Err(e) => {
warn!("unable to get block from {}: {}", self, e);
}
}
}
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
/// TODO: instrument with the url
#[instrument(skip_all)]
pub async fn subscribe_new_heads(
self: Arc<Self>,
rpc_id: usize,
block_sender: flume::Sender<(u64, H256, usize)>,
reconnect: bool,
) -> anyhow::Result<()> {
loop {
info!("Watching new_heads on {}", self);
// TODO: is a RwLock of Arc the right thing here?
let provider = self.provider.read().await.clone();
match &*provider {
Web3Provider::Http(provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time. set automatically?
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut last_hash = Default::default();
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = provider
.request("eth_getBlockByNumber", ("latest", false))
.await;
// don't send repeat blocks
if let Ok(block) = &block {
let new_hash = block.hash.unwrap();
if new_hash == last_hash {
continue;
}
last_hash = new_hash;
}
self.send_block(block, &block_sender, rpc_id).await;
}
}
Web3Provider::Ws(provider) => {
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_blocks().await?;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: rate limit!
let block: Result<Block<TxHash>, _> = provider
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block(block, &block_sender, rpc_id).await;
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
loop {
match stream.next().await {
Some(new_block) => {
self.send_block(Ok(new_block), &block_sender, rpc_id).await;
// TODO: really not sure about this
task::yield_now().await;
}
None => {
warn!("subscription ended");
break;
}
}
}
}
}
if reconnect {
drop(provider);
// TODO: exponential backoff
warn!("new heads subscription exited. reconnecting in 10 seconds...");
sleep(Duration::from_secs(10)).await;
self.reconnect(&block_sender, rpc_id).await?;
} else {
break;
}
}
info!("Done watching new_heads on {}", self);
Ok(())
}
}
impl Eq for Web3Connection {}
impl Ord for Web3Connection {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// TODO: what atomic ordering?!
let a = self.active_requests.load(atomic::Ordering::Acquire);
let b = other.active_requests.load(atomic::Ordering::Acquire);
a.cmp(&b)
}
}
impl PartialOrd for Web3Connection {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
impl PartialEq for Web3Connection {
fn eq(&self, other: &Self) -> bool {
// TODO: what ordering?!
self.active_requests.load(atomic::Ordering::Acquire)
== other.active_requests.load(atomic::Ordering::Acquire)
}
}

@ -0,0 +1,251 @@
///! Load balanced communication with a group of web3 providers
use arc_swap::ArcSwap;
use derive_more::From;
use ethers::prelude::H256;
use futures::future::join_all;
use hashbrown::{HashMap, HashSet};
use std::cmp;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use tokio::task;
use tracing::Instrument;
use tracing::{info, info_span, instrument, warn};
use crate::connection::Web3Connection;
#[derive(Clone, Debug)]
struct SyncedConnections {
head_block_num: u64,
head_block_hash: H256,
inner: HashSet<usize>,
}
impl SyncedConnections {
fn new(max_connections: usize) -> Self {
Self {
head_block_num: 0,
head_block_hash: Default::default(),
inner: HashSet::with_capacity(max_connections),
}
}
}
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
synced_connections: ArcSwap<SyncedConnections>,
}
impl fmt::Debug for Web3Connections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3Connections")
.field("inner", &self.inner)
.finish_non_exhaustive()
}
}
impl Web3Connections {
// #[instrument(name = "try_new_Web3Connections", skip_all)]
pub async fn try_new(
chain_id: u64,
servers: Vec<String>,
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Arc<Self>> {
let num_connections = servers.len();
// turn configs into connections
let mut connections = Vec::with_capacity(num_connections);
for rpc_url in servers.into_iter() {
match Web3Connection::try_new(chain_id, rpc_url, http_client.clone()).await {
Ok(connection) => connections.push(connection),
Err(e) => warn!("Unable to connect to a server! {:?}", e),
}
}
if connections.len() < 2 {
// TODO: less than 3? what should we do here?
return Err(anyhow::anyhow!(
"need at least 2 connections when subscribing to heads!"
));
}
let synced_connections = SyncedConnections::new(num_connections);
let connections = Arc::new(Self {
inner: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
});
Ok(connections)
}
pub async fn subscribe_heads(self: &Arc<Self>) -> anyhow::Result<()> {
let (block_sender, block_receiver) = flume::unbounded();
let mut handles = vec![];
for (rpc_id, connection) in self.inner.iter().enumerate() {
// subscribe to new heads in a spawned future
// TODO: channel instead. then we can have one future with write access to a left-right?
let connection = Arc::clone(connection);
let block_sender = block_sender.clone();
// let url = connection.url().to_string();
let handle = task::Builder::default()
.name("subscribe_new_heads")
.spawn(async move {
// loop to automatically reconnect
// TODO: make this cancellable?
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
// TODO: proper span
connection.check_chain_id().await?;
connection
.subscribe_new_heads(rpc_id, block_sender.clone(), true)
.instrument(tracing::info_span!("url"))
.await
});
handles.push(handle);
}
let connections = Arc::clone(self);
let handle = task::Builder::default()
.name("update_synced_rpcs")
.spawn(async move { connections.update_synced_rpcs(block_receiver).await });
handles.push(handle);
for x in join_all(handles).await {
match x {
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(e),
Err(e) => return Err(e.into()),
}
}
Ok(())
}
/// TODO: move parts of this onto SyncedConnections?
#[instrument(skip_all)]
async fn update_synced_rpcs(
&self,
block_receiver: flume::Receiver<(u64, H256, usize)>,
) -> anyhow::Result<()> {
let max_connections = self.inner.len();
let mut connection_states: HashMap<usize, (u64, H256)> =
HashMap::with_capacity(max_connections);
let mut pending_synced_connections = SyncedConnections::new(max_connections);
while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await {
if new_block_num == 0 {
// TODO: show the actual rpc url?
warn!("rpc #{} is still syncing", rpc_id);
}
// TODO: span with rpc in it, too
// TODO: make sure i'm doing this span right
let span = info_span!("new_block", new_block_num);
let _enter = span.enter();
connection_states.insert(rpc_id, (new_block_num, new_block_hash));
// TODO: do something to update the synced blocks
match new_block_num.cmp(&pending_synced_connections.head_block_num) {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
info!(rpc_id, "new head");
pending_synced_connections.inner.clear();
pending_synced_connections.inner.insert(rpc_id);
pending_synced_connections.head_block_num = new_block_num;
// TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash;
}
cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash {
// this rpc has caught up with the best known head
// do not clear synced_connections.
// we just want to add this rpc to the end
// TODO: HashSet here? i think we get dupes if we don't
pending_synced_connections.inner.insert(rpc_id);
} else {
// same height, but different chain
// check connection_states to see which head block is more popular!
let mut rpc_ids_by_block: BTreeMap<H256, Vec<usize>> = BTreeMap::new();
let mut synced_rpcs = 0;
// TODO: what order should we iterate in? track last update time, too?
for (rpc_id, (block_num, block_hash)) in connection_states.iter() {
if *block_num != new_block_num {
// this connection isn't synced. we don't care what hash it has
continue;
}
synced_rpcs += 1;
let count = rpc_ids_by_block
.entry(*block_hash)
.or_insert_with(|| Vec::with_capacity(max_connections - 1));
count.push(*rpc_id);
}
let most_common_head_hash = rpc_ids_by_block
.iter()
.max_by(|a, b| a.1.len().cmp(&b.1.len()))
.map(|(k, _v)| k)
.unwrap();
warn!(
"chain is forked! {} possible heads. {}/{}/{} rpcs have {}",
rpc_ids_by_block.len(),
rpc_ids_by_block.get(most_common_head_hash).unwrap().len(),
synced_rpcs,
max_connections,
most_common_head_hash
);
// this isn't the best block in the tier. make sure this rpc isn't included
if new_block_hash != *most_common_head_hash {
pending_synced_connections.inner.remove(&rpc_id);
}
// TODO: if pending_synced_connections hasn't changed. continue
}
}
cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything
if !pending_synced_connections.inner.remove(&rpc_id) {
// we didn't remove anything. nothing more to do
continue;
}
// we removed. don't continue so that we update self.synced_connections
}
}
// the synced connections have changed
let synced_connections = Arc::new(pending_synced_connections.clone());
info!("new synced_connections: {:?}", synced_connections);
// TODO: only do this if there are 2 nodes synced to this block?
// do the arcswap
self.synced_connections.swap(synced_connections);
}
// TODO: if there was an error, we should return it
Err(anyhow::anyhow!("block_receiver exited!"))
}
}

@ -0,0 +1,88 @@
mod app;
mod connection;
mod connections;
use parking_lot::deadlock;
use std::env;
use std::sync::atomic::{self, AtomicUsize};
use std::thread;
use std::time::Duration;
use tokio::runtime;
use crate::app::Web3ProxyApp;
fn main() -> anyhow::Result<()> {
// TODO: is there a better way to do this?
if env::var("RUST_LOG").is_err() {
env::set_var("RUST_LOG", "web3_proxy_minimal=debug");
}
// install global collector configured based on RUST_LOG env var.
// tracing_subscriber::fmt().init();
console_subscriber::init();
fdlimit::raise_fd_limit();
let chain_id = 1;
let workers = 4;
let mut rt_builder = runtime::Builder::new_multi_thread();
rt_builder.enable_all().thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
// TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst);
// TODO: i think these max at 15 characters
format!("web3-{}-{}", chain_id, worker_id)
});
if workers > 0 {
rt_builder.worker_threads(workers);
}
let rt = rt_builder.build()?;
// spawn a thread for deadlock detection
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(10));
let deadlocks = deadlock::check_deadlock();
if deadlocks.is_empty() {
continue;
}
println!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
println!("Deadlock #{}", i);
for t in threads {
println!("Thread Id {:#?}", t.thread_id());
println!("{:#?}", t.backtrace());
}
}
});
// spawn the root task
rt.block_on(async {
let balanced_rpcs = vec![
"http://127.0.0.1:8545",
"ws://127.0.0.1:8546",
"http://127.0.0.1:8549",
"ws://127.0.0.1:8549",
"https://api.edennetwork.io/v1/",
"https://api.edennetwork.io/v1/beta",
"https://rpc.ethermine.org",
"https://rpc.flashbots.net",
"https://gibson.securerpc.com/v1",
"wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7",
"wss://mainnet.infura.io/ws/v3/c6fa1b6f17124b44ae71b2b25601aee0",
"wss://ecfa2710350449f490725c4525cba584.eth.ws.rivet.cloud/",
"wss://speedy-nodes-nyc.moralis.io/3587198387de4b2d711f6999/eth/mainnet/archive/ws",
]
.into_iter()
.map(|x| x.to_string())
.collect();
let app = Web3ProxyApp::try_new(chain_id, balanced_rpcs).await?;
app.run().await
})
}

@ -17,7 +17,6 @@ use std::time::Duration;
use tokio::sync::watch;
use tokio::task;
use tokio::time::sleep;
use tracing::info_span;
use tracing::{debug, instrument, trace, warn};
static APP_USER_AGENT: &str = concat!(
@ -224,13 +223,12 @@ impl Web3ProxyApp {
} else {
// this is not a private transaction (or no private relays are configured)
// TODO: how much should we retry?
for i in 0..10usize {
for _i in 0..10usize {
// TODO: think more about this loop.
// // TODO: add more to this span. and do it properly
// let span = info_span!("i", ?i);
// let _enter = span.enter();
/*
// todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
// TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed
trace!("{:?} waiting for head_block_hash", request);
@ -297,7 +295,6 @@ impl Web3ProxyApp {
);
}
}
*/
match self.balanced_rpcs.next_upstream_server().await {
Ok(active_request_handle) => {
@ -310,15 +307,14 @@ impl Web3ProxyApp {
// TODO: trace here was really slow with millions of requests.
// trace!("forwarding request from {}", upstream_server);
JsonRpcForwardedResponse {
let response = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
// TODO: since we only use the result here, should that be all we return from try_send_request?
result: Some(partial_response),
error: None,
}
};
/*
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = self.response_cache.write();
@ -334,13 +330,12 @@ impl Web3ProxyApp {
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
*/
// response
response
}
Err(e) => {
// // send now since we aren't going to cache an error response
// let _ = in_flight_tx.send(false);
// send now since we aren't going to cache an error response
let _ = in_flight_tx.send(false);
// TODO: move this to a helper function?
let code;
@ -402,8 +397,8 @@ impl Web3ProxyApp {
}
};
// // TODO: needing to remove manually here makes me think we should do this differently
// let _ = self.active_requests.remove(&cache_key);
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
if response.error.is_some() {
trace!("Sending error reply: {:?}", response);
@ -412,7 +407,7 @@ impl Web3ProxyApp {
} else {
trace!("Sending reply: {:?}", response);
// let _ = in_flight_tx.send(false);
let _ = in_flight_tx.send(false);
}
return Ok(response);
@ -421,9 +416,9 @@ impl Web3ProxyApp {
// TODO: this is too verbose. if there are other servers in other tiers, we use those!
warn!("No servers in sync!");
// // TODO: needing to remove manually here makes me think we should do this differently
// let _ = self.active_requests.remove(&cache_key);
// let _ = in_flight_tx.send(false);
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
return Err(anyhow::anyhow!("no servers in sync"));
}
@ -439,9 +434,9 @@ impl Web3ProxyApp {
warn!("All rate limits exceeded. Sleeping");
// // TODO: needing to remove manually here makes me think we should do this differently
// let _ = self.active_requests.remove(&cache_key);
// let _ = in_flight_tx.send(false);
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
continue;
}