Add unit test for retry queue
This commit is contained in:
parent
a466fe57dc
commit
ae6692abcd
@ -44,18 +44,13 @@ function connectSenderToQueue({ queueName, cb }) {
|
|||||||
ackMsg: job => channelWrapper.ack(job),
|
ackMsg: job => channelWrapper.ack(job),
|
||||||
nackMsg: job => channelWrapper.nack(job, false, true),
|
nackMsg: job => channelWrapper.nack(job, false, true),
|
||||||
scheduleForRetry: async (data, msgRetries = 0) => {
|
scheduleForRetry: async (data, msgRetries = 0) => {
|
||||||
const retries = msgRetries + 1
|
await generateRetry({
|
||||||
const delay = getRetrySequence(retries) * 1000
|
data,
|
||||||
const retryQueue = `${queueName}-retry-${delay}`
|
msgRetries,
|
||||||
await channel.assertQueue(retryQueue, {
|
channelWrapper,
|
||||||
durable: true,
|
channel,
|
||||||
deadLetterExchange,
|
queueName,
|
||||||
messageTtl: delay,
|
deadLetterExchange
|
||||||
expires: delay * 10
|
|
||||||
})
|
|
||||||
await channelWrapper.sendToQueue(retryQueue, data, {
|
|
||||||
persistent: true,
|
|
||||||
headers: { 'x-retries': retries }
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -64,8 +59,32 @@ function connectSenderToQueue({ queueName, cb }) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
}) {
|
||||||
|
const retries = msgRetries + 1
|
||||||
|
const delay = getRetrySequence(retries) * 1000
|
||||||
|
const retryQueue = `${queueName}-retry-${delay}`
|
||||||
|
await channel.assertQueue(retryQueue, {
|
||||||
|
durable: true,
|
||||||
|
deadLetterExchange,
|
||||||
|
messageTtl: delay,
|
||||||
|
expires: delay * 10
|
||||||
|
})
|
||||||
|
await channelWrapper.sendToQueue(retryQueue, data, {
|
||||||
|
persistent: true,
|
||||||
|
headers: { 'x-retries': retries }
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
connectWatcherToQueue,
|
connectWatcherToQueue,
|
||||||
connectSenderToQueue,
|
connectSenderToQueue,
|
||||||
connection
|
connection,
|
||||||
|
generateRetry
|
||||||
}
|
}
|
||||||
|
152
oracle/test/amqp.test.js
Normal file
152
oracle/test/amqp.test.js
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
const { expect } = require('chai')
|
||||||
|
const { generateRetry, connection } = require('../src/services/amqpClient')
|
||||||
|
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
describe('generateRetry', () => {
|
||||||
|
let channel
|
||||||
|
let channelWrapper
|
||||||
|
const data = [{}]
|
||||||
|
const queueName = 'test-queue'
|
||||||
|
const deadLetterExchange = `${queueName}-retry`
|
||||||
|
beforeEach(() => {
|
||||||
|
channel = {
|
||||||
|
assertQueue(queue, options) {
|
||||||
|
this.queue = queue
|
||||||
|
this.options = options
|
||||||
|
}
|
||||||
|
}
|
||||||
|
channelWrapper = {
|
||||||
|
sendToQueue(queue, data, options) {
|
||||||
|
this.queue = queue
|
||||||
|
this.data = data
|
||||||
|
this.options = options
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
it('should assert new queue and push the message', async () => {
|
||||||
|
// Given
|
||||||
|
const msgRetries = 0
|
||||||
|
const delay = 1000
|
||||||
|
|
||||||
|
// When
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
|
||||||
|
// Then
|
||||||
|
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
|
||||||
|
expect(channel.options.messageTtl).to.equal(delay)
|
||||||
|
expect(channel.options.expires).to.equal(delay * 10)
|
||||||
|
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
|
||||||
|
})
|
||||||
|
it('should increment delay on retries', async () => {
|
||||||
|
let msgRetries = 1
|
||||||
|
let delay = 2000
|
||||||
|
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
|
||||||
|
expect(channel.options.messageTtl).to.equal(delay)
|
||||||
|
expect(channel.options.expires).to.equal(delay * 10)
|
||||||
|
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
|
||||||
|
|
||||||
|
msgRetries = 2
|
||||||
|
delay = 3000
|
||||||
|
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
|
||||||
|
expect(channel.options.messageTtl).to.equal(delay)
|
||||||
|
expect(channel.options.expires).to.equal(delay * 10)
|
||||||
|
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
|
||||||
|
|
||||||
|
msgRetries = 4
|
||||||
|
delay = 8000
|
||||||
|
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
|
||||||
|
expect(channel.options.messageTtl).to.equal(delay)
|
||||||
|
expect(channel.options.expires).to.equal(delay * 10)
|
||||||
|
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
|
||||||
|
})
|
||||||
|
it('should have a max delay of 60 seconds', async () => {
|
||||||
|
let msgRetries = 10
|
||||||
|
let delay = 60000
|
||||||
|
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
|
||||||
|
expect(channel.options.messageTtl).to.equal(delay)
|
||||||
|
expect(channel.options.expires).to.equal(delay * 10)
|
||||||
|
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
|
||||||
|
|
||||||
|
msgRetries = 15
|
||||||
|
delay = 60000
|
||||||
|
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
|
||||||
|
expect(channel.options.messageTtl).to.equal(delay)
|
||||||
|
expect(channel.options.expires).to.equal(delay * 10)
|
||||||
|
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
|
||||||
|
|
||||||
|
msgRetries = 20
|
||||||
|
delay = 60000
|
||||||
|
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
|
||||||
|
expect(channel.options.messageTtl).to.equal(delay)
|
||||||
|
expect(channel.options.expires).to.equal(delay * 10)
|
||||||
|
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
|
||||||
|
})
|
||||||
|
})
|
@ -1,2 +1,3 @@
|
|||||||
HOME_RPC_URL=http://example.com
|
HOME_RPC_URL=http://example.com
|
||||||
FOREIGN_RPC_URL=http://example.com
|
FOREIGN_RPC_URL=http://example.com
|
||||||
|
QUEUE_URL=http://example.com
|
||||||
|
Loading…
Reference in New Issue
Block a user