diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 6ba7a10..07c9fa2 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -564,8 +564,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch // We also rewrite block tags to enforce compliance with consensus rctx := RewriteContext{ latest: bg.Consensus.GetLatestBlockNumber(), - finalized: bg.Consensus.GetFinalizedBlockNumber(), safe: bg.Consensus.GetSafeBlockNumber(), + finalized: bg.Consensus.GetFinalizedBlockNumber(), } for i, req := range rpcReqs { diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 07edc31..5944fa7 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -43,11 +43,10 @@ type ConsensusPoller struct { type backendState struct { backendStateMux sync.Mutex - latestBlockNumber hexutil.Uint64 - latestBlockHash string - - finalizedBlockNumber hexutil.Uint64 + latestBlockNumber hexutil.Uint64 + latestBlockHash string safeBlockNumber hexutil.Uint64 + finalizedBlockNumber hexutil.Uint64 peerCount uint64 inSync bool @@ -77,16 +76,16 @@ func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 { return ct.tracker.GetLatestBlockNumber() } -// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus -func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 { - return ct.tracker.GetFinalizedBlockNumber() -} - // GetSafeBlockNumber returns the `safe` agreed block number in a consensus func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 { return ct.tracker.GetSafeBlockNumber() } +// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus +func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 { + return ct.tracker.GetFinalizedBlockNumber() +} + func (cp *ConsensusPoller) Shutdown() { cp.asyncHandler.Shutdown() } @@ -212,9 +211,6 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller ctx, cancelFunc := context.WithCancel(context.Background()) state := make(map[*Backend]*backendState, len(bg.Backends)) - for _, be := range bg.Backends { - state[be] = &backendState{} - } cp := &ConsensusPoller{ cancelFunc: cancelFunc, @@ -239,6 +235,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller cp.asyncHandler = NewPollerAsyncHandler(ctx, cp) } + cp.Reset() cp.asyncHandler.Init() return cp @@ -291,15 +288,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) } - oldFinalized := bs.finalizedBlockNumber - oldSafe := bs.safeBlockNumber - - updateDelay := time.Since(bs.lastUpdate) - RecordConsensusBackendUpdateDelay(be, updateDelay) + RecordConsensusBackendUpdateDelay(be, bs.lastUpdate) changed := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash, - finalizedBlockNumber, safeBlockNumber) + safeBlockNumber, finalizedBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendSafeBlock(be, safeBlockNumber) @@ -312,25 +305,25 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { "inSync", inSync, "latestBlockNumber", latestBlockNumber, "latestBlockHash", latestBlockHash, - "finalizedBlockNumber", finalizedBlockNumber, "safeBlockNumber", safeBlockNumber, - "updateDelay", updateDelay) + "finalizedBlockNumber", finalizedBlockNumber, + "lastUpdate", bs.lastUpdate) } // sanity check for latest, safe and finalized block tags expectedBlockTags := cp.checkExpectedBlockTags( - finalizedBlockNumber, oldFinalized, - safeBlockNumber, oldSafe, - latestBlockNumber) + latestBlockNumber, + bs.safeBlockNumber, safeBlockNumber, + bs.finalizedBlockNumber, finalizedBlockNumber) RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) if !expectedBlockTags { log.Warn("backend banned - unexpected block tags", "backend", be.Name, - "oldFinalized", oldFinalized, + "oldFinalized", bs.finalizedBlockNumber, "finalizedBlockNumber", finalizedBlockNumber, - "oldSafe", oldSafe, + "oldSafe", bs.safeBlockNumber, "safeBlockNumber", safeBlockNumber, "latestBlockNumber", latestBlockNumber, ) @@ -342,9 +335,10 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { // - finalized block number should never decrease // - safe block number should never decrease // - finalized block should be <= safe block <= latest block -func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64, - currentSafe hexutil.Uint64, oldSafe hexutil.Uint64, - currentLatest hexutil.Uint64) bool { +func (cp *ConsensusPoller) checkExpectedBlockTags( + currentLatest hexutil.Uint64, + oldSafe hexutil.Uint64, currentSafe hexutil.Uint64, + oldFinalized hexutil.Uint64, currentFinalized hexutil.Uint64) bool { return currentFinalized >= oldFinalized && currentSafe >= oldSafe && currentFinalized <= currentSafe && @@ -596,8 +590,8 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, latestBlockNumber hexutil.Uint64, latestBlockHash string, - finalizedBlockNumber hexutil.Uint64, - safeBlockNumber hexutil.Uint64) bool { + safeBlockNumber hexutil.Uint64, + finalizedBlockNumber hexutil.Uint64) bool { bs := cp.backendState[be] bs.backendStateMux.Lock() changed := bs.latestBlockHash != latestBlockHash @@ -658,7 +652,7 @@ func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState { lagging := make([]*Backend, 0, len(candidates)) for be, bs := range candidates { // check if backend is lagging behind the highest block - if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag { + if uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag { lagging = append(lagging, be) } } diff --git a/proxyd/proxyd/consensus_tracker.go b/proxyd/proxyd/consensus_tracker.go index 83d2ec7..b0bcb45 100644 --- a/proxyd/proxyd/consensus_tracker.go +++ b/proxyd/proxyd/consensus_tracker.go @@ -15,17 +15,17 @@ import ( type ConsensusTracker interface { GetLatestBlockNumber() hexutil.Uint64 SetLatestBlockNumber(blockNumber hexutil.Uint64) - GetFinalizedBlockNumber() hexutil.Uint64 - SetFinalizedBlockNumber(blockNumber hexutil.Uint64) GetSafeBlockNumber() hexutil.Uint64 SetSafeBlockNumber(blockNumber hexutil.Uint64) + GetFinalizedBlockNumber() hexutil.Uint64 + SetFinalizedBlockNumber(blockNumber hexutil.Uint64) } // InMemoryConsensusTracker store and retrieve in memory, async-safe type InMemoryConsensusTracker struct { latestBlockNumber hexutil.Uint64 - finalizedBlockNumber hexutil.Uint64 safeBlockNumber hexutil.Uint64 + finalizedBlockNumber hexutil.Uint64 mutex sync.Mutex } @@ -49,20 +49,6 @@ func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uin ct.latestBlockNumber = blockNumber } -func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { - defer ct.mutex.Unlock() - ct.mutex.Lock() - - return ct.finalizedBlockNumber -} - -func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { - defer ct.mutex.Unlock() - ct.mutex.Lock() - - ct.finalizedBlockNumber = blockNumber -} - func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { defer ct.mutex.Unlock() ct.mutex.Lock() @@ -77,6 +63,20 @@ func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint6 ct.safeBlockNumber = blockNumber } +func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + return ct.finalizedBlockNumber +} + +func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + ct.finalizedBlockNumber = blockNumber +} + // RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe type RedisConsensusTracker struct { ctx context.Context @@ -104,13 +104,6 @@ func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64 ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0) } -func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { - return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val())) -} - -func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { - ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0) -} func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val())) } @@ -118,3 +111,11 @@ func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) { ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0) } + +func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val())) +} + +func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0) +} diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 98196c9..8075637 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -267,8 +267,8 @@ func TestConsensus(t *testing.T) { overrideBlock("node2", "safe", "0xe2") update() - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("advance safe and finalized", func(t *testing.T) { @@ -279,8 +279,8 @@ func TestConsensus(t *testing.T) { overrideBlock("node2", "safe", "0xe2") update() - require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("ban backend if error rate is too high", func(t *testing.T) { @@ -317,8 +317,8 @@ func TestConsensus(t *testing.T) { overrideBlock("node1", "safe", "0xa1") update() - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, nodes["node1"].backend) @@ -349,8 +349,8 @@ func TestConsensus(t *testing.T) { update() require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, nodes["node1"].backend) @@ -365,8 +365,8 @@ func TestConsensus(t *testing.T) { update() require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, nodes["node1"].backend) @@ -397,8 +397,8 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("latest dropped below safe, then recovered", func(t *testing.T) { @@ -424,8 +424,8 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) { diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 3d8bfac..1d9602c 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -482,7 +482,12 @@ func RecordConsensusBackendInSync(b *Backend, inSync bool) { consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync)) } -func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) { +func RecordConsensusBackendUpdateDelay(b *Backend, lastUpdate time.Time) { + // avoid recording the delay for the first update + if lastUpdate.IsZero() { + return + } + delay := time.Since(lastUpdate) consensusUpdateDelayBackend.WithLabelValues(b.Name).Set(float64(delay.Milliseconds())) }