infra/proxyd/consensus_poller.go

799 lines
22 KiB
Go
Raw Normal View History

2023-04-18 21:57:55 +03:00
package proxyd
import (
"context"
"fmt"
2023-04-25 20:26:55 +03:00
"strconv"
2023-04-18 21:57:55 +03:00
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
// DefaultPollerInterval is the default delay between consecutive polling rounds,
// used by NewConsensusPoller unless overridden with WithPollerInterval.
const (
	DefaultPollerInterval = 1 * time.Second
)
// OnConsensusBroken is a callback invoked by UpdateBackendGroupConsensus when a
// backend disagrees with a block at or below the current consensus height.
type OnConsensusBroken func()
2023-09-14 22:42:33 +03:00
// ConsensusPoller checks the consensus state for each member of a BackendGroup
2023-04-18 21:57:55 +03:00
// resolves the highest common block for multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs
type ConsensusPoller struct {
2023-09-14 22:36:24 +03:00
ctx context.Context
2023-04-18 21:57:55 +03:00
cancelFunc context.CancelFunc
2023-04-27 02:31:03 +03:00
listeners []OnConsensusBroken
2023-04-18 21:57:55 +03:00
backendGroup *BackendGroup
backendState map[*Backend]*backendState
consensusGroupMux sync.Mutex
consensusGroup []*Backend
tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler
2023-04-25 20:26:55 +03:00
2023-05-27 14:14:29 +03:00
minPeerCount uint64
2023-04-25 20:26:55 +03:00
banPeriod time.Duration
maxUpdateThreshold time.Duration
maxBlockLag uint64
maxBlockRange uint64
interval time.Duration
2023-04-18 21:57:55 +03:00
}
type backendState struct {
backendStateMux sync.Mutex
latestBlockNumber hexutil.Uint64
latestBlockHash string
safeBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
peerCount uint64
inSync bool
2023-04-18 21:57:55 +03:00
lastUpdate time.Time
bannedUntil time.Time
}
2023-05-27 17:38:19 +03:00
func (bs *backendState) IsBanned() bool {
return time.Now().Before(bs.bannedUntil)
}
// GetLatestBlock returns the latest block number and hash, read under the state lock.
func (bs *backendState) GetLatestBlock() (hexutil.Uint64, string) {
	bs.backendStateMux.Lock()
	number, hash := bs.latestBlockNumber, bs.latestBlockHash
	bs.backendStateMux.Unlock()
	return number, hash
}

// GetSafeBlockNumber returns the `safe` block number, read under the state lock.
func (bs *backendState) GetSafeBlockNumber() hexutil.Uint64 {
	bs.backendStateMux.Lock()
	safe := bs.safeBlockNumber
	bs.backendStateMux.Unlock()
	return safe
}

// GetFinalizedBlockNumber returns the `finalized` block number, read under the state lock.
func (bs *backendState) GetFinalizedBlockNumber() hexutil.Uint64 {
	bs.backendStateMux.Lock()
	finalized := bs.finalizedBlockNumber
	bs.backendStateMux.Unlock()
	return finalized
}
2023-04-18 21:57:55 +03:00
// GetConsensusGroup returns the backend members that are agreeing in a consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock()
cp.consensusGroupMux.Lock()
2023-04-25 20:26:55 +03:00
g := make([]*Backend, len(cp.consensusGroup))
2023-04-18 21:57:55 +03:00
copy(g, cp.consensusGroup)
return g
}
// GetLatestBlockNumber returns the `latest` agreed block number in a consensus
func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return ct.tracker.GetLatestBlockNumber()
}
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
return ct.tracker.GetSafeBlockNumber()
2023-04-18 21:57:55 +03:00
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
return ct.tracker.GetFinalizedBlockNumber()
}
2023-04-18 21:57:55 +03:00
func (cp *ConsensusPoller) Shutdown() {
cp.asyncHandler.Shutdown()
}
// ConsensusAsyncHandler owns the background driving of consensus updates:
// Init starts whatever periodic work the handler performs, and Shutdown stops it.
type ConsensusAsyncHandler interface {
	Shutdown()
	Init()
}
// NoopAsyncHandler is a ConsensusAsyncHandler that starts no background work,
// so the consensus is only updated when callers invoke UpdateBackend /
// UpdateBackendGroupConsensus themselves.
type NoopAsyncHandler struct{}

