Use safe approach for eth_getLogs requests (#581)

This commit is contained in:
Kirill Fedoseev 2021-06-30 12:28:03 +03:00 committed by GitHub
parent 92e1b597c4
commit 3e5e50c06e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 86 additions and 40 deletions

@ -35,7 +35,7 @@ async function initialize() {
try {
const checkHttps = checkHTTPS(ORACLE_ALLOW_HTTP_FOR_RPC, logger)
web3.currentProvider.urls.forEach(checkHttps(config.chain))
web3.currentProvider.subProvider.urls.forEach(checkHttps(config.chain))
attached = await isAttached()
if (attached) {

@ -40,7 +40,7 @@ async function initialize() {
try {
const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger)
web3.currentProvider.urls.forEach(checkHttps(config.chain))
web3.currentProvider.subProvider.urls.forEach(checkHttps(config.chain))
GasPrice.start(config.id)

@ -2,6 +2,8 @@ const fetch = require('node-fetch')
const promiseRetry = require('promise-retry')
const { FALLBACK_RPC_URL_SWITCH_TIMEOUT } = require('../utils/constants')
const { onInjected } = require('./injectedLogger')
// From EIP-1474 and Infura documentation
const JSONRPC_ERROR_CODES = [-32603, -32002, -32005]
@ -33,14 +35,10 @@ function HttpListProvider(urls, options = {}) {
this.options = { ...defaultOptions, ...options }
this.currentIndex = 0
this.lastTimeUsedPrimary = 0
this.logger = {
debug: () => {},
info: () => {}
}
}
HttpListProvider.prototype.setLogger = function(logger) {
onInjected(logger => {
this.logger = logger.child({ module: `HttpListProvider:${this.options.name}` })
})
}
HttpListProvider.prototype.send = async function send(payload, callback) {

@ -1,6 +1,7 @@
const promiseRetry = require('promise-retry')
const { promiseAny } = require('../utils/utils')
const { defaultOptions, HttpListProviderError, send } = require('./HttpListProvider')
const { onInjected } = require('./injectedLogger')
function RedundantHttpListProvider(urls, options = {}) {
if (!(this instanceof RedundantHttpListProvider)) {
@ -13,10 +14,9 @@ function RedundantHttpListProvider(urls, options = {}) {
this.urls = urls
this.options = { ...defaultOptions, ...options }
}
RedundantHttpListProvider.prototype.setLogger = function(logger) {
onInjected(logger => {
this.logger = logger.child({ module: `RedundantHttpListProvider:${this.options.name}` })
})
}
RedundantHttpListProvider.prototype.send = async function send(payload, callback) {

@ -0,0 +1,41 @@
const { hexToNumber, isHexStrict } = require('web3').utils
const { onInjected } = require('./injectedLogger')
function SafeEthLogsProvider(provider) {
this.subProvider = provider
onInjected(logger => {
this.logger = logger.child({ module: 'SafeEthLogsProvider' })
})
}
SafeEthLogsProvider.prototype.send = function send(payload, callback) {
if (payload.method === 'eth_getLogs' && isHexStrict(payload.params[0].toBlock)) {
this.logger.debug('Modifying eth_getLogs request to include batch eth_blockNumber request')
const newPayload = [payload, { jsonrpc: '2.0', id: payload.id + 1, method: 'eth_blockNumber', params: [] }]
this.subProvider.send(newPayload, (err, res) => {
if (err) {
callback(err, null)
} else {
const rawLogs = res.find(({ id }) => id === payload.id)
const rawBlockNumber = res.find(({ id }) => id === payload.id + 1)
const blockNumber = hexToNumber(rawBlockNumber.result)
const toBlock = hexToNumber(payload.params[0].toBlock)
if (blockNumber < toBlock) {
this.logger.warn({ toBlock, blockNumber }, 'Returned block number is less than the specified toBlock')
callback(new Error('block number too low'), null)
} else {
callback(null, rawLogs)
}
}
})
} else {
this.subProvider.send(payload, callback)
}
}
module.exports = {
SafeEthLogsProvider
}

@ -0,0 +1,27 @@
// workaround to avoid circular dependencies in module imports
// e.g. logger -> config -> web3 -> provider -> logger
// transforms to the following import chain
// logger -> config -> web3 -> provider -> injectedLogger
// logger -> injectedLogger
let logger
const callbacks = []
function onInjected(cb) {
if (logger) {
cb(logger)
} else {
callbacks.push(cb)
}
}
function setLogger(newLogger) {
logger = newLogger
callbacks.forEach(cb => cb(logger))
}
module.exports = {
onInjected,
setLogger
}

@ -1,15 +1,7 @@
const pino = require('pino')
const path = require('path')
const {
web3Home,
web3Foreign,
web3ForeignArchive,
web3Side,
web3HomeFallback,
web3ForeignFallback,
web3HomeRedundant,
web3ForeignRedundant
} = require('./web3')
const { setLogger } = require('./injectedLogger')
const config = process.env.NODE_ENV !== 'test' ? require(path.join('../../config/', process.argv[2])) : {}
@ -25,19 +17,6 @@ const logger = pino({
: {}
})
web3Home.currentProvider.setLogger(logger)
web3Foreign.currentProvider.setLogger(logger)
web3HomeFallback.currentProvider.setLogger(logger)
web3ForeignFallback.currentProvider.setLogger(logger)
web3HomeRedundant.currentProvider.setLogger(logger)
web3ForeignRedundant.currentProvider.setLogger(logger)
if (web3ForeignArchive) {
web3ForeignArchive.currentProvider.setLogger(logger)
}
if (web3Side) {
web3Side.currentProvider.setLogger(logger)
}
setLogger(logger)
module.exports = logger

@ -1,5 +1,6 @@
const Web3 = require('web3')
const { HttpListProvider } = require('./HttpListProvider')
const { SafeEthLogsProvider } = require('./SafeEthLogsProvider')
const { RedundantHttpListProvider } = require('./RedundantHttpListProvider')
const { RETRY_CONFIG } = require('../utils/constants')
@ -37,10 +38,10 @@ const foreignOptions = {
retry: RETRY_CONFIG
}
const homeProvider = new HttpListProvider(homeUrls, homeOptions)
const homeProvider = new SafeEthLogsProvider(new HttpListProvider(homeUrls, homeOptions))
const web3Home = new Web3(homeProvider)
const foreignProvider = new HttpListProvider(foreignUrls, foreignOptions)
const foreignProvider = new SafeEthLogsProvider(new HttpListProvider(foreignUrls, foreignOptions))
const web3Foreign = new Web3(foreignProvider)
let web3ForeignArchive = null

@ -34,7 +34,7 @@ async function initialize() {
try {
const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger)
web3.currentProvider.urls.forEach(checkHttps(chain))
web3.currentProvider.subProvider.urls.forEach(checkHttps(chain))
await getLastProcessedBlock()
connectWatcherToQueue({