txManager

This commit is contained in:
poma 2020-09-29 06:17:42 +03:00
parent 383693bd44
commit b152c67548
No known key found for this signature in database
GPG Key ID: BA20CB01FE165657
8 changed files with 270 additions and 27 deletions

@ -62,9 +62,9 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
## New relayer architecture
1. TreeWatcher module keeps track of Account Tree changes and automatically caches the actual state in Redis
2. Server module is Express.js instance to accepts http requests
3. Controller contains handlers for the Server endpoints. It validates input data and put a Job to Queue
1. TreeWatcher module keeps track of Account Tree changes and automatically caches the actual state in Redis and emits `treeUpdate` event to redis pub/sub channel
2. Server module is Express.js instance that accepts http requests
3. Controller contains handlers for the Server endpoints. It validates input data and adds a Job to Queue.
4. Queue module is used by Controller to put and get Job from queue (bull wrapper)
5. Status module contains handler to get a Job status. It's used by UI for pull updates
6. Validate contains validation logic for all endpoints

@ -152,5 +152,5 @@ module.exports = {
watherInterval: Number(process.env.NONCE_WATCHER_INTERVAL || 30) * 1000,
pendingTxTimeout: Number(process.env.ALLOWABLE_PENDING_TX_TIMEOUT || 180) * 1000,
gasBumpPercentage: process.env.GAS_PRICE_BUMP_PERCENTAGE || 20,
rewardAccount: '0x0000000000000000000000000000000000000000',
rewardAccount: '0x03Ebd0748Aa4D1457cF479cce56309641e0a98F5',
}

