From d961aa647d77f8417e847af7a2c09317446a36a8 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Sun, 15 May 2022 19:28:22 +0000
Subject: [PATCH] small refactor

---
 Cargo.lock                    |  35 +++++++++
 web3-proxy/Cargo.toml         |   1 +
 web3-proxy/src/app.rs         |  90 ++++++++++------------
 web3-proxy/src/connection.rs  |  13 ++++
 web3-proxy/src/connections.rs | 139 ++++++++++++++++++++++------------
 5 files changed, 179 insertions(+), 99 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1b4cc439..c11e3906 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1425,6 +1425,19 @@ dependencies = [
  "byteorder",
 ]

+[[package]]
+name = "generator"
+version = "0.6.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6"
+dependencies = [
+ "cc",
+ "libc",
+ "log",
+ "rustversion",
+ "winapi",
+]
+
 [[package]]
 name = "generic-array"
 version = "0.12.4"
@@ -1886,6 +1899,16 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"

+[[package]]
+name = "left-right"
+version = "0.11.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2033c37d2bc68719774fa095ccdd46a14b846e9c52ec54107eda92fbf966b203"
+dependencies = [
+ "loom",
+ "slab",
+]
+
 [[package]]
 name = "libc"
 version = "0.2.125"
@@ -1928,6 +1951,17 @@ dependencies = [
  "cfg-if",
 ]

+[[package]]
+name = "loom"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "27a6650b2f722ae8c0e2ebc46d07f80c9923464fc206d962332f1eff83143530"
+dependencies = [
+ "cfg-if",
+ "generator",
+ "scoped-tls",
+]
+
 [[package]]
 name = "mach"
 version = "0.3.2"
@@ -3863,6 +3897,7 @@ dependencies = [
  "futures",
  "governor",
  "hashbrown 0.12.1",
+ "left-right",
  "linkedhashmap",
  "parking_lot 0.12.0",
  "proctitle",
diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml
index bf7d034f..780ee16b 100644
--- a/web3-proxy/Cargo.toml
+++ b/web3-proxy/Cargo.toml
@@ -15,6 +15,7 @@ flume = "0.10.12"
 futures = { version = "0.3.21", features = ["thread-pool"] }
 governor = { version = "0.4.2", features = ["dashmap", "std"] }
 hashbrown = "0.12.1"
+left-right = "0.11.4"
 linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
 parking_lot = "0.12.0"
 proctitle = "0.1.1"
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index 5cdc58ef..403ff32f 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -5,14 +5,12 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
 use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
-use ethers::prelude::ProviderError;
-use ethers::prelude::{HttpClientError, WsClientError};
+use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
 use futures::future::join_all;
 use governor::clock::{Clock, QuantaClock};
-use linkedhashmap::LinkedHashMap;
-use parking_lot::RwLock;
+// use linkedhashmap::LinkedHashMap;
+// use parking_lot::RwLock;
 use std::fmt;
-use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::time::sleep;
@@ -25,12 +23,12 @@ static APP_USER_AGENT: &str = concat!(
     env!("CARGO_PKG_VERSION"),
 );

-// TODO: put this in config? what size should we do?
-const RESPONSE_CACHE_CAP: usize = 1024;
+// // TODO: put this in config? what size should we do?
+// const RESPONSE_CACHE_CAP: usize = 1024;

-/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
-type ResponseLruCache =
-    RwLock<LinkedHashMap<(u64, String, Option<String>), JsonRpcForwardedResponse>>;
+// /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
+// type ResponseLruCache =
+//     RwLock<LinkedHashMap<(u64, String, Option<String>), JsonRpcForwardedResponse>>;

 /// The application
 // TODO: this debug impl is way too verbose. make something smaller
@@ -43,7 +41,7 @@ pub struct Web3ProxyApp {
     balanced_rpcs: Arc<Web3Connections>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
     private_rpcs: Arc<Web3Connections>,
-    response_cache: ResponseLruCache,
+    // response_cache: ResponseLruCache,
 }

 impl fmt::Debug for Web3ProxyApp {
@@ -61,8 +59,6 @@ impl Web3ProxyApp {
     ) -> anyhow::Result<Web3ProxyApp> {
         let clock = QuantaClock::default();

-        let best_head_block_number = Arc::new(AtomicU64::new(0));
-
         // make a http shared client
         // TODO: how should we configure the connection pool?
         // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
@@ -75,7 +71,6 @@ impl Web3ProxyApp {
         // TODO: attach context to this error
         let balanced_rpcs = Web3Connections::try_new(
             chain_id,
-            best_head_block_number.clone(),
             balanced_rpcs,
             Some(http_client.clone()),
             &clock,
@@ -88,22 +83,15 @@ impl Web3ProxyApp {
             warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");

             balanced_rpcs.clone()
         } else {
-            Web3Connections::try_new(
-                chain_id,
-                best_head_block_number.clone(),
-                private_rpcs,
-                Some(http_client),
-                &clock,
-                false,
-            )
-            .await?
+            Web3Connections::try_new(chain_id, private_rpcs, Some(http_client), &clock, false)
+                .await?
         };

         Ok(Web3ProxyApp {
             clock,
             balanced_rpcs,
             private_rpcs,
-            response_cache: Default::default(),
+            // response_cache: Default::default(),
         })
     }
@@ -221,22 +209,24 @@ impl Web3ProxyApp {
         // this is not a private transaction (or no private relays are configured)
         // try to send to each tier, stopping at the first success
         // if no tiers are synced, fallback to privates
+        // TODO: think more about this loop.
         loop {
-            let best_block_number = self.balanced_rpcs.head_block_number();
+            // TODO: bring back this caching
+            // let best_block_hash = self.balanced_rpcs.head_block_hash();

-            // TODO: building this cache key is slow and its large, but i don't see a better way right now
-            // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
-            let cache_key = (
-                best_block_number,
-                request.method.clone(),
-                request.params.clone().map(|x| x.to_string()),
-            );
+            // // TODO: building this cache key is slow and its large, but i don't see a better way right now
+            // // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
+            // let cache_key = (
+            //     best_block_hash,
+            //     request.method.clone(),
+            //     request.params.clone().map(|x| x.to_string()),
+            // );

-            if let Some(cached) = self.response_cache.read().get(&cache_key) {
-                // TODO: this still serializes every time
-                // TODO: return a reference in the other places so that this works without a clone?
-                return Ok(cached.to_owned());
-            }
+            // if let Some(cached) = self.response_cache.read().get(&cache_key) {
+            //     // TODO: this still serializes every time
+            //     // TODO: return a reference in the other places so that this works without a clone?
+            //     return Ok(cached.to_owned());
+            // }

             match self.balanced_rpcs.next_upstream_server().await {
                 Ok(active_request_handle) => {
@@ -249,27 +239,27 @@ impl Web3ProxyApp {
                         // TODO: trace here was really slow with millions of requests.
                         // info!("forwarding request from {}", upstream_server);

-                        let response = JsonRpcForwardedResponse {
+                        JsonRpcForwardedResponse {
                             jsonrpc: "2.0".to_string(),
                             id: request.id,
                             // TODO: since we only use the result here, should that be all we return from try_send_request?
                             result: Some(partial_response),
                             error: None,
-                        };
-
-                        // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
-                        let mut response_cache = self.response_cache.write();
+                        }

-                        // TODO: cache the warp::reply to save us serializing every time
-                        response_cache.insert(cache_key, response.clone());
-                        if response_cache.len() >= RESPONSE_CACHE_CAP {
-                            // TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block
-                            response_cache.pop_front();
-                        }
+                        // // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
+                        // let mut response_cache = self.response_cache.write();

-                        drop(response_cache);
+                        // // TODO: cache the warp::reply to save us serializing every time
+                        // response_cache.insert(cache_key, response.clone());
+                        // if response_cache.len() >= RESPONSE_CACHE_CAP {
+                        //     // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
+                        //     response_cache.pop_front();
+                        // }

-                        response
+                        // drop(response_cache);
+
+                        // response
                     }
                     Err(e) => {
                         // TODO: move this to a helper function?
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index b7346ce9..aa13aae5 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -201,6 +201,8 @@ impl Web3Connection {
             let mut interval = interval(Duration::from_secs(2));
             interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

+            let mut last_hash = Default::default();
+
             loop {
                 // wait for the interval
                 // TODO: if error or rate limit, increase interval?
@@ -215,6 +217,17 @@ impl Web3Connection {

                 drop(active_request_handle);

+                // don't send repeat blocks
+                if let Ok(block) = &block {
+                    let new_hash = block.hash.unwrap();
+
+                    if new_hash == last_hash {
+                        continue;
+                    }
+
+                    last_hash = new_hash;
+                }
+
                 self.send_block(block, &block_sender);
             }
         }
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 6f2d15a7..ca4b65fb 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -6,12 +6,12 @@ use futures::StreamExt;
 use governor::clock::{QuantaClock, QuantaInstant};
 use governor::NotUntil;
 use hashbrown::HashMap;
+use left_right::{Absorb, ReadHandle, WriteHandle};
+use parking_lot::RwLock;
 use serde_json::value::RawValue;
 use std::cmp;
 use std::fmt;
-use std::sync::atomic::{self, AtomicU64};
 use std::sync::Arc;
-use tokio::sync::RwLock;
 use tracing::{info, trace, warn};

 use crate::config::Web3ConnectionConfig;
@@ -41,6 +41,76 @@ impl SyncedConnections {
             inner,
         }
     }
+
+    fn update(&mut self, new_block_num: u64, new_block_hash: H256, rpc: Arc<Web3Connection>) {
+        // TODO: double check this logic
+        match new_block_num.cmp(&self.head_block_number) {
+            cmp::Ordering::Greater => {
+                // the rpc's newest block is the new overall best block
+                info!("new head block {} from {}", new_block_num, rpc);
+
+                self.inner.clear();
+                self.inner.push(rpc);
+
+                self.head_block_number = new_block_num;
+                self.head_block_hash = new_block_hash;
+            }
+            cmp::Ordering::Equal => {
+                if new_block_hash != self.head_block_hash {
+                    // same height, but different chain
+                    // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
+                    warn!(
+                        "chain is forked at #{}! {} has {:?}. First was {:?}",
+                        new_block_num, rpc, new_block_hash, self.head_block_hash
+                    );
+                    return;
+                }
+
+                // do not clear synced_connections.
+                // we just want to add this rpc to the end
+                self.inner.push(rpc);
+            }
+            cmp::Ordering::Less => {
+                // this isn't the best block in the tier. don't do anything
+                return;
+            }
+        }
+
+        // TODO: better log
+        trace!("Now synced: {:?}", self.inner);
+    }
+}
+
+struct SyncedConnectionsUpdate {
+    new_block_number: u64,
+    new_block_hash: H256,
+    rpc: Arc<Web3Connection>,
+}
+
+impl Absorb<SyncedConnectionsUpdate> for SyncedConnections {
+    fn absorb_first(&mut self, operation: &mut SyncedConnectionsUpdate, _: &Self) {
+        self.update(
+            operation.new_block_number,
+            operation.new_block_hash,
+            operation.rpc.clone(),
+        );
+    }
+
+    fn absorb_second(&mut self, operation: SyncedConnectionsUpdate, _: &Self) {
+        self.update(
+            operation.new_block_number,
+            operation.new_block_hash,
+            operation.rpc,
+        );
+    }
+
+    // See the documentation of `Absorb::drop_first`.
+    fn drop_first(self: Box<Self>) {}
+
+    fn sync_with(&mut self, first: &Self) {
+        // TODO: not sure about this
+        *self = first.clone()
+    }
 }

 /// A collection of web3 connections. Sends requests either the current best server or all servers.
@@ -48,9 +118,8 @@ impl SyncedConnections {
 pub struct Web3Connections {
     inner: Vec<Arc<Web3Connection>>,
     /// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
-    /// TODO: we probably need a better lock on this
+    /// TODO: we probably need a better lock on this. left_right with the writer in a mutex
     synced_connections: RwLock<SyncedConnections>,
-    best_head_block_number: Arc<AtomicU64>,
 }

 impl fmt::Debug for Web3Connections {
@@ -65,7 +134,6 @@ impl fmt::Debug for Web3Connections {
 impl Web3Connections {
     pub async fn try_new(
         chain_id: usize,
-        best_head_block_number: Arc<AtomicU64>,
         servers: Vec<Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
         clock: &QuantaClock,
@@ -88,12 +156,18 @@ impl Web3Connections {

         // TODO: exit if no connections?
         let connections = Arc::new(Self {
-            best_head_block_number: best_head_block_number.clone(),
             inner: connections,
             synced_connections: RwLock::new(SyncedConnections::new(num_connections)),
         });

         if subscribe_heads {
+            if connections.inner.len() < 2 {
+                // TODO: less than 3? what should we do here?
+                return Err(anyhow::anyhow!(
+                    "need at least 2 connections when subscribing to heads!"
+                ));
+            }
+
             let (block_sender, block_receiver) = flume::unbounded();

             {
@@ -120,9 +194,9 @@ impl Web3Connections {
         Ok(connections)
     }

-    pub fn head_block_number(&self) -> u64 {
-        self.best_head_block_number.load(atomic::Ordering::Acquire)
-    }
+    // pub fn synced_connections(&self) -> &RwLock<SyncedConnections> {
+    //     &self.synced_connections
+    // }

     /// Send the same request to all the handles. Returning the fastest successful result.
     pub async fn try_send_parallel_requests(
@@ -197,42 +271,9 @@ impl Web3Connections {
             }

             // TODO: experiment with different locks and such here
-            let mut synced_connections = self.synced_connections.write().await;
+            let mut synced_connections = self.synced_connections.write();

-            // TODO: double check this logic
-            match new_block_num.cmp(&synced_connections.head_block_number) {
-                cmp::Ordering::Greater => {
-                    // the rpc's newest block is the new overall best block
-                    info!("new head block {} from {}", new_block_num, rpc);
-
-                    synced_connections.inner.clear();
-                    synced_connections.inner.push(rpc);
-
-                    synced_connections.head_block_number = new_block_num;
-                    synced_connections.head_block_hash = new_block_hash;
-                }
-                cmp::Ordering::Equal => {
-                    if new_block_hash != synced_connections.head_block_hash {
-                        // same height, but different chain
-                        warn!(
-                            "chain is forked! {} has {}. First #{} was {}",
-                            rpc, new_block_hash, new_block_num, synced_connections.head_block_hash
-                        );
-                        continue;
-                    }
-
-                    // do not clear synced_connections.
-                    // we just want to add this rpc to the end
-                    synced_connections.inner.push(rpc);
-                }
-                cmp::Ordering::Less => {
-                    // this isn't the best block in the tier. don't do anything
-                    continue;
-                }
-            }
-
-            // TODO: better log
-            trace!("Now synced: {:?}", synced_connections.inner);
+            synced_connections.update(new_block_num, new_block_hash, rpc);
         }

         Ok(())
@@ -244,15 +285,15 @@ impl Web3Connections {
     ) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
         let mut earliest_not_until = None;

-        // TODO: this clone is probably not the best way to do this
-        let mut synced_rpc_indexes = self.synced_connections.read().await.inner.clone();
+        // TODO: this clone is definitely not the best way to do this
+        let mut synced_rpc_arcs = self.synced_connections.read().inner.clone();

         // // TODO: how should we include the soft limit? floats are slower than integer math
         // let a = a as f32 / self.soft_limit as f32;
         // let b = b as f32 / other.soft_limit as f32;

         // TODO: better key!
-        let sort_cache: HashMap = synced_rpc_indexes
+        let sort_cache: HashMap = synced_rpc_arcs
             .iter()
             .map(|connection| {
                 // TODO: better key!
@@ -267,7 +308,7 @@ impl Web3Connections {
             .collect();

         // TODO: i think we might need to load active connections and then
-        synced_rpc_indexes.sort_unstable_by(|a, b| {
+        synced_rpc_arcs.sort_unstable_by(|a, b| {
             // TODO: better keys
             let a_key = format!("{}", a);
             let b_key = format!("{}", b);
@@ -285,7 +326,7 @@ impl Web3Connections {
             }
         });

-        for selected_rpc in synced_rpc_indexes.into_iter() {
+        for selected_rpc in synced_rpc_arcs.into_iter() {
            // increment our connection counter
             match selected_rpc.try_request_handle() {
                 Err(not_until) => {
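
Note on the left-right dependency this patch introduces: the connections.rs hunk adds an Absorb<SyncedConnectionsUpdate> impl, but SyncedConnections is still guarded by parking_lot::RwLock; the struct's TODO ("left_right with the writer in a mutex") describes the intended next step. The snippet below is a minimal, self-contained sketch of that pattern against the left-right 0.11 API only, not code from this repository: Head and NewHead are stand-ins for SyncedConnections and SyncedConnectionsUpdate, and the block number used in main() is arbitrary.

    use left_right::{Absorb, ReadHandle, WriteHandle};
    use parking_lot::Mutex;

    // stand-in for SyncedConnections; Clone mirrors what sync_with() in the patch relies on
    #[derive(Clone, Default)]
    struct Head {
        number: u64,
    }

    // stand-in for SyncedConnectionsUpdate
    struct NewHead {
        number: u64,
    }

    impl Absorb<NewHead> for Head {
        // applied to each of the two copies in turn (absorb_second defaults to calling this)
        fn absorb_first(&mut self, op: &mut NewHead, _other: &Self) {
            if op.number > self.number {
                self.number = op.number;
            }
        }

        // bring a lagging copy up to date by cloning the current one
        fn sync_with(&mut self, first: &Self) {
            *self = first.clone();
        }
    }

    fn main() {
        // one WriteHandle, one ReadHandle; T must be Default for left_right::new()
        let (write, read): (WriteHandle<Head, NewHead>, ReadHandle<Head>) = left_right::new();

        // "the writer in a mutex": block-subscription tasks would share the single writer this way
        let write = Mutex::new(write);

        // queue an operation, then publish() to make it visible to readers
        write.lock().append(NewHead { number: 14_500_000 }).publish();

        // the read path never takes the writer's mutex
        if let Some(head) = read.enter() {
            println!("head block: {}", head.number);
        }
    }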