Merge pull request #5794 from ethereum-optimism/felipe/consensus-finalized-safe

feat(proxyd): track consensus for {safe,finalized} blocks and rewrite tags
OptimismBot 2023-05-27 10:57:47 -04:00 committed by GitHub
commit 45061a1c8c
12 changed files with 1109 additions and 760 deletions

@@ -1,4 +1,4 @@
-FROM golang:1.18.0-alpine3.15 as builder
+FROM golang:1.20.4-alpine3.18 as builder
 ARG GITCOMMIT=docker
 ARG GITDATE=docker
@@ -12,7 +12,7 @@ WORKDIR /app
 RUN make proxyd
-FROM alpine:3.15
+FROM alpine:3.18
 COPY ./proxyd/entrypoint.sh /bin/entrypoint.sh

@@ -374,7 +374,6 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
 func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
     // we are concerned about network error rates, so we record 1 request independently of how many are in the batch
     b.networkRequestsSlidingWindow.Incr()
-    RecordBackendNetworkRequestCountSlidingWindow(b, b.networkRequestsSlidingWindow.Count())
     isSingleElementBatch := len(rpcReqs) == 1
@@ -391,7 +390,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
     httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
     if err != nil {
         b.networkErrorsSlidingWindow.Incr()
-        RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
+        RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
         return nil, wrapErr(err, "error creating backend request")
     }
@@ -413,7 +412,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
     httpRes, err := b.client.DoLimited(httpReq)
     if err != nil {
         b.networkErrorsSlidingWindow.Incr()
-        RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
+        RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
         return nil, wrapErr(err, "error in backend request")
     }
@@ -432,7 +431,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
     // Alchemy returns a 400 on bad JSONs, so handle that case
     if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
         b.networkErrorsSlidingWindow.Incr()
-        RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
+        RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
         return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
     }
@@ -440,7 +439,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
     resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
     if err != nil {
         b.networkErrorsSlidingWindow.Incr()
-        RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
+        RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
         return nil, wrapErr(err, "error reading response body")
     }
@@ -458,18 +457,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
         // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
         if responseIsNotBatched(resB) {
             b.networkErrorsSlidingWindow.Incr()
-            RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
+            RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
             return nil, ErrBackendUnexpectedJSONRPC
         }
         b.networkErrorsSlidingWindow.Incr()
-        RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
+        RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
         return nil, ErrBackendBadResponse
         }
     }
     if len(rpcReqs) != len(res) {
         b.networkErrorsSlidingWindow.Incr()
-        RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count())
+        RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
         return nil, ErrBackendUnexpectedJSONRPC
     }
@@ -483,6 +482,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
     duration := time.Since(start)
     b.latencySlidingWindow.Add(float64(duration))
     RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg()))
+    RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
     sortBatchRPCResponse(rpcReqs, res)
     return res, nil
@@ -490,11 +490,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 // IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
 func (b *Backend) IsHealthy() bool {
-    errorRate := float64(0)
-    // avoid division-by-zero when the window is empty
-    if b.networkRequestsSlidingWindow.Sum() >= 10 {
-        errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
-    }
+    errorRate := b.ErrorRate()
     avgLatency := time.Duration(b.latencySlidingWindow.Avg())
     if errorRate >= b.maxErrorRateThreshold {
         return false
@@ -505,6 +501,16 @@ func (b *Backend) IsHealthy() bool {
     return true
 }

+// ErrorRate returns the instant error rate of the backend
+func (b *Backend) ErrorRate() (errorRate float64) {
+    // we only really start counting the error rate after a minimum of 10 requests
+    // this is to avoid false positives when the backend is just starting up
+    if b.networkRequestsSlidingWindow.Sum() >= 10 {
+        errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
+    }
+    return errorRate
+}
+
 // IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource)
 func (b *Backend) IsDegraded() bool {
     avgLatency := time.Duration(b.latencySlidingWindow.Avg())
@@ -556,7 +562,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
     backends = bg.loadBalancedConsensusGroup()

     // We also rewrite block tags to enforce compliance with consensus
-    rctx := RewriteContext{latest: bg.Consensus.GetConsensusBlockNumber()}
+    rctx := RewriteContext{
+        latest:    bg.Consensus.GetLatestBlockNumber(),
+        finalized: bg.Consensus.GetFinalizedBlockNumber(),
+        safe:      bg.Consensus.GetSafeBlockNumber(),
+    }

     for i, req := range rpcReqs {
         res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}

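The backend changes above replace the raw request/error counters with a single windowed error rate. A minimal sketch of the guard that rate applies (the `window` type here is a stand-in for proxyd's internal sliding-window counters, which are not part of this diff):

package main

import "fmt"

// window is a stand-in for proxyd's internal sliding-window counter type.
type window struct{ sum float64 }

func (w *window) Sum() float64 { return w.sum }

// errorRate mirrors the guard in Backend.ErrorRate above: report 0 until at
// least 10 requests are in the window, avoiding division by zero and noisy
// rates while a backend warms up.
func errorRate(errs, reqs *window) float64 {
    if reqs.Sum() >= 10 {
        return errs.Sum() / reqs.Sum()
    }
    return 0
}

func main() {
    fmt.Println(errorRate(&window{sum: 2}, &window{sum: 5}))  // 0: window too small
    fmt.Println(errorRate(&window{sum: 2}, &window{sum: 20})) // 0.1
}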
@@ -35,7 +35,6 @@ type ConsensusPoller struct {
 	asyncHandler ConsensusAsyncHandler
 	minPeerCount uint64
 	banPeriod          time.Duration
 	maxUpdateThreshold time.Duration
 	maxBlockLag        uint64
@@ -46,6 +45,10 @@ type backendState struct {
 	latestBlockNumber hexutil.Uint64
 	latestBlockHash   string
+	finalizedBlockNumber hexutil.Uint64
+	safeBlockNumber      hexutil.Uint64
 	peerCount uint64
 	inSync    bool
@@ -54,6 +57,10 @@ type backendState struct {
 	bannedUntil time.Time
 }

+func (bs *backendState) IsBanned() bool {
+	return time.Now().Before(bs.bannedUntil)
+}
+
 // GetConsensusGroup returns the backend members that are agreeing in a consensus
 func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
 	defer cp.consensusGroupMux.Unlock()
@@ -65,9 +72,19 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
 	return g
 }

-// GetConsensusBlockNumber returns the agreed block number in a consensus
-func (ct *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 {
-	return ct.tracker.GetConsensusBlockNumber()
+// GetLatestBlockNumber returns the `latest` agreed block number in a consensus
+func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
+	return ct.tracker.GetLatestBlockNumber()
+}
+
+// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
+func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
+	return ct.tracker.GetFinalizedBlockNumber()
+}
+
+// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
+func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
+	return ct.tracker.GetSafeBlockNumber()
 }

 func (cp *ConsensusPoller) Shutdown() {
@@ -163,6 +180,10 @@ func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
 	cp.listeners = append(cp.listeners, listener)
 }

+func (cp *ConsensusPoller) ClearListeners() {
+	cp.listeners = []OnConsensusBroken{}
+}
+
 func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
 	return func(cp *ConsensusPoller) {
 		cp.banPeriod = banPeriod
@@ -202,7 +223,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
 		banPeriod:          5 * time.Minute,
 		maxUpdateThreshold: 30 * time.Second,
-		maxBlockLag:        50,
+		maxBlockLag:        8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
 		minPeerCount:       3,
 	}
@@ -225,22 +246,21 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
 // UpdateBackend refreshes the consensus state of a single backend
 func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
-	banned := cp.IsBanned(be)
-	RecordConsensusBackendBanned(be, banned)
+	bs := cp.getBackendState(be)
+	RecordConsensusBackendBanned(be, bs.IsBanned())

-	if banned {
+	if bs.IsBanned() {
 		log.Debug("skipping backend - banned", "backend", be.Name)
 		return
 	}

 	// if backend is not healthy state we'll only resume checking it after ban
 	if !be.IsHealthy() {
-		log.Warn("backend banned - not online or not healthy", "backend", be.Name)
+		log.Warn("backend banned - not healthy", "backend", be.Name)
 		cp.Ban(be)
 		return
 	}

-	// if backend it not in sync we'll check again after ban
 	inSync, err := cp.isInSync(ctx, be)
 	RecordConsensusBackendInSync(be, err == nil && inSync)
 	if err != nil {
@@ -258,120 +278,123 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 	latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
 	if err != nil {
-		log.Warn("error updating backend", "name", be.Name, "err", err)
+		log.Warn("error updating backend - latest block", "name", be.Name, "err", err)
 	}

-	changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash)
+	safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
+	if err != nil {
+		log.Warn("error updating backend - safe block", "name", be.Name, "err", err)
+	}
+
+	finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
+	if err != nil {
+		log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
+	}
+
+	oldFinalized := bs.finalizedBlockNumber
+	oldSafe := bs.safeBlockNumber
+
+	updateDelay := time.Since(bs.lastUpdate)
+	RecordConsensusBackendUpdateDelay(be, updateDelay)
+
+	changed := cp.setBackendState(be, peerCount, inSync,
+		latestBlockNumber, latestBlockHash,
+		finalizedBlockNumber, safeBlockNumber)
+
+	RecordBackendLatestBlock(be, latestBlockNumber)
+	RecordBackendSafeBlock(be, safeBlockNumber)
+	RecordBackendFinalizedBlock(be, finalizedBlockNumber)
+
 	if changed {
-		RecordBackendLatestBlock(be, latestBlockNumber)
-		RecordConsensusBackendUpdateDelay(be, updateDelay)
 		log.Debug("backend state updated",
 			"name", be.Name,
 			"peerCount", peerCount,
 			"inSync", inSync,
 			"latestBlockNumber", latestBlockNumber,
 			"latestBlockHash", latestBlockHash,
+			"finalizedBlockNumber", finalizedBlockNumber,
+			"safeBlockNumber", safeBlockNumber,
 			"updateDelay", updateDelay)
 	}
+
+	// sanity check for latest, safe and finalized block tags
+	expectedBlockTags := cp.checkExpectedBlockTags(
+		finalizedBlockNumber, oldFinalized,
+		safeBlockNumber, oldSafe,
+		latestBlockNumber)
+
+	RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
+
+	if !expectedBlockTags {
+		log.Warn("backend banned - unexpected block tags",
+			"backend", be.Name,
+			"oldFinalized", oldFinalized,
+			"finalizedBlockNumber", finalizedBlockNumber,
+			"oldSafe", oldSafe,
+			"safeBlockNumber", safeBlockNumber,
+			"latestBlockNumber", latestBlockNumber,
+		)
+		cp.Ban(be)
+	}
+}
+
+// checkExpectedBlockTags checks for unexpected conditions on block tags
+//   - finalized block number should never decrease
+//   - safe block number should never decrease
+//   - finalized block should be <= safe block <= latest block
+func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64,
+	currentSafe hexutil.Uint64, oldSafe hexutil.Uint64,
+	currentLatest hexutil.Uint64) bool {
+	return currentFinalized >= oldFinalized &&
+		currentSafe >= oldSafe &&
+		currentFinalized <= currentSafe &&
+		currentSafe <= currentLatest
 }

 // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
 func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
-	var highestBlock hexutil.Uint64
-	var lowestBlock hexutil.Uint64
-	var lowestBlockHash string
-
-	currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
-
-	// find the highest block, in order to use it defining the highest non-lagging ancestor block
-	for _, be := range cp.backendGroup.Backends {
-		peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
-
-		if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
-			continue
-		}
-		if !inSync {
-			continue
-		}
-		if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
-			continue
-		}
-
-		if backendLatestBlockNumber > highestBlock {
-			highestBlock = backendLatestBlockNumber
-		}
-	}
-
-	// find the highest common ancestor block
-	for _, be := range cp.backendGroup.Backends {
-		peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
-
-		if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
-			continue
-		}
-		if !inSync {
-			continue
-		}
-		if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
-			continue
-		}
-
-		// check if backend is lagging behind the highest block
-		if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag {
-			continue
-		}
-
-		if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
-			lowestBlock = backendLatestBlockNumber
-			lowestBlockHash = backendLatestBlockHash
-		}
-	}
-
-	// no block to propose (i.e. initializing consensus)
-	if lowestBlock == 0 {
-		return
-	}
-
-	proposedBlock := lowestBlock
-	proposedBlockHash := lowestBlockHash
+	// get the latest block number from the tracker
+	currentConsensusBlockNumber := cp.GetLatestBlockNumber()
+
+	// get the candidates for the consensus group
+	candidates := cp.getConsensusCandidates()
+
+	// find the lowest latest block number and hash,
+	// the lowest safe block number,
+	// and the lowest finalized block number
+	var lowestLatestBlock hexutil.Uint64
+	var lowestLatestBlockHash string
+	var lowestFinalizedBlock hexutil.Uint64
+	var lowestSafeBlock hexutil.Uint64
+	for _, bs := range candidates {
+		if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
+			lowestLatestBlock = bs.latestBlockNumber
+			lowestLatestBlockHash = bs.latestBlockHash
+		}
+		if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
+			lowestFinalizedBlock = bs.finalizedBlockNumber
+		}
+		if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
+			lowestSafeBlock = bs.safeBlockNumber
+		}
+	}
+
+	// find the proposed block among the candidates
+	// the proposed block needs to have the same hash in the entire consensus group
+	proposedBlock := lowestLatestBlock
+	proposedBlockHash := lowestLatestBlockHash
 	hasConsensus := false
+	broken := false

-	// check if everybody agrees on the same block hash
-	consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
-	consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
-	filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
-
-	if lowestBlock > currentConsensusBlockNumber {
-		log.Debug("validating consensus on block", "lowestBlock", lowestBlock)
+	if lowestLatestBlock > currentConsensusBlockNumber {
+		log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
 	}

-	broken := false
-	for !hasConsensus {
-		allAgreed := true
-		consensusBackends = consensusBackends[:0]
-		filteredBackendsNames = filteredBackendsNames[:0]
-		for _, be := range cp.backendGroup.Backends {
-			/*
-				a serving node needs to be:
-				- healthy (network)
-				- updated recently
-				- not banned
-				- with minimum peer count
-				- not lagging latest block
-				- in sync
-			*/
-			peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
-			notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
-			isBanned := time.Now().Before(bannedUntil)
-			notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
-			lagging := latestBlockNumber < proposedBlock
-			if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
-				filteredBackendsNames = append(filteredBackendsNames, be.Name)
-				continue
-			}
-
-			actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
+	// if there is a block to propose, check if it is the same in all backends
+	if proposedBlock > 0 {
+		for !hasConsensus {
+			allAgreed := true
+			for be := range candidates {
+				actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
 				if err != nil {
 					log.Warn("error updating backend", "name", be.Name, "err", err)
@@ -383,14 +406,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 				blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
 				if blocksDontMatch {
 					if currentConsensusBlockNumber >= actualBlockNumber {
-						log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
+						log.Warn("backend broke consensus",
+							"name", be.Name,
+							"actualBlockNumber", actualBlockNumber,
+							"actualBlockHash", actualBlockHash,
+							"proposedBlock", proposedBlock,
+							"proposedBlockHash", proposedBlockHash)
 						broken = true
 					}
 					allAgreed = false
 					break
 				}
-				consensusBackends = append(consensusBackends, be)
-				consensusBackendsNames = append(consensusBackendsNames, be.Name)
 			}
 			if allAgreed {
 				hasConsensus = true
@@ -401,26 +427,54 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 				log.Debug("no consensus, now trying", "block:", proposedBlock)
 			}
 		}
+	}

 	if broken {
 		// propagate event to other interested parts, such as cache invalidator
 		for _, l := range cp.listeners {
 			l()
 		}
-		log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
+		log.Info("consensus broken",
+			"currentConsensusBlockNumber", currentConsensusBlockNumber,
+			"proposedBlock", proposedBlock,
+			"proposedBlockHash", proposedBlockHash)
 	}

-	cp.tracker.SetConsensusBlockNumber(proposedBlock)
+	// update tracker
+	cp.tracker.SetLatestBlockNumber(proposedBlock)
+	cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
+	cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
+
+	// update consensus group
+	group := make([]*Backend, 0, len(candidates))
+	consensusBackendsNames := make([]string, 0, len(candidates))
+	filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
+	for _, be := range cp.backendGroup.Backends {
+		_, exist := candidates[be]
+		if exist {
+			group = append(group, be)
+			consensusBackendsNames = append(consensusBackendsNames, be.Name)
+		} else {
+			filteredBackendsNames = append(filteredBackendsNames, be.Name)
+		}
+	}
+
 	cp.consensusGroupMux.Lock()
-	cp.consensusGroup = consensusBackends
+	cp.consensusGroup = group
 	cp.consensusGroupMux.Unlock()

 	RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock)
-	RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends))
+	RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock)
+	RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock)
+	RecordGroupConsensusCount(cp.backendGroup, len(group))
 	RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames))
 	RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends))

-	log.Debug("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
+	log.Debug("group state",
+		"proposedBlock", proposedBlock,
+		"consensusBackends", strings.Join(consensusBackendsNames, ", "),
+		"filteredBackends", strings.Join(filteredBackendsNames, ", "))
 }

 // IsBanned checks if a specific backend is banned
@@ -428,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
 	bs := cp.backendState[be]
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
-	return time.Now().Before(bs.bannedUntil)
+	return bs.IsBanned()
 }

 // Ban bans a specific backend
@@ -437,19 +491,29 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
 	bs.bannedUntil = time.Now().Add(cp.banPeriod)
+
+	// when we ban a node, we give it the chance to start from any block when it is back
+	bs.latestBlockNumber = 0
+	bs.safeBlockNumber = 0
+	bs.finalizedBlockNumber = 0
 }

-// Unban remove any bans from the backends
-func (cp *ConsensusPoller) Unban() {
-	for _, be := range cp.backendGroup.Backends {
-		bs := cp.backendState[be]
-		bs.backendStateMux.Lock()
-		bs.bannedUntil = time.Now().Add(-10 * time.Hour)
-		bs.backendStateMux.Unlock()
+// Unban removes any bans from the backends
+func (cp *ConsensusPoller) Unban(be *Backend) {
+	bs := cp.backendState[be]
+	defer bs.backendStateMux.Unlock()
+	bs.backendStateMux.Lock()
+	bs.bannedUntil = time.Now().Add(-10 * time.Hour)
+}
+
+// Reset resets all backend states
+func (cp *ConsensusPoller) Reset() {
+	for _, be := range cp.backendGroup.Backends {
+		cp.backendState[be] = &backendState{}
 	}
 }

-// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
+// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
 func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
 	var rpcRes RPCRes
 	err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
@@ -467,7 +531,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
 	return
 }

-// getPeerCount Convenient wrapper to retrieve the current peer count from the backend
+// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
 func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
 	var rpcRes RPCRes
 	err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
@@ -512,29 +576,97 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
 	return res, nil
 }

-func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
+// getBackendState creates a copy of backend state so that the caller can use it without locking
+func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
 	bs := cp.backendState[be]
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
-	peerCount = bs.peerCount
-	inSync = bs.inSync
-	blockNumber = bs.latestBlockNumber
-	blockHash = bs.latestBlockHash
-	lastUpdate = bs.lastUpdate
-	bannedUntil = bs.bannedUntil
-	return
+
+	return &backendState{
+		latestBlockNumber:    bs.latestBlockNumber,
+		latestBlockHash:      bs.latestBlockHash,
+		safeBlockNumber:      bs.safeBlockNumber,
+		finalizedBlockNumber: bs.finalizedBlockNumber,
+		peerCount:            bs.peerCount,
+		inSync:               bs.inSync,
+		lastUpdate:           bs.lastUpdate,
+		bannedUntil:          bs.bannedUntil,
+	}
 }

-func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
+func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
+	latestBlockNumber hexutil.Uint64, latestBlockHash string,
+	finalizedBlockNumber hexutil.Uint64,
+	safeBlockNumber hexutil.Uint64) bool {
 	bs := cp.backendState[be]
 	bs.backendStateMux.Lock()
-	changed = bs.latestBlockHash != blockHash
+	changed := bs.latestBlockHash != latestBlockHash
 	bs.peerCount = peerCount
 	bs.inSync = inSync
-	bs.latestBlockNumber = blockNumber
-	bs.latestBlockHash = blockHash
-	updateDelay = time.Since(bs.lastUpdate)
+	bs.latestBlockNumber = latestBlockNumber
+	bs.latestBlockHash = latestBlockHash
+	bs.finalizedBlockNumber = finalizedBlockNumber
+	bs.safeBlockNumber = safeBlockNumber
 	bs.lastUpdate = time.Now()
 	bs.backendStateMux.Unlock()
-	return
+	return changed
+}
+
+// getConsensusCandidates finds out which backends are candidates to be in the consensus group
+// and creates a copy of their current state
+//
+// a candidate is a serving node within the following conditions:
+//   - not banned
+//   - healthy (network latency and error rate)
+//   - with minimum peer count
+//   - in sync
+//   - updated recently
+//   - not lagging latest block
+func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
+	candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
+
+	for _, be := range cp.backendGroup.Backends {
+		bs := cp.getBackendState(be)
+		if bs.IsBanned() {
+			continue
+		}
+		if !be.IsHealthy() {
+			continue
+		}
+		if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
+			continue
+		}
+		if !bs.inSync {
+			continue
+		}
+		if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
+			continue
+		}
+
+		candidates[be] = bs
+	}
+
+	// find the highest block, in order to use it defining the highest non-lagging ancestor block
+	var highestLatestBlock hexutil.Uint64
+	for _, bs := range candidates {
+		if bs.latestBlockNumber > highestLatestBlock {
+			highestLatestBlock = bs.latestBlockNumber
+		}
+	}
+
+	// find the highest common ancestor block
+	lagging := make([]*Backend, 0, len(candidates))
+	for be, bs := range candidates {
+		// check if backend is lagging behind the highest block
+		if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
+			lagging = append(lagging, be)
+		}
+	}
+
+	// remove lagging backends from the candidates
+	for _, be := range lagging {
+		delete(candidates, be)
+	}
+
+	return candidates
 }

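The new `checkExpectedBlockTags` sanity check enforces two invariants per backend: tag numbers never move backwards, and `finalized <= safe <= latest`. A standalone sketch of that predicate, including the ban-triggering case (block values chosen to match the mock data further down; `uint64` stands in for `hexutil.Uint64`):

package main

import "fmt"

// expectedBlockTags mirrors ConsensusPoller.checkExpectedBlockTags above:
// finalized and safe must never decrease, and the ordering
// finalized <= safe <= latest must hold.
func expectedBlockTags(curFinalized, oldFinalized, curSafe, oldSafe, curLatest uint64) bool {
    return curFinalized >= oldFinalized &&
        curSafe >= oldSafe &&
        curFinalized <= curSafe &&
        curSafe <= curLatest
}

func main() {
    // healthy backend: 0xc1 <= 0xe1 <= 0x101, nothing moved backwards
    fmt.Println(expectedBlockTags(0xc1, 0xc1, 0xe1, 0xe1, 0x101)) // true
    // finalized moved backwards -> the poller would ban this backend
    fmt.Println(expectedBlockTags(0xc0, 0xc1, 0xe1, 0xe1, 0x101)) // false
}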
@@ -13,35 +13,68 @@ import (
 // ConsensusTracker abstracts how we store and retrieve the current consensus
 // allowing it to be stored locally in-memory or in a shared Redis cluster
 type ConsensusTracker interface {
-	GetConsensusBlockNumber() hexutil.Uint64
-	SetConsensusBlockNumber(blockNumber hexutil.Uint64)
+	GetLatestBlockNumber() hexutil.Uint64
+	SetLatestBlockNumber(blockNumber hexutil.Uint64)
+	GetFinalizedBlockNumber() hexutil.Uint64
+	SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
+	GetSafeBlockNumber() hexutil.Uint64
+	SetSafeBlockNumber(blockNumber hexutil.Uint64)
 }

 // InMemoryConsensusTracker store and retrieve in memory, async-safe
 type InMemoryConsensusTracker struct {
-	consensusBlockNumber hexutil.Uint64
+	latestBlockNumber    hexutil.Uint64
+	finalizedBlockNumber hexutil.Uint64
+	safeBlockNumber      hexutil.Uint64
 	mutex                sync.Mutex
 }

 func NewInMemoryConsensusTracker() ConsensusTracker {
 	return &InMemoryConsensusTracker{
-		consensusBlockNumber: 0,
 		mutex: sync.Mutex{},
 	}
 }

-func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
+func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
 	defer ct.mutex.Unlock()
 	ct.mutex.Lock()

-	return ct.consensusBlockNumber
+	return ct.latestBlockNumber
 }

-func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
+func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
 	defer ct.mutex.Unlock()
 	ct.mutex.Lock()

-	ct.consensusBlockNumber = blockNumber
+	ct.latestBlockNumber = blockNumber
+}
+
+func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
+	defer ct.mutex.Unlock()
+	ct.mutex.Lock()
+
+	return ct.finalizedBlockNumber
+}
+
+func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
+	defer ct.mutex.Unlock()
+	ct.mutex.Lock()
+
+	ct.finalizedBlockNumber = blockNumber
+}
+
+func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
+	defer ct.mutex.Unlock()
+	ct.mutex.Lock()
+
+	return ct.safeBlockNumber
+}
+
+func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
+	defer ct.mutex.Unlock()
+	ct.mutex.Lock()
+
+	ct.safeBlockNumber = blockNumber
 }

 // RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
@@ -59,14 +92,29 @@ func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace st
 	}
 }

-func (ct *RedisConsensusTracker) key() string {
-	return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup)
+func (ct *RedisConsensusTracker) key(tag string) string {
+	return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag)
 }

-func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
-	return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val()))
+func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
+	return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("latest")).Val()))
 }

-func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
-	ct.client.Set(ct.ctx, ct.key(), blockNumber, 0)
+func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
+	ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
+}
+
+func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
+	return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
+}
+
+func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
+	ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
+}
+
+func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
+	return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
+}
+
+func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
+	ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
 }

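With three tracked tags, the Redis tracker moves from a single `consensus_latest_block:<group>` key to one key per tag. A small sketch of the new key scheme (the "main" group name is made up for illustration):

package main

import "fmt"

// key mirrors the new RedisConsensusTracker.key above: one Redis key per
// block tag, namespaced by backend group.
func key(backendGroup, tag string) string {
    return fmt.Sprintf("consensus:%s:%s", backendGroup, tag)
}

func main() {
    for _, tag := range []string{"latest", "safe", "finalized"} {
        fmt.Println(key("main", tag)) // consensus:main:latest, consensus:main:safe, ...
    }
}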
@@ -93,8 +93,8 @@ backends = ["infura"]
 # consensus_ban_period = "1m"
 # Maximum delay for update the backend, default 30s
 # consensus_max_update_threshold = "20s"
-# Maximum block lag, default 50
-# consensus_max_block_lag = 10
+# Maximum block lag, default 8
+# consensus_max_block_lag = 16
 # Minimum peer count, default 3
 # consensus_min_peer_count = 4

File diff suppressed because it is too large

@@ -18,7 +18,7 @@ consensus_aware = true
 consensus_handler = "noop" # allow more control over the consensus poller for tests
 consensus_ban_period = "1m"
 consensus_max_update_threshold = "2m"
-consensus_max_block_lag = 50
+consensus_max_block_lag = 8
 consensus_min_peer_count = 4

 [rpc_method_mappings]

@@ -26,40 +26,161 @@
       "jsonrpc": "2.0",
       "id": 67,
       "result": {
-        "hash": "hash1",
-        "number": "0x1"
+        "hash": "hash_0x101",
+        "number": "0x101"
       }
     }
 - method: eth_getBlockByNumber
-  block: 0x1
+  block: 0x101
   response: >
     {
       "jsonrpc": "2.0",
       "id": 67,
       "result": {
-        "hash": "hash1",
-        "number": "0x1"
+        "hash": "hash_0x101",
+        "number": "0x101"
       }
     }
 - method: eth_getBlockByNumber
-  block: 0x2
+  block: 0x102
   response: >
     {
       "jsonrpc": "2.0",
       "id": 67,
       "result": {
-        "hash": "hash2",
-        "number": "0x2"
+        "hash": "hash_0x102",
+        "number": "0x102"
       }
     }
 - method: eth_getBlockByNumber
-  block: 0x3
+  block: 0x103
   response: >
     {
       "jsonrpc": "2.0",
       "id": 67,
       "result": {
-        "hash": "hash3",
-        "number": "0x3"
+        "hash": "hash_0x103",
+        "number": "0x103"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0x10a
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0x10a",
+        "number": "0x10a"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0x132
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0x132",
+        "number": "0x132"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0x133
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0x133",
+        "number": "0x133"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0x134
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0x134",
+        "number": "0x134"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0x200
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0x200",
+        "number": "0x200"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0x91
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0x91",
+        "number": "0x91"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: safe
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0xe1",
+        "number": "0xe1"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0xe1
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0xe1",
+        "number": "0xe1"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: finalized
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0xc1",
+        "number": "0xc1"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0xc1
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0xc1",
+        "number": "0xc1"
+      }
+    }
+- method: eth_getBlockByNumber
+  block: 0xd1
+  response: >
+    {
+      "jsonrpc": "2.0",
+      "id": 67,
+      "result": {
+        "hash": "hash_0xd1",
+        "number": "0xd1"
       }
     }

@@ -246,6 +246,22 @@ var (
 		"backend_group_name",
 	})

+	consensusSafeBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: MetricsNamespace,
+		Name:      "group_consensus_safe_block",
+		Help:      "Consensus safe block",
+	}, []string{
+		"backend_group_name",
+	})
+
+	consensusFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: MetricsNamespace,
+		Name:      "group_consensus_finalized_block",
+		Help:      "Consensus finalized block",
+	}, []string{
+		"backend_group_name",
+	})
+
 	backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: MetricsNamespace,
 		Name:      "backend_latest_block",
@@ -254,6 +270,30 @@ var (
 		"backend_name",
 	})

+	backendSafeBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: MetricsNamespace,
+		Name:      "backend_safe_block",
+		Help:      "Current safe block observed per backend",
+	}, []string{
+		"backend_name",
+	})
+
+	backendFinalizedBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: MetricsNamespace,
+		Name:      "backend_finalized_block",
+		Help:      "Current finalized block observed per backend",
+	}, []string{
+		"backend_name",
+	})
+
+	backendUnexpectedBlockTagsBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: MetricsNamespace,
+		Name:      "backend_unexpected_block_tags",
+		Help:      "Bool gauge for unexpected block tags",
+	}, []string{
+		"backend_name",
+	})
+
 	consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: MetricsNamespace,
 		Name:      "group_consensus_count",
@@ -318,18 +358,10 @@ var (
 		"backend_name",
 	})

-	networkErrorCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
+	networkErrorRateBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: MetricsNamespace,
-		Name:      "backend_net_error_count",
-		Help:      "Network error count per backend",
-	}, []string{
-		"backend_name",
-	})
-
-	requestCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Namespace: MetricsNamespace,
-		Name:      "backend_request_count",
-		Help:      "Request count per backend",
+		Name:      "backend_error_rate",
+		Help:      "Request error rate per backend",
 	}, []string{
 		"backend_name",
 	})
@@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui
 	consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
 }

+func RecordGroupConsensusSafeBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
+	consensusSafeBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
+}
+
+func RecordGroupConsensusFinalizedBlock(group *BackendGroup, blockNumber hexutil.Uint64) {
+	consensusFinalizedBlock.WithLabelValues(group.Name).Set(float64(blockNumber))
+}
+
 func RecordGroupConsensusCount(group *BackendGroup, count int) {
 	consensusGroupCount.WithLabelValues(group.Name).Set(float64(count))
 }
@@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) {
 	backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
 }

+func RecordBackendSafeBlock(b *Backend, blockNumber hexutil.Uint64) {
+	backendSafeBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
+}
+
+func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64) {
+	backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber))
+}
+
+func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool) {
+	backendUnexpectedBlockTagsBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected))
+}
+
 func RecordConsensusBackendBanned(b *Backend, banned bool) {
-	v := float64(0)
-	if banned {
-		v = float64(1)
-	}
-	consensusBannedBackends.WithLabelValues(b.Name).Set(v)
+	consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned))
 }

 func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
@@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
 }

 func RecordConsensusBackendInSync(b *Backend, inSync bool) {
-	v := float64(0)
-	if inSync {
-		v = float64(1)
-	}
-	consensusInSyncBackend.WithLabelValues(b.Name).Set(v)
+	consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync))
 }

 func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) {
@@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time
 	avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds()))
 }

-func RecordBackendNetworkRequestCountSlidingWindow(b *Backend, count uint) {
-	requestCountBackend.WithLabelValues(b.Name).Set(float64(count))
+func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) {
+	networkErrorRateBackend.WithLabelValues(b.Name).Set(rate)
 }

-func RecordBackendNetworkErrorCountSlidingWindow(b *Backend, count uint) {
-	networkErrorCountBackend.WithLabelValues(b.Name).Set(float64(count))
+func boolToFloat64(b bool) float64 {
+	if b {
+		return 1
+	}
+	return 0
 }

@@ -10,6 +10,8 @@ import (
 type RewriteContext struct {
 	latest    hexutil.Uint64
+	safe      hexutil.Uint64
+	finalized hexutil.Uint64
 }

 type RewriteResult uint8
@@ -180,11 +182,13 @@ func rewriteTag(rctx RewriteContext, current string) (string, bool, error) {
 	}

 	switch *bnh.BlockNumber {
-	case rpc.SafeBlockNumber,
-		rpc.FinalizedBlockNumber,
-		rpc.PendingBlockNumber,
+	case rpc.PendingBlockNumber,
 		rpc.EarliestBlockNumber:
 		return current, false, nil
+	case rpc.FinalizedBlockNumber:
+		return rctx.finalized.String(), true, nil
+	case rpc.SafeBlockNumber:
+		return rctx.safe.String(), true, nil
 	case rpc.LatestBlockNumber:
 		return rctx.latest.String(), true, nil
 	default:

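After this change, `safe` and `finalized` tags are pinned to the consensus numbers just like `latest`, while `pending` and `earliest` still pass through. A self-contained sketch of the switch logic (simplified: plain strings and `uint64` instead of proxyd's `rpc.BlockNumber` and `hexutil.Uint64`):

package main

import "fmt"

// rewriteContext mirrors proxyd's RewriteContext: the consensus-agreed
// block numbers that tag-based requests are pinned to.
type rewriteContext struct {
    latest, safe, finalized uint64
}

// rewriteTag sketches the switch above: "safe" and "finalized" are now
// overridden with consensus numbers, while "pending" and "earliest"
// still pass through untouched.
func rewriteTag(rctx rewriteContext, tag string) (rewritten string, changed bool) {
    switch tag {
    case "pending", "earliest":
        return tag, false
    case "finalized":
        return fmt.Sprintf("0x%x", rctx.finalized), true
    case "safe":
        return fmt.Sprintf("0x%x", rctx.safe), true
    case "latest":
        return fmt.Sprintf("0x%x", rctx.latest), true
    default:
        return tag, false // already a concrete block number
    }
}

func main() {
    rctx := rewriteContext{latest: 0x101, safe: 0xe1, finalized: 0xc1}
    fmt.Println(rewriteTag(rctx, "finalized")) // 0xc1 true
    fmt.Println(rewriteTag(rctx, "pending"))   // pending false
}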
@@ -326,33 +326,33 @@ func TestRewriteRequest(t *testing.T) {
 		{
 			name: "eth_getBlockByNumber finalized",
 			args: args{
-				rctx: RewriteContext{latest: hexutil.Uint64(100)},
+				rctx: RewriteContext{latest: hexutil.Uint64(100), finalized: hexutil.Uint64(55)},
 				req:  &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})},
 				res:  nil,
 			},
-			expected: RewriteNone,
+			expected: RewriteOverrideRequest,
 			check: func(t *testing.T, args args) {
 				var p []string
 				err := json.Unmarshal(args.req.Params, &p)
 				require.Nil(t, err)
 				require.Equal(t, 1, len(p))
-				require.Equal(t, "finalized", p[0])
+				require.Equal(t, hexutil.Uint64(55).String(), p[0])
 			},
 		},
 		{
 			name: "eth_getBlockByNumber safe",
 			args: args{
-				rctx: RewriteContext{latest: hexutil.Uint64(100)},
+				rctx: RewriteContext{latest: hexutil.Uint64(100), safe: hexutil.Uint64(50)},
 				req:  &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})},
 				res:  nil,
 			},
-			expected: RewriteNone,
+			expected: RewriteOverrideRequest,
 			check: func(t *testing.T, args args) {
 				var p []string
 				err := json.Unmarshal(args.req.Params, &p)
 				require.Nil(t, err)
 				require.Equal(t, 1, len(p))
-				require.Equal(t, "safe", p[0])
+				require.Equal(t, hexutil.Uint64(50).String(), p[0])
 			},
 		},
 		{

@@ -95,7 +95,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
 	resBody := ""
 	if batched {
 		resBody = "[" + strings.Join(responses, ",") + "]"
-	} else {
+	} else if len(responses) > 0 {
 		resBody = responses[0]
 	}