fix: update job

This commit is contained in:
nikdementev 2021-10-12 12:37:24 +03:00
parent 8d735f4a97
commit 5f2cd8831f
No known key found for this signature in database
GPG Key ID: 769B05D57CF16FE2
3 changed files with 23 additions and 16 deletions

@ -31,6 +31,14 @@ const numbers = {
MERKLE_TREE_HEIGHT: 23,
};
export const jobStatus = {
ACCEPTED: 'ACCEPTED',
CONFIRMED: 'CONFIRMED',
FAILED: 'FAILED',
MINED: 'MINED',
SENT: 'SENT',
};
const BG_ZERO = BigNumber.from(numbers.ZERO);
const FIELD_SIZE = BigNumber.from('21888242871839275222246405745257275088548364400416034343698204186575808495617');

@ -53,7 +53,7 @@ export class BaseProcessor<T = object> implements OnModuleDestroy {
return this.updateTask(job);
}
private async updateTask(job: Job<T>) {
protected async updateTask(job: Job<T>) {
const currentJob = await this.queue.getJob(job.id);
await currentJob.update(job.data);
}

@ -6,13 +6,14 @@ import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { InjectQueue, Process, Processor, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { numbers, CONTRACT_ERRORS } from '@/constants';
import { Transaction } from '@/types';
import { numbers, CONTRACT_ERRORS, jobStatus } from '@/constants';
import { getToIntegerMultiplier } from '@/utilities';
import { GasPriceService, ProviderService } from '@/services';
import txMangerConfig from '@/config/txManager.config';
import { BaseProcessor } from './base.processor';
import { Transaction } from '@/types';
@Injectable()
@Processor('transaction')
export class TransactionProcessor extends BaseProcessor<Transaction> {
@ -43,22 +44,20 @@ export class TransactionProcessor extends BaseProcessor<Transaction> {
@OnQueueActive()
async onActive(job: Job) {
job.data.status = 'ACCEPTED';
await job.update(job.data);
job.data.status = jobStatus.ACCEPTED;
await this.updateTask(job);
}
@OnQueueCompleted()
async onCompleted(job: Job) {
job.data.status = 'CONFIRMED';
await job.update(job.data);
job.data.status = jobStatus.CONFIRMED;
await this.updateTask(job);
}
@OnQueueFailed()
async onFailed(job: Job) {
job.data.status = 'FAILED';
await job.update(job.data);
job.data.status = jobStatus.FAILED;
await this.updateTask(job);
}
async submitTx(job: Job<Transaction>) {
@ -72,19 +71,19 @@ export class TransactionProcessor extends BaseProcessor<Transaction> {
.send()
.on('transactionHash', async (txHash: string) => {
job.data.txHash = txHash;
job.data.status = 'SENT';
job.data.status = jobStatus.SENT;
await job.update(job.data);
await this.updateTask(job);
})
.on('mined', async () => {
job.data.status = 'MINED';
job.data.status = jobStatus.MINED;
await job.update(job.data);
await this.updateTask(job);
})
.on('confirmations', async (confirmations) => {
job.data.confirmations = confirmations;
await job.update(job.data);
await this.updateTask(job);
});
if (BigNumber.from(receipt.status).eq(1)) {