WIP monitoring and alerts

This commit is contained in:
smart_ex 2022-06-08 14:42:54 +10:00
parent 3a74ebf90e
commit 03f9bfac45
20 changed files with 314 additions and 123 deletions

@ -22,3 +22,6 @@ CONFIRMATIONS=4
MAX_GAS_PRICE=1000
BASE_FEE_RESERVE_PERCENTAGE=25
AGGREGATOR=0x8cb1436F64a3c33aD17bb42F94e255c4c0E871b2
# Telegram bot alerts
TELEGRAM_NOTIFIER_BOT_TOKEN=
TELEGRAM_NOTIFIER_CHAT_ID=

@ -2,7 +2,7 @@ version: '2'
services:
server:
image: tornadocash/relayer:mining
image: tornadocash/relayer
restart: always
command: server
env_file: .env
@ -11,35 +11,8 @@ services:
nginx_proxy_read_timeout: 600
depends_on: [ redis ]
treeWatcher:
image: tornadocash/relayer:mining
restart: always
command: treeWatcher
env_file: .env
environment:
REDIS_URL: redis://redis/0
depends_on: [redis]
priceWatcher:
image: tornadocash/relayer:mining
restart: always
command: priceWatcher
env_file: .env
environment:
REDIS_URL: redis://redis/0
depends_on: [redis]
healthWatcher:
image: tornadocash/relayer:mining
restart: always
command: healthWatcher
env_file: .env
environment:
REDIS_URL: redis://redis/0
depends_on: [redis]
worker1:
image: tornadocash/relayer:mining
image: tornadocash/relayer
restart: always
command: worker
env_file: .env
@ -92,28 +65,21 @@ services:
# TELEGRAM_NOTIFIER_BOT_TOKEN: ...
# TELEGRAM_NOTIFIER_CHAT_ID: ...
# # this container will send Telegram notifications if specified address doesn't have enough funds
# monitor_mainnet:
# image: peppersec/monitor_eth
# restart: always
# environment:
# TELEGRAM_NOTIFIER_BOT_TOKEN: ...
# TELEGRAM_NOTIFIER_CHAT_ID: ...
# ADDRESS: '0x0000000000000000000000000000000000000000'
# THRESHOLD: 0.5 # ETH
# RPC_URL: https://mainnet.infura.io
# BLOCK_EXPLORER: etherscan.io
redis:
image: redis
restart: always
command: [redis-server, --appendonly, 'yes']
command: [ redis-server, '/usr/local/etc/redis/redis.conf', --appendonly, 'yes', ]
ports:
- '6379:6379'
volumes:
- ./redis.conf:/usr/local/etc/redis/redis.conf
- redis:/data
nginx:
image: nginx:alpine
container_name: nginx
# container_name: nginx
restart: always
ports:
- 80:80

5
redis.conf Normal file

@ -0,0 +1,5 @@
timeout 0
tcp-keepalive 0
databases 1
save 60 100000
notify-keyspace-events KAE

