Add confirmRelay script (#333)

This commit is contained in:
Kirill Fedoseev 2020-05-21 20:23:00 +03:00 committed by GitHub
parent 4bd3576691
commit a2c678d0a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 233 additions and 4 deletions

@ -14,6 +14,7 @@
"worker:convert-to-chai": "./scripts/start-worker.sh worker convert-to-chai-worker",
"sender:home": "./scripts/start-worker.sh sender home-sender",
"sender:foreign": "./scripts/start-worker.sh sender foreign-sender",
"confirm:transfer": "./scripts/start-worker.sh confirmRelay transfer-watcher",
"dev": "concurrently -n 'watcher:signature-request,watcher:collected-signatures,watcher:affirmation-request,watcher:transfer,watcher:half-duplex-transfer, worker:swap-tokens, sender:home,sender:foreign' -c 'red,green,yellow,blue,white,gray,magenta,cyan' 'yarn watcher:signature-request' 'yarn watcher:collected-signatures' 'yarn watcher:affirmation-request' 'yarn watcher:transfer' 'yarn watcher:half-duplex-transfer' 'yarn worker:swap-tokens' 'yarn sender:home' 'yarn sender:foreign'",
"test": "NODE_ENV=test mocha",
"test:watch": "NODE_ENV=test mocha --watch --reporter=min",

@ -8,11 +8,12 @@ LOGS_DIR="logs/"
WORKER="${WORKERS_DIR}${1}.js"
CONFIG="${2}.config.js"
LOG="${LOGS_DIR}${2}.txt"
TX_HASH="${3}"
CHECKS=$(node scripts/initialChecks.js)
if [ "${NODE_ENV}" = "production" ]; then
exec node "${WORKER}" "${CONFIG}" "$CHECKS"
exec node "${WORKER}" "${CONFIG}" "$CHECKS" "$TX_HASH"
else
node "${WORKER}" "${CONFIG}" "$CHECKS" | tee -a "${LOG}" | pino-pretty
node "${WORKER}" "${CONFIG}" "$CHECKS" "$TX_HASH" | tee -a "${LOG}" | pino-pretty
fi

186
oracle/src/confirmRelay.js Normal file

@ -0,0 +1,186 @@
require('../env')
const path = require('path')
const { isAttached, connectWatcherToQueue, connection } = require('./services/amqpClient')
const logger = require('./services/logger')
const GasPrice = require('./services/gasPrice')
const rpcUrlsManager = require('./services/getRpcUrlsManager')
const { getNonce, getChainId, getEventsFromTx } = require('./tx/web3')
const { sendTx } = require('./tx/sendTx')
const { checkHTTPS, watchdog, syncForEach, addExtraGas } = require('./utils/utils')
const { EXIT_CODES, EXTRA_GAS_PERCENTAGE } = require('./utils/constants')
const { ORACLE_VALIDATOR_ADDRESS, ORACLE_VALIDATOR_ADDRESS_PRIVATE_KEY, ORACLE_ALLOW_HTTP_FOR_RPC } = process.env
if (process.argv.length < 5) {
logger.error('Please check the number of arguments, transaction hash is not present')
process.exit(EXIT_CODES.GENERAL_ERROR)
}
const config = require(path.join('../config/', process.argv[2]))
const txHash = process.argv[4]
const processSignatureRequests = require('./events/processSignatureRequests')(config)
const processCollectedSignatures = require('./events/processCollectedSignatures')(config)
const processAffirmationRequests = require('./events/processAffirmationRequests')(config)
const processTransfers = require('./events/processTransfers')(config)
const processAMBSignatureRequests = require('./events/processAMBSignatureRequests')(config)
const processAMBCollectedSignatures = require('./events/processAMBCollectedSignatures')(config)
const processAMBAffirmationRequests = require('./events/processAMBAffirmationRequests')(config)
const web3Instance = config.web3
const { eventContractAddress } = config
const eventContract = new web3Instance.eth.Contract(config.eventAbi, eventContractAddress)
let attached
async function initialize() {
try {
const checkHttps = checkHTTPS(ORACLE_ALLOW_HTTP_FOR_RPC, logger)
rpcUrlsManager.homeUrls.forEach(checkHttps('home'))
rpcUrlsManager.foreignUrls.forEach(checkHttps('foreign'))
attached = await isAttached()
if (attached) {
logger.info('RabbitMQ container is available, using oracle sender')
} else {
logger.info('RabbitMQ container is not available, using internal sender')
}
connectWatcherToQueue({
queueName: config.queue,
workerQueue: config.workerQueue,
cb: runMain
})
} catch (e) {
logger.error(e)
process.exit(EXIT_CODES.GENERAL_ERROR)
}
}
async function runMain({ sendToQueue }) {
try {
const sendJob = attached ? sendToQueue : sendJobTx
if (!attached || connection.isConnected()) {
if (config.maxProcessingTime) {
await watchdog(() => main({ sendJob, txHash }), config.maxProcessingTime, () => {
logger.fatal('Max processing time reached')
process.exit(EXIT_CODES.MAX_TIME_REACHED)
})
} else {
await main({ sendJob, txHash })
}
} else {
setTimeout(() => {
runMain({ sendToQueue })
}, config.pollingInterval)
}
} catch (e) {
logger.error(e)
}
}
function processEvents(events) {
switch (config.id) {
case 'native-erc-signature-request':
case 'erc-erc-signature-request':
case 'erc-native-signature-request':
return processSignatureRequests(events)
case 'native-erc-collected-signatures':
case 'erc-erc-collected-signatures':
case 'erc-native-collected-signatures':
return processCollectedSignatures(events)
case 'native-erc-affirmation-request':
case 'erc677-erc677-affirmation-request':
case 'erc-native-affirmation-request':
case 'erc-erc-affirmation-request':
return processAffirmationRequests(events)
case 'erc-erc-transfer':
case 'erc-native-transfer':
return processTransfers(events)
case 'amb-signature-request':
return processAMBSignatureRequests(events)
case 'amb-collected-signatures':
return processAMBCollectedSignatures(events)
case 'amb-affirmation-request':
return processAMBAffirmationRequests(events)
default:
return []
}
}
async function main({ sendJob, txHash }) {
try {
const events = await getEventsFromTx({
web3: web3Instance,
contract: eventContract,
event: config.event,
txHash,
filter: config.eventFilter
})
logger.info(`Found ${events.length} ${config.event} events`)
if (events.length) {
const job = await processEvents(events)
logger.info('Transactions to send:', job.length)
if (job.length) {
await sendJob(job)
}
}
} catch (e) {
logger.error(e)
}
await connection.close()
logger.debug('Finished')
}
async function sendJobTx(jobs) {
const gasPrice = await GasPrice.start(config.queue, true)
const chainId = await getChainId(config.queue)
let nonce = await getNonce(web3Instance, ORACLE_VALIDATOR_ADDRESS)
await syncForEach(jobs, async job => {
const gasLimit = addExtraGas(job.gasEstimate, EXTRA_GAS_PERCENTAGE)
try {
logger.info(`Sending transaction with nonce ${nonce}`)
const txHash = await sendTx({
chain: config.queue,
data: job.data,
nonce,
gasPrice: gasPrice.toString(10),
amount: '0',
gasLimit,
privateKey: ORACLE_VALIDATOR_ADDRESS_PRIVATE_KEY,
to: job.to,
chainId,
web3: web3Instance
})
nonce++
logger.info(
{ eventTransactionHash: job.transactionReference, generatedTransactionHash: txHash },
`Tx generated ${txHash} for event Tx ${job.transactionReference}`
)
} catch (e) {
logger.error(
{ eventTransactionHash: job.transactionReference, error: e.message },
`Tx Failed for event Tx ${job.transactionReference}.`,
e.message
)
if (e.message.includes('Insufficient funds')) {
const currentBalance = await web3Instance.eth.getBalance(ORACLE_VALIDATOR_ADDRESS)
const minimumBalance = gasLimit.multipliedBy(gasPrice)
logger.error(
`Insufficient funds: ${currentBalance}. Stop processing messages until the balance is at least ${minimumBalance}.`
)
}
}
})
}
initialize()

@ -1,4 +1,6 @@
require('../../env')
const url = require('url')
const dns = require('dns')
const connection = require('amqp-connection-manager').connect(process.env.ORACLE_QUEUE_URL)
const logger = require('./logger')
const { getRetrySequence } = require('../utils/utils')
@ -11,6 +13,14 @@ connection.on('disconnect', () => {
logger.error('Disconnected from amqp Broker')
})
async function isAttached() {
if (!process.env.ORACLE_QUEUE_URL) {
return false
}
const amqpHost = new url.URL(process.env.ORACLE_QUEUE_URL).hostname
return new Promise(res => dns.lookup(amqpHost, err => res(err === null)))
}
function connectWatcherToQueue({ queueName, workerQueue, cb }) {
const queueList = workerQueue ? [queueName, workerQueue] : [queueName]
@ -119,6 +129,7 @@ async function generateRetry({ data, msgRetries, channelWrapper, channel, queueN
}
module.exports = {
isAttached,
connectWatcherToQueue,
connectSenderToQueue,
connectWorkerToQueue,

@ -45,7 +45,7 @@ const fetchGasPrice = async (speedType, factor, bridgeContract, gasPriceSupplier
return cachedGasPrice
}
async function start(chainId) {
async function start(chainId, fetchOnce) {
clearInterval(fetchGasPriceInterval)
let bridgeContract = null
@ -73,10 +73,16 @@ async function start(chainId) {
throw new Error(`Unrecognized chainId '${chainId}'`)
}
if (fetchOnce) {
await fetchGasPrice(speedType, factor, bridgeContract, () => fetch(gasPriceSupplierUrl))
return getPrice()
}
fetchGasPriceInterval = setIntervalAndRun(
() => fetchGasPrice(speedType, factor, bridgeContract, () => fetch(gasPriceSupplierUrl)),
updateInterval
)
return null
}
function getPrice() {

@ -69,10 +69,34 @@ async function getEvents({ contract, event, fromBlock, toBlock, filter }) {
}
}
async function getEventsFromTx({ web3, contract, event, txHash, filter }) {
try {
const contractAddress = contract.options.address
logger.info({ contractAddress, event, txHash }, 'Getting past events for specific transaction')
const { logs } = await web3.eth.getTransactionReceipt(txHash)
const eventAbi = contract.options.jsonInterface.find(abi => abi.name === event)
const decodeAbi = contract._decodeEventABI.bind(eventAbi)
const pastEvents = logs
.filter(event => event.topics[0] === eventAbi.signature)
.map(decodeAbi)
.filter(event =>
eventAbi.inputs.every(arg => {
const encodeParam = param => web3.eth.abi.encodeParameter(arg.type, param)
return !filter[arg.name] || encodeParam(filter[arg.name]) === encodeParam(event.returnValues[arg.name])
})
)
logger.debug({ contractAddress, event, count: pastEvents.length }, 'Past events obtained')
return pastEvents
} catch (e) {
throw new Error(`${event} events cannot be obtained`)
}
}
module.exports = {
getNonce,
getBlockNumber,
getChainId,
getRequiredBlockConfirmations,
getEvents
getEvents,
getEventsFromTx
}