WIP clear redis store on init. Notifier state

This commit is contained in:
smart_ex 2022-06-09 14:12:46 +10:00
parent 03f9bfac45
commit ec2f20bfaf
6 changed files with 27 additions and 27 deletions

@ -13,6 +13,7 @@ const server = createServer();
server.listen(port, '0.0.0.0', async (err, address) => {
if (err) throw err;
await configService.init();
await configService.clearRedisState();
await getJobService().setupRepeatableJobs();
await getNotifierService().subscribe();

@ -6,11 +6,12 @@ import {
TornadoProxyABI__factory,
} from '../../contracts';
import { providers } from 'ethers';
import { multiCallAddress, netId, offchainOracleAddress, mainnetRpcUrl, rpcUrl } from '../config';
import { mainnetRpcUrl, multiCallAddress, netId, offchainOracleAddress, rpcUrl } from '../config';
export function getProvider(isStatic = true, customRpcUrl?: string, chainId = netId) {
if (isStatic) return new providers.StaticJsonRpcProvider(customRpcUrl || rpcUrl, chainId);
else return new providers.JsonRpcProvider(customRpcUrl || rpcUrl, chainId);
const url = customRpcUrl || rpcUrl;
if (isStatic) return new providers.StaticJsonRpcProvider(url, chainId);
else return new providers.JsonRpcProvider(url, chainId);
}
@ -33,6 +34,3 @@ export const getMultiCallContract = () => {
export const getTornTokenContract = (tokenAddress: string) => {
return ERC20Abi__factory.connect(tokenAddress, getProvider(true, mainnetRpcUrl));
};
// export const getAggregatorContract = () => {
// return AggregatorAbi__factory.connect(aggregatorAddress, getProvider());
// };

@ -24,6 +24,7 @@ import { getAddress } from 'ethers/lib/utils';
import { BigNumber, providers, Wallet } from 'ethers';
import { container, singleton } from 'tsyringe';
import { GasPrice } from 'gas-price-oracle/lib/types';
import { RedisStore } from '../modules/redis';
type relayerQueueName = `relayer_${availableIds}`
@ -50,7 +51,7 @@ export class ConfigService {
balances: { MAIN: { warn: string; critical: string; }; TORN: { warn: string; critical: string; }; };
constructor() {
constructor(private store: RedisStore) {
this.netIdKey = `netId${this.netId}`;
this.queueName = `relayer_${this.netId}`;
this.isLightMode = ![1, 5].includes(netId);
@ -132,6 +133,11 @@ export class ConfigService {
}
}
async clearRedisState() {
const queueKeys = (await this.store.client.keys('bull:*')).filter(s => s.indexOf('relayer') === -1);
await this.store.client.del(queueKeys);
}
getInstance(address: string) {
return this.addressMap.get(getAddress(address));
}

@ -29,7 +29,7 @@ export class HealthService {
private async _checkBalance(value, currency: 'MAIN' | 'TORN') {
let level = 'OK';
const type = 'BALANCE';
const key = 'alerts';
const key = 'alerts:list';
const time = new Date().getTime();
if (value.lt(this.config.balances[currency].critical)) {
level = 'CRITICAL';

@ -49,15 +49,8 @@ export class JobService {
}
async setupRepeatableJobs() {
await this._clearSchedulerJobs();
await this.price.addRepeatable(this.config.tokens);
await this.health.addRepeatable();
// await this.schedulerQ.add('checkBalance', null, {
// repeat: {
// every: 30000,
// immediately: true,
// },
// });
}
}

@ -34,25 +34,27 @@ export class NotifierService {
async processAlert(message: string) {
const alert = JSON.parse(message);
const [a, b] = alert.type.split('_');
if (alert.level === 'OK') {
this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map(l => `${a}_${b}_${l}`));
} else {
await this.send(alert.message, alert.level);
this.store.client.sadd('alerts:sent', alert.type);
const [a, b, c] = alert.type.split('_');
const isSent = await this.store.client.sismember('alerts:sent', `${a}_${b}_${c}`);
if (!isSent) {
if (alert.level === 'OK') {
this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map(c => `${a}_${b}_${c}`));
} else {
await this.send(alert.message, alert.level);
this.store.client.sadd('alerts:sent', alert.type);
}
}
}
async subscribe() {
const sub = await this.store.subscriber;
sub.subscribe('__keyspace@0__:alerts', 'rpush');
sub.subscribe('__keyspace@0__:alerts:list', 'rpush');
sub.on('message', async (channel, event) => {
if (event === 'rpush') {
const messages = await this.store.client.brpop('alerts', 10);
while (messages.length) {
const [, message] = messages.splice(0, 2);
await this.processAlert(message);
}
const messages = await this.store.client.rpop('alerts:list', 10);
messages?.forEach(message => {
this.processAlert(message);
});
}
});
}