Fix remote shutdown sender behaviour (#550)

This commit is contained in:
Kirill Fedoseev 2021-04-14 23:35:43 +03:00 committed by GitHub
parent f65e8f9244
commit 9dfb0510c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 36 deletions

@ -13,6 +13,7 @@ const {
privateKeyToAddress, privateKeyToAddress,
syncForEach, syncForEach,
waitForFunds, waitForFunds,
waitForUnsuspend,
watchdog, watchdog,
nonceError nonceError
} = require('./utils/utils') } = require('./utils/utils')
@ -45,30 +46,40 @@ async function initialize() {
GasPrice.start(config.id) GasPrice.start(config.id)
chainId = await getChainId(web3Instance) chainId = await getChainId(web3Instance)
connectSenderToQueue({
queueName: config.queue,
oldQueueName: config.oldQueue,
resendInterval: config.resendInterval,
cb: options => {
if (config.maxProcessingTime) {
return watchdog(() => main(options), config.maxProcessingTime, () => {
logger.fatal('Max processing time reached')
process.exit(EXIT_CODES.MAX_TIME_REACHED)
})
}
return main(options) connectQueue()
}
})
} catch (e) { } catch (e) {
logger.error(e.message) logger.error(e.message)
process.exit(EXIT_CODES.GENERAL_ERROR) process.exit(EXIT_CODES.GENERAL_ERROR)
} }
} }
function connectQueue() {
connectSenderToQueue({
queueName: config.queue,
oldQueueName: config.oldQueue,
resendInterval: config.resendInterval,
cb: options => {
if (config.maxProcessingTime) {
return watchdog(() => main(options), config.maxProcessingTime, () => {
logger.fatal('Max processing time reached')
process.exit(EXIT_CODES.MAX_TIME_REACHED)
})
}
return main(options)
}
})
}
function resume(newBalance) { function resume(newBalance) {
logger.info(`Validator balance changed. New balance is ${newBalance}. Resume messages processing.`) logger.info(`Validator balance changed. New balance is ${newBalance}. Resume messages processing.`)
initialize() connectQueue()
}
function unsuspend() {
logger.info(`Oracle sender was unsuspended.`)
connectQueue()
} }
async function readNonce(forceUpdate) { async function readNonce(forceUpdate) {
@ -103,11 +114,10 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleT
return return
} }
const wasShutdown = await getShutdownFlag(logger, config.shutdownKey, false)
if (await getShutdownFlag(logger, config.shutdownKey, true)) { if (await getShutdownFlag(logger, config.shutdownKey, true)) {
if (!wasShutdown) { logger.info('Oracle sender was suspended via the remote shutdown process')
logger.info('Oracle sender was suspended via the remote shutdown process') channel.close()
} waitForUnsuspend(() => getShutdownFlag(logger, config.shutdownKey, true), unsuspend)
return return
} }

@ -43,7 +43,7 @@ async function fetchShutdownFlag() {
} }
if (config.shutdownContractAddress) { if (config.shutdownContractAddress) {
const shutdownSelector = web3Side.eth.abi.encodeEventSignature(config.shutdownMethod) const shutdownSelector = web3Side.eth.abi.encodeFunctionSignature(config.shutdownMethod)
logger.debug( logger.debug(
{ contract: config.shutdownContractAddress, method: config.shutdownMethod, data: shutdownSelector }, { contract: config.shutdownContractAddress, method: config.shutdownMethod, data: shutdownSelector },
'Fetching shutdown status from contract' 'Fetching shutdown status from contract'

@ -29,24 +29,30 @@ function checkHTTPS(ORACLE_ALLOW_HTTP_FOR_RPC, logger) {
} }
} }
const promiseRetryForever = f => promiseRetry(f, { forever: true, factor: 1 })
async function waitForFunds(web3, address, minimumBalance, cb, logger) { async function waitForFunds(web3, address, minimumBalance, cb, logger) {
promiseRetry( promiseRetryForever(async retry => {
async retry => { logger.debug('Getting balance of validator account')
logger.debug('Getting balance of validator account') const newBalance = web3.utils.toBN(await web3.eth.getBalance(address))
const newBalance = web3.utils.toBN(await web3.eth.getBalance(address)) if (newBalance.gte(web3.utils.toBN(minimumBalance.toString(10)))) {
if (newBalance.gte(web3.utils.toBN(minimumBalance.toString(10)))) { logger.debug({ balance: newBalance, minimumBalance }, 'Validator has minimum necessary balance')
logger.debug({ balance: newBalance, minimumBalance }, 'Validator has minimum necessary balance') cb(newBalance)
cb(newBalance) } else {
} else { logger.debug({ balance: newBalance, minimumBalance }, 'Balance of validator is still less than the minimum')
logger.debug({ balance: newBalance, minimumBalance }, 'Balance of validator is still less than the minimum') retry()
retry()
}
},
{
forever: true,
factor: 1
} }
) })
}
async function waitForUnsuspend(getSuspendFlag, cb) {
promiseRetryForever(async retry => {
if (await getSuspendFlag()) {
retry()
} else {
cb()
}
})
} }
function addExtraGas(gas, extraPercentage, maxGasLimit = Infinity) { function addExtraGas(gas, extraPercentage, maxGasLimit = Infinity) {
@ -135,6 +141,7 @@ module.exports = {
syncForEach, syncForEach,
checkHTTPS, checkHTTPS,
waitForFunds, waitForFunds,
waitForUnsuspend,
addExtraGas, addExtraGas,
setIntervalAndRun, setIntervalAndRun,
watchdog, watchdog,

@ -164,6 +164,8 @@ async function main({ sendToQueue, sendToWorker }) {
logger.info('Oracle watcher was suspended via the remote shutdown process') logger.info('Oracle watcher was suspended via the remote shutdown process')
} }
return return
} else if (wasShutdown) {
logger.info(`Oracle watcher was unsuspended.`)
} }
await checkConditions() await checkConditions()