Merge pull request #5805 from ethereum-optimism/felipe/small-fixes-05-27

refactor(proxyd): order params chronologically: latest, safe, finalized + small fixes
This commit is contained in:
OptimismBot 2023-05-29 13:20:18 -04:00 committed by GitHub
commit d7d4c3f569
5 changed files with 64 additions and 64 deletions

@ -564,8 +564,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
// We also rewrite block tags to enforce compliance with consensus // We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{ rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(), latest: bg.Consensus.GetLatestBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(), safe: bg.Consensus.GetSafeBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
} }
for i, req := range rpcReqs { for i, req := range rpcReqs {

@ -43,11 +43,10 @@ type ConsensusPoller struct {
type backendState struct { type backendState struct {
backendStateMux sync.Mutex backendStateMux sync.Mutex
latestBlockNumber hexutil.Uint64 latestBlockNumber hexutil.Uint64
latestBlockHash string latestBlockHash string
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64 safeBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
peerCount uint64 peerCount uint64
inSync bool inSync bool
@ -77,16 +76,16 @@ func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return ct.tracker.GetLatestBlockNumber() return ct.tracker.GetLatestBlockNumber()
} }
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
return ct.tracker.GetFinalizedBlockNumber()
}
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus // GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 { func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
return ct.tracker.GetSafeBlockNumber() return ct.tracker.GetSafeBlockNumber()
} }
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
return ct.tracker.GetFinalizedBlockNumber()
}
func (cp *ConsensusPoller) Shutdown() { func (cp *ConsensusPoller) Shutdown() {
cp.asyncHandler.Shutdown() cp.asyncHandler.Shutdown()
} }
@ -212,9 +211,6 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
state := make(map[*Backend]*backendState, len(bg.Backends)) state := make(map[*Backend]*backendState, len(bg.Backends))
for _, be := range bg.Backends {
state[be] = &backendState{}
}
cp := &ConsensusPoller{ cp := &ConsensusPoller{
cancelFunc: cancelFunc, cancelFunc: cancelFunc,
@ -239,6 +235,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
cp.asyncHandler = NewPollerAsyncHandler(ctx, cp) cp.asyncHandler = NewPollerAsyncHandler(ctx, cp)
} }
cp.Reset()
cp.asyncHandler.Init() cp.asyncHandler.Init()
return cp return cp
@ -291,15 +288,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
} }
oldFinalized := bs.finalizedBlockNumber RecordConsensusBackendUpdateDelay(be, bs.lastUpdate)
oldSafe := bs.safeBlockNumber
updateDelay := time.Since(bs.lastUpdate)
RecordConsensusBackendUpdateDelay(be, updateDelay)
changed := cp.setBackendState(be, peerCount, inSync, changed := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash, latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber) safeBlockNumber, finalizedBlockNumber)
RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber)
RecordBackendSafeBlock(be, safeBlockNumber) RecordBackendSafeBlock(be, safeBlockNumber)
@ -312,25 +305,25 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
"inSync", inSync, "inSync", inSync,
"latestBlockNumber", latestBlockNumber, "latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash, "latestBlockHash", latestBlockHash,
"finalizedBlockNumber", finalizedBlockNumber,
"safeBlockNumber", safeBlockNumber, "safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay) "finalizedBlockNumber", finalizedBlockNumber,
"lastUpdate", bs.lastUpdate)
} }
// sanity check for latest, safe and finalized block tags // sanity check for latest, safe and finalized block tags
expectedBlockTags := cp.checkExpectedBlockTags( expectedBlockTags := cp.checkExpectedBlockTags(
finalizedBlockNumber, oldFinalized, latestBlockNumber,
safeBlockNumber, oldSafe, bs.safeBlockNumber, safeBlockNumber,
latestBlockNumber) bs.finalizedBlockNumber, finalizedBlockNumber)
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
if !expectedBlockTags { if !expectedBlockTags {
log.Warn("backend banned - unexpected block tags", log.Warn("backend banned - unexpected block tags",
"backend", be.Name, "backend", be.Name,
"oldFinalized", oldFinalized, "oldFinalized", bs.finalizedBlockNumber,
"finalizedBlockNumber", finalizedBlockNumber, "finalizedBlockNumber", finalizedBlockNumber,
"oldSafe", oldSafe, "oldSafe", bs.safeBlockNumber,
"safeBlockNumber", safeBlockNumber, "safeBlockNumber", safeBlockNumber,
"latestBlockNumber", latestBlockNumber, "latestBlockNumber", latestBlockNumber,
) )
@ -342,9 +335,10 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// - finalized block number should never decrease // - finalized block number should never decrease
// - safe block number should never decrease // - safe block number should never decrease
// - finalized block should be <= safe block <= latest block // - finalized block should be <= safe block <= latest block
func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64, func (cp *ConsensusPoller) checkExpectedBlockTags(
currentSafe hexutil.Uint64, oldSafe hexutil.Uint64, currentLatest hexutil.Uint64,
currentLatest hexutil.Uint64) bool { oldSafe hexutil.Uint64, currentSafe hexutil.Uint64,
oldFinalized hexutil.Uint64, currentFinalized hexutil.Uint64) bool {
return currentFinalized >= oldFinalized && return currentFinalized >= oldFinalized &&
currentSafe >= oldSafe && currentSafe >= oldSafe &&
currentFinalized <= currentSafe && currentFinalized <= currentSafe &&
@ -596,8 +590,8 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string, latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64, safeBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64) bool { finalizedBlockNumber hexutil.Uint64) bool {
bs := cp.backendState[be] bs := cp.backendState[be]
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
changed := bs.latestBlockHash != latestBlockHash changed := bs.latestBlockHash != latestBlockHash
@ -658,7 +652,7 @@ func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
lagging := make([]*Backend, 0, len(candidates)) lagging := make([]*Backend, 0, len(candidates))
for be, bs := range candidates { for be, bs := range candidates {
// check if backend is lagging behind the highest block // check if backend is lagging behind the highest block
if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag { if uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be) lagging = append(lagging, be)
} }
} }

@ -15,17 +15,17 @@ import (
type ConsensusTracker interface { type ConsensusTracker interface {
GetLatestBlockNumber() hexutil.Uint64 GetLatestBlockNumber() hexutil.Uint64
SetLatestBlockNumber(blockNumber hexutil.Uint64) SetLatestBlockNumber(blockNumber hexutil.Uint64)
GetFinalizedBlockNumber() hexutil.Uint64
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
GetSafeBlockNumber() hexutil.Uint64 GetSafeBlockNumber() hexutil.Uint64
SetSafeBlockNumber(blockNumber hexutil.Uint64) SetSafeBlockNumber(blockNumber hexutil.Uint64)
GetFinalizedBlockNumber() hexutil.Uint64
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
} }
// InMemoryConsensusTracker store and retrieve in memory, async-safe // InMemoryConsensusTracker store and retrieve in memory, async-safe
type InMemoryConsensusTracker struct { type InMemoryConsensusTracker struct {
latestBlockNumber hexutil.Uint64 latestBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64 safeBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
mutex sync.Mutex mutex sync.Mutex
} }
@ -49,20 +49,6 @@ func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uin
ct.latestBlockNumber = blockNumber ct.latestBlockNumber = blockNumber
} }
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.finalizedBlockNumber
}
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.finalizedBlockNumber = blockNumber
}
func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock() defer ct.mutex.Unlock()
ct.mutex.Lock() ct.mutex.Lock()
@ -77,6 +63,20 @@ func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint6
ct.safeBlockNumber = blockNumber ct.safeBlockNumber = blockNumber
} }
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.finalizedBlockNumber
}
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.finalizedBlockNumber = blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe // RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
type RedisConsensusTracker struct { type RedisConsensusTracker struct {
ctx context.Context ctx context.Context
@ -104,13 +104,6 @@ func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64
ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0) ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
} }
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
}
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val())) return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
} }
@ -118,3 +111,11 @@ func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) { func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0) ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
} }
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
}
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
}

