From af863d39de25d252479ed6e7266dc7c893c1c843 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 25 May 2023 21:37:47 -0700 Subject: [PATCH 01/13] consensus for {safe,finalized} and rewrite tags --- proxyd/proxyd/backend.go | 6 +- proxyd/proxyd/consensus_poller.go | 117 +++++++--- proxyd/proxyd/consensus_tracker.go | 78 +++++-- .../integration_tests/consensus_test.go | 207 ++++++++++++++++-- .../testdata/consensus_responses.yml | 66 ++++++ proxyd/proxyd/rewriter.go | 12 +- proxyd/proxyd/rewriter_test.go | 12 +- .../tools/mockserver/handler/handler.go | 2 +- 8 files changed, 423 insertions(+), 77 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 8e8db75..3946995 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -556,7 +556,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch backends = bg.loadBalancedConsensusGroup() // We also rewrite block tags to enforce compliance with consensus - rctx := RewriteContext{latest: bg.Consensus.GetConsensusBlockNumber()} + rctx := RewriteContext{ + latest: bg.Consensus.GetLatestBlockNumber(), + finalized: bg.Consensus.GetFinalizedBlockNumber(), + safe: bg.Consensus.GetSafeBlockNumber(), + } for i, req := range rpcReqs { res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID} diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index a170aca..5c5edcd 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -46,8 +46,12 @@ type backendState struct { latestBlockNumber hexutil.Uint64 latestBlockHash string - peerCount uint64 - inSync bool + + finalizedBlockNumber hexutil.Uint64 + safeBlockNumber hexutil.Uint64 + + peerCount uint64 + inSync bool lastUpdate time.Time @@ -65,9 +69,19 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { return g } -// GetConsensusBlockNumber returns the agreed block number in a consensus -func (ct *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 { - return ct.tracker.GetConsensusBlockNumber() +// GetLatestBlockNumber returns the `latest` agreed block number in a consensus +func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 { + return ct.tracker.GetLatestBlockNumber() +} + +// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus +func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 { + return ct.tracker.GetFinalizedBlockNumber() +} + +// GetSafeBlockNumber returns the `safe` agreed block number in a consensus +func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 { + return ct.tracker.GetSafeBlockNumber() } func (cp *ConsensusPoller) Shutdown() { @@ -261,7 +275,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Warn("error updating backend", "name", be.Name, "err", err) } - changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash) + finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + } + + safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + } + + changed, updateDelay := cp.setBackendState(be, peerCount, inSync, + latestBlockNumber, latestBlockHash, + finalizedBlockNumber, safeBlockNumber) if changed { RecordBackendLatestBlock(be, latestBlockNumber) @@ -272,21 +298,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx 
context.Context, be *Backend) { "inSync", inSync, "latestBlockNumber", latestBlockNumber, "latestBlockHash", latestBlockHash, + "finalizedBlockNumber", finalizedBlockNumber, + "safeBlockNumber", safeBlockNumber, "updateDelay", updateDelay) } } // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - var highestBlock hexutil.Uint64 - var lowestBlock hexutil.Uint64 - var lowestBlockHash string + var highestLatestBlock hexutil.Uint64 - currentConsensusBlockNumber := cp.GetConsensusBlockNumber() + var lowestLatestBlock hexutil.Uint64 + var lowestLatestBlockHash string + + var lowestFinalizedBlock hexutil.Uint64 + var lowestSafeBlock hexutil.Uint64 + + currentConsensusBlockNumber := cp.GetLatestBlockNumber() // find the highest block, in order to use it defining the highest non-lagging ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue @@ -298,14 +330,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { continue } - if backendLatestBlockNumber > highestBlock { - highestBlock = backendLatestBlockNumber + if backendLatestBlockNumber > highestLatestBlock { + highestLatestBlock = backendLatestBlockNumber } } // find the highest common ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue @@ -318,23 +350,31 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } // check if backend is lagging behind the highest block - if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag { + if backendLatestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-backendLatestBlockNumber) > cp.maxBlockLag { continue } - if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock { - lowestBlock = backendLatestBlockNumber - lowestBlockHash = backendLatestBlockHash + if lowestLatestBlock == 0 || backendLatestBlockNumber < lowestLatestBlock { + lowestLatestBlock = backendLatestBlockNumber + lowestLatestBlockHash = backendLatestBlockHash + } + + if lowestFinalizedBlock == 0 || backendFinalizedBlockNumber < lowestFinalizedBlock { + lowestFinalizedBlock = backendFinalizedBlockNumber + } + + if lowestSafeBlock == 0 || backendSafeBlockNumber < lowestSafeBlock { + lowestSafeBlock = backendSafeBlockNumber } } // no block to propose (i.e. 
initializing consensus) - if lowestBlock == 0 { + if lowestLatestBlock == 0 { return } - proposedBlock := lowestBlock - proposedBlockHash := lowestBlockHash + proposedBlock := lowestLatestBlock + proposedBlockHash := lowestLatestBlockHash hasConsensus := false // check if everybody agrees on the same block hash @@ -342,8 +382,8 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) - if lowestBlock > currentConsensusBlockNumber { - log.Debug("validating consensus on block", "lowestBlock", lowestBlock) + if lowestLatestBlock > currentConsensusBlockNumber { + log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) } broken := false @@ -362,7 +402,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - in sync */ - peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) + peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount @@ -410,7 +450,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) } - cp.tracker.SetConsensusBlockNumber(proposedBlock) + cp.tracker.SetLatestBlockNumber(proposedBlock) + cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) + cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.consensusGroupMux.Lock() cp.consensusGroup = consensusBackends cp.consensusGroupMux.Unlock() @@ -512,27 +554,38 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { +func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, + latestBlockNumber hexutil.Uint64, latestBlockHash string, + finalizedBlockNumber hexutil.Uint64, + safeBlockNumber hexutil.Uint64, + lastUpdate time.Time, bannedUntil time.Time) { bs := cp.backendState[be] defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() peerCount = bs.peerCount inSync = bs.inSync - blockNumber = bs.latestBlockNumber - blockHash = bs.latestBlockHash + latestBlockNumber = bs.latestBlockNumber + latestBlockHash = bs.latestBlockHash + finalizedBlockNumber = bs.finalizedBlockNumber + safeBlockNumber = bs.safeBlockNumber lastUpdate = bs.lastUpdate bannedUntil = bs.bannedUntil return } -func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { +func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, + latestBlockNumber hexutil.Uint64, latestBlockHash string, + finalizedBlockNumber hexutil.Uint64, + safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) { bs := cp.backendState[be] bs.backendStateMux.Lock() - changed = bs.latestBlockHash != blockHash + changed = bs.latestBlockHash != latestBlockHash bs.peerCount = peerCount bs.inSync = inSync - 
bs.latestBlockNumber = blockNumber - bs.latestBlockHash = blockHash + bs.latestBlockNumber = latestBlockNumber + bs.latestBlockHash = latestBlockHash + bs.finalizedBlockNumber = finalizedBlockNumber + bs.safeBlockNumber = safeBlockNumber updateDelay = time.Since(bs.lastUpdate) bs.lastUpdate = time.Now() bs.backendStateMux.Unlock() diff --git a/proxyd/proxyd/consensus_tracker.go b/proxyd/proxyd/consensus_tracker.go index 3bd10e7..83d2ec7 100644 --- a/proxyd/proxyd/consensus_tracker.go +++ b/proxyd/proxyd/consensus_tracker.go @@ -13,35 +13,68 @@ import ( // ConsensusTracker abstracts how we store and retrieve the current consensus // allowing it to be stored locally in-memory or in a shared Redis cluster type ConsensusTracker interface { - GetConsensusBlockNumber() hexutil.Uint64 - SetConsensusBlockNumber(blockNumber hexutil.Uint64) + GetLatestBlockNumber() hexutil.Uint64 + SetLatestBlockNumber(blockNumber hexutil.Uint64) + GetFinalizedBlockNumber() hexutil.Uint64 + SetFinalizedBlockNumber(blockNumber hexutil.Uint64) + GetSafeBlockNumber() hexutil.Uint64 + SetSafeBlockNumber(blockNumber hexutil.Uint64) } // InMemoryConsensusTracker store and retrieve in memory, async-safe type InMemoryConsensusTracker struct { - consensusBlockNumber hexutil.Uint64 + latestBlockNumber hexutil.Uint64 + finalizedBlockNumber hexutil.Uint64 + safeBlockNumber hexutil.Uint64 mutex sync.Mutex } func NewInMemoryConsensusTracker() ConsensusTracker { return &InMemoryConsensusTracker{ - consensusBlockNumber: 0, - mutex: sync.Mutex{}, + mutex: sync.Mutex{}, } } -func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 { +func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 { defer ct.mutex.Unlock() ct.mutex.Lock() - return ct.consensusBlockNumber + return ct.latestBlockNumber } -func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) { +func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) { defer ct.mutex.Unlock() ct.mutex.Lock() - ct.consensusBlockNumber = blockNumber + ct.latestBlockNumber = blockNumber +} + +func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + return ct.finalizedBlockNumber +} + +func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + ct.finalizedBlockNumber = blockNumber +} + +func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + return ct.safeBlockNumber +} + +func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + ct.safeBlockNumber = blockNumber } // RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe @@ -59,14 +92,29 @@ func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace st } } -func (ct *RedisConsensusTracker) key() string { - return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup) +func (ct *RedisConsensusTracker) key(tag string) string { + return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag) } -func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 { - return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val())) +func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, 
ct.key("latest")).Val())) } -func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) { - ct.client.Set(ct.ctx, ct.key(), blockNumber, 0) +func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0) +} + +func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val())) +} + +func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0) +} +func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val())) +} + +func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0) } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 9320429..d95903e 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -61,7 +61,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.Unban() // unknown consensus at init - require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String()) // first poll for _, be := range bg.Backends { @@ -70,7 +70,9 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // consensus at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) }) t.Run("prevent using a backend with low peer count", func(t *testing.T) { @@ -108,6 +110,16 @@ func TestConsensus(t *testing.T) { Block: "latest", Response: buildGetBlockResponse("0x1", "hash1"), }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x1", "hash1"), + }) h2.AddOverride(&ms.MethodTemplate{ Method: "eth_getBlockByNumber", @@ -126,7 +138,9 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x100", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x100", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() @@ -166,7 +180,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() @@ -201,7 +215,7 @@ func TestConsensus(t *testing.T) { } bg.Consensus.UpdateBackendGroupConsensus(ctx) - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + 
require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() @@ -249,7 +263,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on node2 to 0x2 h2.AddOverride(&ms.MethodTemplate{ @@ -265,7 +279,7 @@ func TestConsensus(t *testing.T) { // consensus should stick to 0x1, since node1 is still lagging there bg.Consensus.UpdateBackendGroupConsensus(ctx) - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on node1 to 0x2 h1.AddOverride(&ms.MethodTemplate{ @@ -281,7 +295,82 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should stick to 0x2, since now all nodes are at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) + }) + + t.Run("should use lowest safe and finalized", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x559", "hash559"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x558", "hash558"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + }) + + t.Run("advance safe and finalized", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x556", "hash556"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x552", "hash552"), + }) + + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x559", "hash559"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x558", "hash558"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, "0x556", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x552", bg.Consensus.GetSafeBlockNumber().String()) }) t.Run("broken consensus", func(t *testing.T) { @@ -300,7 +389,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // 
advance latest on both nodes to 0x2 h1.AddOverride(&ms.MethodTemplate{ @@ -321,7 +410,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) // make node2 diverge on hash h2.AddOverride(&ms.MethodTemplate{ @@ -337,7 +426,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should resolve to 0x1, since 0x2 is out of consensus at the moment - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) require.True(t, listenerCalled) }) @@ -353,7 +442,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on both nodes to 0x2 h1.AddOverride(&ms.MethodTemplate{ @@ -374,7 +463,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on both nodes to 0x3 h1.AddOverride(&ms.MethodTemplate{ @@ -395,7 +484,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // at 0x3 - require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x3", bg.Consensus.GetLatestBlockNumber().String()) // make node2 diverge on hash for blocks 0x2 and 0x3 h2.AddOverride(&ms.MethodTemplate{ @@ -416,7 +505,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should resolve to 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) }) t.Run("fork in advanced block", func(t *testing.T) { @@ -430,7 +519,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // make nodes 1 and 2 advance in forks h1.AddOverride(&ms.MethodTemplate{ @@ -471,7 +560,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should resolve to 0x1, the highest common ancestor - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) }) t.Run("load balancing should hit both backends", func(t *testing.T) { @@ -607,7 +696,7 @@ func TestConsensus(t *testing.T) { require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests())) }) - t.Run("rewrite request of eth_getBlockByNumber", func(t *testing.T) { + t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() bg.Consensus.Unban() @@ -643,6 +732,88 @@ func TestConsensus(t *testing.T) { require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0]) }) + t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + // establish the consensus and ban node2 for now + h1.AddOverride(&ms.MethodTemplate{ + 
Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x20", "hash20"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x5", "hash5"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "net_peerCount", + Block: "", + Response: buildPeerCountResponse(1), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + + node1.Reset() + + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"}) + require.NoError(t, err) + require.Equal(t, 200, statusCode) + + var jsonMap map[string]interface{} + err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) + require.NoError(t, err) + require.Equal(t, "0x5", jsonMap["params"].([]interface{})[0]) + }) + + t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + // establish the consensus and ban node2 for now + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x20", "hash20"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "net_peerCount", + Block: "", + Response: buildPeerCountResponse(1), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + + node1.Reset() + + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"}) + require.NoError(t, err) + require.Equal(t, 200, statusCode) + + var jsonMap map[string]interface{} + err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) + require.NoError(t, err) + require.Equal(t, "0x1", jsonMap["params"].([]interface{})[0]) + }) + t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index 83579e7..ef5b692 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -63,3 +63,69 @@ "number": "0x3" } } +- method: eth_getBlockByNumber + block: finalized + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_finalized", + "number": "0x555" + } + } +- method: eth_getBlockByNumber + block: 0x555 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_finalized", + "number": "0x555" + } + } +- method: eth_getBlockByNumber + block: safe + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_safe", + "number": "0x551" + } + } +- method: eth_getBlockByNumber + block: 0x555 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_safe", + "number": "0x551" + } + } +- method: eth_getBlockByNumber + block: 0x5 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash5", + "number": "0x5" + } + } +- method: eth_getBlockByNumber + block: 0x20 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash20", + "number": "0x20" + } + 
} diff --git a/proxyd/proxyd/rewriter.go b/proxyd/proxyd/rewriter.go index be539bc..ab098d8 100644 --- a/proxyd/proxyd/rewriter.go +++ b/proxyd/proxyd/rewriter.go @@ -9,7 +9,9 @@ import ( ) type RewriteContext struct { - latest hexutil.Uint64 + latest hexutil.Uint64 + finalized hexutil.Uint64 + safe hexutil.Uint64 } type RewriteResult uint8 @@ -180,11 +182,13 @@ func rewriteTag(rctx RewriteContext, current string) (string, bool, error) { } switch *bnh.BlockNumber { - case rpc.SafeBlockNumber, - rpc.FinalizedBlockNumber, - rpc.PendingBlockNumber, + case rpc.PendingBlockNumber, rpc.EarliestBlockNumber: return current, false, nil + case rpc.FinalizedBlockNumber: + return rctx.finalized.String(), true, nil + case rpc.SafeBlockNumber: + return rctx.safe.String(), true, nil case rpc.LatestBlockNumber: return rctx.latest.String(), true, nil default: diff --git a/proxyd/proxyd/rewriter_test.go b/proxyd/proxyd/rewriter_test.go index ae5c9c8..eb7f18f 100644 --- a/proxyd/proxyd/rewriter_test.go +++ b/proxyd/proxyd/rewriter_test.go @@ -326,33 +326,33 @@ func TestRewriteRequest(t *testing.T) { { name: "eth_getBlockByNumber finalized", args: args{ - rctx: RewriteContext{latest: hexutil.Uint64(100)}, + rctx: RewriteContext{latest: hexutil.Uint64(100), finalized: hexutil.Uint64(55)}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})}, res: nil, }, - expected: RewriteNone, + expected: RewriteOverrideRequest, check: func(t *testing.T, args args) { var p []string err := json.Unmarshal(args.req.Params, &p) require.Nil(t, err) require.Equal(t, 1, len(p)) - require.Equal(t, "finalized", p[0]) + require.Equal(t, hexutil.Uint64(55).String(), p[0]) }, }, { name: "eth_getBlockByNumber safe", args: args{ - rctx: RewriteContext{latest: hexutil.Uint64(100)}, + rctx: RewriteContext{latest: hexutil.Uint64(100), safe: hexutil.Uint64(50)}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})}, res: nil, }, - expected: RewriteNone, + expected: RewriteOverrideRequest, check: func(t *testing.T, args args) { var p []string err := json.Unmarshal(args.req.Params, &p) require.Nil(t, err) require.Equal(t, 1, len(p)) - require.Equal(t, "safe", p[0]) + require.Equal(t, hexutil.Uint64(50).String(), p[0]) }, }, { diff --git a/proxyd/proxyd/tools/mockserver/handler/handler.go b/proxyd/proxyd/tools/mockserver/handler/handler.go index 04f30e7..8a78a7e 100644 --- a/proxyd/proxyd/tools/mockserver/handler/handler.go +++ b/proxyd/proxyd/tools/mockserver/handler/handler.go @@ -95,7 +95,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) { resBody := "" if batched { resBody = "[" + strings.Join(responses, ",") + "]" - } else { + } else if len(responses) > 0 { resBody = responses[0] } From 2d9259ee2006716f7a485f70c073399ad40e1584 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 14:22:50 -0700 Subject: [PATCH 02/13] better moar tests --- proxyd/proxyd/backend.go | 32 +- proxyd/proxyd/consensus_poller.go | 67 +- .../integration_tests/consensus_test.go | 983 ++++++------------ .../testdata/consensus_responses.yml | 78 +- proxyd/proxyd/metrics.go | 97 +- proxyd/proxyd/rewriter.go | 2 +- 6 files changed, 533 insertions(+), 726 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 3946995..6ba7a10 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -374,7 +374,6 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method func (b *Backend) doForward(ctx 
context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { // we are concerned about network error rates, so we record 1 request independently of how many are in the batch b.networkRequestsSlidingWindow.Incr() - RecordBackendNetworkRequestCountSlidingWindow(b, b.networkRequestsSlidingWindow.Count()) isSingleElementBatch := len(rpcReqs) == 1 @@ -391,7 +390,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body)) if err != nil { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error creating backend request") } @@ -413,7 +412,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool httpRes, err := b.client.DoLimited(httpReq) if err != nil { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error in backend request") } @@ -432,7 +431,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool // Alchemy returns a 400 on bad JSONs, so handle that case if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, fmt.Errorf("response code %d", httpRes.StatusCode) } @@ -440,7 +439,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize)) if err != nil { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error reading response body") } @@ -458,18 +457,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method if responseIsNotBatched(resB) { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendUnexpectedJSONRPC } b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendBadResponse } } if len(rpcReqs) != len(res) { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendUnexpectedJSONRPC } @@ -483,6 +482,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool duration := time.Since(start) b.latencySlidingWindow.Add(float64(duration)) RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg())) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) sortBatchRPCResponse(rpcReqs, res) return res, nil @@ -490,11 +490,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs 
[]*RPCReq, isBatch bool // IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters func (b *Backend) IsHealthy() bool { - errorRate := float64(0) - // avoid division-by-zero when the window is empty - if b.networkRequestsSlidingWindow.Sum() >= 10 { - errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum() - } + errorRate := b.ErrorRate() avgLatency := time.Duration(b.latencySlidingWindow.Avg()) if errorRate >= b.maxErrorRateThreshold { return false @@ -505,6 +501,16 @@ func (b *Backend) IsHealthy() bool { return true } +// ErrorRate returns the instant error rate of the backend +func (b *Backend) ErrorRate() (errorRate float64) { + // we only really start counting the error rate after a minimum of 10 requests + // this is to avoid false positives when the backend is just starting up + if b.networkRequestsSlidingWindow.Sum() >= 10 { + errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum() + } + return errorRate +} + // IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource) func (b *Backend) IsDegraded() bool { avgLatency := time.Duration(b.latencySlidingWindow.Avg()) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 5c5edcd..1856566 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -275,23 +275,42 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Warn("error updating backend", "name", be.Name, "err", err) } - finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") - if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) - } - safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") if err != nil { log.Warn("error updating backend", "name", be.Name, "err", err) } + finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + } + + _, _, _, _, oldFinalized, oldSafe, _, _ := cp.getBackendState(be) + expectedBlockTags := cp.checkExpectedBlockTags(finalizedBlockNumber, oldFinalized, safeBlockNumber, oldSafe, latestBlockNumber) + changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash, finalizedBlockNumber, safeBlockNumber) + RecordBackendLatestBlock(be, latestBlockNumber) + RecordBackendSafeBlock(be, safeBlockNumber) + RecordBackendFinalizedBlock(be, finalizedBlockNumber) + RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) + RecordConsensusBackendUpdateDelay(be, updateDelay) + + if !expectedBlockTags { + log.Warn("backend banned - unexpected block tags", + "backend", be.Name, + "oldFinalized", oldFinalized, + "finalizedBlockNumber", finalizedBlockNumber, + "oldSafe", oldSafe, + "safeBlockNumber", safeBlockNumber, + "latestBlockNumber", latestBlockNumber, + ) + cp.Ban(be) + } + if changed { - RecordBackendLatestBlock(be, latestBlockNumber) - RecordConsensusBackendUpdateDelay(be, updateDelay) log.Debug("backend state updated", "name", be.Name, "peerCount", peerCount, @@ -304,6 +323,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { } } +// checkExpectedBlockTags for unexpected conditions on block tags +// - finalized block number should never decrease +// - safe block number should never decrease +// - finalized block should be < safe block < latest block +func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, 
oldFinalized hexutil.Uint64, + currentSafe hexutil.Uint64, oldSafe hexutil.Uint64, + currentLatest hexutil.Uint64) bool { + return currentFinalized >= oldFinalized && + currentSafe >= oldSafe && + currentFinalized <= currentSafe && + currentSafe <= currentLatest +} + // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { var highestLatestBlock hexutil.Uint64 @@ -320,6 +352,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { for _, be := range cp.backendGroup.Backends { peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) + if cp.IsBanned(be) { + continue + } if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } @@ -339,6 +374,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { for _, be := range cp.backendGroup.Backends { peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) + if cp.IsBanned(be) { + continue + } if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } @@ -451,13 +489,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } cp.tracker.SetLatestBlockNumber(proposedBlock) - cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock) + cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) + cp.consensusGroupMux.Lock() cp.consensusGroup = consensusBackends cp.consensusGroupMux.Unlock() RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock) + RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock) + RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock) + RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends)) RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames)) RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends)) @@ -481,13 +523,10 @@ func (cp *ConsensusPoller) Ban(be *Backend) { bs.bannedUntil = time.Now().Add(cp.banPeriod) } -// Unban remove any bans from the backends -func (cp *ConsensusPoller) Unban() { +// Reset remove any bans from the backends and reset their states +func (cp *ConsensusPoller) Reset() { for _, be := range cp.backendGroup.Backends { - bs := cp.backendState[be] - bs.backendStateMux.Lock() - bs.bannedUntil = time.Now().Add(-10 * time.Hour) - bs.backendStateMux.Unlock() + cp.backendState[be] = &backendState{} } } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index d95903e..fe36a15 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -16,6 +16,12 @@ import ( "github.com/stretchr/testify/require" ) +type nodeContext struct { + mockBackend *MockBackend + backend *proxyd.Backend + handler *ms.MockedHandler +} + func TestConsensus(t *testing.T) { node1 := NewMockBackend(nil) defer node1.Close() @@ -55,526 +61,402 @@ func TestConsensus(t *testing.T) { require.NotNil(t, bg) require.NotNil(t, bg.Consensus) + // convenient mapping to access the nodes by name + nodes := map[string]nodeContext{ + "node1": { + mockBackend: node1, + backend: bg.Backends[0], + handler: &h1, + }, + "node2": { + mockBackend: node2, + backend: bg.Backends[1], + handler: &h2, + }, + } + + reset := func() { + for _, node := range nodes 
{ + node.handler.ResetOverrides() + node.mockBackend.Reset() + } + bg.Consensus.Reset() + } + + // poll for updated consensus + update := func() { + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + } + + override := func(node string, method string, block string, response string) { + nodes[node].handler.AddOverride(&ms.MethodTemplate{ + Method: method, + Block: block, + Response: response, + }) + } + + overrideBlock := func(node string, blockRequest string, blockResponse string) { + override(node, + "eth_getBlockByNumber", + blockRequest, + buildResponse(map[string]string{ + "number": blockResponse, + "hash": "hash_" + blockResponse, + })) + } + + overrideBlockHash := func(node string, blockRequest string, number string, hash string) { + override(node, + "eth_getBlockByNumber", + blockRequest, + buildResponse(map[string]string{ + "number": number, + "hash": hash, + })) + } + + overridePeerCount := func(node string, count int) { + override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String())) + } + + overrideNotInSync := func(node string) { + override(node, "eth_syncing", "", buildResponse(map[string]string{ + "startingblock": "0x0", + "currentblock": "0x0", + "highestblock": "0x100", + })) + } + + useOnlyNode1 := func() { + overridePeerCount("node2", 0) + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 1, len(consensusGroup)) + require.Contains(t, consensusGroup, nodes["node1"].backend) + node1.Reset() + } + t.Run("initial consensus", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() // unknown consensus at init require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String()) // first poll - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // consensus at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + // as a default we use: + // - latest at 0x101 [257] + // - safe at 0xe1 [225] + // - finalized at 0xc1 [193] + + // consensus at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("prevent using a backend with low peer count", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + overridePeerCount("node1", 0) + update() - h1.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - be := backend(bg, "node1") - require.NotNil(t, be) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) consensusGroup := bg.Consensus.GetConsensusGroup() - - require.NotContains(t, consensusGroup, be) - require.False(t, bg.Consensus.IsBanned(be)) + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) require.Equal(t, 1, len(consensusGroup)) }) t.Run("prevent using a backend lagging behind", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + // node2 is 51 blocks ahead of node1 
(0x101 + 51 = 0x134) + overrideBlock("node2", "latest", "0x134") + update() - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x100", "hash0x100"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x100", - Response: buildGetBlockResponse("0x100", "hash0x100"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x100", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + // since we ignored node1, the consensus should be at 0x133 + require.Equal(t, "0x134", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() - - be := backend(bg, "node1") - require.NotNil(t, be) - require.NotContains(t, consensusGroup, be) - require.False(t, bg.Consensus.IsBanned(be)) + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) require.Equal(t, 1, len(consensusGroup)) }) - t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - - // 0x1 + 50 = 0x33 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x33", "hash0x100"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x100", - Response: buildGetBlockResponse("0x33", "hash0x100"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - consensusGroup := bg.Consensus.GetConsensusGroup() - - require.Equal(t, 2, len(consensusGroup)) - }) - t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + // node2 is 50 blocks ahead of node1 (0x101 + 50 = 0x133) + overrideBlock("node2", "latest", "0x133") + update() - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - - // 0x1 + 49 = 0x32 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x32", "hash0x100"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - 
Block: "0x100", - Response: buildGetBlockResponse("0x32", "hash0x100"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - consensusGroup := bg.Consensus.GetConsensusGroup() - - require.Equal(t, 2, len(consensusGroup)) + // both nodes are in consensus with the lowest block + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) }) t.Run("prevent using a backend not in sync", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + // make node1 not in sync + overrideNotInSync("node1") + update() - // advance latest on node2 to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_syncing", - Block: "", - Response: buildResponse(map[string]string{ - "startingblock": "0x0", - "currentblock": "0x0", - "highestblock": "0x100", - }), - }) - - be := backend(bg, "node1") - require.NotNil(t, be) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) consensusGroup := bg.Consensus.GetConsensusGroup() - - require.NotContains(t, consensusGroup, be) - require.False(t, bg.Consensus.IsBanned(be)) + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) require.Equal(t, 1, len(consensusGroup)) }) t.Run("advance consensus", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } + // as a default we use: + // - latest at 0x101 [257] + // - safe at 0xe1 [225] + // - finalized at 0xc1 [193] + + update() + + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + + // advance latest on node2 to 0x102 + overrideBlock("node2", "latest", "0x102") + + update() + + // consensus should stick to 0x101, since node1 is still lagging there bg.Consensus.UpdateBackendGroupConsensus(ctx) + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // advance latest on node1 to 0x102 + overrideBlock("node1", "latest", "0x102") - // advance latest on node2 to 0x2 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) + update() - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - - // consensus should stick to 0x1, since node1 is still lagging there - bg.Consensus.UpdateBackendGroupConsensus(ctx) - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - // advance latest on node1 to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // should stick to 0x2, since now all nodes are at 0x2 - require.Equal(t, "0x2", 
bg.Consensus.GetLatestBlockNumber().String()) + // all nodes now at 0x102 + require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String()) }) t.Run("should use lowest safe and finalized", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + overrideBlock("node2", "finalized", "0xc2") + overrideBlock("node2", "safe", "0xe2") + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x559", "hash559"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x558", "hash558"), - }) - - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) }) t.Run("advance safe and finalized", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + overrideBlock("node1", "finalized", "0xc2") + overrideBlock("node1", "safe", "0xe2") + overrideBlock("node2", "finalized", "0xc2") + overrideBlock("node2", "safe", "0xe2") + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String()) + }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x556", "hash556"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x552", "hash552"), - }) + t.Run("ban backend if tags are messed - safe < finalized", func(t *testing.T) { + reset() + overrideBlock("node1", "finalized", "0xb1") + overrideBlock("node1", "safe", "0xa1") + update() - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x559", "hash559"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x558", "hash558"), - }) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + }) - bg.Consensus.UpdateBackendGroupConsensus(ctx) + t.Run("ban backend if tags are messed - latest < safe", func(t *testing.T) { + reset() + overrideBlock("node1", "safe", "0xb1") + overrideBlock("node1", "latest", "0xa1") + update() - require.Equal(t, "0x556", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x552", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, 
"0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + }) + + t.Run("ban backend if tags are messed - safe dropped", func(t *testing.T) { + reset() + update() + overrideBlock("node1", "safe", "0xb1") + update() + + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + }) + + t.Run("ban backend if tags are messed - finalized dropped", func(t *testing.T) { + reset() + update() + overrideBlock("node1", "finalized", "0xa1") + update() + + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) }) t.Run("broken consensus", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - + reset() listenerCalled := false bg.Consensus.AddListener(func() { listenerCalled = true }) + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // advance latest on both nodes to 0x102 + overrideBlock("node1", "latest", "0x102") + overrideBlock("node2", "latest", "0x102") - // advance latest on both nodes to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) + update() - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) + // at 0x102 + require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String()) // make node2 diverge on hash - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "wrong_hash"), - }) + overrideBlockHash("node2", "0x102", "0x102", "wrong_hash") - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // should resolve to 0x1, since 0x2 is out of consensus at the moment - require.Equal(t, "0x1", 
bg.Consensus.GetLatestBlockNumber().String()) + // should resolve to 0x101, since 0x102 is out of consensus at the moment + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + // everybody serving traffic + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 2, len(consensusGroup)) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + // onConsensusBroken listener was called require.True(t, listenerCalled) }) t.Run("broken consensus with depth 2", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - // advance latest on both nodes to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), + reset() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true }) + update() - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) + // advance latest on both nodes to 0x102 + overrideBlock("node1", "latest", "0x102") + overrideBlock("node2", "latest", "0x102") + + update() + + // at 0x102 + require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on both nodes to 0x3 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) + overrideBlock("node1", "latest", "0x103") + overrideBlock("node2", "latest", "0x103") - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // at 0x3 - require.Equal(t, "0x3", bg.Consensus.GetLatestBlockNumber().String()) + // at 0x103 + require.Equal(t, "0x103", bg.Consensus.GetLatestBlockNumber().String()) - // make node2 diverge on hash for blocks 0x2 and 0x3 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "wrong_hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x3", - Response: buildGetBlockResponse("0x3", "wrong_hash3"), - }) + // make node2 diverge on hash for blocks 0x102 and 0x103 + overrideBlockHash("node2", "0x102", "0x102", "wrong_hash_0x102") + overrideBlockHash("node2", "0x103", "0x103", "wrong_hash_0x103") - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // should resolve to 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // should resolve to 0x101 + require.Equal(t, "0x101", 
bg.Consensus.GetLatestBlockNumber().String()) + + // everybody serving traffic + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 2, len(consensusGroup)) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + // onConsensusBroken listener was called + require.True(t, listenerCalled) }) t.Run("fork in advanced block", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true + }) + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // make nodes 1 and 2 advance in forks, i.e. they have same block number with different hashes + overrideBlockHash("node1", "0x102", "0x102", "node1_0x102") + overrideBlockHash("node2", "0x102", "0x102", "node2_0x102") + overrideBlockHash("node1", "0x103", "0x103", "node1_0x103") + overrideBlockHash("node2", "0x103", "0x103", "node2_0x103") + overrideBlockHash("node1", "latest", "0x103", "node1_0x103") + overrideBlockHash("node2", "latest", "0x103", "node2_0x103") - // make nodes 1 and 2 advance in forks - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "node1_0x2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "node2_0x2"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x3", - Response: buildGetBlockResponse("0x3", "node1_0x3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x3", - Response: buildGetBlockResponse("0x3", "node2_0x3"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "node1_0x3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "node2_0x3"), - }) + update() - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // should resolve to 0x101, the highest common ancestor + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // should resolve to 0x1, the highest common ancestor - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // everybody serving traffic + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 2, len(consensusGroup)) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + // onConsensusBroken listener should not be called + require.False(t, listenerCalled) }) t.Run("load balancing should hit both backends", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + reset() + update() require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) + // reset request counts node1.Reset() node2.Reset() @@ -591,7 +473,7 
@@ func TestConsensus(t *testing.T) { numberReqs := len(consensusGroup) * 100 for numberReqs > 0 { - _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false}) require.NoError(t, err) require.Equal(t, 200, statusCode) numberReqs-- @@ -603,24 +485,10 @@ func TestConsensus(t *testing.T) { }) t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // node1 should not be serving any traffic - h1.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + reset() + useOnlyNode1() + // reset request counts node1.Reset() node2.Reset() @@ -629,60 +497,24 @@ func TestConsensus(t *testing.T) { numberReqs := 10 for numberReqs > 0 { - _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false}) require.NoError(t, err) require.Equal(t, 200, statusCode) numberReqs-- } msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests())) - require.Equal(t, len(node1.Requests()), 0, msg) - require.Equal(t, len(node2.Requests()), 10, msg) + require.Equal(t, len(node1.Requests()), 10, msg) + require.Equal(t, len(node2.Requests()), 0, msg) }) t.Run("rewrite response of eth_blockNumber", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - node1.Reset() - node2.Reset() - bg.Consensus.Unban() - - // establish the consensus - - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + reset() + update() totalRequests := len(node1.Requests()) + len(node2.Requests()) - require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) - // pretend backends advanced in consensus, but we are still serving the latest value of the consensus - // until it gets updated again - - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) - resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil) require.NoError(t, err) require.Equal(t, 200, statusCode) @@ -690,37 +522,15 @@ func TestConsensus(t *testing.T) { var jsonMap map[string]interface{} err = json.Unmarshal(resRaw, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x2", jsonMap["result"]) + require.Equal(t, "0x101", jsonMap["result"]) // no extra request hit the backends require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests())) }) t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - 
h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"}) require.NoError(t, err) @@ -729,39 +539,12 @@ func TestConsensus(t *testing.T) { var jsonMap map[string]interface{} err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0]) + require.Equal(t, "0x101", jsonMap["params"].([]interface{})[0]) }) t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x20", "hash20"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x5", "hash5"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"}) require.NoError(t, err) @@ -770,39 +553,12 @@ func TestConsensus(t *testing.T) { var jsonMap map[string]interface{} err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x5", jsonMap["params"].([]interface{})[0]) + require.Equal(t, "0xc1", jsonMap["params"].([]interface{})[0]) }) t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x20", "hash20"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"}) require.NoError(t, err) @@ -811,36 +567,14 @@ func TestConsensus(t *testing.T) { var jsonMap map[string]interface{} err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x1", jsonMap["params"].([]interface{})[0]) + require.Equal(t, "0xe1", jsonMap["params"].([]interface{})[0]) }) t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) { - 
h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + useOnlyNode1() - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() - - resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x10"}) + resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x300"}) require.NoError(t, err) require.Equal(t, 400, statusCode) @@ -852,35 +586,13 @@ func TestConsensus(t *testing.T) { }) t.Run("batched rewrite", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() resRaw, statusCode, err := client.SendBatchRPC( NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}), - NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x10"}), - NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0x1"})) + NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x102"}), + NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0xe1"})) require.NoError(t, err) require.Equal(t, 200, statusCode) @@ -889,34 +601,15 @@ func TestConsensus(t *testing.T) { require.NoError(t, err) require.Equal(t, 3, len(jsonMap)) - // rewrite latest to 0x2 - require.Equal(t, "0x2", jsonMap[0]["result"].(map[string]interface{})["number"]) + // rewrite latest to 0x101 + require.Equal(t, "0x101", jsonMap[0]["result"].(map[string]interface{})["number"]) - // out of bounds for block 0x10 + // out of bounds for block 0x102 require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64))) require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"]) - // dont rewrite for 0x1 - require.Equal(t, "0x1", jsonMap[2]["result"].(map[string]interface{})["number"]) - }) -} - -func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend { - for _, be := range bg.Backends { - if be.Name == name { - return be - } - } - return nil -} - -func buildPeerCountResponse(count uint64) string { - return buildResponse(hexutil.Uint64(count).String()) -} -func buildGetBlockResponse(number string, hash string) string { - return buildResponse(map[string]string{ - "number": number, - "hash": hash, + // dont rewrite for 0xe1 + require.Equal(t, "0xe1", jsonMap[2]["result"].(map[string]interface{})["number"]) }) } diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index ef5b692..12cc4cd 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ 
b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -26,63 +26,85 @@ "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash1", - "number": "0x1" + "hash": "hash_0x101", + "number": "0x101" } } - method: eth_getBlockByNumber - block: 0x1 + block: 0x101 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash1", - "number": "0x1" + "hash": "hash_0x101", + "number": "0x101" } } - method: eth_getBlockByNumber - block: 0x2 + block: 0x102 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash2", - "number": "0x2" + "hash": "hash_0x102", + "number": "0x102" } } - method: eth_getBlockByNumber - block: 0x3 + block: 0x103 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash3", - "number": "0x3" + "hash": "hash_0x103", + "number": "0x103" } } - method: eth_getBlockByNumber - block: finalized + block: 0x132 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_finalized", - "number": "0x555" + "hash": "hash_0x132", + "number": "0x132" } } - method: eth_getBlockByNumber - block: 0x555 + block: 0x133 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_finalized", - "number": "0x555" + "hash": "hash_0x133", + "number": "0x133" + } + } +- method: eth_getBlockByNumber + block: 0x134 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x134", + "number": "0x134" + } + } +- method: eth_getBlockByNumber + block: 0x200 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x200", + "number": "0x200" } } - method: eth_getBlockByNumber @@ -92,40 +114,40 @@ "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_safe", - "number": "0x551" + "hash": "hash_0xe1", + "number": "0xe1" } } - method: eth_getBlockByNumber - block: 0x555 + block: 0xe1 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_safe", - "number": "0x551" + "hash": "hash_0xe1", + "number": "0xe1" } } - method: eth_getBlockByNumber - block: 0x5 + block: finalized response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash5", - "number": "0x5" + "hash": "hash_0xc1", + "number": "0xc1" } } - method: eth_getBlockByNumber - block: 0x20 + block: 0xc1 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash20", - "number": "0x20" + "hash": "hash_0xc1", + "number": "0xc1" } } diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 884e783..8c52ed4 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -246,6 +246,22 @@ var ( "backend_group_name", }) + consensusSafeBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "group_consensus_safe_block", + Help: "Consensus safe block", + }, []string{ + "backend_group_name", + }) + + consensusFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "group_consensus_finalized_block", + Help: "Consensus finalized block", + }, []string{ + "backend_group_name", + }) + backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricsNamespace, Name: "backend_latest_block", @@ -254,6 +270,30 @@ var ( "backend_name", }) + backendSafeBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_safe_block", + Help: "Current safe block observed per backend", + }, []string{ + "backend_name", + }) + + backendFinalizedBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_finalized_block", + Help: "Current 
finalized block observed per backend", + }, []string{ + "backend_name", + }) + + backendUnexpectedBlockTagsBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_unexpected_block_tags", + Help: "Bool gauge for unexpected block tags", + }, []string{ + "backend_name", + }) + consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricsNamespace, Name: "group_consensus_count", @@ -318,18 +358,10 @@ var ( "backend_name", }) - networkErrorCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + networkErrorRateBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricsNamespace, - Name: "backend_net_error_count", - Help: "Network error count per backend", - }, []string{ - "backend_name", - }) - - requestCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: MetricsNamespace, - Name: "backend_request_count", - Help: "Request count per backend", + Name: "backend_error_rate", + Help: "Request error rate per backend", }, []string{ "backend_name", }) @@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) } +func RecordGroupConsensusSafeBlock(group *BackendGroup, blockNumber hexutil.Uint64) { + consensusSafeBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) +} + +func RecordGroupConsensusFinalizedBlock(group *BackendGroup, blockNumber hexutil.Uint64) { + consensusFinalizedBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) +} + func RecordGroupConsensusCount(group *BackendGroup, count int) { consensusGroupCount.WithLabelValues(group.Name).Set(float64(count)) } @@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) { backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) } +func RecordBackendSafeBlock(b *Backend, blockNumber hexutil.Uint64) { + backendSafeBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) +} + +func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64) { + backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) +} + +func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool) { + backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected)) +} + func RecordConsensusBackendBanned(b *Backend, banned bool) { - v := float64(0) - if banned { - v = float64(1) - } - consensusBannedBackends.WithLabelValues(b.Name).Set(v) + consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned)) } func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { @@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { } func RecordConsensusBackendInSync(b *Backend, inSync bool) { - v := float64(0) - if inSync { - v = float64(1) - } - consensusInSyncBackend.WithLabelValues(b.Name).Set(v) + consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync)) } func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) { @@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds())) } -func RecordBackendNetworkRequestCountSlidingWindow(b *Backend, count uint) { - requestCountBackend.WithLabelValues(b.Name).Set(float64(count)) +func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) { + 
networkErrorRateBackend.WithLabelValues(b.Name).Set(rate) } -func RecordBackendNetworkErrorCountSlidingWindow(b *Backend, count uint) { - networkErrorCountBackend.WithLabelValues(b.Name).Set(float64(count)) +func boolToFloat64(b bool) float64 { + if b { + return 1 + } + return 0 } diff --git a/proxyd/proxyd/rewriter.go b/proxyd/proxyd/rewriter.go index ab098d8..08d5638 100644 --- a/proxyd/proxyd/rewriter.go +++ b/proxyd/proxyd/rewriter.go @@ -10,8 +10,8 @@ import ( type RewriteContext struct { latest hexutil.Uint64 - finalized hexutil.Uint64 safe hexutil.Uint64 + finalized hexutil.Uint64 } type RewriteResult uint8 From 83c40076b67540ddac8685f770f22515c1ef3440 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 14:25:56 -0700 Subject: [PATCH 03/13] typo --- proxyd/proxyd/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 8c52ed4..3d8bfac 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -467,7 +467,7 @@ func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64) { } func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool) { - backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected)) + backendUnexpectedBlockTagsBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected)) } func RecordConsensusBackendBanned(b *Backend, banned bool) { From cdf9e9192a2f4369013d7eb76effc5917fd83ef0 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 14:49:27 -0700 Subject: [PATCH 04/13] use go 1.20 --- proxyd/proxyd/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxyd/proxyd/Dockerfile b/proxyd/proxyd/Dockerfile index f6ba052..30b216f 100644 --- a/proxyd/proxyd/Dockerfile +++ b/proxyd/proxyd/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.18.0-alpine3.15 as builder +FROM golang:1.20.4-alpine3.18 as builder ARG GITCOMMIT=docker ARG GITDATE=docker @@ -12,7 +12,7 @@ WORKDIR /app RUN make proxyd -FROM alpine:3.15 +FROM alpine:3.18 COPY ./proxyd/entrypoint.sh /bin/entrypoint.sh From d5594a62a0a48faa797a13e2474fe317d4659222 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 15:54:04 -0700 Subject: [PATCH 05/13] moar tests for safe and finalized bans --- proxyd/proxyd/consensus_poller.go | 117 ++++++++++-------- .../integration_tests/consensus_test.go | 76 ++++++++++++ .../testdata/consensus_responses.yml | 22 ++++ 3 files changed, 161 insertions(+), 54 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 1856566..1ad4b22 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -406,11 +406,6 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } } - // no block to propose (i.e. 
initializing consensus) - if lowestLatestBlock == 0 { - return - } - proposedBlock := lowestLatestBlock proposedBlockHash := lowestLatestBlockHash hasConsensus := false @@ -424,59 +419,63 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) } - broken := false - for !hasConsensus { - allAgreed := true - consensusBackends = consensusBackends[:0] - filteredBackendsNames = filteredBackendsNames[:0] - for _, be := range cp.backendGroup.Backends { - /* - a serving node needs to be: - - healthy (network) - - updated recently - - not banned - - with minimum peer count - - not lagging latest block - - in sync - */ + // if there is no block to propose, the consensus is automatically broken + broken := proposedBlock == 0 && currentConsensusBlockNumber > 0 - peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) - notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) - isBanned := time.Now().Before(bannedUntil) - notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount - lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { - filteredBackendsNames = append(filteredBackendsNames, be.Name) - continue - } + if proposedBlock > 0 { + for !hasConsensus { + allAgreed := true + consensusBackends = consensusBackends[:0] + filteredBackendsNames = filteredBackendsNames[:0] + for _, be := range cp.backendGroup.Backends { + /* + a serving node needs to be: + - healthy (network) + - updated recently + - not banned + - with minimum peer count + - not lagging latest block + - in sync + */ - actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) - if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) - continue - } - if proposedBlockHash == "" { - proposedBlockHash = actualBlockHash - } - blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) - if blocksDontMatch { - if currentConsensusBlockNumber >= actualBlockNumber { - log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash) - broken = true + peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) + notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) + isBanned := time.Now().Before(bannedUntil) + notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount + lagging := latestBlockNumber < proposedBlock + if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + continue } - allAgreed = false - break + + actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + continue + } + if proposedBlockHash == "" { + proposedBlockHash = actualBlockHash + } + blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) + if blocksDontMatch { + if currentConsensusBlockNumber >= actualBlockNumber { + log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", 
proposedBlockHash) + broken = true + } + allAgreed = false + break + } + consensusBackends = append(consensusBackends, be) + consensusBackendsNames = append(consensusBackendsNames, be.Name) + } + if allAgreed { + hasConsensus = true + } else { + // walk one block behind and try again + proposedBlock -= 1 + proposedBlockHash = "" + log.Debug("no consensus, now trying", "block:", proposedBlock) } - consensusBackends = append(consensusBackends, be) - consensusBackendsNames = append(consensusBackendsNames, be.Name) - } - if allAgreed { - hasConsensus = true - } else { - // walk one block behind and try again - proposedBlock -= 1 - proposedBlockHash = "" - log.Debug("no consensus, now trying", "block:", proposedBlock) } } @@ -521,6 +520,16 @@ func (cp *ConsensusPoller) Ban(be *Backend) { defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() bs.bannedUntil = time.Now().Add(cp.banPeriod) + bs.safeBlockNumber = 0 + bs.finalizedBlockNumber = 0 +} + +// Unban remove any bans from the backends +func (cp *ConsensusPoller) Unban(be *Backend) { + bs := cp.backendState[be] + defer bs.backendStateMux.Unlock() + bs.backendStateMux.Lock() + bs.bannedUntil = time.Now().Add(-10 * time.Hour) } // Reset remove any bans from the backends and reset their states diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index fe36a15..320139a 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -330,6 +330,82 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) }) + t.Run("recover after safe and finalized dropped", func(t *testing.T) { + reset() + useOnlyNode1() + overrideBlock("node1", "latest", "0xd1") + overrideBlock("node1", "safe", "0xb1") + overrideBlock("node1", "finalized", "0x91") + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + + // unban and see if it recovers + bg.Consensus.Unban(nodes["node1"].backend) + update() + + consensusGroup = bg.Consensus.GetConsensusGroup() + require.Contains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + + require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + }) + + t.Run("latest dropped below safe, then recovered", func(t *testing.T) { + reset() + useOnlyNode1() + overrideBlock("node1", "latest", "0xd1") + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + + // unban and see if it recovers + bg.Consensus.Unban(nodes["node1"].backend) + overrideBlock("node1", "safe", "0xb1") + overrideBlock("node1", "finalized", "0x91") + update() + + consensusGroup = bg.Consensus.GetConsensusGroup() + require.Contains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + + require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x91", 
bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) + }) + + t.Run("latest dropped below safe, and stayed inconsistent after ban", func(t *testing.T) { + reset() + useOnlyNode1() + overrideBlock("node1", "latest", "0xd1") + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + + // unban and see if it recovers - it should not since the blocks stays the same + bg.Consensus.Unban(nodes["node1"].backend) + update() + + // should be banned again + consensusGroup = bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 0, len(consensusGroup)) + }) + t.Run("broken consensus", func(t *testing.T) { reset() listenerCalled := false diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index 12cc4cd..ea8ef72 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -107,6 +107,17 @@ "number": "0x200" } } +- method: eth_getBlockByNumber + block: 0x91 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x91", + "number": "0x91" + } + } - method: eth_getBlockByNumber block: safe response: > @@ -151,3 +162,14 @@ "number": "0xc1" } } +- method: eth_getBlockByNumber + block: 0xd1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xd1", + "number": "0xd1" + } + } From 32be28788759d820bc6f8c510682378bc560411c Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 15:57:22 -0700 Subject: [PATCH 06/13] typo --- proxyd/proxyd/consensus_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 1ad4b22..d019951 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -326,7 +326,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { // checkExpectedBlockTags for unexpected conditions on block tags // - finalized block number should never decrease // - safe block number should never decrease -// - finalized block should be < safe block < latest block +// - finalized block should be <= safe block <= latest block func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64, currentSafe hexutil.Uint64, oldSafe hexutil.Uint64, currentLatest hexutil.Uint64) bool { From 3cdac70a0de21957ed537c3d03ccb63d99383d29 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 19:07:35 -0700 Subject: [PATCH 07/13] reset listeners, doc typos --- proxyd/proxyd/consensus_poller.go | 12 ++++++++---- proxyd/proxyd/integration_tests/consensus_test.go | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index d019951..5aff27e 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -177,6 +177,10 @@ func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) { cp.listeners = append(cp.listeners, listener) } +func (cp *ConsensusPoller) ClearListeners() { + cp.listeners = 
[]OnConsensusBroken{}
+}
+
 func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
 	return func(cp *ConsensusPoller) {
 		cp.banPeriod = banPeriod
@@ -524,7 +528,7 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
 	bs.finalizedBlockNumber = 0
 }
 
-// Unban remove any bans from the backends
+// Unban removes any bans from the backends
 func (cp *ConsensusPoller) Unban(be *Backend) {
 	bs := cp.backendState[be]
 	defer bs.backendStateMux.Unlock()
@@ -532,14 +536,14 @@ func (cp *ConsensusPoller) Unban(be *Backend) {
 	bs.bannedUntil = time.Now().Add(-10 * time.Hour)
 }
 
-// Reset remove any bans from the backends and reset their states
+// Reset resets all backend states
 func (cp *ConsensusPoller) Reset() {
 	for _, be := range cp.backendGroup.Backends {
 		cp.backendState[be] = &backendState{}
 	}
 }
 
-// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
+// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
 func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
 	var rpcRes RPCRes
 	err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
@@ -557,7 +561,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
 	return
 }
 
-// getPeerCount Convenient wrapper to retrieve the current peer count from the backend
+// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
 func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
 	var rpcRes RPCRes
 	err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go
index 320139a..7d98732 100644
--- a/proxyd/proxyd/integration_tests/consensus_test.go
+++ b/proxyd/proxyd/integration_tests/consensus_test.go
@@ -80,6 +80,7 @@ func TestConsensus(t *testing.T) {
 		node.handler.ResetOverrides()
 		node.mockBackend.Reset()
 	}
+	bg.Consensus.ClearListeners()
 	bg.Consensus.Reset()
 }

