Merge branch 'master' into fix-coef-for-gaslimit-in-oracle

This commit is contained in:
Alexander Kolotov 2019-06-14 20:33:43 +03:00 committed by GitHub
commit 1737a1d6a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 205 additions and 10 deletions

@ -34,7 +34,6 @@ services:
build: build:
context: .. context: ..
dockerfile: oracle/Dockerfile dockerfile: oracle/Dockerfile
command: 'true'
env_file: ./.env env_file: ./.env
environment: environment:
- NODE_ENV=production - NODE_ENV=production
@ -50,7 +49,6 @@ services:
build: build:
context: .. context: ..
dockerfile: oracle/Dockerfile dockerfile: oracle/Dockerfile
command: 'true'
env_file: ./.env env_file: ./.env
environment: environment:
- NODE_ENV=production - NODE_ENV=production
@ -66,7 +64,6 @@ services:
build: build:
context: .. context: ..
dockerfile: oracle/Dockerfile dockerfile: oracle/Dockerfile
command: 'true'
env_file: ./.env env_file: ./.env
environment: environment:
- NODE_ENV=production - NODE_ENV=production
@ -82,7 +79,6 @@ services:
build: build:
context: .. context: ..
dockerfile: oracle/Dockerfile dockerfile: oracle/Dockerfile
command: 'true'
env_file: ./.env env_file: ./.env
environment: environment:
- NODE_ENV=production - NODE_ENV=production
@ -98,7 +94,6 @@ services:
build: build:
context: .. context: ..
dockerfile: oracle/Dockerfile dockerfile: oracle/Dockerfile
command: 'true'
env_file: ./.env env_file: ./.env
environment: environment:
- NODE_ENV=production - NODE_ENV=production

