Detect all AMB mediators in monitor (#493)

This commit is contained in:
Kirill Fedoseev 2020-12-20 01:19:49 +03:00 committed by GitHub
parent 4497a024b1
commit 4d468ae107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 231 additions and 49 deletions

@ -2,10 +2,6 @@ function strip0x(input) {
return input.replace(/^0x/, '')
}
function addTxHashToData({ encodedData, transactionHash }) {
return encodedData.slice(0, 2) + strip0x(transactionHash) + encodedData.slice(2)
}
/**
* Decodes the datatype byte from the AMB message.
* First (the most significant bit) denotes if the message should be forwarded to the manual lane.
@ -33,8 +29,18 @@ function parseAMBMessage(message) {
}
}
module.exports = {
addTxHashToData,
parseAMBMessage,
strip0x
const normalizeAMBMessageEvent = e => {
let msgData = e.returnValues.encodedData
if (!e.returnValues.messageId) {
// append tx hash to an old message, where message id was not used
// for old messages, e.messageId is a corresponding transactionHash
msgData = e.transactionHash + msgData.slice(2)
}
return parseAMBMessage(msgData)
}
module.exports = {
strip0x,
parseAMBMessage,
normalizeAMBMessageEvent
}

@ -1,6 +1,6 @@
const { BN } = require('web3-utils')
const { expect } = require('chai').use(require('bn-chai')(BN))
const { parseAMBMessage, strip0x, addTxHashToData } = require('../message')
const { parseAMBMessage, strip0x } = require('../message')
describe('strip0x', () => {
it('should remove 0x from input', () => {
@ -24,28 +24,6 @@ describe('strip0x', () => {
expect(result).to.be.equal(input)
})
})
describe('addTxHashToData', () => {
it('should add txHash to encoded data at position 2', () => {
// Given
const msgSender = '0x003667154bb32e42bb9e1e6532f19d187fa0082e'
const msgExecutor = '0xf4bef13f9f4f2b203faf0c3cbbaabe1afe056955'
const msgGasLimit = '000000000000000000000000000000000000000000000000000000005b877705'
const msgDataType = '00'
const msgData = '0xb1591967aed668a4b27645ff40c444892d91bf5951b382995d4d4f6ee3a2ce03'
const encodedData = `0x${strip0x(msgSender)}${strip0x(msgExecutor)}${msgGasLimit}${msgDataType}${strip0x(msgData)}`
const transactionHash = '0xbdceda9d8c94838aca10c687da1411a07b1390e88239c0638cb9cc264219cc10'
const message = `0x${strip0x(transactionHash)}${strip0x(msgSender)}${strip0x(
msgExecutor
)}${msgGasLimit}${msgDataType}${strip0x(msgData)}`
// When
const result = addTxHashToData({ encodedData, transactionHash })
// Then
expect(result).to.be.equal(message)
})
})
describe('parseAMBMessage', () => {
it('should parse data type 00', () => {
const msgSender = '0x003667154bb32e42bb9e1e6532f19d187fa0082e'

@ -1,8 +1,10 @@
require('dotenv').config()
const logger = require('./logger')('checkWorker3')
const stuckTransfers = require('./stuckTransfers')
const detectMediators = require('./detectMediators')
const { writeFile, createDir } = require('./utils/file')
const { web3Home } = require('./utils/web3')
const { saveCache } = require('./utils/web3Cache')
const { MONITOR_BRIDGE_NAME, COMMON_HOME_BRIDGE_ADDRESS } = process.env
const { getBridgeMode, HOME_NATIVE_TO_ERC_ABI, BRIDGE_MODES } = require('../commons')
@ -20,6 +22,15 @@ async function checkWorker3() {
transfers.health = true
writeFile(`/responses/${MONITOR_BRIDGE_NAME}/stuckTransfers.json`, transfers)
logger.debug('Done')
} else if (bridgeMode === BRIDGE_MODES.ARBITRARY_MESSAGE) {
createDir(`/responses/${MONITOR_BRIDGE_NAME}`)
logger.debug('calling detectMediators()')
const mediators = await detectMediators(bridgeMode)
mediators.ok = true
mediators.health = true
writeFile(`/responses/${MONITOR_BRIDGE_NAME}/mediators.json`, mediators)
saveCache()
logger.debug('Done')
}
} catch (e) {
logger.error('checkWorker3.js', e)

147
monitor/detectMediators.js Normal file

@ -0,0 +1,147 @@
require('dotenv').config()
const logger = require('./logger')('stuckTransfers.js')
const { isHomeContract, isForeignContract } = require('./utils/web3Cache')
const eventsInfo = require('./utils/events')
const { getHomeTxSender, getForeignTxSender } = require('./utils/web3Cache')
const { addExecutionStatus } = require('./utils/message')
const { normalizeAMBMessageEvent } = require('../commons')
function countInteractions(requests) {
const stats = {}
requests.forEach(msg => {
if (!stats[msg.sender]) {
stats[msg.sender] = {}
}
if (!stats[msg.sender][msg.executor]) {
stats[msg.sender][msg.executor] = 0
}
stats[msg.sender][msg.executor] += 1
})
return stats
}
const normalize = event => ({
...normalizeAMBMessageEvent(event),
txHash: event.transactionHash,
logIndex: event.transactionLogIndex
})
const flat = arrays => Array.prototype.concat.apply([], arrays)
function findPermanentMediators(homeToForeignC2C, foreignToHomeC2C) {
return flat(
Object.entries(homeToForeignC2C).map(([homeMediator, homeStats]) =>
Object.entries(foreignToHomeC2C)
.map(([foreignMediator, foreignStats]) => ({
homeMediator,
foreignMediator,
homeToForeignRequests: homeStats[foreignMediator],
foreignToHomeRequests: foreignStats[homeMediator]
}))
.filter(stats => stats.homeToForeignRequests && stats.foreignToHomeRequests)
)
)
}
function findFloatingMediators(homeToForeignC2C, foreignToHomeC2C) {
return Object.entries(homeToForeignC2C)
.map(([homeMediator, homeStats]) => {
const noResponses = ([executor]) => !foreignToHomeC2C[executor] || !foreignToHomeC2C[executor][homeMediator]
const executorRequestPairs = Object.entries(homeStats).filter(noResponses)
return {
mediator: homeMediator,
executors: executorRequestPairs.map(pair => pair[0]),
requests: executorRequestPairs.map(pair => pair[1])
}
})
.filter(stats => stats.executors.length > 0)
}
function findRemotelyControlledMediators(statsU2C) {
return Object.entries(statsU2C).map(([user, stats]) => ({
user,
executors: Object.keys(stats),
requests: Object.values(stats)
}))
}
function findUnknown(statsA2U) {
return Object.entries(statsA2U).map(([sender, stats]) => ({
sender,
executors: Object.keys(stats),
requests: Object.values(stats)
}))
}
async function main(mode) {
const {
homeToForeignRequests,
foreignToHomeRequests,
homeToForeignConfirmations,
foreignToHomeConfirmations
} = await eventsInfo(mode)
const homeToForeign = homeToForeignRequests
.map(normalize)
.map(addExecutionStatus(homeToForeignConfirmations))
.filter(x => typeof x.status === 'boolean')
const foreignToHome = foreignToHomeRequests
.map(normalize)
.map(addExecutionStatus(foreignToHomeConfirmations))
.filter(x => typeof x.status === 'boolean')
for (const event of homeToForeign) {
// AMB contract emits a single UserRequestForSignature event for every home->foreign request.
// If index of such event in logs is not equal to 0x0, then some other events occurred before it,
// meaning that the sender was a contract.
// Alternatively, the sender is a contract, if the message sender is not equal to tx.origin.
event.isSenderAContract = event.logIndex !== '0x0' || (await getHomeTxSender(event.txHash)) !== event.sender
// Executor is definitely a contract if a message execution failed, since message calls to EOA always succeed.
// Alternatively, the executor is checked to be a contract by looking at its bytecode size.
event.isExecutorAContract = !event.status || (await isForeignContract(event.executor))
}
for (const event of foreignToHome) {
// AMB contract emits a single UserRequestForAffirmation event for every foreign->home request.
// If index of such event in logs is not equal to 0x0, then some other events occurred before it,
// meaning that the sender was a contract.
// Alternatively, the sender is a contract, if the message sender is not equal to tx.origin.
event.isSenderAContract = event.logIndex !== '0x0' || (await getForeignTxSender(event.txHash)) !== event.sender
// Executor is definitely a contract if a message execution failed, since message calls to EOA always succeed.
// Alternatively, the executor is checked to be a contract by looking at its bytecode size.
event.isExecutorAContract = !event.status || (await isHomeContract(event.executor))
}
const C2C = event => event.isSenderAContract && event.isExecutorAContract
const U2C = event => !event.isSenderAContract && event.isExecutorAContract
const A2U = event => !event.isExecutorAContract
const homeToForeignC2C = countInteractions(homeToForeign.filter(C2C))
const foreignToHomeC2C = countInteractions(foreignToHome.filter(C2C))
const homeToForeignU2C = countInteractions(homeToForeign.filter(U2C))
const foreignToHomeU2C = countInteractions(foreignToHome.filter(U2C))
const homeToForeignA2U = countInteractions(homeToForeign.filter(A2U))
const foreignToHomeA2U = countInteractions(foreignToHome.filter(A2U))
const permanentMediators = findPermanentMediators(homeToForeignC2C, foreignToHomeC2C)
const floatingMediators = {
home: findFloatingMediators(homeToForeignC2C, foreignToHomeC2C),
foreign: findFloatingMediators(foreignToHomeC2C, homeToForeignC2C)
}
const remotelyControlledMediators = {
home: findRemotelyControlledMediators(homeToForeignU2C),
foreign: findRemotelyControlledMediators(foreignToHomeU2C)
}
const unknown = {
home: findUnknown(homeToForeignA2U),
foreign: findUnknown(foreignToHomeA2U)
}
logger.debug('Done')
return {
permanentMediators,
floatingMediators,
remotelyControlledMediators,
unknown
}
}
module.exports = main

@ -1,32 +1,33 @@
const { parseAMBMessage } = require('../../commons')
const { normalizeAMBMessageEvent } = require('../../commons')
const { readAccessListFile } = require('./file')
const { MONITOR_HOME_TO_FOREIGN_ALLOWANCE_LIST, MONITOR_HOME_TO_FOREIGN_BLOCK_LIST } = process.env
const keyAMB = e => [e.messageId, e.sender, e.executor].join(',').toLowerCase()
const normalizeAMBMessage = e => {
let msgData = e.returnValues.encodedData
if (!e.returnValues.messageId) {
// append tx hash to an old message, where message id was not used
// for old messages, e.messageId is a corresponding transactionHash
msgData = e.transactionHash + msgData.slice(2)
}
return parseAMBMessage(msgData)
}
function deliveredMsgNotProcessed(processedList) {
const keys = new Set()
processedList.forEach(processedMsg => keys.add(keyAMB(processedMsg.returnValues)))
return deliveredMsg => !keys.has(keyAMB(normalizeAMBMessage(deliveredMsg)))
return deliveredMsg => !keys.has(keyAMB(normalizeAMBMessageEvent(deliveredMsg)))
}
function processedMsgNotDelivered(deliveredList) {
const keys = new Set()
deliveredList.forEach(deliveredMsg => keys.add(keyAMB(normalizeAMBMessage(deliveredMsg))))
deliveredList.forEach(deliveredMsg => keys.add(keyAMB(normalizeAMBMessageEvent(deliveredMsg))))
return processedMsg => !keys.has(keyAMB(processedMsg.returnValues))
}
function addExecutionStatus(processedList) {
const statuses = {}
processedList.forEach(processedMsg => {
statuses[keyAMB(processedMsg.returnValues)] = processedMsg.returnValues.status
})
return deliveredMsg => {
deliveredMsg.status = statuses[keyAMB(deliveredMsg)]
return deliveredMsg
}
}
/**
* Normalizes the different event objects to facilitate data processing
* @param {Object} event
@ -70,23 +71,24 @@ const manuallyProcessedAMBHomeToForeignRequests = () => {
if (MONITOR_HOME_TO_FOREIGN_ALLOWANCE_LIST) {
const allowanceList = readAccessListFile(MONITOR_HOME_TO_FOREIGN_ALLOWANCE_LIST)
return e => {
const { sender, executor, decodedDataType } = normalizeAMBMessage(e)
const { sender, executor, decodedDataType } = normalizeAMBMessageEvent(e)
return (!allowanceList.includes(sender) && !allowanceList.includes(executor)) || decodedDataType.manualLane
}
} else if (MONITOR_HOME_TO_FOREIGN_BLOCK_LIST) {
const blockList = readAccessListFile(MONITOR_HOME_TO_FOREIGN_BLOCK_LIST)
return e => {
const { sender, executor, decodedDataType } = normalizeAMBMessage(e)
const { sender, executor, decodedDataType } = normalizeAMBMessageEvent(e)
return blockList.includes(sender) || blockList.includes(executor) || decodedDataType.manualLane
}
} else {
return e => normalizeAMBMessage(e).decodedDataType.manualLane
return e => normalizeAMBMessageEvent(e).decodedDataType.manualLane
}
}
module.exports = {
deliveredMsgNotProcessed,
processedMsgNotDelivered,
addExecutionStatus,
normalizeEventInformation,
eventWithoutReference,
unclaimedHomeToForeignRequests,

@ -1,6 +1,6 @@
const logger = require('../logger')('web3Cache')
const { readCacheFile, writeCacheFile } = require('./file')
const { web3Home } = require('./web3')
const { web3Home, web3Foreign } = require('./web3')
const { getPastEvents: commonGetPastEvents } = require('../../commons')
const { MONITOR_BRIDGE_NAME, MONITOR_CACHE_EVENTS } = process.env
@ -9,16 +9,49 @@ let isDirty = false
const homeTxSendersCacheFile = `./cache/${MONITOR_BRIDGE_NAME}/home/txSenders.json`
const cachedHomeTxSenders = readCacheFile(homeTxSendersCacheFile) || {}
const foreignTxSendersCacheFile = `./cache/${MONITOR_BRIDGE_NAME}/foreign/txSenders.json`
const cachedForeignTxSenders = readCacheFile(foreignTxSendersCacheFile) || {}
const homeIsContractCacheFile = `./cache/${MONITOR_BRIDGE_NAME}/home/isContract.json`
const cachedHomeIsContract = readCacheFile(homeIsContractCacheFile) || {}
const foreignIsContractCacheFile = `./cache/${MONITOR_BRIDGE_NAME}/foreign/isContract.json`
const cachedForeignIsContract = readCacheFile(foreignIsContractCacheFile) || {}
async function getHomeTxSender(txHash) {
if (!cachedHomeTxSenders[txHash]) {
logger.debug(`Fetching sender for tx ${txHash}`)
logger.debug(`Fetching sender for home tx ${txHash}`)
cachedHomeTxSenders[txHash] = (await web3Home.eth.getTransaction(txHash)).from.toLowerCase()
isDirty = true
}
return cachedHomeTxSenders[txHash]
}
async function getForeignTxSender(txHash) {
if (!cachedForeignTxSenders[txHash]) {
logger.debug(`Fetching sender for foreign tx ${txHash}`)
cachedForeignTxSenders[txHash] = (await web3Foreign.eth.getTransaction(txHash)).from.toLowerCase()
isDirty = true
}
return cachedForeignTxSenders[txHash]
}
async function isHomeContract(address) {
if (typeof cachedHomeIsContract[address] !== 'boolean') {
logger.debug(`Fetching home contract code size for tx ${address}`)
cachedHomeIsContract[address] = (await web3Home.eth.getCode(address)).length > 2
isDirty = true
}
return cachedHomeIsContract[address]
}
async function isForeignContract(address) {
if (typeof cachedForeignIsContract[address] !== 'boolean') {
logger.debug(`Fetching foreign contract code size for tx ${address}`)
cachedForeignIsContract[address] = (await web3Foreign.eth.getCode(address)).length > 2
isDirty = true
}
return cachedForeignIsContract[address]
}
async function getPastEvents(contract, options) {
if (MONITOR_CACHE_EVENTS !== 'true') {
return commonGetPastEvents(contract, options)
@ -123,11 +156,16 @@ function saveCache() {
if (isDirty) {
logger.debug('Saving cache on disk')
writeCacheFile(homeTxSendersCacheFile, cachedHomeTxSenders)
writeCacheFile(homeIsContractCacheFile, cachedHomeIsContract)
writeCacheFile(foreignIsContractCacheFile, cachedForeignIsContract)
}
}
module.exports = {
getHomeTxSender,
getForeignTxSender,
isHomeContract,
isForeignContract,
getPastEvents,
saveCache
}