Weighted shuffle list
This commit is contained in:
parent
2ac9981e00
commit
20dd9feaf7
@ -11,7 +11,6 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"slices"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -22,8 +21,8 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/mroth/weightedrand/v2"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/xaionaro-go/weightedshuffle"
|
||||||
"golang.org/x/sync/semaphore"
|
"golang.org/x/sync/semaphore"
|
||||||
|
|
||||||
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
|
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
|
||||||
@ -697,26 +696,6 @@ type BackendGroup struct {
|
|||||||
Backends []*Backend
|
Backends []*Backend
|
||||||
WeightedRouting bool
|
WeightedRouting bool
|
||||||
Consensus *ConsensusPoller
|
Consensus *ConsensusPoller
|
||||||
weightedChooser *weightedrand.Chooser[*Backend, int]
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewBackendGroup(name string, backends []*Backend, weightedRouting bool) (*BackendGroup, error) {
|
|
||||||
choices := make([]weightedrand.Choice[*Backend, int], len(backends))
|
|
||||||
for i, backend := range backends {
|
|
||||||
choices[i] = weightedrand.Choice[*Backend, int]{Item: backend, Weight: backend.weight}
|
|
||||||
}
|
|
||||||
|
|
||||||
chooser, err := weightedrand.NewChooser(choices...)
|
|
||||||
if err != nil && weightedRouting {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &BackendGroup{
|
|
||||||
Name: name,
|
|
||||||
Backends: backends,
|
|
||||||
WeightedRouting: weightedRouting,
|
|
||||||
weightedChooser: chooser,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
|
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
|
||||||
@ -838,22 +817,6 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
|
|||||||
return nil, "", ErrNoBackends
|
return nil, "", ErrNoBackends
|
||||||
}
|
}
|
||||||
|
|
||||||
func moveBackendToStart(choice *Backend, options []*Backend) []*Backend {
|
|
||||||
result := make([]*Backend, 0, len(options))
|
|
||||||
|
|
||||||
if slices.Contains(options, choice) {
|
|
||||||
result = append(result, choice)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, opt := range options {
|
|
||||||
if opt != choice {
|
|
||||||
result = append(result, opt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
|
func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
|
||||||
for _, back := range bg.Backends {
|
for _, back := range bg.Backends {
|
||||||
proxier, err := back.ProxyWS(clientConn, methodWhitelist)
|
proxier, err := back.ProxyWS(clientConn, methodWhitelist)
|
||||||
@ -891,15 +854,25 @@ func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn,
|
|||||||
return nil, ErrNoBackends
|
return nil, ErrNoBackends
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bg *BackendGroup) orderedBackendsForRequest() []*Backend {
|
func weightedShuffle(backends []*Backend) {
|
||||||
backends := bg.Backends
|
weight := func(i int) float64 {
|
||||||
if bg.Consensus != nil {
|
return float64(backends[i].weight)
|
||||||
backends = bg.loadBalancedConsensusGroup()
|
}
|
||||||
} else if bg.WeightedRouting {
|
|
||||||
choice := bg.weightedChooser.Pick()
|
weightedshuffle.ShuffleInplace(backends, weight, nil)
|
||||||
backends = moveBackendToStart(choice, backends)
|
}
|
||||||
|
|
||||||
|
func (bg *BackendGroup) orderedBackendsForRequest() []*Backend {
|
||||||
|
if bg.Consensus != nil {
|
||||||
|
return bg.loadBalancedConsensusGroup()
|
||||||
|
} else if bg.WeightedRouting {
|
||||||
|
result := make([]*Backend, len(bg.Backends))
|
||||||
|
copy(result, bg.Backends)
|
||||||
|
weightedShuffle(result)
|
||||||
|
return result
|
||||||
|
} else {
|
||||||
|
return bg.Backends
|
||||||
}
|
}
|
||||||
return backends
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
|
func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
|
||||||
@ -930,9 +903,7 @@ func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if bg.WeightedRouting {
|
if bg.WeightedRouting {
|
||||||
choice := bg.weightedChooser.Pick()
|
weightedShuffle(backendsHealthy)
|
||||||
backendsHealthy = moveBackendToStart(choice, backendsHealthy)
|
|
||||||
backendsDegraded = moveBackendToStart(choice, backendsDegraded)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// healthy are put into a priority position
|
// healthy are put into a priority position
|
||||||
|
@ -2,7 +2,6 @@ package proxyd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -20,117 +19,3 @@ func TestStripXFF(t *testing.T) {
|
|||||||
assert.Equal(t, test.out, actual)
|
assert.Equal(t, test.out, actual)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateBackendGroup(t *testing.T) {
|
|
||||||
unweightedOne := &Backend{
|
|
||||||
Name: "one",
|
|
||||||
weight: 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
unweightedTwo := &Backend{
|
|
||||||
Name: "two",
|
|
||||||
weight: 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
weightedOne := &Backend{
|
|
||||||
Name: "one",
|
|
||||||
weight: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
weightedTwo := &Backend{
|
|
||||||
Name: "two",
|
|
||||||
weight: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
backends []*Backend
|
|
||||||
weightedRouting bool
|
|
||||||
expectError bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "weighting disabled",
|
|
||||||
backends: []*Backend{unweightedOne, unweightedTwo},
|
|
||||||
weightedRouting: false,
|
|
||||||
expectError: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "weighting enabled -- all nodes have weight",
|
|
||||||
backends: []*Backend{weightedOne, weightedTwo},
|
|
||||||
weightedRouting: true,
|
|
||||||
expectError: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "weighting enabled -- some nodes have weight",
|
|
||||||
backends: []*Backend{weightedOne, unweightedTwo},
|
|
||||||
weightedRouting: true,
|
|
||||||
expectError: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "weighting enabled -- no nodes have weight",
|
|
||||||
backends: []*Backend{unweightedOne, unweightedTwo},
|
|
||||||
weightedRouting: true,
|
|
||||||
expectError: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
result, err := NewBackendGroup(test.name, test.backends, test.weightedRouting)
|
|
||||||
|
|
||||||
if test.expectError {
|
|
||||||
require.Error(t, err)
|
|
||||||
} else {
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.Equal(t, test.name, result.Name)
|
|
||||||
assert.Equal(t, test.backends, test.backends)
|
|
||||||
assert.Equal(t, test.weightedRouting, test.weightedRouting)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMoveIndexToStart(t *testing.T) {
|
|
||||||
one := &Backend{
|
|
||||||
Name: "one",
|
|
||||||
}
|
|
||||||
|
|
||||||
two := &Backend{
|
|
||||||
Name: "two",
|
|
||||||
}
|
|
||||||
|
|
||||||
three := &Backend{
|
|
||||||
Name: "three",
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
choice *Backend
|
|
||||||
input []*Backend
|
|
||||||
output []*Backend
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
choice: one,
|
|
||||||
input: []*Backend{one, two, three},
|
|
||||||
output: []*Backend{one, two, three},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
choice: two,
|
|
||||||
input: []*Backend{one, two, three},
|
|
||||||
output: []*Backend{two, one, three},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
choice: three,
|
|
||||||
input: []*Backend{one, two},
|
|
||||||
output: []*Backend{one, two},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
choice: one,
|
|
||||||
input: []*Backend{one},
|
|
||||||
output: []*Backend{one},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
result := moveBackendToStart(test.choice, test.input)
|
|
||||||
assert.Equal(t, test.output, result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -12,13 +12,13 @@ require (
|
|||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
github.com/gorilla/websocket v1.5.0
|
github.com/gorilla/websocket v1.5.0
|
||||||
github.com/hashicorp/golang-lru v1.0.2
|
github.com/hashicorp/golang-lru v1.0.2
|
||||||
github.com/mroth/weightedrand/v2 v2.1.0
|
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/prometheus/client_golang v1.17.0
|
github.com/prometheus/client_golang v1.17.0
|
||||||
github.com/redis/go-redis/v9 v9.2.1
|
github.com/redis/go-redis/v9 v9.2.1
|
||||||
github.com/rs/cors v1.10.1
|
github.com/rs/cors v1.10.1
|
||||||
github.com/stretchr/testify v1.8.4
|
github.com/stretchr/testify v1.8.4
|
||||||
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
|
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
|
||||||
|
github.com/xaionaro-go/weightedshuffle v0.0.0-20211213010739-6a74fbc7d24a
|
||||||
golang.org/x/sync v0.4.0
|
golang.org/x/sync v0.4.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
)
|
)
|
||||||
|
@ -143,8 +143,6 @@ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQth
|
|||||||
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
|
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
|
||||||
github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU=
|
github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU=
|
||||||
github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU=
|
github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU=
|
||||||
github.com/mroth/weightedrand/v2 v2.1.0 h1:o1ascnB1CIVzsqlfArQQjeMy1U0NcIbBO5rfd5E/OeU=
|
|
||||||
github.com/mroth/weightedrand/v2 v2.1.0/go.mod h1:f2faGsfOGOwc1p94wzHKKZyTpcJUW7OJ/9U4yfiNAOU=
|
|
||||||
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
|
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
|
||||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||||
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
|
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
|
||||||
@ -200,6 +198,8 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
|
|||||||
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
|
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
|
||||||
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
|
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
|
||||||
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
|
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
|
||||||
|
github.com/xaionaro-go/weightedshuffle v0.0.0-20211213010739-6a74fbc7d24a h1:WS5nQycV+82Ndezq0UcMcGVG416PZgcJPqI/bLM824A=
|
||||||
|
github.com/xaionaro-go/weightedshuffle v0.0.0-20211213010739-6a74fbc7d24a/go.mod h1:0KAUfC65le2kMu4fnBxm7Xj3PkQ3MBpJbF5oMmqufBc=
|
||||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
|
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
|
||||||
|
@ -176,12 +176,11 @@ func Start(config *Config) (*Server, func(), error) {
|
|||||||
backends = append(backends, backendsByName[bName])
|
backends = append(backends, backendsByName[bName])
|
||||||
}
|
}
|
||||||
|
|
||||||
group, err := NewBackendGroup(bgName, backends, bg.WeightedRouting)
|
backendGroups[bgName] = &BackendGroup{
|
||||||
if err != nil {
|
Name: bgName,
|
||||||
return nil, nil, fmt.Errorf("error creating backend group %s: %w", bgName, err)
|
Backends: backends,
|
||||||
|
WeightedRouting: bg.WeightedRouting,
|
||||||
}
|
}
|
||||||
|
|
||||||
backendGroups[bgName] = group
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var wsBackendGroup *BackendGroup
|
var wsBackendGroup *BackendGroup
|
||||||
|
Loading…
Reference in New Issue
Block a user