Add swap-tokens worker
This commit is contained in:
parent
eb8de323ee
commit
c19f48ef3f
@ -32,6 +32,7 @@ module.exports = {
|
|||||||
eventAbi: ERC20_ABI,
|
eventAbi: ERC20_ABI,
|
||||||
eventFilter: { to: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS },
|
eventFilter: { to: process.env.COMMON_FOREIGN_BRIDGE_ADDRESS },
|
||||||
queue: 'home',
|
queue: 'home',
|
||||||
|
workerQueue: 'swap-tokens',
|
||||||
name: `watcher-${id}`,
|
name: `watcher-${id}`,
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
12
oracle/config/swap-tokens-worker.config.js
Normal file
12
oracle/config/swap-tokens-worker.config.js
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
const baseConfig = require('./base.config')
|
||||||
|
|
||||||
|
const id = `${baseConfig.id}-swap-tokens`
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
...baseConfig.bridgeConfig,
|
||||||
|
...baseConfig.foreignConfig,
|
||||||
|
workerQueue: 'swap-tokens',
|
||||||
|
senderQueue: 'foreign',
|
||||||
|
name: `worker-${id}`,
|
||||||
|
id
|
||||||
|
}
|
@ -10,9 +10,10 @@
|
|||||||
"watcher:affirmation-request": "./scripts/start-worker.sh watcher affirmation-request-watcher",
|
"watcher:affirmation-request": "./scripts/start-worker.sh watcher affirmation-request-watcher",
|
||||||
"watcher:transfer": "./scripts/start-worker.sh watcher transfer-watcher",
|
"watcher:transfer": "./scripts/start-worker.sh watcher transfer-watcher",
|
||||||
"watcher:half-duplex-transfer": "./scripts/start-worker.sh watcher half-duplex-transfer-watcher",
|
"watcher:half-duplex-transfer": "./scripts/start-worker.sh watcher half-duplex-transfer-watcher",
|
||||||
|
"worker:swap-tokens": "./scripts/start-worker.sh worker swap-tokens-worker",
|
||||||
"sender:home": "./scripts/start-worker.sh sender home-sender",
|
"sender:home": "./scripts/start-worker.sh sender home-sender",
|
||||||
"sender:foreign": "./scripts/start-worker.sh sender foreign-sender",
|
"sender:foreign": "./scripts/start-worker.sh sender foreign-sender",
|
||||||
"dev": "concurrently -n 'watcher:signature-request,watcher:collected-signatures,watcher:affirmation-request,watcher:transfer,watcher:half-duplex-transfer,sender:home,sender:foreign' -c 'red,green,yellow,blue,white,magenta,cyan' 'yarn watcher:signature-request' 'yarn watcher:collected-signatures' 'yarn watcher:affirmation-request' 'yarn watcher:transfer' 'yarn watcher:half-duplex-transfer' 'yarn sender:home' 'yarn sender:foreign'",
|
"dev": "concurrently -n 'watcher:signature-request,watcher:collected-signatures,watcher:affirmation-request,watcher:transfer,watcher:half-duplex-transfer, worker:swap-tokens, sender:home,sender:foreign' -c 'red,green,yellow,blue,white,gray,magenta,cyan' 'yarn watcher:signature-request' 'yarn watcher:collected-signatures' 'yarn watcher:affirmation-request' 'yarn watcher:transfer' 'yarn watcher:half-duplex-transfer' 'yarn worker:swap-tokens' 'yarn sender:home' 'yarn sender:foreign'",
|
||||||
"test": "NODE_ENV=test mocha",
|
"test": "NODE_ENV=test mocha",
|
||||||
"test:watch": "NODE_ENV=test mocha --watch --reporter=min",
|
"test:watch": "NODE_ENV=test mocha --watch --reporter=min",
|
||||||
"coverage": "NODE_ENV=test nyc --reporter=text --reporter=html mocha",
|
"coverage": "NODE_ENV=test nyc --reporter=text --reporter=html mocha",
|
||||||
|
@ -22,7 +22,7 @@ function processTransfersBuilder(config) {
|
|||||||
const userRequestForAffirmationHash = web3Home.eth.abi.encodeEventSignature(userRequestForAffirmationAbi)
|
const userRequestForAffirmationHash = web3Home.eth.abi.encodeEventSignature(userRequestForAffirmationAbi)
|
||||||
const tokensSwappedHash = tokensSwappedAbi ? web3Home.eth.abi.encodeEventSignature(tokensSwappedAbi) : '0x'
|
const tokensSwappedHash = tokensSwappedAbi ? web3Home.eth.abi.encodeEventSignature(tokensSwappedAbi) : '0x'
|
||||||
|
|
||||||
return async function processTransfers(transfers) {
|
return async function processTransfers(transfers, blockNumber) {
|
||||||
const txToSend = []
|
const txToSend = []
|
||||||
|
|
||||||
if (validatorContract === null) {
|
if (validatorContract === null) {
|
||||||
@ -44,7 +44,7 @@ function processTransfersBuilder(config) {
|
|||||||
|
|
||||||
logger.info({ from, value }, `Processing transfer ${transfer.transactionHash}`)
|
logger.info({ from, value }, `Processing transfer ${transfer.transactionHash}`)
|
||||||
|
|
||||||
const block = await web3Foreign.eth.getBlock('latest')
|
const block = await web3Foreign.eth.getBlock(blockNumber)
|
||||||
const tokenSwapAllowed = await foreignBridge.methods.isTokenSwapAllowed(block.timestamp)
|
const tokenSwapAllowed = await foreignBridge.methods.isTokenSwapAllowed(block.timestamp)
|
||||||
|
|
||||||
if (!tokenSwapAllowed) {
|
if (!tokenSwapAllowed) {
|
||||||
|
@ -11,17 +11,23 @@ connection.on('disconnect', () => {
|
|||||||
logger.error('Disconnected from amqp Broker')
|
logger.error('Disconnected from amqp Broker')
|
||||||
})
|
})
|
||||||
|
|
||||||
function connectWatcherToQueue({ queueName, cb }) {
|
function connectWatcherToQueue({ queueName, workerQueue, cb }) {
|
||||||
|
const queueList = workerQueue ? [queueName, workerQueue] : [queueName]
|
||||||
|
|
||||||
const channelWrapper = connection.createChannel({
|
const channelWrapper = connection.createChannel({
|
||||||
json: true,
|
json: true,
|
||||||
setup(channel) {
|
setup(channel) {
|
||||||
return Promise.all([channel.assertQueue(queueName, { durable: true })])
|
return Promise.all(queueList.map(queue => channel.assertQueue(queue, { durable: true })))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
const sendToQueue = data => channelWrapper.sendToQueue(queueName, data, { persistent: true })
|
const sendToQueue = data => channelWrapper.sendToQueue(queueName, data, { persistent: true })
|
||||||
|
let sendToWorker
|
||||||
|
if (workerQueue) {
|
||||||
|
sendToWorker = data => channelWrapper.sendToQueue(workerQueue, data, { persistent: true })
|
||||||
|
}
|
||||||
|
|
||||||
cb({ sendToQueue, channel: channelWrapper })
|
cb({ sendToQueue, sendToWorker, channel: channelWrapper })
|
||||||
}
|
}
|
||||||
|
|
||||||
function connectSenderToQueue({ queueName, cb }) {
|
function connectSenderToQueue({ queueName, cb }) {
|
||||||
@ -59,6 +65,43 @@ function connectSenderToQueue({ queueName, cb }) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function connectWorkerToQueue({ queueName, senderQueue, cb }) {
|
||||||
|
const deadLetterExchange = `${queueName}-retry`
|
||||||
|
|
||||||
|
const channelWrapper = connection.createChannel({
|
||||||
|
json: true
|
||||||
|
})
|
||||||
|
|
||||||
|
channelWrapper.addSetup(channel => {
|
||||||
|
return Promise.all([
|
||||||
|
channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }),
|
||||||
|
channel.assertQueue(queueName, { durable: true }),
|
||||||
|
channel.assertQueue(senderQueue, { durable: true }),
|
||||||
|
channel.bindQueue(queueName, deadLetterExchange),
|
||||||
|
channel.prefetch(1),
|
||||||
|
channel.consume(queueName, msg =>
|
||||||
|
cb({
|
||||||
|
msg,
|
||||||
|
channel: channelWrapper,
|
||||||
|
ackMsg: job => channelWrapper.ack(job),
|
||||||
|
nackMsg: job => channelWrapper.nack(job, false, true),
|
||||||
|
sendToSenderQueue: data => channelWrapper.sendToQueue(senderQueue, data, { persistent: true }),
|
||||||
|
scheduleForRetry: async (data, msgRetries = 0) => {
|
||||||
|
await generateRetry({
|
||||||
|
data,
|
||||||
|
msgRetries,
|
||||||
|
channelWrapper,
|
||||||
|
channel,
|
||||||
|
queueName,
|
||||||
|
deadLetterExchange
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
])
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
async function generateRetry({ data, msgRetries, channelWrapper, channel, queueName, deadLetterExchange }) {
|
async function generateRetry({ data, msgRetries, channelWrapper, channel, queueName, deadLetterExchange }) {
|
||||||
const retries = msgRetries + 1
|
const retries = msgRetries + 1
|
||||||
const delay = getRetrySequence(retries) * 1000
|
const delay = getRetrySequence(retries) * 1000
|
||||||
@ -78,6 +121,7 @@ async function generateRetry({ data, msgRetries, channelWrapper, channel, queueN
|
|||||||
module.exports = {
|
module.exports = {
|
||||||
connectWatcherToQueue,
|
connectWatcherToQueue,
|
||||||
connectSenderToQueue,
|
connectSenderToQueue,
|
||||||
|
connectWorkerToQueue,
|
||||||
connection,
|
connection,
|
||||||
generateRetry
|
generateRetry
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,7 @@ async function initialize() {
|
|||||||
await getLastProcessedBlock()
|
await getLastProcessedBlock()
|
||||||
connectWatcherToQueue({
|
connectWatcherToQueue({
|
||||||
queueName: config.queue,
|
queueName: config.queue,
|
||||||
|
workerQueue: config.workerQueue,
|
||||||
cb: runMain
|
cb: runMain
|
||||||
})
|
})
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
@ -52,16 +53,16 @@ async function initialize() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function runMain({ sendToQueue }) {
|
async function runMain({ sendToQueue, sendToWorker }) {
|
||||||
try {
|
try {
|
||||||
if (connection.isConnected() && redis.status === 'ready') {
|
if (connection.isConnected() && redis.status === 'ready') {
|
||||||
if (config.maxProcessingTime) {
|
if (config.maxProcessingTime) {
|
||||||
await watchdog(() => main({ sendToQueue }), config.maxProcessingTime, () => {
|
await watchdog(() => main({ sendToQueue, sendToWorker }), config.maxProcessingTime, () => {
|
||||||
logger.fatal('Max processing time reached')
|
logger.fatal('Max processing time reached')
|
||||||
process.exit(EXIT_CODES.MAX_TIME_REACHED)
|
process.exit(EXIT_CODES.MAX_TIME_REACHED)
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
await main({ sendToQueue })
|
await main({ sendToQueue, sendToWorker })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
@ -69,7 +70,7 @@ async function runMain({ sendToQueue }) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
runMain({ sendToQueue })
|
runMain({ sendToQueue, sendToWorker })
|
||||||
}, config.pollingInterval)
|
}, config.pollingInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,7 +85,7 @@ function updateLastProcessedBlock(lastBlockNumber) {
|
|||||||
return redis.set(lastBlockRedisKey, lastProcessedBlock.toString())
|
return redis.set(lastBlockRedisKey, lastProcessedBlock.toString())
|
||||||
}
|
}
|
||||||
|
|
||||||
function processEvents(events) {
|
function processEvents(events, blockNumber) {
|
||||||
switch (config.id) {
|
switch (config.id) {
|
||||||
case 'native-erc-signature-request':
|
case 'native-erc-signature-request':
|
||||||
case 'erc-erc-signature-request':
|
case 'erc-erc-signature-request':
|
||||||
@ -102,7 +103,7 @@ function processEvents(events) {
|
|||||||
case 'erc-erc-transfer':
|
case 'erc-erc-transfer':
|
||||||
case 'erc-native-transfer':
|
case 'erc-native-transfer':
|
||||||
case 'erc-native-half-duplex-transfer':
|
case 'erc-native-half-duplex-transfer':
|
||||||
return processTransfers(events)
|
return processTransfers(events, blockNumber)
|
||||||
case 'amb-signature-request':
|
case 'amb-signature-request':
|
||||||
return processAMBSignatureRequests(events)
|
return processAMBSignatureRequests(events)
|
||||||
case 'amb-collected-signatures':
|
case 'amb-collected-signatures':
|
||||||
@ -125,7 +126,7 @@ async function getLastBlockToProcess() {
|
|||||||
return lastBlockNumber.sub(requiredBlockConfirmations)
|
return lastBlockNumber.sub(requiredBlockConfirmations)
|
||||||
}
|
}
|
||||||
|
|
||||||
async function main({ sendToQueue }) {
|
async function main({ sendToQueue, sendToWorker }) {
|
||||||
try {
|
try {
|
||||||
const lastBlockToProcess = await getLastBlockToProcess()
|
const lastBlockToProcess = await getLastBlockToProcess()
|
||||||
|
|
||||||
@ -147,10 +148,14 @@ async function main({ sendToQueue }) {
|
|||||||
logger.info(`Found ${events.length} ${config.event} events`)
|
logger.info(`Found ${events.length} ${config.event} events`)
|
||||||
|
|
||||||
if (events.length) {
|
if (events.length) {
|
||||||
const job = await processEvents(events)
|
const job = await processEvents(events, toBlock)
|
||||||
logger.info('Transactions to send:', job.length)
|
logger.info('Transactions to send:', job.length)
|
||||||
|
|
||||||
if (job.length) {
|
if (job.length) {
|
||||||
|
if (sendToWorker) {
|
||||||
|
await sendToWorker({ blockNumber: toBlock.toString() })
|
||||||
|
}
|
||||||
|
|
||||||
await sendToQueue(job)
|
await sendToQueue(job)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
73
oracle/src/worker.js
Normal file
73
oracle/src/worker.js
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
const path = require('path')
|
||||||
|
const logger = require('./services/logger')
|
||||||
|
const rpcUrlsManager = require('./services/getRpcUrlsManager')
|
||||||
|
const { checkHTTPS, watchdog } = require('./utils/utils')
|
||||||
|
const { EXIT_CODES } = require('./utils/constants')
|
||||||
|
const { connectWorkerToQueue } = require('./services/amqpClient')
|
||||||
|
|
||||||
|
const config = require(path.join('../config/', process.argv[2]))
|
||||||
|
|
||||||
|
const swapTokens = require('./workers/swapTokens')(config)
|
||||||
|
|
||||||
|
async function initialize() {
|
||||||
|
try {
|
||||||
|
const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger)
|
||||||
|
|
||||||
|
rpcUrlsManager.homeUrls.forEach(checkHttps('home'))
|
||||||
|
rpcUrlsManager.foreignUrls.forEach(checkHttps('foreign'))
|
||||||
|
|
||||||
|
connectWorkerToQueue({
|
||||||
|
queueName: config.workerQueue,
|
||||||
|
senderQueue: config.senderQueue,
|
||||||
|
cb: options => {
|
||||||
|
if (config.maxProcessingTime) {
|
||||||
|
return watchdog(() => main(options), config.maxProcessingTime, () => {
|
||||||
|
logger.fatal('Max processing time reached')
|
||||||
|
process.exit(EXIT_CODES.MAX_TIME_REACHED)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return main(options)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e.message)
|
||||||
|
process.exit(EXIT_CODES.GENERAL_ERROR)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function run(blockNumber) {
|
||||||
|
if (config.id === 'erc-native-swap-tokens') {
|
||||||
|
return swapTokens(blockNumber)
|
||||||
|
} else {
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function main({ msg, ackMsg, nackMsg, sendToSenderQueue, scheduleForRetry }) {
|
||||||
|
try {
|
||||||
|
const { blockNumber } = JSON.parse(msg.content)
|
||||||
|
logger.info(`Msg received with block number ${blockNumber}`)
|
||||||
|
|
||||||
|
try {
|
||||||
|
const job = await run(blockNumber)
|
||||||
|
|
||||||
|
logger.info('Transactions to send:', job.length)
|
||||||
|
|
||||||
|
if (job.length) {
|
||||||
|
await sendToSenderQueue(job)
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.info(`Sending failed msg to retry`)
|
||||||
|
await scheduleForRetry({ blockNumber }, msg.properties.headers['x-retries'])
|
||||||
|
}
|
||||||
|
|
||||||
|
ackMsg(msg)
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e)
|
||||||
|
nackMsg(msg)
|
||||||
|
}
|
||||||
|
logger.debug(`Finished worker operation`)
|
||||||
|
}
|
||||||
|
|
||||||
|
initialize()
|
92
oracle/src/workers/swapTokens.js
Normal file
92
oracle/src/workers/swapTokens.js
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
require('../../env')
|
||||||
|
const { HttpListProviderError } = require('http-list-provider')
|
||||||
|
const rootLogger = require('../services/logger')
|
||||||
|
const { web3Foreign } = require('../services/web3')
|
||||||
|
|
||||||
|
const { BRIDGE_VALIDATORS_ABI } = require('../../../commons')
|
||||||
|
|
||||||
|
let validatorContract = null
|
||||||
|
|
||||||
|
function swapTokensBuilder(config) {
|
||||||
|
const foreignBridge = new web3Foreign.eth.Contract(config.foreignBridgeAbi, config.foreignBridgeAddress)
|
||||||
|
|
||||||
|
return async function swapTokens(blockNumber) {
|
||||||
|
const txToSend = []
|
||||||
|
|
||||||
|
const logger = rootLogger.child({
|
||||||
|
blockNumber: blockNumber.toString()
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.debug(`Starting swap tokens operation`)
|
||||||
|
|
||||||
|
if (validatorContract === null) {
|
||||||
|
logger.debug('Getting validator contract address')
|
||||||
|
const validatorContractAddress = await foreignBridge.methods.validatorContract().call()
|
||||||
|
logger.debug({ validatorContractAddress }, 'Validator contract address obtained')
|
||||||
|
|
||||||
|
validatorContract = new web3Foreign.eth.Contract(BRIDGE_VALIDATORS_ABI, validatorContractAddress)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(`Checking if is validator duty`)
|
||||||
|
const validatorDuty = await validatorContract.methods.isValidatorDuty(config.validatorAddress).call()
|
||||||
|
|
||||||
|
if (!validatorDuty) {
|
||||||
|
logger.info(`Token swap discarded because is not validator duty`)
|
||||||
|
return txToSend
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(`Checking if half duplex balance if above the threshold`)
|
||||||
|
const hdTokenBalanceAboveMinBalance = await foreignBridge.methods.isHDTokenBalanceAboveMinBalance().call()
|
||||||
|
|
||||||
|
if (!hdTokenBalanceAboveMinBalance) {
|
||||||
|
logger.info(`Token swap discarded because half duplex balance if below the threshold`)
|
||||||
|
return txToSend
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(`Getting block`)
|
||||||
|
const block = await web3Foreign.eth.getBlock(blockNumber)
|
||||||
|
|
||||||
|
logger.debug(`Checking if SCD Emergency Shutdown has happened`)
|
||||||
|
const tokenSwapAllowed = await foreignBridge.methods.isTokenSwapAllowed(block.timestamp).call()
|
||||||
|
|
||||||
|
if (!tokenSwapAllowed) {
|
||||||
|
logger.info(`Token swap discarded because SCD Emergency Shutdown has happened`)
|
||||||
|
return txToSend
|
||||||
|
}
|
||||||
|
|
||||||
|
let gasEstimate
|
||||||
|
|
||||||
|
try {
|
||||||
|
logger.debug(`Estimate gas`)
|
||||||
|
gasEstimate = await foreignBridge.methods.swapTokens().estimateGas({
|
||||||
|
from: config.validatorAddress
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.debug({ gasEstimate }, 'Gas estimated')
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof HttpListProviderError) {
|
||||||
|
const errorMsg = 'RPC Connection Error: swapTokens Gas Estimate cannot be obtained.'
|
||||||
|
logger.error(e, errorMsg)
|
||||||
|
throw new Error(errorMsg)
|
||||||
|
} else {
|
||||||
|
logger.error(e, 'Unknown error while processing transaction')
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// generate data
|
||||||
|
const data = await foreignBridge.methods.swapTokens().encodeABI()
|
||||||
|
|
||||||
|
// push to job
|
||||||
|
txToSend.push({
|
||||||
|
data,
|
||||||
|
gasEstimate,
|
||||||
|
transactionReference: `swap tokens operation for block number ${blockNumber.toString()}`,
|
||||||
|
to: config.foreignBridgeAddress
|
||||||
|
})
|
||||||
|
|
||||||
|
return txToSend
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = swapTokensBuilder
|
Loading…
Reference in New Issue
Block a user