From 1b40d776a616f12fc5867b558308585cddc9edf0 Mon Sep 17 00:00:00 2001
From: Felipe Andrade
Date: Fri, 26 May 2023 19:15:09 -0700
Subject: [PATCH 08/13] sane values for consensus_max_block_lag

---
 proxyd/proxyd/consensus_poller.go                 |  2 +-
 proxyd/proxyd/example.config.toml                 |  4 ++--
 proxyd/proxyd/integration_tests/consensus_test.go | 12 ++++++------
 .../proxyd/integration_tests/testdata/consensus.toml | 2 +-
 .../testdata/consensus_responses.yml              | 11 +++++++++++
 5 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go
index 5aff27e..57bdbb3 100644
--- a/proxyd/proxyd/consensus_poller.go
+++ b/proxyd/proxyd/consensus_poller.go
@@ -220,7 +220,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
 		banPeriod:           5 * time.Minute,
 		maxUpdateThreshold:  30 * time.Second,
-		maxBlockLag:         50,
+		maxBlockLag:         8, // quarter of an epoch, 8*12 seconds = 96 seconds ~ 1.6 minutes
 		minPeerCount:        3,
 	}

diff --git a/proxyd/proxyd/example.config.toml b/proxyd/proxyd/example.config.toml
index 413cd61..19c6c26 100644
--- a/proxyd/proxyd/example.config.toml
+++ b/proxyd/proxyd/example.config.toml
@@ -93,8 +93,8 @@ backends = ["infura"]
 # consensus_ban_period = "1m"
 # Maximum delay for update the backend, default 30s
 # consensus_max_update_threshold = "20s"
