From b152c67548705be44cdfe40ced1167e82a49f312 Mon Sep 17 00:00:00 2001 From: poma Date: Tue, 29 Sep 2020 06:17:42 +0300 Subject: [PATCH] txManager --- README.md | 6 +- config.js | 2 +- package.json | 1 + src/TxManager.js | 169 +++++++++++++++++++++++++++++++++++++++++++++++ src/utils.js | 15 +++++ src/validate.js | 9 +-- src/worker.js | 83 ++++++++++++++++++----- yarn.lock | 12 ++++ 8 files changed, 270 insertions(+), 27 deletions(-) create mode 100644 src/TxManager.js diff --git a/README.md b/README.md index c256c2a..293ce8f 100644 --- a/README.md +++ b/README.md @@ -62,9 +62,9 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI ## New relayer architecture -1. TreeWatcher module keeps track of Account Tree changes and automatically caches the actual state in Redis -2. Server module is Express.js instance to accepts http requests -3. Controller contains handlers for the Server endpoints. It validates input data and put a Job to Queue +1. TreeWatcher module keeps track of Account Tree changes and automatically caches the actual state in Redis and emits `treeUpdate` event to redis pub/sub channel +2. Server module is Express.js instance that accepts http requests +3. Controller contains handlers for the Server endpoints. It validates input data and adds a Job to Queue. 4. Queue module is used by Controller to put and get Job from queue (bull wrapper) 5. Status module contains handler to get a Job status. It's used by UI for pull updates 6. Validate contains validation logic for all endpoints diff --git a/config.js b/config.js index 7543f0b..288a838 100644 --- a/config.js +++ b/config.js @@ -152,5 +152,5 @@ module.exports = { watherInterval: Number(process.env.NONCE_WATCHER_INTERVAL || 30) * 1000, pendingTxTimeout: Number(process.env.ALLOWABLE_PENDING_TX_TIMEOUT || 180) * 1000, gasBumpPercentage: process.env.GAS_PRICE_BUMP_PERCENTAGE || 20, - rewardAccount: '0x0000000000000000000000000000000000000000', + rewardAccount: '0x03Ebd0748Aa4D1457cF479cce56309641e0a98F5', } diff --git a/package.json b/package.json index 9f55a69..ba39fad 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "license": "MIT", "dependencies": { "ajv": "^6.12.5", + "async-mutex": "^0.2.4", "bull": "^3.12.1", "circomlib": "git+https://github.com/tornadocash/circomlib.git#5beb6aee94923052faeecea40135d45b6ce6172c", "dotenv": "^8.2.0", diff --git a/src/TxManager.js b/src/TxManager.js new file mode 100644 index 0000000..50a4c3c --- /dev/null +++ b/src/TxManager.js @@ -0,0 +1,169 @@ +const Web3 = require('web3') +const { Mutex } = require('async-mutex') +const { GasPriceOracle } = require('gas-price-oracle') +const { toWei, toHex, toBN, BN } = require('web3-utils') +const { sleep, when } = require('./utils') + +const nonceErrors = [ + 'Returned error: Transaction nonce is too low. Try incrementing the nonce.', + 'Returned error: nonce too low', +] + +const gasPriceErrors = [ + 'Returned error: Transaction gas price supplied is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.', + 'Returned error: replacement transaction underpriced', +] + +const defaultConfig = { + MAX_RETRIES: 10, + GAS_BUMP_PERCENTAGE: 5, + GAS_BUMP_INTERVAL: 1000 * 60 * 5, + MAX_GAS_PRICE: 1000, + POLL_INTERVAL: 3000, +} + + +class TxManager { + constructor({ privateKey, rpcUrl, broadcastNodes = [], config = {} }) { + this.config = Object.assign({ ...defaultConfig }, config) + this._privateKey = privateKey + this._web3 = new Web3(rpcUrl) + this._broadcastNodes = broadcastNodes + this.address = this._web3.eth.accounts.privateKeyToAccount('0x' + privateKey).address + this._web3.eth.accounts.wallet.add('0x' + privateKey) + this._web3.eth.defaultAccount = this.address + this._gasPriceOracle = new GasPriceOracle({ defaultRpc: rpcUrl }) + this._mutex = new Mutex() + } + + // todo get rid of it + async init() { + this.nonce = await this.web3.eth.getTransactionCount(this.address, 'latest') + } + + /** + * Submits transaction to Ethereum network. Resolves when tx gets enough confirmations. + * todo: return PromiEvent that emits progress events + * + * @param tx Transaction to send + */ + async submit(tx) { + const release = await this._mutex.acquire() + try { + await new Transaction(tx, this).submit() + } finally { + release() + } + } +} + +class Transaction { + constructor(tx, manager) { + Object.assign(this, manager) + this.manager = manager + this.tx = tx + this.retries = 0 + this.hash = null + // store all submitted hashes to catch cases when an old tx is mined + // todo: what to do if old tx with the same nonce was submitted + // by other client and we don't have its hash? + this.hashes = [] + } + + async submit() { + await this._prepare() + await this._send() + // we could have bumped nonce during execution, so get the latest one + 1 + this.manager.nonce = this.tx.nonce + 1 + } + + async _prepare() { + this.tx.gas = await this._web3.eth.estimateGas(this.tx) + this.tx.gasPrice = await this._getGasPrice('fast') + this.tx.nonce = this.nonce + } + + async _send() { + const signedTx = await this._web3.eth.accounts.signTransaction(this.tx, this.privateKey) + this.tx.date = Date.now() + this.tx.hash = signedTx.transactionHash + this.hashes.push(this.tx.hash) + try { + await this._broadcast(signedTx.rawTransaction) + // The most reliable way to see if one of our tx was mined is to track current nonce + while(this.tx.nonce <= await this._getLastNonce()) { + if (Date.now() - this.tx.date >= this.config.GAS_BUMP_INTERVAL) { + if (this._increaseGasPrice()) { + return this._send() + } + } + + await sleep(this.config.POLL_INTERVAL) + } + } catch (e) { + await this._handleSendError() + } + } + + /** + * Broadcasts tx to multiple nodes, waits for tx hash only on main node + */ + _broadcast(rawTx) { + const main = this._web3.eth.sendSignedTransaction(rawTx) + for (const node of this._broadcastNodes) { + try { + new Web3(node).eth.sendSignedTransaction(rawTx) + } catch (e) { + console.log(`Failed to send transaction to node ${node}: ${e}`) + } + } + return when(main, 'transactionHash') + } + + _handleSendError(e) { + console.log('Got error', e) + + // nonce is too low, trying to increase and resubmit + if (nonceErrors.includes(e.message)) { + console.log(`Nonce ${this.tx.nonce} is too low, increasing and retrying`) + if (this.retries <= this.config.MAX_RETRIES) { + this.tx.nonce++ + this.retries++ + return this._send() + } + } + + // there is already a pending tx with higher gas price, trying to bump and resubmit + if (gasPriceErrors.includes(e.message)) { + console.log(`Gas price ${this.tx.gasPrice} gwei is too low, increasing and retrying`) + this._increaseGasPrice() + return this._send() + } + } + + _increaseGasPrice() { + const newGasPrice = toBN(this.tx.gasPrice).mul(toBN(this.config.GAS_BUMP_PERCENTAGE)).div(toBN(100)) + const maxGasPrice = toBN(toWei(this.config.MAX_GAS_PRICE.toString(), 'gwei')) + if (toBN(this.tx.gasPrice).eq(maxGasPrice)) { + console.log('Already at max gas price, not bumping') + return false + } + this.tx.gasPrice = toHex(BN.min(newGasPrice, maxGasPrice)) + console.log(`Increasing gas price to ${this.tx.gasPrice}`) + return true + } + + async _getGasPrice(type) { + const gasPrices = await this._gasPriceOracle.gasPrices() + const result = gasPrices[type].toString() + console.log(`${type} gas price is now ${result} gwei`) + return toHex(toWei(gasPrices[type].toString(), 'gwei')) + } + + _getLastNonce() { + return this.web3.eth.getTransactionCount(this.address, 'latest') + } +} + +module.exports = TxManager + diff --git a/src/utils.js b/src/utils.js index 9f87555..9cf5645 100644 --- a/src/utils.js +++ b/src/utils.js @@ -2,6 +2,8 @@ const { instances, netId } = require('../config') const { poseidon } = require('circomlib') const { toBN } = require('web3-utils') +const sleep = (ms) => new Promise(res => setTimeout(res, ms)) + function getInstance(address) { const inst = instances[`netId${netId}`] for (const currency of Object.keys(inst)) { @@ -33,8 +35,21 @@ function setSafeInterval(func, interval) { }) } +/** + * A promise that resolves when the source emits specified event + */ +function when(source, event) { + return new Promise(resolve => { + source.once(event, payload => { + resolve(payload) + }) + }) +} + module.exports = { getInstance, setSafeInterval, poseidonHash2, + sleep, + when, } diff --git a/src/validate.js b/src/validate.js index 258137a..949ad7c 100644 --- a/src/validate.js +++ b/src/validate.js @@ -27,7 +27,7 @@ ajv.addKeyword('isKnownContract', { errors: true }) -ajv.addKeyword('isForFee', { +ajv.addKeyword('isFeeRecipient', { validate: (schema, data) => { try { return rewardAccount === data @@ -42,10 +42,8 @@ const addressType = { type: 'string', pattern: '^0x[a-fA-F0-9]{40}$', isAddress: const proofType = { type: 'string', pattern: '^0x[a-fA-F0-9]{512}$' } const encryptedAccountType = { type: 'string', pattern: '^0x[a-fA-F0-9]{392}$' } const bytes32Type = { type: 'string', pattern: '^0x[a-fA-F0-9]{64}$' } -const instanceType = JSON.parse(JSON.stringify(addressType)) -instanceType.isKnownContract = true -const relayerType = JSON.parse(JSON.stringify(addressType)) -relayerType.isForFee = true +const instanceType = { ...addressType, isKnownContract: true } +const relayerType = { ...addressType, isFeeRecipient: true } const tornadoWithdrawSchema = { type: 'object', @@ -56,7 +54,6 @@ const tornadoWithdrawSchema = { type: 'array', maxItems: 6, minItems: 6, - uniqueItems: true, items: [bytes32Type, bytes32Type, addressType, relayerType, bytes32Type, bytes32Type] } }, diff --git a/src/worker.js b/src/worker.js index 7e2785e..6914ee9 100644 --- a/src/worker.js +++ b/src/worker.js @@ -16,6 +16,7 @@ redisSubscribe.subscribe('treeUpdate', fetchTree) let web3 let nonce let currentTx +let currentJob let tree async function fetchTree() { @@ -30,15 +31,19 @@ async function fetchTree() { async function watcher() { if (currentTx && Date.now() - currentTx.date > gasBumpInterval) { - const newGasPrice = toBN(currentTx.gasPrice).mul(toBN(gasBumpPercentage)).div(toBN(100)) - const maxGasPrice = toBN(toWei(maxGasPrice.toString(), 'Gwei')) - currentTx.gasPrice = toHex(BN.min(newGasPrice, maxGasPrice)) - currentTx.date = Date.now() - console.log(`Resubmitting with gas price ${fromWei(currentTx.gasPrice.toString(), 'gwei')} gwei`) - //await this.sendTx(tx, null, 9999) + bumpGasPrice() } } +async function bumpGasPrice() { + const newGasPrice = toBN(currentTx.gasPrice).mul(toBN(gasBumpPercentage)).div(toBN(100)) + const maxGasPrice = toBN(toWei(maxGasPrice.toString(), 'Gwei')) + currentTx.gasPrice = toHex(BN.min(newGasPrice, maxGasPrice)) + currentTx.date = Date.now() + console.log(`Resubmitting with gas price ${fromWei(currentTx.gasPrice.toString(), 'gwei')} gwei`) + await sendTx(currentTx, updateTxHash) +} + async function init() { web3 = new Web3(rpcUrl, null, { transactionConfirmationBlocks: 1 }) const account = web3.eth.accounts.privateKeyToAccount('0x' + privateKey) @@ -49,7 +54,6 @@ async function init() { setSafeInterval(watcher, 1000) } - async function checkTornadoFee(contract, fee, refund) { } @@ -58,6 +62,7 @@ async function process(job) { if (job.type !== 'tornadoWithdraw') { throw new Error('not implemented') } + currentJob = job console.log(Date.now(), ' withdraw started', job.id) const { proof, args, contract } = job.data const fee = toBN(args[4]) @@ -68,7 +73,7 @@ async function process(job) { const instance = new web3.eth.Contract(tornadoABI, contract) const data = instance.methods.withdraw(proof, ...args).encodeABI() const gasPrices = await gasPriceOracle.gasPrices() - const tx = { + currentTx = { from: web3.eth.defaultAccount, value: numberToHex(refund), gasPrice: toHex(toWei(gasPrices.fast.toString(), 'gwei')), @@ -77,22 +82,66 @@ async function process(job) { data, nonce, } - // nonce++ later - const gas = await web3.eth.estimateGas(tx) - tx.gas = gas + try { + // eslint-disable-next-line require-atomic-updates + currentTx.gas = await web3.eth.estimateGas(currentTx) + } + catch (e) { + console.error('Revert', e) + throw new Error(`Revert by smart contract ${e.message}`) + } + + nonce++ + await sendTx(currentTx, updateTxHash) +} + +async function waitForTx(hash) { + +} + +async function updateTxHash(txHash) { + console.log(`A new successfully sent tx ${txHash}`) + currentJob.data.txHash = txHash + await currentJob.update(currentJob.data) +} + +async function sendTx(tx, onTxHash, retryAttempt) { let signedTx = await this.web3.eth.accounts.signTransaction(tx, privateKey) let result = this.web3.eth.sendSignedTransaction(signedTx.rawTransaction) - result.once('transactionHash', async (txHash) => { - console.log(`A new successfully sent tx ${txHash}`) - job.data.txHash = txHash - await job.update(job.data) - }) + if (onTxHash) { + result.once('transactionHash', onTxHash) + } - await result + try { // await returns once tx is mined + await result + } catch (e) { + console.log(`Error for tx with nonce ${tx.nonce}\n${e.message}`) + if (nonceErrors.includes(e.message)) { + console.log('nonce too low, retrying') + if (retryAttempt <= 10) { + tx.nonce++ + return sendTx(tx, onTxHash, retryAttempt + 1) + } + } + if (gasPriceErrors.includes(e.message)) { + return bumpGasPrice() + } + throw new Error(e) + } } +const nonceErrors = [ + 'Returned error: Transaction nonce is too low. Try incrementing the nonce.', + 'Returned error: nonce too low', +] + +const gasPriceErrors = [ + 'Returned error: Transaction gas price supplied is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.', + 'Returned error: replacement transaction underpriced', +] + async function main() { await init() diff --git a/yarn.lock b/yarn.lock index 1c380fc..aad6c26 100644 --- a/yarn.lock +++ b/yarn.lock @@ -322,6 +322,13 @@ async-limiter@~1.0.0: resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd" integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ== +async-mutex@^0.2.4: + version "0.2.4" + resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.2.4.tgz#f6ea5f9cc73147f395f86fa573a2af039fe63082" + integrity sha512-fcQKOXUKMQc57JlmjBCHtkKNrfGpHyR7vu18RfuLfeTAf4hK9PgOadPR5cDrBQ682zasrLUhJFe7EKAHJOduDg== + dependencies: + tslib "^2.0.0" + asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" @@ -3540,6 +3547,11 @@ tslib@^1.9.0: resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.13.0.tgz#c881e13cc7015894ed914862d276436fa9a47043" integrity sha512-i/6DQjL8Xf3be4K/E6Wgpekn5Qasl1usyw++dAA35Ue5orEn65VIxOA+YvNNl9HV3qv70T7CNwjODHZrLwvd1Q== +tslib@^2.0.0: + version "2.0.1" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.0.1.tgz#410eb0d113e5b6356490eec749603725b021b43e" + integrity sha512-SgIkNheinmEBgx1IUNirK0TUD4X9yjjBRTqqjggWCU3pUEqIk3/Uwl3yRixYKT6WjQuGiwDv4NomL3wqRCj+CQ== + tunnel-agent@^0.6.0: version "0.6.0" resolved "https://registry.yarnpkg.com/tunnel-agent/-/tunnel-agent-0.6.0.tgz#27a5dea06b36b04a0a9966774b290868f0fc40fd"