refactor poller

This commit is contained in:
Felipe Andrade 2023-05-27 04:14:29 -07:00
parent 1f657b5f46
commit 7405b8b360

@ -34,8 +34,7 @@ type ConsensusPoller struct {
tracker ConsensusTracker tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler asyncHandler ConsensusAsyncHandler
minPeerCount uint64 minPeerCount uint64
banPeriod time.Duration banPeriod time.Duration
maxUpdateThreshold time.Duration maxUpdateThreshold time.Duration
maxBlockLag uint64 maxBlockLag uint64
@ -220,7 +219,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
banPeriod: 5 * time.Minute, banPeriod: 5 * time.Minute,
maxUpdateThreshold: 30 * time.Second, maxUpdateThreshold: 30 * time.Second,
maxBlockLag: 8, // quarter of an epoch, 8*12 seconds = 96 seconds ~ 1.6 minutes maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
minPeerCount: 3, minPeerCount: 3,
} }
@ -253,12 +252,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// if backend is not healthy state we'll only resume checking it after ban // if backend is not healthy state we'll only resume checking it after ban
if !be.IsHealthy() { if !be.IsHealthy() {
log.Warn("backend banned - not online or not healthy", "backend", be.Name) log.Warn("backend banned - not healthy", "backend", be.Name)
cp.Ban(be) cp.Ban(be)
return return
} }
// if backend it not in sync we'll check again after ban
inSync, err := cp.isInSync(ctx, be) inSync, err := cp.isInSync(ctx, be)
RecordConsensusBackendInSync(be, err == nil && inSync) RecordConsensusBackendInSync(be, err == nil && inSync)
if err != nil { if err != nil {
@ -276,21 +274,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend - latest block", "name", be.Name, "err", err)
} }
safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend - safe block", "name", be.Name, "err", err)
} }
finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
} }
_, _, _, _, oldFinalized, oldSafe, _, _ := cp.getBackendState(be) bs := cp.getBackendState(be)
expectedBlockTags := cp.checkExpectedBlockTags(finalizedBlockNumber, oldFinalized, safeBlockNumber, oldSafe, latestBlockNumber) oldFinalized := bs.finalizedBlockNumber
oldSafe := bs.safeBlockNumber
expectedBlockTags := cp.checkExpectedBlockTags(
finalizedBlockNumber, oldFinalized,
safeBlockNumber, oldSafe,
latestBlockNumber)
changed, updateDelay := cp.setBackendState(be, peerCount, inSync, changed, updateDelay := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash, latestBlockNumber, latestBlockHash,
@ -342,116 +346,108 @@ func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint6
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var highestLatestBlock hexutil.Uint64 // get the latest block number from the tracker
var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64
currentConsensusBlockNumber := cp.GetLatestBlockNumber() currentConsensusBlockNumber := cp.GetLatestBlockNumber()
// find the highest block, in order to use it defining the highest non-lagging ancestor block // find out what backends are the candidates to be in the consensus group
// and create a copy of current their state
//
// a serving node needs to be:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) bs := cp.getBackendState(be)
passed := true
if time.Now().Before(bs.bannedUntil) {
passed = false
}
if !be.IsHealthy() {
passed = false
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
passed = false
}
if !bs.inSync {
passed = false
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
passed = false
}
if passed {
candidates[be] = bs
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
}
if cp.IsBanned(be) { // find the highest block, in order to use it defining the highest non-lagging ancestor block
continue var highestLatestBlock hexutil.Uint64
} for _, bs := range candidates {
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if bs.latestBlockNumber > highestLatestBlock {
continue highestLatestBlock = bs.latestBlockNumber
}
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
if backendLatestBlockNumber > highestLatestBlock {
highestLatestBlock = backendLatestBlockNumber
} }
} }
// find the highest common ancestor block // find the highest common ancestor block
for _, be := range cp.backendGroup.Backends { var lowestLatestBlock hexutil.Uint64
peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64
if cp.IsBanned(be) { var lowestSafeBlock hexutil.Uint64
continue lagging := make([]*Backend, 0, len(candidates))
} for be, bs := range candidates {
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue
}
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
// check if backend is lagging behind the highest block // check if backend is lagging behind the highest block
if backendLatestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-backendLatestBlockNumber) > cp.maxBlockLag { if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
continue continue
} }
if lowestLatestBlock == 0 || backendLatestBlockNumber < lowestLatestBlock { // update the lowest common ancestor block
lowestLatestBlock = backendLatestBlockNumber if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
lowestLatestBlockHash = backendLatestBlockHash lowestLatestBlock = bs.latestBlockNumber
lowestLatestBlockHash = bs.latestBlockHash
} }
if lowestFinalizedBlock == 0 || backendFinalizedBlockNumber < lowestFinalizedBlock { // update the lowest finalized block
lowestFinalizedBlock = backendFinalizedBlockNumber if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = bs.finalizedBlockNumber
} }
if lowestSafeBlock == 0 || backendSafeBlockNumber < lowestSafeBlock { // update the lowest safe block
lowestSafeBlock = backendSafeBlockNumber if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
lowestSafeBlock = bs.safeBlockNumber
} }
} }
// remove lagging backends from the candidates
for _, be := range lagging {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
delete(candidates, be)
}
// find the proposed block among the candidates
proposedBlock := lowestLatestBlock proposedBlock := lowestLatestBlock
proposedBlockHash := lowestLatestBlockHash proposedBlockHash := lowestLatestBlockHash
hasConsensus := false hasConsensus := false
// check if everybody agrees on the same block hash
consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
if lowestLatestBlock > currentConsensusBlockNumber { if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
} }
// if there is no block to propose, the consensus is automatically broken // if there is no block to propose, the consensus is automatically broken
// this can happen when backends have just recovered
broken := proposedBlock == 0 && currentConsensusBlockNumber > 0 broken := proposedBlock == 0 && currentConsensusBlockNumber > 0
if proposedBlock > 0 { if proposedBlock > 0 {
for !hasConsensus { for !hasConsensus {
allAgreed := true allAgreed := true
consensusBackends = consensusBackends[:0] for be, _ := range candidates {
filteredBackendsNames = filteredBackendsNames[:0]
for _, be := range cp.backendGroup.Backends {
/*
a serving node needs to be:
- healthy (network)
- updated recently
- not banned
- with minimum peer count
- not lagging latest block
- in sync
*/
peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be)
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
lagging := latestBlockNumber < proposedBlock
if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue
}
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend", "name", be.Name, "err", err)
@ -469,8 +465,6 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
allAgreed = false allAgreed = false
break break
} }
consensusBackends = append(consensusBackends, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
} }
if allAgreed { if allAgreed {
hasConsensus = true hasConsensus = true
@ -488,26 +482,39 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
for _, l := range cp.listeners { for _, l := range cp.listeners {
l() l()
} }
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) log.Info("consensus broken",
"currentConsensusBlockNumber", currentConsensusBlockNumber,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
} }
cp.tracker.SetLatestBlockNumber(proposedBlock) cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
// update consensus group
group := make([]*Backend, 0, len(candidates))
consensusBackendsNames := make([]string, 0, len(candidates))
for be, _ := range candidates {
group = append(group, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
}
cp.consensusGroupMux.Lock() cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends cp.consensusGroup = group
cp.consensusGroupMux.Unlock() cp.consensusGroupMux.Unlock()
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock) RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock) RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock)
RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock) RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock)
RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends)) RecordGroupConsensusCount(cp.backendGroup, len(group))
RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames)) RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends)) RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))
log.Debug("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", ")) log.Debug("group state",
"proposedBlock", proposedBlock,
"consensusBackends", strings.Join(consensusBackendsNames, ", "),
"filteredBackends", strings.Join(filteredBackendsNames, ", "))
} }
// IsBanned checks if a specific backend is banned // IsBanned checks if a specific backend is banned
@ -606,23 +613,22 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil return res, nil
} }
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64,
lastUpdate time.Time, bannedUntil time.Time) {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
peerCount = bs.peerCount
inSync = bs.inSync // we return a copy so that the caller can use it without locking
latestBlockNumber = bs.latestBlockNumber return &backendState{
latestBlockHash = bs.latestBlockHash latestBlockNumber: bs.latestBlockNumber,
finalizedBlockNumber = bs.finalizedBlockNumber latestBlockHash: bs.latestBlockHash,
safeBlockNumber = bs.safeBlockNumber safeBlockNumber: bs.safeBlockNumber,
lastUpdate = bs.lastUpdate finalizedBlockNumber: bs.finalizedBlockNumber,
bannedUntil = bs.bannedUntil peerCount: bs.peerCount,
return inSync: bs.inSync,
lastUpdate: bs.lastUpdate,
bannedUntil: bs.bannedUntil,
}
} }
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,