diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 98a5da6..6381994 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -11,6 +11,7 @@ import ( "math" "math/rand" "net/http" + "slices" "sort" "strconv" "strings" @@ -21,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/websocket" + "github.com/mroth/weightedrand/v2" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/semaphore" @@ -695,6 +697,26 @@ type BackendGroup struct { Backends []*Backend WeightedRouting bool Consensus *ConsensusPoller + weightedChooser *weightedrand.Chooser[*Backend, int] +} + +func NewBackendGroup(name string, backends []*Backend, weightedRouting bool) (*BackendGroup, error) { + choices := make([]weightedrand.Choice[*Backend, int], len(backends)) + for i, backend := range backends { + choices[i] = weightedrand.Choice[*Backend, int]{Item: backend, Weight: backend.weight} + } + + chooser, err := weightedrand.NewChooser(choices...) + if err != nil && weightedRouting { + return nil, err + } + + return &BackendGroup{ + Name: name, + Backends: backends, + WeightedRouting: weightedRouting, + weightedChooser: chooser, + }, nil } func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) { @@ -702,7 +724,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch return nil, "", nil } - backends := bg.Backends + backends := bg.orderedBackendsForRequest() overriddenResponses := make([]*indexedReqRes, 0) rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs)) @@ -710,7 +732,6 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch if bg.Consensus != nil { // When `consensus_aware` is set to `true`, the backend group acts as a load balancer // serving traffic from any backend that agrees in the consensus group - backends = bg.loadBalancedConsensusGroup() // We also rewrite block tags to enforce compliance with consensus rctx := RewriteContext{ @@ -750,8 +771,6 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch } } rpcReqs = rewrittenReqs - } else if bg.WeightedRouting { - backends = randomizeFirstBackendByWeight(backends) } rpcRequestsTotal.Inc() @@ -819,36 +838,19 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch return nil, "", ErrNoBackends } -func randomizeFirstBackendByWeight(backends []*Backend) []*Backend { - if len(backends) <= 1 { - return backends +func moveBackendToStart(choice *Backend, options []*Backend) []*Backend { + result := make([]*Backend, 0, len(options)) + + if slices.Contains(options, choice) { + result = append(result, choice) } - totalWeight := 0 - for _, backend := range backends { - totalWeight += backend.weight - } - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - random := r.Intn(totalWeight) - currentSum := 0 - - for idx, backend := range backends { - currentSum += backend.weight - if currentSum > random { - return moveIndexToStart(backends, idx) + for _, opt := range options { + if opt != choice { + result = append(result, opt) } } - log.Warn("unable to select weighted backend, using ordered input") - return backends -} - -func moveIndexToStart(backends []*Backend, index int) []*Backend { - result := make([]*Backend, 0, len(backends)) - result = append(result, backends[index]) - result = append(result, backends[:index]...) - result = append(result, backends[index+1:]...) return result } @@ -889,6 +891,17 @@ func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, return nil, ErrNoBackends } +func (bg *BackendGroup) orderedBackendsForRequest() []*Backend { + backends := bg.Backends + if bg.Consensus != nil { + backends = bg.loadBalancedConsensusGroup() + } else if bg.WeightedRouting { + choice := bg.weightedChooser.Pick() + backends = moveBackendToStart(choice, backends) + } + return backends +} + func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend { cg := bg.Consensus.GetConsensusGroup() @@ -917,8 +930,9 @@ func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend { }) if bg.WeightedRouting { - backendsHealthy = randomizeFirstBackendByWeight(backendsHealthy) - backendsDegraded = randomizeFirstBackendByWeight(backendsDegraded) + choice := bg.weightedChooser.Pick() + backendsHealthy = moveBackendToStart(choice, backendsHealthy) + backendsDegraded = moveBackendToStart(choice, backendsDegraded) } // healthy are put into a priority position diff --git a/proxyd/proxyd/backend_test.go b/proxyd/proxyd/backend_test.go index 3d027b4..c5a95e6 100644 --- a/proxyd/proxyd/backend_test.go +++ b/proxyd/proxyd/backend_test.go @@ -2,6 +2,7 @@ package proxyd import ( "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "testing" ) @@ -20,51 +21,116 @@ func TestStripXFF(t *testing.T) { } } -func TestMoveIndexToStart(t *testing.T) { - backends := []*Backend{ - { - Name: "node1", - }, - { - Name: "node1", - }, - { - Name: "node1", - }, +func TestCreateBackendGroup(t *testing.T) { + unweightedOne := &Backend{ + Name: "one", + weight: 0, + } + + unweightedTwo := &Backend{ + Name: "two", + weight: 0, + } + + weightedOne := &Backend{ + Name: "one", + weight: 1, + } + + weightedTwo := &Backend{ + Name: "two", + weight: 1, } tests := []struct { - index int - out []*Backend + name string + backends []*Backend + weightedRouting bool + expectError bool }{ { - index: 0, - out: []*Backend{ - backends[0], - backends[1], - backends[2], - }, + name: "weighting disabled", + backends: []*Backend{unweightedOne, unweightedTwo}, + weightedRouting: false, + expectError: false, }, { - index: 1, - out: []*Backend{ - backends[1], - backends[0], - backends[2], - }, + name: "weighting enabled -- all nodes have weight", + backends: []*Backend{weightedOne, weightedTwo}, + weightedRouting: true, + expectError: false, }, { - index: 2, - out: []*Backend{ - backends[2], - backends[0], - backends[1], - }, + name: "weighting enabled -- some nodes have weight", + backends: []*Backend{weightedOne, unweightedTwo}, + weightedRouting: true, + expectError: false, + }, + { + name: "weighting enabled -- no nodes have weight", + backends: []*Backend{unweightedOne, unweightedTwo}, + weightedRouting: true, + expectError: true, }, } for _, test := range tests { - result := moveIndexToStart(backends, test.index) - assert.Equal(t, test.out, result) + result, err := NewBackendGroup(test.name, test.backends, test.weightedRouting) + + if test.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.name, result.Name) + assert.Equal(t, test.backends, test.backends) + assert.Equal(t, test.weightedRouting, test.weightedRouting) + } + } + +} + +func TestMoveIndexToStart(t *testing.T) { + one := &Backend{ + Name: "one", + } + + two := &Backend{ + Name: "two", + } + + three := &Backend{ + Name: "three", + } + + tests := []struct { + choice *Backend + input []*Backend + output []*Backend + }{ + { + choice: one, + input: []*Backend{one, two, three}, + output: []*Backend{one, two, three}, + }, + { + choice: two, + input: []*Backend{one, two, three}, + output: []*Backend{two, one, three}, + }, + { + choice: three, + input: []*Backend{one, two}, + output: []*Backend{one, two}, + }, + { + choice: one, + input: []*Backend{one}, + output: []*Backend{one}, + }, + } + + for _, test := range tests { + result := moveBackendToStart(test.choice, test.input) + assert.Equal(t, test.output, result) } } diff --git a/proxyd/proxyd/go.mod b/proxyd/proxyd/go.mod index cbab0d5..99d21a1 100644 --- a/proxyd/proxyd/go.mod +++ b/proxyd/proxyd/go.mod @@ -12,6 +12,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/golang-lru v1.0.2 + github.com/mroth/weightedrand/v2 v2.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 github.com/redis/go-redis/v9 v9.2.1 diff --git a/proxyd/proxyd/go.sum b/proxyd/proxyd/go.sum index a54ffb5..a250ff9 100644 --- a/proxyd/proxyd/go.sum +++ b/proxyd/proxyd/go.sum @@ -143,6 +143,8 @@ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQth github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= +github.com/mroth/weightedrand/v2 v2.1.0 h1:o1ascnB1CIVzsqlfArQQjeMy1U0NcIbBO5rfd5E/OeU= +github.com/mroth/weightedrand/v2 v2.1.0/go.mod h1:f2faGsfOGOwc1p94wzHKKZyTpcJUW7OJ/9U4yfiNAOU= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index 8db0de0..b99f417 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -175,11 +175,12 @@ func Start(config *Config) (*Server, func(), error) { } backends = append(backends, backendsByName[bName]) } - group := &BackendGroup{ - Name: bgName, - WeightedRouting: bg.WeightedRouting, - Backends: backends, + + group, err := NewBackendGroup(bgName, backends, bg.WeightedRouting) + if err != nil { + return nil, nil, fmt.Errorf("error creating backend group %s: %w", bgName, err) } + backendGroups[bgName] = group }