From 9e5f185bce56ba17908ed829c586042baebfb292 Mon Sep 17 00:00:00 2001 From: Gerardo Nardelli Date: Tue, 11 Jun 2019 15:10:28 -0300 Subject: [PATCH 1/3] Improve oracle sender retry strategy --- oracle/src/sender.js | 4 ++-- oracle/src/services/amqpClient.js | 20 +++++++++++++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/oracle/src/sender.js b/oracle/src/sender.js index 6bd8faa4..645c98d7 100644 --- a/oracle/src/sender.js +++ b/oracle/src/sender.js @@ -91,7 +91,7 @@ function updateNonce(nonce) { return redis.set(nonceKey, nonce) } -async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) { +async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) { try { if (redis.status !== 'ready') { nackMsg(msg) @@ -167,7 +167,7 @@ async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) { if (failedTx.length) { logger.info(`Sending ${failedTx.length} Failed Tx to Queue`) - await sendToQueue(failedTx) + await scheduleForRetry(failedTx, msg.properties.headers['x-retries']) } ackMsg(msg) logger.debug(`Finished processing msg`) diff --git a/oracle/src/services/amqpClient.js b/oracle/src/services/amqpClient.js index f16cabe6..31eb2a62 100644 --- a/oracle/src/services/amqpClient.js +++ b/oracle/src/services/amqpClient.js @@ -24,13 +24,17 @@ function connectWatcherToQueue({ queueName, cb }) { } function connectSenderToQueue({ queueName, cb }) { + const deadLetterExchange = `${queueName}-retry` + const channelWrapper = connection.createChannel({ json: true }) channelWrapper.addSetup(channel => { return Promise.all([ + channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }), channel.assertQueue(queueName, { durable: true }), + channel.bindQueue(queueName, deadLetterExchange), channel.prefetch(1), channel.consume(queueName, msg => cb({ @@ -38,7 +42,21 @@ function connectSenderToQueue({ queueName, cb }) { channel: channelWrapper, ackMsg: job => channelWrapper.ack(job), nackMsg: job => channelWrapper.nack(job, false, true), - sendToQueue: data => channelWrapper.sendToQueue(queueName, data, { persistent: true }) + scheduleForRetry: async (data, msgRetries = 0) => { + const retries = msgRetries + 1 + const delay = retries ** 2 * 1000 + const retryQueue = `${queueName}-retry-${delay}` + await channel.assertQueue(retryQueue, { + durable: true, + deadLetterExchange, + messageTtl: delay, + expires: delay * 10 + }) + await channelWrapper.sendToQueue(retryQueue, data, { + persistent: true, + headers: { 'x-retries': retries } + }) + } }) ) ]) From a466fe57dcfc9feb95acbf3421ece11f4980a5c6 Mon Sep 17 00:00:00 2001 From: Gerardo Nardelli Date: Wed, 12 Jun 2019 16:14:43 -0300 Subject: [PATCH 2/3] Limit retry delay --- oracle/src/services/amqpClient.js | 3 ++- oracle/src/utils/utils.js | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/oracle/src/services/amqpClient.js b/oracle/src/services/amqpClient.js index 31eb2a62..89ee680d 100644 --- a/oracle/src/services/amqpClient.js +++ b/oracle/src/services/amqpClient.js @@ -1,6 +1,7 @@ require('../../env') const connection = require('amqp-connection-manager').connect(process.env.QUEUE_URL) const logger = require('./logger') +const { getRetrySequence } = require('../utils/utils') connection.on('connect', () => { logger.info('Connected to amqp Broker') @@ -44,7 +45,7 @@ function connectSenderToQueue({ queueName, cb }) { nackMsg: job => channelWrapper.nack(job, false, true), scheduleForRetry: async (data, msgRetries = 0) => { const retries = msgRetries + 1 - const delay = retries ** 2 * 1000 + const delay = getRetrySequence(retries) * 1000 const retryQueue = `${queueName}-retry-${delay}` await channel.assertQueue(retryQueue, { durable: true, diff --git a/oracle/src/utils/utils.js b/oracle/src/utils/utils.js index 8d59bdb1..124820fa 100644 --- a/oracle/src/utils/utils.js +++ b/oracle/src/utils/utils.js @@ -2,6 +2,14 @@ const BigNumber = require('bignumber.js') const promiseRetry = require('promise-retry') const Web3 = require('web3') +const retrySequence = [1, 2, 3, 5, 8, 13, 21, 34, 55, 60] + +function getRetrySequence(count) { + return count > retrySequence.length + ? retrySequence[retrySequence.length - 1] + : retrySequence[count - 1] +} + async function syncForEach(array, callback) { for (let index = 0; index < array.length; index++) { await callback(array[index], index, array) @@ -112,5 +120,6 @@ module.exports = { setIntervalAndRun, watchdog, privateKeyToAddress, - nonceError + nonceError, + getRetrySequence } From ae6692abcd20b45a4bdb0cb67928feb316e27746 Mon Sep 17 00:00:00 2001 From: Gerardo Nardelli Date: Thu, 13 Jun 2019 14:29:40 -0300 Subject: [PATCH 3/3] Add unit test for retry queue --- oracle/src/services/amqpClient.js | 45 ++++++--- oracle/test/amqp.test.js | 152 ++++++++++++++++++++++++++++++ oracle/test/test.env | 1 + 3 files changed, 185 insertions(+), 13 deletions(-) create mode 100644 oracle/test/amqp.test.js diff --git a/oracle/src/services/amqpClient.js b/oracle/src/services/amqpClient.js index 89ee680d..0fc79b94 100644 --- a/oracle/src/services/amqpClient.js +++ b/oracle/src/services/amqpClient.js @@ -44,18 +44,13 @@ function connectSenderToQueue({ queueName, cb }) { ackMsg: job => channelWrapper.ack(job), nackMsg: job => channelWrapper.nack(job, false, true), scheduleForRetry: async (data, msgRetries = 0) => { - const retries = msgRetries + 1 - const delay = getRetrySequence(retries) * 1000 - const retryQueue = `${queueName}-retry-${delay}` - await channel.assertQueue(retryQueue, { - durable: true, - deadLetterExchange, - messageTtl: delay, - expires: delay * 10 - }) - await channelWrapper.sendToQueue(retryQueue, data, { - persistent: true, - headers: { 'x-retries': retries } + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange }) } }) @@ -64,8 +59,32 @@ function connectSenderToQueue({ queueName, cb }) { }) } +async function generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange +}) { + const retries = msgRetries + 1 + const delay = getRetrySequence(retries) * 1000 + const retryQueue = `${queueName}-retry-${delay}` + await channel.assertQueue(retryQueue, { + durable: true, + deadLetterExchange, + messageTtl: delay, + expires: delay * 10 + }) + await channelWrapper.sendToQueue(retryQueue, data, { + persistent: true, + headers: { 'x-retries': retries } + }) +} + module.exports = { connectWatcherToQueue, connectSenderToQueue, - connection + connection, + generateRetry } diff --git a/oracle/test/amqp.test.js b/oracle/test/amqp.test.js new file mode 100644 index 00000000..2a6a78ba --- /dev/null +++ b/oracle/test/amqp.test.js @@ -0,0 +1,152 @@ +const { expect } = require('chai') +const { generateRetry, connection } = require('../src/services/amqpClient') + +connection.close() + +describe('generateRetry', () => { + let channel + let channelWrapper + const data = [{}] + const queueName = 'test-queue' + const deadLetterExchange = `${queueName}-retry` + beforeEach(() => { + channel = { + assertQueue(queue, options) { + this.queue = queue + this.options = options + } + } + channelWrapper = { + sendToQueue(queue, data, options) { + this.queue = queue + this.data = data + this.options = options + } + } + }) + it('should assert new queue and push the message', async () => { + // Given + const msgRetries = 0 + const delay = 1000 + + // When + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + // Then + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + }) + it('should increment delay on retries', async () => { + let msgRetries = 1 + let delay = 2000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 2 + delay = 3000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 4 + delay = 8000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + }) + it('should have a max delay of 60 seconds', async () => { + let msgRetries = 10 + let delay = 60000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 15 + delay = 60000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + + msgRetries = 20 + delay = 60000 + + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + + expect(channel.queue).to.equal(`${queueName}-retry-${delay}`) + expect(channel.options.messageTtl).to.equal(delay) + expect(channel.options.expires).to.equal(delay * 10) + expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1) + }) +}) diff --git a/oracle/test/test.env b/oracle/test/test.env index 710c869c..c1ac6c46 100644 --- a/oracle/test/test.env +++ b/oracle/test/test.env @@ -1,2 +1,3 @@ HOME_RPC_URL=http://example.com FOREIGN_RPC_URL=http://example.com +QUEUE_URL=http://example.com