error handling

This commit is contained in:
smart_ex 2022-07-05 22:19:38 +10:00
parent 29fa454428
commit 7557158835
4 changed files with 37 additions and 10 deletions

@ -32,6 +32,7 @@ export class PriceQueueHelper {
_queue: Queue<PriceJobData, PriceJobReturn, 'updatePrice'>; _queue: Queue<PriceJobData, PriceJobReturn, 'updatePrice'>;
_worker: Worker<PriceJobData, PriceJobReturn, 'updatePrice'>; _worker: Worker<PriceJobData, PriceJobReturn, 'updatePrice'>;
_scheduler: QueueScheduler; _scheduler: QueueScheduler;
interval = 30000;
constructor(private store?: RedisStore) {} constructor(private store?: RedisStore) {}
@ -70,7 +71,7 @@ export class PriceQueueHelper {
async addRepeatable(tokens: PriceJobData) { async addRepeatable(tokens: PriceJobData) {
await this.queue.add('updatePrice', tokens, { await this.queue.add('updatePrice', tokens, {
repeat: { repeat: {
every: 30000, every: this.interval,
immediately: true, immediately: true,
}, },
}); });
@ -89,7 +90,11 @@ export class RelayerQueueHelper {
if (!this._queue) { if (!this._queue) {
this._queue = new Queue<RelayerJobData, RelayerJobReturn, RelayerJobType>(this.config.queueName, { this._queue = new Queue<RelayerJobData, RelayerJobReturn, RelayerJobType>(this.config.queueName, {
connection: this.store.client, connection: this.store.client,
defaultJobOptions: { stackTraceLimit: 100 }, defaultJobOptions: {
stackTraceLimit: 100,
attempts: 3,
backoff: 1000,
},
}); });
} }
return this._queue; return this._queue;
@ -120,8 +125,9 @@ export class HealthQueueHelper {
private _queue: Queue<HealthJobData, HealthJobReturn, 'checkHealth'>; private _queue: Queue<HealthJobData, HealthJobReturn, 'checkHealth'>;
private _worker: Worker<HealthJobData, HealthJobReturn, 'checkHealth'>; private _worker: Worker<HealthJobData, HealthJobReturn, 'checkHealth'>;
private _scheduler: QueueScheduler; private _scheduler: QueueScheduler;
interval = 30000;
constructor(private store?: RedisStore, private config?: ConfigService) {} constructor(private store?: RedisStore) {}
get scheduler(): QueueScheduler { get scheduler(): QueueScheduler {
if (!this._scheduler) { if (!this._scheduler) {
@ -155,7 +161,7 @@ export class HealthQueueHelper {
async addRepeatable() { async addRepeatable() {
await this.queue.add('checkHealth', null, { await this.queue.add('checkHealth', null, {
repeat: { repeat: {
every: 30000, every: this.interval,
immediately: true, immediately: true,
}, },
}); });

@ -1,6 +1,19 @@
import { RelayerProcessor } from './index'; import { RelayerProcessor } from './index';
import { getTxService } from '../services'; import { getTxService } from '../services';
import { JobStatus } from '../types'; import { JobStatus } from '../types';
import { UnrecoverableError } from 'bullmq';
import { ExecutionError } from '../services/tx.service';
class RevertError extends UnrecoverableError {
code: string;
constructor(message: string, code: string) {
super(message);
this.name = this.constructor.name;
this.code = code;
Object.setPrototypeOf(this, new.target.prototype);
}
}
export const relayerProcessor: RelayerProcessor = async (job) => { export const relayerProcessor: RelayerProcessor = async (job) => {
try { try {
@ -13,7 +26,10 @@ export const relayerProcessor: RelayerProcessor = async (job) => {
const txData = await txService.prepareTxData(withdrawalData); const txData = await txService.prepareTxData(withdrawalData);
return await txService.sendTx(txData); return await txService.sendTx(txData);
} catch (e) { } catch (e) {
await job.update({ ...job.data, status: JobStatus.FAILED }); if (e instanceof ExecutionError && e.code === 'REVERTED') {
await job.update({ ...job.data, status: JobStatus.FAILED });
throw new RevertError(e.message, e.code);
}
throw e; throw e;
} }
}; };

@ -27,8 +27,9 @@ export const relayerWorker = async () => {
}); });
relayer.worker.on('failed', (job, error) => { relayer.worker.on('failed', (job, error) => {
healthService.saveError(error, job.id); healthService.saveError(error, job.id);
console.log(error); console.log(`Failed job ${job.id}: `, error);
}); });
relayer.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev }));
}; };
export const healthWorker = async () => { export const healthWorker = async () => {
@ -42,6 +43,6 @@ export const healthWorker = async () => {
console.log(`Job ${job.name} completed with result: `, result); console.log(`Job ${job.name} completed with result: `, result);
}); });
health.worker.on('failed', (job, error) => { health.worker.on('failed', (job, error) => {
console.log(error); console.log(`Failed job ${job.id}: `, error);
}); });
}; };

@ -19,7 +19,7 @@ export type WithdrawalData = {
args: [BytesLike, BytesLike, string, string, BigNumberish, BigNumberish]; args: [BytesLike, BytesLike, string, string, BigNumberish, BigNumberish];
}; };
class ExecutionError extends Error { export class ExecutionError extends Error {
constructor(message: string, code?: string) { constructor(message: string, code?: string) {
super(message); super(message);
this.code = code; this.code = code;
@ -83,7 +83,9 @@ export class TxService {
}); });
if (receipt.status === 1) { if (receipt.status === 1) {
await this.updateJobData({ status: JobStatus.CONFIRMED }); await this.updateJobData({ status: JobStatus.CONFIRMED });
} else throw new ExecutionError('Submitted transaction failed', 'REVERTED'); } else {
throw new ExecutionError('Submitted transaction failed', 'REVERTED');
}
return receipt; return receipt;
} catch (e) { } catch (e) {
const regex = /body=("\{.*}}")/; const regex = /body=("\{.*}}")/;
@ -138,7 +140,9 @@ export class TxService {
async getGasPrice(): Promise<BigNumber> { async getGasPrice(): Promise<BigNumber> {
const gasPrices = await this.oracle.gasPrices(); const gasPrices = await this.oracle.gasPrices();
let gasPrice = gasPrices['fast']; let gasPrice = gasPrices['fast'];
if ('maxFeePerGas' in gasPrices) gasPrice = gasPrices['maxFeePerGas']; if ('maxFeePerGas' in gasPrices) {
gasPrice = gasPrices['maxFeePerGas'];
}
return parseUnits(String(gasPrice), 'gwei'); return parseUnits(String(gasPrice), 'gwei');
} }
} }