@ -91,7 +91,7 @@ function updateNonce(nonce) {
return redis.set(nonceKey, nonce) return redis.set(nonceKey, nonce)
} }
async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) { async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) {
try { try {
if (redis.status !== 'ready') { if (redis.status !== 'ready') {
nackMsg(msg) nackMsg(msg)
@ -167,7 +167,7 @@ async function main({ msg, ackMsg, nackMsg, sendToQueue, channel }) {
if (failedTx.length) { if (failedTx.length) {
logger.info(`Sending ${failedTx.length} Failed Tx to Queue`) logger.info(`Sending ${failedTx.length} Failed Tx to Queue`)
await sendToQueue(failedTx) await scheduleForRetry(failedTx, msg.properties.headers['x-retries'])
} }
ackMsg(msg) ackMsg(msg)
logger.debug(`Finished processing msg`) logger.debug(`Finished processing msg`)

@ -1,6 +1,7 @@
require('../../env') require('../../env')
const connection = require('amqp-connection-manager').connect(process.env.QUEUE_URL) const connection = require('amqp-connection-manager').connect(process.env.QUEUE_URL)
const logger = require('./logger') const logger = require('./logger')
const { getRetrySequence } = require('../utils/utils')
connection.on('connect', () => { connection.on('connect', () => {
logger.info('Connected to amqp Broker') logger.info('Connected to amqp Broker')
@ -24,13 +25,17 @@ function connectWatcherToQueue({ queueName, cb }) {
} }
function connectSenderToQueue({ queueName, cb }) { function connectSenderToQueue({ queueName, cb }) {
const deadLetterExchange = `${queueName}-retry`
const channelWrapper = connection.createChannel({ const channelWrapper = connection.createChannel({
json: true json: true
}) })
channelWrapper.addSetup(channel => { channelWrapper.addSetup(channel => {
return Promise.all([ return Promise.all([
channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }),
channel.assertQueue(queueName, { durable: true }), channel.assertQueue(queueName, { durable: true }),
channel.bindQueue(queueName, deadLetterExchange),
channel.prefetch(1), channel.prefetch(1),
channel.consume(queueName, msg => channel.consume(queueName, msg =>
cb({ cb({
@ -38,15 +43,48 @@ function connectSenderToQueue({ queueName, cb }) {
channel: channelWrapper, channel: channelWrapper,
ackMsg: job => channelWrapper.ack(job), ackMsg: job => channelWrapper.ack(job),
nackMsg: job => channelWrapper.nack(job, false, true), nackMsg: job => channelWrapper.nack(job, false, true),
sendToQueue: data => channelWrapper.sendToQueue(queueName, data, { persistent: true }) scheduleForRetry: async (data, msgRetries = 0) => {
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
}
}) })
) )
]) ])
}) })
} }
async function generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
}) {
const retries = msgRetries + 1
const delay = getRetrySequence(retries) * 1000
const retryQueue = `${queueName}-retry-${delay}`
await channel.assertQueue(retryQueue, {
durable: true,
deadLetterExchange,
messageTtl: delay,
expires: delay * 10
})
await channelWrapper.sendToQueue(retryQueue, data, {
persistent: true,
headers: { 'x-retries': retries }
})
}
module.exports = { module.exports = {
connectWatcherToQueue, connectWatcherToQueue,
connectSenderToQueue, connectSenderToQueue,
connection connection,
generateRetry
} }

@ -2,6 +2,14 @@ const BigNumber = require('bignumber.js')
const promiseRetry = require('promise-retry') const promiseRetry = require('promise-retry')
const Web3 = require('web3') const Web3 = require('web3')
const retrySequence = [1, 2, 3, 5, 8, 13, 21, 34, 55, 60]
function getRetrySequence(count) {
return count > retrySequence.length
? retrySequence[retrySequence.length - 1]
: retrySequence[count - 1]
}
async function syncForEach(array, callback) { async function syncForEach(array, callback) {
for (let index = 0; index < array.length; index++) { for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array) await callback(array[index], index, array)
@ -112,5 +120,6 @@ module.exports = {
setIntervalAndRun, setIntervalAndRun,
watchdog, watchdog,
privateKeyToAddress, privateKeyToAddress,
nonceError nonceError,
getRetrySequence
} }

152
oracle/test/amqp.test.js Normal file

@ -0,0 +1,152 @@
const { expect } = require('chai')
const { generateRetry, connection } = require('../src/services/amqpClient')
connection.close()
describe('generateRetry', () => {
let channel
let channelWrapper
const data = [{}]
const queueName = 'test-queue'
const deadLetterExchange = `${queueName}-retry`
beforeEach(() => {
channel = {
assertQueue(queue, options) {
this.queue = queue
this.options = options
}
}
channelWrapper = {
sendToQueue(queue, data, options) {
this.queue = queue
this.data = data
this.options = options
}
}
})
it('should assert new queue and push the message', async () => {
// Given
const msgRetries = 0
const delay = 1000
// When
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
// Then
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
expect(channel.options.messageTtl).to.equal(delay)
expect(channel.options.expires).to.equal(delay * 10)
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
})
it('should increment delay on retries', async () => {
let msgRetries = 1
let delay = 2000
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
expect(channel.options.messageTtl).to.equal(delay)
expect(channel.options.expires).to.equal(delay * 10)
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
msgRetries = 2
delay = 3000
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
expect(channel.options.messageTtl).to.equal(delay)
expect(channel.options.expires).to.equal(delay * 10)
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
msgRetries = 4
delay = 8000
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
expect(channel.options.messageTtl).to.equal(delay)
expect(channel.options.expires).to.equal(delay * 10)
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
})
it('should have a max delay of 60 seconds', async () => {
let msgRetries = 10
let delay = 60000
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
expect(channel.options.messageTtl).to.equal(delay)
expect(channel.options.expires).to.equal(delay * 10)
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
msgRetries = 15
delay = 60000
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
expect(channel.options.messageTtl).to.equal(delay)
expect(channel.options.expires).to.equal(delay * 10)
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
msgRetries = 20
delay = 60000
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
expect(channel.queue).to.equal(`${queueName}-retry-${delay}`)
expect(channel.options.messageTtl).to.equal(delay)
expect(channel.options.expires).to.equal(delay * 10)
expect(channelWrapper.options.headers['x-retries']).to.equal(msgRetries + 1)
})
})

@ -1,2 +1,3 @@
HOME_RPC_URL=http://example.com HOME_RPC_URL=http://example.com
FOREIGN_RPC_URL=http://example.com FOREIGN_RPC_URL=http://example.com
QUEUE_URL=http://example.com