// tovarish-relayer/lib/services/worker.js
// Repository viewer snapshot: 286 lines, 12 KiB, JavaScript; last change 2024-11-13 04:42:13 +03:00.
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.RelayerWorker = exports.DEFAULT_GAS_LIMIT = exports.RelayerStatus = void 0;
exports.getFeeParams = getFeeParams;
exports.checkWithdrawalFees = checkWithdrawalFees;
exports.processWithdrawals = processWithdrawals;
const crypto_1 = require("crypto");
const ethers_1 = require("ethers");
const contracts_1 = require("@tornado/contracts");
const core_1 = require("@tornado/core");
const config_1 = require("../config");
const logger_1 = require("./logger");
const error_1 = require("./error");
/**
 * Status values a relayer job can carry. Every member maps its own name to
 * the identical string, and the object is also published as
 * `exports.RelayerStatus`.
 */
var RelayerStatus;
(function (statusMap) {
    const stateNames = ['QUEUED', 'ACCEPTED', 'SENT', 'MINED', 'RESUBMITTED', 'CONFIRMED', 'FAILED'];
    for (const stateName of stateNames) {
        statusMap[stateName] = stateName;
    }
})(RelayerStatus || (exports.RelayerStatus = RelayerStatus = {}));
// Fallback withdrawal gas limit; getFeeParams uses it when the token config carries no `gasLimit` of its own.
exports.DEFAULT_GAS_LIMIT = 600_000;
/**
 * Creates the provider, signing wallet, router contract handle and fee oracle
 * for every enabled network and stores them on the worker, keyed by network id.
 *
 * @param relayerWorker Worker whose `relayerConfig.enabledNetworks` and
 *   `relayerConfig.txRpcUrls` are read and whose `cachedRelayerServices`
 *   object is filled in place.
 */
function setupServices(relayerWorker) {
    const { enabledNetworks, txRpcUrls } = relayerWorker.relayerConfig;
    for (const netId of enabledNetworks) {
        const networkConfig = (0, core_1.getConfig)(netId);
        const provider = (0, core_1.getProviderWithNetId)(netId, txRpcUrls[netId], networkConfig);
        // The wallet is bound to the provider so the router handle below can sign and send.
        const signer = new core_1.TornadoWallet((0, config_1.getPrivateKey)(), provider);
        const services = {
            provider,
            signer,
            Router: contracts_1.TornadoRouter__factory.connect(networkConfig.routerContract, signer),
            tornadoFeeOracle: new core_1.TornadoFeeOracle(provider),
        };
        relayerWorker.cachedRelayerServices[netId] = services;
    }
}
/**
 * Collects the inputs needed to price a withdrawal for one job.
 *
 * @param config Network config providing `nativeCurrency` and `tokens`.
 * @param serviceFee Relayer fee percentage, returned as `relayerFeePercent`.
 * @param syncManager Source of the cached gas price (`getGasPrice`) and
 *   token price (`getPrice`).
 * @param work Job fields: `netId`, the instance `contract` address, and
 *   `args`, of which `args[5]` is read as the refund amount.
 * @returns Fee parameter object (gas values, denomination and refund as BigInt).
 */
function getFeeParams(config, serviceFee, syncManager, { netId, contract, args }) {
    const instance = (0, core_1.getInstanceByAddress)(config, contract);
    const { amount, currency } = instance;
    const { nativeCurrency } = config;
    const token = config.tokens[currency];
    const symbol = token.symbol.toLowerCase();
    const tokenDecimals = token.decimals;
    const { gasPrice: cachedGasPrice, l1Fee } = syncManager.getGasPrice(netId);
    // Fall back to the module default when the token config has no gas limit.
    const gasLimit = BigInt(token.gasLimit || exports.DEFAULT_GAS_LIMIT);
    const denomination = (0, ethers_1.parseUnits)(amount, tokenDecimals);
    const ethRefund = BigInt(args[5]);
    const tokenPriceInWei = syncManager.getPrice(netId, symbol);
    return {
        amount,
        symbol,
        gasPrice: BigInt(cachedGasPrice),
        gasLimit,
        l1Fee,
        denomination,
        ethRefund,
        tokenPriceInWei,
        tokenDecimals,
        relayerFeePercent: serviceFee,
        isEth: nativeCurrency === currency,
        premiumPercent: 5,
    };
}
/**
 * Pre-flight check for a withdrawal job: estimates the gas of the router
 * `withdraw` call and verifies that the fee offered by the user covers the
 * relayer fee computed from that estimate.
 *
 * Never throws; every failure is reported through the returned object.
 *
 * @param relayerWorker Worker supplying `relayerConfig`, `cachedRelayerServices`,
 *   `syncManager` and `logger`.
 * @param work Job to check (`id`, `netId`, `contract`, `proof`, `args`).
 *   `args[4]` is read as the fee the user offers and `args[5]` as the refund
 *   that is attached to the transaction as its value.
 * @returns `{ gasPrice, gasLimit, status }`, plus `error` when `status` is false.
 */
