transaction subscription getting closer

Bryan Stitt 2022-06-14 04:04:14 +00:00
parent 2a8ec0ea61
commit 2e559f3063
9 changed files with 536 additions and 376 deletions

Cargo.lock (generated): 42 changed lines

@ -177,9 +177,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2504b827a8bef941ba3dd64bdffe9cf56ca182908a147edd6189c95fbcae7d"
checksum = "dc47084705629d09d15060d70a8dbfce479c842303d05929ce29c74c995916ae"
dependencies = [
"async-trait",
"axum-core",
@ -211,9 +211,9 @@ dependencies = [
[[package]]
name = "axum-core"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17"
checksum = "c2efed1c501becea07ce48118786ebcf229531d0d3b28edf224a720020d9e106"
dependencies = [
"async-trait",
"bytes",
@ -1407,9 +1407,9 @@ checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]]
name = "flume"
version = "0.10.12"
version = "0.10.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a"
checksum = "1ceeb589a3157cac0ab8cc585feb749bd2cea5cb55a6ee802ad72d9fd38303da"
dependencies = [
"futures-core",
"futures-sink",
@ -2722,9 +2722,9 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.10"
version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb"
checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92"
dependencies = [
"base64 0.13.0",
"bytes",
@ -2750,6 +2750,7 @@ dependencies = [
"serde_urlencoded",
"tokio",
"tokio-rustls",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
@ -2857,9 +2858,9 @@ dependencies = [
[[package]]
name = "rustls-pemfile"
version = "0.3.0"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9"
dependencies = [
"base64 0.13.0",
]
@ -3432,9 +3433,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.18.2"
version = "1.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395"
checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439"
dependencies = [
"bytes",
"libc",
@ -3475,14 +3476,14 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util 0.6.10",
"tokio-util 0.7.2",
]
[[package]]
@ -3549,7 +3550,6 @@ dependencies = [
"pin-project",
"pin-project-lite",
"tokio",
"tokio-util 0.7.2",
"tower-layer",
"tower-service",
"tracing",
@ -3588,9 +3588,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.34"
version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
dependencies = [
"cfg-if",
"log",
@ -3612,11 +3612,11 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.26"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f"
checksum = "7709595b8878a4965ce5e87ebf880a7d39c9afc6837721b21a5a816a8117d921"
dependencies = [
"lazy_static",
"once_cell",
"valuable",
]

TODO.md: 11 changed lines

@ -1,10 +1,12 @@
# Todo
- [ ] support websocket clients
- [ ] if web3 proxy gets an http error back, retry another node
- [ ] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions
- [ ] refactor Connections::spawn. have it return a handle that is selecting on those handles?
- [x] support websocket clients
- we support websockets for the backends already, but we need them for the frontend too
- [ ] when block subscribers receive blocks, store them in a cache
- [ ] have a /ws endpoint (figure out how to route on / later)
- [ ] ws endpoint shouldn't be that different from the http endpoint
- [ ] when block subscribers receive blocks, store them in a cache. use this cache instead of querying eth_getBlock
- [x] have a /ws endpoint (figure out how to route on / later)
- inspect any jsonrpc errors. if its something like "header not found" or "block with id $x not found" retry on another node (and add a negative score to that server)
- this error seems to happen when we use load balanced rpcs
- [x] use redis and redis-cell for rate limits
@ -41,6 +43,7 @@
- [ ] one proxy for multiple chains?
- [ ] zero downtime deploys
- [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes?
- [ ] subscription id should be per connection, not global
- [x] simple proxy
- [x] better locking. when lots of requests come in, we seem to be in the way of block updates
- [x] load balance between multiple RPC servers
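
The "subscription id should be per connection, not global" item corresponds to the `next_subscription_id: AtomicUsize` counter that this commit still keeps on `Web3ProxyApp` (see app.rs below). A minimal sketch of the per-connection version, assuming a hypothetical per-socket state struct rather than the app-wide field:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical per-websocket state; the committed code keeps this counter on Web3ProxyApp.
struct WsConnectionState {
    next_subscription_id: AtomicUsize,
}

impl WsConnectionState {
    fn new() -> Self {
        Self {
            next_subscription_id: AtomicUsize::new(1),
        }
    }

    /// Ids only need to be unique within a single websocket connection,
    /// so a plain per-connection counter is enough.
    fn next_subscription_id(&self) -> String {
        let id = self.next_subscription_id.fetch_add(1, Ordering::SeqCst);
        format!("{:#x}", id)
    }
}
```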

@ -9,13 +9,13 @@ edition = "2021"
anyhow = "1.0.57"
arc-swap = "1.5.0"
argh = "0.1.7"
axum = { version = "0.5.6", features = ["serde_json", "tokio-tungstenite", "ws"] }
axum = { version = "0.5.7", features = ["serde_json", "tokio-tungstenite", "ws"] }
counter = "0.5.5"
dashmap = "5.3.4"
derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
fdlimit = "0.2.1"
flume = "0.10.12"
flume = "0.10.13"
futures = { version = "0.3.21", features = ["thread-pool"] }
hashbrown = "0.12.1"
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
@ -25,15 +25,15 @@ parking_lot = { version = "0.12.1", features = ["deadlock_detection"] }
proctitle = "0.1.1"
# TODO: regex has several "perf" features that we might want to use
regex = "1.5.6"
reqwest = { version = "0.11.10", default-features = false, features = ["json", "tokio-rustls"] }
reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.18.2", features = ["full", "tracing"] }
tokio = { version = "1.19.2", features = ["full", "tracing"] }
toml = "0.5.9"
tracing = "0.1.34"
tracing = "0.1.35"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] }
url = "2.2.2"
tower = "0.4.12"
tokio-stream = { version = "0.1.8", features = ["sync"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }

@ -6,6 +6,7 @@ use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use axum::extract::ws::Message;
use dashmap::DashMap;
use ethers::prelude::TransactionReceipt;
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
@ -18,10 +19,9 @@ use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task;
use tokio::time::timeout;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, info, instrument, trace, warn};
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
@ -52,6 +52,8 @@ pub struct Web3ProxyApp {
response_cache: ResponseLrcCache,
// don't drop this or the sender will stop working
head_block_receiver: watch::Receiver<Block<TxHash>>,
// TODO: i think we want a TxState enum for Confirmed(TxHash, BlockHash) or Pending(TxHash) or Orphan(TxHash, BlockHash)
pending_tx_receipt_receiver: flume::Receiver<TransactionReceipt>,
next_subscription_id: AtomicUsize,
}
@ -63,12 +65,12 @@ impl fmt::Debug for Web3ProxyApp {
}
impl Web3ProxyApp {
pub async fn try_new(
pub async fn spawn(
chain_id: usize,
redis_address: Option<String>,
balanced_rpcs: Vec<Web3ConnectionConfig>,
private_rpcs: Vec<Web3ConnectionConfig>,
) -> anyhow::Result<Web3ProxyApp> {
) -> anyhow::Result<Arc<Web3ProxyApp>> {
// make a http shared client
// TODO: how should we configure the connection pool?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
@ -85,6 +87,7 @@ impl Web3ProxyApp {
info!("Connecting to redis on {}", redis_address);
let redis_client = redis_cell_client::Client::open(redis_address)?;
// TODO: r2d2 connection pool?
let redis_conn = redis_client.get_multiplexed_tokio_connection().await?;
Some(redis_conn)
@ -95,47 +98,52 @@ impl Web3ProxyApp {
}
};
// TODO: subscribe to pending transactions on the private rpcs, too?
let (head_block_sender, head_block_receiver) = watch::channel(Block::default());
let (pending_tx_receipt_sender, pending_tx_receipt_receiver) = flume::unbounded();
// TODO: attach context to this error
let balanced_rpcs = Web3Connections::try_new(
let balanced_rpcs = Web3Connections::spawn(
chain_id,
balanced_rpcs,
http_client.as_ref(),
rate_limiter.as_ref(),
Some(head_block_sender),
Some(pending_tx_receipt_sender),
)
.await?;
let (head_block_sender, head_block_receiver) = watch::channel(Block::default());
// TODO: do this separately instead of during try_new
{
let balanced_rpcs = balanced_rpcs.clone();
task::spawn(async move {
balanced_rpcs.subscribe_all_heads(head_block_sender).await;
});
}
// TODO: attach context to this error
let private_rpcs = if private_rpcs.is_empty() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
balanced_rpcs.clone()
} else {
Web3Connections::try_new(
// TODO: attach context to this error
Web3Connections::spawn(
chain_id,
private_rpcs,
http_client.as_ref(),
rate_limiter.as_ref(),
// subscribing to new heads here won't work well
None,
// TODO: subscribe to pending transactions on the private rpcs, too?
None,
)
.await?
};
Ok(Web3ProxyApp {
let app = Web3ProxyApp {
balanced_rpcs,
private_rpcs,
incoming_requests: Default::default(),
response_cache: Default::default(),
head_block_receiver,
pending_tx_receipt_receiver,
next_subscription_id: 1.into(),
})
};
let app = Arc::new(app);
Ok(app)
}
pub async fn eth_subscribe(
@ -146,57 +154,60 @@ impl Web3ProxyApp {
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
let (subscription_handle, subscription_registration) = AbortHandle::new_pair();
// TODO: this only needs to be unique per connection. we don't need it globally unique
let subscription_id = self
.next_subscription_id
.fetch_add(1, atomic::Ordering::SeqCst);
let subscription_id = format!("{:#x}", subscription_id);
// save the id so we can use it in the response
let id = payload.id.clone();
let f = {
let subscription_future = {
let subscription_id = subscription_id.clone();
let params = payload.params.as_deref().unwrap().get();
match payload.params.as_deref().unwrap().get() {
r#"["newHeads"]"# => {
let head_block_receiver = self.head_block_receiver.clone();
if params == r#"["newHeads"]"# {
let head_block_receiver = self.head_block_receiver.clone();
trace!(?subscription_id, "new heads subscription");
async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
trace!(?subscription_id, "new heads subscription");
async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
while let Some(new_head) = head_block_receiver.next().await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_head,
},
});
while let Some(new_head) = head_block_receiver.next().await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_head,
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
trace!(?subscription_id, "closed new heads subscription");
}
trace!(?subscription_id, "closed new heads subscription");
}
} else {
return Err(anyhow::anyhow!("unimplemented"));
r#"["pendingTransactions"]"# => {
unimplemented!("pendingTransactions")
}
_ => return Err(anyhow::anyhow!("unimplemented")),
}
};
tokio::spawn(f);
// TODO: what should we do with this handle? i think its fine to just drop it
tokio::spawn(subscription_future);
let response = JsonRpcForwardedResponse::from_string(subscription_id, id);
@ -319,89 +330,92 @@ impl Web3ProxyApp {
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
// // TODO: add more to this span
// let span = info_span!("i", ?i);
// // TODO: add more to this span such as
let span = info_span!("rpc_request");
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
if request.method == "eth_sendRawTransaction" {
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
return self
.private_rpcs
.try_send_all_upstream_servers(request)
.await;
} else {
// this is not a private transaction (or no private relays are configured)
let (cache_key, response_cache) = match self.get_cached_response(&request) {
(cache_key, Ok(response)) => {
let _ = self.incoming_requests.remove(&cache_key);
return Ok(response);
}
(cache_key, Err(response_cache)) => (cache_key, response_cache),
};
// check if this request is already in flight
// TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
let (incoming_tx, incoming_rx) = watch::channel(true);
let mut other_incoming_rx = None;
match self.incoming_requests.entry(cache_key.clone()) {
dashmap::mapref::entry::Entry::Occupied(entry) => {
other_incoming_rx = Some(entry.get().clone());
}
dashmap::mapref::entry::Entry::Vacant(entry) => {
entry.insert(incoming_rx);
}
match &request.method[..] {
"eth_sendRawTransaction" => {
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
self.private_rpcs
.try_send_all_upstream_servers(request)
.instrument(span)
.await
}
_ => {
// this is not a private transaction (or no private relays are configured)
if let Some(mut other_incoming_rx) = other_incoming_rx {
// wait for the other request to finish. it might have finished successfully or with an error
trace!("{:?} waiting on in-flight request", request);
let (cache_key, response_cache) = match self.get_cached_response(&request) {
(cache_key, Ok(response)) => {
let _ = self.incoming_requests.remove(&cache_key);
let _ = other_incoming_rx.changed().await;
return Ok(response);
}
(cache_key, Err(response_cache)) => (cache_key, response_cache),
};
// now that we've waited, lets check the cache again
if let Some(cached) = response_cache.read().get(&cache_key) {
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
// TODO: emit a stat
trace!(
"{:?} cache hit after waiting for in-flight request!",
request
);
return Ok(cached.to_owned());
} else {
// TODO: emit a stat
trace!(
"{:?} cache miss after waiting for in-flight request!",
request
);
// check if this request is already in flight
// TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
let (incoming_tx, incoming_rx) = watch::channel(true);
let mut other_incoming_rx = None;
match self.incoming_requests.entry(cache_key.clone()) {
dashmap::mapref::entry::Entry::Occupied(entry) => {
other_incoming_rx = Some(entry.get().clone());
}
dashmap::mapref::entry::Entry::Vacant(entry) => {
entry.insert(incoming_rx);
}
}
if let Some(mut other_incoming_rx) = other_incoming_rx {
// wait for the other request to finish. it might have finished successfully or with an error
trace!("{:?} waiting on in-flight request", request);
let _ = other_incoming_rx.changed().await;
// now that we've waited, lets check the cache again
if let Some(cached) = response_cache.read().get(&cache_key) {
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
// TODO: emit a stat
trace!(
"{:?} cache hit after waiting for in-flight request!",
request
);
return Ok(cached.to_owned());
} else {
// TODO: emit a stat
trace!(
"{:?} cache miss after waiting for in-flight request!",
request
);
}
}
let response = self
.balanced_rpcs
.try_send_best_upstream_server(request)
.await?;
// TODO: small race condition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = response_cache.write();
// TODO: cache the warp::reply to save us serializing every time
response_cache.insert(cache_key.clone(), response.clone());
if response_cache.len() >= RESPONSE_CACHE_CAP {
// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
response_cache.pop_front();
}
drop(response_cache);
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
Ok(response)
}
let response = self
.balanced_rpcs
.try_send_best_upstream_server(request)
.await?;
// TODO: small race condition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = response_cache.write();
// TODO: cache the warp::reply to save us serializing every time
response_cache.insert(cache_key.clone(), response.clone());
if response_cache.len() >= RESPONSE_CACHE_CAP {
// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
response_cache.pop_front();
}
drop(response_cache);
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
Ok(response)
}
}
}
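
The comment added next to `pending_tx_receipt_receiver` suggests a `TxState` enum instead of passing bare `TransactionReceipt`s around. A sketch of that enum as the comment describes it (the variant payloads follow the TODO; this is not a type the commit actually adds):

```rust
use ethers::prelude::{TxHash, H256};

/// Possible states of a transaction the proxy is tracking, per the TODO in app.rs.
#[derive(Clone, Debug)]
pub enum TxState {
    /// seen in a backend's pending-transaction stream, not yet mined
    Pending(TxHash),
    /// included in the block with this hash
    Confirmed(TxHash, H256),
    /// was in a block that has since been reorged away
    Orphan(TxHash, H256),
}
```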

@ -47,7 +47,7 @@ pub struct Web3ConnectionConfig {
impl RpcConfig {
/// Create a Web3ProxyApp from config
// #[instrument(name = "try_build_RpcConfig", skip_all)]
pub async fn try_build(self) -> anyhow::Result<Web3ProxyApp> {
pub async fn try_build(self) -> anyhow::Result<Arc<Web3ProxyApp>> {
let balanced_rpcs = self.balanced_rpcs.into_values().collect();
let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {
@ -56,7 +56,7 @@ impl RpcConfig {
vec![]
};
Web3ProxyApp::try_new(
Web3ProxyApp::spawn(
self.shared.chain_id,
self.shared.rate_limit_redis,
balanced_rpcs,
@ -77,7 +77,7 @@ impl Web3ConnectionConfig {
) -> anyhow::Result<Arc<Web3Connection>> {
let hard_rate_limit = self.hard_limit.map(|x| (x, redis_conn.unwrap()));
Web3Connection::try_new(
Web3Connection::spawn(
chain_id,
self.url,
http_client,

@ -6,6 +6,7 @@ use redis_cell_client::RedisCellClient;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::RwLock;
@ -115,33 +116,9 @@ impl fmt::Display for Web3Connection {
}
impl Web3Connection {
#[instrument(skip_all)]
pub async fn reconnect(
self: &Arc<Self>,
block_sender: &flume::Sender<(Block<TxHash>, usize)>,
rpc_id: usize,
) -> anyhow::Result<()> {
// websocket doesn't need the http client
let http_client = None;
// since this lock is held open over an await, we use tokio's locking
let mut provider = self.provider.write().await;
*provider = None;
// tell the block subscriber that we are at 0
block_sender.send_async((Block::default(), rpc_id)).await?;
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
*provider = Some(Arc::new(new_provider));
Ok(())
}
/// Connect to a web3 rpc and subscribe to new heads
#[instrument(name = "try_new_Web3Connection", skip(hard_limit, http_client))]
pub async fn try_new(
/// Connect to a web3 rpc
#[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
pub async fn spawn(
chain_id: usize,
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
@ -176,9 +153,10 @@ impl Web3Connection {
// check the server's chain_id here
// TODO: move this outside the `new` function and into a `start` function or something
let active_request_handle = connection.wait_for_request_handle().await;
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
let found_chain_id: Result<String, _> = active_request_handle
let found_chain_id: Result<String, _> = connection
.wait_for_request_handle()
.await
.request("eth_chainId", Option::None::<()>)
.await;
@ -206,6 +184,31 @@ impl Web3Connection {
Ok(connection)
}
#[instrument(skip_all)]
pub async fn reconnect(
self: &Arc<Self>,
block_sender: &flume::Sender<(Block<TxHash>, Arc<Self>)>,
) -> anyhow::Result<()> {
// websocket doesn't need the http client
let http_client = None;
// since this lock is held open over an await, we use tokio's locking
let mut provider = self.provider.write().await;
*provider = None;
// tell the block subscriber that we are at 0
block_sender
.send_async((Block::default(), self.clone()))
.await?;
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
*provider = Some(Arc::new(new_provider));
Ok(())
}
#[inline]
pub fn active_requests(&self) -> u32 {
self.active_requests.load(atomic::Ordering::Acquire)
@ -225,13 +228,12 @@ impl Web3Connection {
async fn send_block(
self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(Block<TxHash>, usize)>,
rpc_id: usize,
block_sender: &flume::Sender<(Block<TxHash>, Arc<Self>)>,
) -> anyhow::Result<()> {
match block {
Ok(block) => {
// TODO: i'm pretty sure we don't need send_async, but double check
block_sender.send_async((block, rpc_id)).await?;
block_sender.send_async((block, self.clone())).await?;
}
Err(e) => {
warn!("unable to get block from {}: {}", self, e);
@ -241,120 +243,216 @@ impl Web3Connection {
Ok(())
}
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
/// TODO: instrument with the url
#[instrument(skip_all)]
pub async fn subscribe_new_heads(
pub async fn subscribe(
self: Arc<Self>,
rpc_id: usize,
block_sender: flume::Sender<(Block<TxHash>, usize)>,
block_sender: flume::Sender<(Block<TxHash>, Arc<Self>)>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
reconnect: bool,
) -> anyhow::Result<()> {
loop {
info!("Watching new_heads on {}", self);
// TODO: make these abortable so that if one fails the other can be cancelled?
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Http(provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time. set automatically?
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let new_heads = {
let clone = self.clone();
let block_sender = block_sender.clone();
let mut last_hash = Default::default();
clone.subscribe_new_heads(block_sender)
};
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
let pending_txs = {
let clone = self.clone();
let tx_id_sender = tx_id_sender.clone();
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = provider
.request("eth_getBlockByNumber", ("latest", false))
.await;
clone.subscribe_pending_transactions(tx_id_sender)
};
drop(active_request_handle);
// don't send repeat blocks
if let Ok(block) = &block {
let new_hash = block.hash.unwrap();
if new_hash == last_hash {
continue;
}
last_hash = new_hash;
}
self.send_block(block, &block_sender, rpc_id).await?;
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
}
}
Web3Provider::Ws(provider) => {
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
let active_request_handle = self.wait_for_request_handle().await;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: rate limit!
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block(block, &block_sender, rpc_id).await?;
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
loop {
match stream.next().await {
Some(new_block) => {
self.send_block(Ok(new_block), &block_sender, rpc_id)
.await?;
// TODO: really not sure about this
task::yield_now().await;
}
None => {
warn!("subscription ended");
break;
}
}
}
}
tokio::select! {
_ = new_heads => {
info!(?self, "new heads subscription completed");
}
_ = pending_txs => {
info!(?self, "pending transactions subscription completed");
}
}
if reconnect {
// TODO: exponential backoff
warn!("new heads subscription exited. Attempting to reconnect in 1 second...");
// TODO: share code with new heads subscription
warn!("pending transactions subscription exited. Attempting to reconnect in 1 second...");
sleep(Duration::from_secs(1)).await;
// TODO: loop on reconnecting! do not return with a "?" here
self.reconnect(&block_sender, rpc_id).await?;
// TODO: this isn't going to work. it will get in a loop with newHeads
self.reconnect(&block_sender).await?;
} else {
break;
}
}
info!("Done watching new_heads on {}", self);
Ok(())
}
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
/// TODO: instrument with the url
#[instrument(skip_all)]
async fn subscribe_new_heads(
self: Arc<Self>,
block_sender: flume::Sender<(Block<TxHash>, Arc<Self>)>,
) -> anyhow::Result<()> {
info!("Watching new_heads on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Http(_provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time. set automatically?
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut last_hash = Default::default();
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
// don't send repeat blocks
if let Ok(block) = &block {
let new_hash = block.hash.unwrap();
if new_hash == last_hash {
continue;
}
last_hash = new_hash;
}
self.send_block(block, &block_sender).await?;
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
}
}
Web3Provider::Ws(provider) => {
let active_request_handle = self.wait_for_request_handle().await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: subscribe to Block<TransactionReceipt> instead?
let block: Result<Block<TxHash>, _> = self
.wait_for_request_handle()
.await
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block(block, &block_sender).await?;
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
loop {
match stream.next().await {
Some(new_block) => {
self.send_block(Ok(new_block), &block_sender).await?;
// TODO: really not sure about this
task::yield_now().await;
}
None => {
warn!("subscription ended");
break;
}
}
}
}
}
}
Ok(())
}
#[instrument(skip_all)]
async fn subscribe_pending_transactions(
self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
info!("watching pending transactions on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Http(_provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// TODO: create a filter
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: check the filter
unimplemented!("actually send a request");
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
}
}
Web3Provider::Ws(provider) => {
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_pending_txs().await?;
drop(active_request_handle);
// TODO: query existing pending txs?
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
loop {
match stream.next().await {
Some(pending_tx_id) => {
tx_id_sender
.send_async((pending_tx_id, self.clone()))
.await?;
}
None => {
warn!("subscription ended");
break;
}
}
}
}
}
}
Ok(())
}
@ -404,6 +502,12 @@ impl Web3Connection {
}
}
impl Hash for Web3Connection {
fn hash<H: Hasher>(&self, state: &mut H) {
self.url.hash(state);
}
}
/// Drop this once a connection completes
pub struct ActiveRequestHandle(Arc<Web3Connection>);
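
The HTTP branch of `subscribe_pending_transactions` still ends in `unimplemented!("actually send a request")`. One way the filter polling could look, using the standard `eth_newPendingTransactionFilter` / `eth_getFilterChanges` RPC methods; this sketch talks to an ethers `Provider<Http>` directly, whereas the real loop would go through `try_request_handle()` for rate limiting and forward hashes to `tx_id_sender`:

```rust
use std::time::Duration;

use ethers::prelude::{Http, Provider, ProviderError, TxHash, U256};

/// Sketch of the missing polling body: install a pending-transaction filter
/// once, then drain it on every tick.
async fn watch_pending_txs(provider: &Provider<Http>) -> Result<(), ProviderError> {
    // eth_newPendingTransactionFilter takes no params and returns the filter id
    let filter_id: U256 = provider
        .request("eth_newPendingTransactionFilter", ())
        .await?;

    let mut interval = tokio::time::interval(Duration::from_secs(2));

    loop {
        interval.tick().await;

        // eth_getFilterChanges returns the tx hashes seen since the last poll
        let new_tx_hashes: Vec<TxHash> = provider
            .request("eth_getFilterChanges", [filter_id])
            .await?;

        for tx_hash in new_tx_hashes {
            // the committed code would send (tx_hash, self.clone()) down tx_id_sender here
            tracing::trace!(?tx_hash, "pending transaction from http poll");
        }
    }
}
```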

@ -2,8 +2,7 @@
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, TxHash, H256};
use futures::future::join_all;
use ethers::prelude::{Block, ProviderError, TransactionReceipt, TxHash, H256};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
@ -30,7 +29,9 @@ use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
struct SyncedConnections {
head_block_num: u64,
head_block_hash: H256,
inner: BTreeSet<usize>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
inner: BTreeSet<Arc<Web3Connection>>,
}
impl fmt::Debug for SyncedConnections {
@ -78,12 +79,14 @@ impl fmt::Debug for Web3Connections {
}
impl Web3Connections {
// #[instrument(name = "try_new_Web3Connections", skip_all)]
pub async fn try_new(
// #[instrument(name = "spawn_Web3Connections", skip_all)]
pub async fn spawn(
chain_id: usize,
servers: Vec<Web3ConnectionConfig>,
http_client: Option<&reqwest::Client>,
rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_receipt_sender: Option<flume::Sender<TransactionReceipt>>,
) -> anyhow::Result<Arc<Self>> {
let num_connections = servers.len();
@ -111,55 +114,90 @@ impl Web3Connections {
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
});
let handle = {
let connections = connections.clone();
tokio::spawn(async move {
connections
.subscribe(head_block_sender, pending_tx_receipt_sender)
.await
})
};
Ok(connections)
}
pub async fn subscribe_all_heads(
self: &Arc<Self>,
head_block_sender: watch::Sender<Block<TxHash>>,
) {
// TODO: benchmark bounded vs unbounded
let (block_sender, block_receiver) = flume::unbounded::<(Block<TxHash>, usize)>();
/// subscribe to all the backend rpcs
async fn subscribe(
self: Arc<Self>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_receipt_sender: Option<flume::Sender<TransactionReceipt>>,
) -> anyhow::Result<()> {
let mut futures = FuturesUnordered::new();
let mut handles = vec![];
// subscribe to pending transactions
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded();
for (rpc_id, connection) in self.inner.iter().enumerate() {
// subscribe to new heads in a spawned future
// TODO: channel instead. then we can have one future with write access to a left-right?
let connection = Arc::clone(connection);
// one future subscribes to pendingTransactions on all the rpcs. it sends them through the funnel
// TODO: do this only when someone is subscribed. otherwise this will be way too many queries
for (rpc_id, connection) in self.inner.iter().cloned().enumerate() {
let pending_tx_id_sender = pending_tx_id_sender.clone();
let block_sender = block_sender.clone();
// let url = connection.url().to_string();
let handle = task::Builder::default()
.name("subscribe_new_heads")
.spawn(async move {
// loop to automatically reconnect
// TODO: make this cancellable?
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
// TODO: proper span
connection
.subscribe_new_heads(rpc_id, block_sender.clone(), true)
.instrument(tracing::info_span!("url"))
.await
});
handles.push(handle);
}
let connections = Arc::clone(self);
let handle = task::Builder::default()
.name("update_synced_rpcs")
.spawn(async move {
connections
.update_synced_rpcs(block_receiver, head_block_sender)
let handle = tokio::spawn(async move {
// loop to automatically reconnect
// TODO: make this cancellable?
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
// TODO: proper span
connection
.subscribe(block_sender, pending_tx_id_sender, true)
.instrument(tracing::info_span!("rpc", ?rpc_id))
.await
});
handles.push(handle);
futures.push(handle);
}
// TODO: do something with join_all's result
join_all(handles).await;
// the next future subscribes to the transaction funnel
// it skips any duplicates (unless they are being orphaned)
// fetches new transactions from the notifying rpc
// forwards new transactions to pending_tx_receipt_sender
{
// TODO: do something with the handle so we can catch any errors
let handle = task::spawn(async move {
while let Ok((pending_transaction_id, rpc)) =
pending_tx_id_receiver.recv_async().await
{
unimplemented!("de-dedup the pending txid")
}
Ok(())
});
futures.push(handle);
}
// the next future subscribes to the block funnel
if let Some(head_block_sender) = head_block_sender {
let connections = Arc::clone(&self);
let handle = task::Builder::default()
.name("update_synced_rpcs")
.spawn(async move {
connections
.update_synced_rpcs(block_receiver, head_block_sender, pending_tx_id_sender)
.await
});
futures.push(handle);
}
if let Some(Err(e)) = futures.next().await {
return Err(e.into());
}
Ok(())
}
pub fn get_head_block_hash(&self) -> H256 {
@ -227,16 +265,18 @@ impl Web3Connections {
// we don't instrument here because we put a span inside the while loop
async fn update_synced_rpcs(
&self,
block_receiver: flume::Receiver<(Block<TxHash>, usize)>,
block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>,
head_block_sender: watch::Sender<Block<TxHash>>,
pending_tx_id_sender: flume::Sender<(TxHash, Arc<Web3Connection>)>,
) -> anyhow::Result<()> {
let total_rpcs = self.inner.len();
let mut connection_states: HashMap<usize, _> = HashMap::with_capacity(total_rpcs);
let mut connection_states: HashMap<Arc<Web3Connection>, _> =
HashMap::with_capacity(total_rpcs);
let mut pending_synced_connections = SyncedConnections::default();
while let Ok((new_block, rpc_id)) = block_receiver.recv_async().await {
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
// TODO: wth. how is this happening? need more logs
let new_block_num = match new_block.number {
Some(x) => x.as_u64(),
@ -250,14 +290,16 @@ impl Web3Connections {
// TODO: span with more in it?
// TODO: make sure i'm doing this span right
// TODO: show the actual rpc url?
let span = info_span!("block_receiver", rpc_id, new_block_num);
let span = info_span!("block_receiver", ?rpc, new_block_num);
// TODO: clippy lint to make sure we don't hold this across an awaited future
let _enter = span.enter();
if new_block_num == 0 {
warn!("rpc is still syncing");
}
connection_states.insert(rpc_id, (new_block_num, new_block_hash));
connection_states.insert(rpc.clone(), (new_block_num, new_block_hash));
// TODO: do something to update the synced blocks
match new_block_num.cmp(&pending_synced_connections.head_block_num) {
@ -268,7 +310,7 @@ impl Web3Connections {
info!("new head: {}", new_block_hash);
pending_synced_connections.inner.clear();
pending_synced_connections.inner.insert(rpc_id);
pending_synced_connections.inner.insert(rpc);
pending_synced_connections.head_block_num = new_block_num;
@ -283,16 +325,17 @@ impl Web3Connections {
// do not clear synced_connections.
// we just want to add this rpc to the end
// TODO: HashSet here? i think we get dupes if we don't
pending_synced_connections.inner.insert(rpc_id);
pending_synced_connections.inner.insert(rpc);
} else {
// same height, but different chain
// check connection_states to see which head block is more popular!
let mut rpc_ids_by_block: BTreeMap<H256, Vec<usize>> = BTreeMap::new();
let mut rpc_ids_by_block: BTreeMap<H256, Vec<Arc<Web3Connection>>> =
BTreeMap::new();
let mut counted_rpcs = 0;
for (rpc_id, (block_num, block_hash)) in connection_states.iter() {
for (rpc, (block_num, block_hash)) in connection_states.iter() {
if *block_num != new_block_num {
// this connection isn't synced. we don't care what hash it has
continue;
@ -304,7 +347,7 @@ impl Web3Connections {
.entry(*block_hash)
.or_insert_with(|| Vec::with_capacity(total_rpcs - 1));
count.push(*rpc_id);
count.push(rpc.clone());
}
let most_common_head_hash = *rpc_ids_by_block
@ -335,7 +378,7 @@ impl Web3Connections {
}
cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything
if !pending_synced_connections.inner.remove(&rpc_id) {
if !pending_synced_connections.inner.remove(&rpc) {
// we didn't remove anything. nothing more to do
continue;
}
@ -373,7 +416,7 @@ impl Web3Connections {
pub async fn next_upstream_server(&self) -> Result<ActiveRequestHandle, Option<Duration>> {
let mut earliest_retry_after = None;
let mut synced_rpc_ids: Vec<usize> = self
let mut synced_rpcs: Vec<Arc<Web3Connection>> = self
.synced_connections
.load()
.inner
@ -381,21 +424,19 @@ impl Web3Connections {
.cloned()
.collect();
let sort_cache: HashMap<usize, (f32, u32)> = synced_rpc_ids
let sort_cache: HashMap<Arc<Web3Connection>, (f32, u32)> = synced_rpcs
.iter()
.map(|rpc_id| {
let rpc = self.inner.get(*rpc_id).unwrap();
.map(|rpc| {
let active_requests = rpc.active_requests();
let soft_limit = rpc.soft_limit();
let utilization = active_requests as f32 / soft_limit as f32;
(*rpc_id, (utilization, soft_limit))
(rpc.clone(), (utilization, soft_limit))
})
.collect();
synced_rpc_ids.sort_unstable_by(|a, b| {
synced_rpcs.sort_unstable_by(|a, b| {
let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap();
@ -410,16 +451,14 @@ impl Web3Connections {
});
// now that the rpcs are sorted, try to get an active request handle for one of them
for rpc_id in synced_rpc_ids.into_iter() {
let rpc = self.inner.get(rpc_id).unwrap();
for rpc in synced_rpcs.into_iter() {
// increment our connection counter
match rpc.try_request_handle().await {
Err(retry_after) => {
earliest_retry_after = earliest_retry_after.min(Some(retry_after));
}
Ok(handle) => {
trace!("next server on {:?}: {:?}", self, rpc_id);
trace!("next server on {:?}: {:?}", self, rpc);
return Ok(handle);
}
}
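
The transaction-funnel task in `subscribe` above is still `unimplemented!("de-dedup the pending txid")`. Per its comments it should drop duplicate hashes (several backends will announce the same pending transaction) before fetching and forwarding anything. A minimal sketch of that step, assuming a `DashMap` keyed by `TxHash` as the seen-set; the crate already depends on dashmap, everything else here is illustrative:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use ethers::prelude::TxHash;

use crate::connection::Web3Connection; // adjust to wherever the crate defines it

/// Sketch of the funnel's de-dup step: handle each pending tx hash only once,
/// no matter how many backends announce it.
async fn funnel_pending_txs(
    pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
    seen: Arc<DashMap<TxHash, ()>>,
) -> anyhow::Result<()> {
    while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
        // insert returns the previous value, so Some(_) means we already handled this hash
        if seen.insert(pending_tx_id, ()).is_some() {
            continue;
        }

        // first sighting: fetch the transaction from the backend that announced it and
        // forward it (the committed code would send it to pending_tx_receipt_sender).
        // the seen-set also needs pruning, e.g. once the tx is confirmed or orphaned.
        tracing::trace!(?pending_tx_id, %rpc, "new pending transaction");
    }

    Ok(())
}
```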

@ -46,37 +46,40 @@ async fn handle_socket_payload(
Ok(payload) => {
let id = payload.id.clone();
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = if payload.method
== "eth_subscribe"
{
let response = app.eth_subscribe(payload, response_tx.clone()).await;
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &payload.method[..] {
"eth_subscribe" => {
let response = app.eth_subscribe(payload, response_tx.clone()).await;
match response {
Ok((handle, response)) => {
// TODO: better key
subscriptions.insert(response.result.as_ref().unwrap().to_string(), handle);
match response {
Ok((handle, response)) => {
// TODO: better key
subscriptions
.insert(response.result.as_ref().unwrap().to_string(), handle);
Ok(response.into())
Ok(response.into())
}
Err(err) => Err(err),
}
Err(err) => Err(err),
}
} else if payload.method == "eth_unsubscribe" {
let subscription_id = payload.params.unwrap().to_string();
"eth_unsubscribe" => {
let subscription_id = payload.params.unwrap().to_string();
let partial_response = match subscriptions.remove(&subscription_id) {
None => "false",
Some(handle) => {
handle.abort();
"true"
}
};
let partial_response = match subscriptions.remove(&subscription_id) {
None => "false",
Some(handle) => {
handle.abort();
"true"
}
};
let response =
JsonRpcForwardedResponse::from_string(partial_response.to_string(), id.clone());
let response = JsonRpcForwardedResponse::from_string(
partial_response.to_string(),
id.clone(),
);
Ok(response.into())
} else {
app.proxy_web3_rpc(payload.into()).await
Ok(response.into())
}
_ => app.proxy_web3_rpc(payload.into()).await,
};
(id, response)
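
For reference, the exchange this handler implements, seen from a websocket client: an `eth_subscribe` call, the subscription id that comes back, and the `eth_subscription` notifications pushed afterwards. A sketch of the three messages as `serde_json` values (field values are illustrative):

```rust
use serde_json::json;

fn ws_subscription_messages() -> (serde_json::Value, serde_json::Value, serde_json::Value) {
    // client -> proxy: open a newHeads subscription over the /ws endpoint
    let subscribe = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_subscribe",
        "params": ["newHeads"]
    });

    // proxy -> client: the subscription id (formatted with {:#x} in eth_subscribe)
    let subscribed = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "result": "0x1"
    });

    // proxy -> client: pushed for every new head until eth_unsubscribe
    let notification = json!({
        "jsonrpc": "2.0",
        "method": "eth_subscription",
        "params": {
            "subscription": "0x1",
            "result": { "number": "0xf4240" }
        }
    });

    (subscribe, subscribed, notification)
}
```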

@ -10,7 +10,6 @@ mod jsonrpc;
use parking_lot::deadlock;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::runtime;
@ -86,8 +85,6 @@ fn main() -> anyhow::Result<()> {
rt.block_on(async {
let app = rpc_config.try_build().await?;
let app: Arc<Web3ProxyApp> = Arc::new(app);
frontend::run(cli_config.port, app).await
})
}