// NewNoopAsyncHandler logs a warning and returns a handler whose Init and
// Shutdown do nothing.
func NewNoopAsyncHandler() ConsensusAsyncHandler {
	log.Warn("using NewNoopAsyncHandler")
	handler := new(NoopAsyncHandler)
	return handler
}

// Init does nothing.
func (ah *NoopAsyncHandler) Init() {}

// Shutdown does nothing.
func (ah *NoopAsyncHandler) Shutdown() {}
// PollerAsyncHandler runs background goroutines that periodically refresh each
// backend of cp and the group consensus; the goroutines exit once ctx is done.
type PollerAsyncHandler struct {
	ctx context.Context
	cp  *ConsensusPoller
}

// NewPollerAsyncHandler binds a PollerAsyncHandler to cp, with ctx controlling
// the lifetime of the goroutines started by Init.
func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler {
	handler := &PollerAsyncHandler{}
	handler.ctx = ctx
	handler.cp = cp
	return handler
}
func (ah *PollerAsyncHandler) Init() {
// create the individual backend pollers.
log.Info("total number of primary candidates", "primaries", len(ah.cp.backendGroup.Primaries()))
log.Info("total number of fallback candidates", "fallbacks", len(ah.cp.backendGroup.Fallbacks()))
for _, be := range ah.cp.backendGroup.Primaries() {
2023-04-18 21:57:55 +03:00
go func(be *Backend) {
for {
timer := time.NewTimer(ah.cp.interval)
2023-04-18 21:57:55 +03:00
ah.cp.UpdateBackend(ah.ctx, be)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}(be)
}
for _, be := range ah.cp.backendGroup.Fallbacks() {
go func(be *Backend) {
for {
timer := time.NewTimer(ah.cp.interval)
healthyCandidates := ah.cp.FilterCandidates(ah.cp.backendGroup.Primaries())
log.Info("number of healthy primary candidates", "healthy_candidates", len(healthyCandidates))
if len(healthyCandidates) == 0 {
log.Debug("zero healthy candidates, querying fallback backend",
"backend_name", be.Name)
ah.cp.UpdateBackend(ah.ctx, be)
}
2023-04-18 21:57:55 +03:00
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}(be)
}
// create the group consensus poller
go func() {
for {
timer := time.NewTimer(ah.cp.interval)
log.Info("updating backend group consensus")
2023-04-18 21:57:55 +03:00
ah.cp.UpdateBackendGroupConsensus(ah.ctx)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}()
}
// Shutdown cancels the poller's context through cp.cancelFunc. NOTE(review): this
// stops the Init goroutines only if ah.ctx derives from that context, as it does
// for the default handler created in NewConsensusPoller.
func (ah *PollerAsyncHandler) Shutdown() {
	cancel := ah.cp.cancelFunc
	cancel()
}
// ConsensusOpt is a functional option applied by NewConsensusPoller before the
// poller's backend states are reset and its async handler is initialized.
type ConsensusOpt func(cp *ConsensusPoller)

// WithTracker replaces the ConsensusTracker; without it NewConsensusPoller uses
// NewInMemoryConsensusTracker().
func WithTracker(tracker ConsensusTracker) ConsensusOpt {
	opt := func(cp *ConsensusPoller) {
		cp.tracker = tracker
	}
	return opt
}

// WithAsyncHandler replaces the ConsensusAsyncHandler; without it
// NewConsensusPoller uses a PollerAsyncHandler bound to the poller's context.
func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
	opt := func(cp *ConsensusPoller) {
		cp.asyncHandler = asyncHandler
	}
	return opt
}
2023-04-27 02:31:03 +03:00
func WithListener(listener OnConsensusBroken) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.AddListener(listener)
}
}
func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
cp.listeners = append(cp.listeners, listener)
}
2023-05-27 05:07:35 +03:00
func (cp *ConsensusPoller) ClearListeners() {
cp.listeners = []OnConsensusBroken{}
}
2023-04-25 20:26:55 +03:00
func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.banPeriod = banPeriod
}
}
func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.maxUpdateThreshold = maxUpdateThreshold
}
}
func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.maxBlockLag = maxBlockLag
}
}
func WithMaxBlockRange(maxBlockRange uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.maxBlockRange = maxBlockRange
}
}
2023-04-25 20:26:55 +03:00
func WithMinPeerCount(minPeerCount uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.minPeerCount = minPeerCount
}
}
func WithPollerInterval(interval time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.interval = interval
}
}
2023-04-18 21:57:55 +03:00
func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
ctx, cancelFunc := context.WithCancel(context.Background())
state := make(map[*Backend]*backendState, len(bg.Backends))
cp := &ConsensusPoller{
2023-09-14 22:36:24 +03:00
ctx: ctx,
2023-04-18 21:57:55 +03:00
cancelFunc: cancelFunc,
backendGroup: bg,
backendState: state,
2023-04-25 20:26:55 +03:00
banPeriod: 5 * time.Minute,
maxUpdateThreshold: 30 * time.Second,
2023-05-27 14:14:29 +03:00
maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
2023-04-25 20:26:55 +03:00
minPeerCount: 3,
interval: DefaultPollerInterval,
2023-04-18 21:57:55 +03:00
}
for _, opt := range opts {
opt(cp)
}
if cp.tracker == nil {
cp.tracker = NewInMemoryConsensusTracker()
}
if cp.asyncHandler == nil {
cp.asyncHandler = NewPollerAsyncHandler(ctx, cp)
}
2023-05-27 20:18:34 +03:00
cp.Reset()
2023-04-18 21:57:55 +03:00
cp.asyncHandler.Init()
return cp
}
2023-09-14 22:42:33 +03:00
// UpdateBackend refreshes the consensus state of a single backend
2023-04-18 21:57:55 +03:00
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
bs := cp.GetBackendState(be)
RecordConsensusBackendBanned(be, bs.IsBanned())
2023-05-10 03:21:25 +03:00
if bs.IsBanned() {
log.Debug("skipping backend - banned", "backend", be.Name)
2023-05-10 03:21:25 +03:00
return
2023-04-18 21:57:55 +03:00
}
2023-09-14 22:42:33 +03:00
// if backend is not healthy state we'll only resume checking it after ban
if !be.IsHealthy() && !be.forcedCandidate {
2023-05-27 14:14:29 +03:00
log.Warn("backend banned - not healthy", "backend", be.Name)
2023-05-09 13:58:35 +03:00
cp.Ban(be)
2023-05-10 03:21:25 +03:00
return
2023-04-18 21:57:55 +03:00
}
2023-04-25 20:26:55 +03:00
inSync, err := cp.isInSync(ctx, be)
2023-05-12 01:33:47 +03:00
RecordConsensusBackendInSync(be, err == nil && inSync)
if err != nil {
2023-09-14 22:42:33 +03:00
log.Warn("error updating backend sync state", "name", be.Name, "err", err)
return
2023-04-25 20:26:55 +03:00
}
2023-05-05 01:46:08 +03:00
var peerCount uint64
if !be.skipPeerCountCheck {
peerCount, err = cp.getPeerCount(ctx, be)
if err != nil {
2023-05-12 01:33:47 +03:00
log.Warn("error updating backend peer count", "name", be.Name, "err", err)
return
}
if peerCount == 0 {
log.Warn("peer count responded with 200 and 0 peers", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
2023-05-05 01:46:08 +03:00
}
RecordConsensusBackendPeerCount(be, peerCount)
2023-04-25 20:26:55 +03:00
}
2023-04-18 21:57:55 +03:00
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err)
return
}
if latestBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for latest block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
2023-04-18 21:57:55 +03:00
}
2023-05-27 00:22:50 +03:00
safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil {
log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err)
return
}
if safeBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for safe block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
}
2023-05-27 00:22:50 +03:00
finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil {
log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err)
return
}
if finalizedBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for finalized block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
}
2023-05-27 20:18:34 +03:00
RecordConsensusBackendUpdateDelay(be, bs.lastUpdate)
2023-05-27 00:22:50 +03:00
changed := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash,
safeBlockNumber, finalizedBlockNumber)
2023-04-18 21:57:55 +03:00
2023-05-27 00:22:50 +03:00
RecordBackendLatestBlock(be, latestBlockNumber)
RecordBackendSafeBlock(be, safeBlockNumber)
RecordBackendFinalizedBlock(be, finalizedBlockNumber)
if changed {
2023-09-14 22:42:33 +03:00
log.Debug("backend state updated",
"name", be.Name,
"peerCount", peerCount,
"inSync", inSync,
"latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash,
"safeBlockNumber", safeBlockNumber,
"finalizedBlockNumber", finalizedBlockNumber,
2023-05-27 20:18:34 +03:00
"lastUpdate", bs.lastUpdate)
}
// sanity check for latest, safe and finalized block tags
expectedBlockTags := cp.checkExpectedBlockTags(
latestBlockNumber,
bs.safeBlockNumber, safeBlockNumber,
bs.finalizedBlockNumber, finalizedBlockNumber)
2023-05-27 00:22:50 +03:00
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
if !expectedBlockTags && !be.forcedCandidate {
2023-05-27 00:22:50 +03:00
log.Warn("backend banned - unexpected block tags",
"backend", be.Name,
"oldFinalized", bs.finalizedBlockNumber,
2023-05-27 00:22:50 +03:00
"finalizedBlockNumber", finalizedBlockNumber,
"oldSafe", bs.safeBlockNumber,
2023-05-27 00:22:50 +03:00
"safeBlockNumber", safeBlockNumber,
"latestBlockNumber", latestBlockNumber,
)
cp.Ban(be)
}
2023-04-18 21:57:55 +03:00
}
2023-05-27 00:22:50 +03:00
// checkExpectedBlockTags for unexpected conditions on block tags
// - finalized block number should never decrease
// - safe block number should never decrease
2023-05-27 01:57:22 +03:00
// - finalized block should be <= safe block <= latest block
func (cp *ConsensusPoller) checkExpectedBlockTags(
currentLatest hexutil.Uint64,
oldSafe hexutil.Uint64, currentSafe hexutil.Uint64,
oldFinalized hexutil.Uint64, currentFinalized hexutil.Uint64) bool {
2023-05-27 00:22:50 +03:00
return currentFinalized >= oldFinalized &&
currentSafe >= oldSafe &&
currentFinalized <= currentSafe &&
currentSafe <= currentLatest
}
2023-09-14 22:42:33 +03:00
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
2023-04-18 21:57:55 +03:00
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
2023-09-14 22:42:33 +03:00
// get the latest block number from the tracker
currentConsensusBlockNumber := cp.GetLatestBlockNumber()
2023-04-18 21:57:55 +03:00
// get the candidates for the consensus group
candidates := cp.getConsensusCandidates()
2023-04-25 20:26:55 +03:00
// update the lowest latest block number and hash
// the lowest safe block number
// the lowest finalized block number
2023-05-27 14:14:29 +03:00
var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64
for _, bs := range candidates {
2023-05-27 14:14:29 +03:00
if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
lowestLatestBlock = bs.latestBlockNumber
lowestLatestBlockHash = bs.latestBlockHash
}
2023-05-27 14:14:29 +03:00
if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = bs.finalizedBlockNumber
}
2023-05-27 14:14:29 +03:00
if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
lowestSafeBlock = bs.safeBlockNumber
2023-04-18 21:57:55 +03:00
}
}
2023-05-27 14:14:29 +03:00
// find the proposed block among the candidates
// the proposed block needs have the same hash in the entire consensus group
proposedBlock := lowestLatestBlock
proposedBlockHash := lowestLatestBlockHash
2023-04-18 21:57:55 +03:00
hasConsensus := false
broken := false
2023-04-18 21:57:55 +03:00
if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
2023-04-18 21:57:55 +03:00
}
// if there is a block to propose, check if it is the same in all backends
2023-05-27 01:54:04 +03:00
if proposedBlock > 0 {
for !hasConsensus {
allAgreed := true
2023-05-27 17:38:19 +03:00
for be := range candidates {
2023-05-27 01:54:04 +03:00
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
2023-04-18 21:57:55 +03:00
}
2023-05-27 01:54:04 +03:00
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus",
"name", be.Name,
"actualBlockNumber", actualBlockNumber,
"actualBlockHash", actualBlockHash,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
2023-05-27 01:54:04 +03:00
broken = true
}
allAgreed = false
break
}
}
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock -= 1
proposedBlockHash = ""
log.Debug("no consensus, now trying", "block:", proposedBlock)
2023-04-18 21:57:55 +03:00
}
}
}
if broken {
// propagate event to other interested parts, such as cache invalidator
2023-04-27 02:31:03 +03:00
for _, l := range cp.listeners {
l()
}
2023-05-27 14:14:29 +03:00
log.Info("consensus broken",
"currentConsensusBlockNumber", currentConsensusBlockNumber,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
2023-04-18 21:57:55 +03:00
}
// update tracker
cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
2023-05-27 00:22:50 +03:00
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
2023-05-27 14:14:29 +03:00
// update consensus group
group := make([]*Backend, 0, len(candidates))
consensusBackendsNames := make([]string, 0, len(candidates))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
_, exist := candidates[be]
if exist {
group = append(group, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
2023-05-27 14:14:29 +03:00
}
2023-04-18 21:57:55 +03:00
cp.consensusGroupMux.Lock()
2023-05-27 14:14:29 +03:00
cp.consensusGroup = group
2023-04-18 21:57:55 +03:00
cp.consensusGroupMux.Unlock()
2023-05-10 03:21:25 +03:00
RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
2023-05-27 00:22:50 +03:00
RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock)
RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock)
2023-05-27 14:14:29 +03:00
RecordGroupConsensusCount(cp.backendGroup, len(group))
2023-05-10 05:17:25 +03:00
RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))
2023-05-10 03:21:25 +03:00
2023-09-14 22:42:33 +03:00
log.Debug("group state",
2023-05-27 14:14:29 +03:00
"proposedBlock", proposedBlock,
"consensusBackends", strings.Join(consensusBackendsNames, ", "),
"filteredBackends", strings.Join(filteredBackendsNames, ", "))
2023-04-18 21:57:55 +03:00
}
2023-05-09 13:58:35 +03:00
// IsBanned checks if a specific backend is banned
func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
return bs.IsBanned()
2023-05-09 13:58:35 +03:00
}
// BannedUntil returns the time until which a specific backend is banned.
// A time in the past means the backend is not currently banned.
func (cp *ConsensusPoller) BannedUntil(be *Backend) time.Time {
	bs := cp.backendState[be]
	bs.backendStateMux.Lock()
	defer bs.backendStateMux.Unlock()
	return bs.bannedUntil
}
2023-05-09 13:58:35 +03:00
// Ban bans a specific backend
func (cp *ConsensusPoller) Ban(be *Backend) {
if be.forcedCandidate {
return
}
2023-05-09 13:58:35 +03:00
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(cp.banPeriod)
2023-09-14 22:42:33 +03:00
// when we ban a node, we give it the chance to start from any block when it is back
bs.latestBlockNumber = 0
2023-05-27 01:54:04 +03:00
bs.safeBlockNumber = 0
bs.finalizedBlockNumber = 0
}
2023-09-14 22:42:33 +03:00
// Unban removes any bans from the backends
2023-05-27 01:54:04 +03:00
func (cp *ConsensusPoller) Unban(be *Backend) {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(-10 * time.Hour)
2023-05-09 13:58:35 +03:00
}
2023-05-27 05:07:35 +03:00
// Reset reset all backend states
2023-05-27 00:22:50 +03:00
func (cp *ConsensusPoller) Reset() {
2023-04-25 20:26:55 +03:00
for _, be := range cp.backendGroup.Backends {
2023-05-27 00:22:50 +03:00
cp.backendState[be] = &backendState{}
2023-04-25 20:26:55 +03:00
}
}
2023-09-14 22:42:33 +03:00
// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
2023-04-21 20:36:42 +03:00
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
2023-04-18 21:57:55 +03:00
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
if err != nil {
2023-04-21 20:36:42 +03:00
return 0, "", err
2023-04-18 21:57:55 +03:00
}
jsonMap, ok := rpcRes.Result.(map[string]interface{})
if !ok {
2023-04-25 20:26:55 +03:00
return 0, "", fmt.Errorf("unexpected response to eth_getBlockByNumber on backend %s", be.Name)
2023-04-18 21:57:55 +03:00
}
2023-04-21 20:36:42 +03:00
blockNumber = hexutil.Uint64(hexutil.MustDecodeUint64(jsonMap["number"].(string)))
2023-04-18 21:57:55 +03:00
blockHash = jsonMap["hash"].(string)
return
}
2023-09-14 22:42:33 +03:00
// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
2023-04-25 20:26:55 +03:00
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
if err != nil {
return 0, err
}
jsonMap, ok := rpcRes.Result.(string)
if !ok {
return 0, fmt.Errorf("unexpected response to net_peerCount on backend %s", be.Name)
}
count = hexutil.MustDecodeUint64(jsonMap)
return count, nil
}
2023-09-14 22:42:33 +03:00
// isInSync is a convenient wrapper to check if the backend is in sync from the network
2023-04-25 20:26:55 +03:00
func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
if err != nil {
return false, err
}
var res bool
switch typed := rpcRes.Result.(type) {
case bool:
syncing := typed
res = !syncing
case string:
syncing, err := strconv.ParseBool(typed)
if err != nil {
return false, err
}
res = !syncing
default:
// result is a json when not in sync
res = false
}
return res, nil
}
// GetBackendState creates a copy of backend state so that the caller can use it without locking
func (cp *ConsensusPoller) GetBackendState(be *Backend) *backendState {
2023-04-18 21:57:55 +03:00
bs := cp.backendState[be]
2023-05-12 01:33:47 +03:00
defer bs.backendStateMux.Unlock()
2023-04-18 21:57:55 +03:00
bs.backendStateMux.Lock()
2023-05-27 14:14:29 +03:00
return &backendState{
latestBlockNumber: bs.latestBlockNumber,
latestBlockHash: bs.latestBlockHash,
safeBlockNumber: bs.safeBlockNumber,
finalizedBlockNumber: bs.finalizedBlockNumber,
peerCount: bs.peerCount,
inSync: bs.inSync,
lastUpdate: bs.lastUpdate,
bannedUntil: bs.bannedUntil,
}
2023-04-18 21:57:55 +03:00
}
// GetLastUpdate returns when setBackendState last stored a state for be
// (the zero time after Reset), read under the backend's lock.
func (cp *ConsensusPoller) GetLastUpdate(be *Backend) time.Time {
	state := cp.backendState[be]
	state.backendStateMux.Lock()
	lastUpdate := state.lastUpdate
	state.backendStateMux.Unlock()
	return lastUpdate
}
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
safeBlockNumber hexutil.Uint64,
finalizedBlockNumber hexutil.Uint64) bool {
2023-04-18 21:57:55 +03:00
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed := bs.latestBlockHash != latestBlockHash
2023-04-25 20:26:55 +03:00
bs.peerCount = peerCount
2023-05-12 01:33:47 +03:00
bs.inSync = inSync
bs.latestBlockNumber = latestBlockNumber
bs.latestBlockHash = latestBlockHash
bs.finalizedBlockNumber = finalizedBlockNumber
bs.safeBlockNumber = safeBlockNumber
2023-04-18 21:57:55 +03:00
bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock()
return changed
}
// getConsensusCandidates returns the primaries that pass FilterCandidates and
// records how many there are. Only when no primary qualifies does it return the
// filtered fallback backends instead.
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
	primaries := cp.FilterCandidates(cp.backendGroup.Primaries())
	RecordHealthyCandidates(cp.backendGroup, len(primaries))

	if len(primaries) == 0 {
		return cp.FilterCandidates(cp.backendGroup.Fallbacks())
	}
	return primaries
}
// filterCandidates find out what backends are the candidates to be in the consensus group
2023-09-14 22:42:33 +03:00
// and create a copy of current their state
//
// a candidate is a serving node within the following conditions:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*backendState {
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
for _, be := range backends {
bs := cp.GetBackendState(be)
if be.forcedCandidate {
candidates[be] = bs
continue
}
if bs.IsBanned() {
continue
}
if !be.IsHealthy() {
continue
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
log.Debug("backend peer count too low for inclusion in consensus",
"backend_name", be.Name,
"peer_count", bs.peerCount,
"min_peer_count", cp.minPeerCount,
)
continue
}
if !bs.inSync {
continue
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
candidates[be] = bs
}
// find the highest block, in order to use it defining the highest non-lagging ancestor block
var highestLatestBlock hexutil.Uint64
for _, bs := range candidates {
if bs.latestBlockNumber > highestLatestBlock {
highestLatestBlock = bs.latestBlockNumber
}
}
// find the highest common ancestor block
lagging := make([]*Backend, 0, len(candidates))
for be, bs := range candidates {
// check if backend is lagging behind the highest block
2023-05-27 20:18:34 +03:00
if uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
}
}
2023-09-14 22:42:33 +03:00
// remove lagging backends from the candidates
for _, be := range lagging {
delete(candidates, be)
}
return candidates
2023-04-18 21:57:55 +03:00
}