2023.04.28: Check HISTORY.md for more info

Signed-off-by: T-Hax <>
This commit is contained in:
T-Hax 2023-04-27 17:34:22 +00:00
parent a27fff1974
commit ddb47e423a
17 changed files with 487 additions and 475 deletions

@ -1,16 +1,13 @@
# All of these are used for tests
# If someone is using the SDK, there is no reason to use .env
# RPC URLs
ETH_MAINNET_TEST_RPC=
## Test behaviour
# Debug (Whether to log debug events)
# debug (debug events are logged to console)
DEBUG=
# Tor
# Torify tests (need to make possible on each still)
# use tor (torify tests)
TORIFY=
# Tor port (regular = 9050, browser = 9150)
# tor port (regular = 9050, browser = 9150)
TOR_PORT=
# RPCs
ETH_MAINNET_TEST_RPC=
# relayer DOMAIN (the example.xyz in https://example.xyz) for testing
TEST_RELAYER_DOMAIN=

@ -1,5 +1,16 @@
# History
### 2023.04.28 (2023-04-28)
Did:
* Had to run those "few more tests". Finally done with those, there was a promise not being resolved which ended up being test listeners. Noted.
* Synchronization is now abstracted into the inheritable Synchronizer which will be reused for different things, for example relayer registry operations next.
Next:
* MONOREPO!!!!!!!!!!!!!!! STARTING TODAY
### 2023.04.23 (2023-04-23)
Did:

@ -10,7 +10,7 @@
"zk"
],
"private": false,
"version": "2023.04.23",
"version": "2023.04.28",
"engines": {
"node": "^18"
},

