Fixed resent job for failed transactions (#466)

This commit is contained in:
Kirill Fedoseev 2020-10-06 15:15:35 +03:00 committed by GitHub
parent 46daeb6815
commit 74293959f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -98,13 +98,13 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleT
const txArray = JSON.parse(msg.content) const txArray = JSON.parse(msg.content)
logger.debug(`Msg received with ${txArray.length} Tx to send`) logger.debug(`Msg received with ${txArray.length} Tx to send`)
const gasPrice = GasPrice.getPrice() const gasPrice = GasPrice.getPrice().toString(10)
let nonce let nonce
let insufficientFunds = false let insufficientFunds = false
let minimumBalance = null let minimumBalance = null
const failedTx = [] const failedTx = []
const sentTx = [] const resendJobs = []
const isResend = txArray.length > 0 && !!txArray[0].txHash const isResend = txArray.length > 0 && !!txArray[0].txHash
@ -136,17 +136,14 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleT
nonce = await readNonce(true) nonce = await readNonce(true)
} }
logger.info( logger.info(`Transaction ${job.txHash} was not mined, updating gasPrice: ${job.gasPrice} -> ${gasPrice}`)
`Transaction ${job.txHash} was not mined, updating gasPrice: ${job.gasPrice} -> ${gasPrice.toString(10)}`
)
} }
logger.info(`Sending transaction with nonce ${nonce}`) logger.info(`Sending transaction with nonce ${nonce}`)
job.gasPrice = gasPrice.toString(10) const txHash = await sendTx({
job.txHash = await sendTx({
chain: config.id, chain: config.id,
data: job.data, data: job.data,
nonce, nonce,
gasPrice: job.gasPrice, gasPrice,
amount: '0', amount: '0',
gasLimit, gasLimit,
privateKey: ORACLE_VALIDATOR_ADDRESS_PRIVATE_KEY, privateKey: ORACLE_VALIDATOR_ADDRESS_PRIVATE_KEY,
@ -154,12 +151,17 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleT
chainId, chainId,
web3: web3Instance web3: web3Instance
}) })
sentTx.push(job) const resendJob = {
...job,
txHash,
gasPrice
}
resendJobs.push(resendJob)
nonce++ nonce++
logger.info( logger.info(
{ eventTransactionHash: job.transactionReference, generatedTransactionHash: job.txHash }, { eventTransactionHash: job.transactionReference, generatedTransactionHash: txHash },
`Tx generated ${job.txHash} for event Tx ${job.transactionReference}` `Tx generated ${txHash} for event Tx ${job.transactionReference}`
) )
} catch (e) { } catch (e) {
logger.error( logger.error(
@ -168,7 +170,11 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleT
e.message e.message
) )
if (!e.message.toLowerCase().includes('transaction with the same hash was already imported')) { if (!e.message.toLowerCase().includes('transaction with the same hash was already imported')) {
failedTx.push(job) if (isResend) {
resendJobs.push(job)
} else {
failedTx.push(job)
}
} }
if (e.message.toLowerCase().includes('insufficient funds')) { if (e.message.toLowerCase().includes('insufficient funds')) {
@ -191,9 +197,9 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleT
logger.info(`Sending ${failedTx.length} Failed Tx to Queue`) logger.info(`Sending ${failedTx.length} Failed Tx to Queue`)
await scheduleForRetry(failedTx, msg.properties.headers['x-retries']) await scheduleForRetry(failedTx, msg.properties.headers['x-retries'])
} }
if (sentTx.length) { if (resendJobs.length) {
logger.info(`Sending ${sentTx.length} Tx Delayed Resend Requests to Queue`) logger.info(`Sending ${resendJobs.length} Tx Delayed Resend Requests to Queue`)
await scheduleTransactionResend(sentTx) await scheduleTransactionResend(resendJobs)
} }
ackMsg(msg) ackMsg(msg)
logger.debug(`Finished processing msg`) logger.debug(`Finished processing msg`)