bsc/core/vote/vote_manager.go

323 lines
12 KiB
Go
Raw Permalink Normal View History

package vote
import (
"bytes"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/parlia"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const blocksNumberSinceMining = 5 // the number of blocks need to wait before voting, counting from the validator begin to mine
var diffInTurn = big.NewInt(2) // Block difficulty for in-turn signatures
var votesManagerCounter = metrics.NewRegisteredCounter("votesManager/local", nil)
var notJustified = metrics.NewRegisteredCounter("votesManager/notJustified", nil)
var inTurnJustified = metrics.NewRegisteredCounter("votesManager/inTurnJustified", nil)
var notInTurnJustified = metrics.NewRegisteredCounter("votesManager/notInTurnJustified", nil)
var continuousJustified = metrics.NewRegisteredCounter("votesManager/continuousJustified", nil)
var notContinuousJustified = metrics.NewRegisteredCounter("votesManager/notContinuousJustified", nil)
// Backend wraps all methods required for voting.
type Backend interface {
IsMining() bool
EventMux() *event.TypeMux
}
// VoteManager will handle the vote produced by self.
type VoteManager struct {
eth Backend
chain *core.BlockChain
highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent
highestVerifiedBlockSub event.Subscription
// used for backup validators to sync votes from corresponding mining validator
syncVoteCh chan core.NewVoteEvent
syncVoteSub event.Subscription
pool *VotePool
signer *VoteSigner
journal *VoteJournal
engine consensus.PoSA
}
func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journalPath, blsPasswordPath, blsWalletPath string, engine consensus.PoSA) (*VoteManager, error) {
voteManager := &VoteManager{
eth: eth,
chain: chain,
highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize),
syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut),
pool: pool,
engine: engine,
}
// Create voteSigner.
voteSigner, err := NewVoteSigner(blsPasswordPath, blsWalletPath)
if err != nil {
return nil, err
}
log.Info("Create voteSigner successfully")
voteManager.signer = voteSigner
// Create voteJournal
voteJournal, err := NewVoteJournal(journalPath)
if err != nil {
return nil, err
}
log.Info("Create voteJournal successfully")
voteManager.journal = voteJournal
// Subscribe to chain head event.
voteManager.highestVerifiedBlockSub = voteManager.chain.SubscribeHighestVerifiedHeaderEvent(voteManager.highestVerifiedBlockCh)
voteManager.syncVoteSub = voteManager.pool.SubscribeNewVoteEvent(voteManager.syncVoteCh)
go voteManager.loop()
return voteManager, nil
}
func (voteManager *VoteManager) loop() {
log.Debug("vote manager routine loop started")
defer voteManager.highestVerifiedBlockSub.Unsubscribe()
defer voteManager.syncVoteSub.Unsubscribe()
events := voteManager.eth.EventMux().Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer func() {
log.Debug("vote manager loop defer func occur")
if !events.Closed() {
log.Debug("event not closed, unsubscribed by vote manager loop")
events.Unsubscribe()
}
}()
dlEventCh := events.Chan()
startVote := true
blockCountSinceMining := 0
var once sync.Once
for {
select {
case ev := <-dlEventCh:
if ev == nil {
log.Debug("dlEvent is nil, continue")
continue
}
switch ev.Data.(type) {
case downloader.StartEvent:
log.Debug("downloader is in startEvent mode, will not startVote")
startVote = false
case downloader.FailedEvent:
log.Debug("downloader is in FailedEvent mode, set startVote flag as true")
startVote = true
case downloader.DoneEvent:
log.Debug("downloader is in DoneEvent mode, set the startVote flag to true")
startVote = true
}
case cHead := <-voteManager.highestVerifiedBlockCh:
if !startVote {
log.Debug("startVote flag is false, continue")
continue
}
if !voteManager.eth.IsMining() {
blockCountSinceMining = 0
log.Debug("skip voting because mining is disabled, continue")
continue
}
blockCountSinceMining++
if blockCountSinceMining <= blocksNumberSinceMining {
log.Debug("skip voting", "blockCountSinceMining", blockCountSinceMining, "blocksNumberSinceMining", blocksNumberSinceMining)
continue
}
if cHead.Header == nil {
log.Debug("cHead.Header is nil, continue")
continue
}
curHead := cHead.Header
if p, ok := voteManager.engine.(*parlia.Parlia); ok {
nextBlockMinedTime := time.Unix(int64((curHead.Time + p.Period())), 0)
timeForBroadcast := 50 * time.Millisecond // enough to broadcast a vote
if time.Now().Add(timeForBroadcast).After(nextBlockMinedTime) {
log.Warn("too late to vote", "Head.Time(Second)", curHead.Time, "Now(Millisecond)", time.Now().UnixMilli())
continue
}
}
// Check if cur validator is within the validatorSet at curHead
if !voteManager.engine.IsActiveValidatorAt(voteManager.chain, curHead,
func(bLSPublicKey *types.BLSPublicKey) bool {
return bytes.Equal(voteManager.signer.PubKey[:], bLSPublicKey[:])
}) {
log.Debug("local validator with voteKey is not within the validatorSet at curHead")
continue
}
// Add VoteKey to `miner-info`
once.Do(func() {
minerInfo := metrics.Get("miner-info")
if minerInfo != nil {
minerInfo.(metrics.Label).Value()["VoteKey"] = common.Bytes2Hex(voteManager.signer.PubKey[:])
}
})
// Vote for curBlockHeader block.
vote := &types.VoteData{
TargetNumber: curHead.Number.Uint64(),
TargetHash: curHead.Hash(),
}
voteMessage := &types.VoteEnvelope{
Data: vote,
}
// Put Vote into journal and VotesPool if we are active validator and allow to sign it.
if ok, sourceNumber, sourceHash := voteManager.UnderRules(curHead); ok {
log.Debug("curHead is underRules for voting")
if sourceHash == (common.Hash{}) {
log.Debug("sourceHash is empty")
continue
}
voteMessage.Data.SourceNumber = sourceNumber
voteMessage.Data.SourceHash = sourceHash
if err := voteManager.signer.SignVote(voteMessage); err != nil {
log.Error("Failed to sign vote", "err", err, "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash())
votesSigningErrorCounter.Inc(1)
continue
}
if err := voteManager.journal.WriteVote(voteMessage); err != nil {
log.Error("Failed to write vote into journal", "err", err)
voteJournalErrorCounter.Inc(1)
continue
}
log.Debug("vote manager produced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash())
voteManager.pool.PutVote(voteMessage)
votesManagerCounter.Inc(1)
}
// check the latest justified block, which indicating the stability of the network
curJustifiedNumber, _, err := voteManager.engine.GetJustifiedNumberAndHash(voteManager.chain, []*types.Header{curHead})
if err == nil && curJustifiedNumber != 0 {
if curJustifiedNumber+1 != curHead.Number.Uint64() {
log.Debug("not justified", "blockNumber", curHead.Number.Uint64()-1)
notJustified.Inc(1)
} else {
parent := voteManager.chain.GetHeaderByHash(curHead.ParentHash)
if parent != nil {
if parent.Difficulty.Cmp(diffInTurn) == 0 {
inTurnJustified.Inc(1)
} else {
log.Debug("not in turn block justified", "blockNumber", parent.Number.Int64(), "blockHash", parent.Hash())
notInTurnJustified.Inc(1)
}
lastJustifiedNumber, _, err := voteManager.engine.GetJustifiedNumberAndHash(voteManager.chain, []*types.Header{parent})
if err == nil {
if lastJustifiedNumber == 0 || lastJustifiedNumber+1 == curJustifiedNumber {
continuousJustified.Inc(1)
} else {
log.Debug("not continuous block justified", "lastJustified", lastJustifiedNumber, "curJustified", curJustifiedNumber)
notContinuousJustified.Inc(1)
}
}
}
}
}
case event := <-voteManager.syncVoteCh:
voteMessage := event.Vote
if voteManager.eth.IsMining() || !bytes.Equal(voteManager.signer.PubKey[:], voteMessage.VoteAddress[:]) {
continue
}
if err := voteManager.journal.WriteVote(voteMessage); err != nil {
log.Error("Failed to write vote into journal", "err", err)
voteJournalErrorCounter.Inc(1)
continue
}
log.Debug("vote manager synced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash())
votesManagerCounter.Inc(1)
case <-voteManager.syncVoteSub.Err():
log.Debug("voteManager subscribed votes failed")
return
case <-voteManager.highestVerifiedBlockSub.Err():
log.Debug("voteManager subscribed chainHead failed")
return
}
}
}
// UnderRules checks if the produced header under the following rules:
// A validator must not publish two distinct votes for the same height. (Rule 1)
// A validator must not vote within the span of its other votes . (Rule 2)
// Validators always vote for their canonical chains latest block. (Rule 3)
func (voteManager *VoteManager) UnderRules(header *types.Header) (bool, uint64, common.Hash) {
sourceNumber, sourceHash, err := voteManager.engine.GetJustifiedNumberAndHash(voteManager.chain, []*types.Header{header})
if err != nil {
log.Error("failed to get the highest justified number and hash at cur header", "curHeader's BlockNumber", header.Number, "curHeader's BlockHash", header.Hash())
return false, 0, common.Hash{}
}
targetNumber := header.Number.Uint64()
voteDataBuffer := voteManager.journal.voteDataBuffer
//Rule 1: A validator must not publish two distinct votes for the same height.
if voteDataBuffer.Contains(targetNumber) {
log.Debug("err: A validator must not publish two distinct votes for the same height.")
return false, 0, common.Hash{}
}
//Rule 2: A validator must not vote within the span of its other votes.
blockNumber := sourceNumber + 1
if blockNumber+maliciousVoteSlashScope < targetNumber {
blockNumber = targetNumber - maliciousVoteSlashScope
}
for ; blockNumber < targetNumber; blockNumber++ {
if voteDataBuffer.Contains(blockNumber) {
voteData, ok := voteDataBuffer.Get(blockNumber)
if !ok {
log.Error("Failed to get voteData info from LRU cache.")
continue
}
if voteData.(*types.VoteData).SourceNumber > sourceNumber {
log.Debug(fmt.Sprintf("error: cur vote %d-->%d is across the span of other votes %d-->%d",
sourceNumber, targetNumber, voteData.(*types.VoteData).SourceNumber, voteData.(*types.VoteData).TargetNumber))
return false, 0, common.Hash{}
}
}
}
for blockNumber := targetNumber + 1; blockNumber <= targetNumber+upperLimitOfVoteBlockNumber; blockNumber++ {
if voteDataBuffer.Contains(blockNumber) {
voteData, ok := voteDataBuffer.Get(blockNumber)
if !ok {
log.Error("Failed to get voteData info from LRU cache.")
continue
}
if voteData.(*types.VoteData).SourceNumber < sourceNumber {
log.Debug(fmt.Sprintf("error: cur vote %d-->%d is within the span of other votes %d-->%d",
sourceNumber, targetNumber, voteData.(*types.VoteData).SourceNumber, voteData.(*types.VoteData).TargetNumber))
return false, 0, common.Hash{}
}
}
}
// Rule 3: Validators always vote for their canonical chains latest block.
// Since the header subscribed to is the canonical chain, so this rule is satisfied by default.
log.Debug("All three rules check passed")
return true, sourceNumber, sourceHash
}