funnel survive rate limiting

This commit is contained in:
Bryan Stitt 2022-06-16 22:23:41 +00:00
parent dd674bb900
commit 27e07b3fc9
3 changed files with 29 additions and 17 deletions

View File

@ -1,8 +1,10 @@
# Todo
- [ ] it works for a few seconds and then gets stuck on something
- [x] it works for a few seconds and then gets stuck on something.
- [x] its working with one backend node, but multiple breaks. something to do with pending transactions
- [x] dashmap entry api is easy to deadlock! be careful with it!
- [ ] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more
- [ ] its working with one backend node, but multiple breaks. something to do with pending transactions
- [ ] ethers has a transactions_unsorted httprpc method that we should probably use. all rpcs probably don't support it, so make it okay for that to fail
- [ ] if web3 proxy gets an http error back, retry another node
- [x] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions
- [x] refactor Connections::spawn. have it return a handle that is selecting on those handles?

View File

@ -162,6 +162,7 @@ impl Web3Connection {
// check the server's chain_id here
// TODO: move this outside the `new` function and into a `start` function or something
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: this will wait forever. do we want that?
let found_chain_id: Result<String, _> = connection
.wait_for_request_handle()
.await
@ -385,7 +386,6 @@ impl Web3Connection {
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: subscribe to Block<TransactionReceipt> instead?
let block: Result<Block<TxHash>, _> = self
.wait_for_request_handle()
.await
@ -492,11 +492,12 @@ impl Web3Connection {
Ok(())
}
/// be careful with this; it will wait forever!
#[instrument(skip_all)]
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
// TODO: maximum wait time
// TODO: maximum wait time?
for _ in 0..10 {
loop {
match self.try_request_handle().await {
Ok(pending_request_handle) => return pending_request_handle,
Err(retry_after) => {
@ -504,9 +505,6 @@ impl Web3Connection {
}
}
}
// TODO: what should we do? panic isn't ever what we want
panic!("no request handle after 10 tries");
}
pub async fn try_request_handle(self: &Arc<Self>) -> Result<ActiveRequestHandle, Duration> {

View File

@ -161,15 +161,26 @@ impl Web3Connections {
&self,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
) -> Result<TxState, ProviderError> {
) -> Result<Option<TxState>, ProviderError> {
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: maximum wait time
let pending_transaction: Transaction = rpc
.wait_for_request_handle()
.await
.request("eth_getTransactionByHash", (pending_tx_id,))
.await?;
let pending_transaction: Transaction = match rpc.try_request_handle().await {
Ok(request_handle) => {
request_handle
.request("eth_getTransactionByHash", (pending_tx_id,))
.await?
}
Err(err) => {
trace!(
?pending_tx_id,
?rpc,
?err,
"cancelled funneling transaction"
);
return Ok(None);
}
};
trace!(?pending_transaction, "pending");
@ -177,9 +188,9 @@ impl Web3Connections {
match &pending_transaction.block_hash {
Some(_block_hash) => {
// the transaction is already confirmed. no need to save in the pending_transactions map
Ok(TxState::Confirmed(pending_transaction))
Ok(Some(TxState::Confirmed(pending_transaction)))
}
None => Ok(TxState::Pending(pending_transaction)),
None => Ok(Some(TxState::Pending(pending_transaction))),
}
}
@ -207,7 +218,7 @@ impl Web3Connections {
// query the rpc for this transaction
// it is possible that another rpc is also being queried. thats fine. we want the fastest response
match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
Ok(tx_state) => {
Ok(Some(tx_state)) => {
let _ = pending_tx_sender.send(tx_state);
trace!(?pending_tx_id, "sent");
@ -215,6 +226,7 @@ impl Web3Connections {
// we sent the transaction. return now. don't break looping because that gives a warning
return Ok(());
}
Ok(None) => {}
Err(err) => {
trace!(?err, ?pending_tx_id, "failed fetching transaction");
// unable to update the entry. sleep and try again soon