From 2e559f3063b813fb208f9ef0e58b65c4188f9174 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 14 Jun 2022 04:04:14 +0000 Subject: [PATCH] transaction subscription getting closer --- Cargo.lock | 42 ++-- TODO.md | 11 +- web3-proxy/Cargo.toml | 12 +- web3-proxy/src/app.rs | 266 +++++++++++---------- web3-proxy/src/config.rs | 6 +- web3-proxy/src/connection.rs | 352 ++++++++++++++++++---------- web3-proxy/src/connections.rs | 169 ++++++++----- web3-proxy/src/frontend/ws_proxy.rs | 51 ++-- web3-proxy/src/main.rs | 3 - 9 files changed, 536 insertions(+), 376 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a973ffd5..473004f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,9 +177,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2504b827a8bef941ba3dd64bdffe9cf56ca182908a147edd6189c95fbcae7d" +checksum = "dc47084705629d09d15060d70a8dbfce479c842303d05929ce29c74c995916ae" dependencies = [ "async-trait", "axum-core", @@ -211,9 +211,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17" +checksum = "c2efed1c501becea07ce48118786ebcf229531d0d3b28edf224a720020d9e106" dependencies = [ "async-trait", "bytes", @@ -1407,9 +1407,9 @@ checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" [[package]] name = "flume" -version = "0.10.12" +version = "0.10.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" +checksum = "1ceeb589a3157cac0ab8cc585feb749bd2cea5cb55a6ee802ad72d9fd38303da" dependencies = [ "futures-core", "futures-sink", @@ -2722,9 +2722,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.10" +version = "0.11.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" +checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92" dependencies = [ "base64 0.13.0", "bytes", @@ -2750,6 +2750,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", + "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -2857,9 +2858,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "0.3.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" +checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" dependencies = [ "base64 0.13.0", ] @@ -3432,9 +3433,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.18.2" +version = "1.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" +checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" dependencies = [ "bytes", "libc", @@ -3475,14 +3476,14 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +checksum = 
"df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" dependencies = [ "futures-core", "pin-project-lite", "tokio", - "tokio-util 0.6.10", + "tokio-util 0.7.2", ] [[package]] @@ -3549,7 +3550,6 @@ dependencies = [ "pin-project", "pin-project-lite", "tokio", - "tokio-util 0.7.2", "tower-layer", "tower-service", "tracing", @@ -3588,9 +3588,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.34" +version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160" dependencies = [ "cfg-if", "log", @@ -3612,11 +3612,11 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +checksum = "7709595b8878a4965ce5e87ebf880a7d39c9afc6837721b21a5a816a8117d921" dependencies = [ - "lazy_static", + "once_cell", "valuable", ] diff --git a/TODO.md b/TODO.md index c6d7f10f..ee1bcfca 100644 --- a/TODO.md +++ b/TODO.md @@ -1,10 +1,12 @@ # Todo -- [ ] support websocket clients +- [ ] if web3 proxy gets an http error back, retry another node +- [ ] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions +- [ ] refactor Connections::spawn. have it return a handle that is selecting on those handles? +- [x] support websocket clients - we support websockets for the backends already, but we need them for the frontend too - - [ ] when block subscribers receive blocks, store them in a cache - - [ ] have a /ws endpoint (figure out how to route on / later) - - [ ] ws endpoint shouldn't be that different from the http endpoint + - [ ] when block subscribers receive blocks, store them in a cache. use this cache instead of querying eth_getBlock + - [x] have a /ws endpoint (figure out how to route on / later) - inspect any jsonrpc errors. if its something like "header not found" or "block with id $x not found" retry on another node (and add a negative score to that server) - this error seems to happen when we use load balanced rpcs - [x] use redis and redis-cell for rate limits @@ -41,6 +43,7 @@ - [ ] one proxy for mulitple chains? - [ ] zero downtime deploys - [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes? +- [ ] subscription id should be per connection, not global - [x] simple proxy - [x] better locking. 
when lots of requests come in, we seem to be in the way of block updates - [x] load balance between multiple RPC servers diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 042b5051..e177cca1 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -9,13 +9,13 @@ edition = "2021" anyhow = "1.0.57" arc-swap = "1.5.0" argh = "0.1.7" -axum = { version = "0.5.6", features = ["serde_json", "tokio-tungstenite", "ws"] } +axum = { version = "0.5.7", features = ["serde_json", "tokio-tungstenite", "ws"] } counter = "0.5.5" dashmap = "5.3.4" derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } fdlimit = "0.2.1" -flume = "0.10.12" +flume = "0.10.13" futures = { version = "0.3.21", features = ["thread-pool"] } hashbrown = "0.12.1" linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } @@ -25,15 +25,15 @@ parking_lot = { version = "0.12.1", features = ["deadlock_detection"] } proctitle = "0.1.1" # TODO: regex has several "perf" features that we might want to use regex = "1.5.6" -reqwest = { version = "0.11.10", default-features = false, features = ["json", "tokio-rustls"] } +reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } rustc-hash = "1.1.0" serde = { version = "1.0.137", features = [] } serde_json = { version = "1.0.81", default-features = false, features = ["alloc", "raw_value"] } -tokio = { version = "1.18.2", features = ["full", "tracing"] } +tokio = { version = "1.19.2", features = ["full", "tracing"] } toml = "0.5.9" -tracing = "0.1.34" +tracing = "0.1.35" # TODO: tracing-subscriber has serde and serde_json features that we might want to use tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] } url = "2.2.2" tower = "0.4.12" -tokio-stream = { version = "0.1.8", features = ["sync"] } +tokio-stream = { version = "0.1.9", features = ["sync"] } diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 59fed3ab..914cfc0a 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -6,6 +6,7 @@ use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; use axum::extract::ws::Message; use dashmap::DashMap; +use ethers::prelude::TransactionReceipt; use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; @@ -18,10 +19,9 @@ use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; -use tokio::task; use tokio::time::timeout; use tokio_stream::wrappers::WatchStream; -use tracing::{debug, info, instrument, trace, warn}; +use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; static APP_USER_AGENT: &str = concat!( "satoshiandkin/", @@ -52,6 +52,8 @@ pub struct Web3ProxyApp { response_cache: ResponseLrcCache, // don't drop this or the sender will stop working head_block_receiver: watch::Receiver>, + // TODO: i think we want a TxState enum for Confirmed(TxHash, BlockHash) or Pending(TxHash) or Orphan(TxHash, BlockHash) + pending_tx_receipt_receiver: flume::Receiver, next_subscription_id: AtomicUsize, } @@ -63,12 +65,12 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { - pub async fn try_new( + pub async fn spawn( chain_id: usize, redis_address: Option, balanced_rpcs: Vec, private_rpcs: Vec, - ) -> anyhow::Result { + ) -> anyhow::Result> { // make a http shared client // TODO: how should we configure the connection pool? 
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server @@ -85,6 +87,7 @@ impl Web3ProxyApp { info!("Connecting to redis on {}", redis_address); let redis_client = redis_cell_client::Client::open(redis_address)?; + // TODO: r2d2 connection pool? let redis_conn = redis_client.get_multiplexed_tokio_connection().await?; Some(redis_conn) @@ -95,47 +98,52 @@ impl Web3ProxyApp { } }; + // TODO: subscribe to pending transactions on the private rpcs, too? + let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); + let (pending_tx_receipt_sender, pending_tx_receipt_receiver) = flume::unbounded(); + // TODO: attach context to this error - let balanced_rpcs = Web3Connections::try_new( + let balanced_rpcs = Web3Connections::spawn( chain_id, balanced_rpcs, http_client.as_ref(), rate_limiter.as_ref(), + Some(head_block_sender), + Some(pending_tx_receipt_sender), ) .await?; - let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); - - // TODO: do this separately instead of during try_new - { - let balanced_rpcs = balanced_rpcs.clone(); - task::spawn(async move { - balanced_rpcs.subscribe_all_heads(head_block_sender).await; - }); - } - - // TODO: attach context to this error let private_rpcs = if private_rpcs.is_empty() { warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); balanced_rpcs.clone() } else { - Web3Connections::try_new( + // TODO: attach context to this error + Web3Connections::spawn( chain_id, private_rpcs, http_client.as_ref(), rate_limiter.as_ref(), + // subscribing to new heads here won't work well + None, + // TODO: subscribe to pending transactions on the private rpcs, too? + None, ) .await? }; - Ok(Web3ProxyApp { + let app = Web3ProxyApp { balanced_rpcs, private_rpcs, incoming_requests: Default::default(), response_cache: Default::default(), head_block_receiver, + pending_tx_receipt_receiver, next_subscription_id: 1.into(), - }) + }; + + let app = Arc::new(app); + + Ok(app) } pub async fn eth_subscribe( @@ -146,57 +154,60 @@ impl Web3ProxyApp { ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { let (subscription_handle, subscription_registration) = AbortHandle::new_pair(); + // TODO: this only needs to be unique per connection. we don't need it globably unique let subscription_id = self .next_subscription_id .fetch_add(1, atomic::Ordering::SeqCst); - let subscription_id = format!("{:#x}", subscription_id); // save the id so we can use it in the response let id = payload.id.clone(); - let f = { + let subscription_future = { let subscription_id = subscription_id.clone(); - let params = payload.params.as_deref().unwrap().get(); + match payload.params.as_deref().unwrap().get() { + r#"["newHeads"]"# => { + let head_block_receiver = self.head_block_receiver.clone(); - if params == r#"["newHeads"]"# { - let head_block_receiver = self.head_block_receiver.clone(); + trace!(?subscription_id, "new heads subscription"); + async move { + let mut head_block_receiver = Abortable::new( + WatchStream::new(head_block_receiver), + subscription_registration, + ); - trace!(?subscription_id, "new heads subscription"); - async move { - let mut head_block_receiver = Abortable::new( - WatchStream::new(head_block_receiver), - subscription_registration, - ); + while let Some(new_head) = head_block_receiver.next().await { + // TODO: make a struct for this? 
using our JsonRpcForwardedResponse won't work because it needs an id + let msg = json!({ + "jsonrpc": "2.0", + "method":"eth_subscription", + "params": { + "subscription": subscription_id, + "result": new_head, + }, + }); - while let Some(new_head) = head_block_receiver.next().await { - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let msg = json!({ - "jsonrpc": "2.0", - "method":"eth_subscription", - "params": { - "subscription": subscription_id, - "result": new_head, - }, - }); + let msg = Message::Text(serde_json::to_string(&msg).unwrap()); - let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + if subscription_tx.send_async(msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + } - if subscription_tx.send_async(msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; + trace!(?subscription_id, "closed new heads subscription"); } - - trace!(?subscription_id, "closed new heads subscription"); } - } else { - return Err(anyhow::anyhow!("unimplemented")); + r#"["pendingTransactions"]"# => { + unimplemented!("pendingTransactions") + } + _ => return Err(anyhow::anyhow!("unimplemented")), } }; - tokio::spawn(f); + // TODO: what should we do with this handle? i think its fine to just drop it + tokio::spawn(subscription_future); let response = JsonRpcForwardedResponse::from_string(subscription_id, id); @@ -319,89 +330,92 @@ impl Web3ProxyApp { // TODO: how much should we retry? probably with a timeout and not with a count like this // TODO: think more about this loop. - // // TODO: add more to this span - // let span = info_span!("i", ?i); + // // TODO: add more to this span such as + let span = info_span!("rpc_request"); // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) - if request.method == "eth_sendRawTransaction" { - // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs - // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit - return self - .private_rpcs - .try_send_all_upstream_servers(request) - .await; - } else { - // this is not a private transaction (or no private relays are configured) - - let (cache_key, response_cache) = match self.get_cached_response(&request) { - (cache_key, Ok(response)) => { - let _ = self.incoming_requests.remove(&cache_key); - - return Ok(response); - } - (cache_key, Err(response_cache)) => (cache_key, response_cache), - }; - - // check if this request is already in flight - // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't) - let (incoming_tx, incoming_rx) = watch::channel(true); - let mut other_incoming_rx = None; - match self.incoming_requests.entry(cache_key.clone()) { - dashmap::mapref::entry::Entry::Occupied(entry) => { - other_incoming_rx = Some(entry.get().clone()); - } - dashmap::mapref::entry::Entry::Vacant(entry) => { - entry.insert(incoming_rx); - } + match &request.method[..] { + "eth_sendRawTransaction" => { + // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs + // TODO: think more about this lock. i think it won't actually help the herd. 
it probably makes it worse if we have a tight lag_limit + self.private_rpcs + .try_send_all_upstream_servers(request) + .instrument(span) + .await } + _ => { + // this is not a private transaction (or no private relays are configured) - if let Some(mut other_incoming_rx) = other_incoming_rx { - // wait for the other request to finish. it might have finished successfully or with an error - trace!("{:?} waiting on in-flight request", request); + let (cache_key, response_cache) = match self.get_cached_response(&request) { + (cache_key, Ok(response)) => { + let _ = self.incoming_requests.remove(&cache_key); - let _ = other_incoming_rx.changed().await; + return Ok(response); + } + (cache_key, Err(response_cache)) => (cache_key, response_cache), + }; - // now that we've waited, lets check the cache again - if let Some(cached) = response_cache.read().get(&cache_key) { - let _ = self.incoming_requests.remove(&cache_key); - let _ = incoming_tx.send(false); - - // TODO: emit a stat - trace!( - "{:?} cache hit after waiting for in-flight request!", - request - ); - - return Ok(cached.to_owned()); - } else { - // TODO: emit a stat - trace!( - "{:?} cache miss after waiting for in-flight request!", - request - ); + // check if this request is already in flight + // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't) + let (incoming_tx, incoming_rx) = watch::channel(true); + let mut other_incoming_rx = None; + match self.incoming_requests.entry(cache_key.clone()) { + dashmap::mapref::entry::Entry::Occupied(entry) => { + other_incoming_rx = Some(entry.get().clone()); + } + dashmap::mapref::entry::Entry::Vacant(entry) => { + entry.insert(incoming_rx); + } } + + if let Some(mut other_incoming_rx) = other_incoming_rx { + // wait for the other request to finish. it might have finished successfully or with an error + trace!("{:?} waiting on in-flight request", request); + + let _ = other_incoming_rx.changed().await; + + // now that we've waited, lets check the cache again + if let Some(cached) = response_cache.read().get(&cache_key) { + let _ = self.incoming_requests.remove(&cache_key); + let _ = incoming_tx.send(false); + + // TODO: emit a stat + trace!( + "{:?} cache hit after waiting for in-flight request!", + request + ); + + return Ok(cached.to_owned()); + } else { + // TODO: emit a stat + trace!( + "{:?} cache miss after waiting for in-flight request!", + request + ); + } + } + + let response = self + .balanced_rpcs + .try_send_best_upstream_server(request) + .await?; + + // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache + let mut response_cache = response_cache.write(); + + // TODO: cache the warp::reply to save us serializing every time + response_cache.insert(cache_key.clone(), response.clone()); + if response_cache.len() >= RESPONSE_CACHE_CAP { + // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block + response_cache.pop_front(); + } + + drop(response_cache); + + let _ = self.incoming_requests.remove(&cache_key); + let _ = incoming_tx.send(false); + + Ok(response) } - - let response = self - .balanced_rpcs - .try_send_best_upstream_server(request) - .await?; - - // TODO: small race condidition here. 
parallel requests with the same query will both be saved to the cache - let mut response_cache = response_cache.write(); - - // TODO: cache the warp::reply to save us serializing every time - response_cache.insert(cache_key.clone(), response.clone()); - if response_cache.len() >= RESPONSE_CACHE_CAP { - // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block - response_cache.pop_front(); - } - - drop(response_cache); - - let _ = self.incoming_requests.remove(&cache_key); - let _ = incoming_tx.send(false); - - Ok(response) } } } diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index e08be760..4bb0e98f 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -47,7 +47,7 @@ pub struct Web3ConnectionConfig { impl RpcConfig { /// Create a Web3ProxyApp from config // #[instrument(name = "try_build_RpcConfig", skip_all)] - pub async fn try_build(self) -> anyhow::Result { + pub async fn try_build(self) -> anyhow::Result> { let balanced_rpcs = self.balanced_rpcs.into_values().collect(); let private_rpcs = if let Some(private_rpcs) = self.private_rpcs { @@ -56,7 +56,7 @@ impl RpcConfig { vec![] }; - Web3ProxyApp::try_new( + Web3ProxyApp::spawn( self.shared.chain_id, self.shared.rate_limit_redis, balanced_rpcs, @@ -77,7 +77,7 @@ impl Web3ConnectionConfig { ) -> anyhow::Result> { let hard_rate_limit = self.hard_limit.map(|x| (x, redis_conn.unwrap())); - Web3Connection::try_new( + Web3Connection::spawn( chain_id, self.url, http_client, diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index c5ebed5a..5bfefc32 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -6,6 +6,7 @@ use redis_cell_client::RedisCellClient; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::fmt; +use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::RwLock; @@ -115,33 +116,9 @@ impl fmt::Display for Web3Connection { } impl Web3Connection { - #[instrument(skip_all)] - pub async fn reconnect( - self: &Arc, - block_sender: &flume::Sender<(Block, usize)>, - rpc_id: usize, - ) -> anyhow::Result<()> { - // websocket doesn't need the http client - let http_client = None; - - // since this lock is held open over an await, we use tokio's locking - let mut provider = self.provider.write().await; - - *provider = None; - - // tell the block subscriber that we are at 0 - block_sender.send_async((Block::default(), rpc_id)).await?; - - let new_provider = Web3Provider::from_str(&self.url, http_client).await?; - - *provider = Some(Arc::new(new_provider)); - - Ok(()) - } - - /// Connect to a web3 rpc and subscribe to new heads - #[instrument(name = "try_new_Web3Connection", skip(hard_limit, http_client))] - pub async fn try_new( + /// Connect to a web3 rpc + #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] + pub async fn spawn( chain_id: usize, url_str: String, // optional because this is only used for http providers. 
websocket providers don't use it @@ -176,9 +153,10 @@ impl Web3Connection { // check the server's chain_id here // TODO: move this outside the `new` function and into a `start` function or something - let active_request_handle = connection.wait_for_request_handle().await; // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error - let found_chain_id: Result = active_request_handle + let found_chain_id: Result = connection + .wait_for_request_handle() + .await .request("eth_chainId", Option::None::<()>) .await; @@ -206,6 +184,31 @@ impl Web3Connection { Ok(connection) } + #[instrument(skip_all)] + pub async fn reconnect( + self: &Arc, + block_sender: &flume::Sender<(Block, Arc)>, + ) -> anyhow::Result<()> { + // websocket doesn't need the http client + let http_client = None; + + // since this lock is held open over an await, we use tokio's locking + let mut provider = self.provider.write().await; + + *provider = None; + + // tell the block subscriber that we are at 0 + block_sender + .send_async((Block::default(), self.clone())) + .await?; + + let new_provider = Web3Provider::from_str(&self.url, http_client).await?; + + *provider = Some(Arc::new(new_provider)); + + Ok(()) + } + #[inline] pub fn active_requests(&self) -> u32 { self.active_requests.load(atomic::Ordering::Acquire) @@ -225,13 +228,12 @@ impl Web3Connection { async fn send_block( self: &Arc, block: Result, ProviderError>, - block_sender: &flume::Sender<(Block, usize)>, - rpc_id: usize, + block_sender: &flume::Sender<(Block, Arc)>, ) -> anyhow::Result<()> { match block { Ok(block) => { // TODO: i'm pretty sure we don't need send_async, but double check - block_sender.send_async((block, rpc_id)).await?; + block_sender.send_async((block, self.clone())).await?; } Err(e) => { warn!("unable to get block from {}: {}", self, e); @@ -241,120 +243,216 @@ impl Web3Connection { Ok(()) } - /// Subscribe to new blocks. If `reconnect` is true, this runs forever. - /// TODO: instrument with the url - #[instrument(skip_all)] - pub async fn subscribe_new_heads( + pub async fn subscribe( self: Arc, - rpc_id: usize, - block_sender: flume::Sender<(Block, usize)>, + block_sender: flume::Sender<(Block, Arc)>, + tx_id_sender: flume::Sender<(TxHash, Arc)>, reconnect: bool, ) -> anyhow::Result<()> { loop { - info!("Watching new_heads on {}", self); + // TODO: make these abortable so that if one fails the other can be cancelled? - // TODO: is a RwLock of an Option the right thing here? - if let Some(provider) = self.provider.read().await.clone() { - match &*provider { - Web3Provider::Http(provider) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably some fraction of block time. set automatically? - // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now - // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - let mut interval = interval(Duration::from_secs(2)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + let new_heads = { + let clone = self.clone(); + let block_sender = block_sender.clone(); - let mut last_hash = Default::default(); + clone.subscribe_new_heads(block_sender) + }; - loop { - // wait for the interval - // TODO: if error or rate limit, increase interval? 
- interval.tick().await; + let pending_txs = { + let clone = self.clone(); + let tx_id_sender = tx_id_sender.clone(); - match self.try_request_handle().await { - Ok(active_request_handle) => { - // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" - let block: Result, _> = provider - .request("eth_getBlockByNumber", ("latest", false)) - .await; + clone.subscribe_pending_transactions(tx_id_sender) + }; - drop(active_request_handle); - - // don't send repeat blocks - if let Ok(block) = &block { - let new_hash = block.hash.unwrap(); - - if new_hash == last_hash { - continue; - } - - last_hash = new_hash; - } - - self.send_block(block, &block_sender, rpc_id).await?; - } - Err(e) => { - warn!("Failed getting latest block from {}: {:?}", self, e); - } - } - } - } - Web3Provider::Ws(provider) => { - // rate limits - let active_request_handle = self.wait_for_request_handle().await; - - // TODO: automatically reconnect? - // TODO: it would be faster to get the block number, but subscriptions don't provide that - // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? - let mut stream = provider.subscribe_blocks().await?; - - drop(active_request_handle); - let active_request_handle = self.wait_for_request_handle().await; - - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // all it does is print "new block" for the same block as current block - // TODO: rate limit! - let block: Result, _> = active_request_handle - .request("eth_getBlockByNumber", ("latest", false)) - .await; - - self.send_block(block, &block_sender, rpc_id).await?; - - // TODO: should the stream have a timeout on it here? - // TODO: although reconnects will make this less of an issue - loop { - match stream.next().await { - Some(new_block) => { - self.send_block(Ok(new_block), &block_sender, rpc_id) - .await?; - - // TODO: really not sure about this - task::yield_now().await; - } - None => { - warn!("subscription ended"); - break; - } - } - } - } + tokio::select! { + _ = new_heads => { + info!(?self, "new heads subscription completed"); + } + _ = pending_txs => { + info!(?self, "pending transactions subscription completed"); } } if reconnect { // TODO: exponential backoff - warn!("new heads subscription exited. Attempting to reconnect in 1 second..."); + // TODO: share code with new heads subscription + warn!("pending transactions subscription exited. Attempting to reconnect in 1 second..."); sleep(Duration::from_secs(1)).await; // TODO: loop on reconnecting! do not return with a "?" here - self.reconnect(&block_sender, rpc_id).await?; + // TODO: this isn't going to work. it will get in a loop with newHeads + self.reconnect(&block_sender).await?; } else { break; } } - info!("Done watching new_heads on {}", self); + Ok(()) + } + + /// Subscribe to new blocks. If `reconnect` is true, this runs forever. + /// TODO: instrument with the url + #[instrument(skip_all)] + async fn subscribe_new_heads( + self: Arc, + block_sender: flume::Sender<(Block, Arc)>, + ) -> anyhow::Result<()> { + info!("Watching new_heads on {}", self); + + // TODO: is a RwLock of an Option the right thing here? 
+ if let Some(provider) = self.provider.read().await.clone() { + match &*provider { + Web3Provider::Http(_provider) => { + // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: what should this interval be? probably some fraction of block time. set automatically? + // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now + // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though + let mut interval = interval(Duration::from_secs(2)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let mut last_hash = Default::default(); + + loop { + // wait for the interval + // TODO: if error or rate limit, increase interval? + interval.tick().await; + + match self.try_request_handle().await { + Ok(active_request_handle) => { + // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" + let block: Result, _> = active_request_handle + .request("eth_getBlockByNumber", ("latest", false)) + .await; + + // don't send repeat blocks + if let Ok(block) = &block { + let new_hash = block.hash.unwrap(); + + if new_hash == last_hash { + continue; + } + + last_hash = new_hash; + } + + self.send_block(block, &block_sender).await?; + } + Err(e) => { + warn!("Failed getting latest block from {}: {:?}", self, e); + } + } + } + } + Web3Provider::Ws(provider) => { + let active_request_handle = self.wait_for_request_handle().await; + let mut stream = provider.subscribe_blocks().await?; + drop(active_request_handle); + + // query the block once since the subscription doesn't send the current block + // there is a very small race condition here where the stream could send us a new block right now + // all it does is print "new block" for the same block as current block + // TODO: subscribe to Block instead? + let block: Result, _> = self + .wait_for_request_handle() + .await + .request("eth_getBlockByNumber", ("latest", false)) + .await; + + self.send_block(block, &block_sender).await?; + + // TODO: should the stream have a timeout on it here? + // TODO: although reconnects will make this less of an issue + loop { + match stream.next().await { + Some(new_block) => { + self.send_block(Ok(new_block), &block_sender).await?; + + // TODO: really not sure about this + task::yield_now().await; + } + None => { + warn!("subscription ended"); + break; + } + } + } + } + } + } + + Ok(()) + } + + #[instrument(skip_all)] + async fn subscribe_pending_transactions( + self: Arc, + tx_id_sender: flume::Sender<(TxHash, Arc)>, + ) -> anyhow::Result<()> { + info!("watching pending transactions on {}", self); + + // TODO: is a RwLock of an Option the right thing here? + if let Some(provider) = self.provider.read().await.clone() { + match &*provider { + Web3Provider::Http(_provider) => { + // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: what should this interval be? probably automatically set to some fraction of block time + // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now + // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? 
if they are slow this wouldn't work well though + let mut interval = interval(Duration::from_secs(2)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + // TODO: create a filter + + loop { + // wait for the interval + // TODO: if error or rate limit, increase interval? + interval.tick().await; + + match self.try_request_handle().await { + Ok(active_request_handle) => { + // TODO: check the filter + unimplemented!("actually send a request"); + } + Err(e) => { + warn!("Failed getting latest block from {}: {:?}", self, e); + } + } + } + } + Web3Provider::Ws(provider) => { + // rate limits + let active_request_handle = self.wait_for_request_handle().await; + + // TODO: automatically reconnect? + // TODO: it would be faster to get the block number, but subscriptions don't provide that + // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? + let mut stream = provider.subscribe_pending_txs().await?; + + drop(active_request_handle); + + // TODO: query existing pending txs? + + // TODO: should the stream have a timeout on it here? + // TODO: although reconnects will make this less of an issue + loop { + match stream.next().await { + Some(pending_tx_id) => { + tx_id_sender + .send_async((pending_tx_id, self.clone())) + .await?; + } + None => { + warn!("subscription ended"); + break; + } + } + } + } + } + } + Ok(()) } @@ -404,6 +502,12 @@ impl Web3Connection { } } +impl Hash for Web3Connection { + fn hash(&self, state: &mut H) { + self.url.hash(state); + } +} + /// Drop this once a connection completes pub struct ActiveRequestHandle(Arc); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index d13a2a1f..633acc7e 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -2,8 +2,7 @@ use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; -use ethers::prelude::{Block, ProviderError, TxHash, H256}; -use futures::future::join_all; +use ethers::prelude::{Block, ProviderError, TransactionReceipt, TxHash, H256}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; @@ -30,7 +29,9 @@ use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; struct SyncedConnections { head_block_num: u64, head_block_hash: H256, - inner: BTreeSet, + // TODO: this should be able to serialize, but it isn't + #[serde(skip_serializing)] + inner: BTreeSet>, } impl fmt::Debug for SyncedConnections { @@ -78,12 +79,14 @@ impl fmt::Debug for Web3Connections { } impl Web3Connections { - // #[instrument(name = "try_new_Web3Connections", skip_all)] - pub async fn try_new( + // #[instrument(name = "spawn_Web3Connections", skip_all)] + pub async fn spawn( chain_id: usize, servers: Vec, http_client: Option<&reqwest::Client>, rate_limiter: Option<&redis_cell_client::MultiplexedConnection>, + head_block_sender: Option>>, + pending_tx_receipt_sender: Option>, ) -> anyhow::Result> { let num_connections = servers.len(); @@ -111,55 +114,90 @@ impl Web3Connections { synced_connections: ArcSwap::new(Arc::new(synced_connections)), }); + let handle = { + let connections = connections.clone(); + + tokio::spawn(async move { + connections + .subscribe(head_block_sender, pending_tx_receipt_sender) + .await + }) + }; + Ok(connections) } - pub async fn subscribe_all_heads( - self: &Arc, - head_block_sender: watch::Sender>, - ) { - // TODO: benchmark bounded vs unbounded - let (block_sender, block_receiver) = flume::unbounded::<(Block, usize)>(); + /// subscribe to all the 
backend rpcs + async fn subscribe( + self: Arc, + head_block_sender: Option>>, + pending_tx_receipt_sender: Option>, + ) -> anyhow::Result<()> { + let mut futures = FuturesUnordered::new(); - let mut handles = vec![]; + // subscribe to pending transactions + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, block_receiver) = flume::unbounded(); - for (rpc_id, connection) in self.inner.iter().enumerate() { - // subscribe to new heads in a spawned future - // TODO: channel instead. then we can have one future with write access to a left-right? - let connection = Arc::clone(connection); + // one future subscribes to pendingTransactions on all the rpcs. it sends them through the funnel + // TODO: do this only when someone is subscribed. otherwise this will be way too many queries + for (rpc_id, connection) in self.inner.iter().cloned().enumerate() { + let pending_tx_id_sender = pending_tx_id_sender.clone(); let block_sender = block_sender.clone(); - // let url = connection.url().to_string(); - - let handle = task::Builder::default() - .name("subscribe_new_heads") - .spawn(async move { - // loop to automatically reconnect - // TODO: make this cancellable? - // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - // TODO: proper span - connection - .subscribe_new_heads(rpc_id, block_sender.clone(), true) - .instrument(tracing::info_span!("url")) - .await - }); - - handles.push(handle); - } - - let connections = Arc::clone(self); - let handle = task::Builder::default() - .name("update_synced_rpcs") - .spawn(async move { - connections - .update_synced_rpcs(block_receiver, head_block_sender) + let handle = tokio::spawn(async move { + // loop to automatically reconnect + // TODO: make this cancellable? + // TODO: instead of passing Some(connections), pass Some(channel_sender). 
Then listen on the receiver below to keep local heads up-to-date + // TODO: proper span + connection + .subscribe(block_sender, pending_tx_id_sender, true) + .instrument(tracing::info_span!("rpc", ?rpc_id)) .await }); - handles.push(handle); + futures.push(handle); + } - // TODO: do something with join_all's result - join_all(handles).await; + // the next future subscribes to the transaction funnel + // it skips any duplicates (unless they are being orphaned) + // fetches new transactions from the notifying rpc + // forwards new transacitons to pending_tx_receipt_sender + { + // TODO: do something with the handle so we can catch any errors + let handle = task::spawn(async move { + while let Ok((pending_transaction_id, rpc)) = + pending_tx_id_receiver.recv_async().await + { + unimplemented!("de-dedup the pending txid") + } + + Ok(()) + }); + + futures.push(handle); + } + + // the next future subscribes to the block funnel + + if let Some(head_block_sender) = head_block_sender { + let connections = Arc::clone(&self); + let handle = task::Builder::default() + .name("update_synced_rpcs") + .spawn(async move { + connections + .update_synced_rpcs(block_receiver, head_block_sender, pending_tx_id_sender) + .await + }); + + futures.push(handle); + } + + if let Some(Err(e)) = futures.next().await { + return Err(e.into()); + } + + Ok(()) } pub fn get_head_block_hash(&self) -> H256 { @@ -227,16 +265,18 @@ impl Web3Connections { // we don't instrument here because we put a span inside the while loop async fn update_synced_rpcs( &self, - block_receiver: flume::Receiver<(Block, usize)>, + block_receiver: flume::Receiver<(Block, Arc)>, head_block_sender: watch::Sender>, + pending_tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { let total_rpcs = self.inner.len(); - let mut connection_states: HashMap = HashMap::with_capacity(total_rpcs); + let mut connection_states: HashMap, _> = + HashMap::with_capacity(total_rpcs); let mut pending_synced_connections = SyncedConnections::default(); - while let Ok((new_block, rpc_id)) = block_receiver.recv_async().await { + while let Ok((new_block, rpc)) = block_receiver.recv_async().await { // TODO: wth. how is this happening? need more logs let new_block_num = match new_block.number { Some(x) => x.as_u64(), @@ -250,14 +290,16 @@ impl Web3Connections { // TODO: span with more in it? // TODO: make sure i'm doing this span right // TODO: show the actual rpc url? - let span = info_span!("block_receiver", rpc_id, new_block_num); + let span = info_span!("block_receiver", ?rpc, new_block_num); + + // TODO: clippy lint to make sure we don't hold this across an awaited future let _enter = span.enter(); if new_block_num == 0 { warn!("rpc is still syncing"); } - connection_states.insert(rpc_id, (new_block_num, new_block_hash)); + connection_states.insert(rpc.clone(), (new_block_num, new_block_hash)); // TODO: do something to update the synced blocks match new_block_num.cmp(&pending_synced_connections.head_block_num) { @@ -268,7 +310,7 @@ impl Web3Connections { info!("new head: {}", new_block_hash); pending_synced_connections.inner.clear(); - pending_synced_connections.inner.insert(rpc_id); + pending_synced_connections.inner.insert(rpc); pending_synced_connections.head_block_num = new_block_num; @@ -283,16 +325,17 @@ impl Web3Connections { // do not clear synced_connections. // we just want to add this rpc to the end // TODO: HashSet here? 
i think we get dupes if we don't - pending_synced_connections.inner.insert(rpc_id); + pending_synced_connections.inner.insert(rpc); } else { // same height, but different chain // check connection_states to see which head block is more popular! - let mut rpc_ids_by_block: BTreeMap> = BTreeMap::new(); + let mut rpc_ids_by_block: BTreeMap>> = + BTreeMap::new(); let mut counted_rpcs = 0; - for (rpc_id, (block_num, block_hash)) in connection_states.iter() { + for (rpc, (block_num, block_hash)) in connection_states.iter() { if *block_num != new_block_num { // this connection isn't synced. we don't care what hash it has continue; @@ -304,7 +347,7 @@ impl Web3Connections { .entry(*block_hash) .or_insert_with(|| Vec::with_capacity(total_rpcs - 1)); - count.push(*rpc_id); + count.push(rpc.clone()); } let most_common_head_hash = *rpc_ids_by_block @@ -335,7 +378,7 @@ impl Web3Connections { } cmp::Ordering::Less => { // this isn't the best block in the tier. don't do anything - if !pending_synced_connections.inner.remove(&rpc_id) { + if !pending_synced_connections.inner.remove(&rpc) { // we didn't remove anything. nothing more to do continue; } @@ -373,7 +416,7 @@ impl Web3Connections { pub async fn next_upstream_server(&self) -> Result> { let mut earliest_retry_after = None; - let mut synced_rpc_ids: Vec = self + let mut synced_rpcs: Vec> = self .synced_connections .load() .inner @@ -381,21 +424,19 @@ impl Web3Connections { .cloned() .collect(); - let sort_cache: HashMap = synced_rpc_ids + let sort_cache: HashMap, (f32, u32)> = synced_rpcs .iter() - .map(|rpc_id| { - let rpc = self.inner.get(*rpc_id).unwrap(); - + .map(|rpc| { let active_requests = rpc.active_requests(); let soft_limit = rpc.soft_limit(); let utilization = active_requests as f32 / soft_limit as f32; - (*rpc_id, (utilization, soft_limit)) + (rpc.clone(), (utilization, soft_limit)) }) .collect(); - synced_rpc_ids.sort_unstable_by(|a, b| { + synced_rpcs.sort_unstable_by(|a, b| { let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap(); let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap(); @@ -410,16 +451,14 @@ impl Web3Connections { }); // now that the rpcs are sorted, try to get an active request handle for one of them - for rpc_id in synced_rpc_ids.into_iter() { - let rpc = self.inner.get(rpc_id).unwrap(); - + for rpc in synced_rpcs.into_iter() { // increment our connection counter match rpc.try_request_handle().await { Err(retry_after) => { earliest_retry_after = earliest_retry_after.min(Some(retry_after)); } Ok(handle) => { - trace!("next server on {:?}: {:?}", self, rpc_id); + trace!("next server on {:?}: {:?}", self, rpc); return Ok(handle); } } diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 6d2eb356..59e925d3 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -46,37 +46,40 @@ async fn handle_socket_payload( Ok(payload) => { let id = payload.id.clone(); - let response: anyhow::Result = if payload.method - == "eth_subscribe" - { - let response = app.eth_subscribe(payload, response_tx.clone()).await; + let response: anyhow::Result = match &payload.method[..] 
{ + "eth_subscribe" => { + let response = app.eth_subscribe(payload, response_tx.clone()).await; - match response { - Ok((handle, response)) => { - // TODO: better key - subscriptions.insert(response.result.as_ref().unwrap().to_string(), handle); + match response { + Ok((handle, response)) => { + // TODO: better key + subscriptions + .insert(response.result.as_ref().unwrap().to_string(), handle); - Ok(response.into()) + Ok(response.into()) + } + Err(err) => Err(err), } - Err(err) => Err(err), } - } else if payload.method == "eth_unsubscribe" { - let subscription_id = payload.params.unwrap().to_string(); + "eth_unsubscribe" => { + let subscription_id = payload.params.unwrap().to_string(); - let partial_response = match subscriptions.remove(&subscription_id) { - None => "false", - Some(handle) => { - handle.abort(); - "true" - } - }; + let partial_response = match subscriptions.remove(&subscription_id) { + None => "false", + Some(handle) => { + handle.abort(); + "true" + } + }; - let response = - JsonRpcForwardedResponse::from_string(partial_response.to_string(), id.clone()); + let response = JsonRpcForwardedResponse::from_string( + partial_response.to_string(), + id.clone(), + ); - Ok(response.into()) - } else { - app.proxy_web3_rpc(payload.into()).await + Ok(response.into()) + } + _ => app.proxy_web3_rpc(payload.into()).await, }; (id, response) diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 082f8cbc..c65a6f2c 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -10,7 +10,6 @@ mod jsonrpc; use parking_lot::deadlock; use std::fs; use std::sync::atomic::{self, AtomicUsize}; -use std::sync::Arc; use std::thread; use std::time::Duration; use tokio::runtime; @@ -86,8 +85,6 @@ fn main() -> anyhow::Result<()> { rt.block_on(async { let app = rpc_config.try_build().await?; - let app: Arc = Arc::new(app); - frontend::run(cli_config.port, app).await }) }