From 9dfb0510c451248ebeb2645b9e3f76cd0e976895 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Wed, 14 Apr 2021 23:35:43 +0300 Subject: [PATCH] Fix remote shutdown sender behaviour (#550) --- oracle/src/sender.js | 48 +++++++++++++++++++++-------------- oracle/src/shutdownManager.js | 2 +- oracle/src/utils/utils.js | 39 ++++++++++++++++------------ oracle/src/watcher.js | 2 ++ 4 files changed, 55 insertions(+), 36 deletions(-) diff --git a/oracle/src/sender.js b/oracle/src/sender.js index fa8d13e3..de93ae08 100644 --- a/oracle/src/sender.js +++ b/oracle/src/sender.js @@ -13,6 +13,7 @@ const { privateKeyToAddress, syncForEach, waitForFunds, + waitForUnsuspend, watchdog, nonceError } = require('./utils/utils') @@ -45,30 +46,40 @@ async function initialize() { GasPrice.start(config.id) chainId = await getChainId(web3Instance) - connectSenderToQueue({ - queueName: config.queue, - oldQueueName: config.oldQueue, - resendInterval: config.resendInterval, - cb: options => { - if (config.maxProcessingTime) { - return watchdog(() => main(options), config.maxProcessingTime, () => { - logger.fatal('Max processing time reached') - process.exit(EXIT_CODES.MAX_TIME_REACHED) - }) - } - return main(options) - } - }) + connectQueue() } catch (e) { logger.error(e.message) process.exit(EXIT_CODES.GENERAL_ERROR) } } +function connectQueue() { + connectSenderToQueue({ + queueName: config.queue, + oldQueueName: config.oldQueue, + resendInterval: config.resendInterval, + cb: options => { + if (config.maxProcessingTime) { + return watchdog(() => main(options), config.maxProcessingTime, () => { + logger.fatal('Max processing time reached') + process.exit(EXIT_CODES.MAX_TIME_REACHED) + }) + } + + return main(options) + } + }) +} + function resume(newBalance) { logger.info(`Validator balance changed. New balance is ${newBalance}. Resume messages processing.`) - initialize() + connectQueue() +} + +function unsuspend() { + logger.info(`Oracle sender was unsuspended.`) + connectQueue() } async function readNonce(forceUpdate) { @@ -103,11 +114,10 @@ async function main({ msg, ackMsg, nackMsg, channel, scheduleForRetry, scheduleT return } - const wasShutdown = await getShutdownFlag(logger, config.shutdownKey, false) if (await getShutdownFlag(logger, config.shutdownKey, true)) { - if (!wasShutdown) { - logger.info('Oracle sender was suspended via the remote shutdown process') - } + logger.info('Oracle sender was suspended via the remote shutdown process') + channel.close() + waitForUnsuspend(() => getShutdownFlag(logger, config.shutdownKey, true), unsuspend) return } diff --git a/oracle/src/shutdownManager.js b/oracle/src/shutdownManager.js index ba99ec31..9ffc13cb 100644 --- a/oracle/src/shutdownManager.js +++ b/oracle/src/shutdownManager.js @@ -43,7 +43,7 @@ async function fetchShutdownFlag() { } if (config.shutdownContractAddress) { - const shutdownSelector = web3Side.eth.abi.encodeEventSignature(config.shutdownMethod) + const shutdownSelector = web3Side.eth.abi.encodeFunctionSignature(config.shutdownMethod) logger.debug( { contract: config.shutdownContractAddress, method: config.shutdownMethod, data: shutdownSelector }, 'Fetching shutdown status from contract' diff --git a/oracle/src/utils/utils.js b/oracle/src/utils/utils.js index a7052d46..5e2fa3b3 100644 --- a/oracle/src/utils/utils.js +++ b/oracle/src/utils/utils.js @@ -29,24 +29,30 @@ function checkHTTPS(ORACLE_ALLOW_HTTP_FOR_RPC, logger) { } } +const promiseRetryForever = f => promiseRetry(f, { forever: true, factor: 1 }) + async function waitForFunds(web3, address, minimumBalance, cb, logger) { - promiseRetry( - async retry => { - logger.debug('Getting balance of validator account') - const newBalance = web3.utils.toBN(await web3.eth.getBalance(address)) - if (newBalance.gte(web3.utils.toBN(minimumBalance.toString(10)))) { - logger.debug({ balance: newBalance, minimumBalance }, 'Validator has minimum necessary balance') - cb(newBalance) - } else { - logger.debug({ balance: newBalance, minimumBalance }, 'Balance of validator is still less than the minimum') - retry() - } - }, - { - forever: true, - factor: 1 + promiseRetryForever(async retry => { + logger.debug('Getting balance of validator account') + const newBalance = web3.utils.toBN(await web3.eth.getBalance(address)) + if (newBalance.gte(web3.utils.toBN(minimumBalance.toString(10)))) { + logger.debug({ balance: newBalance, minimumBalance }, 'Validator has minimum necessary balance') + cb(newBalance) + } else { + logger.debug({ balance: newBalance, minimumBalance }, 'Balance of validator is still less than the minimum') + retry() } - ) + }) +} + +async function waitForUnsuspend(getSuspendFlag, cb) { + promiseRetryForever(async retry => { + if (await getSuspendFlag()) { + retry() + } else { + cb() + } + }) } function addExtraGas(gas, extraPercentage, maxGasLimit = Infinity) { @@ -135,6 +141,7 @@ module.exports = { syncForEach, checkHTTPS, waitForFunds, + waitForUnsuspend, addExtraGas, setIntervalAndRun, watchdog, diff --git a/oracle/src/watcher.js b/oracle/src/watcher.js index 1127f86d..4e6e1cde 100644 --- a/oracle/src/watcher.js +++ b/oracle/src/watcher.js @@ -164,6 +164,8 @@ async function main({ sendToQueue, sendToWorker }) { logger.info('Oracle watcher was suspended via the remote shutdown process') } return + } else if (wasShutdown) { + logger.info(`Oracle watcher was unsuspended.`) } await checkConditions()