Code review feedback

This commit is contained in:
Danyal Prout 2023-11-05 20:07:39 -06:00
parent bd7bd24e66
commit 2ac9981e00
5 changed files with 152 additions and 68 deletions

@ -11,6 +11,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"net/http" "net/http"
"slices"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -21,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/mroth/weightedrand/v2"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
@ -695,6 +697,26 @@ type BackendGroup struct {
Backends []*Backend Backends []*Backend
WeightedRouting bool WeightedRouting bool
Consensus *ConsensusPoller Consensus *ConsensusPoller
weightedChooser *weightedrand.Chooser[*Backend, int]
}
func NewBackendGroup(name string, backends []*Backend, weightedRouting bool) (*BackendGroup, error) {
choices := make([]weightedrand.Choice[*Backend, int], len(backends))
for i, backend := range backends {
choices[i] = weightedrand.Choice[*Backend, int]{Item: backend, Weight: backend.weight}
}
chooser, err := weightedrand.NewChooser(choices...)
if err != nil && weightedRouting {
return nil, err
}
return &BackendGroup{
Name: name,
Backends: backends,
WeightedRouting: weightedRouting,
weightedChooser: chooser,
}, nil
} }
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) { func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
@ -702,7 +724,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
return nil, "", nil return nil, "", nil
} }
backends := bg.Backends backends := bg.orderedBackendsForRequest()
overriddenResponses := make([]*indexedReqRes, 0) overriddenResponses := make([]*indexedReqRes, 0)
rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs)) rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
@ -710,7 +732,6 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
if bg.Consensus != nil { if bg.Consensus != nil {
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer // When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group // serving traffic from any backend that agrees in the consensus group
backends = bg.loadBalancedConsensusGroup()
// We also rewrite block tags to enforce compliance with consensus // We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{ rctx := RewriteContext{
@ -750,8 +771,6 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
} }
} }
rpcReqs = rewrittenReqs rpcReqs = rewrittenReqs
} else if bg.WeightedRouting {
backends = randomizeFirstBackendByWeight(backends)
} }
rpcRequestsTotal.Inc() rpcRequestsTotal.Inc()
@ -819,36 +838,19 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
return nil, "", ErrNoBackends return nil, "", ErrNoBackends
} }
func randomizeFirstBackendByWeight(backends []*Backend) []*Backend { func moveBackendToStart(choice *Backend, options []*Backend) []*Backend {
if len(backends) <= 1 { result := make([]*Backend, 0, len(options))
return backends
if slices.Contains(options, choice) {
result = append(result, choice)
} }
totalWeight := 0 for _, opt := range options {
for _, backend := range backends { if opt != choice {
totalWeight += backend.weight result = append(result, opt)
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
random := r.Intn(totalWeight)
currentSum := 0
for idx, backend := range backends {
currentSum += backend.weight
if currentSum > random {
return moveIndexToStart(backends, idx)
} }
} }
log.Warn("unable to select weighted backend, using ordered input")
return backends
}
func moveIndexToStart(backends []*Backend, index int) []*Backend {
result := make([]*Backend, 0, len(backends))
result = append(result, backends[index])
result = append(result, backends[:index]...)
result = append(result, backends[index+1:]...)
return result return result
} }
@ -889,6 +891,17 @@ func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn,
return nil, ErrNoBackends return nil, ErrNoBackends
} }
func (bg *BackendGroup) orderedBackendsForRequest() []*Backend {
backends := bg.Backends
if bg.Consensus != nil {
backends = bg.loadBalancedConsensusGroup()
} else if bg.WeightedRouting {
choice := bg.weightedChooser.Pick()
backends = moveBackendToStart(choice, backends)
}
return backends
}
func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend { func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
cg := bg.Consensus.GetConsensusGroup() cg := bg.Consensus.GetConsensusGroup()
@ -917,8 +930,9 @@ func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
}) })
if bg.WeightedRouting { if bg.WeightedRouting {
backendsHealthy = randomizeFirstBackendByWeight(backendsHealthy) choice := bg.weightedChooser.Pick()
backendsDegraded = randomizeFirstBackendByWeight(backendsDegraded) backendsHealthy = moveBackendToStart(choice, backendsHealthy)
backendsDegraded = moveBackendToStart(choice, backendsDegraded)
} }
// healthy are put into a priority position // healthy are put into a priority position

