From 14272e83053a681231c8e081d7e84124e5d9b347 Mon Sep 17 00:00:00 2001 From: poma Date: Thu, 1 Oct 2020 02:23:31 +0300 Subject: [PATCH] robust wait loop for tx manager --- package.json | 3 +- src/TxManager.js | 283 +++++++++++++++++++++++++++++++++-------------- testTxManager.js | 37 +++---- yarn.lock | 18 +++ 4 files changed, 231 insertions(+), 110 deletions(-) diff --git a/package.json b/package.json index 8ca3847..4630b75 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,8 @@ "uuid": "^8.3.0", "web3": "^1.3.0", "web3-core-promievent": "^1.3.0", - "web3-utils": "^1.2.2" + "web3-utils": "^1.2.2", + "why-is-node-running": "^2.2.0" }, "devDependencies": { "chai": "^4.2.0", diff --git a/src/TxManager.js b/src/TxManager.js index 72d5a7d..00b54f7 100644 --- a/src/TxManager.js +++ b/src/TxManager.js @@ -13,6 +13,11 @@ const nonceErrors = [ const gasPriceErrors = [ 'Returned error: Transaction gas price supplied is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.', 'Returned error: replacement transaction underpriced', + /Returned error: Transaction gas price \d+wei is too low. There is another transaction with same nonce in the queue with gas price: \d+wei. Try increasing the gas price or incrementing the nonce./, +] + +const sameTxErrors = [ + 'Returned error: Transaction with the same hash was already imported.', ] const defaultConfig = { @@ -36,7 +41,7 @@ class TxManager { this._web3.eth.defaultAccount = this.address this._gasPriceOracle = new GasPriceOracle({ defaultRpc: rpcUrl }) this._mutex = new Mutex() - this.nonce + this._nonce = null } /** @@ -44,17 +49,8 @@ class TxManager { * * @param tx Transaction to send */ - async createTx(tx) { - try { - await this._mutex.acquire() - if (!this.nonce) { - this.nonce = await this._web3.eth.getTransactionCount(this.address, 'latest') - } - return new Transaction(tx, this) - } catch (e) { - console.log('e', e) - this._mutex.release() - } + createTx(tx) { + return new Transaction(tx, this) } } @@ -63,124 +59,209 @@ class Transaction { Object.assign(this, manager) this.manager = manager this.tx = tx - this.promiReceipt = PromiEvent() - this.emitter = this.promiReceipt.eventEmitter + this._promise = PromiEvent() + this._emitter = this._promise.eventEmitter + this.executed = false this.retries = 0 + this.currentTxHash = null // store all submitted hashes to catch cases when an old tx is mined - // todo: what to do if old tx with the same nonce was submitted - // by other client and we don't have its hash? this.hashes = [] } /** - * Submits transaction to Ethereum network. Resolves when tx gets enough confirmations. + * Submits the transaction to Ethereum network. Resolves when tx gets enough confirmations. * Emits progress events. */ send() { - this._prepare() - .then(() => { - this._send() - .then((result) => this.promiReceipt.resolve(result)) - .catch((e) => this.promiReceipt.reject(e)) - }) - .catch((e) => this.promiReceipt.reject(e)) - .finally(this.manager._mutex.release()) - - return this.emitter + if (this.executed) { + throw new Error('The transaction was already executed') + } + this.executed = true + this._execute() + .then(this._promise.resolve) + .catch(this._promise.reject) + return this._emitter } /** - * Replaces pending tx. + * Replaces a pending tx. * * @param tx Transaction to send */ - replace(tx) { - // todo check if it's not mined yet - console.log('Replacing...') + async replace(tx) { + // todo throw error if the current transaction is mined already + console.log('Replacing current transaction') + if (!this.executed) { + // Tx was not executed yet, just replace it + this.tx = tx + return + } + if (!tx.gas) { + tx.gas = await this._web3.eth.estimateGas(tx) + } + tx.nonce = this.tx.nonce // can be different from `this.manager._nonce` + tx.gasPrice = Math.max(this.tx.gasPrice, tx.gasPrice || 0) // start no less than current tx gas price + this.tx = tx - return this.send() + this._increaseGasPrice() + await this._send() } /** - * Cancels pending tx. + * Cancels a pending tx. */ cancel() { - // todo check if it's not mined yet - console.log('Canceling...') - this.tx = { + console.log('Canceling the transaction') + return this.replace({ + from: this.address, to: this.address, - gasPrice: this.tx.gasPrice, - } - this._increaseGasPrice() - return this.send() + value: 0, + gas: 21000, + }) } + /** + * Executes the transaction. Acquires global mutex for transaction duration + * + * @returns {Promise} + * @private + */ + async _execute() { + const release = await this.manager._mutex.acquire() + try { + await this._prepare() + await this._send() + const receipt = this._waitForConfirmations() + // we could have bumped nonce during execution, so get the latest one + 1 + this.manager._nonce = this.tx.nonce + 1 + return receipt + } finally { + release() + } + } + + /** + * Prepare first transaction before submitting it. Inits `gas`, `gasPrice`, `nonce` + * + * @returns {Promise} + * @private + */ async _prepare() { - this.tx.gas = await this._web3.eth.estimateGas(this.tx) + const gas = await this._web3.eth.estimateGas(this.tx) + if (!this.tx.gas) { + this.tx.gas = gas + } if (!this.tx.gasPrice) { this.tx.gasPrice = await this._getGasPrice('fast') } - this.tx.nonce = this.manager.nonce + if (!this.manager._nonce) { + this.manager._nonce = await this._web3.eth.getTransactionCount(this.address, 'latest') + } + this.tx.nonce = this.manager._nonce } + /** + * Send the current transaction + * + * @returns {Promise} + * @private + */ async _send() { + // todo throw is we attempt to send a tx that attempts to replace already mined tx const signedTx = await this._web3.eth.accounts.signTransaction(this.tx, this._privateKey) - this.tx.date = Date.now() + this.submitTimestamp = Date.now() this.tx.hash = signedTx.transactionHash - this.hashes.push(this.tx.hash) - this.emitter.emit('transactionHash', signedTx.transactionHash) + this.hashes.push(signedTx.transactionHash) try { await this._broadcast(signedTx.rawTransaction) - console.log('Broadcasted. Start waiting for mining...') - // The most reliable way to see if one of our tx was mined is to track current nonce + } catch (e) { + return this._handleSendError(e) + } - while (this.tx.nonce >= (await this._getLastNonce())) { - if (Date.now() - this.tx.date >= this.config.GAS_BUMP_INTERVAL) { - if (this._increaseGasPrice()) { - console.log('Resubmit with higher gas price') - return this._send() - } + this._emitter.emit('transactionHash', signedTx.transactionHash) + console.log(`Broadcasted transaction ${signedTx.transactionHash}`) + console.log(this.tx) + } + + /** + * A loop that waits until the current transaction is mined and gets enough confirmations + * + * @returns {Promise} The transaction receipt + * @private + */ + async _waitForConfirmations() { + // eslint-disable-next-line no-constant-condition + while (true) { + // We are already waiting on certain tx hash + if (this.currentTxHash) { + const receipt = await this._web3.eth.getTransactionReceipt(this.currentTxHash) + + if (!receipt) { + // We were waiting for some tx but it disappeared + // Erase the hash and start over + this.currentTxHash = null + continue } + const currentBlock = await this._web3.eth.getBlockNumber() + const confirmations = Math.max(0, currentBlock - receipt.blockNumber) + // todo don't emit repeating confirmation count + this._emitter.emit('confirmations', confirmations) + if (confirmations >= this.config.CONFIRMATIONS) { + // Tx is mined and has enough confirmations + return receipt + } + + // Tx is mined but doesn't have enough confirmations yet, keep waiting await sleep(this.config.POLL_INTERVAL) + continue + } + + // Tx is still pending + if (await this._getLastNonce() <= this.tx.nonce) { + // todo optionally run estimateGas on each iteration and cancel the transaction if it fails + + // We were waiting too long, increase gas price and resubmit + if (Date.now() - this.submitTimestamp >= this.config.GAS_BUMP_INTERVAL) { + if (this._increaseGasPrice()) { + console.log('Resubmitting with higher gas price') + await this._send() + continue + } + } + // Tx is still pending, keep waiting + await sleep(this.config.POLL_INTERVAL) + continue } let receipt = await this._getReceipts() - let retryAttempt = 5 - while (retryAttempt >= 0 && !receipt) { - await sleep(1000) - receipt = await this._getReceipts() - retryAttempt-- - } + // There is a mined tx with current nonce, but it's not one of ours + // Probably other tx submitted by other process/client if (!receipt) { - // resubmit + console.log('Can\'t find our transaction receipt, retrying a few times') + // Give node a few more attempts to respond with our receipt + let retries = 5 + while (!receipt && retries--) { + await sleep(1000) + receipt = await this._getReceipts() + } + + // Receipt was not found after a few retries + // Resubmit our tx + if (!receipt) { + console.log('There is a mined tx with our nonce but unknown tx hash, resubmitting with tx with increased nonce') + this.tx.nonce++ + // todo drop gas price to original value? + await this._send() + continue + } } console.log('Mined. Start waiting for confirmations...') - this.emitter.emit('mined', receipt) - - let currentBlock = await this._web3.eth.getBlockNumber() - let confirmations = currentBlock > receipt.blockNumber ? currentBlock - receipt.blockNumber : 0 - while (confirmations <= this.config.CONFIRMATIONS) { - this.emitter.emit('confirmations', confirmations) - - await sleep(this.config.POLL_INTERVAL) - receipt = await this._getReceipts() - if (!receipt) { - // resubmit - } - currentBlock = await this._web3.eth.getBlockNumber() - confirmations = currentBlock - receipt.blockNumber - } - - // we could have bumped nonce during execution, so get the latest one + 1 - this.manager.nonce = this.tx.nonce + 1 - return receipt - } catch (e) { - console.log('_send', e) - await this._handleSendError() + this._emitter.emit('mined', receipt) + this.currentTxHash = receipt.transactionHash } } @@ -213,7 +294,7 @@ class Transaction { console.log('Got error', e) // nonce is too low, trying to increase and resubmit - if (nonceErrors.includes(e.message)) { + if (this._hasError(e.message, nonceErrors)) { console.log(`Nonce ${this.tx.nonce} is too low, increasing and retrying`) if (this.retries <= this.config.MAX_RETRIES) { this.tx.nonce++ @@ -223,11 +304,28 @@ class Transaction { } // there is already a pending tx with higher gas price, trying to bump and resubmit - if (gasPriceErrors.includes(e.message)) { - console.log(`Gas price ${this.tx.gasPrice} gwei is too low, increasing and retrying`) + if (this._hasError(e.message, gasPriceErrors)) { + console.log(`Gas price ${fromWei(this.tx.gasPrice, 'gwei')} gwei is too low, increasing and retrying`) this._increaseGasPrice() return this._send() } + + if (this._hasError(e.message, sameTxErrors)) { + console.log('Same transaction is already in mempool, skipping submit') + // do nothing + } + } + + /** + * Returns whether error message is contained in errors array + * + * @param message The message to look up + * @param {Array} errors Array with errors. Errors can be either string or regexp. + * @returns {boolean} Returns true if error message is present in the `errors` array + * @private + */ + _hasError(message, errors) { + return errors.find(e => typeof e === 'string' ? e === message : message.match(e)) !== undefined } _increaseGasPrice() { @@ -243,10 +341,17 @@ class Transaction { return false } this.tx.gasPrice = toHex(BN.min(newGasPrice, maxGasPrice)) - console.log(`Increasing gas price to ${fromWei(this.tx.gasPrice, 'Gwei')}`) + console.log(`Increasing gas price to ${fromWei(this.tx.gasPrice, 'gwei')} gwei`) return true } + /** + * Fetches gas price from the oracle + * + * @param {'instant'|'fast'|'normal'|'slow'} type + * @returns {Promise} A hex string representing gas price in wei + * @private + */ async _getGasPrice(type) { const gasPrices = await this._gasPriceOracle.gasPrices() const result = gasPrices[type].toString() @@ -254,6 +359,12 @@ class Transaction { return toHex(toWei(gasPrices[type].toString(), 'gwei')) } + /** + * Gets current nonce for the current account, ignoring any pending transactions + * + * @returns {Promise} + * @private + */ _getLastNonce() { return this._web3.eth.getTransactionCount(this.address, 'latest') } diff --git a/testTxManager.js b/testTxManager.js index 8e6cc4b..40d9072 100644 --- a/testTxManager.js +++ b/testTxManager.js @@ -1,8 +1,9 @@ const { toHex, toWei } = require('web3-utils') const TxManager = require('./src/TxManager') const { rpcUrl, privateKey } = require('./config') +const logHandles = require('why-is-node-running') -const TxM = new TxManager({ +const manager = new TxManager({ privateKey, rpcUrl, config: { @@ -11,22 +12,25 @@ const TxM = new TxManager({ }, }) -const tx = { - value: 0, - gasPrice: toHex(toWei('0.1', 'gwei')), +const tx1 = { + value: 1, + gasPrice: toHex(toWei('1', 'gwei')), to: '0xA43Ce8Cc89Eff3AA5593c742fC56A30Ef2427CB0', } const tx2 = { - value: 1, - // gasPrice: toHex(toWei('0.1', 'gwei')), + value: 2, + // gasPrice: toHex(toWei('1', 'gwei')), to: '0x0039F22efB07A647557C7C5d17854CFD6D489eF3', } async function main() { - const Tx = await TxM.createTx(tx) + const tx = manager.createTx(tx1) - const receipt1 = await Tx.send() + setTimeout(() => tx.cancel(), 1000) + // setTimeout(() => tx.replace(tx2), 1000) + + const receipt = await tx.send() .on('transactionHash', (hash) => { console.log('hash', hash) }) @@ -37,21 +41,8 @@ async function main() { console.log('confirmations', confirmations) }) - // setTimeout(async () => await Tx.cancel(), 800) - - // const receipt2 = await Tx.replace(tx2) - // .on('transactionHash', (hash) => { - // console.log('hash', hash) - // }) - // .on('mined', (receipt) => { - // console.log('Mined in block', receipt.blockNumber) - // }) - // .on('confirmations', (confirmations) => { - // console.log('confirmations', confirmations) - // }) - - // console.log('receipt2', receipt2) - console.log('receipt1', await receipt1) + console.log('receipt', receipt) + // setTimeout(logHandles, 100) } main() diff --git a/yarn.lock b/yarn.lock index d91fa08..3df6c7e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3565,6 +3565,11 @@ shebang-regex@^1.0.0: resolved "https://registry.yarnpkg.com/shebang-regex/-/shebang-regex-1.0.0.tgz#da42f49740c0b42db2ca9728571cb190c98efea3" integrity sha1-2kL0l0DAtC2yypcoVxyxkMmO/qM= +siginfo@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/siginfo/-/siginfo-2.0.0.tgz#32e76c70b79724e3bb567cb9d543eb858ccfaf30" + integrity sha512-ybx0WO1/8bSBLEWXZvEd7gMW3Sn3JFlW3TvX1nREbDLRNQNaeNN8WK0meBwPdAaOI7TtRRRJn/Es1zhrrCHu7g== + signal-exit@^3.0.0, signal-exit@^3.0.2: version "3.0.3" resolved "https://registry.yarnpkg.com/signal-exit/-/signal-exit-3.0.3.tgz#a1410c2edd8f077b08b4e253c8eacfcaf057461c" @@ -3624,6 +3629,11 @@ sshpk@^1.7.0: safer-buffer "^2.0.2" tweetnacl "~0.14.0" +stackback@0.0.2: + version "0.0.2" + resolved "https://registry.yarnpkg.com/stackback/-/stackback-0.0.2.tgz#1ac8a0d9483848d1695e418b6d031a3c3ce68e3b" + integrity sha1-Gsig2Ug4SNFpXkGLbQMaPDzmjjs= + standard-as-callback@^2.0.1: version "2.0.1" resolved "https://registry.yarnpkg.com/standard-as-callback/-/standard-as-callback-2.0.1.tgz#ed8bb25648e15831759b6023bdb87e6b60b38126" @@ -4348,6 +4358,14 @@ which@^1.2.9: dependencies: isexe "^2.0.0" +why-is-node-running@^2.2.0: + version "2.2.0" + resolved "https://registry.yarnpkg.com/why-is-node-running/-/why-is-node-running-2.2.0.tgz#fd0a73ea9303920fbb45457c6ecc392ebec90bd2" + integrity sha512-rxtN9D0lJaYyP92BR5yoyWecK2txBKmBIuS7GRbOPP5bXsT37/hBqcmTrlrt25DBr9p4WJb6c9LuYSJd89vHRQ== + dependencies: + siginfo "^2.0.0" + stackback "0.0.2" + wide-align@1.1.3: version "1.1.3" resolved "https://registry.yarnpkg.com/wide-align/-/wide-align-1.1.3.tgz#ae074e6bdc0c14a431e804e624549c633b000457"