parallel rpc batching

This commit is contained in:
gozzy 2022-11-11 02:01:46 +00:00
parent 85a222d93a
commit 61c2865c46
4 changed files with 85 additions and 52 deletions

@ -5,17 +5,10 @@ import { download } from '@/store/snark'
import networkConfig from '@/networkConfig'
import InstanceABI from '@/abis/Instance.abi.json'
import { CONTRACT_INSTANCES, eventsType } from '@/constants'
import { sleep, formatEvents, capitalizeFirstLetter } from '@/utils'
import { sleep, flattenNArray, formatEvents, capitalizeFirstLetter } from '@/utils'
const supportedCaches = ['1', '56', '100', '137']
let store
if (process.browser) {
window.onNuxtReady(({ $store }) => {
store = $store
})
}
class EventService {
constructor({ netId, amount, currency, factoryMethods }) {
this.idb = window.$nuxt.$indexedDB(netId)
@ -237,7 +230,7 @@ class EventService {
}
}
async getEventsPartFromRpc({ fromBlock, toBlock, type }) {
async getEventsPartFromRpc({ fromBlock, toBlock, type }, shouldRetry = false, i = 0) {
try {
const { currentBlockNumber } = await this.getBlocksDiff({ fromBlock })
@ -248,11 +241,43 @@ class EventService {
}
}
const events = await this.contract.getPastEvents(capitalizeFirstLetter(type), {
fromBlock,
toBlock
const rpcRequest = new Promise((resolve, reject) => {
const repsonse = this.contract.getPastEvents(capitalizeFirstLetter(type), {
fromBlock,
toBlock
})
if (repsonse) {
resolve(repsonse)
} else {
reject(new Error())
}
})
let events = []
if (shouldRetry) {
i = i + 1
try {
events = await Promise.resolve(rpcRequest)
} catch (e) {
await sleep(200)
events = await this.getEventsPartFromRpc(
{
fromBlock,
toBlock,
type
},
i !== 5,
i
)
}
} else {
events = await Promise.resolve(rpcRequest)
}
if (!events?.length) {
return {
events: [],
@ -268,55 +293,56 @@ class EventService {
}
}
createBatchRequest({ batchIndex, batchSize, batchBlocks, blockDenom, type }) {
return new Array(batchSize).fill('').map(
(e, i) =>
new Promise(async (resolve) => {
const toBlock = batchBlocks[batchIndex * batchSize + i]
const fromBlock = toBlock - blockDenom
const batchEvents = await this.getEventsPartFromRpc(
{
fromBlock,
toBlock,
type
},
true
)
resolve(batchEvents.events)
})
)
}
async getBatchEventsFromRpc({ fromBlock, type }) {
try {
const batchSize = 10
const blockRange = 10000
const { blockDifference, currentBlockNumber } = await this.getBlocksDiff({ fromBlock })
let numberParts = blockDifference === 0 ? 1 : Math.ceil(blockDifference / blockRange)
const part = Math.ceil(blockDifference / numberParts)
const batchDigest = blockDifference === 0 ? 1 : Math.ceil(blockDifference / blockRange)
const blockDenom = Math.ceil(blockDifference / batchDigest)
const batchCount = Math.ceil(batchDigest / batchSize)
const blocks = new Array(batchCount * batchSize).fill('')
const batchBlocks = blocks.map((e, i) => (i + 1) * blockDenom + fromBlock)
let events = []
let loadedBlocks = 0
let toBlock = fromBlock + part
if (fromBlock < currentBlockNumber) {
if (toBlock >= currentBlockNumber) {
toBlock = 'latest'
numberParts = 1
}
if (store.state.loading.progress !== 98) {
store.dispatch('loading/updateProgress', { message: 'Fetching the past events', progress: 0 })
for (let batchIndex = 0; batchIndex < batchCount; batchIndex++) {
const batch = await Promise.all(
this.createBatchRequest({ batchIndex, batchBlocks, blockDenom, batchSize, type })
)
events = events.concat(batch)
}
for (let i = 0; i < numberParts; i++) {
try {
await sleep(200)
const partOfEvents = await this.getEventsPartFromRpc({ fromBlock, toBlock, type })
if (partOfEvents) {
events = events.concat(partOfEvents.events)
}
loadedBlocks += toBlock - fromBlock
fromBlock = toBlock
toBlock += part
events = flattenNArray(events)
const progressInt = parseInt((loadedBlocks / blockDifference) * 100)
console.log('Progress: ', progressInt)
if (store.state.loading.progress !== 98) {
store.dispatch('loading/updateProgress', {
message: 'Fetching the past events',
progress: progressInt === 100 ? 98 : progressInt
})
}
} catch {
numberParts = numberParts + 1
}
}
if (events.length) {
return {
events,
lastBlock: toBlock === 'latest' ? currentBlockNumber : toBlock
}
return {
lastBlock: events[events.length - 1].blockNumber,
events
}
}
return undefined
@ -328,6 +354,7 @@ class EventService {
async getEventsFromRpc({ fromBlock, type }) {
try {
const { blockDifference } = await this.getBlocksDiff({ fromBlock })
let events
if (blockDifference < 10000) {

@ -22,12 +22,12 @@ class MerkleTreeService {
commitment,
instanceName,
fileFolder: 'trees',
fileName: `deposits_${currency}_${amount}_bloom.json.zip`
fileName: `deposits_${currency}_${amount}_bloom.json.gz`
})
}
getFileName(partNumber = trees.PARTS_COUNT) {
return `trees/deposits_${this.currency}_${this.amount}_slice${partNumber}.json.zip`
return `trees/deposits_${this.currency}_${this.amount}_slice${partNumber}.json.gz`
}
createTree({ events }) {

@ -358,7 +358,7 @@ const actions = {
try {
const module = await download({
contentType: 'string',
name: `events/encrypted_notes_${netId}.json.zip`
name: `events/encrypted_notes_${netId}.json.gz`
})
if (module) {

@ -8,6 +8,12 @@ export * from './stringUtils'
export * from './numberUtils'
export * from './instanceUtils'
export function flattenNArray(arr) {
return arr.reduce((flat, toFlatten) => {
return flat.concat(Array.isArray(toFlatten) ? flattenNArray(toFlatten) : toFlatten)
}, [])
}
export function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}