From 5f2cd8831f903d1d69f240a5eb33742af7bef1a0 Mon Sep 17 00:00:00 2001 From: nikdementev Date: Tue, 12 Oct 2021 12:37:24 +0300 Subject: [PATCH] fix: update job --- src/constants/variables.ts | 8 ++++++ src/modules/queue/base.processor.ts | 2 +- src/modules/queue/transaction.processor.ts | 29 +++++++++++----------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/constants/variables.ts b/src/constants/variables.ts index a439f0c..d7bbc45 100644 --- a/src/constants/variables.ts +++ b/src/constants/variables.ts @@ -31,6 +31,14 @@ const numbers = { MERKLE_TREE_HEIGHT: 23, }; +export const jobStatus = { + ACCEPTED: 'ACCEPTED', + CONFIRMED: 'CONFIRMED', + FAILED: 'FAILED', + MINED: 'MINED', + SENT: 'SENT', +}; + const BG_ZERO = BigNumber.from(numbers.ZERO); const FIELD_SIZE = BigNumber.from('21888242871839275222246405745257275088548364400416034343698204186575808495617'); diff --git a/src/modules/queue/base.processor.ts b/src/modules/queue/base.processor.ts index 413407f..a223d61 100644 --- a/src/modules/queue/base.processor.ts +++ b/src/modules/queue/base.processor.ts @@ -53,7 +53,7 @@ export class BaseProcessor implements OnModuleDestroy { return this.updateTask(job); } - private async updateTask(job: Job) { + protected async updateTask(job: Job) { const currentJob = await this.queue.getJob(job.id); await currentJob.update(job.data); } diff --git a/src/modules/queue/transaction.processor.ts b/src/modules/queue/transaction.processor.ts index 551521d..c800d15 100644 --- a/src/modules/queue/transaction.processor.ts +++ b/src/modules/queue/transaction.processor.ts @@ -6,13 +6,14 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { InjectQueue, Process, Processor, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull'; -import { numbers, CONTRACT_ERRORS } from '@/constants'; +import { Transaction } from '@/types'; +import { numbers, CONTRACT_ERRORS, jobStatus } from '@/constants'; import { getToIntegerMultiplier } from '@/utilities'; import { GasPriceService, ProviderService } from '@/services'; import txMangerConfig from '@/config/txManager.config'; import { BaseProcessor } from './base.processor'; -import { Transaction } from '@/types'; + @Injectable() @Processor('transaction') export class TransactionProcessor extends BaseProcessor { @@ -43,22 +44,20 @@ export class TransactionProcessor extends BaseProcessor { @OnQueueActive() async onActive(job: Job) { - job.data.status = 'ACCEPTED'; - - await job.update(job.data); + job.data.status = jobStatus.ACCEPTED; + await this.updateTask(job); } @OnQueueCompleted() async onCompleted(job: Job) { - job.data.status = 'CONFIRMED'; - await job.update(job.data); + job.data.status = jobStatus.CONFIRMED; + await this.updateTask(job); } @OnQueueFailed() async onFailed(job: Job) { - job.data.status = 'FAILED'; - - await job.update(job.data); + job.data.status = jobStatus.FAILED; + await this.updateTask(job); } async submitTx(job: Job) { @@ -72,19 +71,19 @@ export class TransactionProcessor extends BaseProcessor { .send() .on('transactionHash', async (txHash: string) => { job.data.txHash = txHash; - job.data.status = 'SENT'; + job.data.status = jobStatus.SENT; - await job.update(job.data); + await this.updateTask(job); }) .on('mined', async () => { - job.data.status = 'MINED'; + job.data.status = jobStatus.MINED; - await job.update(job.data); + await this.updateTask(job); }) .on('confirmations', async (confirmations) => { job.data.confirmations = confirmations; - await job.update(job.data); + await this.updateTask(job); }); if (BigNumber.from(receipt.status).eq(1)) {