From 61c2865c462dbbbc621f531f5da554e9c288cabc Mon Sep 17 00:00:00 2001 From: gozzy Date: Fri, 11 Nov 2022 02:01:46 +0000 Subject: [PATCH] parallel rpc batching --- services/events.js | 125 +++++++++++++++++++++++++---------------- services/merkleTree.js | 4 +- store/application.js | 2 +- utils/index.js | 6 ++ 4 files changed, 85 insertions(+), 52 deletions(-) diff --git a/services/events.js b/services/events.js index 7ee6133..96891ee 100644 --- a/services/events.js +++ b/services/events.js @@ -5,17 +5,10 @@ import { download } from '@/store/snark' import networkConfig from '@/networkConfig' import InstanceABI from '@/abis/Instance.abi.json' import { CONTRACT_INSTANCES, eventsType } from '@/constants' -import { sleep, formatEvents, capitalizeFirstLetter } from '@/utils' +import { sleep, flattenNArray, formatEvents, capitalizeFirstLetter } from '@/utils' const supportedCaches = ['1', '56', '100', '137'] -let store -if (process.browser) { - window.onNuxtReady(({ $store }) => { - store = $store - }) -} - class EventService { constructor({ netId, amount, currency, factoryMethods }) { this.idb = window.$nuxt.$indexedDB(netId) @@ -237,7 +230,7 @@ class EventService { } } - async getEventsPartFromRpc({ fromBlock, toBlock, type }) { + async getEventsPartFromRpc({ fromBlock, toBlock, type }, shouldRetry = false, i = 0) { try { const { currentBlockNumber } = await this.getBlocksDiff({ fromBlock }) @@ -248,11 +241,43 @@ class EventService { } } - const events = await this.contract.getPastEvents(capitalizeFirstLetter(type), { - fromBlock, - toBlock + const rpcRequest = new Promise((resolve, reject) => { + const repsonse = this.contract.getPastEvents(capitalizeFirstLetter(type), { + fromBlock, + toBlock + }) + + if (repsonse) { + resolve(repsonse) + } else { + reject(new Error()) + } }) + let events = [] + + if (shouldRetry) { + i = i + 1 + + try { + events = await Promise.resolve(rpcRequest) + } catch (e) { + await sleep(200) + + events = await this.getEventsPartFromRpc( + { + fromBlock, + toBlock, + type + }, + i !== 5, + i + ) + } + } else { + events = await Promise.resolve(rpcRequest) + } + if (!events?.length) { return { events: [], @@ -268,55 +293,56 @@ class EventService { } } + createBatchRequest({ batchIndex, batchSize, batchBlocks, blockDenom, type }) { + return new Array(batchSize).fill('').map( + (e, i) => + new Promise(async (resolve) => { + const toBlock = batchBlocks[batchIndex * batchSize + i] + const fromBlock = toBlock - blockDenom + + const batchEvents = await this.getEventsPartFromRpc( + { + fromBlock, + toBlock, + type + }, + true + ) + + resolve(batchEvents.events) + }) + ) + } + async getBatchEventsFromRpc({ fromBlock, type }) { try { + const batchSize = 10 const blockRange = 10000 const { blockDifference, currentBlockNumber } = await this.getBlocksDiff({ fromBlock }) - let numberParts = blockDifference === 0 ? 1 : Math.ceil(blockDifference / blockRange) - const part = Math.ceil(blockDifference / numberParts) + const batchDigest = blockDifference === 0 ? 1 : Math.ceil(blockDifference / blockRange) + const blockDenom = Math.ceil(blockDifference / batchDigest) + const batchCount = Math.ceil(batchDigest / batchSize) + + const blocks = new Array(batchCount * batchSize).fill('') + const batchBlocks = blocks.map((e, i) => (i + 1) * blockDenom + fromBlock) let events = [] - let loadedBlocks = 0 - let toBlock = fromBlock + part if (fromBlock < currentBlockNumber) { - if (toBlock >= currentBlockNumber) { - toBlock = 'latest' - numberParts = 1 - } - if (store.state.loading.progress !== 98) { - store.dispatch('loading/updateProgress', { message: 'Fetching the past events', progress: 0 }) + for (let batchIndex = 0; batchIndex < batchCount; batchIndex++) { + const batch = await Promise.all( + this.createBatchRequest({ batchIndex, batchBlocks, blockDenom, batchSize, type }) + ) + + events = events.concat(batch) } - for (let i = 0; i < numberParts; i++) { - try { - await sleep(200) - const partOfEvents = await this.getEventsPartFromRpc({ fromBlock, toBlock, type }) - if (partOfEvents) { - events = events.concat(partOfEvents.events) - } - loadedBlocks += toBlock - fromBlock - fromBlock = toBlock - toBlock += part + events = flattenNArray(events) - const progressInt = parseInt((loadedBlocks / blockDifference) * 100) - console.log('Progress: ', progressInt) - if (store.state.loading.progress !== 98) { - store.dispatch('loading/updateProgress', { - message: 'Fetching the past events', - progress: progressInt === 100 ? 98 : progressInt - }) - } - } catch { - numberParts = numberParts + 1 - } - } - if (events.length) { - return { - events, - lastBlock: toBlock === 'latest' ? currentBlockNumber : toBlock - } + return { + lastBlock: events[events.length - 1].blockNumber, + events } } return undefined @@ -328,6 +354,7 @@ class EventService { async getEventsFromRpc({ fromBlock, type }) { try { const { blockDifference } = await this.getBlocksDiff({ fromBlock }) + let events if (blockDifference < 10000) { diff --git a/services/merkleTree.js b/services/merkleTree.js index 7e1edae..6805ad9 100644 --- a/services/merkleTree.js +++ b/services/merkleTree.js @@ -22,12 +22,12 @@ class MerkleTreeService { commitment, instanceName, fileFolder: 'trees', - fileName: `deposits_${currency}_${amount}_bloom.json.zip` + fileName: `deposits_${currency}_${amount}_bloom.json.gz` }) } getFileName(partNumber = trees.PARTS_COUNT) { - return `trees/deposits_${this.currency}_${this.amount}_slice${partNumber}.json.zip` + return `trees/deposits_${this.currency}_${this.amount}_slice${partNumber}.json.gz` } createTree({ events }) { diff --git a/store/application.js b/store/application.js index 7415345..84ed4fa 100644 --- a/store/application.js +++ b/store/application.js @@ -358,7 +358,7 @@ const actions = { try { const module = await download({ contentType: 'string', - name: `events/encrypted_notes_${netId}.json.zip` + name: `events/encrypted_notes_${netId}.json.gz` }) if (module) { diff --git a/utils/index.js b/utils/index.js index df30fec..566b88b 100644 --- a/utils/index.js +++ b/utils/index.js @@ -8,6 +8,12 @@ export * from './stringUtils' export * from './numberUtils' export * from './instanceUtils' +export function flattenNArray(arr) { + return arr.reduce((flat, toFlatten) => { + return flat.concat(Array.isArray(toFlatten) ? flattenNArray(toFlatten) : toFlatten) + }, []) +} + export function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)) }