@ -12,6 +12,7 @@
"license": "MIT",
"dependencies": {
"ajv": "^6.12.5",
"async-mutex": "^0.2.4",
"bull": "^3.12.1",
"circomlib": "git+https://github.com/tornadocash/circomlib.git#5beb6aee94923052faeecea40135d45b6ce6172c",
"dotenv": "^8.2.0",

169
src/TxManager.js Normal file

@ -0,0 +1,169 @@
const Web3 = require('web3')
const { Mutex } = require('async-mutex')
const { GasPriceOracle } = require('gas-price-oracle')
const { toWei, toHex, toBN, BN } = require('web3-utils')
const { sleep, when } = require('./utils')
const nonceErrors = [
'Returned error: Transaction nonce is too low. Try incrementing the nonce.',
'Returned error: nonce too low',
]
const gasPriceErrors = [
'Returned error: Transaction gas price supplied is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.',
'Returned error: replacement transaction underpriced',
]
const defaultConfig = {
MAX_RETRIES: 10,
GAS_BUMP_PERCENTAGE: 5,
GAS_BUMP_INTERVAL: 1000 * 60 * 5,
MAX_GAS_PRICE: 1000,
POLL_INTERVAL: 3000,
}
class TxManager {
constructor({ privateKey, rpcUrl, broadcastNodes = [], config = {} }) {
this.config = Object.assign({ ...defaultConfig }, config)
this._privateKey = privateKey
this._web3 = new Web3(rpcUrl)
this._broadcastNodes = broadcastNodes
this.address = this._web3.eth.accounts.privateKeyToAccount('0x' + privateKey).address
this._web3.eth.accounts.wallet.add('0x' + privateKey)
this._web3.eth.defaultAccount = this.address
this._gasPriceOracle = new GasPriceOracle({ defaultRpc: rpcUrl })
this._mutex = new Mutex()
}
// todo get rid of it
async init() {
this.nonce = await this.web3.eth.getTransactionCount(this.address, 'latest')
}
/**
* Submits transaction to Ethereum network. Resolves when tx gets enough confirmations.
* todo: return PromiEvent that emits progress events
*
* @param tx Transaction to send
*/
async submit(tx) {
const release = await this._mutex.acquire()
try {
await new Transaction(tx, this).submit()
} finally {
release()
}
}
}
class Transaction {
constructor(tx, manager) {
Object.assign(this, manager)
this.manager = manager
this.tx = tx
this.retries = 0
this.hash = null
// store all submitted hashes to catch cases when an old tx is mined
// todo: what to do if old tx with the same nonce was submitted
// by other client and we don't have its hash?
this.hashes = []
}
async submit() {
await this._prepare()
await this._send()
// we could have bumped nonce during execution, so get the latest one + 1
this.manager.nonce = this.tx.nonce + 1
}
async _prepare() {
this.tx.gas = await this._web3.eth.estimateGas(this.tx)
this.tx.gasPrice = await this._getGasPrice('fast')
this.tx.nonce = this.nonce
}
async _send() {
const signedTx = await this._web3.eth.accounts.signTransaction(this.tx, this.privateKey)
this.tx.date = Date.now()
this.tx.hash = signedTx.transactionHash
this.hashes.push(this.tx.hash)
try {
await this._broadcast(signedTx.rawTransaction)
// The most reliable way to see if one of our tx was mined is to track current nonce
while(this.tx.nonce <= await this._getLastNonce()) {
if (Date.now() - this.tx.date >= this.config.GAS_BUMP_INTERVAL) {
if (this._increaseGasPrice()) {
return this._send()
}
}
await sleep(this.config.POLL_INTERVAL)
}
} catch (e) {
await this._handleSendError()
}
}
/**
* Broadcasts tx to multiple nodes, waits for tx hash only on main node
*/
_broadcast(rawTx) {
const main = this._web3.eth.sendSignedTransaction(rawTx)
for (const node of this._broadcastNodes) {
try {
new Web3(node).eth.sendSignedTransaction(rawTx)
} catch (e) {
console.log(`Failed to send transaction to node ${node}: ${e}`)
}
}
return when(main, 'transactionHash')
}
_handleSendError(e) {
console.log('Got error', e)
// nonce is too low, trying to increase and resubmit
if (nonceErrors.includes(e.message)) {
console.log(`Nonce ${this.tx.nonce} is too low, increasing and retrying`)
if (this.retries <= this.config.MAX_RETRIES) {
this.tx.nonce++
this.retries++
return this._send()
}
}
// there is already a pending tx with higher gas price, trying to bump and resubmit
if (gasPriceErrors.includes(e.message)) {
console.log(`Gas price ${this.tx.gasPrice} gwei is too low, increasing and retrying`)
this._increaseGasPrice()
return this._send()
}
}
_increaseGasPrice() {
const newGasPrice = toBN(this.tx.gasPrice).mul(toBN(this.config.GAS_BUMP_PERCENTAGE)).div(toBN(100))
const maxGasPrice = toBN(toWei(this.config.MAX_GAS_PRICE.toString(), 'gwei'))
if (toBN(this.tx.gasPrice).eq(maxGasPrice)) {
console.log('Already at max gas price, not bumping')
return false
}
this.tx.gasPrice = toHex(BN.min(newGasPrice, maxGasPrice))
console.log(`Increasing gas price to ${this.tx.gasPrice}`)
return true
}
async _getGasPrice(type) {
const gasPrices = await this._gasPriceOracle.gasPrices()
const result = gasPrices[type].toString()
console.log(`${type} gas price is now ${result} gwei`)
return toHex(toWei(gasPrices[type].toString(), 'gwei'))
}
_getLastNonce() {
return this.web3.eth.getTransactionCount(this.address, 'latest')
}
}
module.exports = TxManager

@ -2,6 +2,8 @@ const { instances, netId } = require('../config')
const { poseidon } = require('circomlib')
const { toBN } = require('web3-utils')
const sleep = (ms) => new Promise(res => setTimeout(res, ms))
function getInstance(address) {
const inst = instances[`netId${netId}`]
for (const currency of Object.keys(inst)) {
@ -33,8 +35,21 @@ function setSafeInterval(func, interval) {
})
}
/**
* A promise that resolves when the source emits specified event
*/
function when(source, event) {
return new Promise(resolve => {
source.once(event, payload => {
resolve(payload)
})
})
}
module.exports = {
getInstance,
setSafeInterval,
poseidonHash2,
sleep,
when,
}

@ -27,7 +27,7 @@ ajv.addKeyword('isKnownContract', {
errors: true
})
ajv.addKeyword('isForFee', {
ajv.addKeyword('isFeeRecipient', {
validate: (schema, data) => {
try {
return rewardAccount === data
@ -42,10 +42,8 @@ const addressType = { type: 'string', pattern: '^0x[a-fA-F0-9]{40}$', isAddress:
const proofType = { type: 'string', pattern: '^0x[a-fA-F0-9]{512}$' }
const encryptedAccountType = { type: 'string', pattern: '^0x[a-fA-F0-9]{392}$' }
const bytes32Type = { type: 'string', pattern: '^0x[a-fA-F0-9]{64}$' }
const instanceType = JSON.parse(JSON.stringify(addressType))
instanceType.isKnownContract = true
const relayerType = JSON.parse(JSON.stringify(addressType))
relayerType.isForFee = true
const instanceType = { ...addressType, isKnownContract: true }
const relayerType = { ...addressType, isFeeRecipient: true }
const tornadoWithdrawSchema = {
type: 'object',
@ -56,7 +54,6 @@ const tornadoWithdrawSchema = {
type: 'array',
maxItems: 6,
minItems: 6,
uniqueItems: true,
items: [bytes32Type, bytes32Type, addressType, relayerType, bytes32Type, bytes32Type]
}
},

@ -16,6 +16,7 @@ redisSubscribe.subscribe('treeUpdate', fetchTree)
let web3
let nonce
let currentTx
let currentJob
let tree
async function fetchTree() {
@ -30,13 +31,17 @@ async function fetchTree() {
async function watcher() {
if (currentTx && Date.now() - currentTx.date > gasBumpInterval) {
bumpGasPrice()
}
}
async function bumpGasPrice() {
const newGasPrice = toBN(currentTx.gasPrice).mul(toBN(gasBumpPercentage)).div(toBN(100))
const maxGasPrice = toBN(toWei(maxGasPrice.toString(), 'Gwei'))
currentTx.gasPrice = toHex(BN.min(newGasPrice, maxGasPrice))
currentTx.date = Date.now()
console.log(`Resubmitting with gas price ${fromWei(currentTx.gasPrice.toString(), 'gwei')} gwei`)
//await this.sendTx(tx, null, 9999)
}
await sendTx(currentTx, updateTxHash)
}
async function init() {
@ -49,7 +54,6 @@ async function init() {
setSafeInterval(watcher, 1000)
}
async function checkTornadoFee(contract, fee, refund) {
}
@ -58,6 +62,7 @@ async function process(job) {
if (job.type !== 'tornadoWithdraw') {
throw new Error('not implemented')
}
currentJob = job
console.log(Date.now(), ' withdraw started', job.id)
const { proof, args, contract } = job.data
const fee = toBN(args[4])
@ -68,7 +73,7 @@ async function process(job) {
const instance = new web3.eth.Contract(tornadoABI, contract)
const data = instance.methods.withdraw(proof, ...args).encodeABI()
const gasPrices = await gasPriceOracle.gasPrices()
const tx = {
currentTx = {
from: web3.eth.defaultAccount,
value: numberToHex(refund),
gasPrice: toHex(toWei(gasPrices.fast.toString(), 'gwei')),
@ -77,22 +82,66 @@ async function process(job) {
data,
nonce,
}
// nonce++ later
const gas = await web3.eth.estimateGas(tx)
tx.gas = gas
try {
// eslint-disable-next-line require-atomic-updates
currentTx.gas = await web3.eth.estimateGas(currentTx)
}
catch (e) {
console.error('Revert', e)
throw new Error(`Revert by smart contract ${e.message}`)
}
nonce++
await sendTx(currentTx, updateTxHash)
}
async function waitForTx(hash) {
}
async function updateTxHash(txHash) {
console.log(`A new successfully sent tx ${txHash}`)
currentJob.data.txHash = txHash
await currentJob.update(currentJob.data)
}
async function sendTx(tx, onTxHash, retryAttempt) {
let signedTx = await this.web3.eth.accounts.signTransaction(tx, privateKey)
let result = this.web3.eth.sendSignedTransaction(signedTx.rawTransaction)
result.once('transactionHash', async (txHash) => {
console.log(`A new successfully sent tx ${txHash}`)
job.data.txHash = txHash
await job.update(job.data)
})
await result
if (onTxHash) {
result.once('transactionHash', onTxHash)
}
try { // await returns once tx is mined
await result
} catch (e) {
console.log(`Error for tx with nonce ${tx.nonce}\n${e.message}`)
if (nonceErrors.includes(e.message)) {
console.log('nonce too low, retrying')
if (retryAttempt <= 10) {
tx.nonce++
return sendTx(tx, onTxHash, retryAttempt + 1)
}
}
if (gasPriceErrors.includes(e.message)) {
return bumpGasPrice()
}
throw new Error(e)
}
}
const nonceErrors = [
'Returned error: Transaction nonce is too low. Try incrementing the nonce.',
'Returned error: nonce too low',
]
const gasPriceErrors = [
'Returned error: Transaction gas price supplied is too low. There is another transaction with same nonce in the queue. Try increasing the gas price or incrementing the nonce.',
'Returned error: replacement transaction underpriced',
]
async function main() {
await init()

@ -322,6 +322,13 @@ async-limiter@~1.0.0:
resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd"
integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==
async-mutex@^0.2.4:
version "0.2.4"
resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.2.4.tgz#f6ea5f9cc73147f395f86fa573a2af039fe63082"
integrity sha512-fcQKOXUKMQc57JlmjBCHtkKNrfGpHyR7vu18RfuLfeTAf4hK9PgOadPR5cDrBQ682zasrLUhJFe7EKAHJOduDg==
dependencies:
tslib "^2.0.0"
asynckit@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
@ -3540,6 +3547,11 @@ tslib@^1.9.0:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.13.0.tgz#c881e13cc7015894ed914862d276436fa9a47043"
integrity sha512-i/6DQjL8Xf3be4K/E6Wgpekn5Qasl1usyw++dAA35Ue5orEn65VIxOA+YvNNl9HV3qv70T7CNwjODHZrLwvd1Q==
tslib@^2.0.0:
version "2.0.1"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.0.1.tgz#410eb0d113e5b6356490eec749603725b021b43e"
integrity sha512-SgIkNheinmEBgx1IUNirK0TUD4X9yjjBRTqqjggWCU3pUEqIk3/Uwl3yRixYKT6WjQuGiwDv4NomL3wqRCj+CQ==
tunnel-agent@^0.6.0:
version "0.6.0"
resolved "https://registry.yarnpkg.com/tunnel-agent/-/tunnel-agent-0.6.0.tgz#27a5dea06b36b04a0a9966774b290868f0fc40fd"