diff --git a/src/queue/index.ts b/src/queue/index.ts index 577b512..1243d16 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -32,6 +32,7 @@ export class PriceQueueHelper { _queue: Queue; _worker: Worker; _scheduler: QueueScheduler; + interval = 30000; constructor(private store?: RedisStore) {} @@ -70,7 +71,7 @@ export class PriceQueueHelper { async addRepeatable(tokens: PriceJobData) { await this.queue.add('updatePrice', tokens, { repeat: { - every: 30000, + every: this.interval, immediately: true, }, }); @@ -89,7 +90,11 @@ export class RelayerQueueHelper { if (!this._queue) { this._queue = new Queue(this.config.queueName, { connection: this.store.client, - defaultJobOptions: { stackTraceLimit: 100 }, + defaultJobOptions: { + stackTraceLimit: 100, + attempts: 3, + backoff: 1000, + }, }); } return this._queue; @@ -120,8 +125,9 @@ export class HealthQueueHelper { private _queue: Queue; private _worker: Worker; private _scheduler: QueueScheduler; + interval = 30000; - constructor(private store?: RedisStore, private config?: ConfigService) {} + constructor(private store?: RedisStore) {} get scheduler(): QueueScheduler { if (!this._scheduler) { @@ -155,7 +161,7 @@ export class HealthQueueHelper { async addRepeatable() { await this.queue.add('checkHealth', null, { repeat: { - every: 30000, + every: this.interval, immediately: true, }, }); diff --git a/src/queue/relayer.processor.ts b/src/queue/relayer.processor.ts index e48bd14..c8ef1e5 100644 --- a/src/queue/relayer.processor.ts +++ b/src/queue/relayer.processor.ts @@ -1,6 +1,19 @@ import { RelayerProcessor } from './index'; import { getTxService } from '../services'; import { JobStatus } from '../types'; +import { UnrecoverableError } from 'bullmq'; +import { ExecutionError } from '../services/tx.service'; + +class RevertError extends UnrecoverableError { + code: string; + + constructor(message: string, code: string) { + super(message); + this.name = this.constructor.name; + this.code = code; + Object.setPrototypeOf(this, new.target.prototype); + } +} export const relayerProcessor: RelayerProcessor = async (job) => { try { @@ -13,7 +26,10 @@ export const relayerProcessor: RelayerProcessor = async (job) => { const txData = await txService.prepareTxData(withdrawalData); return await txService.sendTx(txData); } catch (e) { - await job.update({ ...job.data, status: JobStatus.FAILED }); + if (e instanceof ExecutionError && e.code === 'REVERTED') { + await job.update({ ...job.data, status: JobStatus.FAILED }); + throw new RevertError(e.message, e.code); + } throw e; } }; diff --git a/src/queue/worker.ts b/src/queue/worker.ts index 41ded48..9116298 100644 --- a/src/queue/worker.ts +++ b/src/queue/worker.ts @@ -27,8 +27,9 @@ export const relayerWorker = async () => { }); relayer.worker.on('failed', (job, error) => { healthService.saveError(error, job.id); - console.log(error); + console.log(`Failed job ${job.id}: `, error); }); + relayer.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev })); }; export const healthWorker = async () => { @@ -42,6 +43,6 @@ export const healthWorker = async () => { console.log(`Job ${job.name} completed with result: `, result); }); health.worker.on('failed', (job, error) => { - console.log(error); + console.log(`Failed job ${job.id}: `, error); }); }; diff --git a/src/services/tx.service.ts b/src/services/tx.service.ts index 9a4867b..242a2ee 100644 --- a/src/services/tx.service.ts +++ b/src/services/tx.service.ts @@ -19,7 +19,7 @@ export type WithdrawalData = { args: [BytesLike, BytesLike, string, string, BigNumberish, BigNumberish]; }; -class ExecutionError extends Error { +export class ExecutionError extends Error { constructor(message: string, code?: string) { super(message); this.code = code; @@ -83,7 +83,9 @@ export class TxService { }); if (receipt.status === 1) { await this.updateJobData({ status: JobStatus.CONFIRMED }); - } else throw new ExecutionError('Submitted transaction failed', 'REVERTED'); + } else { + throw new ExecutionError('Submitted transaction failed', 'REVERTED'); + } return receipt; } catch (e) { const regex = /body=("\{.*}}")/; @@ -138,7 +140,9 @@ export class TxService { async getGasPrice(): Promise { const gasPrices = await this.oracle.gasPrices(); let gasPrice = gasPrices['fast']; - if ('maxFeePerGas' in gasPrices) gasPrice = gasPrices['maxFeePerGas']; + if ('maxFeePerGas' in gasPrices) { + gasPrice = gasPrices['maxFeePerGas']; + } return parseUnits(String(gasPrice), 'gwei'); } }