From eb59acd31961f5ef2c766919b99383500eb928aa Mon Sep 17 00:00:00 2001 From: gozzy Date: Sun, 16 Apr 2023 02:30:43 +0000 Subject: [PATCH] event promise chaining --- services/events.js | 124 ++++++++++++++++++------------------- services/registry/index.js | 95 ++++++++++++++++++---------- 2 files changed, 124 insertions(+), 95 deletions(-) diff --git a/services/events.js b/services/events.js index b76880e..2c0f9bf 100644 --- a/services/events.js +++ b/services/events.js @@ -80,7 +80,6 @@ class EventService { } return a.blockNumber - b.blockNumber }) - const lastBlock = allEvents[allEvents.length - 1].blockNumber this.saveEvents({ events: allEvents, lastBlock, type }) @@ -247,66 +246,61 @@ class EventService { } } - getPastEvents({ fromBlock, toBlock, type }) { + getPastEvents({ fromBlock, toBlock, type }, shouldRetry = false, i = 0) { return new Promise((resolve, reject) => { - const repsonse = this.contract.getPastEvents(capitalizeFirstLetter(type), { - fromBlock, - toBlock - }) + this.contract + .getPastEvents(capitalizeFirstLetter(type), { + fromBlock, + toBlock + }) + .then((events) => resolve(events)) + .catch((err) => { + i = i + 1 + // maximum 5 second buffer for rate-limiting + if (shouldRetry) { + const isRetry = i !== 5 - if (repsonse) { - resolve(repsonse) - } else { - reject(new Error()) - } + sleep(1000 * i).then(() => + this.getPastEvents({ fromBlock, toBlock, type }, isRetry, i) + .then((events) => resolve(events)) + .catch((_) => resolve(undefined)) + ) + } else { + reject(new Error(err)) + } + }) }) } - async getEventsPartFromRpc({ fromBlock, toBlock, type }, shouldRetry = false, i = 0) { + async getEventsPartFromRpc(parameters, shouldRetry = false) { try { + const { fromBlock, type } = parameters const { currentBlockNumber } = await this.getBlocksDiff({ fromBlock }) - if (fromBlock > currentBlockNumber) { - return { - events: [], - lastBlock: fromBlock - } - } + if (fromBlock < currentBlockNumber) { + const eventsPart = await this.getPastEvents(parameters, shouldRetry) - let events = [] - - try { - events = await this.getPastEvents({ fromBlock, toBlock, type }) - } catch (e) { - if (shouldRetry) { - i = i + 1 - // maximum 5 second buffer for rate-limiting - await sleep(1000 * i) - - events = await this.getEventsPartFromRpc( - { - fromBlock, - toBlock, - type - }, - i !== 5, - i - ) + if (eventsPart) { + if (eventsPart.length > 0) { + return { + events: formatEvents(eventsPart, type), + lastBlock: eventsPart[eventsPart.length - 1].blockNumber + } + } else { + return { + events: [], + lastBlock: fromBlock + } + } } else { - throw new Error(`Failed to fetch block ${toBlock}`) + return undefined } - } - - if (!events?.length) { + } else { return { events: [], lastBlock: fromBlock } } - return { - events: formatEvents(events, type), - lastBlock: events[events.length - 1].blockNumber - } } catch (err) { return undefined } @@ -315,15 +309,17 @@ class EventService { createBatchRequest(batchArray) { return batchArray.map( (e, i) => - new Promise(async (resolve) => { - try { - sleep(20 * i) - const { events } = await this.getEventsPartFromRpc({ ...e }, true) - resolve(events) - } catch (e) { - resolve({ isFailedBatch: true, ...e }) - } - }) + new Promise((resolve) => + sleep(20 * i).then(() => + this.getEventsPartFromRpc({ ...e }, true).then((batch) => { + if (!batch) { + resolve([{ isFailedBatch: true, ...e }]) + } else { + resolve(batch.events) + } + }) + ) + ) ) } @@ -352,33 +348,35 @@ class EventService { return { fromBlock, toBlock, type } }) const batch = await Promise.all(this.createBatchRequest(params)) + const requests = flattenNArray(batch) + events = events.concat(requests.filter((e) => !e.isFailedBatch)) + failed = failed.concat(requests.filter((e) => e.isFailedBatch)) lastBlock = params[batchSize - 1].toBlock - events = events.concat(batch.filter((e) => !e.isFailedBatch)) - failed = failed.concat(batch.filter((e) => e.isFailedBatch)) const progressIndex = batchIndex - failed.length / batchSize if (isLastBatch && failed.length !== 0) { - const fbatch = await Promise.all(this.createBatchRequest(failed)) - const isFailedBatch = fbatch.filter((e) => e.isFailedBatch).length !== 0 + const failedBatch = await Promise.all(this.createBatchRequest(failed)) + const failedReqs = flattenNArray(failedBatch) + const failedRept = failedReqs.filter((e) => e.isFailedBatch) - if (isFailedBatch) { - throw new Error('Failed to batch events') + if (failedRept.length === 0) { + events = events.concat(failedReqs) } else { - events = events.concat(fbatch) + throw new Error('Failed to batch events') } } await this.updateEventProgress(progressIndex / batchCount, type) } - events = flattenNArray(events) return { lastBlock: events[events.length - 1].blockNumber, events } + } else { + return undefined } - return undefined } catch (err) { return undefined } diff --git a/services/registry/index.js b/services/registry/index.js index 81cd84d..056224a 100644 --- a/services/registry/index.js +++ b/services/registry/index.js @@ -28,53 +28,83 @@ class RelayerRegister { fetchEvents = ({ fromBlock, toBlock }, shouldRetry = false) => { return new Promise((resolve, reject) => { if (fromBlock <= toBlock) { - try { - const registeredEventsPart = this.relayerRegistry.getPastEvents('RelayerRegistered', { - fromBlock, - toBlock + this.relayerRegistry + .getPastEvents('RelayerRegistered', { fromBlock, toBlock }) + .then((events) => resolve(events)) + .catch((_) => { + if (shouldRetry) { + sleep(500).then(() => + this.fetchEvents({ fromBlock, toBlock }) + .then((events) => resolve(events)) + .catch((_) => resolve(undefined)) + ) + } else { + resolve(undefined) + } }) - - resolve(registeredEventsPart) - } catch (error) { - if (shouldRetry) { - sleep(500) - - const events = this.fetchEvents({ fromBlock, toBlock }) - - resolve(events) - } else { - reject(new Error(error)) - } - } } else { - resolve([]) + resolve(undefined) } }) } batchFetchEvents = async ({ fromBlock, toBlock }) => { + const batchSize = 10 const blockRange = 10000 const blockDifference = toBlock - fromBlock const chunkCount = Math.ceil(blockDifference / blockRange) const blockDenom = Math.ceil(blockDifference / chunkCount) + const chunkSize = Math.ceil(chunkCount / batchSize) - const promises = new Array(chunkCount).fill('').map( - (_, i) => - new Promise((resolve) => { - sleep(20 * i) - const batch = this.fetchEvents( - { - fromBlock: i * blockDenom + fromBlock, - toBlock: (i + 1) * blockDenom + fromBlock - }, - true + let failed = [] + let events = [] + let lastBlock = fromBlock + + for (let batchIndex = 0; batchIndex < chunkSize; batchIndex++) { + const params = new Array(batchSize).fill('').map((_, i) => { + const toBlock = (i + 1) * blockDenom + lastBlock + const fromBlock = toBlock - blockDenom + return { fromBlock, toBlock } + }) + const promises = new Array(batchSize).fill('').map( + (_, i) => + new Promise((resolve) => + sleep(i * 20).then(() => { + this.fetchEvents(params[i], true).then((batch) => { + if (!batch) { + resolve([{ isFailedBatch: true, fromBlock, toBlock }]) + } else { + resolve(batch) + } + }) + }) ) - resolve(batch) + ) + const requests = flattenNArray(await Promise.all(promises)) + const failedIndexes = requests + .filter((e) => e.isFailedBatch) + .map((e) => { + const reqIndex = requests.indexOf(e) + return params[reqIndex] }) - ) - const batchEvents = flattenNArray(await Promise.all(promises)) - const events = batchEvents.map((e) => ({ ...e.returnValues })) + failed = failed.concat(failedIndexes || []) + events = events.concat(requests.filter((e) => !e.isFailedBatch)) + lastBlock = params[batchSize - 1].toBlock + } + + if (failed.length !== 0) { + const failedReqs = failed.map((e) => this.fetchEvents(e)) + const failedBatch = flattenNArray(await Promise.all(failedReqs)) + + events = events.concat(failedBatch || []) + } + + events = events.map((e) => ({ ...e.returnValues })) + + if (events.length === 0) { + throw new Error('Failed to fetch registry events') + } return events } @@ -169,6 +199,7 @@ class RelayerRegister { for (let x = 0; x < relayerEvents.length; x++) { const { ensName, relayerAddress } = relayerEvents[x] let ensAddress + if (!isAddress(relayerAddress)) { ensAddress = await this.getENSAddress(ensName) ensAddress = toChecksumAddress(ensAddress)