From 0a33404eb4e3378ada6677cfa57a70c59ab6cb3f Mon Sep 17 00:00:00 2001 From: Tornado Contrib Date: Tue, 7 May 2024 01:44:15 +0000 Subject: [PATCH] Initial batch events Removed fiber since it were deprecated for latest Node.js --- assets/events.worker.js | 14 ++- assets/nullifier.worker.js | 13 ++- copyFile.ts | 15 +++ package.json | 6 +- services/events/batch.ts | 109 ++++++++++++++++++++++ services/events/index.ts | 2 + services/graph/@types/index.ts | 18 ++++ services/graph/index.ts | 165 ++++++++++++++++++++++++++++++++- services/graph/queries.ts | 38 ++++++++ yarn.lock | 12 --- 10 files changed, 371 insertions(+), 21 deletions(-) create mode 100644 copyFile.ts create mode 100644 services/events/batch.ts diff --git a/assets/events.worker.js b/assets/events.worker.js index 8660cb0..5718d0b 100644 --- a/assets/events.worker.js +++ b/assets/events.worker.js @@ -6,6 +6,8 @@ const { BigNumber } = require('ethers') const { poseidon } = require('@tornado/circomlib') const { decrypt } = require('eth-sig-util') const { IndexedDB } = require('../services/idb') +const { BatchEventsService } = require('../services/batch') +const { getAllCommitments } = require('../services/graph') const { sleep } = require('../utilities/helpers') const { workerEvents, numbers } = require('../constants/worker') const { ExtendedProvider } = require('../services/ether/ExtendedProvider') @@ -62,11 +64,19 @@ const initWorker = (chainId) => { } const setTornadoPool = (chainId, provider) => { self.poolContract = TornadoPoolFactory.connect(POOL_CONTRACT[chainId], provider) + + self.BatchEventsService = new BatchEventsService({ + provider, + contract: self.poolContract + }) } const getCommitmentBatch = async ({ blockFrom, blockTo, cachedEvents, withCache }) => { - const filter = self.poolContract.filters.NewCommitment() - const events = await self.poolContract.queryFilter(filter, blockFrom, blockTo) + const events = await self.BatchEventsService.getBatchEvents({ + fromBlock: blockFrom, + toBlock: blockTo, + type: 'NewCommitment' + }) const commitmentEvents = events.map(({ blockNumber, transactionHash, args }) => ({ blockNumber, diff --git a/assets/nullifier.worker.js b/assets/nullifier.worker.js index e3f6cf2..0b95b3b 100644 --- a/assets/nullifier.worker.js +++ b/assets/nullifier.worker.js @@ -3,6 +3,8 @@ const { isEmpty } = require('lodash') const { BigNumber } = require('ethers') const { IndexedDB } = require('../services/idb') +const { BatchEventsService } = require('../services/batch') +const { getAllNullifiers } = require('../services/graph') const { sleep } = require('../utilities/helpers') const { workerEvents, numbers } = require('../constants/worker') const { ExtendedProvider } = require('../services/ether/ExtendedProvider') @@ -48,6 +50,11 @@ const initWorker = (chainId) => { const setTornadoPool = (chainId, provider) => { self.poolContract = TornadoPoolFactory.connect(POOL_CONTRACT[chainId], provider) + + self.BatchEventsService = new BatchEventsService({ + provider, + contract: self.poolContract + }) } const saveEvents = async ({ events }) => { @@ -123,8 +130,10 @@ const getCachedEvents = async () => { const getNullifiers = async (blockFrom) => { try { - const filter = self.poolContract.filters.NewNullifier() - const events = await self.poolContract.queryFilter(filter, blockFrom) + const events = await self.BatchEventsService.getBatchEvents({ + fromBlock: blockFrom, + type: 'NewNullifier' + }) return events.map(({ blockNumber, transactionHash, args }) => ({ blockNumber, diff --git a/copyFile.ts b/copyFile.ts new file mode 100644 index 0000000..ca38712 --- /dev/null +++ b/copyFile.ts @@ -0,0 +1,15 @@ +import { argv } from 'process' +import { copyFile } from 'fs' + +function copyFiles() { + const [, , inFile, outFile] = argv + + copyFile(inFile, outFile, function(err) { + if (err) { + throw err + } + + console.log(`Copied ${inFile} to ${outFile}`) + }) +} +copyFiles() diff --git a/package.json b/package.json index 00284c2..803f37e 100644 --- a/package.json +++ b/package.json @@ -10,10 +10,11 @@ "lint": "eslint --ext .js,.ts", "lint:fix": "eslint --ext .js,.ts --quiet --fix", "compile": "typechain --target ethers-v5 --out-dir ./_contracts './abi/*.json'", - "generate": "nuxt generate && cp dist/404.html dist/ipfs-404.html", + "copyFile": "node --loader ts-node/esm copyFile.ts", + "generate": "nuxt generate && yarn copyFile dist/404.html dist/ipfs-404.html", "prepare": "husky install", "ipfs:upload": "node --loader ts-node/esm ipfsUpload.ts", - "worker:compile": "nuxt generate && yarn compile:events && yarn compile:nullifier", + "worker:compile": "yarn generate && yarn compile:events && yarn compile:nullifier", "compile:events": "babel dist/_nuxt/workers/events.worker.js --out-file static/events.worker.js", "compile:nullifier": "babel dist/_nuxt/workers/nullifier.worker.js --out-file static/nullifier.worker.js" }, @@ -74,7 +75,6 @@ "eslint-plugin-prettier": "^3.4.0", "eslint-plugin-promise": "^5.1.0", "eslint-plugin-vue": "^7.16.0", - "fibers": "^5.0.0", "form-data": "^4.0.0", "husky": "^6.0.0", "lint-staged": "10.2.11", diff --git a/services/events/batch.ts b/services/events/batch.ts new file mode 100644 index 0000000..0925c75 --- /dev/null +++ b/services/events/batch.ts @@ -0,0 +1,109 @@ +import { Provider, Contract, EventLog } from "ethers"; +import { sleep, getBatches } from "@/utilities"; + +export interface BatchEventServiceConstructor { + provider: Provider; + contract: Contract; + concurrencySize?: number; + blocksPerRequest?: number; + shouldRetry?: boolean; + retryMax?: number; + retryOn?: number; +} + +export type EventInput = { + fromBlock: number; + toBlock: number; + type: string; +}; + +export class BatchEventsService { + provider: Provider; + contract: Contract; + concurrencySize: number; + blocksPerRequest: number; + shouldRetry: boolean; + retryMax: number; + retryOn: number; + constructor({ + provider, + contract, + concurrencySize = 10, + blocksPerRequest = 2000, + shouldRetry = true, + retryMax = 5, + retryOn = 500, + }: BatchEventServiceConstructor) { + this.provider = provider; + this.contract = contract; + this.concurrencySize = concurrencySize; + this.blocksPerRequest = blocksPerRequest; + this.shouldRetry = shouldRetry; + this.retryMax = retryMax; + this.retryOn = retryOn; + } + + async getPastEvents({ fromBlock, toBlock, type }: EventInput): Promise { + let err; + let retries = 0; + + // eslint-disable-next-line no-unmodified-loop-condition + while ((!this.shouldRetry && retries === 0) || (this.shouldRetry && retries < this.retryMax)) { + try { + return (await this.contract.queryFilter(type, fromBlock, toBlock)) as EventLog[]; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (e: any) { + err = e; + retries++; + + // If provider.getBlockNumber returned last block that isn't accepted (happened on Avalanche/Gnosis), + // get events to last accepted block + if (e.message.includes('after last accepted block')) { + const acceptedBlock = parseInt(e.message.split('after last accepted block ')[1]); + toBlock = acceptedBlock; + } + + // retry on 0.5 seconds + await sleep(this.retryOn); + } + } + + throw err; + } + + createBatchRequest(batchArray: EventInput[]): Promise[] { + return batchArray.map(async (event: EventInput, index: number) => { + await sleep(20 * index); + + return this.getPastEvents(event); + }); + } + + async getBatchEvents({ fromBlock, toBlock, type = '*' }: EventInput): Promise { + if (!toBlock) { + toBlock = await this.provider.getBlockNumber(); + } + + const eventsToSync: EventInput[] = []; + + for (let i = fromBlock; i < toBlock; i += this.blocksPerRequest) { + const j = i + this.blocksPerRequest - 1 > toBlock ? toBlock : i + this.blocksPerRequest - 1; + + eventsToSync.push({ fromBlock: i, toBlock: j, type } as EventInput); + } + + const events = []; + const eventChunk = getBatches(eventsToSync, this.concurrencySize); + + let chunkCount = 0; + + for (const chunk of eventChunk) { + chunkCount++; + + const fetchedEvents = (await Promise.all(this.createBatchRequest(chunk))).flat(); + events.push(...fetchedEvents); + } + + return events; + } +} diff --git a/services/events/index.ts b/services/events/index.ts index c365c51..b0c10cb 100644 --- a/services/events/index.ts +++ b/services/events/index.ts @@ -8,6 +8,8 @@ import { getBridgeHelper, getBridgeProxy, getAmbBridge } from '@/contracts' import { EventsClass, GetAffirmationParams, GetRelayedMessageParams, SaveEventsParams } from './@types' +export * from './batch' + class EventAggregator implements EventsClass { public async getBackupedAddressFromPublicKey(publicKey: string) { try { diff --git a/services/graph/@types/index.ts b/services/graph/@types/index.ts index 64c6558..5432e6d 100644 --- a/services/graph/@types/index.ts +++ b/services/graph/@types/index.ts @@ -12,3 +12,21 @@ export type Account = { } export type Accounts = Account[] + +export type Commitment = { + index: string + commitment: string + blockNumber: string + encryptedOutput: string + transactionHash: string +} + +export type Commitments = Commitment[] + +export type Nullifier = { + nullifier: string + blockNumber: string + transactionHash: string +} + +export type Nullifiers = Nullifier[] \ No newline at end of file diff --git a/services/graph/index.ts b/services/graph/index.ts index b176a97..72327fc 100644 --- a/services/graph/index.ts +++ b/services/graph/index.ts @@ -5,8 +5,8 @@ import { ChainId } from '@/types' import { numbers } from '@/constants' import { isEmpty, toChecksumAddress } from '@/utilities' -import { Params, Accounts } from './@types' -import { _META, GET_ACCOUNTS, GET_REGISTERED } from './queries' +import { Params, Accounts, Commitments, Nullifiers } from './@types' +import { _META, GET_ACCOUNTS, GET_REGISTERED, GET_COMMITMENT, GET_NULLIFIER } from './queries' const first = 1000 const breakLength = 900 @@ -169,3 +169,164 @@ async function getMeta({ chainId }: Params) { return undefined } } + + +export async function getCommitments({ fromBlock, chainId }: Params): Promise<{ + results: Commitments, + lastSyncBlock: number +}> { + const { data } = await client.query({ + context: { + chainId, + }, + query: gql(GET_COMMITMENT), + variables: { first, fromBlock }, + }) + + if (!data) { + return { + results: [], + lastSyncBlock: data._meta.block.number + } + } + + return { + results: data.commitments, + lastSyncBlock: data._meta.block.number + } +} + +export async function getAllCommitments({ fromBlock, chainId }: Params) { + try { + let commitments: Commitments = [] + let lastSyncBlock + + while (true) { + let { results, lastSyncBlock: lastBlock } = await getCommitments({ fromBlock, chainId }) + + lastSyncBlock = lastBlock + + if (isEmpty(results)) { + break + } + + if (results.length < breakLength) { + commitments = commitments.concat(results) + break + } + + const [lastEvent] = results.slice(-numbers.ONE) + + results = results.filter((e) => e.blockNumber !== lastEvent.blockNumber) + fromBlock = Number(lastEvent.blockNumber) + + commitments = commitments.concat(results) + } + + if (!commitments) { + return { + lastSyncBlock, + events: [], + } + } + + const data = commitments.map((e) => ({ + index: Number(e.index), + commitment: e.commitment, + blockNumber: Number(e.blockNumber), + encryptedOutput: e.encryptedOutput, + transactionHash: e.transactionHash + })) + + const [lastEvent] = data.slice(-numbers.ONE) + + return { + events: data, + lastSyncBlock: lastEvent?.blockNumber > lastSyncBlock ? lastEvent.blockNumber + numbers.ONE : lastSyncBlock, + } + } catch { + return { + lastSyncBlock: '', + events: [], + } + } +} + +export async function getNullifiers({ fromBlock, chainId }: Params): Promise<{ + results: Nullifiers, + lastSyncBlock: number +}> { + const { data } = await client.query({ + context: { + chainId, + }, + query: gql(GET_NULLIFIER), + variables: { first, fromBlock }, + }) + + if (!data) { + return { + results: [], + lastSyncBlock: data._meta.block.number + } + } + + return { + results: data.commitments, + lastSyncBlock: data._meta.block.number + } +} + +export async function getAllNullifiers({ fromBlock, chainId }: Params) { + try { + let nullifiers: Nullifiers = [] + let lastSyncBlock + + while (true) { + let { results, lastSyncBlock: lastBlock } = await getNullifiers({ fromBlock, chainId }) + + lastSyncBlock = lastBlock + + if (isEmpty(results)) { + break + } + + if (results.length < breakLength) { + nullifiers = nullifiers.concat(results) + break + } + + const [lastEvent] = results.slice(-numbers.ONE) + + results = results.filter((e) => e.blockNumber !== lastEvent.blockNumber) + fromBlock = Number(lastEvent.blockNumber) + + nullifiers = nullifiers.concat(results) + } + + if (!nullifiers) { + return { + lastSyncBlock, + events: [], + } + } + + const data = nullifiers.map((e) => ({ + nullifier: e.nullifier, + blockNumber: Number(e.blockNumber), + transactionHash: e.transactionHash + })) + + const [lastEvent] = data.slice(-numbers.ONE) + + return { + events: data, + lastSyncBlock: lastEvent?.blockNumber > lastSyncBlock ? lastEvent.blockNumber + numbers.ONE : lastSyncBlock, + } + } catch { + return { + lastSyncBlock: '', + events: [], + } + } +} \ No newline at end of file diff --git a/services/graph/queries.ts b/services/graph/queries.ts index a5de7f7..6ea55b3 100644 --- a/services/graph/queries.ts +++ b/services/graph/queries.ts @@ -33,3 +33,41 @@ export const GET_REGISTERED = ` } } ` + +export const GET_COMMITMENT = ` + query getCommitment($first: Int, $fromBlock: Int) { + commitments(first: $first, orderBy: blockNumber, orderDirection: asc, where: { + blockNumber_gte: $fromBlock + }) { + index + commitment + blockNumber + encryptedOutput + transactionHash + } + _meta { + block { + number + } + hasIndexingErrors + } + } +` + +export const GET_NULLIFIER = ` + query getNullifier($first: Int, $fromBlock: Int) { + nullifiers(first: $first, orderBy: blockNumber, orderDirection: asc, where: { + blockNumber_gte: $fromBlock + }) { + nullifier + blockNumber + transactionHash + } + _meta { + block { + number + } + hasIndexingErrors + } + } +` \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index 93283a7..3087b54 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5108,11 +5108,6 @@ detect-indent@^5.0.0: resolved "https://registry.yarnpkg.com/detect-indent/-/detect-indent-5.0.0.tgz#3871cc0a6a002e8c3e5b3cf7f336264675f06b9d" integrity "sha1-OHHMCmoALow+Wzz38zYmRnXwa50= sha512-rlpvsxUtM0PQvy9iZe640/IWwWYyBsTApREbA1pHOpmOUIl9MkP/U4z7vTtg4Oaojvqhxt7sdufnT0EzGaR31g==" -detect-libc@^1.0.3: - version "1.0.3" - resolved "https://registry.yarnpkg.com/detect-libc/-/detect-libc-1.0.3.tgz#fa137c4bd698edf55cd5cd02ac559f91a4c4ba9b" - integrity "sha1-+hN8S9aY7fVc1c0CrFWfkaTEups= sha512-pGjwhsmsp4kL2RTz08wcOlGN83otlqHeD/Z5T8GXZB+/YcpQ/dgo+lbU8ZsGxV0HIvqqxo9l7mqYwyYMD9bKDg==" - devalue@^2.0.1: version "2.0.1" resolved "https://registry.yarnpkg.com/devalue/-/devalue-2.0.1.tgz#5d368f9adc0928e47b77eea53ca60d2f346f9762" @@ -6299,13 +6294,6 @@ ffwasm@0.0.7: big-integer "^1.6.48" wasmbuilder "0.0.10" -fibers@^5.0.0: - version "5.0.3" - resolved "https://registry.yarnpkg.com/fibers/-/fibers-5.0.3.tgz#2fd03acb255db66fe693d15beafbf5ae92193fd7" - integrity "sha1-L9A6yyVdtm/mk9Fb6vv1rpIZP9c= sha512-/qYTSoZydQkM21qZpGLDLuCq8c+B8KhuCQ1kLPvnRNhxhVbvrpmH9l2+Lblf5neDuEsY4bfT7LeO553TXQDvJw==" - dependencies: - detect-libc "^1.0.3" - figgy-pudding@^3.5.1: version "3.5.2" resolved "https://registry.yarnpkg.com/figgy-pudding/-/figgy-pudding-3.5.2.tgz#b4eee8148abb01dcf1d1ac34367d59e12fa61d6e"