@ -3,7 +3,7 @@ import createServer from './server';
import { utils } from 'ethers';
import { port, rewardAccount } from '../config';
import { version } from '../../package.json';
import { configService, getJobService } from '../services';
import { configService, getJobService, getNotifierService } from '../services';
if (!utils.isAddress(rewardAccount)) {
@ -14,6 +14,8 @@ server.listen(port, '0.0.0.0', async (err, address) => {
if (err) throw err;
await configService.init();
await getJobService().setupRepeatableJobs();
await getNotifierService().subscribe();
console.log(`Relayer ${version} started on port ${address}`);
});

@ -2,14 +2,14 @@ import { FastifyInstance } from 'fastify';
import { jobsSchema, statusSchema, withdrawBodySchema, withdrawSchema } from './schema';
import { FromSchema } from 'json-schema-to-ts';
import { rewardAccount, tornadoServiceFee } from '../config';
import { version } from '../../package.json';
import { configService, getJobService, getPriceService } from '../services';
import { configService, getHealthService, getJobService, getPriceService } from '../services';
import { RelayerJobType } from '../types';
export function mainHandler(server: FastifyInstance, options, next) {
const jobService = getJobService();
const priceService = getPriceService();
const healthService = getHealthService();
server.get('/',
async (req, res) => {
@ -23,6 +23,7 @@ export function mainHandler(server: FastifyInstance, options, next) {
async (req, res) => {
const ethPrices = await priceService.getPrices();
const currentQueue = await jobService.getQueueCount();
const errorsLog = await healthService.getErrors();
console.log(currentQueue);
res.send({
rewardAccount,
@ -31,10 +32,11 @@ export function mainHandler(server: FastifyInstance, options, next) {
ethPrices,
tornadoServiceFee,
miningServiceFee: 0,
version,
version: '4.5.0',
health: {
status: true,
status: 'true',
error: '',
errorsLog
},
currentQueue,
});

@ -25,7 +25,7 @@ export const gasLimits = {
[RelayerJobType.MINING_WITHDRAW]: 400000,
};
export const minimumBalance = '1000000000000000000';
export const minimumTornBalance = '50000000000000000000';
export const minimumTornBalance = '30000000000000000000';
export const baseFeeReserve = Number(process.env.BASE_FEE_RESERVE_PERCENTAGE);
export const tornToken = {
tokenAddress: '0x77777FeDdddFfC19Ff86DB637967013e6C6A116C',

@ -6,6 +6,13 @@ const getNewInstance: () => Redis = () => new IORedis(redisUrl, { maxRetriesPerR
@singleton()
export class RedisStore {
get publisher(): Redis {
if (!this._publisher) {
this._publisher = getNewInstance();
}
return this._publisher;
}
get client() {
if (!this._client) {
this._client = getNewInstance();
@ -20,8 +27,9 @@ export class RedisStore {
return this._subscriber;
}
_subscriber: Redis;
_client: Redis;
private _subscriber: Redis;
private _publisher: Redis;
private _client: Redis;
}

@ -0,0 +1,8 @@
import { getHealthService } from '../services';
import { Processor } from 'bullmq';
export const healthProcessor: Processor = async () => {
const healthService = getHealthService();
await healthService.check();
}
;

@ -1,16 +1,18 @@
import { Processor, Queue, QueueScheduler, Worker } from 'bullmq';
import { JobStatus, RelayerJobType, Token } from '../types';
import { WithdrawalData } from '../services/tx.service';
import { BigNumber } from 'ethers';
import { priceProcessor } from './price.processor';
import { autoInjectable } from 'tsyringe';
import { RedisStore } from '../modules/redis';
import { ConfigService } from '../services/config.service';
import { relayerProcessor } from './relayer.processor';
import { healthProcessor } from './health.processor';
type PriceJobData = Token[]
type PriceJobReturn = number
type HealthJobReturn = { balance: BigNumber, isEnought: boolean }
type HealthJobReturn = void
type HealthJobData = null
export type RelayerJobData =
WithdrawalData
@ -109,4 +111,51 @@ export class RelayerQueueHelper {
}
@autoInjectable()
export class HealthQueueHelper {
private _queue: Queue<HealthJobData, HealthJobReturn, 'checkHealth'>;
private _worker: Worker<HealthJobData, HealthJobReturn, 'checkHealth'>;
private _scheduler: QueueScheduler;
constructor(private store?: RedisStore, private config?: ConfigService) {
}
get scheduler(): QueueScheduler {
if (!this._scheduler) {
this._scheduler = new QueueScheduler('health', { connection: this.store.client });
}
return this._scheduler;
}
get worker() {
if (!this._worker) {
this._worker = new Worker<HealthJobData, HealthJobReturn, 'checkHealth'>('health', healthProcessor, {
connection: this.store.client,
concurrency: 1,
});
}
return this._worker;
}
get queue() {
if (!this._queue) {
this._queue = new Queue<HealthJobData, HealthJobReturn, 'checkHealth'>('health', {
connection: this.store.client,
defaultJobOptions: { stackTraceLimit: 100 },
});
}
return this._queue;
}
async addRepeatable() {
await this.queue.add('checkHealth', null, {
repeat: {
every: 30000,
immediately: true,
},
});
}
}

@ -4,5 +4,7 @@ import { PriceProcessor } from './index';
export const priceProcessor: PriceProcessor = async (job) => {
const priceService = getPriceService();
const result = await priceService.fetchPrices(job.data);
return await priceService.savePrices(result);
if (result) return await priceService.savePrices(result);
return null;
};

@ -1,6 +0,0 @@
import { configService } from '../services';
import { Processor } from 'bullmq';
export const checkBalance: Processor = async (job) => {
return await configService.getBalance();
};

@ -1,11 +1,12 @@
import 'reflect-metadata';
import { PriceQueueHelper, RelayerQueueHelper } from './';
import { configService } from '../services';
import { HealthQueueHelper, PriceQueueHelper, RelayerQueueHelper } from './';
import { configService, getHealthService } from '../services';
export const schedulerWorker = async () => {
export const priceWorker = async () => {
await configService.init();
const price = new PriceQueueHelper();
price.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev }));
console.log('price worker', price.queue.name);
price.worker.on('active', () => console.log('worker active'));
price.worker.on('completed', async (job, result) => {
@ -17,9 +18,24 @@ export const schedulerWorker = async () => {
export const relayerWorker = async () => {
await configService.init();
const relayer = new RelayerQueueHelper();
const healthService = getHealthService();
console.log(relayer.queue.name, 'worker started');
relayer.worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result: `, result);
});
relayer.worker.on('failed', (job, error) => console.log(error));
relayer.worker.on('failed', (job, error) => {
healthService.saveError(error);
console.log(error);
});
};
export const healthWorker = async () => {
await configService.init();
const health = new HealthQueueHelper();
health.scheduler.on('stalled', (jobId, prev) => console.log({ jobId, prev }));
console.log(health.queue.name, 'worker started');
health.worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result: `, result);
});
health.worker.on('failed', (job, error) => console.log(error));
};

15
src/sandbox.ts Normal file

@ -0,0 +1,15 @@
import 'reflect-metadata';
import { configService, getHealthService } from './services';
(async () => {
try {
await configService.init();
const healthService = getHealthService();
console.log(healthService);
} catch (e) {
console.error('Top level catch', e);
}
})();

@ -20,11 +20,10 @@ import {
import { resolve } from '../modules';
import { ERC20Abi, ProxyLightABI, TornadoProxyABI } from '../../contracts';
import { availableIds, netIds, NetInstances } from '../../../torn-token';
import { formatEther, getAddress } from 'ethers/lib/utils';
import { providers, Wallet } from 'ethers';
import { getAddress } from 'ethers/lib/utils';
import { BigNumber, providers, Wallet } from 'ethers';
import { container, singleton } from 'tsyringe';
import { GasPrice } from 'gas-price-oracle/lib/types';
import { configService } from './index';
type relayerQueueName = `relayer_${availableIds}`
@ -48,6 +47,7 @@ export class ConfigService {
fallbackGasPrices: GasPrice;
private _tokenAddress: string;
private _tokenContract: ERC20Abi;
balances: { MAIN: { warn: string; critical: string; }; TORN: { warn: string; critical: string; }; };
constructor() {
@ -57,6 +57,10 @@ export class ConfigService {
this.instances = instances[this.netIdKey];
this.provider = getProvider(false);
this.wallet = new Wallet(this.privateKey, this.provider);
this.balances = {
MAIN: { warn: BigNumber.from(minimumBalance).mul(150).div(100).toString(), critical: minimumBalance },
TORN: { warn: BigNumber.from(minimumTornBalance).mul(2).toString(), critical: minimumTornBalance },
};
this._fillInstanceMap();
}
@ -64,21 +68,19 @@ export class ConfigService {
return this._proxyContract;
}
get tokenContract(): ERC20Abi {
return this._tokenContract;
}
private _fillInstanceMap() {
if (!this.instances) throw new Error('config mismatch, check your environment variables');
// TODO
for (const [currency, { instanceAddress, symbol, decimals }] of Object.entries(this.instances)) {
Object.entries(instanceAddress).forEach(([amount, address]) => {
if (address) {
this.addressMap.set(getAddress(address), {
currency,
amount,
symbol,
decimals,
for (const [amount, address] of Object.entries(instanceAddress)) {
if (address) this.addressMap.set(getAddress(address), {
currency, amount, symbol, decimals,
});
}
},
);
}
}
@ -104,6 +106,7 @@ export class ConfigService {
this.fallbackGasPrices = gasPrices;
} else {
this._proxyAddress = tornadoGoerliProxy;
this.nativeCurrency = 'eth';
if (this.netId === 1) {
this._proxyAddress = await resolve(torn.tornadoRouter.address);
}
@ -117,18 +120,12 @@ export class ConfigService {
decimals: el.decimals,
symbol: el.symbol,
})).filter(Boolean);
const { balance } = await configService.getBalance();
const { balance: tornBalance } = await configService.getTornBalance();
console.log(
'Configuration completed\n',
`-- netId: ${this.netId}\n`,
`-- rpcUrl: ${this.rpcUrl}\n`,
`-- relayer Address: ${this.wallet.address}\n`,
`-- relayer Balance: ${formatEther(balance)}\n`,
`-- relayer Torn balance: ${formatEther(tornBalance)}\n`,
);
this.isInit = true;
} catch (e) {
console.error(`${this.constructor.name} Error:`, e.message);
@ -139,18 +136,6 @@ export class ConfigService {
return this.addressMap.get(getAddress(address));
}
async getBalance() {
const balance = await this.wallet.getBalance();
const isEnougth = balance.gt(minimumBalance);
return { balance, isEnougth };
}
async getTornBalance() {
const balance = await this._tokenContract.balanceOf(this.wallet.address);
const isEnougth = balance.gt(minimumTornBalance);
return { balance, isEnougth };
}
}
type InstanceProps = {

@ -0,0 +1,70 @@
import { autoInjectable, container } from 'tsyringe';
import { ConfigService } from './config.service';
import { RedisStore } from '../modules/redis';
import { formatEther } from 'ethers/lib/utils';
@autoInjectable()
export class HealthService {
constructor(private config: ConfigService, private store: RedisStore) {
}
async clearErrors() {
await this.store.client.del('errors');
}
async getErrors(): Promise<{ message: string, score: number }[]> {
const set = await this.store.client.zrevrange('errors', 0, -1, 'WITHSCORES');
const errors = [];
while (set.length) {
const [message, score] = set.splice(0, 2);
errors.push({ message, score });
}
return errors;
}
async saveError(e) {
await this.store.client.zadd('errors', 'INCR', 1, e.message);
}
private async _checkBalance(value, currency: 'MAIN' | 'TORN') {
let level = 'OK';
const type = 'BALANCE';
const key = 'alerts';
const time = new Date().getTime();
if (value.lt(this.config.balances[currency].critical)) {
level = 'CRITICAL';
} else if (value.lt(this.config.balances[currency].warn)) {
level = 'WARN';
}
const isSent = await this.store.client.sismember(`${key}:sent`, `${type}_${currency}_${level}`);
if (!isSent) {
const alert = {
type: `${type}_${currency}_${level}`,
message: `Insufficient balance ${formatEther(value)} ${currency === 'MAIN' ? this.config.nativeCurrency : 'torn'}`,
level,
time,
};
await this.store.client.rpush(key, JSON.stringify(alert));
}
}
async check() {
const mainBalance = await this.config.wallet.getBalance();
const tornBalance = await this.config.tokenContract.balanceOf(this.config.wallet.address);
// const mainBalance = BigNumber.from(`${2e18}`).add(1);
// const tornBalance = BigNumber.from(`${50e18}`);
await this._checkBalance(mainBalance, 'MAIN');
await this._checkBalance(tornBalance, 'TORN');
}
}
type HealthData = {
status: boolean,
error: string,
errorsLog: { message: string, score: number }[]
}
export default () => container.resolve(HealthService);

@ -2,3 +2,6 @@ export { default as configService } from './config.service';
export { default as getPriceService } from './price.service';
export { default as getJobService } from './job.service';
export { default as getTxService } from './tx.service';
export { default as getNotifierService } from './notifier.service';
export { default as getHealthService } from './health.service';

@ -1,13 +1,16 @@
import { v4 } from 'uuid';
import { JobStatus, RelayerJobType } from '../types';
import { PriceQueueHelper, RelayerQueueHelper } from '../queue';
import { HealthQueueHelper, PriceQueueHelper, RelayerQueueHelper } from '../queue';
import { WithdrawalData } from './tx.service';
import { container, injectable } from 'tsyringe';
import { ConfigService } from './config.service';
@injectable()
export class JobService {
constructor(private price?: PriceQueueHelper, private relayer?: RelayerQueueHelper, public config?: ConfigService) {
constructor(private price?: PriceQueueHelper,
private relayer?: RelayerQueueHelper,
private health?: HealthQueueHelper,
public config?: ConfigService) {
}
async postJob(type: RelayerJobType, data: WithdrawalData) {
@ -35,13 +38,20 @@ export class JobService {
}
private async _clearSchedulerJobs() {
const jobs = await this.price.queue.getJobs();
await Promise.all(jobs.map(job => job.remove()));
try {
const jobs = await Promise.all([this.price.queue.getJobs(), this.health.queue.getJobs()]);
await Promise.all(jobs.flat().map(job => job?.remove()));
} catch (e) {
console.log(e);
}
}
async setupRepeatableJobs() {
await this._clearSchedulerJobs();
await this.price.addRepeatable(this.config.tokens);
await this.health.addRepeatable();
// await this.schedulerQ.add('checkBalance', null, {
// repeat: {
// every: 30000,

@ -1,5 +1,23 @@
import { Telegram } from 'telegraf';
import { autoInjectable } from 'tsyringe';
import { autoInjectable, container } from 'tsyringe';
import { RedisStore } from '../modules/redis';
export type Levels = keyof typeof AlertLevel
export enum AlertLevel {
'INFO' = '',
'WARN' = '⚠️',
'CRITICAL' = '‼️',
'ERROR' = '💩',
'RECOVERED' = '✅'
}
export enum AlertType {
'INSUFFICIENT_BALANCE',
'INSUFFICIENT_TORN_BALANCE',
'RPC'
}
@autoInjectable()
export class NotifierService {
@ -7,16 +25,44 @@ export class NotifierService {
private readonly token: string;
private readonly chatId: string;
constructor() {
constructor(private store: RedisStore) {
this.token = process.env.TELEGRAM_NOTIFIER_BOT_TOKEN || '';
this.chatId = process.env.TELEGRAM_NOTIFIER_CHAT_ID || '';
this.telegram = new Telegram(this.token);
}
send(message: string) {
async processAlert(message: string) {
const alert = JSON.parse(message);
const [a, b] = alert.type.split('_');
if (alert.level === 'OK') {
this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map(l => `${a}_${b}_${l}`));
} else {
await this.send(alert.message, alert.level);
this.store.client.sadd('alerts:sent', alert.type);
}
}
async subscribe() {
const sub = await this.store.subscriber;
sub.subscribe('__keyspace@0__:alerts', 'rpush');
sub.on('message', async (channel, event) => {
if (event === 'rpush') {
const messages = await this.store.client.brpop('alerts', 10);
while (messages.length) {
const [, message] = messages.splice(0, 2);
await this.processAlert(message);
}
}
});
}
send(message: string, level: Levels) {
const text = `${AlertLevel[level]} ${message}`;
console.log('sending message: ', text);
return this.telegram.sendMessage(
this.chatId,
message,
text,
{ parse_mode: 'HTML' },
);
}
@ -32,3 +78,5 @@ export class NotifierService {
return this.telegram.getMe();
}
}
export default () => container.resolve(NotifierService);

@ -27,6 +27,7 @@ export class PriceService {
}
async fetchPrices(tokens: Token[]) {
try {
const names = tokens.reduce((p, c) => {
p[c.address] = c.symbol.toLowerCase();
return p;
@ -45,6 +46,9 @@ export class PriceService {
prices[names[tokens[i].address]] = price.toString();
}
return prices;
} catch (e) {
console.log(e);
}
}
async getPrice(currency: string) {

@ -1,4 +1,5 @@
import { schedulerWorker, relayerWorker } from './queue/worker';
import { priceWorker, relayerWorker, healthWorker } from './queue/worker';
schedulerWorker();
priceWorker();
relayerWorker();
healthWorker();