@ -2,6 +2,7 @@ package proxyd
import ( import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing" "testing"
) )
@ -20,51 +21,116 @@ func TestStripXFF(t *testing.T) {
} }
} }
func TestMoveIndexToStart(t *testing.T) { func TestCreateBackendGroup(t *testing.T) {
backends := []*Backend{ unweightedOne := &Backend{
{ Name: "one",
Name: "node1", weight: 0,
}, }
{
Name: "node1", unweightedTwo := &Backend{
}, Name: "two",
{ weight: 0,
Name: "node1", }
},
weightedOne := &Backend{
Name: "one",
weight: 1,
}
weightedTwo := &Backend{
Name: "two",
weight: 1,
} }
tests := []struct { tests := []struct {
index int name string
out []*Backend backends []*Backend
weightedRouting bool
expectError bool
}{ }{
{ {
index: 0, name: "weighting disabled",
out: []*Backend{ backends: []*Backend{unweightedOne, unweightedTwo},
backends[0], weightedRouting: false,
backends[1], expectError: false,
backends[2],
},
}, },
{ {
index: 1, name: "weighting enabled -- all nodes have weight",
out: []*Backend{ backends: []*Backend{weightedOne, weightedTwo},
backends[1], weightedRouting: true,
backends[0], expectError: false,
backends[2],
},
}, },
{ {
index: 2, name: "weighting enabled -- some nodes have weight",
out: []*Backend{ backends: []*Backend{weightedOne, unweightedTwo},
backends[2], weightedRouting: true,
backends[0], expectError: false,
backends[1],
}, },
{
name: "weighting enabled -- no nodes have weight",
backends: []*Backend{unweightedOne, unweightedTwo},
weightedRouting: true,
expectError: true,
}, },
} }
for _, test := range tests { for _, test := range tests {
result := moveIndexToStart(backends, test.index) result, err := NewBackendGroup(test.name, test.backends, test.weightedRouting)
assert.Equal(t, test.out, result)
if test.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, test.name, result.Name)
assert.Equal(t, test.backends, test.backends)
assert.Equal(t, test.weightedRouting, test.weightedRouting)
}
}
}
func TestMoveIndexToStart(t *testing.T) {
one := &Backend{
Name: "one",
}
two := &Backend{
Name: "two",
}
three := &Backend{
Name: "three",
}
tests := []struct {
choice *Backend
input []*Backend
output []*Backend
}{
{
choice: one,
input: []*Backend{one, two, three},
output: []*Backend{one, two, three},
},
{
choice: two,
input: []*Backend{one, two, three},
output: []*Backend{two, one, three},
},
{
choice: three,
input: []*Backend{one, two},
output: []*Backend{one, two},
},
{
choice: one,
input: []*Backend{one},
output: []*Backend{one},
},
}
for _, test := range tests {
result := moveBackendToStart(test.choice, test.input)
assert.Equal(t, test.output, result)
} }
} }

@ -12,6 +12,7 @@ require (
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v1.0.2 github.com/hashicorp/golang-lru v1.0.2
github.com/mroth/weightedrand/v2 v2.1.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0 github.com/prometheus/client_golang v1.17.0
github.com/redis/go-redis/v9 v9.2.1 github.com/redis/go-redis/v9 v9.2.1

@ -143,6 +143,8 @@ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQth
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU=
github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU=
github.com/mroth/weightedrand/v2 v2.1.0 h1:o1ascnB1CIVzsqlfArQQjeMy1U0NcIbBO5rfd5E/OeU=
github.com/mroth/weightedrand/v2 v2.1.0/go.mod h1:f2faGsfOGOwc1p94wzHKKZyTpcJUW7OJ/9U4yfiNAOU=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=

@ -175,11 +175,12 @@ func Start(config *Config) (*Server, func(), error) {
} }
backends = append(backends, backendsByName[bName]) backends = append(backends, backendsByName[bName])
} }
group := &BackendGroup{
Name: bgName, group, err := NewBackendGroup(bgName, backends, bg.WeightedRouting)
WeightedRouting: bg.WeightedRouting, if err != nil {
Backends: backends, return nil, nil, fmt.Errorf("error creating backend group %s: %w", bgName, err)
} }
backendGroups[bgName] = group backendGroups[bgName] = group
} }