async function checkWithdrawalFees(relayerWorker, work) {
    // The refund travels as the transaction value, which is denominated in wei,
    // so it is formatted with 18 decimals rather than the token's decimals.
    const NATIVE_DECIMALS = 18;
    try {
        const { id, netId, contract, proof, args } = work;
        const { relayerConfig: { rewardAccount, serviceFee }, cachedRelayerServices: { [netId]: { tornadoFeeOracle, Router }, }, syncManager, } = relayerWorker;
        const config = (0, core_1.getConfig)(netId);
        const feeParams = getFeeParams(config, serviceFee, syncManager, work);
        const { amount, symbol, tokenDecimals, denomination, ethRefund } = feeParams;
        const offeredFee = BigInt(args[4]);
        const gasLimit = await Router.withdraw.estimateGas(contract, proof, ...args, {
            from: rewardAccount,
            value: ethRefund,
        });
        // Price the job with the estimated gas limit instead of the configured one.
        const fee = tornadoFeeOracle.calculateRelayerFee({
            ...feeParams,
            gasLimit,
        });
        const reject = (error) => ({
            gasPrice: feeParams.gasPrice,
            gasLimit,
            status: false,
            error,
        });
        if (fee > denomination) {
            return reject(`Fee above deposit amount, requires ${(0, ethers_1.formatUnits)(fee, tokenDecimals)} ${symbol} while denomination is ${amount} ${symbol}`);
        }
        if (fee > offeredFee) {
            return reject(`Insufficient fee, requires ${(0, ethers_1.formatUnits)(fee, tokenDecimals)} ${symbol} while user only wants to pay ${(0, ethers_1.formatUnits)(offeredFee, tokenDecimals)} ${symbol}`);
        }
        relayerWorker.logger.info(`New job: ${id} ${netId} ${amount} ${symbol} (Fee: ${(0, ethers_1.formatUnits)(offeredFee, tokenDecimals)} ${symbol}, Refund: ${(0, ethers_1.formatUnits)(ethRefund, NATIVE_DECIMALS)} ${config.nativeCurrency})`);
        return {
            gasPrice: feeParams.gasPrice,
            gasLimit,
            status: true,
        };
    }
    catch (error) {
        // The returned message assumes a revert, but any failure above (config
        // lookup, RPC error, malformed args) lands here too, so keep the real
        // cause in the log instead of dropping it.
        relayerWorker?.logger?.error(`Fee check failed for job ${work?.id} (${work?.netId}): ${error?.message ?? error}`);
        return {
            gasPrice: BigInt(0),
            gasLimit: BigInt(0),
            status: false,
            error: 'Withdrawal transaction expected to be reverted',
        };
    }
}
/**
 * Sends every ACCEPTED job in the worker queue as a router `withdraw`
 * transaction, then evicts queue entries older than
 * `relayerConfig.clearInterval` seconds.
 *
 * A job is failed (status FAILED, `failedReason` set, error recorded in
 * `relayerWorker.errors`) instead of sent when:
 *  - another job for the same contract and `args[1]` has already been sent,
 *  - no stored gas estimate exists for it,
 *  - the populated gas limit exceeds 1.5x the stored estimate, or
 *  - the populated gas price exceeds 2x the stored estimate.
 *
 * @param relayerWorker Worker whose `queue`, `queueGas`, `errors`, `logger`,
 *   `cachedRelayerServices` and `relayerConfig` are read and updated in place.
 */
