From d5594a62a0a48faa797a13e2474fe317d4659222 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 15:54:04 -0700 Subject: [PATCH] moar tests for safe and finalized bans --- proxyd/proxyd/consensus_poller.go | 117 ++++++++++-------- .../integration_tests/consensus_test.go | 76 ++++++++++++ .../testdata/consensus_responses.yml | 22 ++++ 3 files changed, 161 insertions(+), 54 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 1856566..1ad4b22 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -406,11 +406,6 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } } - // no block to propose (i.e. initializing consensus) - if lowestLatestBlock == 0 { - return - } - proposedBlock := lowestLatestBlock proposedBlockHash := lowestLatestBlockHash hasConsensus := false @@ -424,59 +419,63 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) } - broken := false - for !hasConsensus { - allAgreed := true - consensusBackends = consensusBackends[:0] - filteredBackendsNames = filteredBackendsNames[:0] - for _, be := range cp.backendGroup.Backends { - /* - a serving node needs to be: - - healthy (network) - - updated recently - - not banned - - with minimum peer count - - not lagging latest block - - in sync - */ + // if there is no block to propose, the consensus is automatically broken + broken := proposedBlock == 0 && currentConsensusBlockNumber > 0 - peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) - notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) - isBanned := time.Now().Before(bannedUntil) - notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount - lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { - filteredBackendsNames = append(filteredBackendsNames, be.Name) - continue - } + if proposedBlock > 0 { + for !hasConsensus { + allAgreed := true + consensusBackends = consensusBackends[:0] + filteredBackendsNames = filteredBackendsNames[:0] + for _, be := range cp.backendGroup.Backends { + /* + a serving node needs to be: + - healthy (network) + - updated recently + - not banned + - with minimum peer count + - not lagging latest block + - in sync + */ - actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) - if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) - continue - } - if proposedBlockHash == "" { - proposedBlockHash = actualBlockHash - } - blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) - if blocksDontMatch { - if currentConsensusBlockNumber >= actualBlockNumber { - log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash) - broken = true + peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) + notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) + isBanned := time.Now().Before(bannedUntil) + notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount + lagging := latestBlockNumber < proposedBlock + if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + continue } - allAgreed = false - break + + actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + continue + } + if proposedBlockHash == "" { + proposedBlockHash = actualBlockHash + } + blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) + if blocksDontMatch { + if currentConsensusBlockNumber >= actualBlockNumber { + log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash) + broken = true + } + allAgreed = false + break + } + consensusBackends = append(consensusBackends, be) + consensusBackendsNames = append(consensusBackendsNames, be.Name) + } + if allAgreed { + hasConsensus = true + } else { + // walk one block behind and try again + proposedBlock -= 1 + proposedBlockHash = "" + log.Debug("no consensus, now trying", "block:", proposedBlock) } - consensusBackends = append(consensusBackends, be) - consensusBackendsNames = append(consensusBackendsNames, be.Name) - } - if allAgreed { - hasConsensus = true - } else { - // walk one block behind and try again - proposedBlock -= 1 - proposedBlockHash = "" - log.Debug("no consensus, now trying", "block:", proposedBlock) } } @@ -521,6 +520,16 @@ func (cp *ConsensusPoller) Ban(be *Backend) { defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() bs.bannedUntil = time.Now().Add(cp.banPeriod) + bs.safeBlockNumber = 0 + bs.finalizedBlockNumber = 0 +} + +// Unban remove any bans from the backends +func (cp *ConsensusPoller) Unban(be *Backend) { + bs := cp.backendState[be] + defer bs.backendStateMux.Unlock() + bs.backendStateMux.Lock() + bs.bannedUntil = time.Now().Add(-10 * time.Hour) } // Reset remove any bans from the backends and reset their states diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index fe36a15..320139a 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -330,6 +330,82 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) }) + t.Run("recover after safe and finalized dropped", func(t *testing.T) { + reset() + useOnlyNode1() + overrideBlock("node1", "latest", "0xd1") + overrideBlock("node1", "safe", "0xb1") + overrideBlock("node1", "finalized", "0x91") + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + + // unban and see if it recovers + bg.Consensus.Unban(nodes["node1"].backend) + update() + + consensusGroup = bg.Consensus.GetConsensusGroup() + require.Contains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + + require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + }) + + t.Run("latest dropped below safe, then recovered", func(t *testing.T) { + reset() + useOnlyNode1() + overrideBlock("node1", "latest", "0xd1") + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + + // unban and see if it recovers + bg.Consensus.Unban(nodes["node1"].backend) + overrideBlock("node1", "safe", "0xb1") + overrideBlock("node1", "finalized", "0x91") + update() + + consensusGroup = bg.Consensus.GetConsensusGroup() + require.Contains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + + require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + }) + + t.Run("latest dropped below safe, and stayed inconsistent after ban", func(t *testing.T) { + reset() + useOnlyNode1() + overrideBlock("node1", "latest", "0xd1") + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + + // unban and see if it recovers - it should not since the blocks stays the same + bg.Consensus.Unban(nodes["node1"].backend) + update() + + // should be banned again + consensusGroup = bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + }) + t.Run("broken consensus", func(t *testing.T) { reset() listenerCalled := false diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index 12cc4cd..ea8ef72 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -107,6 +107,17 @@ "number": "0x200" } } +- method: eth_getBlockByNumber + block: 0x91 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x91", + "number": "0x91" + } + } - method: eth_getBlockByNumber block: safe response: > @@ -151,3 +162,14 @@ "number": "0xc1" } } +- method: eth_getBlockByNumber + block: 0xd1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xd1", + "number": "0xd1" + } + }