-# Maximum block lag, default 50
-# consensus_max_block_lag = 10
+# Maximum block lag, default 
8 +# consensus_max_block_lag = 16 # Minimum peer count, default 3 # consensus_min_peer_count = 4 diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 7d98732..24398b0 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -175,12 +175,12 @@ func TestConsensus(t *testing.T) { t.Run("prevent using a backend lagging behind", func(t *testing.T) { reset() - // node2 is 51 blocks ahead of node1 (0x101 + 51 = 0x134) - overrideBlock("node2", "latest", "0x134") + // node2 is 8+1 blocks ahead of node1 (0x101 + 8+1 = 0x10a) + overrideBlock("node2", "latest", "0x10a") update() - // since we ignored node1, the consensus should be at 0x133 - require.Equal(t, "0x134", bg.Consensus.GetLatestBlockNumber().String()) + // since we ignored node1, the consensus should be at 0x10a + require.Equal(t, "0x10a", bg.Consensus.GetLatestBlockNumber().String()) require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) @@ -192,8 +192,8 @@ func TestConsensus(t *testing.T) { t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) { reset() - // node2 is 50 blocks ahead of node1 (0x101 + 50 = 0x133) - overrideBlock("node2", "latest", "0x133") + // node2 is 8 blocks ahead of node1 (0x101 + 8 = 0x109) + overrideBlock("node2", "latest", "0x109") update() // both nodes are in consensus with the lowest block diff --git a/proxyd/proxyd/integration_tests/testdata/consensus.toml b/proxyd/proxyd/integration_tests/testdata/consensus.toml index 3f92a3d..03b11d0 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus.toml +++ b/proxyd/proxyd/integration_tests/testdata/consensus.toml @@ -18,7 +18,7 @@ consensus_aware = true consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_ban_period = "1m" consensus_max_update_threshold = "2m" -consensus_max_block_lag = 50 +consensus_max_block_lag = 8 consensus_min_peer_count = 4 [rpc_method_mappings] diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index ea8ef72..ee413c5 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -63,6 +63,17 @@ "number": "0x103" } } +- method: eth_getBlockByNumber + block: 0x10a + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x10a", + "number": "0x10a" + } + } - method: eth_getBlockByNumber block: 0x132 response: > From 0c27c83aba1ca6f8d0091381ee56079bbbb5ef40 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 19:29:24 -0700 Subject: [PATCH 09/13] test refactor --- .../integration_tests/consensus_test.go | 92 +++++++++++-------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 24398b0..73f10f3 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -17,16 +17,15 @@ import ( ) type nodeContext struct { - mockBackend *MockBackend - backend *proxyd.Backend - handler *ms.MockedHandler + backend *proxyd.Backend // this is the actual backend impl in proxyd + mockBackend *MockBackend // this is the fake backend that we can use to mock responses + handler 
*ms.MockedHandler // this is where we control the state of mocked responses } -func TestConsensus(t *testing.T) { +func setup(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func()) { + // setup mock servers node1 := NewMockBackend(nil) - defer node1.Close() node2 := NewMockBackend(nil) - defer node2.Close() dir, err := os.Getwd() require.NoError(t, err) @@ -50,16 +49,19 @@ func TestConsensus(t *testing.T) { node1.SetHandler(http.HandlerFunc(h1.Handler)) node2.SetHandler(http.HandlerFunc(h2.Handler)) + // setup proxyd config := ReadConfig("consensus") - ctx := context.Background() svr, shutdown, err := proxyd.Start(config) require.NoError(t, err) - client := NewProxydClient("http://127.0.0.1:8545") - defer shutdown() + // expose the proxyd client + client := NewProxydClient("http://127.0.0.1:8545") + + // expose the backend group bg := svr.BackendGroups["node"] require.NotNil(t, bg) require.NotNil(t, bg.Consensus) + require.Equal(t, 2, len(bg.Backends)) // should match config // convenient mapping to access the nodes by name nodes := map[string]nodeContext{ @@ -75,14 +77,16 @@ func TestConsensus(t *testing.T) { }, } - reset := func() { - for _, node := range nodes { - node.handler.ResetOverrides() - node.mockBackend.Reset() - } - bg.Consensus.ClearListeners() - bg.Consensus.Reset() - } + return nodes, bg, client, shutdown +} + +func TestConsensus(t *testing.T) { + nodes, bg, client, shutdown := setup(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer shutdown() + + ctx := context.Background() // poll for updated consensus update := func() { @@ -92,6 +96,16 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) } + // convenient methods to manipulate state and mock responses + reset := func() { + for _, node := range nodes { + node.handler.ResetOverrides() + node.mockBackend.Reset() + } + bg.Consensus.ClearListeners() + bg.Consensus.Reset() + } + override := func(node string, method string, block string, response string) { nodes[node].handler.AddOverride(&ms.MethodTemplate{ Method: method, @@ -132,6 +146,7 @@ func TestConsensus(t *testing.T) { })) } + // force ban node2 and make sure node1 is the only one in consensus useOnlyNode1 := func() { overridePeerCount("node2", 0) update() @@ -139,7 +154,7 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.Equal(t, 1, len(consensusGroup)) require.Contains(t, consensusGroup, nodes["node1"].backend) - node1.Reset() + nodes["node1"].mockBackend.Reset() } t.Run("initial consensus", func(t *testing.T) { @@ -534,11 +549,11 @@ func TestConsensus(t *testing.T) { require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) // reset request counts - node1.Reset() - node2.Reset() + nodes["node1"].mockBackend.Reset() + nodes["node2"].mockBackend.Reset() - require.Equal(t, 0, len(node1.Requests())) - require.Equal(t, 0, len(node2.Requests())) + require.Equal(t, 0, len(nodes["node1"].mockBackend.Requests())) + require.Equal(t, 0, len(nodes["node2"].mockBackend.Requests())) // there is a random component to this test, // since our round-robin implementation shuffles the ordering @@ -556,9 +571,10 @@ func TestConsensus(t *testing.T) { numberReqs-- } - msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests())) - require.GreaterOrEqual(t, len(node1.Requests()), 50, msg) - require.GreaterOrEqual(t, len(node2.Requests()), 50, msg) + msg := fmt.Sprintf("n1 %d, n2 %d", + 
len(nodes["node1"].mockBackend.Requests()), len(nodes["node2"].mockBackend.Requests()))
+		require.GreaterOrEqual(t, len(nodes["node1"].mockBackend.Requests()), 50, msg)
+		require.GreaterOrEqual(t, len(nodes["node2"].mockBackend.Requests()), 50, msg)
 	})
 
 	t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) {
@@ -566,11 +582,11 @@ func TestConsensus(t *testing.T) {
 		useOnlyNode1()
 
 		// reset request counts
-		node1.Reset()
-		node2.Reset()
+		nodes["node1"].mockBackend.Reset()
+		nodes["node2"].mockBackend.Reset()
 
-		require.Equal(t, 0, len(node1.Requests()))
-		require.Equal(t, 0, len(node2.Requests()))
+		require.Equal(t, 0, len(nodes["node1"].mockBackend.Requests()))
+		require.Equal(t, 0, len(nodes["node2"].mockBackend.Requests()))
 
 		numberReqs := 10
 		for numberReqs > 0 {
@@ -580,16 +596,17 @@ func TestConsensus(t *testing.T) {
 			numberReqs--
 		}
 
-		msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests()))
-		require.Equal(t, len(node1.Requests()), 10, msg)
-		require.Equal(t, len(node2.Requests()), 0, msg)
+		msg := fmt.Sprintf("n1 %d, n2 %d",
+			len(nodes["node1"].mockBackend.Requests()), len(nodes["node2"].mockBackend.Requests()))
+		require.Equal(t, len(nodes["node1"].mockBackend.Requests()), 10, msg)
+		require.Equal(t, len(nodes["node2"].mockBackend.Requests()), 0, msg)
 	})
 
 	t.Run("rewrite response of eth_blockNumber", func(t *testing.T) {
 		reset()
 		update()
 
-		totalRequests := len(node1.Requests()) + len(node2.Requests())
+		totalRequests := len(nodes["node1"].mockBackend.Requests()) + len(nodes["node2"].mockBackend.Requests())
 
 		require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
 
 		resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil)
@@ -602,7 +619,8 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, "0x101", jsonMap["result"])
 
 		// no extra request hit the backends
