From 8436728419cf868ec291fdd29bf008c11419d538 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Sat, 27 May 2023 04:31:05 -0700 Subject: [PATCH] extract candidates logic, refactor update backend --- proxyd/proxyd/consensus_poller.go | 225 ++++++++++-------- .../integration_tests/consensus_test.go | 2 +- 2 files changed, 124 insertions(+), 103 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index decacd3..9ed28a4 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -57,6 +57,10 @@ type backendState struct { bannedUntil time.Time } +func (bs backendState) IsBanned() bool { + return time.Now().Before(bs.bannedUntil) +} + // GetConsensusGroup returns the backend members that are agreeing in a consensus func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { defer cp.consensusGroupMux.Unlock() @@ -242,10 +246,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller // UpdateBackend refreshes the consensus state of a single backend func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { - banned := cp.IsBanned(be) - RecordConsensusBackendBanned(be, banned) + bs := cp.getBackendState(be) + RecordConsensusBackendBanned(be, bs.IsBanned()) - if banned { + if bs.IsBanned() { log.Debug("skipping backend - banned", "backend", be.Name) return } @@ -287,36 +291,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) } - bs := cp.getBackendState(be) oldFinalized := bs.finalizedBlockNumber oldSafe := bs.safeBlockNumber - expectedBlockTags := cp.checkExpectedBlockTags( - finalizedBlockNumber, oldFinalized, - safeBlockNumber, oldSafe, - latestBlockNumber) + updateDelay := time.Since(bs.lastUpdate) + RecordConsensusBackendUpdateDelay(be, updateDelay) - changed, updateDelay := cp.setBackendState(be, peerCount, inSync, + changed := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash, finalizedBlockNumber, safeBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendSafeBlock(be, safeBlockNumber) RecordBackendFinalizedBlock(be, finalizedBlockNumber) - RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) - RecordConsensusBackendUpdateDelay(be, updateDelay) - - if !expectedBlockTags { - log.Warn("backend banned - unexpected block tags", - "backend", be.Name, - "oldFinalized", oldFinalized, - "finalizedBlockNumber", finalizedBlockNumber, - "oldSafe", oldSafe, - "safeBlockNumber", safeBlockNumber, - "latestBlockNumber", latestBlockNumber, - ) - cp.Ban(be) - } if changed { log.Debug("backend state updated", @@ -329,6 +316,26 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { "safeBlockNumber", safeBlockNumber, "updateDelay", updateDelay) } + + // sanity check for latest, safe and finalized block tags + expectedBlockTags := cp.checkExpectedBlockTags( + finalizedBlockNumber, oldFinalized, + safeBlockNumber, oldSafe, + latestBlockNumber) + + RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) + + if !expectedBlockTags { + log.Warn("backend banned - unexpected block tags", + "backend", be.Name, + "oldFinalized", oldFinalized, + "finalizedBlockNumber", finalizedBlockNumber, + "oldSafe", oldSafe, + "safeBlockNumber", safeBlockNumber, + "latestBlockNumber", latestBlockNumber, + ) + cp.Ban(be) + } } // checkExpectedBlockTags for unexpected conditions on block tags @@ -349,101 +356,41 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // get the latest block number from the tracker currentConsensusBlockNumber := cp.GetLatestBlockNumber() - // find out what backends are the candidates to be in the consensus group - // and create a copy of current their state - // - // a serving node needs to be: - // - not banned - // - healthy (network latency and error rate) - // - with minimum peer count - // - in sync - // - updated recently - // - not lagging latest block + // get the candidates for the consensus group + candidates := cp.getConsensusCandidates() - candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends)) - filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) - for _, be := range cp.backendGroup.Backends { - bs := cp.getBackendState(be) - passed := true - if time.Now().Before(bs.bannedUntil) { - passed = false - } - if !be.IsHealthy() { - passed = false - } - if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount { - passed = false - } - if !bs.inSync { - passed = false - } - if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { - passed = false - } - if passed { - candidates[be] = bs - } else { - filteredBackendsNames = append(filteredBackendsNames, be.Name) - } - } - - // find the highest block, in order to use it defining the highest non-lagging ancestor block - var highestLatestBlock hexutil.Uint64 - for _, bs := range candidates { - if bs.latestBlockNumber > highestLatestBlock { - highestLatestBlock = bs.latestBlockNumber - } - } - - // find the highest common ancestor block + // update the lowest latest block number and hash + // the lowest safe block number + // the lowest finalized block number var lowestLatestBlock hexutil.Uint64 var lowestLatestBlockHash string var lowestFinalizedBlock hexutil.Uint64 var lowestSafeBlock hexutil.Uint64 - lagging := make([]*Backend, 0, len(candidates)) - for be, bs := range candidates { - // check if backend is lagging behind the highest block - if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag { - lagging = append(lagging, be) - continue - } - - // update the lowest common ancestor block + for _, bs := range candidates { if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock { lowestLatestBlock = bs.latestBlockNumber lowestLatestBlockHash = bs.latestBlockHash } - - // update the lowest finalized block if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock { lowestFinalizedBlock = bs.finalizedBlockNumber } - - // update the lowest safe block if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock { lowestSafeBlock = bs.safeBlockNumber } } - // remove lagging backends from the candidates - for _, be := range lagging { - filteredBackendsNames = append(filteredBackendsNames, be.Name) - delete(candidates, be) - } - // find the proposed block among the candidates + // the proposed block needs have the same hash in the entire consensus group proposedBlock := lowestLatestBlock proposedBlockHash := lowestLatestBlockHash hasConsensus := false + broken := false if lowestLatestBlock > currentConsensusBlockNumber { log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) } - // if there is no block to propose, the consensus is automatically broken - // this can happen when backends have just recovered - broken := proposedBlock == 0 && currentConsensusBlockNumber > 0 - + // if there is a block to propose, check if it is the same in all backends if proposedBlock > 0 { for !hasConsensus { allAgreed := true @@ -459,7 +406,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) if blocksDontMatch { if currentConsensusBlockNumber >= actualBlockNumber { - log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash) + log.Warn("backend broke consensus", + "name", be.Name, + "actualBlockNumber", actualBlockNumber, + "actualBlockHash", actualBlockHash, + "proposedBlock", proposedBlock, + "proposedBlockHash", proposedBlockHash) broken = true } allAgreed = false @@ -488,6 +440,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { "proposedBlockHash", proposedBlockHash) } + // update tracker cp.tracker.SetLatestBlockNumber(proposedBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) @@ -495,10 +448,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // update consensus group group := make([]*Backend, 0, len(candidates)) consensusBackendsNames := make([]string, 0, len(candidates)) - for be, _ := range candidates { - group = append(group, be) - consensusBackendsNames = append(consensusBackendsNames, be.Name) + filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) + for _, be := range cp.backendGroup.Backends { + _, exist := candidates[be] + if exist { + group = append(group, be) + consensusBackendsNames = append(consensusBackendsNames, be.Name) + } else { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + } } + cp.consensusGroupMux.Lock() cp.consensusGroup = group cp.consensusGroupMux.Unlock() @@ -522,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool { bs := cp.backendState[be] defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() - return time.Now().Before(bs.bannedUntil) + return bs.IsBanned() } // Ban bans a specific backend @@ -531,6 +491,9 @@ func (cp *ConsensusPoller) Ban(be *Backend) { defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() bs.bannedUntil = time.Now().Add(cp.banPeriod) + + // when we ban a node, we give it the chance to start from any block when it is back + bs.latestBlockNumber = 0 bs.safeBlockNumber = 0 bs.finalizedBlockNumber = 0 } @@ -613,12 +576,12 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } +// getBackendState creates a copy of backend state so that the caller can use it without locking func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { bs := cp.backendState[be] defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() - // we return a copy so that the caller can use it without locking return &backendState{ latestBlockNumber: bs.latestBlockNumber, latestBlockHash: bs.latestBlockHash, @@ -634,18 +597,76 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, latestBlockNumber hexutil.Uint64, latestBlockHash string, finalizedBlockNumber hexutil.Uint64, - safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) { + safeBlockNumber hexutil.Uint64) bool { bs := cp.backendState[be] bs.backendStateMux.Lock() - changed = bs.latestBlockHash != latestBlockHash + changed := bs.latestBlockHash != latestBlockHash bs.peerCount = peerCount bs.inSync = inSync bs.latestBlockNumber = latestBlockNumber bs.latestBlockHash = latestBlockHash bs.finalizedBlockNumber = finalizedBlockNumber bs.safeBlockNumber = safeBlockNumber - updateDelay = time.Since(bs.lastUpdate) bs.lastUpdate = time.Now() bs.backendStateMux.Unlock() - return + return changed +} + +// getConsensusCandidates find out what backends are the candidates to be in the consensus group +// and create a copy of current their state +// +// a candidate is a serving node within the following conditions: +// - not banned +// - healthy (network latency and error rate) +// - with minimum peer count +// - in sync +// - updated recently +// - not lagging latest block +func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState { + candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends)) + + for _, be := range cp.backendGroup.Backends { + bs := cp.getBackendState(be) + if bs.IsBanned() { + continue + } + if !be.IsHealthy() { + continue + } + if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount { + continue + } + if !bs.inSync { + continue + } + if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { + continue + } + + candidates[be] = bs + } + + // find the highest block, in order to use it defining the highest non-lagging ancestor block + var highestLatestBlock hexutil.Uint64 + for _, bs := range candidates { + if bs.latestBlockNumber > highestLatestBlock { + highestLatestBlock = bs.latestBlockNumber + } + } + + // find the highest common ancestor block + lagging := make([]*Backend, 0, len(candidates)) + for be, bs := range candidates { + // check if backend is lagging behind the highest block + if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag { + lagging = append(lagging, be) + } + } + + // remove lagging backends from the candidates + for _, be := range lagging { + delete(candidates, be) + } + + return candidates } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index e8927d2..98196c9 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -428,7 +428,7 @@ func TestConsensus(t *testing.T) { require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) }) - t.Run("latest dropped below safe, and stayed inconsistent after ban", func(t *testing.T) { + t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) { reset() useOnlyNode1() overrideBlock("node1", "latest", "0xd1")