diff --git a/oracle/config/half-duplex-transfer-watcher.config.js b/oracle/config/half-duplex-transfer-watcher.config.js index da909e1e..8114e882 100644 --- a/oracle/config/half-duplex-transfer-watcher.config.js +++ b/oracle/config/half-duplex-transfer-watcher.config.js @@ -32,6 +32,7 @@ module.exports = { eventAbi: ERC20_ABI, eventFilter: { to: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS }, queue: 'home', + workerQueue: 'swap-tokens', name: `watcher-${id}`, id } diff --git a/oracle/config/swap-tokens-worker.config.js b/oracle/config/swap-tokens-worker.config.js new file mode 100644 index 00000000..bc34af89 --- /dev/null +++ b/oracle/config/swap-tokens-worker.config.js @@ -0,0 +1,12 @@ +const baseConfig = require('./base.config') + +const id = `${baseConfig.id}-swap-tokens` + +module.exports = { + ...baseConfig.bridgeConfig, + ...baseConfig.foreignConfig, + workerQueue: 'swap-tokens', + senderQueue: 'foreign', + name: `worker-${id}`, + id +} diff --git a/oracle/package.json b/oracle/package.json index 51ce591c..83f168ce 100644 --- a/oracle/package.json +++ b/oracle/package.json @@ -10,9 +10,10 @@ "watcher:affirmation-request": "./scripts/start-worker.sh watcher affirmation-request-watcher", "watcher:transfer": "./scripts/start-worker.sh watcher transfer-watcher", "watcher:half-duplex-transfer": "./scripts/start-worker.sh watcher half-duplex-transfer-watcher", + "worker:swap-tokens": "./scripts/start-worker.sh worker swap-tokens-worker", "sender:home": "./scripts/start-worker.sh sender home-sender", "sender:foreign": "./scripts/start-worker.sh sender foreign-sender", - "dev": "concurrently -n 'watcher:signature-request,watcher:collected-signatures,watcher:affirmation-request,watcher:transfer,watcher:half-duplex-transfer,sender:home,sender:foreign' -c 'red,green,yellow,blue,white,magenta,cyan' 'yarn watcher:signature-request' 'yarn watcher:collected-signatures' 'yarn watcher:affirmation-request' 'yarn watcher:transfer' 'yarn watcher:half-duplex-transfer' 'yarn sender:home' 'yarn sender:foreign'", + "dev": "concurrently -n 'watcher:signature-request,watcher:collected-signatures,watcher:affirmation-request,watcher:transfer,watcher:half-duplex-transfer, worker:swap-tokens, sender:home,sender:foreign' -c 'red,green,yellow,blue,white,gray,magenta,cyan' 'yarn watcher:signature-request' 'yarn watcher:collected-signatures' 'yarn watcher:affirmation-request' 'yarn watcher:transfer' 'yarn watcher:half-duplex-transfer' 'yarn worker:swap-tokens' 'yarn sender:home' 'yarn sender:foreign'", "test": "NODE_ENV=test mocha", "test:watch": "NODE_ENV=test mocha --watch --reporter=min", "coverage": "NODE_ENV=test nyc --reporter=text --reporter=html mocha", diff --git a/oracle/src/events/processHalfDuplexTransfers/index.js b/oracle/src/events/processHalfDuplexTransfers/index.js index cdffd63f..bbbd22d1 100644 --- a/oracle/src/events/processHalfDuplexTransfers/index.js +++ b/oracle/src/events/processHalfDuplexTransfers/index.js @@ -22,7 +22,7 @@ function processTransfersBuilder(config) { const userRequestForAffirmationHash = web3Home.eth.abi.encodeEventSignature(userRequestForAffirmationAbi) const tokensSwappedHash = tokensSwappedAbi ? web3Home.eth.abi.encodeEventSignature(tokensSwappedAbi) : '0x' - return async function processTransfers(transfers) { + return async function processTransfers(transfers, blockNumber) { const txToSend = [] if (validatorContract === null) { @@ -44,7 +44,7 @@ function processTransfersBuilder(config) { logger.info({ from, value }, `Processing transfer ${transfer.transactionHash}`) - const block = await web3Foreign.eth.getBlock('latest') + const block = await web3Foreign.eth.getBlock(blockNumber) const tokenSwapAllowed = await foreignBridge.methods.isTokenSwapAllowed(block.timestamp) if (!tokenSwapAllowed) { diff --git a/oracle/src/services/amqpClient.js b/oracle/src/services/amqpClient.js index fddbd35d..9dc2b2c8 100644 --- a/oracle/src/services/amqpClient.js +++ b/oracle/src/services/amqpClient.js @@ -11,17 +11,23 @@ connection.on('disconnect', () => { logger.error('Disconnected from amqp Broker') }) -function connectWatcherToQueue({ queueName, cb }) { +function connectWatcherToQueue({ queueName, workerQueue, cb }) { + const queueList = workerQueue ? [queueName, workerQueue] : [queueName] + const channelWrapper = connection.createChannel({ json: true, setup(channel) { - return Promise.all([channel.assertQueue(queueName, { durable: true })]) + return Promise.all(queueList.map(queue => channel.assertQueue(queue, { durable: true }))) } }) const sendToQueue = data => channelWrapper.sendToQueue(queueName, data, { persistent: true }) + let sendToWorker + if (workerQueue) { + sendToWorker = data => channelWrapper.sendToQueue(workerQueue, data, { persistent: true }) + } - cb({ sendToQueue, channel: channelWrapper }) + cb({ sendToQueue, sendToWorker, channel: channelWrapper }) } function connectSenderToQueue({ queueName, cb }) { @@ -59,6 +65,43 @@ function connectSenderToQueue({ queueName, cb }) { }) } +function connectWorkerToQueue({ queueName, senderQueue, cb }) { + const deadLetterExchange = `${queueName}-retry` + + const channelWrapper = connection.createChannel({ + json: true + }) + + channelWrapper.addSetup(channel => { + return Promise.all([ + channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }), + channel.assertQueue(queueName, { durable: true }), + channel.assertQueue(senderQueue, { durable: true }), + channel.bindQueue(queueName, deadLetterExchange), + channel.prefetch(1), + channel.consume(queueName, msg => + cb({ + msg, + channel: channelWrapper, + ackMsg: job => channelWrapper.ack(job), + nackMsg: job => channelWrapper.nack(job, false, true), + sendToSenderQueue: data => channelWrapper.sendToQueue(senderQueue, data, { persistent: true }), + scheduleForRetry: async (data, msgRetries = 0) => { + await generateRetry({ + data, + msgRetries, + channelWrapper, + channel, + queueName, + deadLetterExchange + }) + } + }) + ) + ]) + }) +} + async function generateRetry({ data, msgRetries, channelWrapper, channel, queueName, deadLetterExchange }) { const retries = msgRetries + 1 const delay = getRetrySequence(retries) * 1000 @@ -78,6 +121,7 @@ async function generateRetry({ data, msgRetries, channelWrapper, channel, queueN module.exports = { connectWatcherToQueue, connectSenderToQueue, + connectWorkerToQueue, connection, generateRetry } diff --git a/oracle/src/watcher.js b/oracle/src/watcher.js index 321766b9..dea021d8 100644 --- a/oracle/src/watcher.js +++ b/oracle/src/watcher.js @@ -44,6 +44,7 @@ async function initialize() { await getLastProcessedBlock() connectWatcherToQueue({ queueName: config.queue, + workerQueue: config.workerQueue, cb: runMain }) } catch (e) { @@ -52,16 +53,16 @@ async function initialize() { } } -async function runMain({ sendToQueue }) { +async function runMain({ sendToQueue, sendToWorker }) { try { if (connection.isConnected() && redis.status === 'ready') { if (config.maxProcessingTime) { - await watchdog(() => main({ sendToQueue }), config.maxProcessingTime, () => { + await watchdog(() => main({ sendToQueue, sendToWorker }), config.maxProcessingTime, () => { logger.fatal('Max processing time reached') process.exit(EXIT_CODES.MAX_TIME_REACHED) }) } else { - await main({ sendToQueue }) + await main({ sendToQueue, sendToWorker }) } } } catch (e) { @@ -69,7 +70,7 @@ async function runMain({ sendToQueue }) { } setTimeout(() => { - runMain({ sendToQueue }) + runMain({ sendToQueue, sendToWorker }) }, config.pollingInterval) } @@ -84,7 +85,7 @@ function updateLastProcessedBlock(lastBlockNumber) { return redis.set(lastBlockRedisKey, lastProcessedBlock.toString()) } -function processEvents(events) { +function processEvents(events, blockNumber) { switch (config.id) { case 'native-erc-signature-request': case 'erc-erc-signature-request': @@ -102,7 +103,7 @@ function processEvents(events) { case 'erc-erc-transfer': case 'erc-native-transfer': case 'erc-native-half-duplex-transfer': - return processTransfers(events) + return processTransfers(events, blockNumber) case 'amb-signature-request': return processAMBSignatureRequests(events) case 'amb-collected-signatures': @@ -125,7 +126,7 @@ async function getLastBlockToProcess() { return lastBlockNumber.sub(requiredBlockConfirmations) } -async function main({ sendToQueue }) { +async function main({ sendToQueue, sendToWorker }) { try { const lastBlockToProcess = await getLastBlockToProcess() @@ -147,10 +148,14 @@ async function main({ sendToQueue }) { logger.info(`Found ${events.length} ${config.event} events`) if (events.length) { - const job = await processEvents(events) + const job = await processEvents(events, toBlock) logger.info('Transactions to send:', job.length) if (job.length) { + if (sendToWorker) { + await sendToWorker({ blockNumber: toBlock.toString() }) + } + await sendToQueue(job) } } diff --git a/oracle/src/worker.js b/oracle/src/worker.js new file mode 100644 index 00000000..17d7631d --- /dev/null +++ b/oracle/src/worker.js @@ -0,0 +1,73 @@ +const path = require('path') +const logger = require('./services/logger') +const rpcUrlsManager = require('./services/getRpcUrlsManager') +const { checkHTTPS, watchdog } = require('./utils/utils') +const { EXIT_CODES } = require('./utils/constants') +const { connectWorkerToQueue } = require('./services/amqpClient') + +const config = require(path.join('../config/', process.argv[2])) + +const swapTokens = require('./workers/swapTokens')(config) + +async function initialize() { + try { + const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger) + + rpcUrlsManager.homeUrls.forEach(checkHttps('home')) + rpcUrlsManager.foreignUrls.forEach(checkHttps('foreign')) + + connectWorkerToQueue({ + queueName: config.workerQueue, + senderQueue: config.senderQueue, + cb: options => { + if (config.maxProcessingTime) { + return watchdog(() => main(options), config.maxProcessingTime, () => { + logger.fatal('Max processing time reached') + process.exit(EXIT_CODES.MAX_TIME_REACHED) + }) + } + + return main(options) + } + }) + } catch (e) { + logger.error(e.message) + process.exit(EXIT_CODES.GENERAL_ERROR) + } +} + +async function run(blockNumber) { + if (config.id === 'erc-native-swap-tokens') { + return swapTokens(blockNumber) + } else { + return [] + } +} + +async function main({ msg, ackMsg, nackMsg, sendToSenderQueue, scheduleForRetry }) { + try { + const { blockNumber } = JSON.parse(msg.content) + logger.info(`Msg received with block number ${blockNumber}`) + + try { + const job = await run(blockNumber) + + logger.info('Transactions to send:', job.length) + + if (job.length) { + await sendToSenderQueue(job) + } + } catch (e) { + logger.info(`Sending failed msg to retry`) + await scheduleForRetry({ blockNumber }, msg.properties.headers['x-retries']) + } + + ackMsg(msg) + } catch (e) { + logger.error(e) + nackMsg(msg) + } + logger.debug(`Finished worker operation`) +} + +initialize() diff --git a/oracle/src/workers/swapTokens.js b/oracle/src/workers/swapTokens.js new file mode 100644 index 00000000..c6c8a056 --- /dev/null +++ b/oracle/src/workers/swapTokens.js @@ -0,0 +1,92 @@ +require('../../env') +const { HttpListProviderError } = require('http-list-provider') +const rootLogger = require('../services/logger') +const { web3Foreign } = require('../services/web3') + +const { BRIDGE_VALIDATORS_ABI } = require('../../../commons') + +let validatorContract = null + +function swapTokensBuilder(config) { + const foreignBridge = new web3Foreign.eth.Contract(config.foreignBridgeAbi, config.foreignBridgeAddress) + + return async function swapTokens(blockNumber) { + const txToSend = [] + + const logger = rootLogger.child({ + blockNumber: blockNumber.toString() + }) + + logger.debug(`Starting swap tokens operation`) + + if (validatorContract === null) { + logger.debug('Getting validator contract address') + const validatorContractAddress = await foreignBridge.methods.validatorContract().call() + logger.debug({ validatorContractAddress }, 'Validator contract address obtained') + + validatorContract = new web3Foreign.eth.Contract(BRIDGE_VALIDATORS_ABI, validatorContractAddress) + } + + logger.debug(`Checking if is validator duty`) + const validatorDuty = await validatorContract.methods.isValidatorDuty(config.validatorAddress).call() + + if (!validatorDuty) { + logger.info(`Token swap discarded because is not validator duty`) + return txToSend + } + + logger.debug(`Checking if half duplex balance if above the threshold`) + const hdTokenBalanceAboveMinBalance = await foreignBridge.methods.isHDTokenBalanceAboveMinBalance().call() + + if (!hdTokenBalanceAboveMinBalance) { + logger.info(`Token swap discarded because half duplex balance if below the threshold`) + return txToSend + } + + logger.debug(`Getting block`) + const block = await web3Foreign.eth.getBlock(blockNumber) + + logger.debug(`Checking if SCD Emergency Shutdown has happened`) + const tokenSwapAllowed = await foreignBridge.methods.isTokenSwapAllowed(block.timestamp).call() + + if (!tokenSwapAllowed) { + logger.info(`Token swap discarded because SCD Emergency Shutdown has happened`) + return txToSend + } + + let gasEstimate + + try { + logger.debug(`Estimate gas`) + gasEstimate = await foreignBridge.methods.swapTokens().estimateGas({ + from: config.validatorAddress + }) + + logger.debug({ gasEstimate }, 'Gas estimated') + } catch (e) { + if (e instanceof HttpListProviderError) { + const errorMsg = 'RPC Connection Error: swapTokens Gas Estimate cannot be obtained.' + logger.error(e, errorMsg) + throw new Error(errorMsg) + } else { + logger.error(e, 'Unknown error while processing transaction') + throw e + } + } + + // generate data + const data = await foreignBridge.methods.swapTokens().encodeABI() + + // push to job + txToSend.push({ + data, + gasEstimate, + transactionReference: `swap tokens operation for block number ${blockNumber.toString()}`, + to: config.foreignBridgeAddress + }) + + return txToSend + } +} + +module.exports = swapTokensBuilder