Initial batch events

Removed fiber since it were deprecated for latest Node.js
This commit is contained in:
Tornado Contrib 2024-05-07 01:44:15 +00:00
parent 212e37d847
commit 0a33404eb4
Signed by: tornadocontrib
GPG Key ID: 60B4DF1A076C64B1
10 changed files with 371 additions and 21 deletions

@ -6,6 +6,8 @@ const { BigNumber } = require('ethers')
const { poseidon } = require('@tornado/circomlib')
const { decrypt } = require('eth-sig-util')
const { IndexedDB } = require('../services/idb')
const { BatchEventsService } = require('../services/batch')
const { getAllCommitments } = require('../services/graph')
const { sleep } = require('../utilities/helpers')
const { workerEvents, numbers } = require('../constants/worker')
const { ExtendedProvider } = require('../services/ether/ExtendedProvider')
@ -62,11 +64,19 @@ const initWorker = (chainId) => {
}
const setTornadoPool = (chainId, provider) => {
self.poolContract = TornadoPoolFactory.connect(POOL_CONTRACT[chainId], provider)
self.BatchEventsService = new BatchEventsService({
provider,
contract: self.poolContract
})
}
const getCommitmentBatch = async ({ blockFrom, blockTo, cachedEvents, withCache }) => {
const filter = self.poolContract.filters.NewCommitment()
const events = await self.poolContract.queryFilter(filter, blockFrom, blockTo)
const events = await self.BatchEventsService.getBatchEvents({
fromBlock: blockFrom,
toBlock: blockTo,
type: 'NewCommitment'
})
const commitmentEvents = events.map(({ blockNumber, transactionHash, args }) => ({
blockNumber,

@ -3,6 +3,8 @@ const { isEmpty } = require('lodash')
const { BigNumber } = require('ethers')
const { IndexedDB } = require('../services/idb')
const { BatchEventsService } = require('../services/batch')
const { getAllNullifiers } = require('../services/graph')
const { sleep } = require('../utilities/helpers')
const { workerEvents, numbers } = require('../constants/worker')
const { ExtendedProvider } = require('../services/ether/ExtendedProvider')
@ -48,6 +50,11 @@ const initWorker = (chainId) => {
const setTornadoPool = (chainId, provider) => {
self.poolContract = TornadoPoolFactory.connect(POOL_CONTRACT[chainId], provider)
self.BatchEventsService = new BatchEventsService({
provider,
contract: self.poolContract
})
}
const saveEvents = async ({ events }) => {
@ -123,8 +130,10 @@ const getCachedEvents = async () => {
const getNullifiers = async (blockFrom) => {
try {
const filter = self.poolContract.filters.NewNullifier()
const events = await self.poolContract.queryFilter(filter, blockFrom)
const events = await self.BatchEventsService.getBatchEvents({
fromBlock: blockFrom,
type: 'NewNullifier'
})
return events.map(({ blockNumber, transactionHash, args }) => ({
blockNumber,

15
copyFile.ts Normal file

@ -0,0 +1,15 @@
import { argv } from 'process'
import { copyFile } from 'fs'
function copyFiles() {
const [, , inFile, outFile] = argv
copyFile(inFile, outFile, function(err) {
if (err) {
throw err
}
console.log(`Copied ${inFile} to ${outFile}`)
})
}
copyFiles()

@ -10,10 +10,11 @@
"lint": "eslint --ext .js,.ts",
"lint:fix": "eslint --ext .js,.ts --quiet --fix",
"compile": "typechain --target ethers-v5 --out-dir ./_contracts './abi/*.json'",
"generate": "nuxt generate && cp dist/404.html dist/ipfs-404.html",
"copyFile": "node --loader ts-node/esm copyFile.ts",
"generate": "nuxt generate && yarn copyFile dist/404.html dist/ipfs-404.html",
"prepare": "husky install",
"ipfs:upload": "node --loader ts-node/esm ipfsUpload.ts",
"worker:compile": "nuxt generate && yarn compile:events && yarn compile:nullifier",
"worker:compile": "yarn generate && yarn compile:events && yarn compile:nullifier",
"compile:events": "babel dist/_nuxt/workers/events.worker.js --out-file static/events.worker.js",
"compile:nullifier": "babel dist/_nuxt/workers/nullifier.worker.js --out-file static/nullifier.worker.js"
},
@ -74,7 +75,6 @@
"eslint-plugin-prettier": "^3.4.0",
"eslint-plugin-promise": "^5.1.0",
"eslint-plugin-vue": "^7.16.0",
"fibers": "^5.0.0",
"form-data": "^4.0.0",
"husky": "^6.0.0",
"lint-staged": "10.2.11",

109
services/events/batch.ts Normal file

@ -0,0 +1,109 @@
import { Provider, Contract, EventLog } from "ethers";
import { sleep, getBatches } from "@/utilities";
export interface BatchEventServiceConstructor {
provider: Provider;
contract: Contract;
concurrencySize?: number;
blocksPerRequest?: number;
shouldRetry?: boolean;
retryMax?: number;
retryOn?: number;
}
export type EventInput = {
fromBlock: number;
toBlock: number;
type: string;
};
export class BatchEventsService {
provider: Provider;
contract: Contract;
concurrencySize: number;
blocksPerRequest: number;
shouldRetry: boolean;
retryMax: number;
retryOn: number;
constructor({
provider,
contract,
concurrencySize = 10,
blocksPerRequest = 2000,
shouldRetry = true,
retryMax = 5,
retryOn = 500,
}: BatchEventServiceConstructor) {
this.provider = provider;
this.contract = contract;
this.concurrencySize = concurrencySize;
this.blocksPerRequest = blocksPerRequest;
this.shouldRetry = shouldRetry;
this.retryMax = retryMax;
this.retryOn = retryOn;
}
async getPastEvents({ fromBlock, toBlock, type }: EventInput): Promise<EventLog[]> {
let err;
let retries = 0;
// eslint-disable-next-line no-unmodified-loop-condition
while ((!this.shouldRetry && retries === 0) || (this.shouldRetry && retries < this.retryMax)) {
try {
return (await this.contract.queryFilter(type, fromBlock, toBlock)) as EventLog[];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
err = e;
retries++;
// If provider.getBlockNumber returned last block that isn't accepted (happened on Avalanche/Gnosis),
// get events to last accepted block
if (e.message.includes('after last accepted block')) {
const acceptedBlock = parseInt(e.message.split('after last accepted block ')[1]);
toBlock = acceptedBlock;
}
// retry on 0.5 seconds
await sleep(this.retryOn);
}
}
throw err;
}
createBatchRequest(batchArray: EventInput[]): Promise<EventLog[]>[] {
return batchArray.map(async (event: EventInput, index: number) => {
await sleep(20 * index);
return this.getPastEvents(event);
});
}
async getBatchEvents({ fromBlock, toBlock, type = '*' }: EventInput): Promise<EventLog[]> {
if (!toBlock) {
toBlock = await this.provider.getBlockNumber();
}
const eventsToSync: EventInput[] = [];
for (let i = fromBlock; i < toBlock; i += this.blocksPerRequest) {
const j = i + this.blocksPerRequest - 1 > toBlock ? toBlock : i + this.blocksPerRequest - 1;
eventsToSync.push({ fromBlock: i, toBlock: j, type } as EventInput);
}
const events = [];
const eventChunk = getBatches<EventInput>(eventsToSync, this.concurrencySize);
let chunkCount = 0;
for (const chunk of eventChunk) {
chunkCount++;
const fetchedEvents = (await Promise.all(this.createBatchRequest(chunk))).flat();
events.push(...fetchedEvents);
}
return events;
}
}

@ -8,6 +8,8 @@ import { getBridgeHelper, getBridgeProxy, getAmbBridge } from '@/contracts'
import { EventsClass, GetAffirmationParams, GetRelayedMessageParams, SaveEventsParams } from './@types'
export * from './batch'
class EventAggregator implements EventsClass {
public async getBackupedAddressFromPublicKey(publicKey: string) {
try {

@ -12,3 +12,21 @@ export type Account = {
}
export type Accounts = Account[]
export type Commitment = {
index: string
commitment: string
blockNumber: string
encryptedOutput: string
transactionHash: string
}
export type Commitments = Commitment[]
export type Nullifier = {
nullifier: string
blockNumber: string
transactionHash: string
}
export type Nullifiers = Nullifier[]

@ -5,8 +5,8 @@ import { ChainId } from '@/types'
import { numbers } from '@/constants'
import { isEmpty, toChecksumAddress } from '@/utilities'
import { Params, Accounts } from './@types'
import { _META, GET_ACCOUNTS, GET_REGISTERED } from './queries'
import { Params, Accounts, Commitments, Nullifiers } from './@types'
import { _META, GET_ACCOUNTS, GET_REGISTERED, GET_COMMITMENT, GET_NULLIFIER } from './queries'
const first = 1000
const breakLength = 900
@ -169,3 +169,164 @@ async function getMeta({ chainId }: Params) {
return undefined
}
}
export async function getCommitments({ fromBlock, chainId }: Params): Promise<{
results: Commitments,
lastSyncBlock: number
}> {
const { data } = await client.query({
context: {
chainId,
},
query: gql(GET_COMMITMENT),
variables: { first, fromBlock },
})
if (!data) {
return {
results: [],
lastSyncBlock: data._meta.block.number
}
}
return {
results: data.commitments,
lastSyncBlock: data._meta.block.number
}
}
export async function getAllCommitments({ fromBlock, chainId }: Params) {
try {
let commitments: Commitments = []
let lastSyncBlock
while (true) {
let { results, lastSyncBlock: lastBlock } = await getCommitments({ fromBlock, chainId })
lastSyncBlock = lastBlock
if (isEmpty(results)) {
break
}
if (results.length < breakLength) {
commitments = commitments.concat(results)
break
}
const [lastEvent] = results.slice(-numbers.ONE)
results = results.filter((e) => e.blockNumber !== lastEvent.blockNumber)
fromBlock = Number(lastEvent.blockNumber)
commitments = commitments.concat(results)
}
if (!commitments) {
return {
lastSyncBlock,
events: [],
}
}
const data = commitments.map((e) => ({
index: Number(e.index),
commitment: e.commitment,
blockNumber: Number(e.blockNumber),
encryptedOutput: e.encryptedOutput,
transactionHash: e.transactionHash
}))
const [lastEvent] = data.slice(-numbers.ONE)
return {
events: data,
lastSyncBlock: lastEvent?.blockNumber > lastSyncBlock ? lastEvent.blockNumber + numbers.ONE : lastSyncBlock,
}
} catch {
return {
lastSyncBlock: '',
events: [],
}
}
}
export async function getNullifiers({ fromBlock, chainId }: Params): Promise<{
results: Nullifiers,
lastSyncBlock: number
}> {
const { data } = await client.query({
context: {
chainId,
},
query: gql(GET_NULLIFIER),
variables: { first, fromBlock },
})
if (!data) {
return {
results: [],
lastSyncBlock: data._meta.block.number
}
}
return {
results: data.commitments,
lastSyncBlock: data._meta.block.number
}
}
export async function getAllNullifiers({ fromBlock, chainId }: Params) {
try {
let nullifiers: Nullifiers = []
let lastSyncBlock
while (true) {
let { results, lastSyncBlock: lastBlock } = await getNullifiers({ fromBlock, chainId })
lastSyncBlock = lastBlock
if (isEmpty(results)) {
break
}
if (results.length < breakLength) {
nullifiers = nullifiers.concat(results)
break
}
const [lastEvent] = results.slice(-numbers.ONE)
results = results.filter((e) => e.blockNumber !== lastEvent.blockNumber)
fromBlock = Number(lastEvent.blockNumber)
nullifiers = nullifiers.concat(results)
}
if (!nullifiers) {
return {
lastSyncBlock,
events: [],
}
}
const data = nullifiers.map((e) => ({
nullifier: e.nullifier,
blockNumber: Number(e.blockNumber),
transactionHash: e.transactionHash
}))
const [lastEvent] = data.slice(-numbers.ONE)
return {
events: data,
lastSyncBlock: lastEvent?.blockNumber > lastSyncBlock ? lastEvent.blockNumber + numbers.ONE : lastSyncBlock,
}
} catch {
return {
lastSyncBlock: '',
events: [],
}
}
}

@ -33,3 +33,41 @@ export const GET_REGISTERED = `
}
}
`
export const GET_COMMITMENT = `
query getCommitment($first: Int, $fromBlock: Int) {
commitments(first: $first, orderBy: blockNumber, orderDirection: asc, where: {
blockNumber_gte: $fromBlock
}) {
index
commitment
blockNumber
encryptedOutput
transactionHash
}
_meta {
block {
number
}
hasIndexingErrors
}
}
`
export const GET_NULLIFIER = `
query getNullifier($first: Int, $fromBlock: Int) {
nullifiers(first: $first, orderBy: blockNumber, orderDirection: asc, where: {
blockNumber_gte: $fromBlock
}) {
nullifier
blockNumber
transactionHash
}
_meta {
block {
number
}
hasIndexingErrors
}
}
`

@ -5108,11 +5108,6 @@ detect-indent@^5.0.0:
resolved "https://registry.yarnpkg.com/detect-indent/-/detect-indent-5.0.0.tgz#3871cc0a6a002e8c3e5b3cf7f336264675f06b9d"
integrity "sha1-OHHMCmoALow+Wzz38zYmRnXwa50= sha512-rlpvsxUtM0PQvy9iZe640/IWwWYyBsTApREbA1pHOpmOUIl9MkP/U4z7vTtg4Oaojvqhxt7sdufnT0EzGaR31g=="
detect-libc@^1.0.3:
version "1.0.3"
resolved "https://registry.yarnpkg.com/detect-libc/-/detect-libc-1.0.3.tgz#fa137c4bd698edf55cd5cd02ac559f91a4c4ba9b"
integrity "sha1-+hN8S9aY7fVc1c0CrFWfkaTEups= sha512-pGjwhsmsp4kL2RTz08wcOlGN83otlqHeD/Z5T8GXZB+/YcpQ/dgo+lbU8ZsGxV0HIvqqxo9l7mqYwyYMD9bKDg=="
devalue@^2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/devalue/-/devalue-2.0.1.tgz#5d368f9adc0928e47b77eea53ca60d2f346f9762"
@ -6299,13 +6294,6 @@ ffwasm@0.0.7:
big-integer "^1.6.48"
wasmbuilder "0.0.10"
fibers@^5.0.0:
version "5.0.3"
resolved "https://registry.yarnpkg.com/fibers/-/fibers-5.0.3.tgz#2fd03acb255db66fe693d15beafbf5ae92193fd7"
integrity "sha1-L9A6yyVdtm/mk9Fb6vv1rpIZP9c= sha512-/qYTSoZydQkM21qZpGLDLuCq8c+B8KhuCQ1kLPvnRNhxhVbvrpmH9l2+Lblf5neDuEsY4bfT7LeO553TXQDvJw=="
dependencies:
detect-libc "^1.0.3"
figgy-pudding@^3.5.1:
version "3.5.2"
resolved "https://registry.yarnpkg.com/figgy-pudding/-/figgy-pudding-3.5.2.tgz#b4eee8148abb01dcf1d1ac34367d59e12fa61d6e"