diff --git a/Cargo.lock b/Cargo.lock index 395f326e..2cfd51c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,12 +59,6 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" -[[package]] -name = "arc-swap" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" - [[package]] name = "argh" version = "0.1.7" @@ -3851,7 +3845,6 @@ name = "web3-proxy" version = "0.1.0" dependencies = [ "anyhow", - "arc-swap", "argh", "derive_more", "ethers", diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index c8674154..1eef379c 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" [dependencies] anyhow = "1.0.57" -arc-swap = "1.5.0" argh = "0.1.7" # axum = "*" # TODO: use this instead of warp? derive_more = "0.99.17" diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index f72a09ec..b5558bd1 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -133,13 +133,14 @@ impl Web3Connection { pub async fn new_heads( self: Arc, connections: Option>, + best_head_block_number: Arc, ) -> anyhow::Result<()> { info!("Watching new_heads on {}", self); match &self.provider { Web3Provider::Http(provider) => { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably some fraction of block time + // TODO: what should this interval be? probably some fraction of block time. set automatically? // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now let mut interval = interval(Duration::from_secs(2)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -149,7 +150,6 @@ impl Web3Connection { // TODO: if error or rate limit, increase interval? interval.tick().await; - // rate limits let active_request_handle = self.wait_for_request_handle().await; let block_number = provider.get_block_number().await.map(|x| x.as_u64())?; @@ -165,6 +165,14 @@ impl Web3Connection { if old_block_number != block_number { info!("new block on {}: {}", self, block_number); + // we don't care about this result. + let _ = best_head_block_number.compare_exchange( + old_block_number, + block_number, + atomic::Ordering::AcqRel, + atomic::Ordering::Acquire, + ); + if let Some(connections) = &connections { connections.update_synced_rpcs(&self, block_number)?; } @@ -193,8 +201,17 @@ impl Web3Connection { info!("current block on {}: {}", self, block_number); - self.head_block_number - .store(block_number, atomic::Ordering::Release); + let old_block_number = self + .head_block_number + .swap(block_number, atomic::Ordering::Release); + + // we don't care about this result + let _ = best_head_block_number.compare_exchange( + old_block_number, + block_number, + atomic::Ordering::AcqRel, + atomic::Ordering::Acquire, + ); if let Some(connections) = &connections { connections.update_synced_rpcs(&self, block_number)?; @@ -209,6 +226,9 @@ impl Web3Connection { self.head_block_number .store(block_number, atomic::Ordering::Release); + // TODO: what ordering? + best_head_block_number.fetch_max(block_number, atomic::Ordering::AcqRel); + info!("new block on {}: {}", self, block_number); if let Some(connections) = &connections { @@ -284,7 +304,7 @@ impl ActiveRequestHandle { self, method: &str, params: &serde_json::value::RawValue, - ) -> Result { + ) -> Result, ethers::prelude::ProviderError> { match &self.0.provider { Web3Provider::Http(provider) => provider.request(method, params).await, Web3Provider::Ws(provider) => provider.request(method, params).await, @@ -347,11 +367,15 @@ impl fmt::Debug for JsonRpcRequest { } } -// TODO: check for errors too! -#[derive(Clone, Deserialize, Serialize)] +#[derive(Clone, Serialize)] pub struct JsonRpcForwardedResponse { + pub jsonrpc: String, pub id: Box, - pub result: Box, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + // TODO: optional error } impl fmt::Debug for JsonRpcForwardedResponse { diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 1fd3ef2b..f3c5cbdc 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -1,19 +1,20 @@ ///! Communicate with a group of web3 providers -use arc_swap::ArcSwap; use derive_more::From; use futures::stream::FuturesUnordered; use futures::StreamExt; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; use hashbrown::HashMap; +use parking_lot::RwLock; use serde_json::value::RawValue; use std::cmp; use std::fmt; +use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; use tracing::warn; use crate::config::Web3ConnectionConfig; -use crate::connection::{ActiveRequestHandle, JsonRpcForwardedResponse, Web3Connection}; +use crate::connection::{ActiveRequestHandle, Web3Connection}; #[derive(Clone, Default)] struct SyncedConnections { @@ -44,8 +45,9 @@ impl SyncedConnections { pub struct Web3Connections { inner: Vec>, /// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them - /// TODO: arcswap was a lot faster, but i think we need a lock for proper logic - synced_connections: ArcSwap, + /// TODO: we probably need a better lock on this + synced_connections: RwLock, + best_head_block_number: Arc, } impl fmt::Debug for Web3Connections { @@ -59,7 +61,7 @@ impl fmt::Debug for Web3Connections { impl Web3Connections { pub async fn try_new( - // TODO: servers should be a Web3ConnectionBuilder struct + best_head_block_number: Arc, servers: Vec, http_client: Option, clock: &QuantaClock, @@ -78,20 +80,25 @@ impl Web3Connections { // TODO: exit if no connections? let connections = Arc::new(Self { + best_head_block_number: best_head_block_number.clone(), inner: connections, - synced_connections: ArcSwap::new(Arc::new(SyncedConnections::new(num_connections))), + synced_connections: RwLock::new(SyncedConnections::new(num_connections)), }); for connection in connections.inner.iter() { // subscribe to new heads in a spawned future - // TODO: channel instead. then we can have one future with write access to a left-right + // TODO: channel instead. then we can have one future with write access to a left-right? let connection = Arc::clone(connection); let connections = connections.clone(); + let best_head_block_number = best_head_block_number.clone(); tokio::spawn(async move { let url = connection.url().to_string(); // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - if let Err(e) = connection.new_heads(Some(connections)).await { + if let Err(e) = connection + .new_heads(Some(connections), best_head_block_number) + .await + { warn!("new_heads error on {}: {:?}", url, e); } }); @@ -101,15 +108,15 @@ impl Web3Connections { } pub fn head_block_number(&self) -> u64 { - self.synced_connections.load().head_block_number + self.best_head_block_number.load(atomic::Ordering::Acquire) } - pub async fn try_send_request<'a>( + pub async fn try_send_request( &self, connection_handle: ActiveRequestHandle, method: &str, params: &RawValue, - ) -> anyhow::Result { + ) -> anyhow::Result> { // connection.in_active_requests was called when this rpc was selected let response = connection_handle.request(method, params).await; @@ -124,7 +131,7 @@ impl Web3Connections { connections: Vec, method: String, params: Box, - response_sender: flume::Sender>, + response_sender: flume::Sender>>, ) -> anyhow::Result<()> { let mut unordered_futures = FuturesUnordered::new(); @@ -185,35 +192,38 @@ impl Web3Connections { rpc: &Arc, new_block: u64, ) -> anyhow::Result<()> { - // TODO: try a left_right instead of an ArcSwap. - let synced_connections = self.synced_connections.load(); + let mut synced_connections = self.synced_connections.write(); - // should we load new_block here? + let current_block_number = synced_connections.head_block_number; - let mut new_synced_connections: SyncedConnections = - match synced_connections.head_block_number.cmp(&new_block) { - cmp::Ordering::Equal => { - // this rpc is synced, but it isn't the first to this block - (**synced_connections).to_owned() - } - cmp::Ordering::Less => { - // this is a new head block. clear the current synced connections - // TODO: this is too verbose with a bunch of tiers. include the tier - // info!("new head block from {:?}: {}", rpc, new_block); + let best_head_block = self.head_block_number(); - let mut new_synced_connections = SyncedConnections::new(self.inner.len()); + match current_block_number.cmp(&best_head_block) { + cmp::Ordering::Equal => { + // this rpc tier is synced, and it isn't the first to this block + } + cmp::Ordering::Less => {} + cmp::Ordering::Greater => {} + } - // synced_connections.inner.clear(); + match current_block_number.cmp(&new_block) { + cmp::Ordering::Equal => { + // this rpc is synced, and it isn't the first to this block + } + cmp::Ordering::Less => { + // this is a new head block. clear the current synced connections + // TODO: this is too verbose with a bunch of tiers. include the tier + // info!("new head block from {:?}: {}", rpc, new_block); - new_synced_connections.head_block_number = new_block; + synced_connections.inner.clear(); - new_synced_connections - } - cmp::Ordering::Greater => { - // not the latest block. return now - return Ok(()); - } - }; + synced_connections.head_block_number = new_block; + } + cmp::Ordering::Greater => { + // not the latest block. return now + return Ok(()); + } + } let rpc_index = self .inner @@ -221,10 +231,7 @@ impl Web3Connections { .position(|x| x.url() == rpc.url()) .unwrap(); - new_synced_connections.inner.push(rpc_index); - - self.synced_connections - .swap(Arc::new(new_synced_connections)); + synced_connections.inner.push(rpc_index); Ok(()) } @@ -236,7 +243,7 @@ impl Web3Connections { let mut earliest_not_until = None; // TODO: this clone is probably not the best way to do this - let mut synced_rpc_indexes = self.synced_connections.load().inner.clone(); + let mut synced_rpc_indexes = self.synced_connections.read().inner.clone(); let cache: HashMap = synced_rpc_indexes .iter() diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index eff525d1..6f9ba347 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -3,12 +3,13 @@ mod connection; mod connections; use config::Web3ConnectionConfig; +use connection::JsonRpcForwardedResponse; use futures::future; use governor::clock::{Clock, QuantaClock}; use linkedhashmap::LinkedHashMap; -use serde_json::json; use std::fmt; use std::fs; +use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; @@ -28,11 +29,17 @@ static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); -const RESPONSE_CACHE_CAP: usize = 128; +// TODO: put this in config? what size should we do? +const RESPONSE_CACHE_CAP: usize = 1024; + +/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work +type ResponseLruCache = RwLock>; /// The application // TODO: this debug impl is way too verbose. make something smaller +// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs pub struct Web3ProxyApp { + best_head_block_number: Arc, /// clock used for rate limiting /// TODO: use tokio's clock (will require a different ratelimiting crate) clock: QuantaClock, @@ -40,14 +47,18 @@ pub struct Web3ProxyApp { balanced_rpc_tiers: Vec>, /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Option>, - /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work - response_cache: RwLock>, + response_cache: ResponseLruCache, } impl fmt::Debug for Web3ProxyApp { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3ProxyApp").finish_non_exhaustive() + f.debug_struct("Web3ProxyApp") + .field( + "best_head_block_number", + &self.best_head_block_number.load(atomic::Ordering::Relaxed), + ) + .finish_non_exhaustive() } } @@ -58,6 +69,8 @@ impl Web3ProxyApp { ) -> anyhow::Result { let clock = QuantaClock::default(); + let best_head_block_number = Arc::new(AtomicU64::new(0)); + // make a http shared client // TODO: how should we configure the connection pool? // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server @@ -70,7 +83,12 @@ impl Web3ProxyApp { // TODO: attach context to this error let balanced_rpc_tiers = future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| { - Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock) + Web3Connections::try_new( + best_head_block_number.clone(), + balanced_rpc_tier, + Some(http_client.clone()), + &clock, + ) })) .await .into_iter() @@ -82,10 +100,19 @@ impl Web3ProxyApp { // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly None } else { - Some(Web3Connections::try_new(private_rpcs, Some(http_client), &clock).await?) + Some( + Web3Connections::try_new( + best_head_block_number.clone(), + private_rpcs, + Some(http_client), + &clock, + ) + .await?, + ) }; Ok(Web3ProxyApp { + best_head_block_number, clock, balanced_rpc_tiers, private_rpcs, @@ -113,6 +140,7 @@ impl Web3ProxyApp { let method = json_body.method.clone(); let params = json_body.params.clone(); + // TODO: benchmark this compared to waiting on unbounded futures tokio::spawn(async move { connections .try_send_requests(upstream_servers, method, params, tx) @@ -120,14 +148,16 @@ impl Web3ProxyApp { }); // wait for the first response - let response = rx.recv_async().await?; + let backend_response = rx.recv_async().await?; - if let Ok(partial_response) = response { - let response = json!({ - "jsonrpc": "2.0", - "id": json_body.id, - "result": partial_response - }); + if let Ok(backend_response) = backend_response { + // TODO: i think we + let response = JsonRpcForwardedResponse { + jsonrpc: "2.0".to_string(), + id: json_body.id, + result: Some(backend_response), + error: None, + }; return Ok(warp::reply::json(&response)); } } @@ -181,17 +211,19 @@ impl Web3ProxyApp { // TODO: trace here was really slow with millions of requests. // info!("forwarding request from {}", upstream_server); - let response = json!({ + let response = JsonRpcForwardedResponse { // TODO: re-use their jsonrpc? - "jsonrpc": "2.0", - "id": json_body.id, + jsonrpc: "2.0".to_string(), + id: json_body.id, // TODO: since we only use the result here, should that be all we return from try_send_request? - "result": partial_response.result, - }); + result: Some(partial_response), + error: None, + }; // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache let mut response_cache = self.response_cache.write().await; + // TODO: cache the warp::reply to save us serializing every time response_cache.insert(cache_key, response.clone()); if response_cache.len() >= RESPONSE_CACHE_CAP { response_cache.pop_front(); @@ -201,11 +233,12 @@ impl Web3ProxyApp { } Err(e) => { // TODO: what is the proper format for an error? - json!({ - "jsonrpc": "2.0", - "id": json_body.id, - "error": format!("{}", e) - }) + JsonRpcForwardedResponse { + jsonrpc: "2.0".to_string(), + id: json_body.id, + result: None, + error: Some(format!("{}", e)), + } } };