From 58e3a35677392900e6e9c569b98feb846c7ca08b Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 27 Apr 2023 17:13:18 -0700 Subject: [PATCH] proxyd: round-robin lb for consensus group --- proxyd/proxyd/backend.go | 39 +++++++++++++++++- .../integration_tests/consensus_test.go | 41 +++++++++++++++++++ .../integration_tests/testdata/consensus.toml | 2 +- .../testdata/consensus_responses.yml | 7 ++++ 4 files changed, 87 insertions(+), 2 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 4fcfb26..ed95ac9 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -591,9 +591,46 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b return nil, nil } + backends := b.Backends + + // When `consensus_aware` is set to `true`, the backend group acts as a load balancer + // serving traffic from any backend that agrees in the consensus group + if b.Consensus != nil { + cg := b.Consensus.GetConsensusGroup() + backendsHealthy := make([]*Backend, 0, len(cg)) + backendsDegraded := make([]*Backend, 0, len(cg)) + // separate into unhealthy, degraded and healthy backends + for _, be := range cg { + // unhealthy are filtered out and not attempted + if !be.IsHealthy() { + continue + } + if be.IsDegraded() { + backendsDegraded = append(backendsDegraded, be) + continue + } + backendsHealthy = append(backendsHealthy, be) + } + + // shuffle both slices + r := rand.New(rand.NewSource(time.Now().UnixNano())) + r.Shuffle(len(backendsHealthy), func(i, j int) { + backendsHealthy[i], backendsHealthy[j] = backendsHealthy[j], backendsHealthy[i] + }) + r = rand.New(rand.NewSource(time.Now().UnixNano())) + r.Shuffle(len(backendsDegraded), func(i, j int) { + backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i] + }) + + // healthy are put into a priority position + // degraded backends are used as fallback + backends = backendsHealthy + backends = append(backends, backendsDegraded...) + } + rpcRequestsTotal.Inc() - for _, back := range b.Backends { + for _, back := range backends { res, err := back.Forward(ctx, rpcReqs, isBatch) if errors.Is(err, ErrMethodNotWhitelisted) { return nil, err diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 6875118..b55a30c 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -3,6 +3,7 @@ package integration_tests import ( "context" "encoding/json" + "fmt" "net/http" "os" "path" @@ -47,6 +48,7 @@ func TestConsensus(t *testing.T) { ctx := context.Background() svr, shutdown, err := proxyd.Start(config) require.NoError(t, err) + client := NewProxydClient("http://127.0.0.1:8545") defer shutdown() bg := svr.BackendGroups["node"] @@ -355,6 +357,45 @@ func TestConsensus(t *testing.T) { // should resolve to 0x1, the highest common ancestor require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) }) + + t.Run("load balancing should hit both backends", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup())) + + node1.Reset() + node2.Reset() + + require.Equal(t, 0, len(node1.Requests())) + require.Equal(t, 0, len(node2.Requests())) + + // there is a random component to this test, + // since our round-robin implementation shuffles the ordering + // to achieve uniform distribution + + // so we just make 100 requests per backend and expect the number of requests to be somewhat balanced + // i.e. each backend should be hit minimally by at least 50% of the requests + consensusGroup := bg.Consensus.GetConsensusGroup() + + numberReqs := len(consensusGroup) * 100 + for numberReqs > 0 { + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false}) + require.NoError(t, err) + require.Equal(t, 200, statusCode) + numberReqs-- + } + + msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests())) + require.GreaterOrEqual(t, len(node1.Requests()), 50, msg) + require.GreaterOrEqual(t, len(node2.Requests()), 50, msg) + }) } func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend { diff --git a/proxyd/proxyd/integration_tests/testdata/consensus.toml b/proxyd/proxyd/integration_tests/testdata/consensus.toml index dbd8e26..bc9b43e 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus.toml +++ b/proxyd/proxyd/integration_tests/testdata/consensus.toml @@ -1,5 +1,5 @@ [server] -rpc_port = 8080 +rpc_port = 8545 [backend] response_timeout_seconds = 1 diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index 10b8f52..83579e7 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -1,3 +1,10 @@ +- method: eth_chainId + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": "hello", + } - method: net_peerCount response: > {