Improve oracle sender retry strategy
This commit is contained in:
parent
81a936af45
commit
9e5f185bce
@ -91,7 +91,7 @@ function updateNonce(nonce) {
|
|||||||
return redis.set(nonceKey, nonce)
|
return redis.set(nonceKey, nonce)
|
||||||
}
|
}
|
||||||
|
|
||||||
async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) {
|
async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) {
|
||||||
try {
|
try {
|
||||||
if (redis.status !== 'ready') {
|
if (redis.status !== 'ready') {
|
||||||
nackMsg(msg)
|
nackMsg(msg)
|
||||||
@ -167,7 +167,7 @@ async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) {
|
|||||||
|
|
||||||
if (failedTx.length) {
|
if (failedTx.length) {
|
||||||
logger.info(`Sending ${failedTx.length} Failed Tx to Queue`)
|
logger.info(`Sending ${failedTx.length} Failed Tx to Queue`)
|
||||||
await sendToQueue(failedTx)
|
await scheduleForRetry(failedTx, msg.properties.headers['x-retries'])
|
||||||
}
|
}
|
||||||
ackMsg(msg)
|
ackMsg(msg)
|
||||||
logger.debug(`Finished processing msg`)
|
logger.debug(`Finished processing msg`)
|
||||||
|
@ -24,13 +24,17 @@ function connectWatcherToQueue({ queueName, cb }) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function connectSenderToQueue({ queueName, cb }) {
|
function connectSenderToQueue({ queueName, cb }) {
|
||||||
|
const deadLetterExchange = `${queueName}-retry`
|
||||||
|
|
||||||
const channelWrapper = connection.createChannel({
|
const channelWrapper = connection.createChannel({
|
||||||
json: true
|
json: true
|
||||||
})
|
})
|
||||||
|
|
||||||
channelWrapper.addSetup(channel => {
|
channelWrapper.addSetup(channel => {
|
||||||
return Promise.all([
|
return Promise.all([
|
||||||
|
channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }),
|
||||||
channel.assertQueue(queueName, { durable: true }),
|
channel.assertQueue(queueName, { durable: true }),
|
||||||
|
channel.bindQueue(queueName, deadLetterExchange),
|
||||||
channel.prefetch(1),
|
channel.prefetch(1),
|
||||||
channel.consume(queueName, msg =>
|
channel.consume(queueName, msg =>
|
||||||
cb({
|
cb({
|
||||||
@ -38,7 +42,21 @@ function connectSenderToQueue({ queueName, cb }) {
|
|||||||
channel: channelWrapper,
|
channel: channelWrapper,
|
||||||
ackMsg: job => channelWrapper.ack(job),
|
ackMsg: job => channelWrapper.ack(job),
|
||||||
nackMsg: job => channelWrapper.nack(job, false, true),
|
nackMsg: job => channelWrapper.nack(job, false, true),
|
||||||
sendToQueue: data => channelWrapper.sendToQueue(queueName, data, { persistent: true })
|
scheduleForRetry: async (data, msgRetries = 0) => {
|
||||||
|
const retries = msgRetries + 1
|
||||||
|
const delay = retries ** 2 * 1000
|
||||||
|
const retryQueue = `${queueName}-retry-${delay}`
|
||||||
|
await channel.assertQueue(retryQueue, {
|
||||||
|
durable: true,
|
||||||
|
deadLetterExchange,
|
||||||
|
messageTtl: delay,
|
||||||
|
expires: delay * 10
|
||||||
|
})
|
||||||
|
await channelWrapper.sendToQueue(retryQueue, data, {
|
||||||
|
persistent: true,
|
||||||
|
headers: { 'x-retries': retries }
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
])
|
])
|
||||||
|
Loading…
Reference in New Issue
Block a user