From 27e07b3fc9aea5851085a4e5d4233b04ed9d21b6 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 16 Jun 2022 22:23:41 +0000 Subject: [PATCH] funnel survive rate limiting --- TODO.md | 6 ++++-- web3-proxy/src/connection.rs | 10 ++++------ web3-proxy/src/connections.rs | 30 +++++++++++++++++++++--------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/TODO.md b/TODO.md index 687869fd..d846ce19 100644 --- a/TODO.md +++ b/TODO.md @@ -1,8 +1,10 @@ # Todo -- [ ] it works for a few seconds and then gets stuck on something +- [x] it works for a few seconds and then gets stuck on something. + - [x] its working with one backend node, but multiple breaks. something to do with pending transactions + - [x] dashmap entry api is easy to deadlock! be careful with it! - [ ] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more -- [ ] its working with one backend node, but multiple breaks. something to do with pending transactions +- [ ] ethers has a transactions_unsorted httprpc method that we should probably use. all rpcs probably don't support it, so make it okay for that to fail - [ ] if web3 proxy gets an http error back, retry another node - [x] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions - [x] refactor Connections::spawn. have it return a handle that is selecting on those handles? diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index d5b6dc27..eecc9834 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -162,6 +162,7 @@ impl Web3Connection { // check the server's chain_id here // TODO: move this outside the `new` function and into a `start` function or something // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error + // TODO: this will wait forever. do we want that? let found_chain_id: Result = connection .wait_for_request_handle() .await @@ -385,7 +386,6 @@ impl Web3Connection { // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block - // TODO: subscribe to Block instead? let block: Result, _> = self .wait_for_request_handle() .await @@ -492,11 +492,12 @@ impl Web3Connection { Ok(()) } + /// be careful with this; it will wait forever! #[instrument(skip_all)] pub async fn wait_for_request_handle(self: &Arc) -> ActiveRequestHandle { - // TODO: maximum wait time + // TODO: maximum wait time? - for _ in 0..10 { + loop { match self.try_request_handle().await { Ok(pending_request_handle) => return pending_request_handle, Err(retry_after) => { @@ -504,9 +505,6 @@ impl Web3Connection { } } } - - // TODO: what should we do? panic isn't ever what we want - panic!("no request handle after 10 tries"); } pub async fn try_request_handle(self: &Arc) -> Result { diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 98134ec8..0749fea0 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -161,15 +161,26 @@ impl Web3Connections { &self, rpc: Arc, pending_tx_id: TxHash, - ) -> Result { + ) -> Result, ProviderError> { // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) // TODO: maximum wait time - let pending_transaction: Transaction = rpc - .wait_for_request_handle() - .await - .request("eth_getTransactionByHash", (pending_tx_id,)) - .await?; + let pending_transaction: Transaction = match rpc.try_request_handle().await { + Ok(request_handle) => { + request_handle + .request("eth_getTransactionByHash", (pending_tx_id,)) + .await? + } + Err(err) => { + trace!( + ?pending_tx_id, + ?rpc, + ?err, + "cancelled funneling transaction" + ); + return Ok(None); + } + }; trace!(?pending_transaction, "pending"); @@ -177,9 +188,9 @@ impl Web3Connections { match &pending_transaction.block_hash { Some(_block_hash) => { // the transaction is already confirmed. no need to save in the pending_transactions map - Ok(TxState::Confirmed(pending_transaction)) + Ok(Some(TxState::Confirmed(pending_transaction))) } - None => Ok(TxState::Pending(pending_transaction)), + None => Ok(Some(TxState::Pending(pending_transaction))), } } @@ -207,7 +218,7 @@ impl Web3Connections { // query the rpc for this transaction // it is possible that another rpc is also being queried. thats fine. we want the fastest response match self._funnel_transaction(rpc.clone(), pending_tx_id).await { - Ok(tx_state) => { + Ok(Some(tx_state)) => { let _ = pending_tx_sender.send(tx_state); trace!(?pending_tx_id, "sent"); @@ -215,6 +226,7 @@ impl Web3Connections { // we sent the transaction. return now. don't break looping because that gives a warning return Ok(()); } + Ok(None) => {} Err(err) => { trace!(?err, ?pending_tx_id, "failed fetching transaction"); // unable to update the entry. sleep and try again soon