-		require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests()))
+		require.Equal(t, totalRequests,
+			len(nodes["node1"].mockBackend.Requests())+len(nodes["node2"].mockBackend.Requests()))
 	})
 
 	t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) {
@@ -614,7 +632,7 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, 200, statusCode)
 
 		var jsonMap map[string]interface{}
-		err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
+		err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
 		require.NoError(t, err)
 		require.Equal(t, "0x101", jsonMap["params"].([]interface{})[0])
 	})
@@ -628,7 +646,7 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, 200, statusCode)
 
 		var jsonMap map[string]interface{}
-		err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
+		err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
 		require.NoError(t, err)
 		require.Equal(t, "0xc1", jsonMap["params"].([]interface{})[0])
 	})
@@ -642,7 +660,7 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, 200, statusCode)
 
 		var jsonMap map[string]interface{}
-		err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
+		err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
 		require.NoError(t, err)
 		require.Equal(t, "0xe1", jsonMap["params"].([]interface{})[0])
 	})

From 1f657b5f465793171ef429611a48674b7c270731 Mon Sep 17 00:00:00 2001
From: Felipe Andrade
Date: Sat, 27 May 2023 02:02:39 -0700
Subject: [PATCH 10/13] ban backend if error rate is too high

---
 .../integration_tests/consensus_test.go | 28 +++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/proxyd/proxyd/integration_tests/consensus_test.go 
