This commit is contained in:
smart_ex 2022-06-29 20:03:25 +10:00
parent bd0f8d2a2e
commit e997d4d07e
9 changed files with 193 additions and 101 deletions

@ -1,10 +1,26 @@
[ [
{ {
"inputs": [ "inputs": [
{ "internalType": "contract MultiWrapper", "name": "_multiWrapper", "type": "address" }, {
{ "internalType": "contract IOracle[]", "name": "existingOracles", "type": "address[]" }, "internalType": "contract MultiWrapper",
{ "internalType": "enum OffchainOracle.OracleType[]", "name": "oracleTypes", "type": "uint8[]" }, "name": "_multiWrapper",
{ "internalType": "contract IERC20[]", "name": "existingConnectors", "type": "address[]" }, "type": "address"
},
{
"internalType": "contract IOracle[]",
"name": "existingOracles",
"type": "address[]"
},
{
"internalType": "enum OffchainOracle.OracleType[]",
"name": "oracleTypes",
"type": "uint8[]"
},
{
"internalType": "contract IERC20[]",
"name": "existingConnectors",
"type": "address[]"
},
{ "internalType": "contract IERC20", "name": "wBase", "type": "address" } { "internalType": "contract IERC20", "name": "wBase", "type": "address" }
], ],
"stateMutability": "nonpayable", "stateMutability": "nonpayable",
@ -13,7 +29,12 @@
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [
{ "indexed": false, "internalType": "contract IERC20", "name": "connector", "type": "address" } {
"indexed": false,
"internalType": "contract IERC20",
"name": "connector",
"type": "address"
}
], ],
"name": "ConnectorAdded", "name": "ConnectorAdded",
"type": "event" "type": "event"
@ -21,7 +42,12 @@
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [
{ "indexed": false, "internalType": "contract IERC20", "name": "connector", "type": "address" } {
"indexed": false,
"internalType": "contract IERC20",
"name": "connector",
"type": "address"
}
], ],
"name": "ConnectorRemoved", "name": "ConnectorRemoved",
"type": "event" "type": "event"
@ -29,7 +55,12 @@
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [
{ "indexed": false, "internalType": "contract MultiWrapper", "name": "multiWrapper", "type": "address" } {
"indexed": false,
"internalType": "contract MultiWrapper",
"name": "multiWrapper",
"type": "address"
}
], ],
"name": "MultiWrapperUpdated", "name": "MultiWrapperUpdated",
"type": "event" "type": "event"
@ -37,7 +68,12 @@
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [
{ "indexed": false, "internalType": "contract IOracle", "name": "oracle", "type": "address" }, {
"indexed": false,
"internalType": "contract IOracle",
"name": "oracle",
"type": "address"
},
{ {
"indexed": false, "indexed": false,
"internalType": "enum OffchainOracle.OracleType", "internalType": "enum OffchainOracle.OracleType",
@ -51,7 +87,12 @@
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [
{ "indexed": false, "internalType": "contract IOracle", "name": "oracle", "type": "address" }, {
"indexed": false,
"internalType": "contract IOracle",
"name": "oracle",
"type": "address"
},
{ {
"indexed": false, "indexed": false,
"internalType": "enum OffchainOracle.OracleType", "internalType": "enum OffchainOracle.OracleType",
@ -65,14 +106,30 @@
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [
{ "indexed": true, "internalType": "address", "name": "previousOwner", "type": "address" }, {
{ "indexed": true, "internalType": "address", "name": "newOwner", "type": "address" } "indexed": true,
"internalType": "address",
"name": "previousOwner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "newOwner",
"type": "address"
}
], ],
"name": "OwnershipTransferred", "name": "OwnershipTransferred",
"type": "event" "type": "event"
}, },
{ {
"inputs": [{ "internalType": "contract IERC20", "name": "connector", "type": "address" }], "inputs": [
{
"internalType": "contract IERC20",
"name": "connector",
"type": "address"
}
],
"name": "addConnector", "name": "addConnector",
"outputs": [], "outputs": [],
"stateMutability": "nonpayable", "stateMutability": "nonpayable",
@ -80,8 +137,16 @@
}, },
{ {
"inputs": [ "inputs": [
{ "internalType": "contract IOracle", "name": "oracle", "type": "address" }, {
{ "internalType": "enum OffchainOracle.OracleType", "name": "oracleKind", "type": "uint8" } "internalType": "contract IOracle",
"name": "oracle",
"type": "address"
},
{
"internalType": "enum OffchainOracle.OracleType",
"name": "oracleKind",
"type": "uint8"
}
], ],
"name": "addOracle", "name": "addOracle",
"outputs": [], "outputs": [],
@ -91,14 +156,28 @@
{ {
"inputs": [], "inputs": [],
"name": "connectors", "name": "connectors",
"outputs": [{ "internalType": "contract IERC20[]", "name": "allConnectors", "type": "address[]" }], "outputs": [
{
"internalType": "contract IERC20[]",
"name": "allConnectors",
"type": "address[]"
}
],
"stateMutability": "view", "stateMutability": "view",
"type": "function" "type": "function"
}, },
{ {
"inputs": [ "inputs": [
{ "internalType": "contract IERC20", "name": "srcToken", "type": "address" }, {
{ "internalType": "contract IERC20", "name": "dstToken", "type": "address" }, "internalType": "contract IERC20",
"name": "srcToken",
"type": "address"
},
{
"internalType": "contract IERC20",
"name": "dstToken",
"type": "address"
},
{ "internalType": "bool", "name": "useWrappers", "type": "bool" } { "internalType": "bool", "name": "useWrappers", "type": "bool" }
], ],
"name": "getRate", "name": "getRate",
@ -108,7 +187,11 @@
}, },
{ {
"inputs": [ "inputs": [
{ "internalType": "contract IERC20", "name": "srcToken", "type": "address" }, {
"internalType": "contract IERC20",
"name": "srcToken",
"type": "address"
},
{ "internalType": "bool", "name": "useSrcWrappers", "type": "bool" } { "internalType": "bool", "name": "useSrcWrappers", "type": "bool" }
], ],
"name": "getRateToEth", "name": "getRateToEth",
@ -127,8 +210,16 @@
"inputs": [], "inputs": [],
"name": "oracles", "name": "oracles",
"outputs": [ "outputs": [
{ "internalType": "contract IOracle[]", "name": "allOracles", "type": "address[]" }, {
{ "internalType": "enum OffchainOracle.OracleType[]", "name": "oracleTypes", "type": "uint8[]" } "internalType": "contract IOracle[]",
"name": "allOracles",
"type": "address[]"
},
{
"internalType": "enum OffchainOracle.OracleType[]",
"name": "oracleTypes",
"type": "uint8[]"
}
], ],
"stateMutability": "view", "stateMutability": "view",
"type": "function" "type": "function"
@ -141,7 +232,13 @@
"type": "function" "type": "function"
}, },
{ {
"inputs": [{ "internalType": "contract IERC20", "name": "connector", "type": "address" }], "inputs": [
{
"internalType": "contract IERC20",
"name": "connector",
"type": "address"
}
],
"name": "removeConnector", "name": "removeConnector",
"outputs": [], "outputs": [],
"stateMutability": "nonpayable", "stateMutability": "nonpayable",
@ -149,8 +246,16 @@
}, },
{ {
"inputs": [ "inputs": [
{ "internalType": "contract IOracle", "name": "oracle", "type": "address" }, {
{ "internalType": "enum OffchainOracle.OracleType", "name": "oracleKind", "type": "uint8" } "internalType": "contract IOracle",
"name": "oracle",
"type": "address"
},
{
"internalType": "enum OffchainOracle.OracleType",
"name": "oracleKind",
"type": "uint8"
}
], ],
"name": "removeOracle", "name": "removeOracle",
"outputs": [], "outputs": [],
@ -165,7 +270,13 @@
"type": "function" "type": "function"
}, },
{ {
"inputs": [{ "internalType": "contract MultiWrapper", "name": "_multiWrapper", "type": "address" }], "inputs": [
{
"internalType": "contract MultiWrapper",
"name": "_multiWrapper",
"type": "address"
}
],
"name": "setMultiWrapper", "name": "setMultiWrapper",
"outputs": [], "outputs": [],
"stateMutability": "nonpayable", "stateMutability": "nonpayable",

@ -9,7 +9,7 @@ services:
environment: environment:
REDIS_URL: redis://redis/0 REDIS_URL: redis://redis/0
nginx_proxy_read_timeout: 600 nginx_proxy_read_timeout: 600
depends_on: [ redis ] depends_on: [redis]
worker1: worker1:
image: tornadocash/relayer image: tornadocash/relayer
@ -18,7 +18,7 @@ services:
env_file: .env env_file: .env
environment: environment:
REDIS_URL: redis://redis/0 REDIS_URL: redis://redis/0
depends_on: [ redis ] depends_on: [redis]
# worker2: # worker2:
# image: tornadocash/relayer:mining # image: tornadocash/relayer:mining
@ -65,12 +65,10 @@ services:
# TELEGRAM_NOTIFIER_BOT_TOKEN: ... # TELEGRAM_NOTIFIER_BOT_TOKEN: ...
# TELEGRAM_NOTIFIER_CHAT_ID: ... # TELEGRAM_NOTIFIER_CHAT_ID: ...
redis: redis:
image: redis image: redis
restart: always restart: always
command: [ redis-server, '/usr/local/etc/redis/redis.conf', --appendonly, 'yes', ] command: [redis-server, '/usr/local/etc/redis/redis.conf', --appendonly, 'yes']
ports: ports:
- '6379:6379' - '6379:6379'
volumes: volumes:

@ -13,4 +13,3 @@ export const healthProcessor: Processor = async () => {
await healthService.setStatus({ status: false, error: e.message }); await healthService.setStatus({ status: false, error: e.message });
} }
}; };

@ -1,4 +1,5 @@
import { Processor, Queue, QueueScheduler, Worker } from 'bullmq'; import { Processor, Queue, QueueScheduler, Worker } from 'bullmq';
import { TransactionReceipt } from '@ethersproject/abstract-provider';
import { JobStatus, RelayerJobType, Token } from '../types'; import { JobStatus, RelayerJobType, Token } from '../types';
import { WithdrawalData } from '../services/tx.service'; import { WithdrawalData } from '../services/tx.service';
import { priceProcessor } from './price.processor'; import { priceProcessor } from './price.processor';
@ -8,19 +9,23 @@ import { ConfigService } from '../services/config.service';
import { relayerProcessor } from './relayer.processor'; import { relayerProcessor } from './relayer.processor';
import { healthProcessor } from './health.processor'; import { healthProcessor } from './health.processor';
type PriceJobData = Token[] type PriceJobData = Token[];
type PriceJobReturn = number type PriceJobReturn = number;
type HealthJobReturn = void type HealthJobReturn = void;
type HealthJobData = null type HealthJobData = null;
export type RelayerJobData = export type RelayerJobData = WithdrawalData & {
WithdrawalData id: string;
& { id: string, status: JobStatus, type: RelayerJobType, txHash?: string, confirmations?: number } status: JobStatus;
export type RelayerJobReturn = any type: RelayerJobType;
txHash?: string;
confirmations?: number;
};
export type RelayerJobReturn = TransactionReceipt;
export type RelayerProcessor = Processor<RelayerJobData, RelayerJobReturn, RelayerJobType> export type RelayerProcessor = Processor<RelayerJobData, RelayerJobReturn, RelayerJobType>;
export type PriceProcessor = Processor<PriceJobData, PriceJobReturn, 'updatePrice'> export type PriceProcessor = Processor<PriceJobData, PriceJobReturn, 'updatePrice'>;
@autoInjectable() @autoInjectable()
export class PriceQueueHelper { export class PriceQueueHelper {
@ -28,8 +33,7 @@ export class PriceQueueHelper {
_worker: Worker<PriceJobData, PriceJobReturn, 'updatePrice'>; _worker: Worker<PriceJobData, PriceJobReturn, 'updatePrice'>;
_scheduler: QueueScheduler; _scheduler: QueueScheduler;
constructor(private store?: RedisStore) { constructor(private store?: RedisStore) {}
}
get queue() { get queue() {
if (!this._queue) { if (!this._queue) {
@ -56,7 +60,9 @@ export class PriceQueueHelper {
get scheduler() { get scheduler() {
if (!this._scheduler) { if (!this._scheduler) {
this._scheduler = new QueueScheduler('price', { connection: this.store.client }); this._scheduler = new QueueScheduler('price', {
connection: this.store.client,
});
} }
return this._scheduler; return this._scheduler;
} }
@ -71,15 +77,13 @@ export class PriceQueueHelper {
} }
} }
@autoInjectable() @autoInjectable()
export class RelayerQueueHelper { export class RelayerQueueHelper {
private _queue: Queue<RelayerJobData, RelayerJobReturn, RelayerJobType>; private _queue: Queue<RelayerJobData, RelayerJobReturn, RelayerJobType>;
private _worker: Worker<RelayerJobData, RelayerJobReturn, RelayerJobType>; private _worker: Worker<RelayerJobData, RelayerJobReturn, RelayerJobType>;
private _scheduler: QueueScheduler; private _scheduler: QueueScheduler;
constructor(private store?: RedisStore, private config?: ConfigService) { constructor(private store?: RedisStore, private config?: ConfigService) {}
}
get queue() { get queue() {
if (!this._queue) { if (!this._queue) {
@ -103,27 +107,27 @@ export class RelayerQueueHelper {
get scheduler() { get scheduler() {
if (!this._scheduler) { if (!this._scheduler) {
this._scheduler = new QueueScheduler(this.config.queueName, { connection: this.store.client }); this._scheduler = new QueueScheduler(this.config.queueName, {
connection: this.store.client,
});
} }
return this._scheduler; return this._scheduler;
} }
} }
@autoInjectable() @autoInjectable()
export class HealthQueueHelper { export class HealthQueueHelper {
private _queue: Queue<HealthJobData, HealthJobReturn, 'checkHealth'>; private _queue: Queue<HealthJobData, HealthJobReturn, 'checkHealth'>;
private _worker: Worker<HealthJobData, HealthJobReturn, 'checkHealth'>; private _worker: Worker<HealthJobData, HealthJobReturn, 'checkHealth'>;
private _scheduler: QueueScheduler; private _scheduler: QueueScheduler;
constructor(private store?: RedisStore, private config?: ConfigService) { constructor(private store?: RedisStore, private config?: ConfigService) {}
}
get scheduler(): QueueScheduler { get scheduler(): QueueScheduler {
if (!this._scheduler) { if (!this._scheduler) {
this._scheduler = new QueueScheduler('health', { connection: this.store.client }); this._scheduler = new QueueScheduler('health', {
connection: this.store.client,
});
} }
return this._scheduler; return this._scheduler;
} }
@ -156,6 +160,4 @@ export class HealthQueueHelper {
}, },
}); });
} }
} }

@ -7,4 +7,3 @@ export const priceProcessor: PriceProcessor = async (job) => {
if (result) return await priceService.savePrices(result); if (result) return await priceService.savePrices(result);
return null; return null;
}; };

@ -15,19 +15,23 @@ class RelayerError extends Error {
@autoInjectable() @autoInjectable()
export class HealthService { export class HealthService {
constructor(private config: ConfigService, private store: RedisStore) {}
constructor(private config: ConfigService, private store: RedisStore) {
}
async clearErrorCodes() { async clearErrorCodes() {
await this.store.client.del('errors:code'); await this.store.client.del('errors:code');
} }
private async _getErrors(): Promise<{ errorsLog: { message: string, score: number }[], errorsCode: Record<string, number> }> { private async _getErrors(): Promise<{
errorsLog: { message: string; score: number }[];
errorsCode: Record<string, number>;
}> {
const logSet = await this.store.client.zrevrange('errors:log', 0, -1, 'WITHSCORES'); const logSet = await this.store.client.zrevrange('errors:log', 0, -1, 'WITHSCORES');
const codeSet = await this.store.client.zrevrange('errors:code', 0, -1, 'WITHSCORES'); const codeSet = await this.store.client.zrevrange('errors:code', 0, -1, 'WITHSCORES');
return { errorsLog: HealthService._parseSet(logSet), errorsCode: HealthService._parseSet(codeSet, 'object') }; return {
errorsLog: HealthService._parseSet(logSet),
errorsCode: HealthService._parseSet(codeSet, 'object'),
};
} }
private async _getStatus() { private async _getStatus() {
@ -53,7 +57,7 @@ export class HealthService {
return out; return out;
} }
async setStatus(status: { status: boolean; error: string; }) { async setStatus(status: { status: boolean; error: string }) {
await this.store.client.hset('health:status', status); await this.store.client.hset('health:status', status);
} }
@ -115,8 +119,6 @@ export class HealthService {
await this.config.checkNetwork(); await this.config.checkNetwork();
const mainBalance = await this.config.wallet.getBalance(); const mainBalance = await this.config.wallet.getBalance();
const tornBalance = await this.config.tokenContract.balanceOf(this.config.wallet.address); const tornBalance = await this.config.tokenContract.balanceOf(this.config.wallet.address);
// const mainBalance = BigNumber.from(`${1e18}`).add(1);
// const tornBalance = BigNumber.from(`${60e18}`);
const mainStatus = await this._checkBalance(mainBalance, 'MAIN'); const mainStatus = await this._checkBalance(mainBalance, 'MAIN');
const tornStatus = await this._checkBalance(tornBalance, 'TORN'); const tornStatus = await this._checkBalance(tornBalance, 'TORN');
if (mainStatus.level === 'CRITICAL') { if (mainStatus.level === 'CRITICAL') {
@ -128,15 +130,10 @@ export class HealthService {
} }
} }
type HealthData = {
status: boolean,
error: string,
errorsLog: { message: string, score: number }[]
}
type Alert = { type Alert = {
type: string, type: string;
message: string, message: string;
level: Levels, level: Levels;
time?: number, time?: number;
} };
export default () => container.resolve(HealthService); export default () => container.resolve(HealthService);

@ -4,4 +4,3 @@ export { default as getJobService } from './job.service';
export { default as getTxService } from './tx.service'; export { default as getTxService } from './tx.service';
export { default as getNotifierService } from './notifier.service'; export { default as getNotifierService } from './notifier.service';
export { default as getHealthService } from './health.service'; export { default as getHealthService } from './health.service';

@ -7,11 +7,12 @@ import { ConfigService } from './config.service';
@injectable() @injectable()
export class JobService { export class JobService {
constructor(private price?: PriceQueueHelper, constructor(
private price?: PriceQueueHelper,
private relayer?: RelayerQueueHelper, private relayer?: RelayerQueueHelper,
private health?: HealthQueueHelper, private health?: HealthQueueHelper,
public config?: ConfigService) { public config?: ConfigService,
} ) {}
async postJob(type: RelayerJobType, data: WithdrawalData) { async postJob(type: RelayerJobType, data: WithdrawalData) {
const id = v4(); const id = v4();
@ -39,10 +40,8 @@ export class JobService {
private async _clearSchedulerJobs() { private async _clearSchedulerJobs() {
try { try {
const jobs = await Promise.all([this.price.queue.getJobs(), this.health.queue.getJobs()]); const jobs = await Promise.all([this.price.queue.getJobs(), this.health.queue.getJobs()]);
await Promise.all(jobs.flat().map(job => job?.remove())); await Promise.all(jobs.flat().map((job) => job?.remove()));
} catch (e) { } catch (e) {
console.log(e); console.log(e);
} }

@ -2,21 +2,20 @@ import { Telegram } from 'telegraf';
import { autoInjectable, container } from 'tsyringe'; import { autoInjectable, container } from 'tsyringe';
import { RedisStore } from '../modules/redis'; import { RedisStore } from '../modules/redis';
export type Levels = keyof typeof AlertLevel export type Levels = keyof typeof AlertLevel;
export enum AlertLevel { export enum AlertLevel {
'INFO' = '', 'INFO' = '',
'WARN' = '⚠️', 'WARN' = '⚠️',
'CRITICAL' = '‼️', 'CRITICAL' = '‼️',
'ERROR' = '💩', 'ERROR' = '💩',
'OK' = '✅' 'OK' = '✅',
} }
export enum AlertType { export enum AlertType {
'INSUFFICIENT_BALANCE', 'INSUFFICIENT_BALANCE',
'INSUFFICIENT_TORN_BALANCE', 'INSUFFICIENT_TORN_BALANCE',
'RPC' 'RPC',
} }
class MockTelegram { class MockTelegram {
@ -43,7 +42,6 @@ export class NotifierService {
this.token = process.env.TELEGRAM_NOTIFIER_BOT_TOKEN; this.token = process.env.TELEGRAM_NOTIFIER_BOT_TOKEN;
this.chatId = process.env.TELEGRAM_NOTIFIER_CHAT_ID; this.chatId = process.env.TELEGRAM_NOTIFIER_CHAT_ID;
this.telegram = this.token ? new Telegram(this.token) : new MockTelegram(); this.telegram = this.token ? new Telegram(this.token) : new MockTelegram();
} }
async processAlert(message: string) { async processAlert(message: string) {
@ -52,7 +50,7 @@ export class NotifierService {
const isSent = await this.store.client.sismember('alerts:sent', `${a}_${b}_${c}`); const isSent = await this.store.client.sismember('alerts:sent', `${a}_${b}_${c}`);
if (!isSent) { if (!isSent) {
if (alert.level === 'OK') { if (alert.level === 'OK') {
this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map(c => `${a}_${b}_${c}`)); this.store.client.srem('alerts:sent', ...['WARN', 'CRITICAL'].map((c) => `${a}_${b}_${c}`));
} else { } else {
await this.send(alert.message, alert.level); await this.send(alert.message, alert.level);
this.store.client.sadd('alerts:sent', alert.type); this.store.client.sadd('alerts:sent', alert.type);
@ -63,27 +61,17 @@ export class NotifierService {
async subscribe() { async subscribe() {
this.store.subscriber.subscribe('user-notify'); this.store.subscriber.subscribe('user-notify');
this.store.subscriber.on('message', async (channel, message) => { this.store.subscriber.on('message', async (channel, message) => {
await this.processAlert(<string>message); await this.processAlert(<string>message);
}, });
);
} }
send(message: string, level: Levels) { send(message: string, level: Levels) {
const text = `${AlertLevel[level]} ${message}`; const text = `${AlertLevel[level]} ${message}`;
return this.telegram.sendMessage( return this.telegram.sendMessage(this.chatId, text, { parse_mode: 'HTML' });
this.chatId,
text,
{ parse_mode: 'HTML' },
);
} }
sendError(e: any) { sendError(e: any) {
return this.telegram.sendMessage( return this.telegram.sendMessage(this.chatId, `Error: ${e}`);
this.chatId,
`Error: ${e}`,
);
} }
check() { check() {