Merge pull request #7977 from danyalprout/proxyd-weighting

feat(proxyd): add weighting feature to node selection
This commit is contained in:
felipe 2023-11-11 09:56:01 +00:00 committed by GitHub
commit fef2f0f965
5 changed files with 52 additions and 9 deletions

@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/xaionaro-go/weightedshuffle"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window" sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
@ -160,6 +161,8 @@ type Backend struct {
latencySlidingWindow *sw.AvgSlidingWindow latencySlidingWindow *sw.AvgSlidingWindow
networkRequestsSlidingWindow *sw.AvgSlidingWindow networkRequestsSlidingWindow *sw.AvgSlidingWindow
networkErrorsSlidingWindow *sw.AvgSlidingWindow networkErrorsSlidingWindow *sw.AvgSlidingWindow
weight int
} }
type BackendOpt func(b *Backend) type BackendOpt func(b *Backend)
@ -246,6 +249,12 @@ func WithConsensusForcedCandidate(forcedCandidate bool) BackendOpt {
} }
} }
func WithWeight(weight int) BackendOpt {
return func(b *Backend) {
b.weight = weight
}
}
func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt { func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt {
return func(b *Backend) { return func(b *Backend) {
b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold
@ -694,9 +703,10 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
} }
type BackendGroup struct { type BackendGroup struct {
Name string Name string
Backends []*Backend Backends []*Backend
Consensus *ConsensusPoller WeightedRouting bool
Consensus *ConsensusPoller
} }
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) { func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
@ -704,7 +714,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
return nil, "", nil return nil, "", nil
} }
backends := bg.Backends backends := bg.orderedBackendsForRequest()
overriddenResponses := make([]*indexedReqRes, 0) overriddenResponses := make([]*indexedReqRes, 0)
rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs)) rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
@ -712,7 +722,6 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
if bg.Consensus != nil { if bg.Consensus != nil {
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer // When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group // serving traffic from any backend that agrees in the consensus group
backends = bg.loadBalancedConsensusGroup()
// We also rewrite block tags to enforce compliance with consensus // We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{ rctx := RewriteContext{
@ -856,6 +865,27 @@ func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn,
return nil, ErrNoBackends return nil, ErrNoBackends
} }
func weightedShuffle(backends []*Backend) {
weight := func(i int) float64 {
return float64(backends[i].weight)
}
weightedshuffle.ShuffleInplace(backends, weight, nil)
}
func (bg *BackendGroup) orderedBackendsForRequest() []*Backend {
if bg.Consensus != nil {
return bg.loadBalancedConsensusGroup()
} else if bg.WeightedRouting {
result := make([]*Backend, len(bg.Backends))
copy(result, bg.Backends)
weightedShuffle(result)
return result
} else {
return bg.Backends
}
}
func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend { func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
cg := bg.Consensus.GetConsensusGroup() cg := bg.Consensus.GetConsensusGroup()
@ -883,6 +913,10 @@ func (bg *BackendGroup) loadBalancedConsensusGroup() []*Backend {
backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i] backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i]
}) })
if bg.WeightedRouting {
weightedShuffle(backendsHealthy)
}
// healthy are put into a priority position // healthy are put into a priority position
// degraded backends are used as fallback // degraded backends are used as fallback
backendsHealthy = append(backendsHealthy, backendsDegraded...) backendsHealthy = append(backendsHealthy, backendsDegraded...)

@ -96,6 +96,8 @@ type BackendConfig struct {
StripTrailingXFF bool `toml:"strip_trailing_xff"` StripTrailingXFF bool `toml:"strip_trailing_xff"`
Headers map[string]string `toml:"headers"` Headers map[string]string `toml:"headers"`
Weight int `toml:"weight"`
ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"` ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"`
ConsensusForcedCandidate bool `toml:"consensus_forced_candidate"` ConsensusForcedCandidate bool `toml:"consensus_forced_candidate"`
ConsensusReceiptsTarget string `toml:"consensus_receipts_target"` ConsensusReceiptsTarget string `toml:"consensus_receipts_target"`
@ -106,6 +108,8 @@ type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct { type BackendGroupConfig struct {
Backends []string `toml:"backends"` Backends []string `toml:"backends"`
WeightedRouting bool `toml:"weighted_routing"`
ConsensusAware bool `toml:"consensus_aware"` ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"` ConsensusAsyncHandler string `toml:"consensus_handler"`

@ -18,6 +18,7 @@ require (
github.com/rs/cors v1.10.1 github.com/rs/cors v1.10.1
github.com/stretchr/testify v1.8.4 github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/xaionaro-go/weightedshuffle v0.0.0-20211213010739-6a74fbc7d24a
golang.org/x/sync v0.4.0 golang.org/x/sync v0.4.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )

@ -198,6 +198,8 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/xaionaro-go/weightedshuffle v0.0.0-20211213010739-6a74fbc7d24a h1:WS5nQycV+82Ndezq0UcMcGVG416PZgcJPqI/bLM824A=
github.com/xaionaro-go/weightedshuffle v0.0.0-20211213010739-6a74fbc7d24a/go.mod h1:0KAUfC65le2kMu4fnBxm7Xj3PkQ3MBpJbF5oMmqufBc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=

@ -156,6 +156,7 @@ func Start(config *Config) (*Server, func(), error) {
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP"))) opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck)) opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck))
opts = append(opts, WithConsensusForcedCandidate(cfg.ConsensusForcedCandidate)) opts = append(opts, WithConsensusForcedCandidate(cfg.ConsensusForcedCandidate))
opts = append(opts, WithWeight(cfg.Weight))
receiptsTarget, err := ReadFromEnvOrConfig(cfg.ConsensusReceiptsTarget) receiptsTarget, err := ReadFromEnvOrConfig(cfg.ConsensusReceiptsTarget)
if err != nil { if err != nil {
@ -186,11 +187,12 @@ func Start(config *Config) (*Server, func(), error) {
} }
backends = append(backends, backendsByName[bName]) backends = append(backends, backendsByName[bName])
} }
group := &BackendGroup{
Name: bgName, backendGroups[bgName] = &BackendGroup{
Backends: backends, Name: bgName,
Backends: backends,
WeightedRouting: bg.WeightedRouting,
} }
backendGroups[bgName] = group
} }
var wsBackendGroup *BackendGroup var wsBackendGroup *BackendGroup