@ -1,6 +1,19 @@
// Types
import { MarkOptional } from 'ts-essentials'
import * as Types from 'types/sdk/chain'
// Externals types
import { MarkOptional, DeepRequired } from 'ts-essentials'
// External modules
import EventEmitter from 'events'
import { randomBytes } from 'crypto'
import { TransactionRequest } from '@ethersproject/abstract-provider'
import {
EventFilter,
BaseContract,
BigNumber,
ContractTransaction,
providers,
Signer,
VoidSigner
} from 'ethers'
// Our local types
import {
@ -15,15 +28,14 @@ import {
Multicall3Contract__factory
} from 'types/deth'
import { Multicall3 } from 'types/deth/Multicall3Contract'
// External imports
import { TransactionRequest } from '@ethersproject/abstract-provider'
import { BaseContract, BigNumber, ContractTransaction, providers, Signer, VoidSigner } from 'ethers'
import { randomBytes } from 'crypto'
import { TornadoContracts, Options } from 'types/sdk/chain'
// Local modules
import { Onchain } from 'lib/data'
import { ErrorUtils, HexUtils } from 'lib/utils'
import { Onchain, Cache, Docs } from 'lib/data'
import { ErrorUtils, HexUtils, AsyncUtils } from 'lib/utils'
// @ts-ignore
import { parseIndexableString } from 'pouchdb-collate'
// We use a vanilla provider here, but in reality we will probably
// add a censorship-checking custom derivative of it
@ -127,7 +139,7 @@ export class Chain {
* This is Tornado-specific.
*/
export namespace Contracts {
function _getContract<C extends Types.Contracts.TornadoContracts>(
function _getContract<C extends TornadoContracts>(
name: string,
address: string,
signerOrProvider: Signer | Provider
@ -199,3 +211,98 @@ export namespace Contracts {
return contractMap.get(key) as ERC20
}
}
export abstract class Synchronizer extends EventEmitter {
async sync(
event: EventFilter,
contract: BaseContract,
cache: Cache.Syncable<Docs.Base>,
options?: Options.Sync
): Promise<void> {
const _options = await this._populateSyncOptions(options)
// Assign pooler
cache.initializePooler(cache.getCallbacks(contract), cache.getErrorHandlers(), _options.concurrencyLimit)
// Decide whether we have a latest block
const numEntries = (await cache.db.info()).doc_count
// Check for synced blocks
if (0 < numEntries) {
const [lastSyncedBlock, ,] = parseIndexableString(
(await cache.db.allDocs({ descending: true, limit: 1 })).rows[0].id
)
_options.startBlock = lastSyncedBlock < _options.startBlock ? _options.startBlock : lastSyncedBlock
_options.blockDelta = Math.floor((_options.targetBlock - _options.startBlock) / _options.blockDivisor)
}
// Start synchronizing
let dbPromises = []
this.emit('debug', _options.startBlock, _options.targetBlock, _options.blockDelta)
this.emit('sync', 'syncing')
for (
let currentBlock = _options.startBlock,
blockDelta = _options.blockDelta,
targetBlock = _options.targetBlock,
concurrencyLimit = _options.concurrencyLimit;
currentBlock < targetBlock;
currentBlock += blockDelta
) {
if (cache.pooler!.pending < concurrencyLimit) {
const sum = currentBlock + blockDelta
await AsyncUtils.timeout(_options.msTimeout)
if (currentBlock + blockDelta < targetBlock) {
await cache.pooler!.pool(currentBlock, sum)
} else {
await cache.pooler!.pool(currentBlock, sum - (sum % targetBlock))
}
this.emit('debug', currentBlock++, sum)
} else {
let res: Array<any> = await cache.pooler!.race()
if (res.length != 0)
dbPromises.push(
cache.db.bulkDocs(res.map((el) => cache.buildDoc(el))).catch((err) => {
throw ErrorUtils.ensureError(err)
})
)
currentBlock -= blockDelta
}
}
this.emit('sync', 'synced')
// Immediately start listening if we're doing this
if (_options.listenForEvents) {
contract = contract.on(event, (...eventArgs) => {
this.emit(cache.name, 'received', cache.db.put(cache.buildDoc(eventArgs[eventArgs.length - 1])))
})
}
// Then wait for all pooler requests to resolve
let results = await cache.pooler!.all()
// Then transform them, we know the shape in forward
results = results.reduce((res: any[], response: any[]) => {
if (response[0]) response.forEach((el: any) => res.push(cache.buildDoc(el)))
return res
}, [])
// Then wait for old dbPromises to resolve
await Promise.all(dbPromises)
// Add the last docs
await cache.db.bulkDocs(results).catch((err) => {
throw ErrorUtils.ensureError(err)
})
}
protected abstract _populateSyncOptions(options?: Options.Sync): Promise<DeepRequired<Options.Sync>>
}

@ -2,14 +2,14 @@
import { DeepRequired, MarkOptional } from 'ts-essentials'
// Local types
import { RelayerProperties as RelayerDataProperties } from 'types/sdk/data'
import { Options as ChainOptions } from 'types/sdk/chain'
import { RelayerProperties as RelayerDataProperties, Options as DataOptions } from 'types/sdk/data'
import { Options, Transactions } from 'types/sdk/core'
import { ZKDepositData, InputFor } from 'types/sdk/crypto'
import { TornadoInstance, TornadoProxy } from 'types/deth'
// External imports
import { EventEmitter } from 'stream'
import { BigNumber, EventFilter, providers } from 'ethers'
import { BigNumber, providers } from 'ethers'
import { parseUnits } from 'ethers/lib/utils'
import { bigInt } from 'snarkjs'
@ -17,10 +17,75 @@ import { bigInt } from 'snarkjs'
import { parseIndexableString } from 'pouchdb-collate'
// Local imports
import { Docs, Cache, Types as DataTypes, Json, Constants, Onchain } from 'lib/data'
import { Primitives } from 'lib/crypto'
import { Contracts, Chain } from 'lib/chain'
import { ErrorUtils, ObjectUtils, AsyncUtils } from 'lib/utils'
import { Docs, Cache, Keys, Constants, Onchain } from 'lib/data'
import { Contracts, Chain, Synchronizer } from 'lib/chain'
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ FOR SYNCHRONIZATION ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
function tornadoSyncErrorHandler(
err: Error,
numResolvedPromises: number,
callbackIndex: number,
orderIndex: number,
...args: any[]
): void {
err = ErrorUtils.ensureError<Error>(err)
if (err.message.match('context deadline exceeded'))
console.error(
ErrorUtils.getError(
`Context deadline exceeded, stop if more promises do not resolve. Resolved: ${numResolvedPromises}`
)
)
else if (err.message.match('Invalid JSON RPC'))
console.error(
ErrorUtils.getError(`Endpoint returned invalid value (we might be rate limited), retrying.`)
)
else {
err.message += `\nCallback args supplied: [${args.join(', ')}]\n`
throw err
}
}
export class DepositCache extends Cache.Syncable<Docs.Deposit> {
buildDoc(response: any): Docs.Deposit {
return new Docs.Deposit(response)
}
getErrorHandlers(): Array<AsyncUtils.ErrorHandler> {
return [tornadoSyncErrorHandler]
}
getCallbacks(instance: TornadoInstance): Array<AsyncUtils.Callback> {
return [
(fromBlock: number, toBlock: number) => {
return instance.queryFilter(instance.filters.Deposit(null, null, null), fromBlock, toBlock)
}
]
}
}
export class WithdrawalCache extends Cache.Syncable<Docs.Withdrawal> {
buildDoc(response: any): Docs.Withdrawal {
return new Docs.Withdrawal(response)
}
getErrorHandlers(): Array<AsyncUtils.ErrorHandler> {
return [tornadoSyncErrorHandler]
}
getCallbacks(instance: TornadoInstance): Array<AsyncUtils.Callback> {
return [
(fromBlock: number, toBlock: number) => {
return instance.queryFilter(instance.filters.Withdrawal(null, null, null, null), fromBlock, toBlock)
}
]
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ CORE ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
type Provider = providers.Provider
@ -37,7 +102,7 @@ type RelayerProperties = MarkOptional<
'serviceFeePercent' | 'prices'
>
export class Core extends EventEmitter {
export class Core extends Synchronizer {
chain: Chain
caches: Map<string, Cache.Base<Docs.Base>>
instances: Map<string, TornadoInstance>
@ -65,8 +130,12 @@ export class Core extends EventEmitter {
}
async getInstance(token: string, denomination: number | string): Promise<TornadoInstance> {
const chainId = await this.chain.getChainId()
return Contracts.getInstance(String(chainId), token, String(denomination), this.chain.provider)
const chainId = String(await this.chain.getChainId())
token = token.toLowerCase()
denomination = String(denomination)
if (this.instances.has(chainId + token + denomination))
return this.instances.get(chainId + token + denomination)!
else return Contracts.getInstance(chainId, token, denomination, this.chain.provider)
}
async getProxy(): Promise<TornadoProxy> {
@ -210,10 +279,9 @@ export class Core extends EventEmitter {
const toWithdraw = BigNumber.from(+lookupKeys.denomination * 10 ** lookupKeys.denomination.length)
.mul(decimals)
.div(10 ** lookupKeys.denomination.length)
const native = lookupKeys.token !== (await this.chain.getChainSymbol())
const native = lookupKeys.token == (await this.chain.getChainSymbol())
// TODO: Decide if necessary
if (!tokenPrice && native)
if (!tokenPrice && !native)
throw ErrorUtils.getError(
'Core.buildDepositProofs: a token price MUST be supplied if the token withdrawn is not native.'
)
@ -272,8 +340,8 @@ export class Core extends EventEmitter {
ethBought: BigNumber,
tokenPriceInEth?: BigNumber
): typeof bigInt {
const factor = BigNumber.from(10).pow(String(relayerServiceFee).length)
const baseRelayerFee = toWithdraw.mul(BigNumber.from(relayerServiceFee).mul(factor)).div(factor)
const factor = 10 ** String(relayerServiceFee).length
const baseRelayerFee = toWithdraw.mul(BigNumber.from(relayerServiceFee * factor)).div(factor)
const txCost = gasPrice.add(gasPriceCushion).mul(5e5)
if (tokenPriceInEth) {
// @ts-expect-error
@ -283,9 +351,51 @@ export class Core extends EventEmitter {
else return bigInt(txCost.add(baseRelayerFee).toString())
}
/**
* @param instanceName The name of the instance as created in `_sync` function.
* @param commitments The commitments for which the leaf index values are to be noted down extra.
* @returns The result of concatenating the array of leaf indices found by matching them with the provided commitment values, followed by the array of all leaf indices, including all of the formerly mentioned values given that they are valid. Values which have not been matched, meaning probably invalid values, will be `0`.
*/
private async _findLeavesAndIndices(
instanceName: string,
commitments: Array<string>
): Promise<[Array<string>, Array<number>]> {
const indices = new Array<number>(commitments.length).fill(0)
const leaves: Array<string> = []
const cache = this.loadCache<Cache.Base<Docs.Deposit>>(instanceName)
const docs = await cache.db.allDocs()
// If no docs in cache throw and stop
if (docs.total_rows === 0) {
await cache.clear()
throw ErrorUtils.getError(
`Core.buildMerkleTree: events for instance ${instanceName} have not been synchronized.`
)
}
// Otherwise start looking for commitment leaf indices and also pick up all other leafs on the way
for (const row of docs.rows) {
const [, leafIndex, loadedCommitment] = parseIndexableString(row.id)
const index = commitments.findIndex((commitment) => commitment === loadedCommitment)
// If some commitment is found then add the leaf index and remove that commitment
if (index !== -1) {
indices[index] = leafIndex
commitments.splice(index, 1)
}
// In any case push every leaf
leaves.push(BigNumber.from(loadedCommitment).toString())
}
// Concat matched and all leaf indices
return [leaves, indices]
}
async loadNotes(
indexes?: Array<number>,
keys?: Partial<DataTypes.Keys.InstanceLookup>
keys?: Partial<Keys.InstanceLookup>
): Promise<Array<ZKDepositData>> {
const rows = await Cache.loadContents<Docs.Note>('DepositNotes')
@ -450,262 +560,111 @@ export class Core extends EventEmitter {
throw ErrorUtils.ensureError(err)
})
// TODO: Decide whether to close caches by default or not
//await cache.close().catch((err) => {
// throw ErrorUtils.ensureError(err)
//})
// TODO: Decide whether to close caches by default or not
//await cache.close().catch((err) => {
// throw ErrorUtils.ensureError(err)
//})
}
loadWithdrawalCache(name: string, options?: Options.Core.Cache): Cache.Withdrawal {
loadDepositCache(name: string, options?: ChainOptions.Sync): DepositCache {
if (!this.caches.has(name)) {
this.caches.set(name, new Cache.Withdrawal(name, options))
this.caches.set(
name,
new DepositCache(
name,
options ? { adapter: options?.cacheAdapter, persistent: options?.persistentCache } : undefined
)
)
}
return this.caches.get(name) as Cache.Withdrawal
return this.caches.get(name) as DepositCache
}
loadDepositCache(name: string, options?: Options.Core.Cache): Cache.Deposit {
loadWithdrawalCache(name: string, options?: ChainOptions.Sync): WithdrawalCache {
if (!this.caches.has(name)) {
this.caches.set(name, new Cache.Deposit(name, options))
this.caches.set(
name,
new WithdrawalCache(
name,
options ? { adapter: options?.cacheAdapter, persistent: options?.persistentCache } : undefined
)
)
}
return this.caches.get(name) as Cache.Deposit
return this.caches.get(name) as WithdrawalCache
}
loadCache<C extends Cache.Base<Docs.Base>>(name: string, options?: Options.Cache.Database): C {
loadCache<C extends Cache.Base<Docs.Base>>(name: string, options?: ChainOptions.Sync): C {
if (!this.caches.has(name)) {
this.caches.set(name, new Cache.Base(name, options))
this.caches.set(
name,
new Cache.Base(
name,
options ? { adapter: options?.cacheAdapter, persistent: options?.persistentCache } : undefined
)
)
}
return this.caches.get(name) as C
}
async syncMultiple(instances: Array<TornadoInstance>, syncOptions?: Options.Core.Sync): Promise<void> {
for (const instance of instances) {
await this.sync(instance, syncOptions)
}
}
async sync(instance: TornadoInstance, syncOptions?: Options.Core.Sync): Promise<void> {
// Get some data
async syncDeposits(instance: TornadoInstance, options?: ChainOptions.Sync): Promise<void> {
const lookupKeys = await Onchain.getInstanceLookupKeys(instance.address)
const pathstring = lookupKeys.network + lookupKeys.token + lookupKeys.denomination
const populatedSyncOpts = await this._populateSyncOpts(lookupKeys, syncOptions)
const actions = Object.entries(populatedSyncOpts).filter((el) => el[1] === true) as [string, boolean][]
// Synchronize
for (let i = 0, bound = actions.length; i < bound; i++) {
const action = actions[i][0].charAt(0).toUpperCase() + actions[i][0].slice(1)
const pathstring = lookupKeys.network + lookupKeys.token + lookupKeys.denomination
const name = action + 's' + pathstring.toUpperCase()
if (action == 'Deposit')
await this._sync(
pathstring,
this.loadDepositCache(name, syncOptions?.cache),
instance.filters.Deposit(null, null, null),
instance,
populatedSyncOpts
)
else if (action == 'Withdrawal')
await this._sync(
pathstring,
this.loadWithdrawalCache(name, syncOptions?.cache),
instance.filters.Withdrawal(null, null, null, null),
instance,
populatedSyncOpts
)
}
}
private async _sync(
pathstring: string,
cache: Cache.Syncable<Docs.Base>,
filter: EventFilter,
instance: TornadoInstance,
syncOptions: DeepRequired<Options.Core.Sync>
): Promise<void> {
// Assign pooler
cache.sync.initializePooler(cache.getCallbacks(instance), cache.getErrorHandlers())
// Decide whether we have a latest block
const numEntries = (await cache.db.info()).doc_count
// Check for synced blocks
if (0 < numEntries) {
const [lastSyncedBlock, ,] = parseIndexableString(
(await cache.db.allDocs({ descending: true, limit: 1 })).rows[0].id
)
syncOptions.blocks.startBlock =
lastSyncedBlock < syncOptions.blocks.startBlock ? syncOptions.blocks.startBlock : lastSyncedBlock
syncOptions.blocks.blockDelta = this._getBlockDelta(syncOptions)
}
// Start synchronizing
let dbPromises = []
this.emit(
'debug',
syncOptions.blocks.startBlock,
syncOptions.blocks.targetBlock,
syncOptions.blocks.blockDelta
options = options ?? {}
options.startBlock = await Onchain.getInstanceDeployBlockNum(
lookupKeys.network,
lookupKeys.token,
lookupKeys.denomination
)
this.emit('sync', 'syncing')
const populatedOptions = await this._populateSyncOptions(options)
const cache = this.loadDepositCache('Deposits' + pathstring.toUpperCase(), populatedOptions)
for (
let currentBlock = syncOptions.blocks.startBlock,
blockDelta = syncOptions.blocks.blockDelta,
targetBlock = syncOptions.blocks.targetBlock,
concurrencyLimit = syncOptions.cache.sync.concurrencyLimit;
currentBlock < targetBlock;
currentBlock += blockDelta
) {
if (cache.sync.pooler!.pending < concurrencyLimit) {
const sum = currentBlock + blockDelta
await this.sync(instance.filters.Deposit(null, null, null), instance, cache, populatedOptions)
await AsyncUtils.timeout(syncOptions.msTimeout)
if (currentBlock + blockDelta < targetBlock) {
await cache.sync.pooler!.pool(currentBlock, sum)
} else {
await cache.sync.pooler!.pool(currentBlock, sum - (sum % targetBlock))
}
this.emit('debug', currentBlock++, sum)
} else {
let res: Array<any> = await cache.sync.pooler!.race()
if (res.length != 0)
dbPromises.push(
cache.db.bulkDocs(res.map((el) => cache.buildDoc(el))).catch((err) => {
throw ErrorUtils.ensureError(err)
})
)
currentBlock -= blockDelta
}
}
this.emit('sync', 'synced')
// Immediately start listening if we're doing this
if (syncOptions.cache.sync.listen) {
instance = instance.on(filter, (...eventArgs) => {
this.emit(cache.name, 'received', cache.db.put(cache.buildDoc(eventArgs[eventArgs.length - 1])))
})
}
// Then wait for all pooler requests to resolve
let results = await cache.sync.pooler!.all()
// Then transform them, we know the shape in forward
results = results.reduce((res: any[], response: any[]) => {
if (response[0]) response.forEach((el: any) => res.push(cache.buildDoc(el)))
return res
}, [])
// Then wait for old dbPromises to resolve
await Promise.all(dbPromises)
// Add the last docs
await cache.db.bulkDocs(results).catch((err) => {
throw ErrorUtils.ensureError(err)
})
// Finally, store the objects
if (!this.instances.has(pathstring)) this.instances.set(pathstring, instance)
if (!this.caches.has(cache.name)) this.caches.set(cache.name, cache)
}
private async _populateSyncOpts(
lookupKeys: DataTypes.Keys.InstanceLookup,
syncOptions?: Options.Core.Sync
): Promise<DeepRequired<Options.Core.Sync>> {
// Assign nonexistent
if (!syncOptions) syncOptions = {}
if (!syncOptions.blocks) syncOptions.blocks = {}
if (!syncOptions.cache) syncOptions.cache = { db: {}, sync: {} }
if (!syncOptions.cache.sync) syncOptions.cache.sync = {}
if (!syncOptions.cache.db) syncOptions.cache.db = {}
async syncWithdrawals(instance: TornadoInstance, options?: ChainOptions.Sync): Promise<void> {
const lookupKeys = await Onchain.getInstanceLookupKeys(instance.address)
const pathstring = lookupKeys.network + lookupKeys.token + lookupKeys.denomination
// Prepare options
// deposit & withdraw
const both = syncOptions.deposit === undefined && syncOptions.withdrawal === undefined
syncOptions.deposit = syncOptions.deposit ?? both
syncOptions.withdrawal = syncOptions.withdrawal ?? false
// blocks
syncOptions.blocks.startBlock =
syncOptions.blocks.startBlock ??
(await Onchain.getInstanceDeployBlockNum(lookupKeys.network, lookupKeys.token, lookupKeys.denomination))
syncOptions.blocks.targetBlock = syncOptions.blocks.targetBlock ?? (await this.chain.latestBlockNum())
syncOptions.blocks.deltaDivisor = syncOptions.blocks.deltaDivisor ?? 100
syncOptions.blocks.blockDelta = this._getBlockDelta(syncOptions)
syncOptions.msTimeout = syncOptions.msTimeout ?? 200 // 5 requests per second
// cache
// db
syncOptions.cache.db.persistent = syncOptions.cache.db.persistent ?? true
syncOptions.cache.db.adapter = syncOptions.cache.db.adapter ?? 'leveldb'
// sync
syncOptions.cache.sync.concurrencyLimit = syncOptions.cache.sync.concurrencyLimit ?? 8
syncOptions.cache.sync.listen = syncOptions.cache.sync.listen ?? false
return syncOptions as DeepRequired<Options.Core.Sync>
}
private _getBlockDelta(syncOptions?: Options.Core.Sync): number {
return Math.floor(
(syncOptions!.blocks!.targetBlock! - syncOptions!.blocks!.startBlock!) /
syncOptions!.blocks!.deltaDivisor!
options = options ?? {}
options.startBlock = await Onchain.getInstanceDeployBlockNum(
lookupKeys.network,
lookupKeys.token,
lookupKeys.denomination
)
const populatedOptions = await this._populateSyncOptions(options)
const cache = this.loadWithdrawalCache('Withdrawals' + pathstring.toUpperCase(), populatedOptions)
await this.sync(instance.filters.Withdrawal(null, null, null), instance, cache, populatedOptions)
if (!this.instances.has(pathstring)) this.instances.set(pathstring, instance)
if (!this.caches.has(cache.name)) this.caches.set(cache.name, cache)
}
/**
* @param instanceName The name of the instance as created in `_sync` function.
* @param commitments The commitments for which the leaf index values are to be noted down extra.
* @returns The result of concatenating the array of leaf indices found by matching them with the provided commitment values, followed by the array of all leaf indices, including all of the formerly mentioned values given that they are valid. Values which have not been matched, meaning probably invalid values, will be `0`.
*/
private async _findLeavesAndIndices(
instanceName: string,
commitments: Array<string>
): Promise<[Array<string>, Array<number>]> {
const indices = new Array<number>(commitments.length).fill(0)
const leaves: Array<string> = []
protected async _populateSyncOptions(options: ChainOptions.Sync): Promise<DeepRequired<ChainOptions.Sync>> {
if (!options.startBlock) throw ErrorUtils.getError('Core._populateSyncOptions: startBlock not set.')
const cache = this.loadCache<Cache.Base<Docs.Deposit>>(instanceName)
const docs = await cache.db.allDocs()
options.targetBlock = options.targetBlock ?? (await this.chain.latestBlockNum())
// If no docs in cache throw and stop
if (docs.total_rows === 0) {
await cache.clear()
throw ErrorUtils.getError(
`Core.buildMerkleTree: events for instance ${instanceName} have not been synchronized.`
)
}
options.blockDivisor = options.blockDivisor ?? 40
// Otherwise start looking for commitment leaf indices and also pick up all other leafs on the way
for (const row of docs.rows) {
const [, leafIndex, loadedCommitment] = parseIndexableString(row.id)
const index = commitments.findIndex((commitment) => commitment === loadedCommitment)
options.blockDelta = Math.floor((options.targetBlock - options.startBlock) / options.blockDivisor)
// If some commitment is found then add the leaf index and remove that commitment
if (index !== -1) {
indices[index] = leafIndex
commitments.splice(index, 1)
}
options.concurrencyLimit = options.concurrencyLimit ?? 8
// In any case push every leaf
leaves.push(BigNumber.from(loadedCommitment).toString())
}
options.msTimeout = options.msTimeout ?? 200 // 5 requests per second
// Concat matched and all leaf indices
return [leaves, indices]
options.persistentCache = options.persistentCache ?? true
options.cacheAdapter = options.cacheAdapter ?? 'leveldb'
options.listenForEvents = options.listenForEvents ?? false
return options as DeepRequired<ChainOptions.Sync>
}
}

@ -1,5 +1,5 @@
// crypto types
import * as Types from 'types/sdk/crypto'
import { InputFor, OutputOf, ZKDepositData } from 'types/sdk/crypto'
// External crypto
import circomlib from 'circomlib'
@ -29,19 +29,20 @@ export namespace Setup {
return Json.load('circuits/tornado.json')
}
let cachedGroth16Prover: Groth16 | null = null
/**
* @note The following is a comment from tornado-cli: `groth16 initialises a lot of Promises that will never be resolved, that's why we need to use process.exit to terminate the CLI`. They literally didn't check the code to see that these are just worker threads and that `groth16` has a `terminate()` function to remove them. 🤦
*/
export async function getGroth16(): Promise<Groth16> {
const defaultParams = { wasmInitialMemory: 5000 }
return await buildGroth16(defaultParams)
if (!cachedGroth16Prover) cachedGroth16Prover = await buildGroth16(defaultParams)
return cachedGroth16Prover
}
}
export namespace Primitives {
export function calcPedersenHash(
pedersenHashData: Types.InputFor.PedersenHash
): Types.OutputOf.PedersenHash {
export function calcPedersenHash(pedersenHashData: InputFor.PedersenHash): OutputOf.PedersenHash {
return circomlib.babyJub.unpackPoint(circomlib.pedersenHash.hash(pedersenHashData.msg))[0]
}
@ -49,7 +50,7 @@ export namespace Primitives {
return HexUtils.bufferToHex(msg, 62)
}
export function parseNote(hexNote: string): Types.ZKDepositData {
export function parseNote(hexNote: string): ZKDepositData {
const _hexNote = hexNote.split('_')[1] ?? hexNote
const buffer = Buffer.from(_hexNote.slice(2), 'hex')
return createDeposit({
@ -61,11 +62,11 @@ export namespace Primitives {
}
export function createDeposit(
input: Types.InputFor.CreateDeposit = {
input: InputFor.CreateDeposit = {
nullifier: NumberUtils.randomBigInteger(31),
secret: NumberUtils.randomBigInteger(31)
}
): Types.ZKDepositData {
): ZKDepositData {
// @ts-expect-error
let preimage = Buffer.concat([input.nullifier.leInt2Buff(31), input.secret.leInt2Buff(31)])
let commitment = calcPedersenHash({ msg: preimage })
@ -84,14 +85,12 @@ export namespace Primitives {
}
}
export function buildMerkleTree(inputs: Types.InputFor.BuildMerkleTree): MerkleTree {
export function buildMerkleTree(inputs: InputFor.BuildMerkleTree): MerkleTree {
// @ts-expect-error
return new MerkleTreeDefault(inputs.height, inputs.leaves)
}
export async function calcDepositProofs(
inputs: Array<Types.InputFor.ZKProof>
): Promise<Array<Array<string>>> {
export async function calcDepositProofs(inputs: Array<InputFor.ZKProof>): Promise<Array<Array<string>>> {
const proofs: string[][] = []
const groth16 = await Setup.getGroth16()
const circuit = await Setup.getTornadoCircuit()
@ -170,4 +169,4 @@ export namespace Primitives {
// export function calcDepositProof(merkleProof: Crypto.InputFor.DepositProof): Crypto.OutputOf.DepositProof {}
// Namespace exports
export { Types }
export { InputFor, OutputOf, ZKDepositData }

@ -1,8 +1,6 @@
// Local types
import { TornadoInstance } from 'types/deth'
import * as Types from 'types/sdk/data'
import { ClassicInstance, TokenData, Keys, Options } from 'types/sdk/data'
import { RelayerProperties } from 'types/sdk/data'
import { Options } from 'types/sdk/core'
// Big modules
import { BigNumber } from 'ethers'
@ -131,7 +129,7 @@ export namespace Onchain {
network: string,
token: string,
denomination: string
): Promise<Types.Json.ClassicInstance> {
): Promise<ClassicInstance> {
const instanceData = Json.getValue(await Json.load('onchain/instances.json'), [network, token])
return {
network: +network,
@ -144,7 +142,7 @@ export namespace Onchain {
}
}
export async function getInstanceLookupKeys(instanceAddress: string): Promise<Types.Keys.InstanceLookup> {
export async function getInstanceLookupKeys(instanceAddress: string): Promise<Keys.InstanceLookup> {
// lookup some stuff first
const lookupObj: { [key: string]: string } = await Json.load('onchain/instanceAddresses.json')
@ -233,7 +231,7 @@ export namespace Onchain {
return Json.getValue(await Json.load('onchain/infrastructure.json'), [network, 'multicall3'])
}
export async function getTokenData(network: string, token: string): Promise<Types.Json.TokenData> {
export async function getTokenData(network: string, token: string): Promise<TokenData> {
const data = Json.getValue(await Json.load('onchain/tokens.json'), [network, token])
return {
network: +network,
@ -401,7 +399,7 @@ export namespace Cache {
name: string
db: PouchDB.Database<T>
constructor(name: string, options?: Options.Cache.Database) {
constructor(name: string, options?: Options.Cache) {
this.name = name
if (options?.persistent === false && options?.adapter !== 'memory' && options?.adapter !== null)
@ -432,11 +430,10 @@ export namespace Cache {
}
export abstract class Syncable<T extends Docs.Base> extends Base<T> {
sync: AsyncUtils.Sync
pooler?: AsyncUtils.PromisePooler
constructor(name: string, options?: { db?: Options.Cache.Database; sync?: Options.Cache.Sync }) {
super(name, options?.db)
this.sync = new AsyncUtils.Sync(options?.sync)
constructor(name: string, options?: Options.Cache) {
super(name, options)
}
abstract buildDoc(response: any): Docs.Base
@ -445,80 +442,28 @@ export namespace Cache {
abstract getErrorHandlers(...args: Array<any>): Array<AsyncUtils.ErrorHandler>
initializePooler(
callbacks: Array<AsyncUtils.Callback>,
errorHandlers: Array<AsyncUtils.ErrorHandler>,
concurrencyLimit: number
): void {
if (this.pooler) this.pooler.reset()
this.pooler = new AsyncUtils.PromisePooler(callbacks, errorHandlers, concurrencyLimit)
}
async close(): Promise<void> {
if (this.sync.pooler!.pending)
if (this.pooler && this.pooler.pending)
throw ErrorUtils.getError("Syncable.close: can't clear while pooler still has pending promises.")
await super.close()
}
async clear(): Promise<void> {
if (this.sync.pooler!.pending)
if (this.pooler && this.pooler.pending)
throw ErrorUtils.getError("Syncable.clear: can't clear while pooler still has pending promises.")
await super.clear()
}
}
function tornadoSyncErrorHandler(
err: Error,
numResolvedPromises: number,
callbackIndex: number,
orderIndex: number,
...args: any[]
): void {
err = ErrorUtils.ensureError<Error>(err)
if (err.message.match('context deadline exceeded'))
console.error(
ErrorUtils.getError(
`Context deadline exceeded, stop if more promises do not resolve. Resolved: ${numResolvedPromises}`
)
)
else if (err.message.match('Invalid JSON RPC'))
console.error(
ErrorUtils.getError(`Endpoint returned invalid value (we might be rate limited), retrying.`)
)
else {
err.message += `\nCallback args supplied: [${args.join(', ')}]\n`
throw err
}
}
export class Deposit extends Syncable<Docs.Deposit> {
buildDoc(response: any): Docs.Deposit {
return new Docs.Deposit(response)
}
getErrorHandlers(): Array<AsyncUtils.ErrorHandler> {
return [tornadoSyncErrorHandler]
}
getCallbacks(instance: TornadoInstance): Array<AsyncUtils.Callback> {
return [
(fromBlock: number, toBlock: number) => {
return instance.queryFilter(instance.filters.Deposit(null, null, null), fromBlock, toBlock)
}
]
}
}
export class Withdrawal extends Syncable<Docs.Withdrawal> {
buildDoc(response: any): Docs.Withdrawal {
return new Docs.Withdrawal(response)
}
getErrorHandlers(): Array<AsyncUtils.ErrorHandler> {
return [tornadoSyncErrorHandler]
}
getCallbacks(instance: TornadoInstance): Array<AsyncUtils.Callback> {
return [
(fromBlock: number, toBlock: number) => {
return instance.queryFilter(instance.filters.Withdrawal(null, null, null, null), fromBlock, toBlock)
}
]
}
}
type DocsArray<T extends Docs.Base> = Array<{
doc?: T
id: string
@ -552,4 +497,4 @@ export namespace Cache {
}
// Namespace exports
export { Types }
export { ClassicInstance, TokenData, Keys, Options }

@ -177,22 +177,6 @@ export namespace AsyncUtils {
}
}
export class Sync {
pooler?: PromisePooler
concurrencyLimit: number
listen: boolean
constructor(options?: Options.Cache.Sync) {
this.concurrencyLimit = options?.concurrencyLimit ?? 1
this.listen = options?.listen ?? false
}
initializePooler(callbacks: Array<Callback>, errorHandlers: Array<ErrorHandler>): void {
if (this.pooler) this.pooler.reset()
this.pooler = new PromisePooler(callbacks, errorHandlers, this.concurrencyLimit)
}
}
export function timeout(msTimeout: number): Promise<any> {
return new Promise((resolve) => setTimeout(resolve, msTimeout))
}

@ -1,5 +1,5 @@
// Types
import { Relayer as Types, RelayerOptions } from 'types/sdk/web'
import { Options, WithdrawalRequestResult } from 'types/sdk/web'
import { RelayerProperties } from 'types/sdk/data'
// HTTP and proxy
@ -95,7 +95,7 @@ export class Relayer {
private _chainId?: number
private _prices?: Map<string, BigNumber>
constructor(options: Types.Options, properties?: RelayerProperties) {
constructor(options: Options.Relayer, properties?: RelayerProperties) {
this.url = options.url
this.httpClient = options.httpClient
this._fetched = false
@ -217,10 +217,7 @@ export class Relayer {
)
}
async handleWithdrawal(
instanceAddress: string,
proof: Array<string>
): Promise<Types.WithdrawalRequestResult> {
async handleWithdrawal(instanceAddress: string, proof: Array<string>): Promise<WithdrawalRequestResult> {
const response = (await this.httpClient
.post(this.url + '/v1/tornadoWithdraw', {
contract: instanceAddress,
@ -231,7 +228,7 @@ export class Relayer {
const { id } = response.data
let result: Types.WithdrawalRequestResult = { success: false },
let result: WithdrawalRequestResult = { success: false },
finished = false
while (!finished) {
@ -278,7 +275,7 @@ export class Relayer {
/**
* Construct a new Relayer by reading relayer data from cache.
*/
static async fromCache(options: RelayerOptions): Promise<Relayer> {
static async fromCache(options: Options.Relayer): Promise<Relayer> {
const cache = new Cache.Base<Docs.Relayer>('Relayers')
// Error is ensured already

@ -1,5 +1,6 @@
import chai from 'chai'
import * as ganache from 'ganache'
import { initializeRelayer } from './preload'
// External
import { solidity } from 'ethereum-waffle'
@ -9,17 +10,16 @@ import { parseUnits } from 'ethers/lib/utils'
import { parseIndexableString } from 'pouchdb-collate'
// Local
import { RelayerProperties } from 'types/sdk/data'
import { ERC20, TornadoInstance } from 'types/deth'
import { Docs, Files, Onchain, Cache } from 'lib/data'
import { Files, Onchain } from 'lib/data'
import { Chain, Contracts } from 'lib/chain'
import { Primitives } from 'lib/crypto'
import { ErrorUtils } from 'lib/utils'
import { TorProvider, Relayer, RegularHttpClient } from 'lib/web'
import { TorProvider } from 'lib/web'
import { Core } from 'lib/core'
// Data
import compareDeposits from './resources/deposits_eth_0.1.json'
import { Setup } from 'lib/crypto'
chai.use(solidity)
@ -59,19 +59,6 @@ describe('Core', () => {
const chain = new Chain(ganacheProvider)
async function initializeRelayer(): Promise<Relayer> {
const httpClient = new RegularHttpClient()
const relayer = new Relayer({
url: 'https://thornadope.xyz',
httpClient: httpClient
})
await relayer.fetchProperties()
return relayer
}
after(async function () {
this.timeout(0)
await Files.wipeCache()
@ -112,17 +99,12 @@ describe('Core', () => {
const smallEthDenomName = '1ETH0.1'
it(`sync: Should be able to fetch deposit events for ${smallEthDenomName}`, async function () {
it(`syncDeposits: Should be able to fetch deposit events for ${smallEthDenomName}`, async function () {
// This is going to try syncing the entire range
await core.sync(smallestEth, {
blocks: {
deltaDivisor: 50
},
cache: {
sync: {
concurrencyLimit: 20
}
}
await core.syncDeposits(smallestEth, {
blockDivisor: 50,
concurrencyLimit: 20,
msTimeout: 300
})
const cache = core.caches.get('Deposits' + smallEthDenomName)
@ -143,17 +125,12 @@ describe('Core', () => {
const bigDaiDenomName = '1DAI100000'
it(`sync: Should be able to fetch deposit events for ${bigDaiDenomName}`, async function () {
it(`syncDeposits: Should be able to fetch deposit events for ${bigDaiDenomName}`, async function () {
// This is going to try syncing the entire range
await core.sync(dai100K, {
blocks: {
deltaDivisor: 50
},
cache: {
sync: {
concurrencyLimit: 20
}
}
await core.syncDeposits(dai100K, {
blockDivisor: 50,
concurrencyLimit: 20,
msTimeout: 300
})
}).timeout(0)
})
@ -200,7 +177,7 @@ describe('Core', () => {
dai = dai.connect(daiWhaleSigner)
})
it.only('buildDepositTransaction: build a single eth deposit tx and succeed', async () => {
it('buildDepositTransaction: build a single eth deposit tx and succeed', async () => {
const initBal = await needsMoney.getBalance()
// Build tx and load cache for this test
@ -217,6 +194,8 @@ describe('Core', () => {
)
})
const listener = smallestEth.listeners(smallestEth.filters.Deposit(null, null, null))[0]
// Deposit and await cache updated
const response = await needsMoney.sendTransaction(tx.request)
await response.wait()
@ -225,12 +204,15 @@ describe('Core', () => {
// Passing resolve as callback into put didn't work
await await putPromise
// Turn off listener (NEEDED OR WE'RE NOT RESOLVING)
smallestEth.off(smallestEth.filters.Deposit(null, null, null), listener)
// Check deposit predicates
expect(initBal).to.equal(parseUnits('1000'))
expect(endBal).to.be.lte(parseUnits('999.9'))
}).timeout(0)
it.only('buildDepositProof: it should be able to build an eth proof', async () => {
it('buildDepositProof: it should be able to build an eth proof', async () => {
// Get withdrawer, load cache, prep note for this test
const withdrawer = ganacheProvider.getSigner(2)
const cache = core.loadDepositCache('Deposits1ETH0.1')
@ -240,7 +222,7 @@ describe('Core', () => {
const notes = await core.loadNotes()
// Build proof
let proof
let proof: any
try {
proof = await core.buildDepositProof(
@ -265,8 +247,8 @@ describe('Core', () => {
// Withdrawal time, let's see if it works
// The balance diff will be exact because withdrawer is paying for gas as relayer
await expect(
await smallestEth
await expect(() =>
smallestEth
.connect(withdrawer)
.withdraw(proof[0], proof[1], proof[2], proof[3], proof[4], proof[5], proof[6])
).to.changeEtherBalance(needsMoney, ethDelta)
@ -289,6 +271,8 @@ describe('Core', () => {
)
})
const listener = dai100K.listeners()[0]
// Prep for deposit
await dai.transfer(needsMoneyAddress, depositAmount)
dai = dai.connect(needsMoney)
@ -306,12 +290,17 @@ describe('Core', () => {
// Passing resolve as callback into put didn't work
await await putPromise
// Off (otherwise no resolve)
dai100K.off(dai100K.filters.Deposit(null, null, null), listener)
// Checks
expect(daiBalBef).to.equal(daiBalPost.sub(depositAmount))
expect(await dai.balanceOf(needsMoneyAddress)).to.equal(0)
}).timeout(0)
it.only('buildDepositProof: it should be able to build a token proof', async () => {
if (!process.env.TEST_RELAYER_DOMAIN) throw ErrorUtils.getError('core.test.ts: Need a relayer name')
// Get withdrawer, load cache, prep note for this test
const withdrawer = ganacheProvider.getSigner(2)
const cache = core.loadDepositCache('Deposits1DAI100000')
@ -324,7 +313,8 @@ describe('Core', () => {
const note = notes[notes.length - 1]
// Init properties via some relayer to make our life easier
const relayer = await initializeRelayer()
const relayer = await initializeRelayer(process.env.TEST_RELAYER_DOMAIN)
let properties = relayer.properties
// Just set another address
@ -347,7 +337,7 @@ describe('Core', () => {
const daiDelta = parseUnits('100000').sub(proof[5])
await expect(
await smallestEth
await dai100K
.connect(withdrawer)
.withdraw(proof[0], proof[1], proof[2], proof[3], proof[4], proof[5], proof[6])
).to.changeTokenBalance(dai, needsMoney, daiDelta)

@ -1,2 +1,17 @@
import { Relayer, RegularHttpClient } from 'lib/web'
import * as dotenv from 'dotenv'
dotenv.config()
export async function initializeRelayer(
name: string,
httpClient = new RegularHttpClient()
): Promise<Relayer> {
const relayer = new Relayer({
url: 'https://' + name,
httpClient: httpClient
})
await relayer.fetchProperties()
return relayer
}

@ -19,7 +19,7 @@ describe('utils', () => {
})
it('pool(Many) & pending & totalAdded', async () => {
pooler = new AsyncUtils.PromisePooler([tp], 8)
pooler = new AsyncUtils.PromisePooler([tp], [], 8)
pooler.pool(30)
expect(pooler.pending).to.equal(1)
@ -37,7 +37,7 @@ describe('utils', () => {
it('race: promises should be raced in proper order', async () => {
let res = []
pooler = new AsyncUtils.PromisePooler([tp, tp, tp, tp], 8)
pooler = new AsyncUtils.PromisePooler([tp, tp, tp, tp], [], 8)
await pooler.pool(20)
await pooler.poolMany([10], [20], [22], [30])

@ -1,24 +1,32 @@
import chai from 'chai'
import { TorHttpClient, TorProvider } from 'lib/web'
import { initializeRelayer } from './preload'
import { TorHttpClient, RegularHttpClient, TorProvider, Relayer } from 'lib/web'
// Waffle matchers
import { solidity } from 'ethereum-waffle'
import { ErrorUtils } from 'lib/utils'
import { parseUnits } from 'ethers/lib/utils'
import { Web3Provider } from '@ethersproject/providers'
chai.use(solidity)
const expect = chai.expect
describe('web', () => {
const torify = process.env.TORIFY === 'true'
if (!process.env.ETH_MAINNET_TEST_RPC || !process.env.TOR_PORT)
throw ErrorUtils.getError('need a tor port and mainnet rpc endpoint.')
const torProvider = new TorProvider(process.env.ETH_MAINNET_TEST_RPC, { port: +process.env.TOR_PORT })
const httpClient = new TorHttpClient({ port: +process.env.TOR_PORT })
let torProvider: Web3Provider
if (process.env.TORIFY === 'true')
if (torify) torProvider = new TorProvider(process.env.ETH_MAINNET_TEST_RPC, { port: +process.env.TOR_PORT })
const httpClient = torify ? new TorHttpClient({ port: +process.env.TOR_PORT }) : new RegularHttpClient()
if (torify)
console.log(
'\nSome Tor tips: Support non-profit exit node operators, host your own nodes, avoid spy nodes by configuring torrc.\n'
)
@ -29,6 +37,18 @@ describe('web', () => {
throw err
}
it('Relayer: should be able to initialize a Relayer and fetch properties', async () => {
if (!process.env.TEST_RELAYER_DOMAIN)
throw ErrorUtils.getError('web.test.ts: this test requires a relayer domain name.')
const relayer = new Relayer({
url: 'https://' + process.env.TEST_RELAYER_DOMAIN,
httpClient: httpClient
})
console.log(await relayer.fetchProperties())
}).timeout(0)
it('httpClient: Should be able to send requests over Tor', async function () {
try {
const check = (await httpClient.get('https://check.torproject.org/api/ip')).data

@ -1,5 +1,17 @@
import { TornadoInstance, TornadoProxy, ETHTornado, ERC20Tornado, ERC20 } from 'types/deth'
export namespace Contracts {
export type TornadoContracts = TornadoInstance | TornadoProxy | ETHTornado | ERC20Tornado | ERC20
export type TornadoContracts = TornadoInstance | TornadoProxy | ETHTornado | ERC20Tornado | ERC20
export namespace Options {
export interface Sync {
startBlock?: number
targetBlock?: number
blockDelta?: number
blockDivisor?: number
concurrencyLimit?: number
msTimeout?: number
listenForEvents?: boolean
persistentCache?: true
cacheAdapter?: string
}
}

@ -9,36 +9,7 @@ import { TransactionRequest } from '@ethersproject/abstract-provider'
import { BigNumber } from 'ethers'
export namespace Options {
export namespace Cache {
export interface Database {
adapter?: string
persistent?: boolean
}
export interface Sync {
concurrencyLimit?: number
listen?: boolean
}
}
export namespace Core {
export interface Cache {
sync?: Cache.Sync
db?: Cache.Database
}
export interface Sync {
deposit?: boolean
withdrawal?: boolean
msTimeout?: number
blocks?: {
startBlock?: number
targetBlock?: number
blockDelta?: number
deltaDivisor?: number
}
cache?: Cache
}
export interface Deposit {
depositsPerInstance?: Array<number>
doNotPopulate?: boolean

@ -1,24 +1,22 @@
import { BigNumber } from 'ethers'
export namespace Json {
export interface TornadoInstance {
network: number
symbol: string
decimals: number
denomination: number
deployBlock: number
address: string
}
export interface TornadoInstance {
network: number
symbol: string
decimals: number
denomination: number
deployBlock: number
address: string
}
export interface ClassicInstance extends TornadoInstance {
anonymityMiningEnabled: boolean
}
export interface ClassicInstance extends TornadoInstance {
anonymityMiningEnabled: boolean
}
export interface TokenData {
network: number
decimals: number
address: string
}
export interface TokenData {
network: number
decimals: number
address: string
}
export namespace Keys {
@ -38,3 +36,10 @@ export interface RelayerProperties {
chainId: number
prices: Map<string, BigNumber>
}
export namespace Options {
export interface Cache {
adapter?: string
persistent?: boolean
}
}

@ -1,15 +1,15 @@
import { AxiosInstance } from 'axios'
export namespace Relayer {
export interface Options {
export namespace Options {
export interface Relayer {
url: string
httpClient: AxiosInstance
}
export interface WithdrawalRequestResult {
success: boolean
txHash?: string
}
}
export type RelayerOptions = Relayer.Options
export interface WithdrawalRequestResult {
success: boolean
txHash?: string
}
export type RelayerOptions = Options.Relayer