b/proxyd/proxyd/integration_tests/consensus_test.go
index 73f10f3..e8927d2 100644
--- a/proxyd/proxyd/integration_tests/consensus_test.go
+++ b/proxyd/proxyd/integration_tests/consensus_test.go
@@ -283,6 +283,34 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String())
 	})
 
+	t.Run("ban backend if error rate is too high", func(t *testing.T) {
+		reset()
+		useOnlyNode1()
+
+		// replace node1 handler with one that always returns 503
+		oldHandler := nodes["node1"].mockBackend.handler
+		defer func() { nodes["node1"].mockBackend.handler = oldHandler }()
+
+		nodes["node1"].mockBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(503)
+		}))
+
+		numberReqs := 10
+		for numberReqs > 0 {
+			_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
+			require.NoError(t, err)
+			require.Equal(t, 503, statusCode)
+			numberReqs--
+		}
+
+		update()
+
+		consensusGroup := bg.Consensus.GetConsensusGroup()
+		require.NotContains(t, consensusGroup, nodes["node1"].backend)
+		require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
+		require.Equal(t, 0, len(consensusGroup))
+	})
+
 	t.Run("ban backend if tags are messed - safe < finalized", func(t *testing.T) {
 		reset()
 		overrideBlock("node1", "finalized", "0xb1")

From 7405b8b360266147837a6013af347e714576782b Mon Sep 17 00:00:00 2001
From: Felipe Andrade
Date: Sat, 27 May 2023 04:14:29 -0700
Subject: [PATCH 11/13] refactor poller

---
 proxyd/proxyd/consensus_poller.go | 220 +++++++++++++++---------------
 1 file changed, 113 insertions(+), 107 deletions(-)

diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go
index 57bdbb3..decacd3 100644
--- a/proxyd/proxyd/consensus_poller.go
+++ b/proxyd/proxyd/consensus_poller.go
@@ -34,8 +34,7 @@ type ConsensusPoller struct {
 	tracker      ConsensusTracker
 	asyncHandler ConsensusAsyncHandler
 
-	minPeerCount uint64
-
+	minPeerCount        uint64
 	banPeriod           time.Duration
 	maxUpdateThreshold  time.Duration
 	maxBlockLag         uint64
@@ -253,12 +252,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 
 	// if backend is not healthy state we'll only resume checking it after ban
 	if !be.IsHealthy() {
-		log.Warn("backend banned - not online or not healthy", "backend", be.Name)
+		log.Warn("backend banned - not healthy", "backend", be.Name)
 		cp.Ban(be)
 		return
 	}
 
-	// if backend it not in sync we'll check again after ban
 	inSync, err := cp.isInSync(ctx, be)
 	RecordConsensusBackendInSync(be, err == nil && inSync)
 	if err != nil {
@@ -276,21 +274,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 
 	latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
 	if err != nil {
-		log.Warn("error updating backend", "name", be.Name, "err", err)
+		log.Warn("error updating backend - latest block", "name", be.Name, "err", err)
 	}
 
 	safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
 	if err != nil {
-		log.Warn("error updating backend", "name", be.Name, "err", err)
+		log.Warn("error updating backend - safe block", "name", be.Name, "err", err)
 	}
 
 	finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
 	if err != nil {
-		
log.Warn("error updating backend", "name", be.Name, "err", err) + log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) } - _, _, _, _, oldFinalized, oldSafe, _, _ := cp.getBackendState(be) - expectedBlockTags := cp.checkExpectedBlockTags(finalizedBlockNumber, oldFinalized, safeBlockNumber, oldSafe, latestBlockNumber) + bs := cp.getBackendState(be) + oldFinalized := bs.finalizedBlockNumber + oldSafe := bs.safeBlockNumber + + expectedBlockTags := cp.checkExpectedBlockTags( + finalizedBlockNumber, oldFinalized, + safeBlockNumber, oldSafe, + latestBlockNumber) changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash, @@ -342,116 +346,108 @@ func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint6 // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - var highestLatestBlock hexutil.Uint64 - - var lowestLatestBlock hexutil.Uint64 - var lowestLatestBlockHash string - - var lowestFinalizedBlock hexutil.Uint64 - var lowestSafeBlock hexutil.Uint64 - + // get the latest block number from the tracker currentConsensusBlockNumber := cp.GetLatestBlockNumber() - // find the highest block, in order to use it defining the highest non-lagging ancestor block + // find out what backends are the candidates to be in the consensus group + // and create a copy of current their state + // + // a serving node needs to be: + // - not banned + // - healthy (network latency and error rate) + // - with minimum peer count + // - in sync + // - updated recently + // - not lagging latest block + + candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends)) + filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) + bs := cp.getBackendState(be) + passed := true + if time.Now().Before(bs.bannedUntil) { + passed = false + } + if !be.IsHealthy() { + passed = false + } + if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount { + passed = false + } + if !bs.inSync { + passed = false + } + if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { + passed = false + } + if passed { + candidates[be] = bs + } else { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + } + } - if cp.IsBanned(be) { - continue - } - if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { - continue - } - if !inSync { - continue - } - if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { - continue - } - - if backendLatestBlockNumber > highestLatestBlock { - highestLatestBlock = backendLatestBlockNumber + // find the highest block, in order to use it defining the highest non-lagging ancestor block + var highestLatestBlock hexutil.Uint64 + for _, bs := range candidates { + if bs.latestBlockNumber > highestLatestBlock { + highestLatestBlock = bs.latestBlockNumber } } // find the highest common ancestor block - for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) - - if cp.IsBanned(be) { - continue - } - if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { - continue - } - if !inSync { - continue - } - if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { - 
continue - } - + var lowestLatestBlock hexutil.Uint64 + var lowestLatestBlockHash string + var lowestFinalizedBlock hexutil.Uint64 + var lowestSafeBlock hexutil.Uint64 + lagging := make([]*Backend, 0, len(candidates)) + for be, bs := range candidates { // check if backend is lagging behind the highest block - if backendLatestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-backendLatestBlockNumber) > cp.maxBlockLag { + if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag { + lagging = append(lagging, be) continue } - if lowestLatestBlock == 0 || backendLatestBlockNumber < lowestLatestBlock { - lowestLatestBlock = backendLatestBlockNumber - lowestLatestBlockHash = backendLatestBlockHash + // update the lowest common ancestor block + if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock { + lowestLatestBlock = bs.latestBlockNumber + lowestLatestBlockHash = bs.latestBlockHash } - if lowestFinalizedBlock == 0 || backendFinalizedBlockNumber < lowestFinalizedBlock { - lowestFinalizedBlock = backendFinalizedBlockNumber + // update the lowest finalized block + if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock { + lowestFinalizedBlock = bs.finalizedBlockNumber } - if lowestSafeBlock == 0 || backendSafeBlockNumber < lowestSafeBlock { - lowestSafeBlock = backendSafeBlockNumber + // update the lowest safe block + if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock { + lowestSafeBlock = bs.safeBlockNumber } } + // remove lagging backends from the candidates + for _, be := range lagging { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + delete(candidates, be) + } + + // find the proposed block among the candidates proposedBlock := lowestLatestBlock proposedBlockHash := lowestLatestBlockHash hasConsensus := false - // check if everybody agrees on the same block hash - consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends)) - consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) - filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) - if lowestLatestBlock > currentConsensusBlockNumber { log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) } // if there is no block to propose, the consensus is automatically broken + // this can happen when backends have just recovered broken := proposedBlock == 0 && currentConsensusBlockNumber > 0 if proposedBlock > 0 { for !hasConsensus { allAgreed := true - consensusBackends = consensusBackends[:0] - filteredBackendsNames = filteredBackendsNames[:0] - for _, be := range cp.backendGroup.Backends { - /* - a serving node needs to be: - - healthy (network) - - updated recently - - not banned - - with minimum peer count - - not lagging latest block - - in sync - */ - - peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) - notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) - isBanned := time.Now().Before(bannedUntil) - notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount - lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { - filteredBackendsNames = append(filteredBackendsNames, be.Name) - continue - } - + for be, _ := range candidates { actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) if err != nil { log.Warn("error updating backend", 
"name", be.Name, "err", err) @@ -469,8 +465,6 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { allAgreed = false break } - consensusBackends = append(consensusBackends, be) - consensusBackendsNames = append(consensusBackendsNames, be.Name) } if allAgreed { hasConsensus = true @@ -488,26 +482,39 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { for _, l := range cp.listeners { l() } - log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) + log.Info("consensus broken", + "currentConsensusBlockNumber", currentConsensusBlockNumber, + "proposedBlock", proposedBlock, + "proposedBlockHash", proposedBlockHash) } cp.tracker.SetLatestBlockNumber(proposedBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) + // update consensus group + group := make([]*Backend, 0, len(candidates)) + consensusBackendsNames := make([]string, 0, len(candidates)) + for be, _ := range candidates { + group = append(group, be) + consensusBackendsNames = append(consensusBackendsNames, be.Name) + } cp.consensusGroupMux.Lock() - cp.consensusGroup = consensusBackends + cp.consensusGroup = group cp.consensusGroupMux.Unlock() RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock) RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock) RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock) - RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends)) + RecordGroupConsensusCount(cp.backendGroup, len(group)) RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames)) RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends)) - log.Debug("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", ")) + log.Debug("group state", + "proposedBlock", proposedBlock, + "consensusBackends", strings.Join(consensusBackendsNames, ", "), + "filteredBackends", strings.Join(filteredBackendsNames, ", ")) } // IsBanned checks if a specific backend is banned @@ -606,23 +613,22 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, - latestBlockNumber hexutil.Uint64, latestBlockHash string, - finalizedBlockNumber hexutil.Uint64, - safeBlockNumber hexutil.Uint64, - lastUpdate time.Time, bannedUntil time.Time) { +func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { bs := cp.backendState[be] defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() - peerCount = bs.peerCount - inSync = bs.inSync - latestBlockNumber = bs.latestBlockNumber - latestBlockHash = bs.latestBlockHash - finalizedBlockNumber = bs.finalizedBlockNumber - safeBlockNumber = bs.safeBlockNumber - lastUpdate = bs.lastUpdate - bannedUntil = bs.bannedUntil - return + + // we return a copy so that the caller can use it without locking + return &backendState{ + latestBlockNumber: bs.latestBlockNumber, + latestBlockHash: bs.latestBlockHash, + safeBlockNumber: bs.safeBlockNumber, + finalizedBlockNumber: bs.finalizedBlockNumber, + peerCount: bs.peerCount, + inSync: bs.inSync, + lastUpdate: bs.lastUpdate, + bannedUntil: bs.bannedUntil, + } } func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, From 
From 8436728419cf868ec291fdd29bf008c11419d538 Mon Sep 17 00:00:00 2001
From: Felipe Andrade
Date: Sat, 27 May 2023 04:31:05 -0700
Subject: [PATCH 12/13] extract candidates logic, refactor update backend

---
 proxyd/proxyd/consensus_poller.go             | 225 ++++++++++--------
 .../integration_tests/consensus_test.go       |   2 +-
 2 files changed, 124 insertions(+), 103 deletions(-)

diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go
index decacd3..9ed28a4 100644
--- a/proxyd/proxyd/consensus_poller.go
+++ b/proxyd/proxyd/consensus_poller.go
@@ -57,6 +57,10 @@ type backendState struct {
 	bannedUntil time.Time
 }
+func (bs backendState) IsBanned() bool {
+	return time.Now().Before(bs.bannedUntil)
+}
+
 // GetConsensusGroup returns the backend members that are agreeing in a consensus
 func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
 	defer cp.consensusGroupMux.Unlock()
@@ -242,10 +246,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
 // UpdateBackend refreshes the consensus state of a single backend
 func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
-	banned := cp.IsBanned(be)
-	RecordConsensusBackendBanned(be, banned)
+	bs := cp.getBackendState(be)
+	RecordConsensusBackendBanned(be, bs.IsBanned())
-	if banned {
+	if bs.IsBanned() {
 		log.Debug("skipping backend - banned", "backend", be.Name)
 		return
 	}
@@ -287,36 +291,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 		log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
 	}
-	bs := cp.getBackendState(be)
 	oldFinalized := bs.finalizedBlockNumber
 	oldSafe := bs.safeBlockNumber
-	expectedBlockTags := cp.checkExpectedBlockTags(
-		finalizedBlockNumber, oldFinalized,
-		safeBlockNumber, oldSafe,
-		latestBlockNumber)
+	updateDelay := time.Since(bs.lastUpdate)
+	RecordConsensusBackendUpdateDelay(be, updateDelay)
-	changed, updateDelay := cp.setBackendState(be, peerCount, inSync,
+	changed := cp.setBackendState(be, peerCount, inSync,
 		latestBlockNumber, latestBlockHash,
 		finalizedBlockNumber, safeBlockNumber)
 	RecordBackendLatestBlock(be, latestBlockNumber)
 	RecordBackendSafeBlock(be, safeBlockNumber)
 	RecordBackendFinalizedBlock(be, finalizedBlockNumber)
-	RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
-	RecordConsensusBackendUpdateDelay(be, updateDelay)
-
-	if !expectedBlockTags {
-		log.Warn("backend banned - unexpected block tags",
-			"backend", be.Name,
-			"oldFinalized", oldFinalized,
-			"finalizedBlockNumber", finalizedBlockNumber,
-			"oldSafe", oldSafe,
-			"safeBlockNumber", safeBlockNumber,
-			"latestBlockNumber", latestBlockNumber,
-		)
-		cp.Ban(be)
-	}
 	if changed {
 		log.Debug("backend state updated",
@@ -329,6 +316,26 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 			"safeBlockNumber", safeBlockNumber,
 			"updateDelay", updateDelay)
 	}
+
+	// sanity check for latest, safe and finalized block tags
+	expectedBlockTags := cp.checkExpectedBlockTags(
+		finalizedBlockNumber, oldFinalized,
+		safeBlockNumber, oldSafe,
+		latestBlockNumber)
+
+	RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
+
+	if !expectedBlockTags {
+		log.Warn("backend banned - unexpected block tags",
+			"backend", be.Name,
+			"oldFinalized", oldFinalized,
+			"finalizedBlockNumber", finalizedBlockNumber,
+			"oldSafe", oldSafe,
+			"safeBlockNumber", safeBlockNumber,
+			"latestBlockNumber", latestBlockNumber,
+		)
+		cp.Ban(be)
+	}
 }
 // checkExpectedBlockTags for unexpected conditions on block tags
@@ -349,101 +356,41 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	// get the latest block number from the tracker
 	currentConsensusBlockNumber := cp.GetLatestBlockNumber()
-	// find out what backends are the candidates to be in the consensus group
-	// and create a copy of their current state
-	//
-	// a serving node needs to be:
-	// - not banned
-	// - healthy (network latency and error rate)
-	// - with minimum peer count
-	// - in sync
-	// - updated recently
-	// - not lagging latest block
+	// get the candidates for the consensus group
+	candidates := cp.getConsensusCandidates()
-	candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
-	filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
-	for _, be := range cp.backendGroup.Backends {
-		bs := cp.getBackendState(be)
-		passed := true
-		if time.Now().Before(bs.bannedUntil) {
-			passed = false
-		}
-		if !be.IsHealthy() {
-			passed = false
-		}
-		if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
-			passed = false
-		}
-		if !bs.inSync {
-			passed = false
-		}
-		if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
-			passed = false
-		}
-		if passed {
-			candidates[be] = bs
-		} else {
-			filteredBackendsNames = append(filteredBackendsNames, be.Name)
-		}
-	}
-
-	// find the highest block, in order to use it defining the highest non-lagging ancestor block
-	var highestLatestBlock hexutil.Uint64
-	for _, bs := range candidates {
-		if bs.latestBlockNumber > highestLatestBlock {
-			highestLatestBlock = bs.latestBlockNumber
-		}
-	}
-
-	// find the highest common ancestor block
+	// update the lowest latest block number and hash
+	// the lowest safe block number
+	// the lowest finalized block number
 	var lowestLatestBlock hexutil.Uint64
 	var lowestLatestBlockHash string
 	var lowestFinalizedBlock hexutil.Uint64
 	var lowestSafeBlock hexutil.Uint64