@ -267,8 +267,8 @@ func TestConsensus(t *testing.T) {
overrideBlock("node2", "safe", "0xe2") overrideBlock("node2", "safe", "0xe2")
update() update()
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
}) })
t.Run("advance safe and finalized", func(t *testing.T) { t.Run("advance safe and finalized", func(t *testing.T) {
@ -279,8 +279,8 @@ func TestConsensus(t *testing.T) {
overrideBlock("node2", "safe", "0xe2") overrideBlock("node2", "safe", "0xe2")
update() update()
require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String())
}) })
t.Run("ban backend if error rate is too high", func(t *testing.T) { t.Run("ban backend if error rate is too high", func(t *testing.T) {
@ -317,8 +317,8 @@ func TestConsensus(t *testing.T) {
overrideBlock("node1", "safe", "0xa1") overrideBlock("node1", "safe", "0xa1")
update() update()
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend) require.NotContains(t, consensusGroup, nodes["node1"].backend)
@ -349,8 +349,8 @@ func TestConsensus(t *testing.T) {
update() update()
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend) require.NotContains(t, consensusGroup, nodes["node1"].backend)
@ -365,8 +365,8 @@ func TestConsensus(t *testing.T) {
update() update()
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend) require.NotContains(t, consensusGroup, nodes["node1"].backend)
@ -397,8 +397,8 @@ func TestConsensus(t *testing.T) {
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
}) })
t.Run("latest dropped below safe, then recovered", func(t *testing.T) { t.Run("latest dropped below safe, then recovered", func(t *testing.T) {
@ -424,8 +424,8 @@ func TestConsensus(t *testing.T) {
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
}) })
t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) { t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {

@ -482,7 +482,12 @@ func RecordConsensusBackendInSync(b *Backend, inSync bool) {
consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync)) consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync))
} }
func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) { func RecordConsensusBackendUpdateDelay(b *Backend, lastUpdate time.Time) {
// avoid recording the delay for the first update
if lastUpdate.IsZero() {
return
}
delay := time.Since(lastUpdate)
consensusUpdateDelayBackend.WithLabelValues(b.Name).Set(float64(delay.Milliseconds())) consensusUpdateDelayBackend.WithLabelValues(b.Name).Set(float64(delay.Milliseconds()))
} }