From 91eeee23e2f2eee2158551da2a649033baa4a3b7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 22 May 2023 15:32:15 -0700 Subject: [PATCH] use ether's reconnects instead of our own which need a lock (wip) --- web3_proxy/src/config.rs | 2 - web3_proxy/src/frontend/users/payment.rs | 5 +- web3_proxy/src/rpcs/blockchain.rs | 3 +- web3_proxy/src/rpcs/many.rs | 20 +- web3_proxy/src/rpcs/one.rs | 404 ++++++----------------- web3_proxy/src/rpcs/provider.rs | 155 ++++----- web3_proxy/src/rpcs/request.rs | 52 +-- web3_proxy/src/rpcs/transactions.rs | 3 +- 8 files changed, 191 insertions(+), 453 deletions(-) diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 05a947d5..fc156631 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -287,7 +287,6 @@ impl Web3RpcConfig { blocks_by_hash_cache: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option>, - reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { if !self.extra.is_empty() { warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys()); @@ -304,7 +303,6 @@ impl Web3RpcConfig { blocks_by_hash_cache, block_sender, tx_id_sender, - reconnect, ) .await } diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 975fb3da..728728d0 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -166,7 +166,6 @@ pub async fn user_balance_post( "eth_getTransactionReceipt", &vec![format!("0x{}", hex::encode(tx_hash))], Level::Trace.into(), - None, ) .await // TODO: What kind of error would be here @@ -217,7 +216,7 @@ pub async fn user_balance_post( ]); debug!("Params are: {:?}", ¶ms); let accepted_token: String = handle - .request("eth_call", ¶ms, Level::Trace.into(), None) + .request("eth_call", ¶ms, Level::Trace.into()) .await // TODO: What kind of error would be here .map_err(|err| Web3ProxyError::Anyhow(err.into()))?; @@ -267,7 +266,7 @@ pub async fn user_balance_post( ]); debug!("ERC20 Decimal request params are: {:?}", ¶ms); let decimals: String = handle - .request("eth_call", ¶ms, Level::Trace.into(), None) + .request("eth_call", ¶ms, Level::Trace.into()) .await .map_err(|err| Web3ProxyError::Anyhow(err.into()))?; debug!("Decimals response is: {:?}", decimals); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 9c181e0d..2b71b1fe 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -219,13 +219,12 @@ impl Web3Rpcs { // TODO: if error, retry? let block: Web3ProxyBlock = match rpc { Some(rpc) => rpc - .wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None) + .wait_for_request_handle(authorization, Some(Duration::from_secs(30))) .await? .request::<_, Option>( "eth_getBlockByHash", &json!(get_block_params), Level::Error.into(), - None, ) .await? .and_then(|x| { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5f281bee..74ef6776 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -276,7 +276,6 @@ impl Web3Rpcs { blocks_by_hash_cache, block_sender, pending_tx_id_sender, - true, )); Some(handle) @@ -303,9 +302,9 @@ impl Web3Rpcs { while new_head_receiver.borrow_and_update().is_none() { new_head_receiver.changed().await?; } - } - old_rpc.disconnect().await.context("disconnect old rpc")?; + // TODO: tell ethers to disconnect? is there a function for that? + } } // TODO: what should we do with the new handle? make sure error logs aren't dropped @@ -435,7 +434,7 @@ impl Web3Rpcs { .into_iter() .map(|active_request_handle| async move { let result: Result, _> = active_request_handle - .request(method, &json!(¶ms), error_level.into(), None) + .request(method, &json!(¶ms), error_level.into()) .await; result }) @@ -508,7 +507,7 @@ impl Web3Rpcs { skip.push(Arc::clone(faster_rpc)); // just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits - match faster_rpc.try_request_handle(authorization, None).await { + match faster_rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => { trace!("opened handle: {}", faster_rpc); return OpenRequestResult::Handle(handle); @@ -831,7 +830,7 @@ impl Web3Rpcs { } // check rate limits and increment our connection counter - match rpc.try_request_handle(authorization, None).await { + match rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it trace!("{} is rate limited. skipping", rpc); @@ -908,7 +907,6 @@ impl Web3Rpcs { &request.method, &json!(request.params), RequestErrorHandler::Save, - None, ) .await; @@ -1307,8 +1305,8 @@ mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use super::*; + use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusFinder; - use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use arc_swap::ArcSwap; use ethers::types::H256; use ethers::types::{Block, U256}; @@ -1451,7 +1449,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: Some(tx_synced), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1466,7 +1463,6 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: Some(tx_lagged), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1707,7 +1703,6 @@ mod tests { block_data_limit: 64.into(), tier: 1, head_block: Some(tx_pruned), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1721,7 +1716,6 @@ mod tests { block_data_limit: u64::MAX.into(), tier: 2, head_block: Some(tx_archive), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1876,7 +1870,6 @@ mod tests { block_data_limit: 64.into(), tier: 0, head_block: Some(tx_mock_geth), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1889,7 +1882,6 @@ mod tests { block_data_limit: u64::MAX.into(), tier: 1, head_block: Some(tx_mock_erigon_archive), - provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), peak_latency: Some(new_peak_latency()), ..Default::default() }; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index c3d8b34f..dd84fa4b 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1,6 +1,6 @@ ///! Rate-limited communication with a web3 provider. use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock}; -use super::provider::Web3Provider; +use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider}; use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; @@ -40,17 +40,10 @@ pub struct Web3Rpc { pub name: String, pub display_name: Option, pub db_conn: Option, - pub(super) ws_url: Option, - pub(super) http_url: Option, - /// Some connections use an http_client. we keep a clone for reconnecting - pub(super) http_client: Option, - /// provider is in a RwLock so that we can replace it if re-connecting - /// it is an async lock because we hold it open across awaits - /// this provider is only used for new heads subscriptions - /// TODO: benchmark ArcSwapOption and a watch::Sender - /// TODO: only the websocket provider needs to be behind an asyncrwlock! - /// TODO: the http provider is just an http_client - pub(super) provider: AsyncRwLock>>, + /// most all requests prefer use the http_provider + pub(super) http_provider: Option, + /// the websocket provider is only used for subscriptions + pub(super) ws_provider: Option, /// keep track of hard limits /// this is only inside an Option so that the "Default" derive works. it will always be set. pub(super) hard_limit_until: Option>, @@ -79,7 +72,6 @@ pub struct Web3Rpc { /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, pub(super) active_requests: AtomicUsize, - pub(super) reconnect: AtomicBool, /// this is only inside an Option so that the "Default" derive works. it will always be set. pub(super) disconnect_watch: Option>, pub(super) created_at: Option, @@ -102,7 +94,6 @@ impl Web3Rpc { block_map: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option)>>, - reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let created_at = Instant::now(); @@ -161,7 +152,6 @@ impl Web3Rpc { } let (disconnect_sender, disconnect_receiver) = watch::channel(false); - let reconnect = reconnect.into(); let (head_block, _) = watch::channel(None); @@ -169,6 +159,7 @@ impl Web3Rpc { // TODO Should these defaults be in config let peak_latency = PeakEwmaLatency::spawn( // Decay over 15s + // TODO! This is wrong! needs to be as_nanos! Duration::from_secs(15).as_millis() as f64, // Peak requests so far around 5k, we will use an order of magnitude // more to be safe. Should only use about 50mb RAM @@ -177,41 +168,46 @@ impl Web3Rpc { Duration::from_secs(1), ); - let http_url = if let Some(http_url) = config.http_url { - Some(http_url.parse()?) + let http_provider = if let Some(http_url) = config.http_url { + let http_url = http_url.parse::()?; + + Some(connect_http(Cow::Owned(http_url), http_client)?) + + // TODO: check the provider is on the right chain } else { None }; - let ws_url = if let Some(ws_url) = config.ws_url { - Some(ws_url.parse()?) + let ws_provider = if let Some(ws_url) = config.ws_url { + let ws_url = ws_url.parse::()?; + + Some(connect_ws(Cow::Owned(ws_url), usize::MAX).await?) + + // TODO: check the provider is on the right chain } else { None }; - let new_connection = Self { - name, - db_conn: db_conn.clone(), - display_name: config.display_name, - http_client, - ws_url, - http_url, - hard_limit, - hard_limit_until: Some(hard_limit_until), - soft_limit: config.soft_limit, + let new_rpc = Self { automatic_block_limit, backup, block_data_limit, - reconnect, - tier: config.tier, - disconnect_watch: Some(disconnect_sender), created_at: Some(created_at), + db_conn: db_conn.clone(), + disconnect_watch: Some(disconnect_sender), + display_name: config.display_name, + hard_limit, + hard_limit_until: Some(hard_limit_until), head_block: Some(head_block), + http_provider, + name, peak_latency: Some(peak_latency), + soft_limit: config.soft_limit, + tier: config.tier, ..Default::default() }; - let new_connection = Arc::new(new_connection); + let new_connection = Arc::new(new_rpc); // subscribe to new blocks and new transactions // subscribing starts the connection (with retries) @@ -256,7 +252,6 @@ impl Web3Rpc { async fn check_block_data_limit( self: &Arc, authorization: &Arc, - unlocked_provider: Option>, ) -> anyhow::Result> { if !self.automatic_block_limit { // TODO: is this a good thing to return? @@ -270,16 +265,13 @@ impl Web3Rpc { // TODO: binary search between 90k and max? // TODO: start at 0 or 1? for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { - let handle = self - .wait_for_request_handle(authorization, None, unlocked_provider.clone()) - .await?; + let handle = self.wait_for_request_handle(authorization, None).await?; let head_block_num_future = handle.request::, U256>( "eth_blockNumber", &None, // error here are expected, so keep the level low Level::Debug.into(), - unlocked_provider.clone(), ); let head_block_num = timeout(Duration::from_secs(5), head_block_num_future) @@ -297,9 +289,7 @@ impl Web3Rpc { // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? - let handle = self - .wait_for_request_handle(authorization, None, unlocked_provider.clone()) - .await?; + let handle = self.wait_for_request_handle(authorization, None).await?; let archive_result: Result = handle .request( @@ -310,7 +300,6 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Level::Trace.into(), - unlocked_provider.clone(), ) .await; @@ -388,204 +377,50 @@ impl Web3Rpc { true } - /// reconnect to the provider. errors are retried forever with exponential backoff with jitter. - /// We use the "Decorrelated" jitter from - /// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time. - pub async fn retrying_connect( + /// query the web3 provider to confirm it is on the expected chain with the expected data available + async fn check_provider( self: &Arc, block_sender: Option<&flume::Sender>, chain_id: u64, db_conn: Option<&DatabaseConnection>, - delay_start: bool, ) -> anyhow::Result<()> { - // there are several crates that have retry helpers, but they all seem more complex than necessary - // TODO: move this backoff logic into a helper function so we can use it when doing database locking - let base_ms = 500; - let cap_ms = 30_000; - let range_multiplier = 3; + let authorization = Arc::new(Authorization::internal(db_conn.cloned())?); - // sleep once before the initial retry attempt - // TODO: now that we use this method for our initial connection, do we still want this sleep? - let mut sleep_ms = if delay_start { - let first_sleep_ms = min( - cap_ms, - thread_fast_rng().gen_range(base_ms..(base_ms * range_multiplier)), - ); - let reconnect_in = Duration::from_millis(first_sleep_ms); + // check the server's chain_id here + // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error + // TODO: what should the timeout be? should there be a request timeout? + // trace!("waiting on chain id for {}", self); + let found_chain_id: Result = self + .wait_for_request_handle(&authorization, None) + .await + .context(format!("waiting for request handle on {}", self))? + .request("eth_chainId", &json!(Vec::<()>::new()), Level::Trace.into()) + .await; + trace!("found_chain_id: {:#?}", found_chain_id); - info!("Reconnect to {} in {}ms", self, reconnect_in.as_millis()); - - sleep(reconnect_in).await; - - first_sleep_ms - } else { - base_ms - }; - - // retry until we succeed - while let Err(err) = self.connect(block_sender, chain_id, db_conn).await { - // thread_rng is crytographically secure. we don't need that here. use thread_fast_rng instead - // TODO: min of 1 second? sleep longer if rate limited? - sleep_ms = min( - cap_ms, - thread_fast_rng().gen_range(base_ms..(sleep_ms * range_multiplier)), - ); - - let retry_in = Duration::from_millis(sleep_ms); - - let error_level = if self.backup { - log::Level::Debug - } else { - log::Level::Info - }; - - log::log!( - error_level, - "Failed (re)connect to {}! Retry in {}ms. err={:?}", - self, - retry_in.as_millis(), - err, - ); - - sleep(retry_in).await; + match found_chain_id { + Ok(found_chain_id) => { + // TODO: there has to be a cleaner way to do this + if chain_id != found_chain_id.as_u64() { + return Err(anyhow::anyhow!( + "incorrect chain id! Config has {}, but RPC has {}", + chain_id, + found_chain_id + ) + .context(format!("failed @ {}", self))); + } + } + Err(e) => { + return Err(anyhow::Error::from(e) + .context(format!("unable to parse eth_chainId from {}", self))); + } } - Ok(()) - } + self.check_block_data_limit(&authorization) + .await + .context(format!("unable to check_block_data_limit of {}", self))?; - /// connect to the web3 provider - async fn connect( - self: &Arc, - block_sender: Option<&flume::Sender>, - chain_id: u64, - db_conn: Option<&DatabaseConnection>, - ) -> anyhow::Result<()> { - if let Ok(mut unlocked_provider) = self.provider.try_write() { - #[cfg(test)] - if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() { - return Ok(()); - } - - *unlocked_provider = if let Some(ws_url) = self.ws_url.as_ref() { - // set up ws client - match &*unlocked_provider { - None => { - info!("connecting to {}", self); - } - Some(_) => { - debug!("reconnecting to {}", self); - - // tell the block subscriber that this rpc doesn't have any blocks - if let Some(block_sender) = block_sender { - block_sender - .send_async((None, self.clone())) - .await - .context("block_sender during connect")?; - } - - // reset sync status - self.head_block - .as_ref() - .expect("head_block should always be set") - .send_replace(None); - - // disconnect the current provider - // TODO: what until the block_sender's receiver finishes updating this item? - *unlocked_provider = None; - } - } - - let p = Web3Provider::new(Cow::Borrowed(ws_url), None) - .await - .context(format!("failed connecting to {}", ws_url))?; - - assert!(p.ws().is_some()); - - Some(Arc::new(p)) - } else { - // http client - if let Some(url) = &self.http_url { - let p = Web3Provider::new(Cow::Borrowed(url), self.http_client.clone()) - .await - .context(format!("failed connecting to {}", url))?; - - assert!(p.http().is_some()); - - Some(Arc::new(p)) - } else { - None - } - }; - - let authorization = Arc::new(Authorization::internal(db_conn.cloned())?); - - // check the server's chain_id here - // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error - // TODO: what should the timeout be? should there be a request timeout? - // trace!("waiting on chain id for {}", self); - let found_chain_id: Result = self - .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) - .await - .context(format!("waiting for request handle on {}", self))? - .request( - "eth_chainId", - &json!(Vec::<()>::new()), - Level::Trace.into(), - unlocked_provider.clone(), - ) - .await; - trace!("found_chain_id: {:#?}", found_chain_id); - - match found_chain_id { - Ok(found_chain_id) => { - // TODO: there has to be a cleaner way to do this - if chain_id != found_chain_id.as_u64() { - return Err(anyhow::anyhow!( - "incorrect chain id! Config has {}, but RPC has {}", - chain_id, - found_chain_id - ) - .context(format!("failed @ {}", self))); - } - } - Err(e) => { - return Err(anyhow::Error::from(e) - .context(format!("unable to parse eth_chainId from {}", self))); - } - } - - self.check_block_data_limit(&authorization, unlocked_provider.clone()) - .await - .context(format!("unable to check_block_data_limit of {}", self))?; - - drop(unlocked_provider); - - info!("successfully connected to {}", self); - } else if self.provider.read().await.is_none() { - return Err(anyhow!("failed waiting for client {}", self)); - }; - - Ok(()) - } - - pub async fn disconnect(&self) -> anyhow::Result<()> { - let age = self.created_at.unwrap().elapsed().as_secs(); - - info!("disconnecting {} ({}s old)", self, age); - - self.reconnect.store(false, atomic::Ordering::Release); - - if let Err(err) = self.disconnect_watch.as_ref().unwrap().send(true) { - warn!("failed sending disconnect watch: {:?}", err); - }; - - trace!("disconnecting (locking) {} ({}s old)", self, age); - - let mut provider = self.provider.write().await; - - trace!("disconnecting (clearing provider) {} ({}s old)", self, age); - - *provider = None; + info!("successfully connected to {}", self); Ok(()) } @@ -638,7 +473,7 @@ impl Web3Rpc { if self.block_data_limit() == U64::zero() { let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?); - if let Err(err) = self.check_block_data_limit(&authorization, None).await { + if let Err(err) = self.check_block_data_limit(&authorization).await { warn!( "failed checking block limit after {} finished syncing. {:?}", self, err @@ -670,7 +505,7 @@ impl Web3Rpc { *self.disconnect_watch.as_ref().unwrap().borrow() } - /// subscribe to blocks and transactions with automatic reconnects + /// subscribe to blocks and transactions /// This should only exit when the program is exiting. /// TODO: should more of these args be on self? #[allow(clippy::too_many_arguments)] @@ -690,15 +525,12 @@ impl Web3Rpc { RequestErrorHandler::ErrorLevel }; - let mut delay_start = false; + todo!(); - // this does loop. just only when reconnect is enabled - #[allow(clippy::never_loop)] - loop { - trace!("subscription loop started on {}", self); - - let mut futures = vec![]; + /* + let mut futures = vec![]; + while false { let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); { @@ -741,7 +573,7 @@ impl Web3Rpc { // TODO: what if we just happened to have this check line up with another restart? // TODO: think more about this - if let Some(client) = rpc.provider.read().await.clone() { + if let Some(client) = rpc.ws_provider.read().await.clone() { // health check as a way of keeping this rpc's request_ewma accurate // TODO: do something different if this is a backup server? @@ -785,6 +617,7 @@ impl Web3Rpc { let code = match to { Err(err) => { + // TODO: an "error" here just means that the hash wasn't available. i dont think its truly an "error" if rpc.backup { debug!( "{} failed health check query! {:#?}", @@ -893,6 +726,8 @@ impl Web3Rpc { } } + + */ info!("all subscriptions on {} completed", self); Ok(()) @@ -908,8 +743,15 @@ impl Web3Rpc { ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); - let provider = self.wait_for_provider().await; + if let Some(ws_provider) = self.ws_provider.as_ref() { + todo!("subscribe") + } else if let Some(http_provider) = self.http_provider.as_ref() { + todo!("poll") + } else { + unimplemented!("no ws or http provider!") + } + /* match provider.as_ref() { Web3Provider::Http(_client) => { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints @@ -922,17 +764,13 @@ impl Web3Rpc { while !self.should_disconnect() { // TODO: what should the max_wait be? // we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one - match self - .wait_for_request_handle(&authorization, None, None) - .await - { + match self.wait_for_request_handle(&authorization, None).await { Ok(active_request_handle) => { let block: Result, _> = active_request_handle .request( "eth_getBlockByNumber", &json!(("latest", false)), Level::Warn.into(), - None, ) .await; @@ -1017,9 +855,8 @@ impl Web3Rpc { } Web3Provider::Both(_, client) | Web3Provider::Ws(client) => { // todo: move subscribe_blocks onto the request handle? - let active_request_handle = self - .wait_for_request_handle(&authorization, None, Some(provider.clone())) - .await; + let active_request_handle = + self.wait_for_request_handle(&authorization, None).await; let mut stream = client.subscribe_blocks().await?; drop(active_request_handle); @@ -1029,13 +866,12 @@ impl Web3Rpc { // TODO: how does this get wrapped in an arc? does ethers handle that? // TODO: do this part over http? let block: Result, _> = self - .wait_for_request_handle(&authorization, None, Some(provider.clone())) + .wait_for_request_handle(&authorization, None) .await? .request( "eth_getBlockByNumber", &json!(("latest", false)), Level::Warn.into(), - Some(provider.clone()), ) .await; @@ -1082,6 +918,7 @@ impl Web3Rpc { #[cfg(test)] Web3Provider::Mock => unimplemented!(), } + */ // clear the head block. this might not be needed, but it won't hurt self.send_head_block_result(Ok(None), &block_sender, block_map) @@ -1100,10 +937,10 @@ impl Web3Rpc { authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { - // TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big - // TODO: timeout - let provider = self.wait_for_provider().await; + // TODO: make this subscription optional + self.wait_for_disconnect().await?; + /* trace!("watching pending transactions on {}", self); // TODO: does this keep the lock open for too long? match provider.as_ref() { @@ -1144,6 +981,7 @@ impl Web3Rpc { self.wait_for_disconnect().await?; } } + */ if self.should_disconnect() { Ok(()) @@ -1155,21 +993,15 @@ impl Web3Rpc { } /// be careful with this; it might wait forever! - /// `allow_not_ready` is only for use by health checks while starting the provider - /// TODO: don't use anyhow. use specific error type pub async fn wait_for_request_handle<'a>( self: &'a Arc, authorization: &'a Arc, max_wait: Option, - unlocked_provider: Option>, ) -> Web3ProxyResult { let max_wait = max_wait.map(|x| Instant::now() + x); loop { - match self - .try_request_handle(authorization, unlocked_provider.clone()) - .await - { + match self.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? @@ -1214,18 +1046,8 @@ impl Web3Rpc { pub async fn try_request_handle( self: &Arc, authorization: &Arc, - // TODO: borrow on this instead of needing to clone the Arc? - unlocked_provider: Option>, ) -> Web3ProxyResult { - // TODO: think more about this read block - // TODO: this should *not* be new_head_client. this should be a separate object - if unlocked_provider.is_some() || self.provider.read().await.is_some() { - // we already have an unlocked provider. no need to lock - } else { - warn!("no provider on {}", self); - // TODO: wait for provider? that will probably slow us down more than we want - return Ok(OpenRequestResult::NotReady); - } + // TODO: if websocket is reconnecting, return an error? // check cached rate limits if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { @@ -1291,59 +1113,34 @@ impl Web3Rpc { } } - async fn wait_for_provider(&self) -> Arc { - let mut provider = self.provider.read().await.clone(); - - let mut logged = false; - while provider.is_none() { - // trace!("waiting on unlocked_provider: locking..."); - sleep(Duration::from_millis(100)).await; - - if !logged { - debug!("waiting for provider on {}", self); - logged = true; - } - - provider = self.provider.read().await.clone(); - } - - provider.unwrap() - } - pub async fn wait_for_query( self: &Arc, method: &str, params: &P, revert_handler: RequestErrorHandler, authorization: Arc, - unlocked_provider: Option>, ) -> anyhow::Result where // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static, R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send, { - self.wait_for_request_handle(&authorization, None, None) + self.wait_for_request_handle(&authorization, None) .await? - .request::(method, params, revert_handler, unlocked_provider) + .request::(method, params, revert_handler) .await .context("ProviderError from the backend") } } -impl fmt::Debug for Web3Provider { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url - f.debug_struct("Web3Provider").finish_non_exhaustive() - } -} - impl Hash for Web3Rpc { fn hash(&self, state: &mut H) { self.name.hash(state); self.display_name.hash(state); - self.http_url.hash(state); - self.ws_url.hash(state); + self.http_provider.as_ref().map(|x| x.url()).hash(state); + // TODO: figure out how to get the url for the provider + // TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else + // self.ws_provider.map(|x| x.url()).hash(state); self.automatic_block_limit.hash(state); self.backup.hash(state); // TODO: don't include soft_limit if we change them to be dynamic @@ -1481,7 +1278,6 @@ mod tests { let x = Web3Rpc { name: "name".to_string(), - ws_url: Some("ws://example.com".parse::().unwrap()), soft_limit: 1_000, automatic_block_limit: false, backup: false, diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index a70bc88f..1829b0a5 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -5,92 +5,81 @@ use std::{borrow::Cow, time::Duration}; use url::Url; // TODO: our own structs for these that handle streaming large responses -type EthersHttpProvider = ethers::providers::Provider; -type EthersWsProvider = ethers::providers::Provider; +pub type EthersHttpProvider = ethers::providers::Provider; +pub type EthersWsProvider = ethers::providers::Provider; -/// Use HTTP and WS providers. -// TODO: instead of an enum, I tried to use Box, but hit -// TODO: custom types that let us stream JSON responses -#[derive(From)] -pub enum Web3Provider { - Both(EthersHttpProvider, EthersWsProvider), - Http(EthersHttpProvider), - // TODO: deadpool? custom tokio-tungstenite - Ws(EthersWsProvider), - #[cfg(test)] - Mock, -} +pub fn extract_auth(url: &mut Cow<'_, Url>) -> Option { + if let Some(pass) = url.password().map(|x| x.to_string()) { + // to_string is needed because we are going to remove these items from the url + let user = url.username().to_string(); -impl Web3Provider { - pub fn http(&self) -> Option<&EthersHttpProvider> { - match self { - Self::Http(x) => Some(x), - _ => None, - } - } + // clear username and password from the url + let mut_url = url.to_mut(); - pub fn ws(&self) -> Option<&EthersWsProvider> { - match self { - Self::Both(_, x) | Self::Ws(x) => Some(x), - _ => None, - } - } + mut_url + .set_username("") + .expect("unable to clear username on websocket"); + mut_url + .set_password(None) + .expect("unable to clear password on websocket"); - /// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used - /// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set - pub async fn new( - mut url: Cow<'_, Url>, - http_client: Option, - ) -> anyhow::Result { - let auth = if let Some(pass) = url.password().map(|x| x.to_string()) { - // to_string is needed because we are going to remove these items from the url - let user = url.username().to_string(); - - // clear username and password from the url - let mut_url = url.to_mut(); - - mut_url - .set_username("") - .map_err(|_| anyhow!("unable to clear username on websocket"))?; - mut_url - .set_password(None) - .map_err(|_| anyhow!("unable to clear password on websocket"))?; - - // keep them - Some(Authorization::basic(user, pass)) - } else { - None - }; - - let provider = if url.scheme().starts_with("http") { - let provider = if let Some(auth) = auth { - ethers::providers::Http::new_with_auth(url.into_owned(), auth)? - } else if let Some(http_client) = http_client { - ethers::providers::Http::new_with_client(url.into_owned(), http_client) - } else { - ethers::providers::Http::new(url.into_owned()) - }; - - // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(12)) - .into() - } else if url.scheme().starts_with("ws") { - let provider = if auth.is_some() { - let connection_details = ConnectionDetails::new(url.as_str(), auth); - - ethers::providers::Ws::connect(connection_details).await? - } else { - ethers::providers::Ws::connect(url.as_str()).await? - }; - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - // TODO: i don't think this interval matters - ethers::providers::Provider::new(provider).into() - } else { - return Err(anyhow::anyhow!("only http and ws servers are supported")); - }; - - Ok(provider) + // keep them + Some(Authorization::basic(user, pass)) + } else { + None } } + +/// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used +/// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set +pub fn connect_http( + mut url: Cow<'_, Url>, + http_client: Option, +) -> anyhow::Result { + let auth = extract_auth(&mut url); + + let provider = if url.scheme().starts_with("http") { + let provider = if let Some(auth) = auth { + ethers::providers::Http::new_with_auth(url.into_owned(), auth)? + } else if let Some(http_client) = http_client { + ethers::providers::Http::new_with_client(url.into_owned(), http_client) + } else { + ethers::providers::Http::new(url.into_owned()) + }; + + // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` + ethers::providers::Provider::new(provider) + .interval(Duration::from_secs(12)) + .into() + } else { + return Err(anyhow::anyhow!("only http servers are supported")); + }; + + Ok(provider) +} + +pub async fn connect_ws( + mut url: Cow<'_, Url>, + reconnects: usize, +) -> anyhow::Result { + let auth = extract_auth(&mut url); + + let provider = if url.scheme().starts_with("ws") { + let provider = if auth.is_some() { + let connection_details = ConnectionDetails::new(url.as_str(), auth); + + // if they error, we do our own reconnection with backoff + ethers::providers::Ws::connect_with_reconnects(connection_details, reconnects).await? + } else { + ethers::providers::Ws::connect_with_reconnects(url.as_str(), reconnects).await? + }; + + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) + // TODO: i don't think this interval matters + ethers::providers::Provider::new(provider).into() + } else { + return Err(anyhow::anyhow!("ws servers are supported")); + }; + + Ok(provider) +} diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index c50dd202..7a13335f 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,5 +1,4 @@ use super::one::Web3Rpc; -use super::provider::Web3Provider; use crate::frontend::authorization::Authorization; use anyhow::Context; use chrono::Utc; @@ -158,7 +157,6 @@ impl OpenRequestHandle { method: &str, params: &P, mut error_handler: RequestErrorHandler, - unlocked_provider: Option>, ) -> Result where // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it @@ -170,29 +168,6 @@ impl OpenRequestHandle { // trace!(rpc=%self.rpc, %method, "request"); trace!("requesting from {}", self.rpc); - let mut provider = if unlocked_provider.is_some() { - unlocked_provider - } else { - self.rpc.provider.read().await.clone() - }; - - let mut logged = false; - // TODO: instead of a lock, i guess it should be a watch? - while provider.is_none() { - // trace!("waiting on provider: locking..."); - // TODO: i dont like this. subscribing to a channel could be better - sleep(Duration::from_millis(100)).await; - - if !logged { - debug!("no provider for open handle on {}", self.rpc); - logged = true; - } - - provider = self.rpc.provider.read().await.clone(); - } - - let provider = provider.expect("provider was checked already"); - self.rpc .total_requests .fetch_add(1, std::sync::atomic::Ordering::AcqRel); @@ -202,21 +177,16 @@ impl OpenRequestHandle { let start = Instant::now(); // TODO: replace ethers-rs providers with our own that supports streaming the responses - let response = match provider.as_ref() { - #[cfg(test)] - Web3Provider::Mock => { - return Err(ProviderError::CustomError( - "mock provider can't respond".to_string(), - )) - } - Web3Provider::Ws(p) => p.request(method, params).await, - Web3Provider::Http(p) | Web3Provider::Both(p, _) => { - // TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks - p.request(method, params).await - } + // TODO: replace ethers-rs providers with our own that handles "id" being null + let response: Result = if let Some(ref p) = self.rpc.http_provider { + p.request(method, params).await + } else if let Some(ref p) = self.rpc.ws_provider { + p.request(method, params).await + } else { + unimplemented!("no provider. cannot send request") }; - // note. we intentionally do not record this latency now. we do NOT want to measure errors + // we do NOT want to measure errors, so we intentionally do not record this latency now. let latency = start.elapsed(); // we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called! @@ -277,11 +247,7 @@ impl OpenRequestHandle { // TODO: move this info a function on ResponseErrorType let response_type = if let ProviderError::JsonRpcClientError(err) = err { // Http and Ws errors are very similar, but different types - let msg = match &*provider { - #[cfg(test)] - Web3Provider::Mock => unimplemented!(), - _ => err.as_error_response().map(|x| x.message.clone()), - }; + let msg = err.as_error_response().map(|x| x.message.clone()); trace!("error message: {:?}", msg); diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index d8c007ee..c46da986 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -29,14 +29,13 @@ impl Web3Rpcs { // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: if one rpc fails, try another? // TODO: try_request_handle, or wait_for_request_handle? I think we want wait here - let tx: Transaction = match rpc.try_request_handle(authorization, None).await { + let tx: Transaction = match rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => { handle .request( "eth_getTransactionByHash", &(pending_tx_id,), Level::Error.into(), - None, ) .await? }