diff --git a/oracle/src/sender.js b/oracle/src/sender.js index 6bd8faa4..645c98d7 100644 --- a/oracle/src/sender.js +++ b/oracle/src/sender.js @@ -91,7 +91,7 @@ function updateNonce(nonce) { return redis.set(nonceKey, nonce) } -async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) { +async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) { try { if (redis.status !== 'ready') { nackMsg(msg) @@ -167,7 +167,7 @@ async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) { if (failedTx.length) { logger.info(`Sending ${failedTx.length} Failed Tx to Queue`) - await sendToQueue(failedTx) + await scheduleForRetry(failedTx, msg.properties.headers['x-retries']) } ackMsg(msg) logger.debug(`Finished processing msg`) diff --git a/oracle/src/services/amqpClient.js b/oracle/src/services/amqpClient.js index f16cabe6..0fc79b94 100644 --- a/oracle/src/services/amqpClient.js +++ b/oracle/src/services/amqpClient.js @@ -1,6 +1,7 @@ require('../../env') const connection = require('amqp-connection-manager').connect(process.env.QUEUE_URL) const logger = require('./logger') +const { getRetrySequence } = require('../utils/utils') connection.on('connect', () => { logger.info('Connected to amqp Broker') @@ -24,13 +25,17 @@ function connectWatcherToQueue({ queueName, cb }) { } function connectSenderToQueue({ queueName, cb }) { + const deadLetterExchange = `${queueName}-retry` + const channelWrapper = connection.createChannel({ json: true }) channelWrapper.addSetup(channel => { return Promise.all([ + channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }), channel.assertQueue(queueName, { durable: true }), + channel.bindQueue(queueName, deadLetterExchange), channel.prefetch(1), channel.consume(queueName, msg => cb({ @@ -38,15 +43,48 @@ function connectSenderToQueue({ queueName, cb }) { channel: channelWrapper, ackMsg: job => channelWrapper.ack(job), nackMsg: job => channelWrapper.nack(job, false, true), - sendToQueue: data => channelWrapper.sendToQueue(queueName, data, { persistent: true }) + scheduleForRetry: async (data, msgRetries = 0) => { + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + } }) ) ]) }) } +async function generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange +}) { + const retries = msgRetries + 1 + const delay = getRetrySequence(retries) * 1000 + const retryQueue = `${queueName}-retry-${delay}` + await channel.assertQueue(retryQueue, { + durable: true, + deadLetterExchange, + messageTtl: delay, + expires: delay * 10 + }) + await channelWrapper.sendToQueue(retryQueue, data, { + persistent: true, + headers: { 'x-retries': retries } + }) +} + module.exports = { connectWatcherToQueue, connectSenderToQueue, - connection + connection, + generateRetry } diff --git a/oracle/src/utils/utils.js b/oracle/src/utils/utils.js index 8d59bdb1..124820fa 100644 --- a/oracle/src/utils/utils.js +++ b/oracle/src/utils/utils.js @@ -2,6 +2,14 @@ const BigNumber = require('bignumber.js') const promiseRetry = require('promise-retry') const Web3 = require('web3') +const retrySequence = [1, 2, 3, 5, 8, 13, 21, 34, 55, 60] + +function getRetrySequence(count) { + return count > retrySequence.length + ? retrySequence[retrySequence.length - 1] + : retrySequence[count - 1] +} + async function syncForEach(array, callback) { for (let index = 0; index < array.length; index++) { await callback(array[index], index, array) @@ -112,5 +120,6 @@ module.exports = { setIntervalAndRun, watchdog, privateKeyToAddress, - nonceError + nonceError, + getRetrySequence } diff --git a/oracle/test/amqp.test.js b/oracle/test/amqp.test.js new file mode 100644 index 00000000..2a6a78ba --- /dev/null +++ b/oracle/test/amqp.test.js @@ -0,0 +1,152 @@ +const { expect } = require('chai') +const { generateRetry, connection } = require('../src/services/amqpClient') + +connection.close() + +describe('generateRetry', () => { + let channel + let channelWrapper + const data = [{}] + const queueName = 'test-queue' + const deadLetterExchange = `${queueName}-retry` + beforeEach(() => { + channel = { + assertQueue(queue, options) { + this.queue = queue + this.options = options + } + } + channelWrapper = { + sendToQueue(queue, data, options) { + this.queue = queue + this.data = data + this.options = options + } + } + }) + it('should assert new queue and push the message', async () => { + // Given + const msgRetries = 0 + const delay = 1000 + + // When + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + // Then + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + }) + it('should increment delay on retries', async () => { + let msgRetries = 1 + let delay = 2000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 2 + delay = 3000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 4 + delay = 8000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + }) + it('should have a max delay of 60 seconds', async () => { + let msgRetries = 10 + let delay = 60000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 15 + delay = 60000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 20 + delay = 60000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + }) +}) diff --git a/oracle/test/test.env b/oracle/test/test.env index 710c869c..c1ac6c46 100644 --- a/oracle/test/test.env +++ b/oracle/test/test.env @@ -1,2 +1,3 @@ HOME_RPC_URL=http://example.com FOREIGN_RPC_URL=http://example.com +QUEUE_URL=http://example.com