async function processWithdrawals(relayerWorker) {
    const { logger, cachedRelayerServices, errors } = relayerWorker;
    // The refund (args[5]) is sent as the transaction value, i.e. in wei, so it
    // is logged with 18 decimals and the native currency symbol.
    const NATIVE_DECIMALS = 18;
    for (const work of relayerWorker.queue) {
        if (work.status !== RelayerStatus.ACCEPTED) {
            continue;
        }
        try {
            const { id, netId, contract, proof, args } = work;
            // Cancel duplicated jobs: look for another job with the same contract and
            // args[1] that was already sent. Pending and failed ones are ignored so the
            // first submission, or a retry after a failure, can still go through.
            const otherWork = relayerWorker.queue.find((q) => q.id !== id &&
                q.status !== RelayerStatus.ACCEPTED &&
                q.status !== RelayerStatus.FAILED &&
                q.contract === contract &&
                q.args[1] === args[1]);
            if (otherWork) {
                throw new Error(`Found the same pending job ${otherWork.id}, wait until the previous one completes`);
            }
            const workGas = relayerWorker.queueGas.find((w) => w.id === id);
            if (!workGas) {
                throw new Error(`Missing gas estimate for job ${id}`);
            }
            const { gasLimit, gasPrice } = workGas;
            const config = (0, core_1.getConfig)(netId);
            const { amount, currency } = (0, core_1.getInstanceByAddress)(config, contract);
            const { decimals } = config.tokens[currency];
            const { Router, signer } = cachedRelayerServices[netId];
            const refund = BigInt(args[5]);
            /**
             * Check fees to ensure that it didn't spike or revert (or has insane gas spendings)
             */
            const txObj = await signer.populateTransaction(await Router.withdraw.populateTransaction(contract, proof, ...args, {
                value: refund,
            }));
            const txGasPrice = txObj.maxFeePerGas
                ? txObj.maxFeePerGas + BigInt(txObj.maxPriorityFeePerGas || 0)
                : txObj.gasPrice;
            // Prevent tx on gas limit spike (more than 1.5x the estimate)
            const maxGasLimit = (gasLimit * BigInt(15)) / BigInt(10);
            if (txObj.gasLimit > maxGasLimit) {
                throw new Error(`Job ${id} exceeds pre estimated gas limit, wants ${maxGasLimit} have ${txObj.gasLimit}`);
            }
            // Prevent tx on gas price spike (more than 2x the estimate)
            const maxGasPrice = gasPrice * BigInt(2);
            if (txGasPrice > maxGasPrice) {
                throw new Error(`Job ${id} exceeds pre estimated gas price, wants ${maxGasPrice} have ${txGasPrice}`);
            }
            const tx = await signer.sendTransaction(txObj);
            work.txHash = tx.hash;
            work.confirmations = 0;
            work.status = RelayerStatus.SENT;
            logger.info(`Sent Job ${work.id} ${netId} ${amount} ${currency} tx (Fee: ${(0, ethers_1.formatUnits)(BigInt(args[4]), decimals)} ${currency}, Refund: ${(0, ethers_1.formatUnits)(refund, NATIVE_DECIMALS)} ${config.nativeCurrency} ${tx.hash})`);
            // Wait for 2 seconds so that the remote node could increment nonces
            await (0, core_1.sleep)(2000);
            // Head straight to confirmed status as the remote node often doesn't report the receipt correctly
            work.confirmations = 3;
            work.status = RelayerStatus.CONFIRMED;
        }
        catch (error) {
            logger.error(`Failed to send job ${work.id}`);
            console.log(error);
            errors.push((0, error_1.newError)('Worker (processWithdrawals)', work.netId, error));
            work.status = RelayerStatus.FAILED;
            // Only the relayer's own rejection messages are shown to the user; anything
            // else is replaced by a generic reason.
            const isOwnRejection = error.message?.includes('exceeds pre estimated') ||
                error.message?.includes('Found the same pending job');
            work.failedReason = isOwnRejection ? error.message : 'Relayer failed to send transaction';
        }
    }
    // Drop entries older than the configured retention window (seconds).
    const now = Math.floor(Date.now() / 1000);
    const retention = relayerWorker.relayerConfig.clearInterval;
    const isFresh = (w) => w.timestamp + retention >= now;
    relayerWorker.queue = relayerWorker.queue.filter(isFresh);
    relayerWorker.queueGas = relayerWorker.queueGas.filter(isFresh);
}
/**
 * In-memory withdrawal job queue. Jobs are fee-checked on submission, stored
 * in `queue` (with their gas estimates in `queueGas`) and sent by a worker
 * loop that is started through a short timer whenever a job is accepted.
 */
