proxyd: round-robin lb for consensus group
This commit is contained in:
parent
b7f559c4c2
commit
58e3a35677
@ -591,9 +591,46 @@ func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch b
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
backends := b.Backends
|
||||||
|
|
||||||
|
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
|
||||||
|
// serving traffic from any backend that agrees in the consensus group
|
||||||
|
if b.Consensus != nil {
|
||||||
|
cg := b.Consensus.GetConsensusGroup()
|
||||||
|
backendsHealthy := make([]*Backend, 0, len(cg))
|
||||||
|
backendsDegraded := make([]*Backend, 0, len(cg))
|
||||||
|
// separate into unhealthy, degraded and healthy backends
|
||||||
|
for _, be := range cg {
|
||||||
|
// unhealthy are filtered out and not attempted
|
||||||
|
if !be.IsHealthy() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if be.IsDegraded() {
|
||||||
|
backendsDegraded = append(backendsDegraded, be)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
backendsHealthy = append(backendsHealthy, be)
|
||||||
|
}
|
||||||
|
|
||||||
|
// shuffle both slices
|
||||||
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
r.Shuffle(len(backendsHealthy), func(i, j int) {
|
||||||
|
backendsHealthy[i], backendsHealthy[j] = backendsHealthy[j], backendsHealthy[i]
|
||||||
|
})
|
||||||
|
r = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
r.Shuffle(len(backendsDegraded), func(i, j int) {
|
||||||
|
backendsDegraded[i], backendsDegraded[j] = backendsDegraded[j], backendsDegraded[i]
|
||||||
|
})
|
||||||
|
|
||||||
|
// healthy are put into a priority position
|
||||||
|
// degraded backends are used as fallback
|
||||||
|
backends = backendsHealthy
|
||||||
|
backends = append(backends, backendsDegraded...)
|
||||||
|
}
|
||||||
|
|
||||||
rpcRequestsTotal.Inc()
|
rpcRequestsTotal.Inc()
|
||||||
|
|
||||||
for _, back := range b.Backends {
|
for _, back := range backends {
|
||||||
res, err := back.Forward(ctx, rpcReqs, isBatch)
|
res, err := back.Forward(ctx, rpcReqs, isBatch)
|
||||||
if errors.Is(err, ErrMethodNotWhitelisted) {
|
if errors.Is(err, ErrMethodNotWhitelisted) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -3,6 +3,7 @@ package integration_tests
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
@ -47,6 +48,7 @@ func TestConsensus(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
svr, shutdown, err := proxyd.Start(config)
|
svr, shutdown, err := proxyd.Start(config)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
client := NewProxydClient("http://127.0.0.1:8545")
|
||||||
defer shutdown()
|
defer shutdown()
|
||||||
|
|
||||||
bg := svr.BackendGroups["node"]
|
bg := svr.BackendGroups["node"]
|
||||||
@ -355,6 +357,45 @@ func TestConsensus(t *testing.T) {
|
|||||||
// should resolve to 0x1, the highest common ancestor
|
// should resolve to 0x1, the highest common ancestor
|
||||||
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
|
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("load balancing should hit both backends", func(t *testing.T) {
|
||||||
|
h1.ResetOverrides()
|
||||||
|
h2.ResetOverrides()
|
||||||
|
bg.Consensus.Unban()
|
||||||
|
|
||||||
|
for _, be := range bg.Backends {
|
||||||
|
bg.Consensus.UpdateBackend(ctx, be)
|
||||||
|
}
|
||||||
|
bg.Consensus.UpdateBackendGroupConsensus(ctx)
|
||||||
|
|
||||||
|
require.Equal(t, 2, len(bg.Consensus.GetConsensusGroup()))
|
||||||
|
|
||||||
|
node1.Reset()
|
||||||
|
node2.Reset()
|
||||||
|
|
||||||
|
require.Equal(t, 0, len(node1.Requests()))
|
||||||
|
require.Equal(t, 0, len(node2.Requests()))
|
||||||
|
|
||||||
|
// there is a random component to this test,
|
||||||
|
// since our round-robin implementation shuffles the ordering
|
||||||
|
// to achieve uniform distribution
|
||||||
|
|
||||||
|
// so we just make 100 requests per backend and expect the number of requests to be somewhat balanced
|
||||||
|
// i.e. each backend should be hit minimally by at least 50% of the requests
|
||||||
|
consensusGroup := bg.Consensus.GetConsensusGroup()
|
||||||
|
|
||||||
|
numberReqs := len(consensusGroup) * 100
|
||||||
|
for numberReqs > 0 {
|
||||||
|
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x1", false})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 200, statusCode)
|
||||||
|
numberReqs--
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := fmt.Sprintf("n1 %d, n2 %d", len(node1.Requests()), len(node2.Requests()))
|
||||||
|
require.GreaterOrEqual(t, len(node1.Requests()), 50, msg)
|
||||||
|
require.GreaterOrEqual(t, len(node2.Requests()), 50, msg)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
|
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
[server]
|
[server]
|
||||||
rpc_port = 8080
|
rpc_port = 8545
|
||||||
|
|
||||||
[backend]
|
[backend]
|
||||||
response_timeout_seconds = 1
|
response_timeout_seconds = 1
|
||||||
|
@ -1,3 +1,10 @@
|
|||||||
|
- method: eth_chainId
|
||||||
|
response: >
|
||||||
|
{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": 67,
|
||||||
|
"result": "hello",
|
||||||
|
}
|
||||||
- method: net_peerCount
|
- method: net_peerCount
|
||||||
response: >
|
response: >
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user