proxyd: add weighting feature to node selection

This commit is contained in:
Danyal Prout 2023-10-31 20:41:55 -05:00
parent 798878e455
commit a5a4a5108d
4 changed files with 109 additions and 5 deletions

@ -159,6 +159,8 @@ type Backend struct {
latencySlidingWindow *sw.AvgSlidingWindow latencySlidingWindow *sw.AvgSlidingWindow
networkRequestsSlidingWindow *sw.AvgSlidingWindow networkRequestsSlidingWindow *sw.AvgSlidingWindow
networkErrorsSlidingWindow *sw.AvgSlidingWindow networkErrorsSlidingWindow *sw.AvgSlidingWindow
weight int
} }
type BackendOpt func(b *Backend) type BackendOpt func(b *Backend)
@ -239,6 +241,12 @@ func WithConsensusForcedCandidate(forcedCandidate bool) BackendOpt {
} }
} }
func WithWeight(weight int) BackendOpt {
return func(b *Backend) {
b.weight = weight
}
}
func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt { func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt {
return func(b *Backend) { return func(b *Backend) {
b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold
@ -685,6 +693,7 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
type BackendGroup struct { type BackendGroup struct {
Name string Name string
Backends []*Backend Backends []*Backend
UseWeightedRouting bool
Consensus *ConsensusPoller Consensus *ConsensusPoller
} }
@ -741,6 +750,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
} }
} }
rpcReqs = rewrittenReqs rpcReqs = rewrittenReqs
} else if bg.UseWeightedRouting {
backends = randomizeFirstBackendByWeight(backends)
} }
rpcRequestsTotal.Inc() rpcRequestsTotal.Inc()
@ -808,6 +819,39 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
return nil, "", ErrNoBackends return nil, "", ErrNoBackends
} }
func randomizeFirstBackendByWeight(backends []*Backend) []*Backend {
if len(backends) == 0 {
return backends
}
totalWeight := 0
for _, backend := range backends {
totalWeight += backend.weight
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
random := r.Intn(totalWeight)
currentSum := 0
for idx, backend := range backends {
currentSum += backend.weight
if currentSum > random {
return moveIndexToStart(backends, idx)
}
}
log.Warn("unable to select weighted backend, using ordered input")
return backends
}
func moveIndexToStart(backends []*Backend, index int) []*Backend {
result := make([]*Backend, 0, len(backends))
result = append(result, backends[index])
result = append(result, backends[:index]...)
result = append(result, backends[index+1:]...)
return result
}
func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) { func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
for _, back := range bg.Backends { for _, back := range bg.Backends {
proxier, err := back.ProxyWS(clientConn, methodWhitelist) proxier, err := back.ProxyWS(clientConn, methodWhitelist)
@ -872,6 +916,11 @@ func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i] backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i]
}) })
if bg.UseWeightedRouting {
backendsHealthy = randomizeFirstBackendByWeight(backendsHealthy)
backendsDegraded = randomizeFirstBackendByWeight(backendsDegraded)
}
// healthy are put into a priority position // healthy are put into a priority position
// degraded backends are used as fallback // degraded backends are used as fallback
backendsHealthy = append(backendsHealthy, backendsDegraded...) backendsHealthy = append(backendsHealthy, backendsDegraded...)

@ -19,3 +19,52 @@ func TestStripXFF(t *testing.T) {
assert.Equal(t, test.out, actual) assert.Equal(t, test.out, actual)
} }
} }
func TestMoveIndexToStart(t *testing.T) {
backends := []*Backend{
{
Name: "node1",
},
{
Name: "node1",
},
{
Name: "node1",
},
}
tests := []struct {
index int
out []*Backend
}{
{
index: 0,
out: []*Backend{
backends[0],
backends[1],
backends[2],
},
},
{
index: 1,
out: []*Backend{
backends[1],
backends[0],
backends[2],
},
},
{
index: 2,
out: []*Backend{
backends[2],
backends[0],
backends[1],
},
},
}
for _, test := range tests {
result := moveIndexToStart(backends, test.index)
assert.Equal(t, test.out, result)
}
}

@ -98,6 +98,8 @@ type BackendConfig struct {
ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"` ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"`
ConsensusForcedCandidate bool `toml:"consensus_forced_candidate"` ConsensusForcedCandidate bool `toml:"consensus_forced_candidate"`
ConsensusReceiptsTarget string `toml:"consensus_receipts_target"` ConsensusReceiptsTarget string `toml:"consensus_receipts_target"`
Weight int `toml:"weight"`
} }
type BackendsConfig map[string]*BackendConfig type BackendsConfig map[string]*BackendConfig
@ -105,6 +107,8 @@ type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct { type BackendGroupConfig struct {
Backends []string `toml:"backends"` Backends []string `toml:"backends"`
UseWeightedRouting bool `toml:"weighted_routing"`
ConsensusAware bool `toml:"consensus_aware"` ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"` ConsensusAsyncHandler string `toml:"consensus_handler"`

@ -144,6 +144,7 @@ func Start(config *Config) (*Server, func(), error) {
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP"))) opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck)) opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck))
opts = append(opts, WithConsensusForcedCandidate(cfg.ConsensusForcedCandidate)) opts = append(opts, WithConsensusForcedCandidate(cfg.ConsensusForcedCandidate))
opts = append(opts, WithWeight(cfg.Weight))
receiptsTarget, err := ReadFromEnvOrConfig(cfg.ConsensusReceiptsTarget) receiptsTarget, err := ReadFromEnvOrConfig(cfg.ConsensusReceiptsTarget)
if err != nil { if err != nil {
@ -176,6 +177,7 @@ func Start(config *Config) (*Server, func(), error) {
} }
group := &BackendGroup{ group := &BackendGroup{
Name: bgName, Name: bgName,
UseWeightedRouting: bg.UseWeightedRouting,
Backends: backends, Backends: backends,
} }
backendGroups[bgName] = group backendGroups[bgName] = group