class RelayerWorker {
    relayerConfig;
    logger;
    syncManager;
    cachedRelayerServices;
    queue;
    queueGas;
    queueTimer;
    errors;
    /**
     * @param relayerConfig Relayer configuration; `logLevel` is passed to the logger.
     * @param syncManager Price/gas source handed to the fee checks.
     */
    constructor(relayerConfig, syncManager) {
        this.relayerConfig = relayerConfig;
        this.syncManager = syncManager;
        this.logger = (0, logger_1.getLogger)('[RelayerWorker]', relayerConfig.logLevel);
        this.cachedRelayerServices = {};
        this.queue = [];
        this.queueGas = [];
        this.queueTimer = null;
        this.errors = [];
        setupServices(this);
    }
    /**
     * Runs processing passes until no pending job is left. If 5 or more jobs
     * are still pending after a pass, those pending jobs are failed with a
     * "too many jobs" reason; jobs that were already sent or confirmed keep
     * their status. `queueTimer` is always reset on exit, including when a
     * pass throws, so a later `createWork` can schedule the loop again.
     */
    async doWork() {
        const isPending = (q) => q.status === RelayerStatus.QUEUED || q.status === RelayerStatus.ACCEPTED;
        try {
            for (;;) {
                await processWithdrawals(this);
                // Jobs may have been accepted while the pass was awaiting the network.
                const pendingJobs = this.queue.filter(isPending);
                if (pendingJobs.length === 0) {
                    return;
                }
                if (pendingJobs.length >= 5) {
                    const reason = 'Relayer has too many jobs, try it again later';
                    for (const job of pendingJobs) {
                        job.status = RelayerStatus.FAILED;
                        job.error = reason;
                        job.failedReason = reason;
                    }
                    this.logger.error(`Relayer has cleared the workload ( ${pendingJobs.length} ) due to overhaul`);
                    return;
                }
            }
        }
        finally {
            this.queueTimer = null;
        }
    }
    /**
     * Validates and enqueues a withdrawal request.
     *
     * @returns The accepted work object, or `{ error }` when a non-failed job
     *   with the same contract and `args[1]` already exists or the fee check
     *   rejects the request.
     */
    async createWork({ netId, contract, proof, args, }) {
        const isDuplicate = this.queue.some((q) => q.status !== RelayerStatus.FAILED && q.contract === contract && q.args[1] === args[1]);
        if (isDuplicate) {
            return {
                error: 'Found the same pending job, wait until the previous one completes',
            };
        }
        const work = {
            netId,
            id: crypto_1.webcrypto.randomUUID(),
            type: 'TORNADO_WITHDRAW',
            status: RelayerStatus.QUEUED,
            contract,
            proof,
            args,
            timestamp: Math.floor(Date.now() / 1000),
        };
        const { gasPrice, gasLimit, status, error } = await checkWithdrawalFees(this, work);
        if (!status) {
            return {
                error,
            };
        }
        work.status = RelayerStatus.ACCEPTED;
        this.queue.push(work);
        this.queueGas.push({
            id: work.id,
            gasPrice,
            gasLimit,
            timestamp: work.timestamp,
        });
        if (!this.queueTimer) {
            this.queueTimer = setTimeout(() => {
                // Nothing awaits the timer callback, so a rejection must be handled here.
                this.doWork().catch((err) => {
                    this.logger.error(`Relayer worker loop failed: ${err?.message ?? err}`);
                });
            }, 500);
        }
        return work;
    }
    /**
     * Looks up a job by id.
     *
     * @returns A detached JSON copy of the job without its `netId`, or
     *   `{ error }` when the id is unknown.
     */
    getWork({ id }) {
        const work = this.queue.find((w) => w.id === id);
        if (!work) {
            return {
                error: `Work ${id} not found`,
            };
        }
        const copiedWork = JSON.parse(JSON.stringify(work));
        delete copiedWork.netId;
        return copiedWork;
    }
    /** Number of jobs that are QUEUED or ACCEPTED, i.e. not yet sent. */
    pendingWorks() {
        return this.queue.filter((q) => q.status === RelayerStatus.QUEUED || q.status === RelayerStatus.ACCEPTED)
            .length;
    }
}
exports.RelayerWorker = RelayerWorker;