extract candidates logic, refactor update backend

This commit is contained in:
Felipe Andrade 2023-05-27 04:31:05 -07:00
parent 7405b8b360
commit 8436728419
2 changed files with 124 additions and 103 deletions

@ -57,6 +57,10 @@ type backendState struct {
bannedUntil time.Time bannedUntil time.Time
} }
func (bs backendState) IsBanned() bool {
return time.Now().Before(bs.bannedUntil)
}
// GetConsensusGroup returns the backend members that are agreeing in a consensus // GetConsensusGroup returns the backend members that are agreeing in a consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock() defer cp.consensusGroupMux.Unlock()
@ -242,10 +246,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
// UpdateBackend refreshes the consensus state of a single backend // UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
banned := cp.IsBanned(be) bs := cp.getBackendState(be)
RecordConsensusBackendBanned(be, banned) RecordConsensusBackendBanned(be, bs.IsBanned())
if banned { if bs.IsBanned() {
log.Debug("skipping backend - banned", "backend", be.Name) log.Debug("skipping backend - banned", "backend", be.Name)
return return
} }
@ -287,36 +291,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
} }
bs := cp.getBackendState(be)
oldFinalized := bs.finalizedBlockNumber oldFinalized := bs.finalizedBlockNumber
oldSafe := bs.safeBlockNumber oldSafe := bs.safeBlockNumber
expectedBlockTags := cp.checkExpectedBlockTags( updateDelay := time.Since(bs.lastUpdate)
finalizedBlockNumber, oldFinalized, RecordConsensusBackendUpdateDelay(be, updateDelay)
safeBlockNumber, oldSafe,
latestBlockNumber)
changed, updateDelay := cp.setBackendState(be, peerCount, inSync, changed := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash, latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber) finalizedBlockNumber, safeBlockNumber)
RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber)
RecordBackendSafeBlock(be, safeBlockNumber) RecordBackendSafeBlock(be, safeBlockNumber)
RecordBackendFinalizedBlock(be, finalizedBlockNumber) RecordBackendFinalizedBlock(be, finalizedBlockNumber)
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
RecordConsensusBackendUpdateDelay(be, updateDelay)
if !expectedBlockTags {
log.Warn("backend banned - unexpected block tags",
"backend", be.Name,
"oldFinalized", oldFinalized,
"finalizedBlockNumber", finalizedBlockNumber,
"oldSafe", oldSafe,
"safeBlockNumber", safeBlockNumber,
"latestBlockNumber", latestBlockNumber,
)
cp.Ban(be)
}
if changed { if changed {
log.Debug("backend state updated", log.Debug("backend state updated",
@ -329,6 +316,26 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
"safeBlockNumber", safeBlockNumber, "safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay) "updateDelay", updateDelay)
} }
// sanity check for latest, safe and finalized block tags
expectedBlockTags := cp.checkExpectedBlockTags(
finalizedBlockNumber, oldFinalized,
safeBlockNumber, oldSafe,
latestBlockNumber)
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
if !expectedBlockTags {
log.Warn("backend banned - unexpected block tags",
"backend", be.Name,
"oldFinalized", oldFinalized,
"finalizedBlockNumber", finalizedBlockNumber,
"oldSafe", oldSafe,
"safeBlockNumber", safeBlockNumber,
"latestBlockNumber", latestBlockNumber,
)
cp.Ban(be)
}
} }
// checkExpectedBlockTags for unexpected conditions on block tags // checkExpectedBlockTags for unexpected conditions on block tags
@ -349,101 +356,41 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// get the latest block number from the tracker // get the latest block number from the tracker
currentConsensusBlockNumber := cp.GetLatestBlockNumber() currentConsensusBlockNumber := cp.GetLatestBlockNumber()
// find out what backends are the candidates to be in the consensus group // get the candidates for the consensus group
// and create a copy of current their state candidates := cp.getConsensusCandidates()
//
// a serving node needs to be:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends)) // update the lowest latest block number and hash
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) // the lowest safe block number
for _, be := range cp.backendGroup.Backends { // the lowest finalized block number
bs := cp.getBackendState(be)
passed := true
if time.Now().Before(bs.bannedUntil) {
passed = false
}
if !be.IsHealthy() {
passed = false
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
passed = false
}
if !bs.inSync {
passed = false
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
passed = false
}
if passed {
candidates[be] = bs
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
}
// find the highest block, in order to use it defining the highest non-lagging ancestor block
var highestLatestBlock hexutil.Uint64
for _, bs := range candidates {
if bs.latestBlockNumber > highestLatestBlock {
highestLatestBlock = bs.latestBlockNumber
}
}
// find the highest common ancestor block
var lowestLatestBlock hexutil.Uint64 var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64 var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64 var lowestSafeBlock hexutil.Uint64
lagging := make([]*Backend, 0, len(candidates)) for _, bs := range candidates {
for be, bs := range candidates {
// check if backend is lagging behind the highest block
if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
continue
}
// update the lowest common ancestor block
if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock { if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
lowestLatestBlock = bs.latestBlockNumber lowestLatestBlock = bs.latestBlockNumber
lowestLatestBlockHash = bs.latestBlockHash lowestLatestBlockHash = bs.latestBlockHash
} }
// update the lowest finalized block
if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock { if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = bs.finalizedBlockNumber lowestFinalizedBlock = bs.finalizedBlockNumber
} }
// update the lowest safe block
if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock { if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
lowestSafeBlock = bs.safeBlockNumber lowestSafeBlock = bs.safeBlockNumber
} }
} }
// remove lagging backends from the candidates
for _, be := range lagging {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
delete(candidates, be)
}
// find the proposed block among the candidates // find the proposed block among the candidates
// the proposed block needs have the same hash in the entire consensus group
proposedBlock := lowestLatestBlock proposedBlock := lowestLatestBlock
proposedBlockHash := lowestLatestBlockHash proposedBlockHash := lowestLatestBlockHash
hasConsensus := false hasConsensus := false
broken := false
if lowestLatestBlock > currentConsensusBlockNumber { if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
} }
// if there is no block to propose, the consensus is automatically broken // if there is a block to propose, check if it is the same in all backends
// this can happen when backends have just recovered
broken := proposedBlock == 0 && currentConsensusBlockNumber > 0
if proposedBlock > 0 { if proposedBlock > 0 {
for !hasConsensus { for !hasConsensus {
allAgreed := true allAgreed := true
@ -459,7 +406,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch { if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber { if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash) log.Warn("backend broke consensus",
"name", be.Name,
"actualBlockNumber", actualBlockNumber,
"actualBlockHash", actualBlockHash,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
broken = true broken = true
} }
allAgreed = false allAgreed = false
@ -488,6 +440,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
"proposedBlockHash", proposedBlockHash) "proposedBlockHash", proposedBlockHash)
} }
// update tracker
cp.tracker.SetLatestBlockNumber(proposedBlock) cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
@ -495,10 +448,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// update consensus group // update consensus group
group := make([]*Backend, 0, len(candidates)) group := make([]*Backend, 0, len(candidates))
consensusBackendsNames := make([]string, 0, len(candidates)) consensusBackendsNames := make([]string, 0, len(candidates))
for be, _ := range candidates { filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
group = append(group, be) for _, be := range cp.backendGroup.Backends {
consensusBackendsNames = append(consensusBackendsNames, be.Name) _, exist := candidates[be]
if exist {
group = append(group, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
} }
cp.consensusGroupMux.Lock() cp.consensusGroupMux.Lock()
cp.consensusGroup = group cp.consensusGroup = group
cp.consensusGroupMux.Unlock() cp.consensusGroupMux.Unlock()
@ -522,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
return time.Now().Before(bs.bannedUntil) return bs.IsBanned()
} }
// Ban bans a specific backend // Ban bans a specific backend
@ -531,6 +491,9 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(cp.banPeriod) bs.bannedUntil = time.Now().Add(cp.banPeriod)
// when we ban a node, we give it the chance to start from any block when it is back
bs.latestBlockNumber = 0
bs.safeBlockNumber = 0 bs.safeBlockNumber = 0
bs.finalizedBlockNumber = 0 bs.finalizedBlockNumber = 0
} }
@ -613,12 +576,12 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil return res, nil
} }
// getBackendState creates a copy of backend state so that the caller can use it without locking
func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
// we return a copy so that the caller can use it without locking
return &backendState{ return &backendState{
latestBlockNumber: bs.latestBlockNumber, latestBlockNumber: bs.latestBlockNumber,
latestBlockHash: bs.latestBlockHash, latestBlockHash: bs.latestBlockHash,
@ -634,18 +597,76 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string, latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64, finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) { safeBlockNumber hexutil.Uint64) bool {
bs := cp.backendState[be] bs := cp.backendState[be]
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
changed = bs.latestBlockHash != latestBlockHash changed := bs.latestBlockHash != latestBlockHash
bs.peerCount = peerCount bs.peerCount = peerCount
bs.inSync = inSync bs.inSync = inSync
bs.latestBlockNumber = latestBlockNumber bs.latestBlockNumber = latestBlockNumber
bs.latestBlockHash = latestBlockHash bs.latestBlockHash = latestBlockHash
bs.finalizedBlockNumber = finalizedBlockNumber bs.finalizedBlockNumber = finalizedBlockNumber
bs.safeBlockNumber = safeBlockNumber bs.safeBlockNumber = safeBlockNumber
updateDelay = time.Since(bs.lastUpdate)
bs.lastUpdate = time.Now() bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock() bs.backendStateMux.Unlock()
return return changed
}
// getConsensusCandidates find out what backends are the candidates to be in the consensus group
// and create a copy of current their state
//
// a candidate is a serving node within the following conditions:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
bs := cp.getBackendState(be)
if bs.IsBanned() {
continue
}
if !be.IsHealthy() {
continue
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
continue
}
if !bs.inSync {
continue
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
candidates[be] = bs
}
// find the highest block, in order to use it defining the highest non-lagging ancestor block
var highestLatestBlock hexutil.Uint64
for _, bs := range candidates {
if bs.latestBlockNumber > highestLatestBlock {
highestLatestBlock = bs.latestBlockNumber
}
}
// find the highest common ancestor block
lagging := make([]*Backend, 0, len(candidates))
for be, bs := range candidates {
// check if backend is lagging behind the highest block
if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
}
}
// remove lagging backends from the candidates
for _, be := range lagging {
delete(candidates, be)
}
return candidates
} }

@ -428,7 +428,7 @@ func TestConsensus(t *testing.T) {
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
}) })
t.Run("latest dropped below safe, and stayed inconsistent after ban", func(t *testing.T) { t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
reset() reset()
useOnlyNode1() useOnlyNode1()
overrideBlock("node1", "latest", "0xd1") overrideBlock("node1", "latest", "0xd1")