address comments
This commit is contained in:
parent
58e3a35677
commit
c2ba3b1c55
@ -596,36 +596,7 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b
|
||||
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
|
||||
// serving traffic from any backend that agrees in the consensus group
|
||||
if b.Consensus != nil {
|
||||
cg := b.Consensus.GetConsensusGroup()
|
||||
backendsHealthy := make([]*Backend, 0, len(cg))
|
||||
backendsDegraded := make([]*Backend, 0, len(cg))
|
||||
// separate into unhealthy, degraded and healthy backends
|
||||
for _, be := range cg {
|
||||
// unhealthy are filtered out and not attempted
|
||||
if !be.IsHealthy() {
|
||||
continue
|
||||
}
|
||||
if be.IsDegraded() {
|
||||
backendsDegraded = append(backendsDegraded, be)
|
||||
continue
|
||||
}
|
||||
backendsHealthy = append(backendsHealthy, be)
|
||||
}
|
||||
|
||||
// shuffle both slices
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
r.Shuffle(len(backendsHealthy), func(i, j int) {
|
||||
backendsHealthy[i], backendsHealthy[j] = backendsHealthy[j], backendsHealthy[i]
|
||||
})
|
||||
r = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
r.Shuffle(len(backendsDegraded), func(i, j int) {
|
||||
backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i]
|
||||
})
|
||||
|
||||
// healthy are put into a priority position
|
||||
// degraded backends are used as fallback
|
||||
backends = backendsHealthy
|
||||
backends = append(backends, backendsDegraded...)
|
||||
backends = b.loadBalancedConsensusGroup()
|
||||
}
|
||||
|
||||
rpcRequestsTotal.Inc()
|
||||
@ -707,6 +678,40 @@ func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn,
|
||||
return nil, ErrNoBackends
|
||||
}
|
||||
|
||||
func (b *BackendGroup) loadBalancedConsensusGroup() []*Backend {
|
||||
cg := b.Consensus.GetConsensusGroup()
|
||||
|
||||
backendsHealthy := make([]*Backend, 0, len(cg))
|
||||
backendsDegraded := make([]*Backend, 0, len(cg))
|
||||
// separate into healthy, degraded and unhealthy backends
|
||||
for _, be := range cg {
|
||||
// unhealthy are filtered out and not attempted
|
||||
if !be.IsHealthy() {
|
||||
continue
|
||||
}
|
||||
if be.IsDegraded() {
|
||||
backendsDegraded = append(backendsDegraded, be)
|
||||
continue
|
||||
}
|
||||
backendsHealthy = append(backendsHealthy, be)
|
||||
}
|
||||
|
||||
// shuffle both slices
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
r.Shuffle(len(backendsHealthy), func(i, j int) {
|
||||
backendsHealthy[i], backendsHealthy[j] = backendsHealthy[j], backendsHealthy[i]
|
||||
})
|
||||
r.Shuffle(len(backendsDegraded), func(i, j int) {
|
||||
backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i]
|
||||
})
|
||||
|
||||
// healthy are put into a priority position
|
||||
// degraded backends are used as fallback
|
||||
backendsHealthy = append(backendsHealthy, backendsDegraded...)
|
||||
|
||||
return backendsHealthy
|
||||
}
|
||||
|
||||
func calcBackoff(i int) time.Duration {
|
||||
jitter := float64(rand.Int63n(250))
|
||||
ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 3000)
|
||||
|
@ -78,7 +78,6 @@ func TestConsensus(t *testing.T) {
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
// advance latest on node2 to 0x2
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "net_peerCount",
|
||||
Block: "",
|
||||
@ -396,6 +395,44 @@ func TestConsensus(t *testing.T) {
|
||||
require.GreaterOrEqual(t, len(node1.Requests()), 50, msg)
|
||||
require.GreaterOrEqual(t, len(node2.Requests()), 50, msg)
|
||||
})
|
||||
|
||||
t.Run("load balancing should not hit if node is not healthy", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
// node1 should not be serving any traffic
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "net_peerCount",
|
||||
Block: "",
|
||||
Response: buildPeerCountResponse(1),
|
||||
})
|
||||
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
}
|
||||
bg.Consensus.UpdateBackendGroupConsensus(ctx)
|
||||
|
||||
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
|
||||
|
||||
node1.Reset()
|
||||
node2.Reset()
|
||||
|
||||
require.Equal(t, 0, len(node1.Requests()))
|
||||
require.Equal(t, 0, len(node2.Requests()))
|
||||
|
||||
numberReqs := 10
|
||||
for numberReqs > 0 {
|
||||
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 200, statusCode)
|
||||
numberReqs--
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests()))
|
||||
require.Equal(t, len(node1.Requests()), 0, msg)
|
||||
require.Equal(t, len(node2.Requests()), 10, msg)
|
||||
})
|
||||
}
|
||||
|
||||
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
|
||||
|
Loading…
Reference in New Issue
Block a user