2023-04-10 18:36:45 +08:00
package vote
import (
2023-08-23 17:46:08 +08:00
mapset "github.com/deckarep/golang-set/v2"
2023-04-10 18:36:45 +08:00
// Sizing and range limits used by the vote pool.
const (
// maxCurVoteAmountPerBlock caps the votes accepted per target block hash for current votes (enforced in basicVerify).
maxCurVoteAmountPerBlock = 21
// maxFutureVoteAmountPerBlock caps the votes accepted per target block hash for future votes;
// putVote also uses it as the initial capacity of every new VoteBox.
maxFutureVoteAmountPerBlock = 50
// voteBufferForPut is the buffer size of the votesCh channel that PutVote sends on.
voteBufferForPut = 256
// Votes whose target number lies in (currentBlockNum-256, currentBlockNum+11] are stored; all others are discarded.
lowerLimitOfVoteBlockNumber = 256
upperLimitOfVoteBlockNumber = 11 // refer to fetcher.maxUncleDist
2024-07-19 20:23:45 +08:00
highestVerifiedBlockChanSize = 10 // highestVerifiedBlockChanSize is the size of channel listening to HighestVerifiedBlockEvent.
2023-04-10 18:36:45 +08:00
// Metrics registered by the vote pool. The names suggest counters for the number of
// stored current/future votes and gauges for the sizes of the duplicate set and the
// two priority queues.
// NOTE(review): only localFutureVotesPqGauge has a visible update site (transfer);
// confirm the update sites of the others.
var (
2023-05-05 14:15:43 +08:00
localCurVotesCounter = metrics.NewRegisteredCounter("curVotes/local", nil)
localFutureVotesCounter = metrics.NewRegisteredCounter("futureVotes/local", nil)
2023-04-10 18:36:45 +08:00
localReceivedVotesGauge = metrics.NewRegisteredGauge("receivedVotes/local", nil)
localCurVotesPqGauge = metrics.NewRegisteredGauge("curVotesPq/local", nil)
// localFutureVotesPqGauge is set to the length of futureVotesPq by transfer.
localFutureVotesPqGauge = metrics.NewRegisteredGauge("futureVotesPq/local", nil)
// VoteBox groups the vote envelopes collected for one target block.
// The pool stores boxes in maps keyed by the target block hash.
type VoteBox struct {
blockNumber uint64 // target block number shared by the votes in this box
voteMessages []*types.VoteEnvelope // votes received for the target block
// VotePool collects vote envelopes. Votes whose target block is already a verified
// block are kept as "current" votes; votes whose target block is not yet verified are
// parked as "future" votes until they can be transferred.
type VotePool struct {
2023-09-21 12:02:59 +08:00
chain *core.BlockChain // source of the current head, verified-block lookups and highest-verified-header events
mu sync.RWMutex // read/write lock released via defer in the pool's accessors and mutators
2023-04-10 18:36:45 +08:00
votesFeed event.Feed // feed that SubscribeNewVoteEvent subscribers listen on
scope event.SubscriptionScope // tracks subscriptions handed out by SubscribeNewVoteEvent
2023-08-23 17:46:08 +08:00
receivedVotes mapset.Set[common.Hash] // hashes of votes already seen; consulted by basicVerify to reject duplicates
2023-04-10 18:36:45 +08:00
curVotes map[common.Hash]*VoteBox // current votes, keyed by target block hash
futureVotes map[common.Hash]*VoteBox // future votes, keyed by target block hash
curVotesPq *votesPriorityQueue // target blocks of curVotes, ordered by ascending target number
futureVotesPq *votesPriorityQueue // target blocks of futureVotes, ordered by ascending target number
2024-07-19 20:23:45 +08:00
highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent // receives highest-verified-header events from chain
highestVerifiedBlockSub event.Subscription // subscription feeding highestVerifiedBlockCh; unsubscribed when loop exits
2023-04-10 18:36:45 +08:00
votesCh chan *types.VoteEnvelope // incoming votes queued by PutVote and consumed by loop
engine consensus.PoSA // consensus engine whose VerifyVote checks a vote against the chain
// votesPriorityQueue is a queue of vote data ordered by ascending TargetNumber (see Less).
// It is driven through container/heap (heap.Push / heap.Pop).
type votesPriorityQueue []*types.VoteData
2023-09-21 12:02:59 +08:00
func NewVotePool(chain *core.BlockChain, engine consensus.PoSA) *VotePool {
2023-04-10 18:36:45 +08:00
votePool := &VotePool{
2024-07-19 20:23:45 +08:00
chain: chain,
receivedVotes: mapset.NewSet[common.Hash](),
curVotes: make(map[common.Hash]*VoteBox),
futureVotes: make(map[common.Hash]*VoteBox),
curVotesPq: &votesPriorityQueue{},
futureVotesPq: &votesPriorityQueue{},
highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize),
votesCh: make(chan *types.VoteEnvelope, voteBufferForPut),
engine: engine,
2023-04-10 18:36:45 +08:00
// Subscribe events from blockchain and start the main event loop.
2024-07-19 20:23:45 +08:00
votePool.highestVerifiedBlockSub = votePool.chain.SubscribeHighestVerifiedHeaderEvent(votePool.highestVerifiedBlockCh)
2023-04-10 18:36:45 +08:00
go votePool.loop()
return votePool
// loop is the vote pool's main event loop. It waits on three sources: highest-verified-block
// events from the chain, the error channel of that subscription, and the votes channel fed by PutVote.
// The chain subscription is released when loop returns.
// NOTE(review): no handler statements are visible inside the select arms below (the
// expected calls would be the future->current transfer, prune, and putIntoVotePool) — confirm they are present.
func (pool *VotePool) loop() {
2024-07-19 20:23:45 +08:00
defer pool.highestVerifiedBlockSub.Unsubscribe()
2023-08-02 14:50:32 +08:00
2023-04-10 18:36:45 +08:00
for {
select {
// Handle HighestVerifiedBlockEvent.
2024-07-19 20:23:45 +08:00
case ev := <-pool.highestVerifiedBlockCh:
// Events without a header are ignored.
if ev.Header != nil {
latestBlockNumber := ev.Header.Number.Uint64()
2023-04-10 18:36:45 +08:00
2024-07-19 20:23:45 +08:00
2023-04-10 18:36:45 +08:00
2024-07-19 20:23:45 +08:00
// The subscription's error channel fired — presumably the loop exits here; verify.
case <-pool.highestVerifiedBlockSub.Err():
2023-04-10 18:36:45 +08:00
// Handle votes channel and put the vote into vote pool.
case vote := <-pool.votesCh:
// PutVote queues vote for processing by sending it on votesCh.
// The send blocks once the channel's buffer (voteBufferForPut entries) is full.
func (pool *VotePool) PutVote(vote *types.VoteEnvelope) {
pool.votesCh <- vote
// putIntoVotePool validates vote and stores it in the pool.
//
// It returns false (vote discarded) when the vote's target number is outside
// (head-lowerLimitOfVoteBlockNumber, head+upperLimitOfVoteBlockNumber], when basicVerify
// rejects it, or — for votes on an already verified block — when engine.VerifyVote
// returns an error. Otherwise the vote is handed to putVote and true is returned.
func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool {
targetNumber := vote.Data.TargetNumber
targetHash := vote.Data.TargetHash
2023-08-23 17:46:08 +08:00
header := pool.chain.CurrentBlock()
2023-04-10 18:36:45 +08:00
headNumber := header.Number.Uint64()
// Make sure in the range (currentHeight-lowerLimitOfVoteBlockNumber, currentHeight+upperLimitOfVoteBlockNumber].
if targetNumber+lowerLimitOfVoteBlockNumber-1 < headNumber || targetNumber > headNumber+upperLimitOfVoteBlockNumber {
log.Debug("BlockNumber of vote is outside the range of header-256~header+11, will be discarded")
return false
// voteData is the entry pushed onto the priority queue for this vote's target block.
voteData := &types.VoteData{
TargetNumber: targetNumber,
TargetHash: targetHash,
var votes map[common.Hash]*VoteBox
var votesPq *votesPriorityQueue
isFutureVote := false
2024-07-19 20:23:45 +08:00
// A target block that is not a verified block locally makes this a future vote;
// the containers chosen here are the ones checked by basicVerify and filled by putVote.
voteBlock := pool.chain.GetVerifiedBlockByHash(targetHash)
2023-04-10 18:36:45 +08:00
if voteBlock == nil {
votes = pool.futureVotes
votesPq = pool.futureVotesPq
isFutureVote = true
} else {
votes = pool.curVotes
votesPq = pool.curVotesPq
voteHash := vote.Hash()
if ok := pool.basicVerify(vote, headNumber, votes, isFutureVote, voteHash); !ok {
return false
if !isFutureVote {
// Verify if the vote comes from valid validators based on voteAddress (BLSPublicKey), only verify curVotes here, will verify futureVotes in transfer process.
if pool.engine.VerifyVote(pool.chain, vote) != nil {
return false
// Send vote for handler usage of broadcasting to peers.
// NOTE(review): voteEv is built here but no send on pool.votesFeed is visible — confirm the event is actually published.
voteEv := core.NewVoteEvent{Vote: vote}
pool.putVote(votes, votesPq, vote, voteData, voteHash, isFutureVote)
return true
// SubscribeNewVoteEvent subscribes ch to the pool's votesFeed and returns the
// subscription, which is tracked by the pool's subscription scope.
func (pool *VotePool) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.Subscription {
return pool.scope.Track(pool.votesFeed.Subscribe(ch))
// putVote appends vote to the VoteBox for its target hash in m. When m has no box for
// that hash yet, voteData is first pushed onto votesPq and a new box is created.
// m and votesPq are either the current or the future containers, as selected by the caller;
// isFutureVote tells which.
// NOTE(review): the deferred Unlock below has no visible matching pool.mu.Lock() — confirm the lock is acquired first.
func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriorityQueue, vote *types.VoteEnvelope, voteData *types.VoteData, voteHash common.Hash, isFutureVote bool) {
targetHash := vote.Data.TargetHash
targetNumber := vote.Data.TargetNumber
log.Debug("The vote info to put is:", "voteBlockNumber", targetNumber, "voteBlockHash", targetHash)
defer pool.mu.Unlock()
if _, ok := m[targetHash]; !ok {
// Push into votes priorityQueue if not exist in corresponding votes Map.
// To be noted: will not put into priorityQueue if exists in map to avoid duplicate element with the same voteData.
heap.Push(votesPq, voteData)
// The box is sized for the larger (future) limit regardless of which container it goes into.
voteBox := &VoteBox{
blockNumber: targetNumber,
voteMessages: make([]*types.VoteEnvelope, 0, maxFutureVoteAmountPerBlock),
m[targetHash] = voteBox
// NOTE(review): both branches are empty here — presumably the matching priority-queue gauge is updated; verify.
if isFutureVote {
} else {
// Put into corresponding votes map.
m[targetHash].voteMessages = append(m[targetHash].voteMessages, vote)
// Add into received vote to avoid future duplicated vote comes.
// NOTE(review): no insertion into pool.receivedVotes is visible after the comment above — confirm voteHash is recorded.
log.Debug("VoteHash put into votepool is:", "voteHash", voteHash)
// NOTE(review): both branches are empty here — presumably the matching vote counter is incremented; verify.
if isFutureVote {
2023-05-05 14:15:43 +08:00
2023-04-10 18:36:45 +08:00
} else {
2023-05-05 14:15:43 +08:00
2023-04-10 18:36:45 +08:00
// transferVotesFromFutureToCur walks futureVotesPq in ascending target-number order after a
// new highest verified block and decides which queued future votes can become current votes.
// Targets older than latestBlockNumber-upperLimitOfVoteBlockNumber are handled first, then
// targets up to latestBlockNumber are checked against the locally verified blocks.
// NOTE(review): the deferred Unlock below has no visible matching pool.mu.Lock(), and no call
// to pool.transfer is visible in either loop — confirm both are present.
func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Header) {
defer pool.mu.Unlock()
futurePq := pool.futureVotesPq
latestBlockNumber := latestBlockHeader.Number.Uint64()
// For vote in the range [,latestBlockNumber-11), transfer to cur if valid.
for futurePq.Len() > 0 && futurePq.Peek().TargetNumber+upperLimitOfVoteBlockNumber < latestBlockNumber {
blockHash := futurePq.Peek().TargetHash
// For vote in the range [latestBlockNumber-11,latestBlockNumber], only transfer the vote inside the local fork.
// Entries whose target block is not verified locally are set aside in futurePqBuffer
// so the loop can advance past them; they are pushed back afterwards.
futurePqBuffer := make([]*types.VoteData, 0)
for futurePq.Len() > 0 && futurePq.Peek().TargetNumber <= latestBlockNumber {
blockHash := futurePq.Peek().TargetHash
2024-07-19 20:23:45 +08:00
header := pool.chain.GetVerifiedBlockByHash(blockHash)
2023-04-10 18:36:45 +08:00
if header == nil {
// Put into pq buffer used for later put again into futurePq
futurePqBuffer = append(futurePqBuffer, heap.Pop(futurePq).(*types.VoteData))
// Restore the entries that were set aside above.
for _, voteData := range futurePqBuffer {
heap.Push(futurePq, voteData)
// transfer pops the head of futureVotesPq and moves the votes stored for blockHash from
// futureVotes into curVotes, keeping only votes that pass engine.VerifyVote.
// It assumes blockHash belongs to the entry currently at the head of futureVotesPq (callers
// obtain it from Peek) — TODO confirm. It takes no lock itself; presumably the caller holds pool.mu.
func (pool *VotePool) transfer(blockHash common.Hash) {
curPq, futurePq := pool.curVotesPq, pool.futureVotesPq
curVotes, futureVotes := pool.curVotes, pool.futureVotes
voteData := heap.Pop(futurePq)
// Arguments of a deferred call are evaluated when the defer statement runs, so the gauge
// receives the queue length as it is right after the Pop above.
defer localFutureVotesPqGauge.Update(int64(futurePq.Len()))
voteBox, ok := futureVotes[blockHash]
// NOTE(review): no statement is visible inside this branch — presumably an early return when no votes are stored; verify.
if !ok {
validVotes := make([]*types.VoteEnvelope, 0, len(voteBox.voteMessages))
for _, vote := range voteBox.voteMessages {
// Verify if the vote comes from valid validators based on voteAddress (BLSPublicKey).
// NOTE(review): no statement is visible inside this branch — presumably invalid votes are skipped; verify.
if pool.engine.VerifyVote(pool.chain, vote) != nil {
2023-04-24 14:53:51 +08:00
2023-04-10 18:36:45 +08:00
// In the process of transfer, send valid vote to votes channel for handler usage
// NOTE(review): voteEv is built but no send on pool.votesFeed is visible — confirm the event is published.
voteEv := core.NewVoteEvent{Vote: vote}
validVotes = append(validVotes, vote)
// len(curVotes[blockHash].voteMessages) may exceed maxCurVoteAmountPerBlock after the merge, but it doesn't matter
if _, ok := curVotes[blockHash]; !ok {
heap.Push(curPq, voteData)
curVotes[blockHash] = &VoteBox{voteBox.blockNumber, validVotes}
} else {
curVotes[blockHash].voteMessages = append(curVotes[blockHash].voteMessages, validVotes...)
// The future entry is removed once its votes have been moved.
delete(futureVotes, blockHash)
2023-05-05 14:15:43 +08:00
2023-04-10 18:36:45 +08:00
// prune drops current votes that have fallen out of the retention window: every entry of
// curVotesPq whose target number is <= latestBlockNumber-lowerLimitOfVoteBlockNumber is
// popped and its VoteBox is deleted from curVotes. Per its original description it also
// prunes the duplication set.
// NOTE(review): the deferred Unlock below has no visible matching pool.mu.Lock() — confirm the lock is acquired first.
func (pool *VotePool) prune(latestBlockNumber uint64) {
defer pool.mu.Unlock()
curVotes := pool.curVotes
curVotesPq := pool.curVotesPq
// delete votes in the range [,latestBlockNumber-lowerLimitOfVoteBlockNumber]
for curVotesPq.Len() > 0 && curVotesPq.Peek().TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber {
// Prune curPriorityQueue.
blockHash := heap.Pop(curVotesPq).(*types.VoteData).TargetHash
if voteBox, ok := curVotes[blockHash]; ok {
voteMessages := voteBox.voteMessages
// Prune duplicationSet.
for _, voteMessage := range voteMessages {
// NOTE(review): voteHash is computed but no removal from pool.receivedVotes is visible — confirm it is removed.
voteHash := voteMessage.Hash()
// Prune curVotes Map.
delete(curVotes, blockHash)
2023-05-05 14:15:43 +08:00
2023-04-10 18:36:45 +08:00
// GetVotes as batch.
func (pool *VotePool) GetVotes() []*types.VoteEnvelope {
defer pool.mu.RUnlock()
votesRes := make([]*types.VoteEnvelope, 0)
curVotes := pool.curVotes
for _, voteBox := range curVotes {
votesRes = append(votesRes, voteBox.voteMessages...)
return votesRes
func (pool *VotePool) FetchVoteByBlockHash(blockHash common.Hash) []*types.VoteEnvelope {
defer pool.mu.RUnlock()
if _, ok := pool.curVotes[blockHash]; ok {
return pool.curVotes[blockHash].voteMessages
return nil
func (pool *VotePool) basicVerify(vote *types.VoteEnvelope, headNumber uint64, m map[common.Hash]*VoteBox, isFutureVote bool, voteHash common.Hash) bool {
targetHash := vote.Data.TargetHash
defer pool.mu.RUnlock()
// Check duplicate voteMessage firstly.
if pool.receivedVotes.Contains(voteHash) {
log.Debug("Vote pool already contained the same vote", "voteHash", voteHash)
return false
// To prevent DOS attacks, make sure no more than 21 votes per blockHash if not futureVotes
// and no more than 50 votes per blockHash if futureVotes.
maxVoteAmountPerBlock := maxCurVoteAmountPerBlock
if isFutureVote {
maxVoteAmountPerBlock = maxFutureVoteAmountPerBlock
if voteBox, ok := m[targetHash]; ok {
if len(voteBox.voteMessages) >= maxVoteAmountPerBlock {
return false
// Verify bls signature.
if err := vote.Verify(); err != nil {
log.Error("Failed to verify voteMessage", "err", err)
return false
return true
// Less reports whether element i targets a lower block number than element j,
// which makes the queue a min-heap on TargetNumber.
func (pq votesPriorityQueue) Less(i, j int) bool {
return pq[i].TargetNumber < pq[j].TargetNumber
// Len returns the number of entries in the queue.
func (pq votesPriorityQueue) Len() int {
return len(pq)
// Swap exchanges the entries at indexes i and j.
func (pq votesPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
func (pq *votesPriorityQueue) Push(vote interface{}) {
curVote := vote.(*types.VoteData)
*pq = append(*pq, curVote)
func (pq *votesPriorityQueue) Pop() interface{} {
tmp := *pq
l := len(tmp)
var res interface{} = tmp[l-1]
*pq = tmp[:l-1]
return res
func (pq *votesPriorityQueue) Peek() *types.VoteData {
if pq.Len() == 0 {
return nil
return (*pq)[0]