// Change summary:
//   - Return early on network errors or invalid responses within UpdateBackend
//     (e.g., zero block height) to prevent premature node banning.
//   - Use the existing sliding windows to ban nodes on unexpected block tags
//     or excessive network errors.

package proxyd

import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"
)

const (
	DefaultPollerInterval = 1 * time.Second
)

type OnConsensusBroken func()

// ConsensusPoller checks the consensus state for each member of a BackendGroup,
// resolves the highest common block across the nodes, and reconciles the
// consensus in case of block hash divergence to minimize re-orgs.
type ConsensusPoller struct {
	ctx        context.Context
	cancelFunc context.CancelFunc
	listeners  []OnConsensusBroken

	backendGroup      *BackendGroup
	backendState      map[*Backend]*backendState
	consensusGroupMux sync.Mutex
	consensusGroup    []*Backend

	tracker      ConsensusTracker
	asyncHandler ConsensusAsyncHandler

	minPeerCount       uint64
	banPeriod          time.Duration
	maxUpdateThreshold time.Duration
	maxBlockLag        uint64
	maxBlockRange      uint64
	interval           time.Duration
}

type backendState struct {
	backendStateMux sync.Mutex

	latestBlockNumber    hexutil.Uint64
	latestBlockHash      string
	safeBlockNumber      hexutil.Uint64
	finalizedBlockNumber hexutil.Uint64

	peerCount uint64
	inSync    bool

	lastUpdate time.Time

	bannedUntil time.Time
}

func (bs *backendState) IsBanned() bool {
	return time.Now().Before(bs.bannedUntil)
}

func (bs *backendState) GetLatestBlock() (hexutil.Uint64, string) {
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	return bs.latestBlockNumber, bs.latestBlockHash
}

func (bs *backendState) GetSafeBlockNumber() hexutil.Uint64 {
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	return bs.safeBlockNumber
}

func (bs *backendState) GetFinalizedBlockNumber() hexutil.Uint64 {
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	return bs.finalizedBlockNumber
}

// GetConsensusGroup returns the backend members that currently agree on the consensus state.
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
	cp.consensusGroupMux.Lock()
	defer cp.consensusGroupMux.Unlock()

	g := make([]*Backend, len(cp.consensusGroup))
	copy(g, cp.consensusGroup)

	return g
}

// GetLatestBlockNumber returns the `latest` agreed block number in a consensus.
func (cp *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
	return cp.tracker.GetLatestBlockNumber()
}

// GetSafeBlockNumber returns the `safe` agreed block number in a consensus.
func (cp *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
	return cp.tracker.GetSafeBlockNumber()
}

// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus.
func (cp *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
	return cp.tracker.GetFinalizedBlockNumber()
}

func (cp *ConsensusPoller) Shutdown() {
	cp.asyncHandler.Shutdown()
}

// ConsensusAsyncHandler controls the asynchronous polling mechanism, its interval, and shutdown.
type ConsensusAsyncHandler interface {
	Init()
	Shutdown()
}

// NoopAsyncHandler allows fine-grained control over consensus updates, e.g. from tests.
type NoopAsyncHandler struct{}

func NewNoopAsyncHandler() ConsensusAsyncHandler {
	log.Warn("using NewNoopAsyncHandler")
	return &NoopAsyncHandler{}
}

func (ah *NoopAsyncHandler) Init()     {}
func (ah *NoopAsyncHandler) Shutdown() {}

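// A minimal sketch of driving updates manually with the no-op handler, e.g.
// from a test (illustrative; assumes `bg` is a configured *BackendGroup):
//
//	cp := NewConsensusPoller(bg, WithAsyncHandler(NewNoopAsyncHandler()))
//	cp.UpdateBackend(context.Background(), bg.Backends[0])
//	cp.UpdateBackendGroupConsensus(context.Background())
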
// PollerAsyncHandler asynchronously updates each individual backend and the group consensus.
type PollerAsyncHandler struct {
	ctx context.Context
	cp  *ConsensusPoller
}

func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler {
	return &PollerAsyncHandler{
		ctx: ctx,
		cp:  cp,
	}
}

func (ah *PollerAsyncHandler) Init() {
	// create the individual backend pollers
	log.Info("total number of primary candidates", "primaries", len(ah.cp.backendGroup.Primaries()))
	log.Info("total number of fallback candidates", "fallbacks", len(ah.cp.backendGroup.Fallbacks()))

	for _, be := range ah.cp.backendGroup.Primaries() {
		go func(be *Backend) {
			for {
				timer := time.NewTimer(ah.cp.interval)
				ah.cp.UpdateBackend(ah.ctx, be)
				select {
				case <-timer.C:
				case <-ah.ctx.Done():
					timer.Stop()
					return
				}
			}
		}(be)
	}

	for _, be := range ah.cp.backendGroup.Fallbacks() {
		go func(be *Backend) {
			for {
				timer := time.NewTimer(ah.cp.interval)

				// fallbacks are only polled while there are no healthy primary candidates
				healthyCandidates := ah.cp.FilterCandidates(ah.cp.backendGroup.Primaries())

				log.Info("number of healthy primary candidates", "healthy_candidates", len(healthyCandidates))
				if len(healthyCandidates) == 0 {
					log.Debug("zero healthy candidates, querying fallback backend",
						"backend_name", be.Name)
					ah.cp.UpdateBackend(ah.ctx, be)
				}

				select {
				case <-timer.C:
				case <-ah.ctx.Done():
					timer.Stop()
					return
				}
			}
		}(be)
	}

	// create the group consensus poller
	go func() {
		for {
			timer := time.NewTimer(ah.cp.interval)
			log.Info("updating backend group consensus")
			ah.cp.UpdateBackendGroupConsensus(ah.ctx)

			select {
			case <-timer.C:
			case <-ah.ctx.Done():
				timer.Stop()
				return
			}
		}
	}()
}

func (ah *PollerAsyncHandler) Shutdown() {
	ah.cp.cancelFunc()
}

type ConsensusOpt func(cp *ConsensusPoller)

func WithTracker(tracker ConsensusTracker) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.tracker = tracker
	}
}

func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.asyncHandler = asyncHandler
	}
}

func WithListener(listener OnConsensusBroken) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.AddListener(listener)
	}
}

func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
	cp.listeners = append(cp.listeners, listener)
}

func (cp *ConsensusPoller) ClearListeners() {
	cp.listeners = []OnConsensusBroken{}
}

func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.banPeriod = banPeriod
	}
}

func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.maxUpdateThreshold = maxUpdateThreshold
	}
}

func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.maxBlockLag = maxBlockLag
	}
}

func WithMaxBlockRange(maxBlockRange uint64) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.maxBlockRange = maxBlockRange
	}
}

func WithMinPeerCount(minPeerCount uint64) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.minPeerCount = minPeerCount
	}
}

func WithPollerInterval(interval time.Duration) ConsensusOpt {
	return func(cp *ConsensusPoller) {
		cp.interval = interval
	}
}

func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
	ctx, cancelFunc := context.WithCancel(context.Background())

	state := make(map[*Backend]*backendState, len(bg.Backends))

	cp := &ConsensusPoller{
		ctx:          ctx,
		cancelFunc:   cancelFunc,
		backendGroup: bg,
		backendState: state,

		banPeriod:          5 * time.Minute,
		maxUpdateThreshold: 30 * time.Second,
		maxBlockLag:        8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
		minPeerCount:       3,
		interval:           DefaultPollerInterval,
	}

	for _, opt := range opts {
		opt(cp)
	}

	if cp.tracker == nil {
		cp.tracker = NewInMemoryConsensusTracker()
	}

	if cp.asyncHandler == nil {
		cp.asyncHandler = NewPollerAsyncHandler(ctx, cp)
	}

	cp.Reset()
	cp.asyncHandler.Init()

	return cp
}

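// Illustrative usage (a sketch, not part of this file's API surface; assumes
// `bg` is a configured *BackendGroup):
//
//	cp := NewConsensusPoller(bg,
//		WithBanPeriod(2*time.Minute),
//		WithMinPeerCount(5),
//		WithListener(func() { log.Warn("consensus broken") }),
//	)
//	defer cp.Shutdown()
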
// UpdateBackend refreshes the consensus state of a single backend.
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
	bs := cp.GetBackendState(be)
	RecordConsensusBackendBanned(be, bs.IsBanned())

	if bs.IsBanned() {
		log.Debug("skipping backend - banned", "backend", be.Name)
		return
	}

	// if the backend is not in a healthy state, we'll only resume checking it after the ban expires
	if !be.IsHealthy() && !be.forcedCandidate {
		log.Warn("backend banned - not healthy", "backend", be.Name)
		cp.Ban(be)
		return
	}

	inSync, err := cp.isInSync(ctx, be)
	RecordConsensusBackendInSync(be, err == nil && inSync)
	if err != nil {
		log.Warn("error updating backend sync state", "name", be.Name, "err", err)
		return
	}

	var peerCount uint64
	if !be.skipPeerCountCheck {
		peerCount, err = cp.getPeerCount(ctx, be)
		if err != nil {
			log.Warn("error updating backend peer count", "name", be.Name, "err", err)
			return
		}
		if peerCount == 0 {
			log.Warn("peer count responded with 200 and 0 peers", "name", be.Name)
			be.intermittentErrorsSlidingWindow.Incr()
			return
		}
		RecordConsensusBackendPeerCount(be, peerCount)
	}

	latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
	if err != nil {
		log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err)
		return
	}
	if latestBlockNumber == 0 {
		log.Warn("backend responded 200 with block height 0 for latest block", "name", be.Name)
		be.intermittentErrorsSlidingWindow.Incr()
		return
	}

	safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
	if err != nil {
		log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err)
		return
	}
	if safeBlockNumber == 0 {
		log.Warn("backend responded 200 with block height 0 for safe block", "name", be.Name)
		be.intermittentErrorsSlidingWindow.Incr()
		return
	}

	finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
	if err != nil {
		log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err)
		return
	}
	if finalizedBlockNumber == 0 {
		log.Warn("backend responded 200 with block height 0 for finalized block", "name", be.Name)
		be.intermittentErrorsSlidingWindow.Incr()
		return
	}

	RecordConsensusBackendUpdateDelay(be, bs.lastUpdate)

	changed := cp.setBackendState(be, peerCount, inSync,
		latestBlockNumber, latestBlockHash,
		safeBlockNumber, finalizedBlockNumber)

	RecordBackendLatestBlock(be, latestBlockNumber)
	RecordBackendSafeBlock(be, safeBlockNumber)
	RecordBackendFinalizedBlock(be, finalizedBlockNumber)

	if changed {
		log.Debug("backend state updated",
			"name", be.Name,
			"peerCount", peerCount,
			"inSync", inSync,
			"latestBlockNumber", latestBlockNumber,
			"latestBlockHash", latestBlockHash,
			"safeBlockNumber", safeBlockNumber,
			"finalizedBlockNumber", finalizedBlockNumber,
			"lastUpdate", bs.lastUpdate)
	}

	// sanity check for latest, safe and finalized block tags
	expectedBlockTags := cp.checkExpectedBlockTags(
		latestBlockNumber,
		bs.safeBlockNumber, safeBlockNumber,
		bs.finalizedBlockNumber, finalizedBlockNumber)

	RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)

	if !expectedBlockTags && !be.forcedCandidate {
		log.Warn("backend banned - unexpected block tags",
			"backend", be.Name,
			"oldFinalized", bs.finalizedBlockNumber,
			"finalizedBlockNumber", finalizedBlockNumber,
			"oldSafe", bs.safeBlockNumber,
			"safeBlockNumber", safeBlockNumber,
			"latestBlockNumber", latestBlockNumber,
		)
		cp.Ban(be)
	}
}

// checkExpectedBlockTags checks for unexpected conditions on block tags:
//   - finalized block number should never decrease
//   - safe block number should never decrease
//   - finalized block should be <= safe block <= latest block
func (cp *ConsensusPoller) checkExpectedBlockTags(
	currentLatest hexutil.Uint64,
	oldSafe hexutil.Uint64, currentSafe hexutil.Uint64,
	oldFinalized hexutil.Uint64, currentFinalized hexutil.Uint64) bool {
	return currentFinalized >= oldFinalized &&
		currentSafe >= oldSafe &&
		currentFinalized <= currentSafe &&
		currentSafe <= currentLatest
}

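// Worked example (illustrative): with currentLatest=110, oldSafe=100,
// currentSafe=104, oldFinalized=90 and currentFinalized=92 the check passes,
// since both tags advanced and finalized (92) <= safe (104) <= latest (110).
// If currentSafe were 99 instead, the check would fail because the safe block
// regressed below oldSafe=100, and UpdateBackend would ban the backend.
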
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends.
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
	// get the latest block number from the tracker
	currentConsensusBlockNumber := cp.GetLatestBlockNumber()

	// get the candidates for the consensus group
	candidates := cp.getConsensusCandidates()

	// find the lowest latest block number and hash,
	// the lowest safe block number,
	// and the lowest finalized block number
	var lowestLatestBlock hexutil.Uint64
	var lowestLatestBlockHash string
	var lowestFinalizedBlock hexutil.Uint64
	var lowestSafeBlock hexutil.Uint64
	for _, bs := range candidates {
		if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
			lowestLatestBlock = bs.latestBlockNumber
			lowestLatestBlockHash = bs.latestBlockHash
		}
		if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
			lowestFinalizedBlock = bs.finalizedBlockNumber
		}
		if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
			lowestSafeBlock = bs.safeBlockNumber
		}
	}

	// find the proposed block among the candidates;
	// the proposed block needs to have the same hash in the entire consensus group
	proposedBlock := lowestLatestBlock
	proposedBlockHash := lowestLatestBlockHash
	hasConsensus := false
	broken := false

	if lowestLatestBlock > currentConsensusBlockNumber {
		log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
	}

	// if there is a block to propose, check if it is the same in all backends
	if proposedBlock > 0 {
		for !hasConsensus {
			allAgreed := true
			for be := range candidates {
				actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
				if err != nil {
					log.Warn("error updating backend", "name", be.Name, "err", err)
					continue
				}
				if proposedBlockHash == "" {
					proposedBlockHash = actualBlockHash
				}
				blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
				if blocksDontMatch {
					if currentConsensusBlockNumber >= actualBlockNumber {
						log.Warn("backend broke consensus",
							"name", be.Name,
							"actualBlockNumber", actualBlockNumber,
							"actualBlockHash", actualBlockHash,
							"proposedBlock", proposedBlock,
							"proposedBlockHash", proposedBlockHash)
						broken = true
					}
					allAgreed = false
					break
				}
			}
			if allAgreed {
				hasConsensus = true
			} else {
				// walk one block behind and try again
				proposedBlock -= 1
				proposedBlockHash = ""
				log.Debug("no consensus, now trying", "block", proposedBlock)
			}
		}
	}

	if broken {
		// propagate event to other interested parties, such as the cache invalidator
		for _, l := range cp.listeners {
			l()
		}
		log.Info("consensus broken",
			"currentConsensusBlockNumber", currentConsensusBlockNumber,
			"proposedBlock", proposedBlock,
			"proposedBlockHash", proposedBlockHash)
	}

	// update tracker
	cp.tracker.SetLatestBlockNumber(proposedBlock)
	cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
	cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)

	// update consensus group
	group := make([]*Backend, 0, len(candidates))
	consensusBackendsNames := make([]string, 0, len(candidates))
	filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
	for _, be := range cp.backendGroup.Backends {
		_, exist := candidates[be]
		if exist {
			group = append(group, be)
			consensusBackendsNames = append(consensusBackendsNames, be.Name)
		} else {
			filteredBackendsNames = append(filteredBackendsNames, be.Name)
		}
	}

	cp.consensusGroupMux.Lock()
	cp.consensusGroup = group
	cp.consensusGroupMux.Unlock()

	RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
	RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock)
	RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock)

	RecordGroupConsensusCount(cp.backendGroup, len(group))
	RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
	RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))

	log.Debug("group state",
		"proposedBlock", proposedBlock,
		"consensusBackends", strings.Join(consensusBackendsNames, ", "),
		"filteredBackends", strings.Join(filteredBackendsNames, ", "))
}

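// Walk-back example (illustrative): if three candidates report latest blocks
// 105, 103 and 101, the proposal starts at the lowest head (101). Each
// candidate is then asked for block 101 by number; if any returns a different
// hash, the proposal moves to 100, 99, ... until every candidate agrees on
// the same hash, which becomes the group's consensus `latest` block.
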
// IsBanned checks if a specific backend is banned.
func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	return bs.IsBanned()
}

// BannedUntil returns the time until which a specific backend is banned.
func (cp *ConsensusPoller) BannedUntil(be *Backend) time.Time {
	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	return bs.bannedUntil
}

// Ban bans a specific backend.
func (cp *ConsensusPoller) Ban(be *Backend) {
	if be.forcedCandidate {
		return
	}

	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	bs.bannedUntil = time.Now().Add(cp.banPeriod)

	// when we ban a node, we give it the chance to start from any block when it is back
	bs.latestBlockNumber = 0
	bs.safeBlockNumber = 0
	bs.finalizedBlockNumber = 0
}

// Unban removes any ban from the backend.
func (cp *ConsensusPoller) Unban(be *Backend) {
	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	bs.bannedUntil = time.Now().Add(-10 * time.Hour)
}

// Reset resets all backend states.
func (cp *ConsensusPoller) Reset() {
	for _, be := range cp.backendGroup.Backends {
		cp.backendState[be] = &backendState{}
	}
}

// fetchBlock is a convenience wrapper to request a block directly from the backend.
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
	var rpcRes RPCRes
	err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
	if err != nil {
		return 0, "", err
	}

	jsonMap, ok := rpcRes.Result.(map[string]interface{})
	if !ok {
		return 0, "", fmt.Errorf("unexpected response to eth_getBlockByNumber on backend %s", be.Name)
	}

	// decode the fields defensively rather than panicking on a malformed response
	numberStr, ok := jsonMap["number"].(string)
	if !ok {
		return 0, "", fmt.Errorf("unexpected block number in response to eth_getBlockByNumber on backend %s", be.Name)
	}
	number, err := hexutil.DecodeUint64(numberStr)
	if err != nil {
		return 0, "", err
	}
	blockNumber = hexutil.Uint64(number)

	blockHash, ok = jsonMap["hash"].(string)
	if !ok {
		return 0, "", fmt.Errorf("unexpected block hash in response to eth_getBlockByNumber on backend %s", be.Name)
	}

	return blockNumber, blockHash, nil
}

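// The relevant fields of an eth_getBlockByNumber response are hex-encoded,
// e.g. (abridged, illustrative): {"number": "0x112a880", "hash": "0xabc..."}.
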
// getPeerCount is a convenience wrapper to retrieve the current peer count from the backend.
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
	var rpcRes RPCRes
	err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
	if err != nil {
		return 0, err
	}

	result, ok := rpcRes.Result.(string)
	if !ok {
		return 0, fmt.Errorf("unexpected response to net_peerCount on backend %s", be.Name)
	}

	// decode defensively rather than panicking on a malformed response
	count, err = hexutil.DecodeUint64(result)
	if err != nil {
		return 0, err
	}

	return count, nil
}

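// net_peerCount returns a hex quantity, e.g. "0x19" for 25 peers (illustrative value).
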
// isInSync is a convenience wrapper to check if the backend is in sync with the network.
func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
	var rpcRes RPCRes
	err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
	if err != nil {
		return false, err
	}

	var res bool
	switch typed := rpcRes.Result.(type) {
	case bool:
		syncing := typed
		res = !syncing
	case string:
		syncing, err := strconv.ParseBool(typed)
		if err != nil {
			return false, err
		}
		res = !syncing
	default:
		// the result is a JSON object describing sync progress when the node is not in sync
		res = false
	}

	return res, nil
}

// GetBackendState creates a copy of backend state so that the caller can use it without locking.
func (cp *ConsensusPoller) GetBackendState(be *Backend) *backendState {
	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()

	return &backendState{
		latestBlockNumber:    bs.latestBlockNumber,
		latestBlockHash:      bs.latestBlockHash,
		safeBlockNumber:      bs.safeBlockNumber,
		finalizedBlockNumber: bs.finalizedBlockNumber,
		peerCount:            bs.peerCount,
		inSync:               bs.inSync,
		lastUpdate:           bs.lastUpdate,
		bannedUntil:          bs.bannedUntil,
	}
}

func (cp *ConsensusPoller) GetLastUpdate(be *Backend) time.Time {
	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	return bs.lastUpdate
}

func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
	latestBlockNumber hexutil.Uint64, latestBlockHash string,
	safeBlockNumber hexutil.Uint64,
	finalizedBlockNumber hexutil.Uint64) bool {
	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	changed := bs.latestBlockHash != latestBlockHash
	bs.peerCount = peerCount
	bs.inSync = inSync
	bs.latestBlockNumber = latestBlockNumber
	bs.latestBlockHash = latestBlockHash
	bs.finalizedBlockNumber = finalizedBlockNumber
	bs.safeBlockNumber = safeBlockNumber
	bs.lastUpdate = time.Now()
	bs.backendStateMux.Unlock()
	return changed
}

// getConsensusCandidates will search for candidates in the primary group;
// if there are none, it will search for candidates in the fallback group.
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
	healthyPrimaries := cp.FilterCandidates(cp.backendGroup.Primaries())

	RecordHealthyCandidates(cp.backendGroup, len(healthyPrimaries))
	if len(healthyPrimaries) > 0 {
		return healthyPrimaries
	}

	return cp.FilterCandidates(cp.backendGroup.Fallbacks())
}

// FilterCandidates finds out which backends are candidates to be in the consensus group
// and creates a copy of their current state.
//
// A candidate is a serving node that meets the following conditions:
//   - not banned
//   - healthy (network latency and error rate)
//   - with minimum peer count
//   - in sync
//   - updated recently
//   - not lagging behind the latest block
func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*backendState {
	candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))

	for _, be := range backends {
		bs := cp.GetBackendState(be)
		if be.forcedCandidate {
			candidates[be] = bs
			continue
		}
		if bs.IsBanned() {
			continue
		}
		if !be.IsHealthy() {
			continue
		}
		if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
			log.Debug("backend peer count too low for inclusion in consensus",
				"backend_name", be.Name,
				"peer_count", bs.peerCount,
				"min_peer_count", cp.minPeerCount,
			)
			continue
		}
		if !bs.inSync {
			continue
		}
		if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
			continue
		}

		candidates[be] = bs
	}

	// find the highest block, in order to use it when defining the highest non-lagging ancestor block
	var highestLatestBlock hexutil.Uint64
	for _, bs := range candidates {
		if bs.latestBlockNumber > highestLatestBlock {
			highestLatestBlock = bs.latestBlockNumber
		}
	}

	// identify backends lagging behind the highest block by more than maxBlockLag
	lagging := make([]*Backend, 0, len(candidates))
	for be, bs := range candidates {
		if uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
			lagging = append(lagging, be)
		}
	}

	// remove lagging backends from the candidates
	for _, be := range lagging {
		delete(candidates, be)
	}

	return candidates
}

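// Lag-filter example (illustrative): with the default maxBlockLag of 8 and a
// highest candidate head of block 1000, a backend reporting block 991 is
// dropped as lagging (1000-991 = 9 > 8), while one reporting 992 is kept.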