-	lagging := make([]*Backend, 0, len(candidates))
-	for be, bs := range candidates {
-		// check if backend is lagging behind the highest block
-		if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
-			lagging = append(lagging, be)
-			continue
-		}
-
-		// update the lowest common ancestor block
+	for _, bs := range candidates {
 		if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
 			lowestLatestBlock = bs.latestBlockNumber
 			lowestLatestBlockHash = bs.latestBlockHash
 		}
-
-		// update the lowest finalized block
 		if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
 			lowestFinalizedBlock = bs.finalizedBlockNumber
 		}
-
-		// update the lowest safe block
 		if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
 			lowestSafeBlock = bs.safeBlockNumber
 		}
 	}
-	// remove lagging backends from the candidates
-	for _, be := range lagging {
-		filteredBackendsNames = append(filteredBackendsNames, be.Name)
-		delete(candidates, be)
-	}
-
 	// find the proposed block among the candidates
+	// the proposed block needs to have the same hash in the entire consensus group
 	proposedBlock := lowestLatestBlock
 	proposedBlockHash := lowestLatestBlockHash
 	hasConsensus := false
+	broken := false
 	if lowestLatestBlock > currentConsensusBlockNumber {
 		log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
 	}
-	// if there is no block to propose, the consensus is automatically broken
-	// this can happen when backends have just recovered
-	broken := proposedBlock == 0 && currentConsensusBlockNumber > 0
-
+	// if there is a block to propose, check if it is the same in all backends
 	if proposedBlock > 0 {
 		for !hasConsensus {
 			allAgreed := true
@@ -459,7 +406,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 				blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
 				if blocksDontMatch {
 					if currentConsensusBlockNumber >= actualBlockNumber {
-						log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
+						log.Warn("backend broke consensus",
+							"name", be.Name,
+							"actualBlockNumber", actualBlockNumber,
+							"actualBlockHash", actualBlockHash,
+							"proposedBlock", proposedBlock,
+							"proposedBlockHash", proposedBlockHash)
 						broken = true
 					}
 					allAgreed = false
@@ -488,6 +440,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 			"proposedBlockHash", proposedBlockHash)
 	}
+	// update tracker
 	cp.tracker.SetLatestBlockNumber(proposedBlock)
 	cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
 	cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
@@ -495,10 +448,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	// update consensus group
 	group := make([]*Backend, 0, len(candidates))
 	consensusBackendsNames := make([]string, 0, len(candidates))
-	for be, _ := range candidates {
-		group = append(group, be)
-		consensusBackendsNames = append(consensusBackendsNames, be.Name)
+	filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
+	for _, be := range cp.backendGroup.Backends {
+		_, exist := candidates[be]
+		if exist {
+			group = append(group, be)
+			consensusBackendsNames = append(consensusBackendsNames, be.Name)
+		} else {
+			filteredBackendsNames = append(filteredBackendsNames, be.Name)
+		}
 	}
+
 	cp.consensusGroupMux.Lock()
 	cp.consensusGroup = group
 	cp.consensusGroupMux.Unlock()
@@ -522,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
 	bs := cp.backendState[be]
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
-	return time.Now().Before(bs.bannedUntil)
+	return bs.IsBanned()
 }
 // Ban bans a specific backend
@@ -531,6 +491,9 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
 	bs.bannedUntil = time.Now().Add(cp.banPeriod)
+
+	// when we ban a node, we give it the chance to start from any block when it is back
+	bs.latestBlockNumber = 0
 	bs.safeBlockNumber = 0
 	bs.finalizedBlockNumber = 0
 }
