diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 403ff32f..7f07b358 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -5,11 +5,11 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; -use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; +use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256}; use futures::future::join_all; use governor::clock::{Clock, QuantaClock}; -// use linkedhashmap::LinkedHashMap; -// use parking_lot::RwLock; +use linkedhashmap::LinkedHashMap; +use parking_lot::RwLock; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -23,12 +23,12 @@ static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); -// // TODO: put this in config? what size should we do? -// const RESPONSE_CACHE_CAP: usize = 1024; +// TODO: put this in config? what size should we do? +const RESPONSE_CACHE_CAP: usize = 1024; -// /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work -// type ResponseLruCache = -// RwLock), JsonRpcForwardedResponse>>; +/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work +type ResponseLruCache = + RwLock), JsonRpcForwardedResponse>>; /// The application // TODO: this debug impl is way too verbose. make something smaller @@ -41,7 +41,7 @@ pub struct Web3ProxyApp { balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, - // response_cache: ResponseLruCache, + response_cache: ResponseLruCache, } impl fmt::Debug for Web3ProxyApp { @@ -91,7 +91,7 @@ impl Web3ProxyApp { clock, balanced_rpcs, private_rpcs, - // response_cache: Default::default(), + response_cache: Default::default(), }) } @@ -211,22 +211,26 @@ impl Web3ProxyApp { // if no tiers are synced, fallback to privates // TODO: think more about this loop. loop { - // TODO: bring back this caching - // let best_block_hash = self.balanced_rpcs.head_block_hash(); + let best_block_hash = self + .balanced_rpcs + .get_synced_rpcs() + .enter() + .map(|x| *x.get_head_block_hash()) + .unwrap(); - // // TODO: building this cache key is slow and its large, but i don't see a better way right now - // // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block - // let cache_key = ( - // best_block_hash, - // request.method.clone(), - // request.params.clone().map(|x| x.to_string()), - // ); + // TODO: building this cache key is slow and its large, but i don't see a better way right now + // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block + let cache_key = ( + best_block_hash, + request.method.clone(), + request.params.clone().map(|x| x.to_string()), + ); - // if let Some(cached) = self.response_cache.read().get(&cache_key) { - // // TODO: this still serializes every time - // // TODO: return a reference in the other places so that this works without a clone? - // return Ok(cached.to_owned()); - // } + if let Some(cached) = self.response_cache.read().get(&cache_key) { + // TODO: this still serializes every time + // TODO: return a reference in the other places so that this works without a clone? + return Ok(cached.to_owned()); + } match self.balanced_rpcs.next_upstream_server().await { Ok(active_request_handle) => { @@ -239,27 +243,27 @@ impl Web3ProxyApp { // TODO: trace here was really slow with millions of requests. // info!("forwarding request from {}", upstream_server); - JsonRpcForwardedResponse { + let response = JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), id: request.id, // TODO: since we only use the result here, should that be all we return from try_send_request? result: Some(partial_response), error: None, + }; + + // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache + let mut response_cache = self.response_cache.write(); + + // TODO: cache the warp::reply to save us serializing every time + response_cache.insert(cache_key, response.clone()); + if response_cache.len() >= RESPONSE_CACHE_CAP { + // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block + response_cache.pop_front(); } - // // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache - // let mut response_cache = self.response_cache.write(); + drop(response_cache); - // // TODO: cache the warp::reply to save us serializing every time - // response_cache.insert(cache_key, response.clone()); - // if response_cache.len() >= RESPONSE_CACHE_CAP { - // // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block - // response_cache.pop_front(); - // } - - // drop(response_cache); - - // response + response } Err(e) => { // TODO: move this to a helper function? diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index ca4b65fb..baa75d8d 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -6,8 +6,7 @@ use futures::StreamExt; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; use hashbrown::HashMap; -use left_right::{Absorb, ReadHandle, WriteHandle}; -use parking_lot::RwLock; +use left_right::{Absorb, ReadHandleFactory, WriteHandle}; use serde_json::value::RawValue; use std::cmp; use std::fmt; @@ -18,7 +17,7 @@ use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; #[derive(Clone, Default)] -struct SyncedConnections { +pub struct SyncedConnections { head_block_number: u64, head_block_hash: H256, inner: Vec>, @@ -32,22 +31,26 @@ impl fmt::Debug for SyncedConnections { } impl SyncedConnections { - fn new(max_connections: usize) -> Self { - let inner = Vec::with_capacity(max_connections); - - Self { - head_block_number: 0, - head_block_hash: Default::default(), - inner, - } + pub fn get_head_block_hash(&self) -> &H256 { + &self.head_block_hash } - fn update(&mut self, new_block_num: u64, new_block_hash: H256, rpc: Arc) { + fn update( + &mut self, + log: bool, + new_block_num: u64, + new_block_hash: H256, + rpc: Arc, + ) -> bool { + let mut update_needed: bool = false; + // TODO: double check this logic match new_block_num.cmp(&self.head_block_number) { cmp::Ordering::Greater => { // the rpc's newest block is the new overall best block - info!("new head block {} from {}", new_block_num, rpc); + if log { + info!("new head block {} from {}", new_block_num, rpc); + } self.inner.clear(); self.inner.push(rpc); @@ -59,52 +62,65 @@ impl SyncedConnections { if new_block_hash != self.head_block_hash { // same height, but different chain // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions? - warn!( - "chain is forked at #{}! {} has {:?}. First was {:?}", - new_block_num, rpc, new_block_hash, self.head_block_hash - ); - return; + if log { + warn!( + "chain is forked at #{}! {} has {:?}. First was {:?}", + new_block_num, rpc, new_block_hash, self.head_block_hash + ); + } + return update_needed; } // do not clear synced_connections. // we just want to add this rpc to the end self.inner.push(rpc); + + update_needed = true; } cmp::Ordering::Less => { // this isn't the best block in the tier. don't do anything - return; + return update_needed; } } // TODO: better log - trace!("Now synced: {:?}", self.inner); + if log { + trace!("Now synced: {:?}", self.inner); + } + + update_needed } } -struct SyncedConnectionsUpdate { - new_block_number: u64, - new_block_hash: H256, - rpc: Arc, +enum SyncedConnectionsOp { + SyncedConnectionsUpdate(u64, H256, Arc), + SyncedConnectionsCapacity(usize), } -impl Absorb for SyncedConnections { - fn absorb_first(&mut self, operation: &mut SyncedConnectionsUpdate, _: &Self) { - self.update( - operation.new_block_number, - operation.new_block_hash, - operation.rpc.clone(), - ); +impl Absorb for SyncedConnections { + fn absorb_first(&mut self, operation: &mut SyncedConnectionsOp, _: &Self) { + match operation { + SyncedConnectionsOp::SyncedConnectionsUpdate(new_block_number, new_block_hash, rpc) => { + self.update(true, *new_block_number, *new_block_hash, rpc.clone()); + } + SyncedConnectionsOp::SyncedConnectionsCapacity(capacity) => { + self.inner = Vec::with_capacity(*capacity); + } + } } - fn absorb_second(&mut self, operation: SyncedConnectionsUpdate, _: &Self) { - self.update( - operation.new_block_number, - operation.new_block_hash, - operation.rpc, - ); + fn absorb_second(&mut self, operation: SyncedConnectionsOp, _: &Self) { + match operation { + SyncedConnectionsOp::SyncedConnectionsUpdate(new_block_number, new_block_hash, rpc) => { + // TODO: disable logging on this one? + self.update(false, new_block_number, new_block_hash, rpc); + } + SyncedConnectionsOp::SyncedConnectionsCapacity(capacity) => { + self.inner = Vec::with_capacity(capacity); + } + } } - // See the documentation of `Absorb::drop_first`. fn drop_first(self: Box) {} fn sync_with(&mut self, first: &Self) { @@ -118,8 +134,8 @@ impl Absorb for SyncedConnections { pub struct Web3Connections { inner: Vec>, /// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them - /// TODO: we probably need a better lock on this. left_right with the writer in a mutex - synced_connections: RwLock, + /// TODO: why does the reader need a mutex? is there a better way to do this? + synced_connections_reader: ReadHandleFactory, } impl fmt::Debug for Web3Connections { @@ -139,10 +155,10 @@ impl Web3Connections { clock: &QuantaClock, subscribe_heads: bool, ) -> anyhow::Result> { - let mut connections = vec![]; - let num_connections = servers.len(); + // turn configs into connections + let mut connections = Vec::with_capacity(num_connections); for server_config in servers.into_iter() { match server_config .try_build(clock, chain_id, http_client.clone()) @@ -153,29 +169,20 @@ impl Web3Connections { } } - // TODO: exit if no connections? + if connections.len() < 2 { + // TODO: less than 3? what should we do here? + return Err(anyhow::anyhow!( + "need at least 2 connections when subscribing to heads!" + )); + } - let connections = Arc::new(Self { - inner: connections, - synced_connections: RwLock::new(SyncedConnections::new(num_connections)), - }); + let (block_sender, block_receiver) = flume::unbounded(); + + let (mut synced_connections_writer, synced_connections_reader) = + left_right::new::(); if subscribe_heads { - if connections.inner.len() < 2 { - // TODO: less than 3? what should we do here? - return Err(anyhow::anyhow!( - "need at least 2 connections when subscribing to heads!" - )); - } - - let (block_sender, block_receiver) = flume::unbounded(); - - { - let connections = Arc::clone(&connections); - tokio::spawn(async move { connections.update_synced_rpcs(block_receiver).await }); - } - - for connection in connections.inner.iter() { + for connection in connections.iter() { // subscribe to new heads in a spawned future // TODO: channel instead. then we can have one future with write access to a left-right? let connection = Arc::clone(connection); @@ -191,12 +198,31 @@ impl Web3Connections { } } + synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsCapacity( + num_connections, + )); + synced_connections_writer.publish(); + + let connections = Arc::new(Self { + inner: connections, + synced_connections_reader: synced_connections_reader.factory(), + }); + + if subscribe_heads { + let connections = Arc::clone(&connections); + tokio::spawn(async move { + connections + .update_synced_rpcs(block_receiver, synced_connections_writer) + .await + }); + } + Ok(connections) } - // pub fn synced_connections(&self) -> &RwLock { - // &self.synced_connections - // } + pub fn get_synced_rpcs(&self) -> left_right::ReadHandle { + self.synced_connections_reader.handle() + } /// Send the same request to all the handles. Returning the fastest successful result. pub async fn try_send_parallel_requests( @@ -259,10 +285,10 @@ impl Web3Connections { } /// TODO: possible dead lock here. investigate more. probably refactor - // #[instrument] - pub async fn update_synced_rpcs( + async fn update_synced_rpcs( &self, block_receiver: flume::Receiver<(u64, H256, Arc)>, + mut synced_connections_writer: WriteHandle, ) -> anyhow::Result<()> { while let Ok((new_block_num, new_block_hash, rpc)) = block_receiver.recv_async().await { if new_block_num == 0 { @@ -270,10 +296,14 @@ impl Web3Connections { continue; } - // TODO: experiment with different locks and such here - let mut synced_connections = self.synced_connections.write(); + synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsUpdate( + new_block_num, + new_block_hash, + rpc, + )); - synced_connections.update(new_block_num, new_block_hash, rpc); + // TODO: only publish when the second block arrives? + synced_connections_writer.publish(); } Ok(()) @@ -285,8 +315,12 @@ impl Web3Connections { ) -> Result>> { let mut earliest_not_until = None; - // TODO: this clone is definitely not the best way to do this - let mut synced_rpc_arcs = self.synced_connections.read().inner.clone(); + let mut synced_rpc_arcs = self + .synced_connections_reader + .handle() + .enter() + .map(|x| x.inner.clone()) + .unwrap(); // // TODO: how should we include the soft limit? floats are slower than integer math // let a = a as f32 / self.soft_limit as f32;