Refetch old logs ranges to see if there are missed events (#627)

This commit is contained in:
Kirill Fedoseev 2022-01-03 19:32:38 +07:00 committed by GitHub
parent b17fff2b56
commit 296e5c5a22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 153 additions and 30 deletions

@ -53,6 +53,12 @@ ORACLE_SHUTDOWN_CONTRACT_METHOD | Method signature to be used in the side chain
ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT | Max length for the block range used in `eth_getLogs` requests for polling contract events for the Foreign chain. Infinite, if not provided. | `integer` ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT | Max length for the block range used in `eth_getLogs` requests for polling contract events for the Foreign chain. Infinite, if not provided. | `integer`
ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT | Max length for the block range used in `eth_getLogs` requests for polling contract events for the Home chain. Infinite, if not provided. | `integer` ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT | Max length for the block range used in `eth_getLogs` requests for polling contract events for the Home chain. Infinite, if not provided. | `integer`
ORACLE_JSONRPC_ERROR_CODES | Override default JSON rpc error codes that can trigger RPC fallback to the next URL from the list (or a retry in case of a single RPC URL). Default is `-32603,-32002,-32005`. Should be a comma-separated list of negative integers. | `string` ORACLE_JSONRPC_ERROR_CODES | Override default JSON rpc error codes that can trigger RPC fallback to the next URL from the list (or a retry in case of a single RPC URL). Default is `-32603,-32002,-32005`. Should be a comma-separated list of negative integers. | `string`
ORACLE_HOME_EVENTS_REPROCESSING | If set to `true`, home events happened in the past will be refetched and processed once again, to ensure that nothing was missed on the first pass. | `bool`
ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs in the home chain. Defaults to `1000` | `integer`
ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY | Block confirmations number, after which old logs are being reprocessed in the home chain. Defaults to `500` | `integer`
ORACLE_FOREIGN_EVENTS_REPROCESSING | If set to `true`, foreign events happened in the past will be refetched and processed once again, to ensure that nothing was missed on the first pass. | `bool`
ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs in the foreign chain. Defaults to `500` | `integer`
ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY | Block confirmations number, after which old logs are being reprocessed in the foreign chain. Defaults to `250` | `integer`
## Monitor configuration ## Monitor configuration

@ -23,3 +23,9 @@ ORACLE_HOME_START_BLOCK=1
ORACLE_FOREIGN_START_BLOCK=1 ORACLE_FOREIGN_START_BLOCK=1
ORACLE_HOME_TO_FOREIGN_BLOCK_LIST=/mono/oracle/access-lists/block_list.txt ORACLE_HOME_TO_FOREIGN_BLOCK_LIST=/mono/oracle/access-lists/block_list.txt
ORACLE_FOREIGN_ARCHIVE_RPC_URL=http://parity2:8545 ORACLE_FOREIGN_ARCHIVE_RPC_URL=http://parity2:8545
ORACLE_HOME_EVENTS_REPROCESSING=false
ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE=10
ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY=10
ORACLE_FOREIGN_EVENTS_REPROCESSING=true
ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE=10
ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY=10

@ -22,3 +22,9 @@ ORACLE_ALLOW_HTTP_FOR_RPC=yes
ORACLE_HOME_START_BLOCK=1 ORACLE_HOME_START_BLOCK=1
ORACLE_FOREIGN_START_BLOCK=1 ORACLE_FOREIGN_START_BLOCK=1
ORACLE_HOME_TO_FOREIGN_BLOCK_LIST=/mono/oracle/access-lists/block_list.txt ORACLE_HOME_TO_FOREIGN_BLOCK_LIST=/mono/oracle/access-lists/block_list.txt
ORACLE_HOME_EVENTS_REPROCESSING=true
ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE=10
ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY=10
ORACLE_FOREIGN_EVENTS_REPROCESSING=true
ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE=10
ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY=10

@ -23,7 +23,13 @@ const {
ORACLE_HOME_START_BLOCK, ORACLE_HOME_START_BLOCK,
ORACLE_FOREIGN_START_BLOCK, ORACLE_FOREIGN_START_BLOCK,
ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT, ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT,
ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT,
ORACLE_HOME_EVENTS_REPROCESSING,
ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE,
ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY,
ORACLE_FOREIGN_EVENTS_REPROCESSING,
ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE,
ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY
} = process.env } = process.env
let homeAbi let homeAbi
@ -61,7 +67,12 @@ const homeConfig = {
blockPollingLimit: parseInt(ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT, 10), blockPollingLimit: parseInt(ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT, 10),
web3: web3Home, web3: web3Home,
bridgeContract: homeContract, bridgeContract: homeContract,
eventContract: homeContract eventContract: homeContract,
reprocessingOptions: {
enabled: ORACLE_HOME_EVENTS_REPROCESSING === 'true',
batchSize: parseInt(ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE, 10) || 1000,
blockDelay: parseInt(ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY, 10) || 500
}
} }
const foreignContract = new web3Foreign.eth.Contract(foreignAbi, COMMON_FOREIGN_BRIDGE_ADDRESS) const foreignContract = new web3Foreign.eth.Contract(foreignAbi, COMMON_FOREIGN_BRIDGE_ADDRESS)
@ -74,7 +85,12 @@ const foreignConfig = {
blockPollingLimit: parseInt(ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT, 10), blockPollingLimit: parseInt(ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT, 10),
web3: web3Foreign, web3: web3Foreign,
bridgeContract: foreignContract, bridgeContract: foreignContract,
eventContract: foreignContract eventContract: foreignContract,
reprocessingOptions: {
enabled: ORACLE_FOREIGN_EVENTS_REPROCESSING === 'true',
batchSize: parseInt(ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE, 10) || 500,
blockDelay: parseInt(ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY, 10) || 250
}
} }
const maxProcessingTime = const maxProcessingTime =