@@ -613,12 +576,12 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
 	return res, nil
 }
+// getBackendState creates a copy of backend state so that the caller can use it without locking
 func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
 	bs := cp.backendState[be]
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
-	// we return a copy so that the caller can use it without locking
 	return &backendState{
 		latestBlockNumber: bs.latestBlockNumber,
 		latestBlockHash: bs.latestBlockHash,
@@ -634,18 +597,76 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
 func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
 	latestBlockNumber hexutil.Uint64, latestBlockHash string,
 	finalizedBlockNumber hexutil.Uint64,
-	safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) {
+	safeBlockNumber hexutil.Uint64) bool {
 	bs := cp.backendState[be]
 	bs.backendStateMux.Lock()
-	changed = bs.latestBlockHash != latestBlockHash
+	changed := bs.latestBlockHash != latestBlockHash
 	bs.peerCount = peerCount
 	bs.inSync = inSync
 	bs.latestBlockNumber = latestBlockNumber
 	bs.latestBlockHash = latestBlockHash
 	bs.finalizedBlockNumber = finalizedBlockNumber
 	bs.safeBlockNumber = safeBlockNumber
-	updateDelay = time.Since(bs.lastUpdate)
 	bs.lastUpdate = time.Now()
 	bs.backendStateMux.Unlock()
-	return
+	return changed
+}
+
+// getConsensusCandidates finds out which backends are candidates to be in the consensus group
+// and creates a copy of their current state
+//
+// a candidate is a serving node that meets the following conditions:
+// - not banned
+// - healthy (network latency and error rate)
+// - with minimum peer count
+// - in sync
+// - updated recently
+// - not lagging latest block
+func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
+	candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
+
+	for _, be := range cp.backendGroup.Backends {
+		bs := cp.getBackendState(be)
+		if bs.IsBanned() {
+			continue
+		}
+		if !be.IsHealthy() {
+			continue
+		}
+		if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
+			continue
+		}
+		if !bs.inSync {
+			continue
+		}
+		if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
+			continue
+		}
+
+		candidates[be] = bs
+	}
+
+	// find the highest block, in order to use it defining the highest non-lagging ancestor block
+	var highestLatestBlock hexutil.Uint64
+	for _, bs := range candidates {
+		if bs.latestBlockNumber > highestLatestBlock {
+			highestLatestBlock = bs.latestBlockNumber
+		}
+	}
+
+	// identify backends lagging behind the highest block
+	lagging := make([]*Backend, 0, len(candidates))
+	for be, bs := range candidates {
+		// check if backend is lagging behind the highest block
+		if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
+			lagging = append(lagging, be)
+		}
+	}
+
+	// remove lagging backends from the candidates
+	for _, be := range lagging {
+		delete(candidates, be)
+	}
+
+	return candidates
+}
diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go
index e8927d2..98196c9 100644
--- a/proxyd/proxyd/integration_tests/consensus_test.go
+++ b/proxyd/proxyd/integration_tests/consensus_test.go
@@ -428,7 +428,7 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
 	})
-	t.Run("latest dropped below safe, and stayed inconsistent after ban", func(t *testing.T) {
+	t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
 		reset()
 		useOnlyNode1()
 		overrideBlock("node1", "latest", "0xd1")
From dbfb39a4fa5a435ddf0fc7befcf146b72c22958f Mon Sep 17 00:00:00 2001
From: Felipe Andrade
Date: Sat, 27 May 2023 07:38:19 -0700
Subject: [PATCH 13/13] lint

---
 proxyd/proxyd/consensus_poller.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go
index 9ed28a4..07edc31 100644
--- a/proxyd/proxyd/consensus_poller.go
+++ b/proxyd/proxyd/consensus_poller.go
@@ -57,7 +57,7 @@ type backendState struct {
 	bannedUntil time.Time
 }
-func (bs backendState) IsBanned() bool {
+func (bs *backendState) IsBanned() bool {
 	return time.Now().Before(bs.bannedUntil)
 }
@@ -394,7 +394,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	if proposedBlock > 0 {
 		for !hasConsensus {
 			allAgreed := true
-			for be, _ := range candidates {
+			for be := range candidates {
 				actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
 				if err != nil {
 					log.Warn("error updating backend", "name", be.Name, "err", err)