From 6fe63ae9f4daf6ecee51bbe70f76eb9a910f11d0 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Sat, 12 Sep 2020 17:01:37 +0300 Subject: [PATCH] Possibility to resend old pending transactions (#425) --- .../affirmation-request-watcher.config.js | 2 +- oracle/config/base.config.js | 2 + .../collected-signatures-watcher.config.js | 2 +- .../config/convert-to-chai-worker.config.js | 2 +- oracle/config/foreign-sender.config.js | 3 +- oracle/config/home-sender.config.js | 3 +- .../signature-request-watcher.config.js | 2 +- oracle/config/transfer-watcher.config.js | 2 +- oracle/src/confirmRelay.js | 6 +- oracle/src/sender.js | 52 +++++- oracle/src/services/amqpClient.js | 169 ++++++++++++------ oracle/src/utils/constants.js | 6 +- oracle/src/utils/utils.js | 2 +- 13 files changed, 177 insertions(+), 76 deletions(-) diff --git a/oracle/config/affirmation-request-watcher.config.js b/oracle/config/affirmation-request-watcher.config.js index f91cc471..9ae5be43 100644 --- a/oracle/config/affirmation-request-watcher.config.js +++ b/oracle/config/affirmation-request-watcher.config.js @@ -24,7 +24,7 @@ module.exports = { ...baseConfig.bridgeConfig, ...baseConfig.foreignConfig, event: 'UserRequestForAffirmation', - queue: 'home', + queue: 'home-prioritized', name: `watcher-${id}`, id } diff --git a/oracle/config/base.config.js b/oracle/config/base.config.js index 1e2398cd..5808b5dd 100644 --- a/oracle/config/base.config.js +++ b/oracle/config/base.config.js @@ -73,6 +73,7 @@ const bridgeConfig = { } const homeConfig = { + chain: 'home', eventContractAddress: process.env.COMMON_HOME_BRIDGE_ADDRESS, eventAbi: homeAbi, bridgeContractAddress: process.env.COMMON_HOME_BRIDGE_ADDRESS, @@ -83,6 +84,7 @@ const homeConfig = { } const foreignConfig = { + chain: 'foreign', eventContractAddress: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS, eventAbi: foreignAbi, bridgeContractAddress: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS, diff --git a/oracle/config/collected-signatures-watcher.config.js b/oracle/config/collected-signatures-watcher.config.js index cdffa345..9340d13d 100644 --- a/oracle/config/collected-signatures-watcher.config.js +++ b/oracle/config/collected-signatures-watcher.config.js @@ -6,7 +6,7 @@ module.exports = { ...baseConfig.bridgeConfig, ...baseConfig.homeConfig, event: 'CollectedSignatures', - queue: 'foreign', + queue: 'foreign-prioritized', name: `watcher-${id}`, id } diff --git a/oracle/config/convert-to-chai-worker.config.js b/oracle/config/convert-to-chai-worker.config.js index f30189c7..8f30b098 100644 --- a/oracle/config/convert-to-chai-worker.config.js +++ b/oracle/config/convert-to-chai-worker.config.js @@ -14,7 +14,7 @@ module.exports = { ...baseConfig.bridgeConfig, ...baseConfig.foreignConfig, workerQueue: 'convert-to-chai', - senderQueue: 'foreign', + senderQueue: 'foreign-prioritized', name: `worker-${id}`, id } diff --git a/oracle/config/foreign-sender.config.js b/oracle/config/foreign-sender.config.js index c589a119..774f77ed 100644 --- a/oracle/config/foreign-sender.config.js +++ b/oracle/config/foreign-sender.config.js @@ -4,7 +4,8 @@ const { web3Foreign } = require('../src/services/web3') module.exports = { ...baseConfig.bridgeConfig, - queue: 'foreign', + queue: 'foreign-prioritized', + oldQueue: 'foreign', id: 'foreign', name: 'sender-foreign', web3: web3Foreign diff --git a/oracle/config/home-sender.config.js b/oracle/config/home-sender.config.js index cc5ab3a1..aed9fc9d 100644 --- a/oracle/config/home-sender.config.js +++ b/oracle/config/home-sender.config.js @@ -4,7 +4,8 @@ const { web3Home } = require('../src/services/web3') module.exports = { ...baseConfig.bridgeConfig, - queue: 'home', + queue: 'home-prioritized', + oldQueue: 'home', id: 'home', name: 'sender-home', web3: web3Home diff --git a/oracle/config/signature-request-watcher.config.js b/oracle/config/signature-request-watcher.config.js index a487c432..60d9a805 100644 --- a/oracle/config/signature-request-watcher.config.js +++ b/oracle/config/signature-request-watcher.config.js @@ -6,7 +6,7 @@ module.exports = { ...baseConfig.bridgeConfig, ...baseConfig.homeConfig, event: 'UserRequestForSignature', - queue: 'home', + queue: 'home-prioritized', name: `watcher-${id}`, id } diff --git a/oracle/config/transfer-watcher.config.js b/oracle/config/transfer-watcher.config.js index 76c30995..c7c82282 100644 --- a/oracle/config/transfer-watcher.config.js +++ b/oracle/config/transfer-watcher.config.js @@ -41,7 +41,7 @@ module.exports = { eventContractAddress: initialChecks.bridgeableTokenAddress, eventAbi: ERC20_ABI, eventFilter: { to: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS }, - queue: 'home', + queue: 'home-prioritized', ...workerQueueConfig, name: `watcher-${id}`, id diff --git a/oracle/src/confirmRelay.js b/oracle/src/confirmRelay.js index 57f2c18a..d7b45043 100644 --- a/oracle/src/confirmRelay.js +++ b/oracle/src/confirmRelay.js @@ -138,8 +138,8 @@ async function main({ sendJob, txHash }) { } async function sendJobTx(jobs) { - const gasPrice = await GasPrice.start(config.queue, true) - const chainId = await getChainId(config.queue) + const gasPrice = await GasPrice.start(config.chain, true) + const chainId = await getChainId(config.chain) let nonce = await getNonce(web3Instance, ORACLE_VALIDATOR_ADDRESS) await syncForEach(jobs, async job => { @@ -153,7 +153,7 @@ async function sendJobTx(jobs) { try { logger.info(`Sending transaction with nonce ${nonce}`) const txHash = await sendTx({ - chain: config.queue, + chain: config.chain, data: job.data, nonce, gasPrice: gasPrice.toString(10), diff --git a/oracle/src/sender.js b/oracle/src/sender.js index 10515cb9..c6fbde30 100644 --- a/oracle/src/sender.js +++ b/oracle/src/sender.js @@ -1,5 +1,6 @@ require('../env') const path = require('path') +const { toBN } = require('web3-utils') const { connectSenderToQueue } = require('./services/amqpClient') const { redis } = require('./services/redisClient') const GasPrice = require('./services/gasPrice') @@ -45,6 +46,7 @@ async function initialize() { chainId = await getChainId(config.id) connectSenderToQueue({ queueName: config.queue, + oldQueueName: config.oldQueue, cb: options => { if (config.maxProcessingTime) { return watchdog(() => main(options), config.maxProcessingTime, () => { @@ -88,7 +90,7 @@ function updateNonce(nonce) { return redis.set(nonceKey, nonce) } -async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) { +async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleTransactionResend }) { try { if (redis.status !== 'ready') { nackMsg(msg) @@ -103,8 +105,15 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) { let insufficientFunds = false let minimumBalance = null const failedTx = [] + const sentTx = [] - logger.debug(`Sending ${txArray.length} transactions`) + const isResend = txArray.length > 0 && !!txArray[0].txHash + + if (isResend) { + logger.debug(`Checking status of ${txArray.length} transactions`) + } else { + logger.debug(`Sending ${txArray.length} transactions`) + } await syncForEach(txArray, async job => { let gasLimit if (typeof job.extraGas === 'number') { @@ -114,11 +123,37 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) { } try { - logger.info(`Sending transaction with nonce ${nonce}`) + let txNonce + if (isResend) { + const tx = await web3Instance.eth.getTransaction(job.txHash) + + if (tx === null) { + logger.info(`Transaction ${job.txHash} was not found, dropping it`) + return + } + if (tx.blockNumber !== null) { + logger.info(`Transaction ${job.txHash} was successfully mined`) + return + } + + logger.info( + `Previously sent transaction is stuck, updating gasPrice: ${tx.gasPrice} -> ${gasPrice.toString(10)}` + ) + if (toBN(tx.gasPrice).gte(toBN(gasPrice))) { + logger.info("Gas price returned from the oracle didn't increase, will reinspect this transaction later") + sentTx.push(job) + return + } + + txNonce = tx.nonce + } else { + txNonce = nonce++ + } + logger.info(`Sending transaction with nonce ${txNonce}`) const txHash = await sendTx({ chain: config.id, data: job.data, - nonce, + nonce: txNonce, gasPrice: gasPrice.toString(10), amount: '0', gasLimit, @@ -127,8 +162,11 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) { chainId, web3: web3Instance }) + sentTx.push({ + ...job, + txHash + }) - nonce++ logger.info( { eventTransactionHash: job.transactionReference, generatedTransactionHash: txHash }, `Tx generated ${txHash} for event Tx ${job.transactionReference}` @@ -163,6 +201,10 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) { logger.info(`Sending ${failedTx.length} Failed Tx to Queue`) await scheduleForRetry(failedTx, msg.properties.headers['x-retries']) } + if (sentTx.length) { + logger.info(`Sending ${sentTx.length} Tx Delayed Resend Requests to Queue`) + await scheduleTransactionResend(sentTx) + } ackMsg(msg) logger.debug(`Finished processing msg`) diff --git a/oracle/src/services/amqpClient.js b/oracle/src/services/amqpClient.js index 3e0f6823..86bbb19a 100644 --- a/oracle/src/services/amqpClient.js +++ b/oracle/src/services/amqpClient.js @@ -4,6 +4,12 @@ const dns = require('dns') const connection = require('amqp-connection-manager').connect(process.env.ORACLE_QUEUE_URL) const logger = require('./logger') const { getRetrySequence } = require('../utils/utils') +const { + TRANSACTION_RESEND_TIMEOUT, + SENDER_QUEUE_MAX_PRIORITY, + SENDER_QUEUE_SEND_PRIORITY, + SENDER_QUEUE_CHECK_STATUS_PRIORITY +} = require('../utils/constants') connection.on('connect', () => { logger.info('Connected to amqp Broker') @@ -22,16 +28,18 @@ async function isAttached() { } function connectWatcherToQueue({ queueName, workerQueue, cb }) { - const queueList = workerQueue ? [queueName, workerQueue] : [queueName] - const channelWrapper = connection.createChannel({ json: true, - setup(channel) { - return Promise.all(queueList.map(queue => channel.assertQueue(queue, { durable: true }))) + async setup(channel) { + await channel.assertQueue(queueName, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY }) + if (workerQueue) { + await channel.assertQueue(workerQueue, { durable: true }) + } } }) - const sendToQueue = data => channelWrapper.sendToQueue(queueName, data, { persistent: true }) + const sendToQueue = data => + channelWrapper.sendToQueue(queueName, data, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY }) let sendToWorker if (workerQueue) { sendToWorker = data => channelWrapper.sendToQueue(workerQueue, data, { persistent: true }) @@ -40,38 +48,60 @@ function connectWatcherToQueue({ queueName, workerQueue, cb }) { cb({ sendToQueue, sendToWorker, channel: channelWrapper }) } -function connectSenderToQueue({ queueName, cb }) { +function connectSenderToQueue({ queueName, oldQueueName, cb }) { const deadLetterExchange = `${queueName}-retry` + async function resendMessagesToNewQueue(channel) { + logger.info(`Trying to check messages in the old non-priority queue ${queueName}`) + while (true) { + const msg = await channel.get(oldQueueName) + if (msg === false) { + logger.info(`No messages in the old queue ${oldQueueName} left`) + break + } + logger.debug(`Message in the old queue ${oldQueueName} was found, redirecting it to the new queue ${queueName}`) + await channel.sendToQueue(queueName, msg.content, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY }) + await channel.ack(msg) + } + } + const channelWrapper = connection.createChannel({ json: true }) - channelWrapper.addSetup(channel => { - return Promise.all([ - channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }), - channel.assertQueue(queueName, { durable: true }), - channel.bindQueue(queueName, deadLetterExchange), - channel.prefetch(1), - channel.consume(queueName, msg => - cb({ - msg, - channel: channelWrapper, - ackMsg: job => channelWrapper.ack(job), - nackMsg: job => channelWrapper.nack(job, false, true), - scheduleForRetry: async (data, msgRetries = 0) => { - await generateRetry({ - data, - msgRetries, - channelWrapper, - channel, - queueName, - deadLetterExchange - }) - } - }) - ) - ]) + channelWrapper.addSetup(async channel => { + await channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }) + await channel.assertQueue(queueName, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY }) + await channel.assertQueue(oldQueueName, { durable: true }).then(() => resendMessagesToNewQueue(channel)) + await channel.bindQueue(queueName, deadLetterExchange) + await channel.prefetch(1) + await channel.consume(queueName, msg => + cb({ + msg, + channel: channelWrapper, + ackMsg: job => channelWrapper.ack(job), + nackMsg: job => channelWrapper.nack(job, false, true), + scheduleForRetry: async (data, msgRetries = 0) => { + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + }, + scheduleTransactionResend: async data => { + await generateTransactionResend({ + data, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + } + }) + ) }) } @@ -82,52 +112,73 @@ function connectWorkerToQueue({ queueName, senderQueue, cb }) { json: true }) - channelWrapper.addSetup(channel => { - return Promise.all([ - channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }), - channel.assertQueue(queueName, { durable: true }), - channel.assertQueue(senderQueue, { durable: true }), - channel.bindQueue(queueName, deadLetterExchange), - channel.prefetch(1), - channel.consume(queueName, msg => - cb({ - msg, - channel: channelWrapper, - ackMsg: job => channelWrapper.ack(job), - nackMsg: job => channelWrapper.nack(job, false, true), - sendToSenderQueue: data => channelWrapper.sendToQueue(senderQueue, data, { persistent: true }), - scheduleForRetry: async (data, msgRetries = 0) => { - await generateRetry({ - data, - msgRetries, - channelWrapper, - channel, - queueName, - deadLetterExchange - }) - } - }) - ) - ]) + channelWrapper.addSetup(async channel => { + await channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }) + await channel.assertQueue(queueName, { durable: true }) + await channel.assertQueue(senderQueue, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY }) + await channel.bindQueue(queueName, deadLetterExchange) + await channel.prefetch(1) + await channel.consume(queueName, msg => + cb({ + msg, + channel: channelWrapper, + ackMsg: job => channelWrapper.ack(job), + nackMsg: job => channelWrapper.nack(job, false, true), + sendToSenderQueue: data => + channelWrapper.sendToQueue(senderQueue, data, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY }), + scheduleForRetry: async (data, msgRetries = 0) => { + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + } + }) + ) }) } async function generateRetry({ data, msgRetries, channelWrapper, channel, queueName, deadLetterExchange }) { const retries = msgRetries + 1 const delay = getRetrySequence(retries) * 1000 + + // New retry queue is created, and one message is send to it. + // Nobody consumes messages from this queue, so eventually the message will be dropped. + // `messageTtl` defines a timeout after which the message will be dropped out of the queue. + // When message is dropped, it will be resend into the specified `deadLetterExchange` with the updated `x-retries` header. const retryQueue = `${queueName}-retry-${delay}` await channel.assertQueue(retryQueue, { durable: true, deadLetterExchange, messageTtl: delay, - expires: delay * 10 + expires: delay * 10, + maxPriority: SENDER_QUEUE_MAX_PRIORITY }) await channelWrapper.sendToQueue(retryQueue, data, { persistent: true, + priority: SENDER_QUEUE_SEND_PRIORITY, headers: { 'x-retries': retries } }) } +async function generateTransactionResend({ data, channelWrapper, channel, queueName, deadLetterExchange }) { + const retryQueue = `${queueName}-check-tx-status` + await channel.assertQueue(retryQueue, { + durable: true, + deadLetterExchange, + messageTtl: TRANSACTION_RESEND_TIMEOUT, + expires: TRANSACTION_RESEND_TIMEOUT * 10, + maxPriority: SENDER_QUEUE_MAX_PRIORITY + }) + await channelWrapper.sendToQueue(retryQueue, data, { + priority: SENDER_QUEUE_CHECK_STATUS_PRIORITY, + persistent: true + }) +} + module.exports = { isAttached, connectWatcherToQueue, diff --git a/oracle/src/utils/constants.js b/oracle/src/utils/constants.js index 7d6fc24c..e1fcb20b 100644 --- a/oracle/src/utils/constants.js +++ b/oracle/src/utils/constants.js @@ -22,5 +22,9 @@ module.exports = { GAS_PRICE_BOUNDARIES: { MIN: 1, MAX: 250 - } + }, + TRANSACTION_RESEND_TIMEOUT: 20 * 60 * 1000, + SENDER_QUEUE_MAX_PRIORITY: 10, + SENDER_QUEUE_SEND_PRIORITY: 5, + SENDER_QUEUE_CHECK_STATUS_PRIORITY: 1 } diff --git a/oracle/src/utils/utils.js b/oracle/src/utils/utils.js index a191e2cb..8de376ff 100644 --- a/oracle/src/utils/utils.js +++ b/oracle/src/utils/utils.js @@ -33,7 +33,7 @@ async function waitForFunds(web3, address, minimumBalance, cb, logger) { async retry => { logger.debug('Getting balance of validator account') const newBalance = web3.utils.toBN(await web3.eth.getBalance(address)) - if (newBalance.gte(minimumBalance)) { + if (newBalance.gte(web3.utils.toBN(minimumBalance.toString(10)))) { logger.debug({ balance: newBalance, minimumBalance }, 'Validator has minimum necessary balance') cb(newBalance) } else {