event promise chaining

This commit is contained in:
gozzy 2023-04-16 02:30:43 +00:00
parent 322184baf8
commit eb59acd319
2 changed files with 124 additions and 95 deletions

@ -80,7 +80,6 @@ class EventService {
} }
return a.blockNumber - b.blockNumber return a.blockNumber - b.blockNumber
}) })
const lastBlock = allEvents[allEvents.length - 1].blockNumber const lastBlock = allEvents[allEvents.length - 1].blockNumber
this.saveEvents({ events: allEvents, lastBlock, type }) this.saveEvents({ events: allEvents, lastBlock, type })
@ -247,65 +246,60 @@ class EventService {
} }
} }
getPastEvents({ fromBlock, toBlock, type }) { getPastEvents({ fromBlock, toBlock, type }, shouldRetry = false, i = 0) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const repsonse = this.contract.getPastEvents(capitalizeFirstLetter(type), { this.contract
.getPastEvents(capitalizeFirstLetter(type), {
fromBlock, fromBlock,
toBlock toBlock
}) })
.then((events) => resolve(events))
.catch((err) => {
i = i + 1
// maximum 5 second buffer for rate-limiting
if (shouldRetry) {
const isRetry = i !== 5
if (repsonse) { sleep(1000 * i).then(() =>
resolve(repsonse) this.getPastEvents({ fromBlock, toBlock, type }, isRetry, i)
.then((events) => resolve(events))
.catch((_) => resolve(undefined))
)
} else { } else {
reject(new Error()) reject(new Error(err))
} }
}) })
})
} }
async getEventsPartFromRpc({ fromBlock, toBlock, type }, shouldRetry = false, i = 0) { async getEventsPartFromRpc(parameters, shouldRetry = false) {
try { try {
const { fromBlock, type } = parameters
const { currentBlockNumber } = await this.getBlocksDiff({ fromBlock }) const { currentBlockNumber } = await this.getBlocksDiff({ fromBlock })
if (fromBlock > currentBlockNumber) { if (fromBlock < currentBlockNumber) {
const eventsPart = await this.getPastEvents(parameters, shouldRetry)
if (eventsPart) {
if (eventsPart.length > 0) {
return { return {
events: [], events: formatEvents(eventsPart, type),
lastBlock: fromBlock lastBlock: eventsPart[eventsPart.length - 1].blockNumber
} }
}
let events = []
try {
events = await this.getPastEvents({ fromBlock, toBlock, type })
} catch (e) {
if (shouldRetry) {
i = i + 1
// maximum 5 second buffer for rate-limiting
await sleep(1000 * i)
events = await this.getEventsPartFromRpc(
{
fromBlock,
toBlock,
type
},
i !== 5,
i
)
} else { } else {
throw new Error(`Failed to fetch block ${toBlock}`)
}
}
if (!events?.length) {
return { return {
events: [], events: [],
lastBlock: fromBlock lastBlock: fromBlock
} }
} }
} else {
return undefined
}
} else {
return { return {
events: formatEvents(events, type), events: [],
lastBlock: events[events.length - 1].blockNumber lastBlock: fromBlock
}
} }
} catch (err) { } catch (err) {
return undefined return undefined
@ -315,16 +309,18 @@ class EventService {
createBatchRequest(batchArray) { createBatchRequest(batchArray) {
return batchArray.map( return batchArray.map(
(e, i) => (e, i) =>
new Promise(async (resolve) => { new Promise((resolve) =>
try { sleep(20 * i).then(() =>
sleep(20 * i) this.getEventsPartFromRpc({ ...e }, true).then((batch) => {
const { events } = await this.getEventsPartFromRpc({ ...e }, true) if (!batch) {
resolve(events) resolve([{ isFailedBatch: true, ...e }])
} catch (e) { } else {
resolve({ isFailedBatch: true, ...e }) resolve(batch.events)
} }
}) })
) )
)
)
} }
async getBatchEventsFromRpc({ fromBlock, type }) { async getBatchEventsFromRpc({ fromBlock, type }) {
@ -352,33 +348,35 @@ class EventService {
return { fromBlock, toBlock, type } return { fromBlock, toBlock, type }
}) })
const batch = await Promise.all(this.createBatchRequest(params)) const batch = await Promise.all(this.createBatchRequest(params))
const requests = flattenNArray(batch)
events = events.concat(requests.filter((e) => !e.isFailedBatch))
failed = failed.concat(requests.filter((e) => e.isFailedBatch))
lastBlock = params[batchSize - 1].toBlock lastBlock = params[batchSize - 1].toBlock
events = events.concat(batch.filter((e) => !e.isFailedBatch))
failed = failed.concat(batch.filter((e) => e.isFailedBatch))
const progressIndex = batchIndex - failed.length / batchSize const progressIndex = batchIndex - failed.length / batchSize
if (isLastBatch && failed.length !== 0) { if (isLastBatch && failed.length !== 0) {
const fbatch = await Promise.all(this.createBatchRequest(failed)) const failedBatch = await Promise.all(this.createBatchRequest(failed))
const isFailedBatch = fbatch.filter((e) => e.isFailedBatch).length !== 0 const failedReqs = flattenNArray(failedBatch)
const failedRept = failedReqs.filter((e) => e.isFailedBatch)
if (isFailedBatch) { if (failedRept.length === 0) {
throw new Error('Failed to batch events') events = events.concat(failedReqs)
} else { } else {
events = events.concat(fbatch) throw new Error('Failed to batch events')
} }
} }
await this.updateEventProgress(progressIndex / batchCount, type) await this.updateEventProgress(progressIndex / batchCount, type)
} }
events = flattenNArray(events)
return { return {
lastBlock: events[events.length - 1].blockNumber, lastBlock: events[events.length - 1].blockNumber,
events events
} }
} } else {
return undefined return undefined
}
} catch (err) { } catch (err) {
return undefined return undefined
} }

