Possibility to resend old pending transactions (#425)

This commit is contained in:
Kirill Fedoseev 2020-09-12 17:01:37 +03:00 committed by GitHub
parent 4954c859c3
commit 6fe63ae9f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 177 additions and 76 deletions

@ -24,7 +24,7 @@ module.exports = {
...baseConfig.bridgeConfig,
...baseConfig.foreignConfig,
event: 'UserRequestForAffirmation',
queue: 'home',
queue: 'home-prioritized',
name: `watcher-${id}`,
id
}

@ -73,6 +73,7 @@ const bridgeConfig = {
}
const homeConfig = {
chain: 'home',
eventContractAddress: process.env.COMMON_HOME_BRIDGE_ADDRESS,
eventAbi: homeAbi,
bridgeContractAddress: process.env.COMMON_HOME_BRIDGE_ADDRESS,
@ -83,6 +84,7 @@ const homeConfig = {
}
const foreignConfig = {
chain: 'foreign',
eventContractAddress: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS,
eventAbi: foreignAbi,
bridgeContractAddress: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS,

@ -6,7 +6,7 @@ module.exports = {
...baseConfig.bridgeConfig,
...baseConfig.homeConfig,
event: 'CollectedSignatures',
queue: 'foreign',
queue: 'foreign-prioritized',
name: `watcher-${id}`,
id
}

@ -14,7 +14,7 @@ module.exports = {
...baseConfig.bridgeConfig,
...baseConfig.foreignConfig,
workerQueue: 'convert-to-chai',
senderQueue: 'foreign',
senderQueue: 'foreign-prioritized',
name: `worker-${id}`,
id
}

@ -4,7 +4,8 @@ const { web3Foreign } = require('../src/services/web3')
module.exports = {
...baseConfig.bridgeConfig,
queue: 'foreign',
queue: 'foreign-prioritized',
oldQueue: 'foreign',
id: 'foreign',
name: 'sender-foreign',
web3: web3Foreign

@ -4,7 +4,8 @@ const { web3Home } = require('../src/services/web3')
module.exports = {
...baseConfig.bridgeConfig,
queue: 'home',
queue: 'home-prioritized',
oldQueue: 'home',
id: 'home',
name: 'sender-home',
web3: web3Home

@ -6,7 +6,7 @@ module.exports = {
...baseConfig.bridgeConfig,
...baseConfig.homeConfig,
event: 'UserRequestForSignature',
queue: 'home',
queue: 'home-prioritized',
name: `watcher-${id}`,
id
}

@ -41,7 +41,7 @@ module.exports = {
eventContractAddress: initialChecks.bridgeableTokenAddress,
eventAbi: ERC20_ABI,
eventFilter: { to: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS },
queue: 'home',
queue: 'home-prioritized',
...workerQueueConfig,
name: `watcher-${id}`,
id

@ -138,8 +138,8 @@ async function main({ sendJob, txHash }) {
}
async function sendJobTx(jobs) {
const gasPrice = await GasPrice.start(config.queue, true)
const chainId = await getChainId(config.queue)
const gasPrice = await GasPrice.start(config.chain, true)
const chainId = await getChainId(config.chain)
let nonce = await getNonce(web3Instance, ORACLE_VALIDATOR_ADDRESS)
await syncForEach(jobs, async job => {
@ -153,7 +153,7 @@ async function sendJobTx(jobs) {
try {
logger.info(`Sending transaction with nonce ${nonce}`)
const txHash = await sendTx({
chain: config.queue,
chain: config.chain,
data: job.data,
nonce,
gasPrice: gasPrice.toString(10),

@ -1,5 +1,6 @@
require('../env')
const path = require('path')
const { toBN } = require('web3-utils')
const { connectSenderToQueue } = require('./services/amqpClient')
const { redis } = require('./services/redisClient')
const GasPrice = require('./services/gasPrice')
@ -45,6 +46,7 @@ async function initialize() {
chainId = await getChainId(config.id)
connectSenderToQueue({
queueName: config.queue,
oldQueueName: config.oldQueue,
cb: options => {
if (config.maxProcessingTime) {
return watchdog(() => main(options), config.maxProcessingTime, () => {
@ -88,7 +90,7 @@ function updateNonce(nonce) {
return redis.set(nonceKey, nonce)
}
async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) {
async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleTransactionResend }) {
try {
if (redis.status !== 'ready') {
nackMsg(msg)
@ -103,8 +105,15 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) {
let insufficientFunds = false
let minimumBalance = null
const failedTx = []
const sentTx = []
logger.debug(`Sending ${txArray.length} transactions`)
const isResend = txArray.length > 0 && !!txArray[0].txHash
if (isResend) {
logger.debug(`Checking status of ${txArray.length} transactions`)
} else {
logger.debug(`Sending ${txArray.length} transactions`)
}
await syncForEach(txArray, async job => {
let gasLimit
if (typeof job.extraGas === 'number') {
@ -114,11 +123,37 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) {
}
try {
logger.info(`Sending transaction with nonce ${nonce}`)
let txNonce
if (isResend) {
const tx = await web3Instance.eth.getTransaction(job.txHash)
if (tx === null) {
logger.info(`Transaction ${job.txHash} was not found, dropping it`)
return
}
if (tx.blockNumber !== null) {
logger.info(`Transaction ${job.txHash} was successfully mined`)
return
}
logger.info(
`Previously sent transaction is stuck, updating gasPrice: ${tx.gasPrice} -> ${gasPrice.toString(10)}`
)
if (toBN(tx.gasPrice).gte(toBN(gasPrice))) {
logger.info("Gas price returned from the oracle didn't increase, will reinspect this transaction later")
sentTx.push(job)
return
}
txNonce = tx.nonce
} else {
txNonce = nonce++
}
logger.info(`Sending transaction with nonce ${txNonce}`)
const txHash = await sendTx({
chain: config.id,
data: job.data,
nonce,
nonce: txNonce,
gasPrice: gasPrice.toString(10),
amount: '0',
gasLimit,
@ -127,8 +162,11 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) {
chainId,
web3: web3Instance
})
sentTx.push({
...job,
txHash
})
nonce++
logger.info(
{ eventTransactionHash: job.transactionReference, generatedTransactionHash: txHash },
`Tx generated ${txHash} for event Tx ${job.transactionReference}`
@ -163,6 +201,10 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry }) {
logger.info(`Sending ${failedTx.length} Failed Tx to Queue`)
await scheduleForRetry(failedTx, msg.properties.headers['x-retries'])
}
if (sentTx.length) {
logger.info(`Sending ${sentTx.length} Tx Delayed Resend Requests to Queue`)
await scheduleTransactionResend(sentTx)
}
ackMsg(msg)
logger.debug(`Finished processing msg`)

@ -4,6 +4,12 @@ const dns = require('dns')
const connection = require('amqp-connection-manager').connect(process.env.ORACLE_QUEUE_URL)
const logger = require('./logger')
const { getRetrySequence } = require('../utils/utils')
const {
TRANSACTION_RESEND_TIMEOUT,
SENDER_QUEUE_MAX_PRIORITY,
SENDER_QUEUE_SEND_PRIORITY,
SENDER_QUEUE_CHECK_STATUS_PRIORITY
} = require('../utils/constants')
connection.on('connect', () => {
logger.info('Connected to amqp Broker')
@ -22,16 +28,18 @@ async function isAttached() {
}
function connectWatcherToQueue({ queueName, workerQueue, cb }) {
const queueList = workerQueue ? [queueName, workerQueue] : [queueName]
const channelWrapper = connection.createChannel({
json: true,
setup(channel) {
return Promise.all(queueList.map(queue => channel.assertQueue(queue, { durable: true })))
async setup(channel) {
await channel.assertQueue(queueName, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY })
if (workerQueue) {
await channel.assertQueue(workerQueue, { durable: true })
}
}
})
const sendToQueue = data => channelWrapper.sendToQueue(queueName, data, { persistent: true })
const sendToQueue = data =>
channelWrapper.sendToQueue(queueName, data, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY })
let sendToWorker
if (workerQueue) {
sendToWorker = data => channelWrapper.sendToQueue(workerQueue, data, { persistent: true })
@ -40,38 +48,60 @@ function connectWatcherToQueue({ queueName, workerQueue, cb }) {
cb({ sendToQueue, sendToWorker, channel: channelWrapper })
}
function connectSenderToQueue({ queueName, cb }) {
function connectSenderToQueue({ queueName, oldQueueName, cb }) {
const deadLetterExchange = `${queueName}-retry`
async function resendMessagesToNewQueue(channel) {
logger.info(`Trying to check messages in the old non-priority queue ${queueName}`)
while (true) {
const msg = await channel.get(oldQueueName)
if (msg === false) {
logger.info(`No messages in the old queue ${oldQueueName} left`)
break
}
logger.debug(`Message in the old queue ${oldQueueName} was found, redirecting it to the new queue ${queueName}`)
await channel.sendToQueue(queueName, msg.content, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY })
await channel.ack(msg)
}
}
const channelWrapper = connection.createChannel({
json: true
})
channelWrapper.addSetup(channel => {
return Promise.all([
channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }),
channel.assertQueue(queueName, { durable: true }),
channel.bindQueue(queueName, deadLetterExchange),
channel.prefetch(1),
channel.consume(queueName, msg =>
cb({
msg,
channel: channelWrapper,
ackMsg: job => channelWrapper.ack(job),
nackMsg: job => channelWrapper.nack(job, false, true),
scheduleForRetry: async (data, msgRetries = 0) => {
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
}
})
)
])
channelWrapper.addSetup(async channel => {
await channel.assertExchange(deadLetterExchange, 'fanout', { durable: true })
await channel.assertQueue(queueName, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY })
await channel.assertQueue(oldQueueName, { durable: true }).then(() => resendMessagesToNewQueue(channel))
await channel.bindQueue(queueName, deadLetterExchange)
await channel.prefetch(1)
await channel.consume(queueName, msg =>
cb({
msg,
channel: channelWrapper,
ackMsg: job => channelWrapper.ack(job),
nackMsg: job => channelWrapper.nack(job, false, true),
scheduleForRetry: async (data, msgRetries = 0) => {
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
},
scheduleTransactionResend: async data => {
await generateTransactionResend({
data,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
}
})
)
})
}
@ -82,52 +112,73 @@ function connectWorkerToQueue({ queueName, senderQueue, cb }) {
json: true
})
channelWrapper.addSetup(channel => {
return Promise.all([
channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }),
channel.assertQueue(queueName, { durable: true }),
channel.assertQueue(senderQueue, { durable: true }),
channel.bindQueue(queueName, deadLetterExchange),
channel.prefetch(1),
channel.consume(queueName, msg =>
cb({
msg,
channel: channelWrapper,
ackMsg: job => channelWrapper.ack(job),
nackMsg: job => channelWrapper.nack(job, false, true),
sendToSenderQueue: data => channelWrapper.sendToQueue(senderQueue, data, { persistent: true }),
scheduleForRetry: async (data, msgRetries = 0) => {
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
}
})
)
])
channelWrapper.addSetup(async channel => {
await channel.assertExchange(deadLetterExchange, 'fanout', { durable: true })
await channel.assertQueue(queueName, { durable: true })
await channel.assertQueue(senderQueue, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY })
await channel.bindQueue(queueName, deadLetterExchange)
await channel.prefetch(1)
await channel.consume(queueName, msg =>
cb({
msg,
channel: channelWrapper,
ackMsg: job => channelWrapper.ack(job),
nackMsg: job => channelWrapper.nack(job, false, true),
sendToSenderQueue: data =>
channelWrapper.sendToQueue(senderQueue, data, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY }),
scheduleForRetry: async (data, msgRetries = 0) => {
await generateRetry({
data,
msgRetries,
channelWrapper,
channel,
queueName,
deadLetterExchange
})
}
})
)
})
}
async function generateRetry({ data, msgRetries, channelWrapper, channel, queueName, deadLetterExchange }) {
const retries = msgRetries + 1
const delay = getRetrySequence(retries) * 1000
// New retry queue is created, and one message is send to it.
// Nobody consumes messages from this queue, so eventually the message will be dropped.
// `messageTtl` defines a timeout after which the message will be dropped out of the queue.
// When message is dropped, it will be resend into the specified `deadLetterExchange` with the updated `x-retries` header.
const retryQueue = `${queueName}-retry-${delay}`
await channel.assertQueue(retryQueue, {
durable: true,
deadLetterExchange,
messageTtl: delay,
expires: delay * 10
expires: delay * 10,
maxPriority: SENDER_QUEUE_MAX_PRIORITY
})
await channelWrapper.sendToQueue(retryQueue, data, {
persistent: true,
priority: SENDER_QUEUE_SEND_PRIORITY,
headers: { 'x-retries': retries }
})
}
async function generateTransactionResend({ data, channelWrapper, channel, queueName, deadLetterExchange }) {
const retryQueue = `${queueName}-check-tx-status`
await channel.assertQueue(retryQueue, {
durable: true,
deadLetterExchange,
messageTtl: TRANSACTION_RESEND_TIMEOUT,
expires: TRANSACTION_RESEND_TIMEOUT * 10,
maxPriority: SENDER_QUEUE_MAX_PRIORITY
})
await channelWrapper.sendToQueue(retryQueue, data, {
priority: SENDER_QUEUE_CHECK_STATUS_PRIORITY,
persistent: true
})
}
module.exports = {
isAttached,
connectWatcherToQueue,

@ -22,5 +22,9 @@ module.exports = {
GAS_PRICE_BOUNDARIES: {
MIN: 1,
MAX: 250
}
},
TRANSACTION_RESEND_TIMEOUT: 20 * 60 * 1000,
SENDER_QUEUE_MAX_PRIORITY: 10,
SENDER_QUEUE_SEND_PRIORITY: 5,
SENDER_QUEUE_CHECK_STATUS_PRIORITY: 1
}

@ -33,7 +33,7 @@ async function waitForFunds(web3, address, minimumBalance, cb, logger) {
async retry => {
logger.debug('Getting balance of validator account')
const newBalance = web3.utils.toBN(await web3.eth.getBalance(address))
if (newBalance.gte(minimumBalance)) {
if (newBalance.gte(web3.utils.toBN(minimumBalance.toString(10)))) {
logger.debug({ balance: newBalance, minimumBalance }, 'Validator has minimum necessary balance')
cb(newBalance)
} else {