diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 35708b89..d953abba 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -53,6 +53,12 @@ ORACLE_SHUTDOWN_CONTRACT_METHOD | Method signature to be used in the side chain ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT | Max length for the block range used in `eth_getLogs` requests for polling contract events for the Foreign chain. Infinite, if not provided. | `integer` ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT | Max length for the block range used in `eth_getLogs` requests for polling contract events for the Home chain. Infinite, if not provided. | `integer` ORACLE_JSONRPC_ERROR_CODES | Override default JSON rpc error codes that can trigger RPC fallback to the next URL from the list (or a retry in case of a single RPC URL). Default is `-32603,-32002,-32005`. Should be a comma-separated list of negative integers. | `string` +ORACLE_HOME_EVENTS_REPROCESSING | If set to `true`, home events happened in the past will be refetched and processed once again, to ensure that nothing was missed on the first pass. | `bool` +ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs in the home chain. Defaults to `1000` | `integer` +ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY | Block confirmations number, after which old logs are being reprocessed in the home chain. Defaults to `500` | `integer` +ORACLE_FOREIGN_EVENTS_REPROCESSING | If set to `true`, foreign events happened in the past will be refetched and processed once again, to ensure that nothing was missed on the first pass. | `bool` +ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs in the foreign chain. Defaults to `500` | `integer` +ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY | Block confirmations number, after which old logs are being reprocessed in the foreign chain. Defaults to `250` | `integer` ## Monitor configuration diff --git a/e2e-commons/components-envs/oracle-amb.env b/e2e-commons/components-envs/oracle-amb.env index a6b5cc08..bb3a0036 100644 --- a/e2e-commons/components-envs/oracle-amb.env +++ b/e2e-commons/components-envs/oracle-amb.env @@ -23,3 +23,9 @@ ORACLE_HOME_START_BLOCK=1 ORACLE_FOREIGN_START_BLOCK=1 ORACLE_HOME_TO_FOREIGN_BLOCK_LIST=/mono/oracle/access-lists/block_list.txt ORACLE_FOREIGN_ARCHIVE_RPC_URL=http://parity2:8545 +ORACLE_HOME_EVENTS_REPROCESSING=false +ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE=10 +ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY=10 +ORACLE_FOREIGN_EVENTS_REPROCESSING=true +ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE=10 +ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY=10 diff --git a/e2e-commons/components-envs/oracle-erc20-native.env b/e2e-commons/components-envs/oracle-erc20-native.env index e12f4f5d..13474687 100644 --- a/e2e-commons/components-envs/oracle-erc20-native.env +++ b/e2e-commons/components-envs/oracle-erc20-native.env @@ -22,3 +22,9 @@ ORACLE_ALLOW_HTTP_FOR_RPC=yes ORACLE_HOME_START_BLOCK=1 ORACLE_FOREIGN_START_BLOCK=1 ORACLE_HOME_TO_FOREIGN_BLOCK_LIST=/mono/oracle/access-lists/block_list.txt +ORACLE_HOME_EVENTS_REPROCESSING=true +ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE=10 +ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY=10 +ORACLE_FOREIGN_EVENTS_REPROCESSING=true +ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE=10 +ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY=10 diff --git a/oracle/config/base.config.js b/oracle/config/base.config.js index 6dc94902..35e52706 100644 --- a/oracle/config/base.config.js +++ b/oracle/config/base.config.js @@ -23,7 +23,13 @@ const { ORACLE_HOME_START_BLOCK, ORACLE_FOREIGN_START_BLOCK, ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT, - ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT + ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT, + ORACLE_HOME_EVENTS_REPROCESSING, + ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE, + ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY, + ORACLE_FOREIGN_EVENTS_REPROCESSING, + ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE, + ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY } = process.env let homeAbi @@ -61,7 +67,12 @@ const homeConfig = { blockPollingLimit: parseInt(ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT, 10), web3: web3Home, bridgeContract: homeContract, - eventContract: homeContract + eventContract: homeContract, + reprocessingOptions: { + enabled: ORACLE_HOME_EVENTS_REPROCESSING === 'true', + batchSize: parseInt(ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE, 10) || 1000, + blockDelay: parseInt(ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY, 10) || 500 + } } const foreignContract = new web3Foreign.eth.Contract(foreignAbi, COMMON_FOREIGN_BRIDGE_ADDRESS) @@ -74,7 +85,12 @@ const foreignConfig = { blockPollingLimit: parseInt(ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT, 10), web3: web3Foreign, bridgeContract: foreignContract, - eventContract: foreignContract + eventContract: foreignContract, + reprocessingOptions: { + enabled: ORACLE_FOREIGN_EVENTS_REPROCESSING === 'true', + batchSize: parseInt(ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE, 10) || 500, + blockDelay: parseInt(ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY, 10) || 250 + } } const maxProcessingTime = diff --git a/oracle/config/foreign-sender.config.js b/oracle/config/foreign-sender.config.js index 990ac78d..7971c745 100644 --- a/oracle/config/foreign-sender.config.js +++ b/oracle/config/foreign-sender.config.js @@ -8,7 +8,6 @@ const { ORACLE_FOREIGN_TX_RESEND_INTERVAL } = process.env module.exports = { ...baseConfig, queue: 'foreign-prioritized', - oldQueue: 'foreign', id: 'foreign', name: 'sender-foreign', web3: web3Foreign, diff --git a/oracle/config/home-sender.config.js b/oracle/config/home-sender.config.js index 0220efd1..70ac51a8 100644 --- a/oracle/config/home-sender.config.js +++ b/oracle/config/home-sender.config.js @@ -8,7 +8,6 @@ const { ORACLE_HOME_TX_RESEND_INTERVAL } = process.env module.exports = { ...baseConfig, queue: 'home-prioritized', - oldQueue: 'home', id: 'home', name: 'sender-home', web3: web3Home, diff --git a/oracle/src/sender.js b/oracle/src/sender.js index 7484c9d2..8eacfb61 100644 --- a/oracle/src/sender.js +++ b/oracle/src/sender.js @@ -55,7 +55,6 @@ async function initialize() { function connectQueue() { connectSenderToQueue({ queueName: config.queue, - oldQueueName: config.oldQueue, resendInterval: config.resendInterval, cb: options => { if (config.maxProcessingTime) { diff --git a/oracle/src/services/amqpClient.js b/oracle/src/services/amqpClient.js index b3cc147f..b2a4a177 100644 --- a/oracle/src/services/amqpClient.js +++ b/oracle/src/services/amqpClient.js @@ -40,23 +40,9 @@ function connectWatcherToQueue({ queueName, cb }) { cb({ sendToQueue, channel: channelWrapper }) } -function connectSenderToQueue({ queueName, oldQueueName, cb, resendInterval }) { +function connectSenderToQueue({ queueName, cb, resendInterval }) { const deadLetterExchange = `${queueName}-retry` - async function resendMessagesToNewQueue(channel) { - logger.info(`Trying to check messages in the old non-priority queue ${queueName}`) - while (true) { - const msg = await channel.get(oldQueueName) - if (msg === false) { - logger.info(`No messages in the old queue ${oldQueueName} left`) - break - } - logger.debug(`Message in the old queue ${oldQueueName} was found, redirecting it to the new queue ${queueName}`) - await channel.sendToQueue(queueName, msg.content, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY }) - await channel.ack(msg) - } - } - const channelWrapper = connection.createChannel({ json: true }) @@ -64,7 +50,6 @@ function connectSenderToQueue({ queueName, oldQueueName, cb, resendInterval }) { channelWrapper.addSetup(async channel => { await channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }) await channel.assertQueue(queueName, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY }) - await channel.assertQueue(oldQueueName, { durable: true }).then(() => resendMessagesToNewQueue(channel)) await channel.bindQueue(queueName, deadLetterExchange) await channel.prefetch(1) await channel.consume(queueName, msg => diff --git a/oracle/src/tx/web3.js b/oracle/src/tx/web3.js index 33f98d4b..8ea58fbf 100644 --- a/oracle/src/tx/web3.js +++ b/oracle/src/tx/web3.js @@ -87,7 +87,7 @@ async function getEvents({ contract, event, fromBlock, toBlock, filter }) { ) const pastEvents = await contract.getPastEvents(event, { fromBlock, toBlock, filter }) logger.debug({ contractAddress, event, count: pastEvents.length }, 'Past events obtained') - return pastEvents + return pastEvents.sort((a, b) => a.blockNumber - b.blockNumber || a.transactionIndex - b.transactionIndex) } catch (e) { logger.error(e.message) throw new Error(`${event} events cannot be obtained`) diff --git a/oracle/src/utils/constants.js b/oracle/src/utils/constants.js index bed0d136..40ec0cb8 100644 --- a/oracle/src/utils/constants.js +++ b/oracle/src/utils/constants.js @@ -5,6 +5,7 @@ module.exports = { MIN_AMB_HEADER_LENGTH: 32 + 20 + 20 + 4 + 2 + 1 + 2, MAX_GAS_LIMIT: 10000000, MAX_CONCURRENT_EVENTS: 50, + MAX_HISTORY_BLOCK_TO_REPROCESS: 10000, RETRY_CONFIG: { retries: 20, factor: 1.4, diff --git a/oracle/src/watcher.js b/oracle/src/watcher.js index e0627042..22bb91ad 100644 --- a/oracle/src/watcher.js +++ b/oracle/src/watcher.js @@ -6,7 +6,11 @@ const logger = require('./services/logger') const { getShutdownFlag } = require('./services/shutdownState') const { getBlockNumber, getRequiredBlockConfirmations, getEvents } = require('./tx/web3') const { checkHTTPS, watchdog } = require('./utils/utils') -const { EXIT_CODES, BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT } = require('./utils/constants') +const { + EXIT_CODES, + BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT, + MAX_HISTORY_BLOCK_TO_REPROCESS +} = require('./utils/constants') if (process.argv.length < 3) { logger.error('Please check the number of arguments, config file was not provided') @@ -26,9 +30,12 @@ const processAMBInformationRequests = require('./events/processAMBInformationReq const { getTokensState } = require('./utils/tokenState') -const { web3, bridgeContract, eventContract, startBlock, pollingInterval, chain } = config.main +const { web3, bridgeContract, eventContract, startBlock, pollingInterval, chain, reprocessingOptions } = config.main const lastBlockRedisKey = `${config.id}:lastProcessedBlock` +const lastReprocessedBlockRedisKey = `${config.id}:lastReprocessedBlock` +const seenEventsRedisKey = `${config.id}:seenEvents` let lastProcessedBlock = Math.max(startBlock - 1, 0) +let lastReprocessedBlock let lastSeenBlockNumber = 0 let sameBlockNumberCounter = 0 @@ -39,6 +46,8 @@ async function initialize() { web3.currentProvider.urls.forEach(checkHttps(chain)) await getLastProcessedBlock() + await getLastReprocessedBlock() + await checkConditions() connectWatcherToQueue({ queueName: config.queue, cb: runMain @@ -76,11 +85,34 @@ async function getLastProcessedBlock() { lastProcessedBlock = result ? parseInt(result, 10) : lastProcessedBlock } +async function getLastReprocessedBlock() { + if (reprocessingOptions.enabled) { + const result = await redis.get(lastReprocessedBlockRedisKey) + if (result) { + lastReprocessedBlock = Math.max(parseInt(result, 10), lastProcessedBlock - MAX_HISTORY_BLOCK_TO_REPROCESS) + } else { + lastReprocessedBlock = lastProcessedBlock + } + logger.debug({ block: lastReprocessedBlock }, 'Last reprocessed block obtained') + } else { + // when reprocessing is being enabled not for the first time, + // we do not want to process blocks for which we didn't recorded seen events, + // instead, we want to start from the current block. + // Thus we should delete this reprocessing pointer once it is disabled. + await redis.del(lastReprocessedBlockRedisKey) + } +} + function updateLastProcessedBlock(lastBlockNumber) { lastProcessedBlock = lastBlockNumber return redis.set(lastBlockRedisKey, lastProcessedBlock) } +function updateLastReprocessedBlock(lastBlockNumber) { + lastReprocessedBlock = lastBlockNumber + return redis.set(lastReprocessedBlockRedisKey, lastReprocessedBlock) +} + function processEvents(events) { switch (config.id) { case 'erc-native-signature-request': @@ -114,6 +146,71 @@ async function checkConditions() { } } +const eventKey = e => `${e.transactionHash}-${e.logIndex}` + +async function reprocessOldLogs(sendToQueue) { + const fromBlock = lastReprocessedBlock + 1 + let toBlock = lastReprocessedBlock + reprocessingOptions.batchSize + const events = await getEvents({ + contract: eventContract, + event: config.event, + fromBlock, + toBlock, + filter: config.eventFilter + }) + const alreadySeenEvents = await getSeenEvents(fromBlock, toBlock) + const missingEvents = events.filter(e => !alreadySeenEvents[eventKey(e)]) + if (missingEvents.length === 0) { + logger.debug('No missed events were found') + } else { + logger.info(`Found ${missingEvents.length} ${config.event} missed events`) + let job + if (config.id === 'amb-information-request') { + // obtain block number and events from the earliest block + const batchBlockNumber = missingEvents[0].blockNumber + const batchEvents = missingEvents.filter(event => event.blockNumber === batchBlockNumber) + + // if there are some other events in the later blocks, + // adjust lastReprocessedBlock so that these events will be processed again on the next iteration + if (batchEvents.length < missingEvents.length) { + // pick event outside from the batch + toBlock = missingEvents[batchEvents.length].blockNumber - 1 + } + + job = await processAMBInformationRequests(batchEvents) + if (job === null) { + return + } + } else { + job = await processEvents(missingEvents) + } + logger.info('Missed events transactions to send:', job.length) + if (job.length) { + await sendToQueue(job) + } + } + + await updateLastReprocessedBlock(toBlock) + await deleteSeenEvents(0, toBlock) +} + +async function getSeenEvents(fromBlock, toBlock) { + const keys = await redis.zrangebyscore(seenEventsRedisKey, fromBlock, toBlock) + const res = {} + keys.forEach(k => { + res[k] = true + }) + return res +} + +function deleteSeenEvents(fromBlock, toBlock) { + return redis.zremrangebyscore(seenEventsRedisKey, fromBlock, toBlock) +} + +function addSeenEvents(events) { + return redis.zadd(seenEventsRedisKey, ...events.flatMap(e => [e.blockNumber, eventKey(e)])) +} + async function getLastBlockToProcess(web3, bridgeContract) { const [lastBlockNumber, requiredBlockConfirmations] = await Promise.all([ getBlockNumber(web3), @@ -158,24 +255,29 @@ async function main({ sendToQueue }) { const lastBlockToProcess = await getLastBlockToProcess(web3, bridgeContract) + if (reprocessingOptions.enabled) { + if (lastReprocessedBlock + reprocessingOptions.batchSize + reprocessingOptions.blockDelay < lastBlockToProcess) { + await reprocessOldLogs(sendToQueue) + return + } + } + if (lastBlockToProcess <= lastProcessedBlock) { logger.debug('All blocks already processed') return } - await checkConditions() - const fromBlock = lastProcessedBlock + 1 const rangeEndBlock = config.blockPollingLimit ? fromBlock + config.blockPollingLimit : lastBlockToProcess let toBlock = Math.min(lastBlockToProcess, rangeEndBlock) - const events = (await getEvents({ + let events = await getEvents({ contract: eventContract, event: config.event, fromBlock, toBlock, filter: config.eventFilter - })).sort((a, b) => a.blockNumber - b.blockNumber) + }) logger.info(`Found ${events.length} ${config.event} events`) if (events.length) { @@ -192,9 +294,10 @@ async function main({ sendToQueue }) { if (batchEvents.length < events.length) { // pick event outside from the batch toBlock = events[batchEvents.length].blockNumber - 1 + events = batchEvents } - job = await processAMBInformationRequests(batchEvents) + job = await processAMBInformationRequests(events) if (job === null) { return } @@ -206,6 +309,9 @@ async function main({ sendToQueue }) { if (job.length) { await sendToQueue(job) } + if (reprocessingOptions.enabled) { + await addSeenEvents(events) + } } logger.debug({ lastProcessedBlock: toBlock.toString() }, 'Updating last processed block')