From 2bf2329dd3034decc619352f61342063c776d612 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Sat, 27 May 2023 10:30:50 -0700 Subject: [PATCH] order params chronologically: latest, safe, finalized --- proxyd/proxyd/backend.go | 2 +- proxyd/proxyd/consensus_poller.go | 46 ++++++++--------- proxyd/proxyd/consensus_tracker.go | 49 ++++++++++--------- .../integration_tests/consensus_test.go | 14 +++--- 4 files changed, 54 insertions(+), 57 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 6ba7a10..07c9fa2 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -564,8 +564,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch // We also rewrite block tags to enforce compliance with consensus rctx := RewriteContext{ latest: bg.Consensus.GetLatestBlockNumber(), - finalized: bg.Consensus.GetFinalizedBlockNumber(), safe: bg.Consensus.GetSafeBlockNumber(), + finalized: bg.Consensus.GetFinalizedBlockNumber(), } for i, req := range rpcReqs { diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 9e1c5eb..5944fa7 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -43,11 +43,10 @@ type ConsensusPoller struct { type backendState struct { backendStateMux sync.Mutex - latestBlockNumber hexutil.Uint64 - latestBlockHash string - - finalizedBlockNumber hexutil.Uint64 + latestBlockNumber hexutil.Uint64 + latestBlockHash string safeBlockNumber hexutil.Uint64 + finalizedBlockNumber hexutil.Uint64 peerCount uint64 inSync bool @@ -77,16 +76,16 @@ func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 { return ct.tracker.GetLatestBlockNumber() } -// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus -func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 { - return ct.tracker.GetFinalizedBlockNumber() -} - // GetSafeBlockNumber returns the `safe` agreed block number in a consensus func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 { return ct.tracker.GetSafeBlockNumber() } +// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus +func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 { + return ct.tracker.GetFinalizedBlockNumber() +} + func (cp *ConsensusPoller) Shutdown() { cp.asyncHandler.Shutdown() } @@ -289,15 +288,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) } - // just for readability - oldFinalized := bs.finalizedBlockNumber - oldSafe := bs.safeBlockNumber - RecordConsensusBackendUpdateDelay(be, bs.lastUpdate) changed := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash, - finalizedBlockNumber, safeBlockNumber) + safeBlockNumber, finalizedBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendSafeBlock(be, safeBlockNumber) @@ -310,25 +305,25 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { "inSync", inSync, "latestBlockNumber", latestBlockNumber, "latestBlockHash", latestBlockHash, - "finalizedBlockNumber", finalizedBlockNumber, "safeBlockNumber", safeBlockNumber, + "finalizedBlockNumber", finalizedBlockNumber, "lastUpdate", bs.lastUpdate) } // sanity check for latest, safe and finalized block tags expectedBlockTags := cp.checkExpectedBlockTags( - finalizedBlockNumber, oldFinalized, - safeBlockNumber, oldSafe, - latestBlockNumber) + latestBlockNumber, + bs.safeBlockNumber, safeBlockNumber, + bs.finalizedBlockNumber, finalizedBlockNumber) RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) if !expectedBlockTags { log.Warn("backend banned - unexpected block tags", "backend", be.Name, - "oldFinalized", oldFinalized, + "oldFinalized", bs.finalizedBlockNumber, "finalizedBlockNumber", finalizedBlockNumber, - "oldSafe", oldSafe, + "oldSafe", bs.safeBlockNumber, "safeBlockNumber", safeBlockNumber, "latestBlockNumber", latestBlockNumber, ) @@ -340,9 +335,10 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { // - finalized block number should never decrease // - safe block number should never decrease // - finalized block should be <= safe block <= latest block -func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64, - currentSafe hexutil.Uint64, oldSafe hexutil.Uint64, - currentLatest hexutil.Uint64) bool { +func (cp *ConsensusPoller) checkExpectedBlockTags( + currentLatest hexutil.Uint64, + oldSafe hexutil.Uint64, currentSafe hexutil.Uint64, + oldFinalized hexutil.Uint64, currentFinalized hexutil.Uint64) bool { return currentFinalized >= oldFinalized && currentSafe >= oldSafe && currentFinalized <= currentSafe && @@ -594,8 +590,8 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, latestBlockNumber hexutil.Uint64, latestBlockHash string, - finalizedBlockNumber hexutil.Uint64, - safeBlockNumber hexutil.Uint64) bool { + safeBlockNumber hexutil.Uint64, + finalizedBlockNumber hexutil.Uint64) bool { bs := cp.backendState[be] bs.backendStateMux.Lock() changed := bs.latestBlockHash != latestBlockHash diff --git a/proxyd/proxyd/consensus_tracker.go b/proxyd/proxyd/consensus_tracker.go index 83d2ec7..b0bcb45 100644 --- a/proxyd/proxyd/consensus_tracker.go +++ b/proxyd/proxyd/consensus_tracker.go @@ -15,17 +15,17 @@ import ( type ConsensusTracker interface { GetLatestBlockNumber() hexutil.Uint64 SetLatestBlockNumber(blockNumber hexutil.Uint64) - GetFinalizedBlockNumber() hexutil.Uint64 - SetFinalizedBlockNumber(blockNumber hexutil.Uint64) GetSafeBlockNumber() hexutil.Uint64 SetSafeBlockNumber(blockNumber hexutil.Uint64) + GetFinalizedBlockNumber() hexutil.Uint64 + SetFinalizedBlockNumber(blockNumber hexutil.Uint64) } // InMemoryConsensusTracker store and retrieve in memory, async-safe type InMemoryConsensusTracker struct { latestBlockNumber hexutil.Uint64 - finalizedBlockNumber hexutil.Uint64 safeBlockNumber hexutil.Uint64 + finalizedBlockNumber hexutil.Uint64 mutex sync.Mutex } @@ -49,20 +49,6 @@ func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uin ct.latestBlockNumber = blockNumber } -func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { - defer ct.mutex.Unlock() - ct.mutex.Lock() - - return ct.finalizedBlockNumber -} - -func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { - defer ct.mutex.Unlock() - ct.mutex.Lock() - - ct.finalizedBlockNumber = blockNumber -} - func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { defer ct.mutex.Unlock() ct.mutex.Lock() @@ -77,6 +63,20 @@ func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint6 ct.safeBlockNumber = blockNumber } +func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + return ct.finalizedBlockNumber +} + +func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + ct.finalizedBlockNumber = blockNumber +} + // RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe type RedisConsensusTracker struct { ctx context.Context @@ -104,13 +104,6 @@ func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64 ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0) } -func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { - return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val())) -} - -func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { - ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0) -} func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val())) } @@ -118,3 +111,11 @@ func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) { ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0) } + +func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val())) +} + +func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0) +} diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 98196c9..8075637 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -267,8 +267,8 @@ func TestConsensus(t *testing.T) { overrideBlock("node2", "safe", "0xe2") update() - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("advance safe and finalized", func(t *testing.T) { @@ -279,8 +279,8 @@ func TestConsensus(t *testing.T) { overrideBlock("node2", "safe", "0xe2") update() - require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("ban backend if error rate is too high", func(t *testing.T) { @@ -317,8 +317,8 @@ func TestConsensus(t *testing.T) { overrideBlock("node1", "safe", "0xa1") update() - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, nodes["node1"].backend) @@ -349,8 +349,8 @@ func TestConsensus(t *testing.T) { update() require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, nodes["node1"].backend) @@ -365,8 +365,8 @@ func TestConsensus(t *testing.T) { update() require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, nodes["node1"].backend) @@ -397,8 +397,8 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("latest dropped below safe, then recovered", func(t *testing.T) { @@ -424,8 +424,8 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {