Try to detect unsynced node state (#592)

This commit is contained in:
Kirill Fedoseev 2021-08-04 00:36:50 +04:00 committed by GitHub
parent 4f5e3c47be
commit 2e6179f974
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 72 additions and 37 deletions

@ -77,7 +77,7 @@ jobs:
- name: Rebuild and push updated images - name: Rebuild and push updated images
run: | run: |
function check_if_image_exists() { function check_if_image_exists() {
curl -fsSlL -H 'Authorization: bearer ${{ github.token }}' "https://${DOCKER_REGISTRY}/v2/${DOCKER_REPO}/tokenbridge-e2e-$1/manifests/$2" > /dev/null curl -fsSlL "https://${{ github.actor }}:${{ github.token }}@${DOCKER_REGISTRY}/v2/${DOCKER_REPO}/tokenbridge-e2e-$1/manifests/$2" > /dev/null
} }
updated=() updated=()
if ! check_if_image_exists e2e ${E2E_TAG}; then updated+=("e2e"); fi if ! check_if_image_exists e2e ${E2E_TAG}; then updated+=("e2e"); fi
@ -104,7 +104,7 @@ jobs:
- name: Rebuild and push molecule runner e2e image - name: Rebuild and push molecule runner e2e image
run: | run: |
function check_if_image_exists() { function check_if_image_exists() {
curl -fsSlL -H 'Authorization: bearer ${{ github.token }}' "https://${DOCKER_REGISTRY}/v2/${DOCKER_REPO}/tokenbridge-e2e-$1/manifests/$2" > /dev/null curl -fsSlL "https://${{ github.actor }}:${{ github.token }}@${DOCKER_REGISTRY}/v2/${DOCKER_REPO}/tokenbridge-e2e-$1/manifests/$2" > /dev/null
} }
if check_if_image_exists molecule_runner ${MOLECULE_RUNNER_TAG}; then if check_if_image_exists molecule_runner ${MOLECULE_RUNNER_TAG}; then
echo "Image already exists" echo "Image already exists"

@ -49,7 +49,7 @@ async function initialize() {
try { try {
const checkHttps = checkHTTPS(ORACLE_ALLOW_HTTP_FOR_RPC, logger) const checkHttps = checkHTTPS(ORACLE_ALLOW_HTTP_FOR_RPC, logger)
web3.currentProvider.subProvider.urls.forEach(checkHttps(chain)) web3.currentProvider.urls.forEach(checkHttps(chain))
attached = await isAttached() attached = await isAttached()
if (attached) { if (attached) {

@ -40,7 +40,7 @@ async function initialize() {
try { try {
const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger) const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger)
web3.currentProvider.subProvider.urls.forEach(checkHttps(config.id)) web3.currentProvider.urls.forEach(checkHttps(config.id))
GasPrice.start(config.id) GasPrice.start(config.id)

@ -41,6 +41,20 @@ function HttpListProvider(urls, options = {}) {
}) })
} }
HttpListProvider.prototype.switchToFallbackRPC = function() {
if (this.urls.length < 2) {
return
}
const prevIndex = this.currentIndex
const newIndex = (prevIndex + 1) % this.urls.length
this.logger.info(
{ index: newIndex, oldURL: this.urls[prevIndex], newURL: this.urls[newIndex] },
'Switching to fallback JSON-RPC URL'
)
this.currentIndex = newIndex
}
HttpListProvider.prototype.send = async function send(payload, callback) { HttpListProvider.prototype.send = async function send(payload, callback) {
// if fallback URL is being used for too long, switch back to the primary URL // if fallback URL is being used for too long, switch back to the primary URL
if (this.currentIndex > 0 && Date.now() - this.lastTimeUsedPrimary > FALLBACK_RPC_URL_SWITCH_TIMEOUT) { if (this.currentIndex > 0 && Date.now() - this.lastTimeUsedPrimary > FALLBACK_RPC_URL_SWITCH_TIMEOUT) {

@ -1,20 +1,13 @@
const { hexToNumber, isHexStrict } = require('web3').utils const { hexToNumber, isHexStrict } = require('web3').utils
const { onInjected } = require('./injectedLogger')
function SafeEthLogsProvider(provider) { function SafeEthLogsProvider(provider) {
this.subProvider = provider const oldSend = provider.send.bind(provider)
onInjected(logger => { const newSend = function(payload, callback) {
this.logger = logger.child({ module: 'SafeEthLogsProvider' })
})
}
SafeEthLogsProvider.prototype.send = function send(payload, callback) {
if (payload.method === 'eth_getLogs' && isHexStrict(payload.params[0].toBlock)) { if (payload.method === 'eth_getLogs' && isHexStrict(payload.params[0].toBlock)) {
this.logger.debug('Modifying eth_getLogs request to include batch eth_blockNumber request') this.logger.debug('Modifying eth_getLogs request to include batch eth_blockNumber request')
const newPayload = [payload, { jsonrpc: '2.0', id: payload.id + 1, method: 'eth_blockNumber', params: [] }] const newPayload = [payload, { jsonrpc: '2.0', id: payload.id + 1, method: 'eth_blockNumber', params: [] }]
this.subProvider.send(newPayload, (err, res) => { oldSend(newPayload, (err, res) => {
if (err) { if (err) {
callback(err, null) callback(err, null)
} else { } else {
@ -32,8 +25,11 @@ SafeEthLogsProvider.prototype.send = function send(payload, callback) {
} }
}) })
} else { } else {
this.subProvider.send(payload, callback) oldSend(payload, callback)
} }
}
provider.send = newSend.bind(provider)
return provider
} }
module.exports = { module.exports = {

@ -38,10 +38,10 @@ const foreignOptions = {
retry: RETRY_CONFIG retry: RETRY_CONFIG
} }
const homeProvider = new SafeEthLogsProvider(new HttpListProvider(homeUrls, homeOptions)) const homeProvider = SafeEthLogsProvider(new HttpListProvider(homeUrls, homeOptions))
const web3Home = new Web3(homeProvider) const web3Home = new Web3(homeProvider)
const foreignProvider = new SafeEthLogsProvider(new HttpListProvider(foreignUrls, foreignOptions)) const foreignProvider = SafeEthLogsProvider(new HttpListProvider(foreignUrls, foreignOptions))
const web3Foreign = new Web3(foreignProvider) const web3Foreign = new Web3(foreignProvider)
let web3ForeignArchive = null let web3ForeignArchive = null

@ -25,6 +25,7 @@ module.exports = {
}, },
DEFAULT_TRANSACTION_RESEND_INTERVAL: 20 * 60 * 1000, DEFAULT_TRANSACTION_RESEND_INTERVAL: 20 * 60 * 1000,
FALLBACK_RPC_URL_SWITCH_TIMEOUT: 60 * 60 * 1000, FALLBACK_RPC_URL_SWITCH_TIMEOUT: 60 * 60 * 1000,
BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT: 10,
SENDER_QUEUE_MAX_PRIORITY: 10, SENDER_QUEUE_MAX_PRIORITY: 10,
SENDER_QUEUE_SEND_PRIORITY: 5, SENDER_QUEUE_SEND_PRIORITY: 5,
SENDER_QUEUE_CHECK_STATUS_PRIORITY: 1, SENDER_QUEUE_CHECK_STATUS_PRIORITY: 1,

@ -6,7 +6,7 @@ const logger = require('./services/logger')
const { getShutdownFlag } = require('./services/shutdownState') const { getShutdownFlag } = require('./services/shutdownState')
const { getBlockNumber, getRequiredBlockConfirmations, getEvents } = require('./tx/web3') const { getBlockNumber, getRequiredBlockConfirmations, getEvents } = require('./tx/web3')
const { checkHTTPS, watchdog } = require('./utils/utils') const { checkHTTPS, watchdog } = require('./utils/utils')
const { EXIT_CODES } = require('./utils/constants') const { EXIT_CODES, BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT } = require('./utils/constants')
if (process.argv.length < 3) { if (process.argv.length < 3) {
logger.error('Please check the number of arguments, config file was not provided') logger.error('Please check the number of arguments, config file was not provided')
@ -29,12 +29,14 @@ const { getTokensState } = require('./utils/tokenState')
const { web3, bridgeContract, eventContract, startBlock, pollingInterval, chain } = config.main const { web3, bridgeContract, eventContract, startBlock, pollingInterval, chain } = config.main
const lastBlockRedisKey = `${config.id}:lastProcessedBlock` const lastBlockRedisKey = `${config.id}:lastProcessedBlock`
let lastProcessedBlock = Math.max(startBlock - 1, 0) let lastProcessedBlock = Math.max(startBlock - 1, 0)
let lastSeenBlockNumber = 0
let sameBlockNumberCounter = 0
async function initialize() { async function initialize() {
try { try {
const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger) const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger)
web3.currentProvider.subProvider.urls.forEach(checkHttps(chain)) web3.currentProvider.urls.forEach(checkHttps(chain))
await getLastProcessedBlock() await getLastProcessedBlock()
connectWatcherToQueue({ connectWatcherToQueue({
@ -117,6 +119,28 @@ async function getLastBlockToProcess(web3, bridgeContract) {
getBlockNumber(web3), getBlockNumber(web3),
getRequiredBlockConfirmations(bridgeContract) getRequiredBlockConfirmations(bridgeContract)
]) ])
if (lastBlockNumber < lastSeenBlockNumber) {
sameBlockNumberCounter = 0
logger.warn({ lastBlockNumber, lastSeenBlockNumber }, 'Received block number less than already seen block')
web3.currentProvider.switchToFallbackRPC()
} else if (lastBlockNumber === lastSeenBlockNumber) {
sameBlockNumberCounter++
if (sameBlockNumberCounter > 1) {
logger.info({ lastBlockNumber, sameBlockNumberCounter }, 'Received the same block number more than twice')
if (sameBlockNumberCounter >= BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT) {
sameBlockNumberCounter = 0
logger.warn(
{ lastBlockNumber, n: BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT },
'Received the same block number for too many times. Probably node is not synced anymore'
)
web3.currentProvider.switchToFallbackRPC()
}
}
} else {
sameBlockNumberCounter = 0
lastSeenBlockNumber = lastBlockNumber
}
return lastBlockNumber - requiredBlockConfirmations return lastBlockNumber - requiredBlockConfirmations
} }