From 2d9259ee2006716f7a485f70c073399ad40e1584 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Fri, 26 May 2023 14:22:50 -0700 Subject: [PATCH] better moar tests --- proxyd/proxyd/backend.go | 32 +- proxyd/proxyd/consensus_poller.go | 67 +- .../integration_tests/consensus_test.go | 983 ++++++------------ .../testdata/consensus_responses.yml | 78 +- proxyd/proxyd/metrics.go | 97 +- proxyd/proxyd/rewriter.go | 2 +- 6 files changed, 533 insertions(+), 726 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 3946995..6ba7a10 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -374,7 +374,6 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { // we are concerned about network error rates, so we record 1 request independently of how many are in the batch b.networkRequestsSlidingWindow.Incr() - RecordBackendNetworkRequestCountSlidingWindow(b, b.networkRequestsSlidingWindow.Count()) isSingleElementBatch := len(rpcReqs) == 1 @@ -391,7 +390,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body)) if err != nil { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error creating backend request") } @@ -413,7 +412,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool httpRes, err := b.client.DoLimited(httpReq) if err != nil { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error in backend request") } @@ -432,7 +431,7 @@ func 
(b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool // Alchemy returns a 400 on bad JSONs, so handle that case if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, fmt.Errorf("response code %d", httpRes.StatusCode) } @@ -440,7 +439,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize)) if err != nil { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error reading response body") } @@ -458,18 +457,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method if responseIsNotBatched(resB) { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendUnexpectedJSONRPC } b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendBadResponse } } if len(rpcReqs) != len(res) { b.networkErrorsSlidingWindow.Incr() - RecordBackendNetworkErrorCountSlidingWindow(b, b.networkErrorsSlidingWindow.Count()) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendUnexpectedJSONRPC } @@ -483,6 +482,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool duration := time.Since(start) 
b.latencySlidingWindow.Add(float64(duration)) RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg())) + RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) sortBatchRPCResponse(rpcReqs, res) return res, nil @@ -490,11 +490,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool // IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters func (b *Backend) IsHealthy() bool { - errorRate := float64(0) - // avoid division-by-zero when the window is empty - if b.networkRequestsSlidingWindow.Sum() >= 10 { - errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum() - } + errorRate := b.ErrorRate() avgLatency := time.Duration(b.latencySlidingWindow.Avg()) if errorRate >= b.maxErrorRateThreshold { return false @@ -505,6 +501,16 @@ func (b *Backend) IsHealthy() bool { return true } +// ErrorRate returns the instant error rate of the backend +func (b *Backend) ErrorRate() (errorRate float64) { + // we only really start counting the error rate after a minimum of 10 requests + // this is to avoid false positives when the backend is just starting up + if b.networkRequestsSlidingWindow.Sum() >= 10 { + errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum() + } + return errorRate +} + // IsDegraded checks if the backend is serving traffic in a degraded state (i.e. 
used as a last resource) func (b *Backend) IsDegraded() bool { avgLatency := time.Duration(b.latencySlidingWindow.Avg()) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 5c5edcd..1856566 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -275,23 +275,42 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Warn("error updating backend", "name", be.Name, "err", err) } - finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") - if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) - } - safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") if err != nil { log.Warn("error updating backend", "name", be.Name, "err", err) } + finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + } + + _, _, _, _, oldFinalized, oldSafe, _, _ := cp.getBackendState(be) + expectedBlockTags := cp.checkExpectedBlockTags(finalizedBlockNumber, oldFinalized, safeBlockNumber, oldSafe, latestBlockNumber) + changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash, finalizedBlockNumber, safeBlockNumber) + RecordBackendLatestBlock(be, latestBlockNumber) + RecordBackendSafeBlock(be, safeBlockNumber) + RecordBackendFinalizedBlock(be, finalizedBlockNumber) + RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) + RecordConsensusBackendUpdateDelay(be, updateDelay) + + if !expectedBlockTags { + log.Warn("backend banned - unexpected block tags", + "backend", be.Name, + "oldFinalized", oldFinalized, + "finalizedBlockNumber", finalizedBlockNumber, + "oldSafe", oldSafe, + "safeBlockNumber", safeBlockNumber, + "latestBlockNumber", latestBlockNumber, + ) + cp.Ban(be) + } + if changed { - RecordBackendLatestBlock(be, latestBlockNumber) - RecordConsensusBackendUpdateDelay(be, updateDelay) log.Debug("backend state 
updated", "name", be.Name, "peerCount", peerCount, @@ -304,6 +323,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { } } +// checkExpectedBlockTags checks for unexpected conditions on block tags +// - finalized block number should never decrease +// - safe block number should never decrease +// - finalized block should be <= safe block <= latest block +func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64, + currentSafe hexutil.Uint64, oldSafe hexutil.Uint64, + currentLatest hexutil.Uint64) bool { + return currentFinalized >= oldFinalized && + currentSafe >= oldSafe && + currentFinalized <= currentSafe && + currentSafe <= currentLatest +} + // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { var highestLatestBlock hexutil.Uint64 @@ -320,6 +352,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { for _, be := range cp.backendGroup.Backends { peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) + if cp.IsBanned(be) { + continue + } if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } @@ -339,6 +374,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { for _, be := range cp.backendGroup.Backends { peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) + if cp.IsBanned(be) { + continue + } if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } @@ -451,13 +489,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } cp.tracker.SetLatestBlockNumber(proposedBlock) - cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock) +
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) + cp.consensusGroupMux.Lock() cp.consensusGroup = consensusBackends cp.consensusGroupMux.Unlock() RecordGroupConsensusLatestBlock(cp.backendGroup, proposedBlock) + RecordGroupConsensusSafeBlock(cp.backendGroup, lowestSafeBlock) + RecordGroupConsensusFinalizedBlock(cp.backendGroup, lowestFinalizedBlock) + RecordGroupConsensusCount(cp.backendGroup, len(consensusBackends)) RecordGroupConsensusFilteredCount(cp.backendGroup, len(filteredBackendsNames)) RecordGroupTotalCount(cp.backendGroup, len(cp.backendGroup.Backends)) @@ -481,13 +523,10 @@ func (cp *ConsensusPoller) Ban(be *Backend) { bs.bannedUntil = time.Now().Add(cp.banPeriod) } -// Unban remove any bans from the backends -func (cp *ConsensusPoller) Unban() { +// Reset removes any bans from the backends and resets their states +func (cp *ConsensusPoller) Reset() { for _, be := range cp.backendGroup.Backends { - bs := cp.backendState[be] - bs.backendStateMux.Lock() - bs.bannedUntil = time.Now().Add(-10 * time.Hour) - bs.backendStateMux.Unlock() + cp.backendState[be] = &backendState{} } } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index d95903e..fe36a15 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -16,6 +16,12 @@ import ( "github.com/stretchr/testify/require" ) +type nodeContext struct { + mockBackend *MockBackend + backend *proxyd.Backend + handler *ms.MockedHandler +} + func TestConsensus(t *testing.T) { node1 := NewMockBackend(nil) defer node1.Close() @@ -55,526 +61,402 @@ func TestConsensus(t *testing.T) { require.NotNil(t, bg) require.NotNil(t, bg.Consensus) + // convenient mapping to access the nodes by name + nodes := map[string]nodeContext{ + "node1": { + mockBackend: node1, + backend: bg.Backends[0], + handler: &h1, + }, + "node2": { + mockBackend: node2, + backend: bg.Backends[1], + handler: &h2, + }, + }
+ + reset := func() { + for _, node := range nodes { + node.handler.ResetOverrides() + node.mockBackend.Reset() + } + bg.Consensus.Reset() + } + + // poll for updated consensus + update := func() { + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + } + + override := func(node string, method string, block string, response string) { + nodes[node].handler.AddOverride(&ms.MethodTemplate{ + Method: method, + Block: block, + Response: response, + }) + } + + overrideBlock := func(node string, blockRequest string, blockResponse string) { + override(node, + "eth_getBlockByNumber", + blockRequest, + buildResponse(map[string]string{ + "number": blockResponse, + "hash": "hash_" + blockResponse, + })) + } + + overrideBlockHash := func(node string, blockRequest string, number string, hash string) { + override(node, + "eth_getBlockByNumber", + blockRequest, + buildResponse(map[string]string{ + "number": number, + "hash": hash, + })) + } + + overridePeerCount := func(node string, count int) { + override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String())) + } + + overrideNotInSync := func(node string) { + override(node, "eth_syncing", "", buildResponse(map[string]string{ + "startingblock": "0x0", + "currentblock": "0x0", + "highestblock": "0x100", + })) + } + + useOnlyNode1 := func() { + overridePeerCount("node2", 0) + update() + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 1, len(consensusGroup)) + require.Contains(t, consensusGroup, nodes["node1"].backend) + node1.Reset() + } + t.Run("initial consensus", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() // unknown consensus at init require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String()) // first poll - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // 
consensus at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + // as a default we use: + // - latest at 0x101 [257] + // - safe at 0xe1 [225] + // - finalized at 0xc1 [193] + + // consensus at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) }) t.Run("prevent using a backend with low peer count", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + overridePeerCount("node1", 0) + update() - h1.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - be := backend(bg, "node1") - require.NotNil(t, be) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) consensusGroup := bg.Consensus.GetConsensusGroup() - - require.NotContains(t, consensusGroup, be) - require.False(t, bg.Consensus.IsBanned(be)) + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) require.Equal(t, 1, len(consensusGroup)) }) t.Run("prevent using a backend lagging behind", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + // node2 is 51 blocks ahead of node1 (0x101 + 51 = 0x134) + overrideBlock("node2", "latest", "0x134") + update() - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - 
h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x100", "hash0x100"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x100", - Response: buildGetBlockResponse("0x100", "hash0x100"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x100", bg.Consensus.GetLatestBlockNumber().String()) - require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + // since we ignored node1, the consensus should be at 0x134 + require.Equal(t, "0x134", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() - - be := backend(bg, "node1") - require.NotNil(t, be) - require.NotContains(t, consensusGroup, be) - require.False(t, bg.Consensus.IsBanned(be)) + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) require.Equal(t, 1, len(consensusGroup)) }) - t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - - // 0x1 + 50 = 0x33 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x33", "hash0x100"), - }) -
h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x100", - Response: buildGetBlockResponse("0x33", "hash0x100"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - consensusGroup := bg.Consensus.GetConsensusGroup() - - require.Equal(t, 2, len(consensusGroup)) - }) - t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + // node2 is 50 blocks ahead of node1 (0x101 + 50 = 0x133) + overrideBlock("node2", "latest", "0x133") + update() - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - - // 0x1 + 49 = 0x32 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x32", "hash0x100"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x100", - Response: buildGetBlockResponse("0x32", "hash0x100"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - consensusGroup := bg.Consensus.GetConsensusGroup() - - require.Equal(t, 2, len(consensusGroup)) + // both nodes are in consensus with the lowest block + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) }) t.Run("prevent using a backend not in sync", func(t *testing.T) { - h1.ResetOverrides() - 
h2.ResetOverrides() - bg.Consensus.Unban() + reset() + // make node1 not in sync + overrideNotInSync("node1") + update() - // advance latest on node2 to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_syncing", - Block: "", - Response: buildResponse(map[string]string{ - "startingblock": "0x0", - "currentblock": "0x0", - "highestblock": "0x100", - }), - }) - - be := backend(bg, "node1") - require.NotNil(t, be) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) consensusGroup := bg.Consensus.GetConsensusGroup() - - require.NotContains(t, consensusGroup, be) - require.False(t, bg.Consensus.IsBanned(be)) + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) require.Equal(t, 1, len(consensusGroup)) }) t.Run("advance consensus", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } + // as a default we use: + // - latest at 0x101 [257] + // - safe at 0xe1 [225] + // - finalized at 0xc1 [193] + + update() + + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + + // advance latest on node2 to 0x102 + overrideBlock("node2", "latest", "0x102") + + update() + + // consensus should stick to 0x101, since node1 is still lagging there bg.Consensus.UpdateBackendGroupConsensus(ctx) + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // advance latest on node1 to 0x102 + overrideBlock("node1", "latest", "0x102") - // advance latest on node2 to 0x2 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) + update() - // poll for group 
consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - - // consensus should stick to 0x1, since node1 is still lagging there - bg.Consensus.UpdateBackendGroupConsensus(ctx) - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - // advance latest on node1 to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // should stick to 0x2, since now all nodes are at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) + // all nodes now at 0x102 + require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String()) }) t.Run("should use lowest safe and finalized", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + overrideBlock("node2", "finalized", "0xc2") + overrideBlock("node2", "safe", "0xe2") + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x559", "hash559"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x558", "hash558"), - }) - - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) }) 
t.Run("advance safe and finalized", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + overrideBlock("node1", "finalized", "0xc2") + overrideBlock("node1", "safe", "0xe2") + overrideBlock("node2", "finalized", "0xc2") + overrideBlock("node2", "safe", "0xe2") + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String()) + }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x556", "hash556"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x552", "hash552"), - }) + t.Run("ban backend if tags are messed - safe < finalized", func(t *testing.T) { + reset() + overrideBlock("node1", "finalized", "0xb1") + overrideBlock("node1", "safe", "0xa1") + update() - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x559", "hash559"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x558", "hash558"), - }) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + }) - bg.Consensus.UpdateBackendGroupConsensus(ctx) + t.Run("ban backend if tags are messed - latest < safe", func(t *testing.T) { 
+ reset() + overrideBlock("node1", "safe", "0xb1") + overrideBlock("node1", "latest", "0xa1") + update() - require.Equal(t, "0x556", bg.Consensus.GetFinalizedBlockNumber().String()) - require.Equal(t, "0x552", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + }) + + t.Run("ban backend if tags are messed - safe dropped", func(t *testing.T) { + reset() + update() + overrideBlock("node1", "safe", "0xb1") + update() + + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) + }) + + t.Run("ban backend if tags are messed - finalized dropped", func(t *testing.T) { + reset() + update() + overrideBlock("node1", "finalized", "0xa1") + update() + + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + require.NotContains(t, consensusGroup, nodes["node1"].backend) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.Equal(t, 1, len(consensusGroup)) }) t.Run("broken consensus", func(t *testing.T) { - 
h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - + reset() listenerCalled := false bg.Consensus.AddListener(func() { listenerCalled = true }) + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // advance latest on both nodes to 0x102 + overrideBlock("node1", "latest", "0x102") + overrideBlock("node2", "latest", "0x102") - // advance latest on both nodes to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) + update() - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) + // at 0x102 + require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String()) // make node2 diverge on hash - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "wrong_hash"), - }) + overrideBlockHash("node2", "0x102", "0x102", "wrong_hash") - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // should resolve to 0x1, since 0x2 is out of consensus at the moment - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // should resolve to 0x101, since 0x102 is out of consensus at the moment + require.Equal(t, "0x101", 
bg.Consensus.GetLatestBlockNumber().String()) + // everybody serving traffic + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 2, len(consensusGroup)) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + // onConsensusBroken listener was called require.True(t, listenerCalled) }) t.Run("broken consensus with depth 2", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) - - // advance latest on both nodes to 0x2 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), + reset() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true }) + update() - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) + // advance latest on both nodes to 0x102 + overrideBlock("node1", "latest", "0x102") + overrideBlock("node2", "latest", "0x102") + + update() + + // at 0x102 + require.Equal(t, "0x102", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on both nodes to 0x3 - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - 
Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) + overrideBlock("node1", "latest", "0x103") + overrideBlock("node2", "latest", "0x103") - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // at 0x3 - require.Equal(t, "0x3", bg.Consensus.GetLatestBlockNumber().String()) + // at 0x103 + require.Equal(t, "0x103", bg.Consensus.GetLatestBlockNumber().String()) - // make node2 diverge on hash for blocks 0x2 and 0x3 - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "wrong_hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x3", - Response: buildGetBlockResponse("0x3", "wrong_hash3"), - }) + // make node2 diverge on hash for blocks 0x102 and 0x103 + overrideBlockHash("node2", "0x102", "0x102", "wrong_hash_0x102") + overrideBlockHash("node2", "0x103", "0x103", "wrong_hash_0x103") - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + update() - // should resolve to 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // should resolve to 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + + // everybody serving traffic + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 2, len(consensusGroup)) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + // onConsensusBroken listener was called + require.True(t, listenerCalled) }) t.Run("fork in advanced block", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled 
= true + }) + update() - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // all nodes start at block 0x101 + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // make nodes 1 and 2 advance in forks, i.e. they have same block number with different hashes + overrideBlockHash("node1", "0x102", "0x102", "node1_0x102") + overrideBlockHash("node2", "0x102", "0x102", "node2_0x102") + overrideBlockHash("node1", "0x103", "0x103", "node1_0x103") + overrideBlockHash("node2", "0x103", "0x103", "node2_0x103") + overrideBlockHash("node1", "latest", "0x103", "node1_0x103") + overrideBlockHash("node2", "latest", "0x103", "node2_0x103") - // make nodes 1 and 2 advance in forks - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "node1_0x2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x2", - Response: buildGetBlockResponse("0x2", "node2_0x2"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x3", - Response: buildGetBlockResponse("0x3", "node1_0x3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "0x3", - Response: buildGetBlockResponse("0x3", "node2_0x3"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "node1_0x3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "node2_0x3"), - }) + update() - // poll for group consensus - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + // should resolve to 0x101, the highest common ancestor + 
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) - // should resolve to 0x1, the highest common ancestor - require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + // everybody serving traffic + consensusGroup := bg.Consensus.GetConsensusGroup() + require.Equal(t, 2, len(consensusGroup)) + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + // onConsensusBroken listener should not be called + require.False(t, listenerCalled) }) t.Run("load balancing should hit both backends", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + reset() + update() require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) + // reset request counts node1.Reset() node2.Reset() @@ -591,7 +473,7 @@ func TestConsensus(t *testing.T) { numberReqs := len(consensusGroup) * 100 for numberReqs > 0 { - _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false}) require.NoError(t, err) require.Equal(t, 200, statusCode) numberReqs-- @@ -603,24 +485,10 @@ func TestConsensus(t *testing.T) { }) t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // node1 should not be serving any traffic - h1.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + reset() + useOnlyNode1() + // reset request counts node1.Reset() node2.Reset() @@ -629,60 +497,24 @@ func 
TestConsensus(t *testing.T) { numberReqs := 10 for numberReqs > 0 { - _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false}) require.NoError(t, err) require.Equal(t, 200, statusCode) numberReqs-- } msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests())) - require.Equal(t, len(node1.Requests()), 0, msg) - require.Equal(t, len(node2.Requests()), 10, msg) + require.Equal(t, len(node1.Requests()), 10, msg) + require.Equal(t, len(node2.Requests()), 0, msg) }) t.Run("rewrite response of eth_blockNumber", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - node1.Reset() - node2.Reset() - bg.Consensus.Unban() - - // establish the consensus - - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) + reset() + update() totalRequests := len(node1.Requests()) + len(node2.Requests()) - require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) - // pretend backends advanced in consensus, but we are still serving the latest value of the consensus - // until it gets updated again - - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x3", "hash3"), - }) - resRaw, statusCode, err := client.SendRPC("eth_blockNumber", nil) require.NoError(t, err) require.Equal(t, 200, statusCode) @@ -690,37 +522,15 @@ func TestConsensus(t *testing.T) { var 
jsonMap map[string]interface{} err = json.Unmarshal(resRaw, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x2", jsonMap["result"]) + require.Equal(t, "0x101", jsonMap["result"]) // no extra request hit the backends require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests())) }) t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"latest"}) require.NoError(t, err) @@ -729,39 +539,12 @@ func TestConsensus(t *testing.T) { var jsonMap map[string]interface{} err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0]) + require.Equal(t, "0x101", jsonMap["params"].([]interface{})[0]) }) t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x20", "hash20"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "finalized", - Response: buildGetBlockResponse("0x5", "hash5"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", 
- Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"}) require.NoError(t, err) @@ -770,39 +553,12 @@ func TestConsensus(t *testing.T) { var jsonMap map[string]interface{} err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x5", jsonMap["params"].([]interface{})[0]) + require.Equal(t, "0xc1", jsonMap["params"].([]interface{})[0]) }) t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x20", "hash20"), - }) - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "safe", - Response: buildGetBlockResponse("0x1", "hash1"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"}) require.NoError(t, err) @@ -811,36 +567,14 @@ func TestConsensus(t *testing.T) { var jsonMap map[string]interface{} err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) require.NoError(t, err) - require.Equal(t, "0x1", jsonMap["params"].([]interface{})[0]) + require.Equal(t, "0xe1", jsonMap["params"].([]interface{})[0]) }) t.Run("rewrite request of 
eth_getBlockByNumber - out of range", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() + reset() + useOnlyNode1() - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() - - resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x10"}) + resRaw, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x300"}) require.NoError(t, err) require.Equal(t, 400, statusCode) @@ -852,35 +586,13 @@ func TestConsensus(t *testing.T) { }) t.Run("batched rewrite", func(t *testing.T) { - h1.ResetOverrides() - h2.ResetOverrides() - bg.Consensus.Unban() - - // establish the consensus and ban node2 for now - h1.AddOverride(&ms.MethodTemplate{ - Method: "eth_getBlockByNumber", - Block: "latest", - Response: buildGetBlockResponse("0x2", "hash2"), - }) - h2.AddOverride(&ms.MethodTemplate{ - Method: "net_peerCount", - Block: "", - Response: buildPeerCountResponse(1), - }) - - for _, be := range bg.Backends { - bg.Consensus.UpdateBackend(ctx, be) - } - bg.Consensus.UpdateBackendGroupConsensus(ctx) - - require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) - - node1.Reset() + reset() + useOnlyNode1() resRaw, statusCode, err := client.SendBatchRPC( NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}), - NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x10"}), - NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0x1"})) + NewRPCReq("2", "eth_getBlockByNumber", []interface{}{"0x102"}), + NewRPCReq("3", "eth_getBlockByNumber", 
[]interface{}{"0xe1"})) require.NoError(t, err) require.Equal(t, 200, statusCode) @@ -889,34 +601,15 @@ func TestConsensus(t *testing.T) { require.NoError(t, err) require.Equal(t, 3, len(jsonMap)) - // rewrite latest to 0x2 - require.Equal(t, "0x2", jsonMap[0]["result"].(map[string]interface{})["number"]) + // rewrite latest to 0x101 + require.Equal(t, "0x101", jsonMap[0]["result"].(map[string]interface{})["number"]) - // out of bounds for block 0x10 + // out of bounds for block 0x102 require.Equal(t, -32019, int(jsonMap[1]["error"].(map[string]interface{})["code"].(float64))) require.Equal(t, "block is out of range", jsonMap[1]["error"].(map[string]interface{})["message"]) - // dont rewrite for 0x1 - require.Equal(t, "0x1", jsonMap[2]["result"].(map[string]interface{})["number"]) - }) -} - -func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend { - for _, be := range bg.Backends { - if be.Name == name { - return be - } - } - return nil -} - -func buildPeerCountResponse(count uint64) string { - return buildResponse(hexutil.Uint64(count).String()) -} -func buildGetBlockResponse(number string, hash string) string { - return buildResponse(map[string]string{ - "number": number, - "hash": hash, + // dont rewrite for 0xe1 + require.Equal(t, "0xe1", jsonMap[2]["result"].(map[string]interface{})["number"]) }) } diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index ef5b692..12cc4cd 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -26,63 +26,85 @@ "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash1", - "number": "0x1" + "hash": "hash_0x101", + "number": "0x101" } } - method: eth_getBlockByNumber - block: 0x1 + block: 0x101 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash1", - "number": "0x1" + "hash": "hash_0x101", + "number": "0x101" } } - 
method: eth_getBlockByNumber - block: 0x2 + block: 0x102 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash2", - "number": "0x2" + "hash": "hash_0x102", + "number": "0x102" } } - method: eth_getBlockByNumber - block: 0x3 + block: 0x103 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash3", - "number": "0x3" + "hash": "hash_0x103", + "number": "0x103" } } - method: eth_getBlockByNumber - block: finalized + block: 0x132 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_finalized", - "number": "0x555" + "hash": "hash_0x132", + "number": "0x132" } } - method: eth_getBlockByNumber - block: 0x555 + block: 0x133 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_finalized", - "number": "0x555" + "hash": "hash_0x133", + "number": "0x133" + } + } +- method: eth_getBlockByNumber + block: 0x134 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x134", + "number": "0x134" + } + } +- method: eth_getBlockByNumber + block: 0x200 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x200", + "number": "0x200" } } - method: eth_getBlockByNumber @@ -92,40 +114,40 @@ "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_safe", - "number": "0x551" + "hash": "hash_0xe1", + "number": "0xe1" } } - method: eth_getBlockByNumber - block: 0x555 + block: 0xe1 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash_safe", - "number": "0x551" + "hash": "hash_0xe1", + "number": "0xe1" } } - method: eth_getBlockByNumber - block: 0x5 + block: finalized response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash5", - "number": "0x5" + "hash": "hash_0xc1", + "number": "0xc1" } } - method: eth_getBlockByNumber - block: 0x20 + block: 0xc1 response: > { "jsonrpc": "2.0", "id": 67, "result": { - "hash": "hash20", - "number": "0x20" + "hash": "hash_0xc1", + "number": "0xc1" } } diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 
884e783..8c52ed4 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -246,6 +246,22 @@ var ( "backend_group_name", }) + consensusSafeBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "group_consensus_safe_block", + Help: "Consensus safe block", + }, []string{ + "backend_group_name", + }) + + consensusFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "group_consensus_finalized_block", + Help: "Consensus finalized block", + }, []string{ + "backend_group_name", + }) + backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricsNamespace, Name: "backend_latest_block", @@ -254,6 +270,30 @@ var ( "backend_name", }) + backendSafeBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_safe_block", + Help: "Current safe block observed per backend", + }, []string{ + "backend_name", + }) + + backendFinalizedBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_finalized_block", + Help: "Current finalized block observed per backend", + }, []string{ + "backend_name", + }) + + backendUnexpectedBlockTagsBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_unexpected_block_tags", + Help: "Bool gauge for unexpected block tags", + }, []string{ + "backend_name", + }) + consensusGroupCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricsNamespace, Name: "group_consensus_count", @@ -318,18 +358,10 @@ var ( "backend_name", }) - networkErrorCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + networkErrorRateBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricsNamespace, - Name: "backend_net_error_count", - Help: "Network error count per backend", - }, []string{ - "backend_name", - }) - - requestCountBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: 
MetricsNamespace, - Name: "backend_request_count", - Help: "Request count per backend", + Name: "backend_error_rate", + Help: "Request error rate per backend", }, []string{ "backend_name", }) @@ -402,6 +434,14 @@ func RecordGroupConsensusLatestBlock(group *BackendGroup, blockNumber hexutil.Ui consensusLatestBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) } +func RecordGroupConsensusSafeBlock(group *BackendGroup, blockNumber hexutil.Uint64) { + consensusSafeBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) +} + +func RecordGroupConsensusFinalizedBlock(group *BackendGroup, blockNumber hexutil.Uint64) { + consensusFinalizedBlock.WithLabelValues(group.Name).Set(float64(blockNumber)) +} + func RecordGroupConsensusCount(group *BackendGroup, count int) { consensusGroupCount.WithLabelValues(group.Name).Set(float64(count)) } @@ -418,12 +458,20 @@ func RecordBackendLatestBlock(b *Backend, blockNumber hexutil.Uint64) { backendLatestBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) } +func RecordBackendSafeBlock(b *Backend, blockNumber hexutil.Uint64) { + backendSafeBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) +} + +func RecordBackendFinalizedBlock(b *Backend, blockNumber hexutil.Uint64) { + backendFinalizedBlockBackend.WithLabelValues(b.Name).Set(float64(blockNumber)) +} + +func RecordBackendUnexpectedBlockTags(b *Backend, unexpected bool) { + backendUnexpectedBlockTagsBackend.WithLabelValues(b.Name).Set(boolToFloat64(unexpected)) +} + func RecordConsensusBackendBanned(b *Backend, banned bool) { - v := float64(0) - if banned { - v = float64(1) - } - consensusBannedBackends.WithLabelValues(b.Name).Set(v) + consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned)) } func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { @@ -431,11 +479,7 @@ func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { } func RecordConsensusBackendInSync(b *Backend, inSync bool) { - v := float64(0) - if inSync
{ - v = float64(1) - } - consensusInSyncBackend.WithLabelValues(b.Name).Set(v) + consensusInSyncBackend.WithLabelValues(b.Name).Set(boolToFloat64(inSync)) } func RecordConsensusBackendUpdateDelay(b *Backend, delay time.Duration) { @@ -446,10 +490,13 @@ func RecordBackendNetworkLatencyAverageSlidingWindow(b *Backend, avgLatency time avgLatencyBackend.WithLabelValues(b.Name).Set(float64(avgLatency.Milliseconds())) } -func RecordBackendNetworkRequestCountSlidingWindow(b *Backend, count uint) { - requestCountBackend.WithLabelValues(b.Name).Set(float64(count)) +func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) { + networkErrorRateBackend.WithLabelValues(b.Name).Set(rate) } -func RecordBackendNetworkErrorCountSlidingWindow(b *Backend, count uint) { - networkErrorCountBackend.WithLabelValues(b.Name).Set(float64(count)) +func boolToFloat64(b bool) float64 { + if b { + return 1 + } + return 0 } diff --git a/proxyd/proxyd/rewriter.go b/proxyd/proxyd/rewriter.go index ab098d8..08d5638 100644 --- a/proxyd/proxyd/rewriter.go +++ b/proxyd/proxyd/rewriter.go @@ -10,8 +10,8 @@ import ( type RewriteContext struct { latest hexutil.Uint64 - finalized hexutil.Uint64 safe hexutil.Uint64 + finalized hexutil.Uint64 } type RewriteResult uint8