package vote import ( "bytes" "fmt" "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) var votesManagerCounter = metrics.NewRegisteredCounter("votesManager/local", nil) // Backend wraps all methods required for voting. type Backend interface { IsMining() bool EventMux() *event.TypeMux } // VoteManager will handle the vote produced by self. type VoteManager struct { eth Backend chain *core.BlockChain chainHeadCh chan core.ChainHeadEvent chainHeadSub event.Subscription // used for backup validators to sync votes from corresponding mining validator syncVoteCh chan core.NewVoteEvent syncVoteSub event.Subscription pool *VotePool signer *VoteSigner journal *VoteJournal engine consensus.PoSA } func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journalPath, blsPasswordPath, blsWalletPath string, engine consensus.PoSA) (*VoteManager, error) { voteManager := &VoteManager{ eth: eth, chain: chain, chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut), pool: pool, engine: engine, } // Create voteSigner. voteSigner, err := NewVoteSigner(blsPasswordPath, blsWalletPath) if err != nil { return nil, err } log.Info("Create voteSigner successfully") voteManager.signer = voteSigner // Create voteJournal voteJournal, err := NewVoteJournal(journalPath) if err != nil { return nil, err } log.Info("Create voteJournal successfully") voteManager.journal = voteJournal // Subscribe to chain head event. voteManager.chainHeadSub = voteManager.chain.SubscribeChainHeadEvent(voteManager.chainHeadCh) voteManager.syncVoteSub = voteManager.pool.SubscribeNewVoteEvent(voteManager.syncVoteCh) go voteManager.loop() return voteManager, nil } func (voteManager *VoteManager) loop() { log.Debug("vote manager routine loop started") defer voteManager.chainHeadSub.Unsubscribe() defer voteManager.syncVoteSub.Unsubscribe() events := voteManager.eth.EventMux().Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) defer func() { log.Debug("vote manager loop defer func occur") if !events.Closed() { log.Debug("event not closed, unsubscribed by vote manager loop") events.Unsubscribe() } }() dlEventCh := events.Chan() startVote := true var once sync.Once for { select { case ev := <-dlEventCh: if ev == nil { log.Debug("dlEvent is nil, continue") continue } switch ev.Data.(type) { case downloader.StartEvent: log.Debug("downloader is in startEvent mode, will not startVote") startVote = false case downloader.FailedEvent: log.Debug("downloader is in FailedEvent mode, set startVote flag as true") startVote = true case downloader.DoneEvent: log.Debug("downloader is in DoneEvent mode, set the startVote flag to true") startVote = true } case cHead := <-voteManager.chainHeadCh: if !startVote { log.Debug("startVote flag is false, continue") continue } if !voteManager.eth.IsMining() { log.Debug("skip voting because mining is disabled, continue") continue } if cHead.Block == nil { log.Debug("cHead.Block is nil, continue") continue } curHead := cHead.Block.Header() // Check if cur validator is within the validatorSet at curHead if !voteManager.engine.IsActiveValidatorAt(voteManager.chain, curHead, func(bLSPublicKey *types.BLSPublicKey) bool { return bytes.Equal(voteManager.signer.PubKey[:], bLSPublicKey[:]) }) { log.Debug("cur validator is not within the validatorSet at curHead") continue } // Add VoteKey to `miner-info` once.Do(func() { minerInfo := metrics.Get("miner-info") if minerInfo != nil { minerInfo.(metrics.Label).Value()["VoteKey"] = common.Bytes2Hex(voteManager.signer.PubKey[:]) } }) // Vote for curBlockHeader block. vote := &types.VoteData{ TargetNumber: curHead.Number.Uint64(), TargetHash: curHead.Hash(), } voteMessage := &types.VoteEnvelope{ Data: vote, } // Put Vote into journal and VotesPool if we are active validator and allow to sign it. if ok, sourceNumber, sourceHash := voteManager.UnderRules(curHead); ok { log.Debug("curHead is underRules for voting") if sourceHash == (common.Hash{}) { log.Debug("sourceHash is empty") continue } voteMessage.Data.SourceNumber = sourceNumber voteMessage.Data.SourceHash = sourceHash if err := voteManager.signer.SignVote(voteMessage); err != nil { log.Error("Failed to sign vote", "err", err, "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash()) votesSigningErrorCounter.Inc(1) continue } if err := voteManager.journal.WriteVote(voteMessage); err != nil { log.Error("Failed to write vote into journal", "err", err) voteJournalErrorCounter.Inc(1) continue } log.Debug("vote manager produced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash()) voteManager.pool.PutVote(voteMessage) votesManagerCounter.Inc(1) } case event := <-voteManager.syncVoteCh: voteMessage := event.Vote if voteManager.eth.IsMining() || !bytes.Equal(voteManager.signer.PubKey[:], voteMessage.VoteAddress[:]) { continue } if err := voteManager.journal.WriteVote(voteMessage); err != nil { log.Error("Failed to write vote into journal", "err", err) voteJournalErrorCounter.Inc(1) continue } log.Debug("vote manager synced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash()) votesManagerCounter.Inc(1) case <-voteManager.syncVoteSub.Err(): log.Debug("voteManager subscribed votes failed") return case <-voteManager.chainHeadSub.Err(): log.Debug("voteManager subscribed chainHead failed") return } } } // UnderRules checks if the produced header under the following rules: // A validator must not publish two distinct votes for the same height. (Rule 1) // A validator must not vote within the span of its other votes . (Rule 2) // Validators always vote for their canonical chain’s latest block. (Rule 3) func (voteManager *VoteManager) UnderRules(header *types.Header) (bool, uint64, common.Hash) { sourceNumber, sourceHash, err := voteManager.engine.GetJustifiedNumberAndHash(voteManager.chain, header) if err != nil { log.Error("failed to get the highest justified number and hash at cur header", "curHeader's BlockNumber", header.Number, "curHeader's BlockHash", header.Hash()) return false, 0, common.Hash{} } targetNumber := header.Number.Uint64() voteDataBuffer := voteManager.journal.voteDataBuffer //Rule 1: A validator must not publish two distinct votes for the same height. if voteDataBuffer.Contains(targetNumber) { log.Debug("err: A validator must not publish two distinct votes for the same height.") return false, 0, common.Hash{} } //Rule 2: A validator must not vote within the span of its other votes. blockNumber := sourceNumber + 1 if blockNumber+maliciousVoteSlashScope < targetNumber { blockNumber = targetNumber - maliciousVoteSlashScope } for ; blockNumber < targetNumber; blockNumber++ { if voteDataBuffer.Contains(blockNumber) { voteData, ok := voteDataBuffer.Get(blockNumber) if !ok { log.Error("Failed to get voteData info from LRU cache.") continue } if voteData.(*types.VoteData).SourceNumber > sourceNumber { log.Debug(fmt.Sprintf("error: cur vote %d-->%d is across the span of other votes %d-->%d", sourceNumber, targetNumber, voteData.(*types.VoteData).SourceNumber, voteData.(*types.VoteData).TargetNumber)) return false, 0, common.Hash{} } } } for blockNumber := targetNumber + 1; blockNumber <= targetNumber+upperLimitOfVoteBlockNumber; blockNumber++ { if voteDataBuffer.Contains(blockNumber) { voteData, ok := voteDataBuffer.Get(blockNumber) if !ok { log.Error("Failed to get voteData info from LRU cache.") continue } if voteData.(*types.VoteData).SourceNumber < sourceNumber { log.Debug(fmt.Sprintf("error: cur vote %d-->%d is within the span of other votes %d-->%d", sourceNumber, targetNumber, voteData.(*types.VoteData).SourceNumber, voteData.(*types.VoteData).TargetNumber)) return false, 0, common.Hash{} } } } // Rule 3: Validators always vote for their canonical chain’s latest block. // Since the header subscribed to is the canonical chain, so this rule is satisfied by default. log.Debug("All three rules check passed") return true, sourceNumber, sourceHash }