From a5a4a5108da6c66fac5bd71db7512303a8da5584 Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Tue, 31 Oct 2023 20:41:55 -0500 Subject: [PATCH] proxyd: add weighting feature to node selection --- proxyd/proxyd/backend.go | 55 +++++++++++++++++++++++++++++++++-- proxyd/proxyd/backend_test.go | 49 +++++++++++++++++++++++++++++++ proxyd/proxyd/config.go | 4 +++ proxyd/proxyd/proxyd.go | 6 ++-- 4 files changed, 109 insertions(+), 5 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 6c699f0..fbc0b10 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -159,6 +159,8 @@ type Backend struct { latencySlidingWindow *sw.AvgSlidingWindow networkRequestsSlidingWindow *sw.AvgSlidingWindow networkErrorsSlidingWindow *sw.AvgSlidingWindow + + weight int } type BackendOpt func(b *Backend) @@ -239,6 +241,12 @@ func WithConsensusForcedCandidate(forcedCandidate bool) BackendOpt { } } +func WithWeight(weight int) BackendOpt { + return func(b *Backend) { + b.weight = weight + } +} + func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt { return func(b *Backend) { b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold @@ -683,9 +691,10 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) { } type BackendGroup struct { - Name string - Backends []*Backend - Consensus *ConsensusPoller + Name string + Backends []*Backend + UseWeightedRouting bool + Consensus *ConsensusPoller } func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) { @@ -741,6 +750,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch } } rpcReqs = rewrittenReqs + } else if bg.UseWeightedRouting { + backends = randomizeFirstBackendByWeight(backends) } rpcRequestsTotal.Inc() @@ -808,6 +819,39 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch return nil, "", ErrNoBackends } +func randomizeFirstBackendByWeight(backends []*Backend) []*Backend { + if len(backends) == 0 { + return backends + } + + totalWeight := 0 + for _, backend := range backends { + totalWeight += backend.weight + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + random := r.Intn(totalWeight) + currentSum := 0 + + for idx, backend := range backends { + currentSum += backend.weight + if currentSum > random { + return moveIndexToStart(backends, idx) + } + } + + log.Warn("unable to select weighted backend, using ordered input") + return backends +} + +func moveIndexToStart(backends []*Backend, index int) []*Backend { + result := make([]*Backend, 0, len(backends)) + result = append(result, backends[index]) + result = append(result, backends[:index]...) + result = append(result, backends[index+1:]...) + return result +} + func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) { for _, back := range bg.Backends { proxier, err := back.ProxyWS(clientConn, methodWhitelist) @@ -872,6 +916,11 @@ func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend { backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i] }) + if bg.UseWeightedRouting { + backendsHealthy = randomizeFirstBackendByWeight(backendsHealthy) + backendsDegraded = randomizeFirstBackendByWeight(backendsDegraded) + } + // healthy are put into a priority position // degraded backends are used as fallback backendsHealthy = append(backendsHealthy, backendsDegraded...) diff --git a/proxyd/proxyd/backend_test.go b/proxyd/proxyd/backend_test.go index 7be23bf..3d027b4 100644 --- a/proxyd/proxyd/backend_test.go +++ b/proxyd/proxyd/backend_test.go @@ -19,3 +19,52 @@ func TestStripXFF(t *testing.T) { assert.Equal(t, test.out, actual) } } + +func TestMoveIndexToStart(t *testing.T) { + backends := []*Backend{ + { + Name: "node1", + }, + { + Name: "node1", + }, + { + Name: "node1", + }, + } + + tests := []struct { + index int + out []*Backend + }{ + { + index: 0, + out: []*Backend{ + backends[0], + backends[1], + backends[2], + }, + }, + { + index: 1, + out: []*Backend{ + backends[1], + backends[0], + backends[2], + }, + }, + { + index: 2, + out: []*Backend{ + backends[2], + backends[0], + backends[1], + }, + }, + } + + for _, test := range tests { + result := moveIndexToStart(backends, test.index) + assert.Equal(t, test.out, result) + } +} diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index 63f557c..1c01bec 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -98,6 +98,8 @@ type BackendConfig struct { ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"` ConsensusForcedCandidate bool `toml:"consensus_forced_candidate"` ConsensusReceiptsTarget string `toml:"consensus_receipts_target"` + + Weight int `toml:"weight"` } type BackendsConfig map[string]*BackendConfig @@ -105,6 +107,8 @@ type BackendsConfig map[string]*BackendConfig type BackendGroupConfig struct { Backends []string `toml:"backends"` + UseWeightedRouting bool `toml:"weighted_routing"` + ConsensusAware bool `toml:"consensus_aware"` ConsensusAsyncHandler string `toml:"consensus_handler"` diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index 84051ab..4474cf1 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -144,6 +144,7 @@ func Start(config *Config) (*Server, func(), error) { opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP"))) opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck)) opts = append(opts, WithConsensusForcedCandidate(cfg.ConsensusForcedCandidate)) + opts = append(opts, WithWeight(cfg.Weight)) receiptsTarget, err := ReadFromEnvOrConfig(cfg.ConsensusReceiptsTarget) if err != nil { @@ -175,8 +176,9 @@ func Start(config *Config) (*Server, func(), error) { backends = append(backends, backendsByName[bName]) } group := &BackendGroup{ - Name: bgName, - Backends: backends, + Name: bgName, + UseWeightedRouting: bg.UseWeightedRouting, + Backends: backends, } backendGroups[bgName] = group }