@ -8,7 +8,6 @@ const { ORACLE_FOREIGN_TX_RESEND_INTERVAL } = process.env
module.exports = { module.exports = {
...baseConfig, ...baseConfig,
queue: 'foreign-prioritized', queue: 'foreign-prioritized',
oldQueue: 'foreign',
id: 'foreign', id: 'foreign',
name: 'sender-foreign', name: 'sender-foreign',
web3: web3Foreign, web3: web3Foreign,

@ -8,7 +8,6 @@ const { ORACLE_HOME_TX_RESEND_INTERVAL } = process.env
module.exports = { module.exports = {
...baseConfig, ...baseConfig,
queue: 'home-prioritized', queue: 'home-prioritized',
oldQueue: 'home',
id: 'home', id: 'home',
name: 'sender-home', name: 'sender-home',
web3: web3Home, web3: web3Home,

@ -55,7 +55,6 @@ async function initialize() {
function connectQueue() { function connectQueue() {
connectSenderToQueue({ connectSenderToQueue({
queueName: config.queue, queueName: config.queue,
oldQueueName: config.oldQueue,
resendInterval: config.resendInterval, resendInterval: config.resendInterval,
cb: options => { cb: options => {
if (config.maxProcessingTime) { if (config.maxProcessingTime) {

@ -40,23 +40,9 @@ function connectWatcherToQueue({ queueName, cb }) {
cb({ sendToQueue, channel: channelWrapper }) cb({ sendToQueue, channel: channelWrapper })
} }
function connectSenderToQueue({ queueName, oldQueueName, cb, resendInterval }) { function connectSenderToQueue({ queueName, cb, resendInterval }) {
const deadLetterExchange = `${queueName}-retry` const deadLetterExchange = `${queueName}-retry`
async function resendMessagesToNewQueue(channel) {
logger.info(`Trying to check messages in the old non-priority queue ${queueName}`)
while (true) {
const msg = await channel.get(oldQueueName)
if (msg === false) {
logger.info(`No messages in the old queue ${oldQueueName} left`)
break
}
logger.debug(`Message in the old queue ${oldQueueName} was found, redirecting it to the new queue ${queueName}`)
await channel.sendToQueue(queueName, msg.content, { persistent: true, priority: SENDER_QUEUE_SEND_PRIORITY })
await channel.ack(msg)
}
}
const channelWrapper = connection.createChannel({ const channelWrapper = connection.createChannel({
json: true json: true
}) })
@ -64,7 +50,6 @@ function connectSenderToQueue({ queueName, oldQueueName, cb, resendInterval }) {
channelWrapper.addSetup(async channel => { channelWrapper.addSetup(async channel => {
await channel.assertExchange(deadLetterExchange, 'fanout', { durable: true }) await channel.assertExchange(deadLetterExchange, 'fanout', { durable: true })
await channel.assertQueue(queueName, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY }) await channel.assertQueue(queueName, { durable: true, maxPriority: SENDER_QUEUE_MAX_PRIORITY })
await channel.assertQueue(oldQueueName, { durable: true }).then(() => resendMessagesToNewQueue(channel))
await channel.bindQueue(queueName, deadLetterExchange) await channel.bindQueue(queueName, deadLetterExchange)
await channel.prefetch(1) await channel.prefetch(1)
await channel.consume(queueName, msg => await channel.consume(queueName, msg =>

@ -87,7 +87,7 @@ async function getEvents({ contract, event, fromBlock, toBlock, filter }) {
) )
const pastEvents = await contract.getPastEvents(event, { fromBlock, toBlock, filter }) const pastEvents = await contract.getPastEvents(event, { fromBlock, toBlock, filter })
logger.debug({ contractAddress, event, count: pastEvents.length }, 'Past events obtained') logger.debug({ contractAddress, event, count: pastEvents.length }, 'Past events obtained')
return pastEvents return pastEvents.sort((a, b) => a.blockNumber - b.blockNumber || a.transactionIndex - b.transactionIndex)
} catch (e) { } catch (e) {
logger.error(e.message) logger.error(e.message)
throw new Error(`${event} events cannot be obtained`) throw new Error(`${event} events cannot be obtained`)

@ -5,6 +5,7 @@ module.exports = {
MIN_AMB_HEADER_LENGTH: 32 + 20 + 20 + 4 + 2 + 1 + 2, MIN_AMB_HEADER_LENGTH: 32 + 20 + 20 + 4 + 2 + 1 + 2,
MAX_GAS_LIMIT: 10000000, MAX_GAS_LIMIT: 10000000,
MAX_CONCURRENT_EVENTS: 50, MAX_CONCURRENT_EVENTS: 50,
MAX_HISTORY_BLOCK_TO_REPROCESS: 10000,
RETRY_CONFIG: { RETRY_CONFIG: {
retries: 20, retries: 20,
factor: 1.4, factor: 1.4,

@ -6,7 +6,11 @@ const logger = require('./services/logger')
const { getShutdownFlag } = require('./services/shutdownState') const { getShutdownFlag } = require('./services/shutdownState')
const { getBlockNumber, getRequiredBlockConfirmations, getEvents } = require('./tx/web3') const { getBlockNumber, getRequiredBlockConfirmations, getEvents } = require('./tx/web3')
const { checkHTTPS, watchdog } = require('./utils/utils') const { checkHTTPS, watchdog } = require('./utils/utils')
const { EXIT_CODES, BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT } = require('./utils/constants') const {
EXIT_CODES,
BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT,
MAX_HISTORY_BLOCK_TO_REPROCESS
} = require('./utils/constants')
if (process.argv.length < 3) { if (process.argv.length < 3) {
logger.error('Please check the number of arguments, config file was not provided') logger.error('Please check the number of arguments, config file was not provided')
@ -26,9 +30,12 @@ const processAMBInformationRequests = require('./events/processAMBInformationReq
const { getTokensState } = require('./utils/tokenState') const { getTokensState } = require('./utils/tokenState')
const { web3, bridgeContract, eventContract, startBlock, pollingInterval, chain } = config.main const { web3, bridgeContract, eventContract, startBlock, pollingInterval, chain, reprocessingOptions } = config.main
const lastBlockRedisKey = `${config.id}:lastProcessedBlock` const lastBlockRedisKey = `${config.id}:lastProcessedBlock`
const lastReprocessedBlockRedisKey = `${config.id}:lastReprocessedBlock`
const seenEventsRedisKey = `${config.id}:seenEvents`
let lastProcessedBlock = Math.max(startBlock - 1, 0) let lastProcessedBlock = Math.max(startBlock - 1, 0)
let lastReprocessedBlock
let lastSeenBlockNumber = 0 let lastSeenBlockNumber = 0
let sameBlockNumberCounter = 0 let sameBlockNumberCounter = 0
@ -39,6 +46,8 @@ async function initialize() {
web3.currentProvider.urls.forEach(checkHttps(chain)) web3.currentProvider.urls.forEach(checkHttps(chain))
await getLastProcessedBlock() await getLastProcessedBlock()
await getLastReprocessedBlock()
await checkConditions()
connectWatcherToQueue({ connectWatcherToQueue({
queueName: config.queue, queueName: config.queue,
cb: runMain cb: runMain
@ -76,11 +85,34 @@ async function getLastProcessedBlock() {
lastProcessedBlock = result ? parseInt(result, 10) : lastProcessedBlock lastProcessedBlock = result ? parseInt(result, 10) : lastProcessedBlock
} }
async function getLastReprocessedBlock() {
if (reprocessingOptions.enabled) {
const result = await redis.get(lastReprocessedBlockRedisKey)
if (result) {
lastReprocessedBlock = Math.max(parseInt(result, 10), lastProcessedBlock - MAX_HISTORY_BLOCK_TO_REPROCESS)
} else {
lastReprocessedBlock = lastProcessedBlock
}
logger.debug({ block: lastReprocessedBlock }, 'Last reprocessed block obtained')
} else {
// when reprocessing is being enabled not for the first time,
// we do not want to process blocks for which we didn't recorded seen events,
// instead, we want to start from the current block.
// Thus we should delete this reprocessing pointer once it is disabled.
await redis.del(lastReprocessedBlockRedisKey)
}
}
function updateLastProcessedBlock(lastBlockNumber) { function updateLastProcessedBlock(lastBlockNumber) {
lastProcessedBlock = lastBlockNumber lastProcessedBlock = lastBlockNumber
return redis.set(lastBlockRedisKey, lastProcessedBlock) return redis.set(lastBlockRedisKey, lastProcessedBlock)
} }
function updateLastReprocessedBlock(lastBlockNumber) {
lastReprocessedBlock = lastBlockNumber
return redis.set(lastReprocessedBlockRedisKey, lastReprocessedBlock)
}
function processEvents(events) { function processEvents(events) {
switch (config.id) { switch (config.id) {
case 'erc-native-signature-request': case 'erc-native-signature-request':
@ -114,6 +146,71 @@ async function checkConditions() {
} }
} }
const eventKey = e => `${e.transactionHash}-${e.logIndex}`
async function reprocessOldLogs(sendToQueue) {
const fromBlock = lastReprocessedBlock + 1
let toBlock = lastReprocessedBlock + reprocessingOptions.batchSize
const events = await getEvents({
contract: eventContract,
event: config.event,
fromBlock,
toBlock,
filter: config.eventFilter
})
const alreadySeenEvents = await getSeenEvents(fromBlock, toBlock)
const missingEvents = events.filter(e => !alreadySeenEvents[eventKey(e)])
if (missingEvents.length === 0) {
logger.debug('No missed events were found')
} else {
logger.info(`Found ${missingEvents.length} ${config.event} missed events`)
let job
if (config.id === 'amb-information-request') {
// obtain block number and events from the earliest block
const batchBlockNumber = missingEvents[0].blockNumber
const batchEvents = missingEvents.filter(event => event.blockNumber === batchBlockNumber)
// if there are some other events in the later blocks,
// adjust lastReprocessedBlock so that these events will be processed again on the next iteration
if (batchEvents.length < missingEvents.length) {
// pick event outside from the batch
toBlock = missingEvents[batchEvents.length].blockNumber - 1
}
job = await processAMBInformationRequests(batchEvents)
if (job === null) {
return
}
} else {
job = await processEvents(missingEvents)
}
logger.info('Missed events transactions to send:', job.length)
if (job.length) {
await sendToQueue(job)
}
}
await updateLastReprocessedBlock(toBlock)
await deleteSeenEvents(0, toBlock)
}
async function getSeenEvents(fromBlock, toBlock) {
const keys = await redis.zrangebyscore(seenEventsRedisKey, fromBlock, toBlock)
const res = {}
keys.forEach(k => {
res[k] = true
})
return res
}
function deleteSeenEvents(fromBlock, toBlock) {
return redis.zremrangebyscore(seenEventsRedisKey, fromBlock, toBlock)
}
function addSeenEvents(events) {
return redis.zadd(seenEventsRedisKey, ...events.flatMap(e => [e.blockNumber, eventKey(e)]))
}
async function getLastBlockToProcess(web3, bridgeContract) { async function getLastBlockToProcess(web3, bridgeContract) {
const [lastBlockNumber, requiredBlockConfirmations] = await Promise.all([ const [lastBlockNumber, requiredBlockConfirmations] = await Promise.all([
getBlockNumber(web3), getBlockNumber(web3),
@ -158,24 +255,29 @@ async function main({ sendToQueue }) {
const lastBlockToProcess = await getLastBlockToProcess(web3, bridgeContract) const lastBlockToProcess = await getLastBlockToProcess(web3, bridgeContract)
if (reprocessingOptions.enabled) {
if (lastReprocessedBlock + reprocessingOptions.batchSize + reprocessingOptions.blockDelay < lastBlockToProcess) {
await reprocessOldLogs(sendToQueue)
return
}
}
if (lastBlockToProcess <= lastProcessedBlock) { if (lastBlockToProcess <= lastProcessedBlock) {
logger.debug('All blocks already processed') logger.debug('All blocks already processed')
return return
} }
await checkConditions()
const fromBlock = lastProcessedBlock + 1 const fromBlock = lastProcessedBlock + 1
const rangeEndBlock = config.blockPollingLimit ? fromBlock + config.blockPollingLimit : lastBlockToProcess const rangeEndBlock = config.blockPollingLimit ? fromBlock + config.blockPollingLimit : lastBlockToProcess
let toBlock = Math.min(lastBlockToProcess, rangeEndBlock) let toBlock = Math.min(lastBlockToProcess, rangeEndBlock)
const events = (await getEvents({ let events = await getEvents({
contract: eventContract, contract: eventContract,
event: config.event, event: config.event,
fromBlock, fromBlock,
toBlock, toBlock,
filter: config.eventFilter filter: config.eventFilter
})).sort((a, b) => a.blockNumber - b.blockNumber) })
logger.info(`Found ${events.length} ${config.event} events`) logger.info(`Found ${events.length} ${config.event} events`)
if (events.length) { if (events.length) {
@ -192,9 +294,10 @@ async function main({ sendToQueue }) {
if (batchEvents.length < events.length) { if (batchEvents.length < events.length) {
// pick event outside from the batch // pick event outside from the batch
toBlock = events[batchEvents.length].blockNumber - 1 toBlock = events[batchEvents.length].blockNumber - 1
events = batchEvents
} }
job = await processAMBInformationRequests(batchEvents) job = await processAMBInformationRequests(events)
if (job === null) { if (job === null) {
return return
} }
@ -206,6 +309,9 @@ async function main({ sendToQueue }) {
if (job.length) { if (job.length) {
await sendToQueue(job) await sendToQueue(job)
} }
if (reprocessingOptions.enabled) {
await addSeenEvents(events)
}
} }
logger.debug({ lastProcessedBlock: toBlock.toString() }, 'Updating last processed block') logger.debug({ lastProcessedBlock: toBlock.toString() }, 'Updating last processed block')