diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index 4f741db..1fe6548 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -462,6 +462,8 @@ workflows: docker_name: op-conductor-mon docker_tags: <>,<> docker_context: . + requires: + - hold - docker-publish: name: op-conductor-mon-docker-publish docker_name: op-conductor-mon diff --git a/proxyd/Makefile b/proxyd/Makefile index d9ffb57..811c054 100644 --- a/proxyd/Makefile +++ b/proxyd/Makefile @@ -20,6 +20,6 @@ lint: go vet ./... .PHONY: test -test-fallback: - go test -v ./... -test.run ^TestFallback$ -.PHONY: test-fallback +test-%: + go test -v ./... -test.run ^Test$*$$ +.PHONY: test-% diff --git a/proxyd/backend.go b/proxyd/backend.go index 802b94a..7907a0d 100644 --- a/proxyd/backend.go +++ b/proxyd/backend.go @@ -158,9 +158,9 @@ type Backend struct { maxLatencyThreshold time.Duration maxErrorRateThreshold float64 - latencySlidingWindow *sw.AvgSlidingWindow - networkRequestsSlidingWindow *sw.AvgSlidingWindow - networkErrorsSlidingWindow *sw.AvgSlidingWindow + latencySlidingWindow *sw.AvgSlidingWindow + networkRequestsSlidingWindow *sw.AvgSlidingWindow + intermittentErrorsSlidingWindow *sw.AvgSlidingWindow weight int } @@ -279,6 +279,12 @@ func WithConsensusReceiptTarget(receiptsTarget string) BackendOpt { } } +func WithIntermittentNetworkErrorSlidingWindow(sw *sw.AvgSlidingWindow) BackendOpt { + return func(b *Backend) { + b.intermittentErrorsSlidingWindow = sw + } +} + type indexedReqRes struct { index int req *RPCReq @@ -328,9 +334,9 @@ func NewBackend( maxDegradedLatencyThreshold: 5 * time.Second, maxErrorRateThreshold: 0.5, - latencySlidingWindow: sw.NewSlidingWindow(), - networkRequestsSlidingWindow: sw.NewSlidingWindow(), - networkErrorsSlidingWindow: sw.NewSlidingWindow(), + latencySlidingWindow: sw.NewSlidingWindow(), + networkRequestsSlidingWindow: sw.NewSlidingWindow(), + intermittentErrorsSlidingWindow: sw.NewSlidingWindow(), } backend.Override(opts...) @@ -534,7 +540,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body)) if err != nil { - b.networkErrorsSlidingWindow.Incr() + b.intermittentErrorsSlidingWindow.Incr() RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error creating backend request") } @@ -560,7 +566,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool start := time.Now() httpRes, err := b.client.DoLimited(httpReq) if err != nil { - b.networkErrorsSlidingWindow.Incr() + b.intermittentErrorsSlidingWindow.Incr() RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error in backend request") } @@ -579,7 +585,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool // Alchemy returns a 400 on bad JSONs, so handle that case if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { - b.networkErrorsSlidingWindow.Incr() + b.intermittentErrorsSlidingWindow.Incr() RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, fmt.Errorf("response code %d", httpRes.StatusCode) } @@ -590,7 +596,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool return nil, ErrBackendResponseTooLarge } if err != nil { - b.networkErrorsSlidingWindow.Incr() + b.intermittentErrorsSlidingWindow.Incr() RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, wrapErr(err, "error reading response body") } @@ -608,18 +614,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool if err := json.Unmarshal(resB, &rpcRes); err != nil { // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method if responseIsNotBatched(resB) { - b.networkErrorsSlidingWindow.Incr() + b.intermittentErrorsSlidingWindow.Incr() RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendUnexpectedJSONRPC } - b.networkErrorsSlidingWindow.Incr() + b.intermittentErrorsSlidingWindow.Incr() RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendBadResponse } } if len(rpcReqs) != len(rpcRes) { - b.networkErrorsSlidingWindow.Incr() + b.intermittentErrorsSlidingWindow.Incr() RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) return nil, ErrBackendUnexpectedJSONRPC } @@ -670,7 +676,7 @@ func (b *Backend) ErrorRate() (errorRate float64) { // we only really start counting the error rate after a minimum of 10 requests // this is to avoid false positives when the backend is just starting up if b.networkRequestsSlidingWindow.Sum() >= 10 { - errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum() + errorRate = b.intermittentErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum() } return errorRate } @@ -1266,6 +1272,11 @@ func RecordBatchRPCForward(ctx context.Context, backendName string, reqs []*RPCR } } +func (b *Backend) ClearSlidingWindows() { + b.intermittentErrorsSlidingWindow.Clear() + b.networkRequestsSlidingWindow.Clear() +} + func stripXFF(xff string) string { ipList := strings.Split(xff, ",") return strings.TrimSpace(ipList[0]) diff --git a/proxyd/consensus_poller.go b/proxyd/consensus_poller.go index 90af41d..c476d61 100644 --- a/proxyd/consensus_poller.go +++ b/proxyd/consensus_poller.go @@ -9,7 +9,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/log" ) @@ -63,6 +62,24 @@ func (bs *backendState) IsBanned() bool { return time.Now().Before(bs.bannedUntil) } +func (bs *backendState) GetLatestBlock() (hexutil.Uint64, string) { + bs.backendStateMux.Lock() + defer bs.backendStateMux.Unlock() + return bs.latestBlockNumber, bs.latestBlockHash +} + +func (bs *backendState) GetSafeBlockNumber() hexutil.Uint64 { + bs.backendStateMux.Lock() + defer bs.backendStateMux.Unlock() + return bs.safeBlockNumber +} + +func (bs *backendState) GetFinalizedBlockNumber() hexutil.Uint64 { + bs.backendStateMux.Lock() + defer bs.backendStateMux.Unlock() + return bs.finalizedBlockNumber +} + // GetConsensusGroup returns the backend members that are agreeing in a consensus func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { defer cp.consensusGroupMux.Unlock() @@ -287,7 +304,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller // UpdateBackend refreshes the consensus state of a single backend func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { - bs := cp.getBackendState(be) + bs := cp.GetBackendState(be) RecordConsensusBackendBanned(be, bs.IsBanned()) if bs.IsBanned() { @@ -306,6 +323,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { RecordConsensusBackendInSync(be, err == nil && inSync) if err != nil { log.Warn("error updating backend sync state", "name", be.Name, "err", err) + return } var peerCount uint64 @@ -313,23 +331,49 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { peerCount, err = cp.getPeerCount(ctx, be) if err != nil { log.Warn("error updating backend peer count", "name", be.Name, "err", err) + return + } + if peerCount == 0 { + log.Warn("peer count responded with 200 and 0 peers", "name", be.Name) + be.intermittentErrorsSlidingWindow.Incr() + return } RecordConsensusBackendPeerCount(be, peerCount) } latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") if err != nil { - log.Warn("error updating backend - latest block", "name", be.Name, "err", err) + log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err) + return + } + if latestBlockNumber == 0 { + log.Warn("error backend responded a 200 with blockheight 0 for latest block", "name", be.Name) + be.intermittentErrorsSlidingWindow.Incr() + return } safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") if err != nil { - log.Warn("error updating backend - safe block", "name", be.Name, "err", err) + log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err) + return + } + + if safeBlockNumber == 0 { + log.Warn("error backend responded a 200 with blockheight 0 for safe block", "name", be.Name) + be.intermittentErrorsSlidingWindow.Incr() + return } finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") if err != nil { - log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) + log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err) + return + } + + if finalizedBlockNumber == 0 { + log.Warn("error backend responded a 200 with blockheight 0 for finalized block", "name", be.Name) + be.intermittentErrorsSlidingWindow.Incr() + return } RecordConsensusBackendUpdateDelay(be, bs.lastUpdate) @@ -523,6 +567,14 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool { return bs.IsBanned() } +// IsBanned checks if a specific backend is banned +func (cp *ConsensusPoller) BannedUntil(be *Backend) time.Time { + bs := cp.backendState[be] + defer bs.backendStateMux.Unlock() + bs.backendStateMux.Lock() + return bs.bannedUntil +} + // Ban bans a specific backend func (cp *ConsensusPoller) Ban(be *Backend) { if be.forcedCandidate { @@ -618,8 +670,8 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -// getBackendState creates a copy of backend state so that the caller can use it without locking -func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { +// GetBackendState creates a copy of backend state so that the caller can use it without locking +func (cp *ConsensusPoller) GetBackendState(be *Backend) *backendState { bs := cp.backendState[be] defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() @@ -691,7 +743,7 @@ func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*b for _, be := range backends { - bs := cp.getBackendState(be) + bs := cp.GetBackendState(be) if be.forcedCandidate { candidates[be] = bs continue diff --git a/proxyd/integration_tests/block_height_zero_and_network_errors_test.go b/proxyd/integration_tests/block_height_zero_and_network_errors_test.go new file mode 100644 index 0000000..fe7a040 --- /dev/null +++ b/proxyd/integration_tests/block_height_zero_and_network_errors_test.go @@ -0,0 +1,353 @@ +package integration_tests + +import ( + "context" + "net/http" + "os" + "path" + "testing" + + "time" + + "github.com/ethereum-optimism/optimism/proxyd" + sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window" + ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/stretchr/testify/require" +) + +type bhZeroNodeContext struct { + backend *proxyd.Backend // this is the actual backend impl in proxyd + mockBackend *MockBackend // this is the fake backend that we can use to mock responses + handler *ms.MockedHandler // this is where we control the state of mocked responses + intermittentNetErrorWindow *sw.AvgSlidingWindow + clock *sw.AdjustableClock // this is where we control backend time +} + +// ts is a convenient method that must parse a time.Time from a string in format `"2006-01-02 15:04:05"` +func ts(s string) time.Time { + t, err := time.Parse(time.DateTime, s) + if err != nil { + panic(err) + } + return t +} + +func setupBlockHeightZero(t *testing.T) (map[string]*bhZeroNodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func()) { + // setup mock servers + node1 := NewMockBackend(nil) + node2 := NewMockBackend(nil) + + dir, err := os.Getwd() + require.NoError(t, err) + + responses := path.Join(dir, "testdata/block_height_zero_and_network_errors_responses.yaml") + + h1 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: responses, + } + h2 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: responses, + } + + require.NoError(t, os.Setenv("NODE1_URL", node1.URL())) + require.NoError(t, os.Setenv("NODE2_URL", node2.URL())) + + node1.SetHandler(http.HandlerFunc(h1.Handler)) + node2.SetHandler(http.HandlerFunc(h2.Handler)) + + // setup proxyd + config := ReadConfig("block_height_zero_and_network_errors") + svr, shutdown, err := proxyd.Start(config) + require.NoError(t, err) + + // expose the proxyd client + client := NewProxydClient("http://127.0.0.1:8545") + + // expose the backend group + bg := svr.BackendGroups["node"] + + require.NotNil(t, bg) + require.NotNil(t, bg.Consensus) + require.Equal(t, 2, len(bg.Backends)) + + // convenient mapping to access the nodes + nodes := map[string]*bhZeroNodeContext{ + "node1": { + mockBackend: node1, + backend: bg.Backends[0], + handler: &h1, + }, + "node2": { + mockBackend: node2, + backend: bg.Backends[1], + handler: &h2, + }, + } + + return nodes, bg, client, shutdown +} + +func TestBlockHeightZero(t *testing.T) { + nodes, bg, _, shutdown := setupBlockHeightZero(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer shutdown() + + ctx := context.Background() + + addTimeToBackend := func(node string, ts time.Duration) { + mockBackend, ok := nodes[node] + require.True(t, ok, "Fatal error bad node key for override clock") + mockBackend.clock.Set(mockBackend.clock.Now().Add(ts)) + } + + // poll for updated consensus + update := func() { + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + addTimeToBackend("node1", 3*time.Second) + addTimeToBackend("node2", 3*time.Second) + } + + // convenient methods to manipulate state and mock responses + reset := func() { + for _, node := range nodes { + node.handler.ResetOverrides() + node.mockBackend.Reset() + node.backend.ClearSlidingWindows() + } + bg.Consensus.ClearListeners() + bg.Consensus.Reset() + + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + now := ts("2023-04-21 15:00:00") + clock := sw.NewAdjustableClock(now) + b1 := nodes["node1"] + b2 := nodes["node2"] + b1.intermittentNetErrorWindow = sw.NewSlidingWindow( + sw.WithWindowLength(5*time.Minute), + sw.WithBucketSize(time.Second), + sw.WithClock(clock)) + + b2.intermittentNetErrorWindow = sw.NewSlidingWindow( + sw.WithWindowLength(5*time.Minute), + sw.WithBucketSize(time.Second), + sw.WithClock(clock)) + + b1.clock = clock + b2.clock = clock + b1.backend.Override(proxyd.WithIntermittentNetworkErrorSlidingWindow(b1.intermittentNetErrorWindow)) + b2.backend.Override(proxyd.WithIntermittentNetworkErrorSlidingWindow(b2.intermittentNetErrorWindow)) + nodes["node1"] = b1 + nodes["node2"] = b2 + + require.Zero(t, nodes["node1"].intermittentNetErrorWindow.Count()) + require.Zero(t, nodes["node2"].intermittentNetErrorWindow.Count()) + + } + + override := func(node string, method string, block string, response string, responseCode int) { + if _, ok := nodes[node]; !ok { + t.Fatalf("node %s does not exist in the nodes map", node) + } + nodes[node].handler.AddOverride(&ms.MethodTemplate{ + Method: method, + Block: block, + Response: response, + ResponseCode: responseCode, + }) + } + + overrideBlock := func(node string, blockRequest string, blockResponse string, responseCode int) { + override(node, + "eth_getBlockByNumber", + blockRequest, + buildResponse(map[string]string{ + "number": blockResponse, + "hash": "hash_" + blockResponse, + }), + responseCode, + ) + } + + overridePeerCount := func(node string, count int, responseCode int) { + override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String()), responseCode) + } + + type blockHeights struct { + latestBlockNumber hexutil.Uint64 + latestBlockHash string + safeBlockNumber hexutil.Uint64 + finalizedBlockNumber hexutil.Uint64 + } + + getBlockHeights := func(node string) blockHeights { + bs := bg.Consensus.GetBackendState(nodes[node].backend) + lB, lHash := bs.GetLatestBlock() + sB := bs.GetSafeBlockNumber() + fB := bs.GetFinalizedBlockNumber() + return blockHeights{ + latestBlockNumber: lB, + latestBlockHash: lHash, + safeBlockNumber: sB, + finalizedBlockNumber: fB, + } + } + + for _, blockState := range []string{"latest", "finalized", "safe"} { + + t.Run("Test that the backend will not be banned if "+blockState+" responds 429", func(t *testing.T) { + reset() + update() + bh1 := getBlockHeights("node1") + overrideBlock("node1", blockState, "0x101", 429) + update() + bh2 := getBlockHeights("node1") + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String()) + require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String()) + require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String()) + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1)) + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0)) + }) + + // Write a test which will check the sliding window increments each time by one + t.Run("Test that the backend will not be banned and single increment of window if "+blockState+" responds 500", func(t *testing.T) { + reset() + update() + bh1 := getBlockHeights("node1") + overrideBlock("node1", blockState, "0x101", 500) + update() + bh2 := getBlockHeights("node1") + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String()) + require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String()) + require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String()) + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1)) + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0)) + }) + + t.Run("Test that the backend will not be banned and single increment of window if "+blockState+" responds 0 and 200", func(t *testing.T) { + reset() + update() + bh1 := getBlockHeights("node2") + overrideBlock("node2", blockState, "0x0", 200) + update() + bh2 := getBlockHeights("node2") + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String()) + require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String()) + require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String()) + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(0)) + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(1)) + }) + + } + + t.Run("Test that the backend will not be banned and single increment of window if latest responds 200 with block height zero", func(t *testing.T) { + reset() + update() + overrideBlock("node1", "latest", "0x0", 200) + bh1 := getBlockHeights("node1") + update() + bh2 := getBlockHeights("node1") + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String()) + require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String()) + require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String()) + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1)) + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0)) + }) + + t.Run("Test that the backend will not be banned if latest responds 5xx for peer count", func(t *testing.T) { + reset() + update() + overridePeerCount("node2", 59, 500) + bh1 := getBlockHeights("node2") + update() + bh2 := getBlockHeights("node2") + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String()) + require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String()) + require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String()) + + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(0)) + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(1)) + }) + + t.Run("Test that the backend will not be banned if latest responds 4xx for peer count", func(t *testing.T) { + reset() + update() + overridePeerCount("node1", 59, 429) + bh1 := getBlockHeights("node1") + update() + bh2 := getBlockHeights("node1") + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String()) + require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String()) + require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String()) + + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1)) + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0)) + }) + + t.Run("Test that the backend will not be banned if latest responds 200 and 0 for peer count", func(t *testing.T) { + reset() + update() + bh1 := getBlockHeights("node1") + overridePeerCount("node1", 0, 200) + update() + bh2 := getBlockHeights("node1") + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + + require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String()) + require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String()) + require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String()) + + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1)) + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0)) + }) + + t.Run("Test that if it breaches the network error threshold the node will be banned", func(t *testing.T) { + reset() + update() + overrideBlock("node1", "latest", "0x0", 500) + overrideBlock("node1", "safe", "0x0", 429) + overrideBlock("node1", "finalized", "0x0", 403) + overridePeerCount("node1", 0, 500) + + for i := 1; i < 7; i++ { + require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend), "Execpted node 1 to be not banned on iteration ", i) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend), "Execpted node 2 to be not banned on iteration ", i) + update() + // On the 5th update (i=6), node 1 will be banned due to error rate and not increment window + if i < 6 { + require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(i)) + } + require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0)) + } + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend)) + }) + +} diff --git a/proxyd/integration_tests/consensus_test.go b/proxyd/integration_tests/consensus_test.go index 654b7a5..15256eb 100644 --- a/proxyd/integration_tests/consensus_test.go +++ b/proxyd/integration_tests/consensus_test.go @@ -102,9 +102,11 @@ func TestConsensus(t *testing.T) { for _, node := range nodes { node.handler.ResetOverrides() node.mockBackend.Reset() + node.backend.ClearSlidingWindows() } bg.Consensus.ClearListeners() bg.Consensus.Reset() + } override := func(node string, method string, block string, response string) { @@ -311,7 +313,11 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, nodes["node1"].backend) - require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) + require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend), + fmt.Sprintf("Expected Node to be banned. \n\tCurrent Time: %s \n\tBanned Until: %s", + time.Now().Format("01-02-2006 15:04:05"), + bg.Consensus.BannedUntil(nodes["node1"].backend).Format("01-02-2006 15:04:05")), + ) require.Equal(t, 0, len(consensusGroup)) }) diff --git a/proxyd/integration_tests/fallback_test.go b/proxyd/integration_tests/fallback_test.go index c5b3e48..c4a1e4c 100644 --- a/proxyd/integration_tests/fallback_test.go +++ b/proxyd/integration_tests/fallback_test.go @@ -182,7 +182,7 @@ func TestFallback(t *testing.T) { Fallback will be returned subsequent update */ triggerFirstNormalFailure := func() { - overridePeerCount("normal", 0) + overridePeerCount("normal", 1) update() require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback")) require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal")) diff --git a/proxyd/integration_tests/testdata/block_height_zero_and_network_errors.toml b/proxyd/integration_tests/testdata/block_height_zero_and_network_errors.toml new file mode 100644 index 0000000..99a2521 --- /dev/null +++ b/proxyd/integration_tests/testdata/block_height_zero_and_network_errors.toml @@ -0,0 +1,35 @@ +[server] +rpc_port = 8545 + +[backend] +response_timeout_seconds = 1 +max_degraded_latency_threshold = "30ms" +max_error_rate_threshold = 0.25 + +[backends] +[backends.node1] +rpc_url = "$NODE1_URL" + +[backends.node2] +rpc_url = "$NODE2_URL" + +[backend_groups] +[backend_groups.node] +backends = ["node1", "node2"] +consensus_aware = true +consensus_handler = "noop" # allow more control over the consensus poller for tests + +## Consensus Ban Need to set very large, becaue consensus poller uses system clock, not adjustable clock +## if a certain test case takes longer than 15m it may break +consensus_ban_period = "15m" + +consensus_max_update_threshold = "2m" +consensus_max_block_lag = 8 +consensus_min_peer_count = 4 + +[rpc_method_mappings] +eth_call = "node" +eth_chainId = "node" +eth_blockNumber = "node" +eth_getBlockByNumber = "node" +consensus_getReceipts = "node" diff --git a/proxyd/integration_tests/testdata/block_height_zero_and_network_errors_responses.yaml b/proxyd/integration_tests/testdata/block_height_zero_and_network_errors_responses.yaml new file mode 100644 index 0000000..642c334 --- /dev/null +++ b/proxyd/integration_tests/testdata/block_height_zero_and_network_errors_responses.yaml @@ -0,0 +1,234 @@ +- method: eth_chainId + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": "hello", + } +- method: net_peerCount + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": "0x10" + } +- method: eth_syncing + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": false + } +- method: eth_getBlockByNumber + block: latest + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x101", + "number": "0x101" + } + } +- method: eth_getBlockByNumber + block: 0x101 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x101", + "number": "0x101" + } + } +- method: eth_getBlockByNumber + block: 0x102 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x102", + "number": "0x102" + } + } +- method: eth_getBlockByNumber + block: 0x103 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x103", + "number": "0x103" + } + } +- method: eth_getBlockByNumber + block: 0x10a + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x10a", + "number": "0x10a" + } + } +- method: eth_getBlockByNumber + block: 0x132 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x132", + "number": "0x132" + } + } +- method: eth_getBlockByNumber + block: 0x133 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x133", + "number": "0x133" + } + } +- method: eth_getBlockByNumber + block: 0x134 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x134", + "number": "0x134" + } + } +- method: eth_getBlockByNumber + block: 0x200 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x200", + "number": "0x200" + } + } +- method: eth_getBlockByNumber + block: 0x91 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x91", + "number": "0x91" + } + } +- method: eth_getBlockByNumber + block: safe + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xe1", + "number": "0xe1" + } + } +- method: eth_getBlockByNumber + block: 0xe1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xe1", + "number": "0xe1" + } + } +- method: eth_getBlockByNumber + block: finalized + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xc1", + "number": "0xc1" + } + } +- method: eth_getBlockByNumber + block: 0xc1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xc1", + "number": "0xc1" + } + } +- method: eth_getBlockByNumber + block: 0xd1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xd1", + "number": "0xd1" + } + } +- method: debug_getRawReceipts + block: 0x55 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "debug_getRawReceipts" + } + } +- method: debug_getRawReceipts + block: 0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "debug_getRawReceipts" + } + } +- method: debug_getRawReceipts + block: 0x101 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "debug_getRawReceipts" + } + } +- method: eth_getTransactionReceipt + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "eth_getTransactionReceipt" + } + } +- method: alchemy_getTransactionReceipts + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "alchemy_getTransactionReceipts" + } + } diff --git a/proxyd/integration_tests/testdata/multicall_responses.yml b/proxyd/integration_tests/testdata/multicall_responses.yml new file mode 100644 index 0000000..642c334 --- /dev/null +++ b/proxyd/integration_tests/testdata/multicall_responses.yml @@ -0,0 +1,234 @@ +- method: eth_chainId + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": "hello", + } +- method: net_peerCount + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": "0x10" + } +- method: eth_syncing + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": false + } +- method: eth_getBlockByNumber + block: latest + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x101", + "number": "0x101" + } + } +- method: eth_getBlockByNumber + block: 0x101 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x101", + "number": "0x101" + } + } +- method: eth_getBlockByNumber + block: 0x102 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x102", + "number": "0x102" + } + } +- method: eth_getBlockByNumber + block: 0x103 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x103", + "number": "0x103" + } + } +- method: eth_getBlockByNumber + block: 0x10a + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x10a", + "number": "0x10a" + } + } +- method: eth_getBlockByNumber + block: 0x132 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x132", + "number": "0x132" + } + } +- method: eth_getBlockByNumber + block: 0x133 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x133", + "number": "0x133" + } + } +- method: eth_getBlockByNumber + block: 0x134 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x134", + "number": "0x134" + } + } +- method: eth_getBlockByNumber + block: 0x200 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x200", + "number": "0x200" + } + } +- method: eth_getBlockByNumber + block: 0x91 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0x91", + "number": "0x91" + } + } +- method: eth_getBlockByNumber + block: safe + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xe1", + "number": "0xe1" + } + } +- method: eth_getBlockByNumber + block: 0xe1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xe1", + "number": "0xe1" + } + } +- method: eth_getBlockByNumber + block: finalized + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xc1", + "number": "0xc1" + } + } +- method: eth_getBlockByNumber + block: 0xc1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xc1", + "number": "0xc1" + } + } +- method: eth_getBlockByNumber + block: 0xd1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_0xd1", + "number": "0xd1" + } + } +- method: debug_getRawReceipts + block: 0x55 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "debug_getRawReceipts" + } + } +- method: debug_getRawReceipts + block: 0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "debug_getRawReceipts" + } + } +- method: debug_getRawReceipts + block: 0x101 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "debug_getRawReceipts" + } + } +- method: eth_getTransactionReceipt + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "eth_getTransactionReceipt" + } + } +- method: alchemy_getTransactionReceipts + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "_": "alchemy_getTransactionReceipts" + } + } diff --git a/proxyd/pkg/avg-sliding-window/sliding.go b/proxyd/pkg/avg-sliding-window/sliding.go index 70c40be..a05978c 100644 --- a/proxyd/pkg/avg-sliding-window/sliding.go +++ b/proxyd/pkg/avg-sliding-window/sliding.go @@ -186,3 +186,11 @@ func (sw *AvgSlidingWindow) Count() uint { sw.advance() return sw.qty } + +// advance evicts old data points +func (sw *AvgSlidingWindow) Clear() { + defer sw.mux.Unlock() + sw.mux.Lock() + sw.qty = 0 + sw.sum = 0.0 +} diff --git a/proxyd/tools/mockserver/handler/handler.go b/proxyd/tools/mockserver/handler/handler.go index 0f9bfca..ef0af26 100644 --- a/proxyd/tools/mockserver/handler/handler.go +++ b/proxyd/tools/mockserver/handler/handler.go @@ -16,9 +16,10 @@ import ( ) type MethodTemplate struct { - Method string `yaml:"method"` - Block string `yaml:"block"` - Response string `yaml:"response"` + Method string `yaml:"method"` + Block string `yaml:"block"` + Response string `yaml:"response"` + ResponseCode int `yaml:"response_code"` } type MockedHandler struct { @@ -85,6 +86,9 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) { for _, r := range template { if r.Method == method && r.Block == block { selectedResponse = r.Response + if r.ResponseCode != 0 { + w.WriteHeader(r.ResponseCode) + } } } if selectedResponse != "" {