From 064c481c02f6c11038ba26aa393050764937da6f Mon Sep 17 00:00:00 2001 From: gozzy Date: Fri, 2 Dec 2022 21:02:43 +0000 Subject: [PATCH] event retry & err conditioning --- services/events.js | 70 +++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/services/events.js b/services/events.js index 7d951f2..7fee968 100644 --- a/services/events.js +++ b/services/events.js @@ -292,6 +292,8 @@ class EventService { i !== 5, i ) + } else { + throw new Error(`Failed to fetch block ${toBlock}`) } } @@ -310,23 +312,16 @@ class EventService { } } - createBatchRequest({ batchIndex, batchSize, batchBlocks, blockDenom, type }) { - return new Array(batchSize).fill('').map( - (_, i) => + createBatchRequest(batchArray) { + return batchArray.map( + (e, i) => new Promise(async (resolve) => { - const toBlock = batchBlocks[batchIndex * batchSize + i] - const fromBlock = toBlock - blockDenom - - const batchEvents = await this.getEventsPartFromRpc( - { - fromBlock, - toBlock, - type - }, - true - ) - - resolve(batchEvents.events) + try { + const { events } = await this.getEventsPartFromRpc({ ...e }, true) + resolve(events) + } catch (e) { + resolve({ isFailedBatch: true, ...e }) + } }) ) } @@ -335,30 +330,47 @@ class EventService { try { const batchSize = 10 const blockRange = 10000 - const { blockDifference, currentBlockNumber } = await this.getBlocksDiff({ fromBlock }) + let [events, failed] = [[], []] + let lastBlock = fromBlock + + const { blockDifference, currentBlockNumber } = await this.getBlocksDiff({ fromBlock }) const batchDigest = blockDifference === 0 ? 1 : Math.ceil(blockDifference / blockRange) + const blockDenom = Math.ceil(blockDifference / batchDigest) const batchCount = Math.ceil(batchDigest / batchSize) - const blocks = new Array(batchCount * batchSize).fill('') - const batchBlocks = blocks.map((_, i) => (i + 1) * blockDenom + fromBlock) - - let events = [] - if (fromBlock < currentBlockNumber) { - this.updateEventProgress(0, type) + await this.updateEventProgress(0, type) for (let batchIndex = 0; batchIndex < batchCount; batchIndex++) { - const batch = await Promise.all( - this.createBatchRequest({ batchIndex, batchBlocks, blockDenom, batchSize, type }) - ) + const isLastBatch = batchIndex === batchCount - 1 + const params = new Array(batchSize).fill('').map((_, i) => { + const toBlock = (i + 1) * blockDenom + lastBlock + const fromBlock = toBlock - blockDenom + return { fromBlock, toBlock, type } + }) + const batch = await Promise.all(this.createBatchRequest(params)) - this.updateEventProgress(batchIndex / batchCount, type) - events = events.concat(batch) + lastBlock = params[batchSize - 1].toBlock + events = events.concat(batch.filter((e) => !e.isFailedBatch)) + failed = failed.concat(batch.filter((e) => e.isFailedBatch)) + + const progressIndex = batchIndex - failed.length / batchSize + + if (isLastBatch && failed.length !== 0) { + const fbatch = await Promise.all(this.createBatchRequest(failed)) + const isFailedBatch = fbatch.filter((e) => e.isFailedBatch).length !== 0 + + if (isFailedBatch) { + throw new Error('Failed to batch events') + } else { + events = events.concat(fbatch) + } + } + await this.updateEventProgress(progressIndex / batchCount, type) await sleep(200) } - events = flattenNArray(events) return {