diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index b8eb850f..0e0b13fd 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -923,12 +923,30 @@ impl Web3Rpcs { let rate_limit_substrings = ["limit", "exceeded", "quota usage"]; for rate_limit_substr in rate_limit_substrings { if error_msg.contains(rate_limit_substr) { - warn!( - "rate limited ({}) by {}", - error_msg, - skip_rpcs.last().unwrap() - ); - continue; + if rate_limit_substr.contains("result on length") { + // this error contains "limit" but is not a rate limit error + // TODO: make the expected limit configurable + // TODO: parse the rate_limit_substr and only continue if it is < expected limit + if rate_limit_substr.contains("exceeding limit 2000000") { + // they hit our expected limit. return the error now + return Err(error.into()); + } else { + // they hit a limit lower than what we expect + warn!( + "unexpected result limit ({}) by {}", + error_msg, + skip_rpcs.last().unwrap() + ); + continue; + } + } else { + warn!( + "rate limited ({}) by {}", + error_msg, + skip_rpcs.last().unwrap() + ); + continue; + } } } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index dc07a732..275ee3b0 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -9,6 +9,7 @@ use crate::frontend::authorization::Authorization; use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; +use arc_swap::ArcSwapOption; use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; @@ -28,6 +29,7 @@ use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU64, AtomicU8, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use thread_fast_rng::rand::Rng; +use tokio::select; use tokio::sync::watch; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use url::Url; @@ -40,10 +42,12 @@ pub struct Web3Rpc { pub db_conn: Option, /// most all requests prefer use the http_provider pub(super) http_provider: Option, + /// the websocket url is only used for subscriptions + pub(super) ws_url: Option, /// the websocket provider is only used for subscriptions - pub(super) ws_provider: Option, + pub(super) ws_provider: ArcSwapOption, /// keep track of hard limits - /// this is only inside an Option so that the "Default" derive works. it will always be set. + /// hard_limit_until is only inside an Option so that the "Default" derive works. it will always be set. pub(super) hard_limit_until: Option>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits /// We do not use the deferred rate limiter because going over limits would cause errors @@ -56,13 +60,12 @@ pub struct Web3Rpc { pub backup: bool, /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, - /// TODO: change this to a watch channel so that http providers can subscribe and take action on change. - /// this is only inside an Option so that the "Default" derive works. it will always be set. + /// head_block is only inside an Option so that the "Default" derive works. it will always be set. pub(super) head_block: Option>>, /// Track head block latency pub(super) head_latency: RwLock, /// Track peak request latency - /// This is only inside an Option so that the "Default" derive works. it will always be set. + /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set. pub(super) peak_latency: Option, /// Automatically set priority pub(super) tier: AtomicU8, @@ -70,8 +73,9 @@ pub struct Web3Rpc { /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, pub(super) active_requests: AtomicUsize, - /// this is only inside an Option so that the "Default" derive works. it will always be set. + /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set. pub(super) disconnect_watch: Option>, + /// created_at is only inside an Option so that the "Default" derive works. it will always be set. pub(super) created_at: Option, } @@ -172,12 +176,10 @@ impl Web3Rpc { None }; - let ws_provider = if let Some(ws_url) = config.ws_url { + let ws_url = if let Some(ws_url) = config.ws_url { let ws_url = ws_url.parse::()?; - Some(connect_ws(ws_url, usize::MAX).await?) - - // TODO: check the provider is on the right chain + Some(ws_url) } else { None }; @@ -189,7 +191,7 @@ impl Web3Rpc { backup, block_data_limit, created_at: Some(created_at), - db_conn: db_conn.clone(), + db_conn, display_name: config.display_name, hard_limit, hard_limit_until: Some(hard_limit_until), @@ -198,7 +200,7 @@ impl Web3Rpc { name, peak_latency: Some(peak_latency), soft_limit: config.soft_limit, - ws_provider, + ws_url, disconnect_watch: Some(disconnect_watch), ..Default::default() }; @@ -211,8 +213,9 @@ impl Web3Rpc { let handle = { let new_connection = new_connection.clone(); tokio::spawn(async move { + // TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff new_connection - .subscribe(block_map, block_sender, chain_id, tx_id_sender) + .subscribe_with_reconnect(block_map, block_sender, chain_id, tx_id_sender) .await }) }; @@ -410,6 +413,7 @@ impl Web3Rpc { } /// query the web3 provider to confirm it is on the expected chain with the expected data available + /// TODO: this currently checks only the http if both http and ws are set. it should check both and make sure they match async fn check_provider(self: &Arc, chain_id: u64) -> Web3ProxyResult<()> { // check the server's chain_id here // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error @@ -563,10 +567,50 @@ impl Web3Rpc { Ok(()) } + #[allow(clippy::too_many_arguments)] + async fn subscribe_with_reconnect( + self: Arc, + block_map: BlocksByHashCache, + block_sender: Option>, + chain_id: u64, + tx_id_sender: Option)>>, + ) -> Web3ProxyResult<()> { + loop { + if let Err(err) = self + .clone() + .subscribe( + block_map.clone(), + block_sender.clone(), + chain_id, + tx_id_sender.clone(), + ) + .await + { + if self.should_disconnect() { + break; + } + + warn!("{} subscribe err: {:#?}", self, err) + } else if self.should_disconnect() { + break; + } + + if self.backup { + debug!("reconnecting to {} in 30 seconds", self); + } else { + info!("reconnecting to {} in 30 seconds", self); + } + + // TODO: exponential backoff with jitter + sleep(Duration::from_secs(30)).await; + } + + Ok(()) + } + /// subscribe to blocks and transactions /// This should only exit when the program is exiting. /// TODO: should more of these args be on self? chain_id for sure - #[allow(clippy::too_many_arguments)] async fn subscribe( self: Arc, block_map: BlocksByHashCache, @@ -580,12 +624,45 @@ impl Web3Rpc { Some(RequestErrorHandler::ErrorLevel) }; + if let Some(url) = self.ws_url.clone() { + debug!("starting websocket provider on {}", self); + + let x = connect_ws(url, usize::MAX).await?; + + let x = Arc::new(x); + + self.ws_provider.store(Some(x)); + } + debug!("starting subscriptions on {}", self); self.check_provider(chain_id).await?; let mut futures = vec![]; + // TODO: use this channel instead of self.disconnect_watch + let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false); + + // subscribe to the disconnect watch. the app uses this when shutting down + if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() { + let mut disconnect_watch_rx = disconnect_watch_tx.subscribe(); + + let f = async move { + // TODO: make sure it changed to "true" + select! { + x = disconnect_watch_rx.changed() => { + x?; + }, + x = subscribe_stop_rx.changed() => { + x?; + }, + } + Ok(()) + }; + + futures.push(flatten_handle(tokio::spawn(f))); + } + // health check that runs if there haven't been any recent requests { // TODO: move this into a proper function @@ -595,14 +672,16 @@ impl Web3Rpc { // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though let health_sleep_seconds = 5; + let subscribe_stop_rx = subscribe_stop_tx.subscribe(); + // health check loop let f = async move { // TODO: benchmark this and lock contention let mut old_total_requests = 0; let mut new_total_requests; - // TODO: errors here should not cause the loop to exit! - while !rpc.should_disconnect() { + // errors here should not cause the loop to exit! + while !(*subscribe_stop_rx.borrow()) { new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); if new_total_requests - old_total_requests < 5 { @@ -629,11 +708,24 @@ impl Web3Rpc { } // subscribe to new heads - if let Some(block_sender) = &block_sender { - // TODO: do we need this to be abortable? - let f = self - .clone() - .subscribe_new_heads(block_sender.clone(), block_map.clone()); + if let Some(block_sender) = block_sender.clone() { + let clone = self.clone(); + let subscribe_stop_rx = subscribe_stop_tx.subscribe(); + + let f = async move { + let x = clone + .subscribe_new_heads(block_sender.clone(), block_map.clone(), subscribe_stop_rx) + .await; + + // error or success, we clear the block when subscribe_new_heads exits + clone + .send_head_block_result(Ok(None), &block_sender, &block_map) + .await?; + + x + }; + + // TODO: if futures.push(flatten_handle(tokio::spawn(f))); } @@ -641,8 +733,11 @@ impl Web3Rpc { // subscribe pending transactions // TODO: make this opt-in. its a lot of bandwidth if let Some(tx_id_sender) = tx_id_sender { - // TODO: do we need this to be abortable? - let f = self.clone().subscribe_pending_transactions(tx_id_sender); + let subscribe_stop_rx = subscribe_stop_tx.subscribe(); + + let f = self + .clone() + .subscribe_pending_transactions(tx_id_sender, subscribe_stop_rx); futures.push(flatten_handle(tokio::spawn(f))); } @@ -654,19 +749,19 @@ impl Web3Rpc { debug!("subscriptions on {} exited", self); - self.disconnect_watch - .as_ref() - .expect("disconnect_watch should always be set") - .send_replace(true); + subscribe_stop_tx.send_replace(true); + + // TODO: wait for all of the futures to exit? Ok(()) } /// Subscribe to new blocks. async fn subscribe_new_heads( - self: Arc, + self: &Arc, block_sender: flume::Sender, block_map: BlocksByHashCache, + subscribe_stop_rx: watch::Receiver, ) -> Web3ProxyResult<()> { debug!("subscribing to new heads on {}", self); @@ -674,7 +769,7 @@ impl Web3Rpc { let error_handler = None; let authorization = Default::default(); - if let Some(ws_provider) = self.ws_provider.as_ref() { + if let Some(ws_provider) = self.ws_provider.load().as_ref() { // todo: move subscribe_blocks onto the request handle let active_request_handle = self .wait_for_request_handle(&authorization, None, error_handler) @@ -686,7 +781,7 @@ impl Web3Rpc { // there is a very small race condition here where the stream could send us a new block right now // but all seeing the same block twice won't break anything // TODO: how does this get wrapped in an arc? does ethers handle that? - // TODO: can we force this to use the websocket? + // TODO: send this request to the ws_provider instead of the http_provider let latest_block: Result, _> = self .authorized_request( "eth_getBlockByNumber", @@ -700,7 +795,7 @@ impl Web3Rpc { .await?; while let Some(block) = blocks.next().await { - if self.should_disconnect() { + if *subscribe_stop_rx.borrow() { break; } @@ -714,7 +809,7 @@ impl Web3Rpc { let mut blocks = http_provider.watch_blocks().await?; while let Some(block_hash) = blocks.next().await { - if self.should_disconnect() { + if *subscribe_stop_rx.borrow() { break; } @@ -737,7 +832,7 @@ impl Web3Rpc { self.send_head_block_result(Ok(None), &block_sender, &block_map) .await?; - if self.should_disconnect() { + if *subscribe_stop_rx.borrow() { Ok(()) } else { Err(anyhow!("new_heads subscription exited. reconnect needed").into()) @@ -748,9 +843,9 @@ impl Web3Rpc { async fn subscribe_pending_transactions( self: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, + mut subscribe_stop_rx: watch::Receiver, ) -> Web3ProxyResult<()> { - // TODO: make this subscription optional - self.wait_for_disconnect().await?; + subscribe_stop_rx.changed().await?; /* trace!("watching pending transactions on {}", self); @@ -795,7 +890,7 @@ impl Web3Rpc { } */ - if self.should_disconnect() { + if *subscribe_stop_rx.borrow() { Ok(()) } else { Err(anyhow!("pending_transactions subscription exited. reconnect needed").into()) @@ -909,20 +1004,6 @@ impl Web3Rpc { Ok(handle.into()) } - async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> { - let mut disconnect_subscription = self.disconnect_watch.as_ref().unwrap().subscribe(); - - loop { - if *disconnect_subscription.borrow_and_update() { - // disconnect watch is set to "true" - return Ok(()); - } - - // wait for disconnect_subscription to change - disconnect_subscription.changed().await?; - } - } - pub async fn internal_request( self: &Arc, method: &str, diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 40daecf1..3c483ad8 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -193,7 +193,7 @@ impl OpenRequestHandle { // TODO: replace ethers-rs providers with our own that handles "id" being null let response: Result = if let Some(ref p) = self.rpc.http_provider { p.request(method, params).await - } else if let Some(ref p) = self.rpc.ws_provider { + } else if let Some(p) = self.rpc.ws_provider.load().as_ref() { p.request(method, params).await } else { return Err(ProviderError::CustomError(