@ -28,53 +28,83 @@ class RelayerRegister {
fetchEvents = ({ fromBlock, toBlock }, shouldRetry = false) => { fetchEvents = ({ fromBlock, toBlock }, shouldRetry = false) => {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (fromBlock <= toBlock) { if (fromBlock <= toBlock) {
try { this.relayerRegistry
const registeredEventsPart = this.relayerRegistry.getPastEvents('RelayerRegistered', { .getPastEvents('RelayerRegistered', { fromBlock, toBlock })
fromBlock, .then((events) => resolve(events))
toBlock .catch((_) => {
})
resolve(registeredEventsPart)
} catch (error) {
if (shouldRetry) { if (shouldRetry) {
sleep(500) sleep(500).then(() =>
this.fetchEvents({ fromBlock, toBlock })
const events = this.fetchEvents({ fromBlock, toBlock }) .then((events) => resolve(events))
.catch((_) => resolve(undefined))
resolve(events) )
} else { } else {
reject(new Error(error)) resolve(undefined)
}
} }
})
} else { } else {
resolve([]) resolve(undefined)
} }
}) })
} }
batchFetchEvents = async ({ fromBlock, toBlock }) => { batchFetchEvents = async ({ fromBlock, toBlock }) => {
const batchSize = 10
const blockRange = 10000 const blockRange = 10000
const blockDifference = toBlock - fromBlock const blockDifference = toBlock - fromBlock
const chunkCount = Math.ceil(blockDifference / blockRange) const chunkCount = Math.ceil(blockDifference / blockRange)
const blockDenom = Math.ceil(blockDifference / chunkCount) const blockDenom = Math.ceil(blockDifference / chunkCount)
const chunkSize = Math.ceil(chunkCount / batchSize)
const promises = new Array(chunkCount).fill('').map( let failed = []
let events = []
let lastBlock = fromBlock
for (let batchIndex = 0; batchIndex < chunkSize; batchIndex++) {
const params = new Array(batchSize).fill('').map((_, i) => {
const toBlock = (i + 1) * blockDenom + lastBlock
const fromBlock = toBlock - blockDenom
return { fromBlock, toBlock }
})
const promises = new Array(batchSize).fill('').map(
(_, i) => (_, i) =>
new Promise((resolve) => { new Promise((resolve) =>
sleep(20 * i) sleep(i * 20).then(() => {
const batch = this.fetchEvents( this.fetchEvents(params[i], true).then((batch) => {
{ if (!batch) {
fromBlock: i * blockDenom + fromBlock, resolve([{ isFailedBatch: true, fromBlock, toBlock }])
toBlock: (i + 1) * blockDenom + fromBlock } else {
},
true
)
resolve(batch) resolve(batch)
}
})
}) })
) )
)
const requests = flattenNArray(await Promise.all(promises))
const failedIndexes = requests
.filter((e) => e.isFailedBatch)
.map((e) => {
const reqIndex = requests.indexOf(e)
return params[reqIndex]
})
const batchEvents = flattenNArray(await Promise.all(promises)) failed = failed.concat(failedIndexes || [])
const events = batchEvents.map((e) => ({ ...e.returnValues })) events = events.concat(requests.filter((e) => !e.isFailedBatch))
lastBlock = params[batchSize - 1].toBlock
}
if (failed.length !== 0) {
const failedReqs = failed.map((e) => this.fetchEvents(e))
const failedBatch = flattenNArray(await Promise.all(failedReqs))
events = events.concat(failedBatch || [])
}
events = events.map((e) => ({ ...e.returnValues }))
if (events.length === 0) {
throw new Error('Failed to fetch registry events')
}
return events return events
} }
@ -169,6 +199,7 @@ class RelayerRegister {
for (let x = 0; x < relayerEvents.length; x++) { for (let x = 0; x < relayerEvents.length; x++) {
const { ensName, relayerAddress } = relayerEvents[x] const { ensName, relayerAddress } = relayerEvents[x]
let ensAddress let ensAddress
if (!isAddress(relayerAddress)) { if (!isAddress(relayerAddress)) {
ensAddress = await this.getENSAddress(ensName) ensAddress = await this.getENSAddress(ensName)
ensAddress = toChecksumAddress(ensAddress) ensAddress = toChecksumAddress(ensAddress)