From 7405b8b360266147837a6013af347e714576782b Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Sat, 27 May 2023 04:14:29 -0700 Subject: [PATCH] refactor poller --- proxyd/proxyd/consensus_poller.go | 220 +++++++++++++++--------------- 1 file changed, 113 insertions(+), 107 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 57bdbb3..decacd3 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -34,8 +34,7 @@ type ConsensusPoller struct { tracker ConsensusTracker asyncHandler ConsensusAsyncHandler - minPeerCount uint64 - + minPeerCount uint64 banPeriod time.Duration maxUpdateThreshold time.Duration maxBlockLag uint64 @@ -220,7 +219,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller banPeriod: 5 * time.Minute, maxUpdateThreshold: 30 * time.Second, - maxBlockLag: 8, // quarter of an epoch, 8*12 seconds = 96 seconds ~ 1.6 minutes + maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes minPeerCount: 3, } @@ -253,12 +252,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { // if backend is not healthy state we'll only resume checking it after ban if !be.IsHealthy() { - log.Warn("backend banned - not online or not healthy", "backend", be.Name) + log.Warn("backend banned - not healthy", "backend", be.Name) cp.Ban(be) return } - // if backend it not in sync we'll check again after ban inSync, err := cp.isInSync(ctx, be) RecordConsensusBackendInSync(be, err == nil && inSync) if err != nil { @@ -276,21 +274,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) + log.Warn("error updating backend - latest block", "name", be.Name, "err", err) } safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) + log.Warn("error updating backend - safe block", "name", be.Name, "err", err) } finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) + log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) } - _, _, _, _, oldFinalized, oldSafe, _, _ := cp.getBackendState(be) - expectedBlockTags := cp.checkExpectedBlockTags(finalizedBlockNumber, oldFinalized, safeBlockNumber, oldSafe, latestBlockNumber) + bs := cp.getBackendState(be) + oldFinalized := bs.finalizedBlockNumber + oldSafe := bs.safeBlockNumber + + expectedBlockTags := cp.checkExpectedBlockTags( + finalizedBlockNumber, oldFinalized, + safeBlockNumber, oldSafe, + latestBlockNumber) changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash, @@ -342,116 +346,108 @@ func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint6 // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - var highestLatestBlock hexutil.Uint64 - - var lowestLatestBlock hexutil.Uint64 - var lowestLatestBlockHash string - - var lowestFinalizedBlock hexutil.Uint64 - var lowestSafeBlock hexutil.Uint64 - + // get the latest block number from the tracker currentConsensusBlockNumber := cp.GetLatestBlockNumber() - // find the highest block, in order to use it defining the highest non-lagging ancestor block + // find out what backends are the candidates to be in the consensus group + // and create a copy of current their state + // + // a serving node needs to be: + // - not banned + // - healthy (network latency and error rate) + // - with minimum peer count + // - in sync + // - updated recently + // - not lagging latest block + + candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends)) + filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) + bs := cp.getBackendState(be) + passed := true + if time.Now().Before(bs.bannedUntil) { + passed = false + } + if !be.IsHealthy() { + passed = false + } + if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount { + passed = false + } + if !bs.inSync { + passed = false + } + if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { + passed = false + } + if passed { + candidates[be] = bs + } else { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + } + } - if cp.IsBanned(be) { - continue - } - if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { - continue - } - if !inSync { - continue - } - if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { - continue - } - - if backendLatestBlockNumber > highestLatestBlock { - highestLatestBlock = backendLatestBlockNumber + // find the highest block, in order to use it defining the highest non-lagging ancestor block + var highestLatestBlock hexutil.Uint64 + for _, bs := range candidates { + if bs.latestBlockNumber > highestLatestBlock { + highestLatestBlock = bs.latestBlockNumber } } // find the highest common ancestor block - for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) - - if cp.IsBanned(be) { - continue - } - if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { - continue - } - if !inSync { - continue - } - if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { - continue - } - + var lowestLatestBlock hexutil.Uint64 + var lowestLatestBlockHash string + var lowestFinalizedBlock hexutil.Uint64 + var lowestSafeBlock hexutil.Uint64 + lagging := make([]*Backend, 0, len(candidates)) + for be, bs := range candidates { // check if backend is lagging behind the highest block - if backendLatestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-backendLatestBlockNumber) > cp.maxBlockLag { + if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag { + lagging = append(lagging, be) continue } - if lowestLatestBlock == 0 || backendLatestBlockNumber < lowestLatestBlock { - lowestLatestBlock = backendLatestBlockNumber - lowestLatestBlockHash = backendLatestBlockHash + // update the lowest common ancestor block + if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock { + lowestLatestBlock = bs.latestBlockNumber + lowestLatestBlockHash = bs.latestBlockHash } - if lowestFinalizedBlock == 0 || backendFinalizedBlockNumber < lowestFinalizedBlock { - lowestFinalizedBlock = backendFinalizedBlockNumber + // update the lowest finalized block + if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock { + lowestFinalizedBlock = bs.finalizedBlockNumber } - if lowestSafeBlock == 0 || backendSafeBlockNumber < lowestSafeBlock { - lowestSafeBlock = backendSafeBlockNumber + // update the lowest safe block + if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock { + lowestSafeBlock = bs.safeBlockNumber } } + // remove lagging backends from the candidates + for _, be := range lagging { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + delete(candidates, be) + } + + // find the proposed block among the candidates proposedBlock := lowestLatestBlock proposedBlockHash := lowestLatestBlockHash hasConsensus := false - // check if everybody agrees on the same block hash - consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends)) - consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) - filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) - if lowestLatestBlock > currentConsensusBlockNumber { log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) } // if there is no block to propose, the consensus is automatically broken + // this can happen when backends have just recovered broken := proposedBlock == 0 && currentConsensusBlockNumber > 0 if proposedBlock > 0 { for !hasConsensus { allAgreed := true - consensusBackends = consensusBackends[:0] - filteredBackendsNames = filteredBackendsNames[:0] - for _, be := range cp.backendGroup.Backends { - /* - a serving node needs to be: - - healthy (network) - - updated recently - - not banned - - with minimum peer count - - not lagging latest block - - in sync - */ - - peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) - notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) - isBanned := time.Now().Before(bannedUntil) - notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount - lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { - filteredBackendsNames = append(filteredBackendsNames, be.Name) - continue - } - + for be, _ := range candidates { actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) if err != nil { log.Warn("error updating backend", "name", be.Name, "err", err) @@ -469,8 +465,6 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { allAgreed = false break } - consensusBackends = append(consensusBackends, be) - consensusBackendsNames = append(consensusBackendsNames, be.Name) } if allAgreed { hasConsensus = true @@ -488,26 +482,39 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { for _, l := range cp.listeners { l() } - log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) + log.Info("consensus broken", + "currentConsensusBlockNumber", currentConsensusBlockNumber, + "proposedBlock", proposedBlock, + "proposedBlockHash", proposedBlockHash) } cp.tracker.SetLatestBlockNumber(proposedBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) + // update consensus group + group := make([]*Backend, 0, len(candidates)) + consensusBackendsNames := make([]string, 0, len(candidates)) + for be, _ := range candidates { + group = append(group, be) + consensusBackendsNames = append(consensusBackendsNames, be.Name) + } cp.consensusGroupMux.Lock() - cp.consensusGroup = consensusBackends + cp.consensusGroup = group cp.consensusGroupMux.Unlock() RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock) RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock) RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock) - RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends)) + RecordGroupConsensusCount(cp.backendGroup, len(group)) RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames)) RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends)) - log.Debug("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", ")) + log.Debug("group state", + "proposedBlock", proposedBlock, + "consensusBackends", strings.Join(consensusBackendsNames, ", "), + "filteredBackends", strings.Join(filteredBackendsNames, ", ")) } // IsBanned checks if a specific backend is banned @@ -606,23 +613,22 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, - latestBlockNumber hexutil.Uint64, latestBlockHash string, - finalizedBlockNumber hexutil.Uint64, - safeBlockNumber hexutil.Uint64, - lastUpdate time.Time, bannedUntil time.Time) { +func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { bs := cp.backendState[be] defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() - peerCount = bs.peerCount - inSync = bs.inSync - latestBlockNumber = bs.latestBlockNumber - latestBlockHash = bs.latestBlockHash - finalizedBlockNumber = bs.finalizedBlockNumber - safeBlockNumber = bs.safeBlockNumber - lastUpdate = bs.lastUpdate - bannedUntil = bs.bannedUntil - return + + // we return a copy so that the caller can use it without locking + return &backendState{ + latestBlockNumber: bs.latestBlockNumber, + latestBlockHash: bs.latestBlockHash, + safeBlockNumber: bs.safeBlockNumber, + finalizedBlockNumber: bs.finalizedBlockNumber, + peerCount: bs.peerCount, + inSync: bs.inSync, + lastUpdate: bs.lastUpdate, + bannedUntil: bs.bannedUntil, + } } func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,