robust wait loop for tx manager
This commit is contained in:
parent
c16164876e
commit
14272e8305
@ -25,7 +25,8 @@
|
||||
"uuid": "^8.3.0",
|
||||
"web3": "^1.3.0",
|
||||
"web3-core-promievent": "^1.3.0",
|
||||
"web3-utils": "^1.2.2"
|
||||
"web3-utils": "^1.2.2",
|
||||
"why-is-node-running": "^2.2.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"chai": "^4.2.0",
|
||||
|
283
src/TxManager.js
283
src/TxManager.js
@ -13,6 +13,11 @@ const nonceErrors = [
|
||||
const gasPriceErrors = [
|
||||
'Returned error: Transaction gas price supplied is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.',
|
||||
'Returned error: replacement transaction underpriced',
|
||||
/Returned error: Transaction gas price \d+wei is too low. There is another transaction with same nonce in the queue with gas price: \d+wei. Try increasing the gas price or incrementing the nonce./,
|
||||
]
|
||||
|
||||
const sameTxErrors = [
|
||||
'Returned error: Transaction with the same hash was already imported.',
|
||||
]
|
||||
|
||||
const defaultConfig = {
|
||||
@ -36,7 +41,7 @@ class TxManager {
|
||||
this._web3.eth.defaultAccount = this.address
|
||||
this._gasPriceOracle = new GasPriceOracle({ defaultRpc: rpcUrl })
|
||||
this._mutex = new Mutex()
|
||||
this.nonce
|
||||
this._nonce = null
|
||||
}
|
||||
|
||||
/**
|
||||
@ -44,17 +49,8 @@ class TxManager {
|
||||
*
|
||||
* @param tx Transaction to send
|
||||
*/
|
||||
async createTx(tx) {
|
||||
try {
|
||||
await this._mutex.acquire()
|
||||
if (!this.nonce) {
|
||||
this.nonce = await this._web3.eth.getTransactionCount(this.address, 'latest')
|
||||
}
|
||||
return new Transaction(tx, this)
|
||||
} catch (e) {
|
||||
console.log('e', e)
|
||||
this._mutex.release()
|
||||
}
|
||||
createTx(tx) {
|
||||
return new Transaction(tx, this)
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,124 +59,209 @@ class Transaction {
|
||||
Object.assign(this, manager)
|
||||
this.manager = manager
|
||||
this.tx = tx
|
||||
this.promiReceipt = PromiEvent()
|
||||
this.emitter = this.promiReceipt.eventEmitter
|
||||
this._promise = PromiEvent()
|
||||
this._emitter = this._promise.eventEmitter
|
||||
this.executed = false
|
||||
this.retries = 0
|
||||
this.currentTxHash = null
|
||||
// store all submitted hashes to catch cases when an old tx is mined
|
||||
// todo: what to do if old tx with the same nonce was submitted
|
||||
// by other client and we don't have its hash?
|
||||
this.hashes = []
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits transaction to Ethereum network. Resolves when tx gets enough confirmations.
|
||||
* Submits the transaction to Ethereum network. Resolves when tx gets enough confirmations.
|
||||
* Emits progress events.
|
||||
*/
|
||||
send() {
|
||||
this._prepare()
|
||||
.then(() => {
|
||||
this._send()
|
||||
.then((result) => this.promiReceipt.resolve(result))
|
||||
.catch((e) => this.promiReceipt.reject(e))
|
||||
})
|
||||
.catch((e) => this.promiReceipt.reject(e))
|
||||
.finally(this.manager._mutex.release())
|
||||
|
||||
return this.emitter
|
||||
if (this.executed) {
|
||||
throw new Error('The transaction was already executed')
|
||||
}
|
||||
this.executed = true
|
||||
this._execute()
|
||||
.then(this._promise.resolve)
|
||||
.catch(this._promise.reject)
|
||||
return this._emitter
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces pending tx.
|
||||
* Replaces a pending tx.
|
||||
*
|
||||
* @param tx Transaction to send
|
||||
*/
|
||||
replace(tx) {
|
||||
// todo check if it's not mined yet
|
||||
console.log('Replacing...')
|
||||
async replace(tx) {
|
||||
// todo throw error if the current transaction is mined already
|
||||
console.log('Replacing current transaction')
|
||||
if (!this.executed) {
|
||||
// Tx was not executed yet, just replace it
|
||||
this.tx = tx
|
||||
return
|
||||
}
|
||||
if (!tx.gas) {
|
||||
tx.gas = await this._web3.eth.estimateGas(tx)
|
||||
}
|
||||
tx.nonce = this.tx.nonce // can be different from `this.manager._nonce`
|
||||
tx.gasPrice = Math.max(this.tx.gasPrice, tx.gasPrice || 0) // start no less than current tx gas price
|
||||
|
||||
this.tx = tx
|
||||
return this.send()
|
||||
this._increaseGasPrice()
|
||||
await this._send()
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels pending tx.
|
||||
* Cancels a pending tx.
|
||||
*/
|
||||
cancel() {
|
||||
// todo check if it's not mined yet
|
||||
console.log('Canceling...')
|
||||
this.tx = {
|
||||
console.log('Canceling the transaction')
|
||||
return this.replace({
|
||||
from: this.address,
|
||||
to: this.address,
|
||||
gasPrice: this.tx.gasPrice,
|
||||
}
|
||||
this._increaseGasPrice()
|
||||
return this.send()
|
||||
value: 0,
|
||||
gas: 21000,
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the transaction. Acquires global mutex for transaction duration
|
||||
*
|
||||
* @returns {Promise<TransactionReceipt>}
|
||||
* @private
|
||||
*/
|
||||
async _execute() {
|
||||
const release = await this.manager._mutex.acquire()
|
||||
try {
|
||||
await this._prepare()
|
||||
await this._send()
|
||||
const receipt = this._waitForConfirmations()
|
||||
// we could have bumped nonce during execution, so get the latest one + 1
|
||||
this.manager._nonce = this.tx.nonce + 1
|
||||
return receipt
|
||||
} finally {
|
||||
release()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare first transaction before submitting it. Inits `gas`, `gasPrice`, `nonce`
|
||||
*
|
||||
* @returns {Promise<void>}
|
||||
* @private
|
||||
*/
|
||||
async _prepare() {
|
||||
this.tx.gas = await this._web3.eth.estimateGas(this.tx)
|
||||
const gas = await this._web3.eth.estimateGas(this.tx)
|
||||
if (!this.tx.gas) {
|
||||
this.tx.gas = gas
|
||||
}
|
||||
if (!this.tx.gasPrice) {
|
||||
this.tx.gasPrice = await this._getGasPrice('fast')
|
||||
}
|
||||
this.tx.nonce = this.manager.nonce
|
||||
if (!this.manager._nonce) {
|
||||
this.manager._nonce = await this._web3.eth.getTransactionCount(this.address, 'latest')
|
||||
}
|
||||
this.tx.nonce = this.manager._nonce
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the current transaction
|
||||
*
|
||||
* @returns {Promise}
|
||||
* @private
|
||||
*/
|
||||
async _send() {
|
||||
// todo throw is we attempt to send a tx that attempts to replace already mined tx
|
||||
const signedTx = await this._web3.eth.accounts.signTransaction(this.tx, this._privateKey)
|
||||
this.tx.date = Date.now()
|
||||
this.submitTimestamp = Date.now()
|
||||
this.tx.hash = signedTx.transactionHash
|
||||
this.hashes.push(this.tx.hash)
|
||||
this.emitter.emit('transactionHash', signedTx.transactionHash)
|
||||
this.hashes.push(signedTx.transactionHash)
|
||||
|
||||
try {
|
||||
await this._broadcast(signedTx.rawTransaction)
|
||||
console.log('Broadcasted. Start waiting for mining...')
|
||||
// The most reliable way to see if one of our tx was mined is to track current nonce
|
||||
} catch (e) {
|
||||
return this._handleSendError(e)
|
||||
}
|
||||
|
||||
while (this.tx.nonce >= (await this._getLastNonce())) {
|
||||
if (Date.now() - this.tx.date >= this.config.GAS_BUMP_INTERVAL) {
|
||||
if (this._increaseGasPrice()) {
|
||||
console.log('Resubmit with higher gas price')
|
||||
return this._send()
|
||||
}
|
||||
this._emitter.emit('transactionHash', signedTx.transactionHash)
|
||||
console.log(`Broadcasted transaction ${signedTx.transactionHash}`)
|
||||
console.log(this.tx)
|
||||
}
|
||||
|
||||
/**
|
||||
* A loop that waits until the current transaction is mined and gets enough confirmations
|
||||
*
|
||||
* @returns {Promise<TransactionReceipt>} The transaction receipt
|
||||
* @private
|
||||
*/
|
||||
async _waitForConfirmations() {
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
// We are already waiting on certain tx hash
|
||||
if (this.currentTxHash) {
|
||||
const receipt = await this._web3.eth.getTransactionReceipt(this.currentTxHash)
|
||||
|
||||
if (!receipt) {
|
||||
// We were waiting for some tx but it disappeared
|
||||
// Erase the hash and start over
|
||||
this.currentTxHash = null
|
||||
continue
|
||||
}
|
||||
|
||||
const currentBlock = await this._web3.eth.getBlockNumber()
|
||||
const confirmations = Math.max(0, currentBlock - receipt.blockNumber)
|
||||
// todo don't emit repeating confirmation count
|
||||
this._emitter.emit('confirmations', confirmations)
|
||||
if (confirmations >= this.config.CONFIRMATIONS) {
|
||||
// Tx is mined and has enough confirmations
|
||||
return receipt
|
||||
}
|
||||
|
||||
// Tx is mined but doesn't have enough confirmations yet, keep waiting
|
||||
await sleep(this.config.POLL_INTERVAL)
|
||||
continue
|
||||
}
|
||||
|
||||
// Tx is still pending
|
||||
if (await this._getLastNonce() <= this.tx.nonce) {
|
||||
// todo optionally run estimateGas on each iteration and cancel the transaction if it fails
|
||||
|
||||
// We were waiting too long, increase gas price and resubmit
|
||||
if (Date.now() - this.submitTimestamp >= this.config.GAS_BUMP_INTERVAL) {
|
||||
if (this._increaseGasPrice()) {
|
||||
console.log('Resubmitting with higher gas price')
|
||||
await this._send()
|
||||
continue
|
||||
}
|
||||
}
|
||||
// Tx is still pending, keep waiting
|
||||
await sleep(this.config.POLL_INTERVAL)
|
||||
continue
|
||||
}
|
||||
|
||||
let receipt = await this._getReceipts()
|
||||
let retryAttempt = 5
|
||||
while (retryAttempt >= 0 && !receipt) {
|
||||
await sleep(1000)
|
||||
receipt = await this._getReceipts()
|
||||
retryAttempt--
|
||||
}
|
||||
|
||||
// There is a mined tx with current nonce, but it's not one of ours
|
||||
// Probably other tx submitted by other process/client
|
||||
if (!receipt) {
|
||||
// resubmit
|
||||
console.log('Can\'t find our transaction receipt, retrying a few times')
|
||||
// Give node a few more attempts to respond with our receipt
|
||||
let retries = 5
|
||||
while (!receipt && retries--) {
|
||||
await sleep(1000)
|
||||
receipt = await this._getReceipts()
|
||||
}
|
||||
|
||||
// Receipt was not found after a few retries
|
||||
// Resubmit our tx
|
||||
if (!receipt) {
|
||||
console.log('There is a mined tx with our nonce but unknown tx hash, resubmitting with tx with increased nonce')
|
||||
this.tx.nonce++
|
||||
// todo drop gas price to original value?
|
||||
await this._send()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
console.log('Mined. Start waiting for confirmations...')
|
||||
this.emitter.emit('mined', receipt)
|
||||
|
||||
let currentBlock = await this._web3.eth.getBlockNumber()
|
||||
let confirmations = currentBlock > receipt.blockNumber ? currentBlock - receipt.blockNumber : 0
|
||||
while (confirmations <= this.config.CONFIRMATIONS) {
|
||||
this.emitter.emit('confirmations', confirmations)
|
||||
|
||||
await sleep(this.config.POLL_INTERVAL)
|
||||
receipt = await this._getReceipts()
|
||||
if (!receipt) {
|
||||
// resubmit
|
||||
}
|
||||
currentBlock = await this._web3.eth.getBlockNumber()
|
||||
confirmations = currentBlock - receipt.blockNumber
|
||||
}
|
||||
|
||||
// we could have bumped nonce during execution, so get the latest one + 1
|
||||
this.manager.nonce = this.tx.nonce + 1
|
||||
return receipt
|
||||
} catch (e) {
|
||||
console.log('_send', e)
|
||||
await this._handleSendError()
|
||||
this._emitter.emit('mined', receipt)
|
||||
this.currentTxHash = receipt.transactionHash
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,7 +294,7 @@ class Transaction {
|
||||
console.log('Got error', e)
|
||||
|
||||
// nonce is too low, trying to increase and resubmit
|
||||
if (nonceErrors.includes(e.message)) {
|
||||
if (this._hasError(e.message, nonceErrors)) {
|
||||
console.log(`Nonce ${this.tx.nonce} is too low, increasing and retrying`)
|
||||
if (this.retries <= this.config.MAX_RETRIES) {
|
||||
this.tx.nonce++
|
||||
@ -223,11 +304,28 @@ class Transaction {
|
||||
}
|
||||
|
||||
// there is already a pending tx with higher gas price, trying to bump and resubmit
|
||||
if (gasPriceErrors.includes(e.message)) {
|
||||
console.log(`Gas price ${this.tx.gasPrice} gwei is too low, increasing and retrying`)
|
||||
if (this._hasError(e.message, gasPriceErrors)) {
|
||||
console.log(`Gas price ${fromWei(this.tx.gasPrice, 'gwei')} gwei is too low, increasing and retrying`)
|
||||
this._increaseGasPrice()
|
||||
return this._send()
|
||||
}
|
||||
|
||||
if (this._hasError(e.message, sameTxErrors)) {
|
||||
console.log('Same transaction is already in mempool, skipping submit')
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether error message is contained in errors array
|
||||
*
|
||||
* @param message The message to look up
|
||||
* @param {Array<string|RegExp>} errors Array with errors. Errors can be either string or regexp.
|
||||
* @returns {boolean} Returns true if error message is present in the `errors` array
|
||||
* @private
|
||||
*/
|
||||
_hasError(message, errors) {
|
||||
return errors.find(e => typeof e === 'string' ? e === message : message.match(e)) !== undefined
|
||||
}
|
||||
|
||||
_increaseGasPrice() {
|
||||
@ -243,10 +341,17 @@ class Transaction {
|
||||
return false
|
||||
}
|
||||
this.tx.gasPrice = toHex(BN.min(newGasPrice, maxGasPrice))
|
||||
console.log(`Increasing gas price to ${fromWei(this.tx.gasPrice, 'Gwei')}`)
|
||||
console.log(`Increasing gas price to ${fromWei(this.tx.gasPrice, 'gwei')} gwei`)
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches gas price from the oracle
|
||||
*
|
||||
* @param {'instant'|'fast'|'normal'|'slow'} type
|
||||
* @returns {Promise<string>} A hex string representing gas price in wei
|
||||
* @private
|
||||
*/
|
||||
async _getGasPrice(type) {
|
||||
const gasPrices = await this._gasPriceOracle.gasPrices()
|
||||
const result = gasPrices[type].toString()
|
||||
@ -254,6 +359,12 @@ class Transaction {
|
||||
return toHex(toWei(gasPrices[type].toString(), 'gwei'))
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets current nonce for the current account, ignoring any pending transactions
|
||||
*
|
||||
* @returns {Promise<number>}
|
||||
* @private
|
||||
*/
|
||||
_getLastNonce() {
|
||||
return this._web3.eth.getTransactionCount(this.address, 'latest')
|
||||
}
|
||||
|
@ -1,8 +1,9 @@
|
||||
const { toHex, toWei } = require('web3-utils')
|
||||
const TxManager = require('./src/TxManager')
|
||||
const { rpcUrl, privateKey } = require('./config')
|
||||
const logHandles = require('why-is-node-running')
|
||||
|
||||
const TxM = new TxManager({
|
||||
const manager = new TxManager({
|
||||
privateKey,
|
||||
rpcUrl,
|
||||
config: {
|
||||
@ -11,22 +12,25 @@ const TxM = new TxManager({
|
||||
},
|
||||
})
|
||||
|
||||
const tx = {
|
||||
value: 0,
|
||||
gasPrice: toHex(toWei('0.1', 'gwei')),
|
||||
const tx1 = {
|
||||
value: 1,
|
||||
gasPrice: toHex(toWei('1', 'gwei')),
|
||||
to: '0xA43Ce8Cc89Eff3AA5593c742fC56A30Ef2427CB0',
|
||||
}
|
||||
|
||||
const tx2 = {
|
||||
value: 1,
|
||||
// gasPrice: toHex(toWei('0.1', 'gwei')),
|
||||
value: 2,
|
||||
// gasPrice: toHex(toWei('1', 'gwei')),
|
||||
to: '0x0039F22efB07A647557C7C5d17854CFD6D489eF3',
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const Tx = await TxM.createTx(tx)
|
||||
const tx = manager.createTx(tx1)
|
||||
|
||||
const receipt1 = await Tx.send()
|
||||
setTimeout(() => tx.cancel(), 1000)
|
||||
// setTimeout(() => tx.replace(tx2), 1000)
|
||||
|
||||
const receipt = await tx.send()
|
||||
.on('transactionHash', (hash) => {
|
||||
console.log('hash', hash)
|
||||
})
|
||||
@ -37,21 +41,8 @@ async function main() {
|
||||
console.log('confirmations', confirmations)
|
||||
})
|
||||
|
||||
// setTimeout(async () => await Tx.cancel(), 800)
|
||||
|
||||
// const receipt2 = await Tx.replace(tx2)
|
||||
// .on('transactionHash', (hash) => {
|
||||
// console.log('hash', hash)
|
||||
// })
|
||||
// .on('mined', (receipt) => {
|
||||
// console.log('Mined in block', receipt.blockNumber)
|
||||
// })
|
||||
// .on('confirmations', (confirmations) => {
|
||||
// console.log('confirmations', confirmations)
|
||||
// })
|
||||
|
||||
// console.log('receipt2', receipt2)
|
||||
console.log('receipt1', await receipt1)
|
||||
console.log('receipt', receipt)
|
||||
// setTimeout(logHandles, 100)
|
||||
}
|
||||
|
||||
main()
|
||||
|
18
yarn.lock
18
yarn.lock
@ -3565,6 +3565,11 @@ shebang-regex@^1.0.0:
|
||||
resolved "https://registry.yarnpkg.com/shebang-regex/-/shebang-regex-1.0.0.tgz#da42f49740c0b42db2ca9728571cb190c98efea3"
|
||||
integrity sha1-2kL0l0DAtC2yypcoVxyxkMmO/qM=
|
||||
|
||||
siginfo@^2.0.0:
|
||||
version "2.0.0"
|
||||
resolved "https://registry.yarnpkg.com/siginfo/-/siginfo-2.0.0.tgz#32e76c70b79724e3bb567cb9d543eb858ccfaf30"
|
||||
integrity sha512-ybx0WO1/8bSBLEWXZvEd7gMW3Sn3JFlW3TvX1nREbDLRNQNaeNN8WK0meBwPdAaOI7TtRRRJn/Es1zhrrCHu7g==
|
||||
|
||||
signal-exit@^3.0.0, signal-exit@^3.0.2:
|
||||
version "3.0.3"
|
||||
resolved "https://registry.yarnpkg.com/signal-exit/-/signal-exit-3.0.3.tgz#a1410c2edd8f077b08b4e253c8eacfcaf057461c"
|
||||
@ -3624,6 +3629,11 @@ sshpk@^1.7.0:
|
||||
safer-buffer "^2.0.2"
|
||||
tweetnacl "~0.14.0"
|
||||
|
||||
stackback@0.0.2:
|
||||
version "0.0.2"
|
||||
resolved "https://registry.yarnpkg.com/stackback/-/stackback-0.0.2.tgz#1ac8a0d9483848d1695e418b6d031a3c3ce68e3b"
|
||||
integrity sha1-Gsig2Ug4SNFpXkGLbQMaPDzmjjs=
|
||||
|
||||
standard-as-callback@^2.0.1:
|
||||
version "2.0.1"
|
||||
resolved "https://registry.yarnpkg.com/standard-as-callback/-/standard-as-callback-2.0.1.tgz#ed8bb25648e15831759b6023bdb87e6b60b38126"
|
||||
@ -4348,6 +4358,14 @@ which@^1.2.9:
|
||||
dependencies:
|
||||
isexe "^2.0.0"
|
||||
|
||||
why-is-node-running@^2.2.0:
|
||||
version "2.2.0"
|
||||
resolved "https://registry.yarnpkg.com/why-is-node-running/-/why-is-node-running-2.2.0.tgz#fd0a73ea9303920fbb45457c6ecc392ebec90bd2"
|
||||
integrity sha512-rxtN9D0lJaYyP92BR5yoyWecK2txBKmBIuS7GRbOPP5bXsT37/hBqcmTrlrt25DBr9p4WJb6c9LuYSJd89vHRQ==
|
||||
dependencies:
|
||||
siginfo "^2.0.0"
|
||||
stackback "0.0.2"
|
||||
|
||||
wide-align@1.1.3:
|
||||
version "1.1.3"
|
||||
resolved "https://registry.yarnpkg.com/wide-align/-/wide-align-1.1.3.tgz#ae074e6bdc0c14a431e804e624549c633b000457"
|
||||
|
Loading…
Reference in New Issue
Block a user