From 584322fced4d46285fccd5baebf61abca1a9b20b Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Tue, 4 Jun 2024 18:19:54 -0500 Subject: [PATCH] feat: proxyd: enable consensus-proxyd be able to fallback to a different list of backends if the primary list is experiencing downtime. (#10668) Co-authored-by: Jacob Elias --- proxyd/proxyd/Makefile | 4 + proxyd/proxyd/backend.go | 31 +- proxyd/proxyd/config.go | 2 + proxyd/proxyd/consensus_poller.go | 60 ++- .../integration_tests/consensus_test.go | 3 + .../proxyd/integration_tests/fallback_test.go | 374 ++++++++++++++++++ .../integration_tests/testdata/fallback.toml | 31 ++ proxyd/proxyd/metrics.go | 26 ++ proxyd/proxyd/proxyd.go | 45 ++- 9 files changed, 564 insertions(+), 12 deletions(-) create mode 100644 proxyd/proxyd/integration_tests/fallback_test.go create mode 100644 proxyd/proxyd/integration_tests/testdata/fallback.toml diff --git a/proxyd/proxyd/Makefile b/proxyd/proxyd/Makefile index 049a23a..d9ffb57 100644 --- a/proxyd/proxyd/Makefile +++ b/proxyd/proxyd/Makefile @@ -19,3 +19,7 @@ test: lint: go vet ./... .PHONY: test + +test-fallback: + go test -v ./... -test.run ^TestFallback$ +.PHONY: test-fallback diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 81ec23c..802b94a 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -705,12 +705,35 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) { } type BackendGroup struct { - Name string - Backends []*Backend - WeightedRouting bool - Consensus *ConsensusPoller + Name string + Backends []*Backend + WeightedRouting bool + Consensus *ConsensusPoller + FallbackBackends map[string]bool } +func (bg *BackendGroup) Fallbacks() []*Backend { + fallbacks := []*Backend{} + for _, a := range bg.Backends { + if fallback, ok := bg.FallbackBackends[a.Name]; ok && fallback { + fallbacks = append(fallbacks, a) + } + } + return fallbacks +} + +func (bg *BackendGroup) Primaries() []*Backend { + primaries := []*Backend{} + for _, a := range bg.Backends { + fallback, ok := bg.FallbackBackends[a.Name] + if ok && !fallback { + primaries = append(primaries, a) + } + } + return primaries +} + +// NOTE: BackendGroup Forward contains the log for balancing with consensus aware func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) { if len(rpcReqs) == 0 { return nil, "", nil diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index 4eed330..4719a55 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -126,6 +126,8 @@ type BackendGroupConfig struct { ConsensusHAHeartbeatInterval TOMLDuration `toml:"consensus_ha_heartbeat_interval"` ConsensusHALockPeriod TOMLDuration `toml:"consensus_ha_lock_period"` ConsensusHARedis RedisConfig `toml:"consensus_ha_redis"` + + Fallbacks []string `toml:"fallbacks"` } type BackendGroupsConfig map[string]*BackendGroupConfig diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 79f27b5..64ac026 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -122,12 +122,38 @@ func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAs } } func (ah *PollerAsyncHandler) Init() { - // create the individual backend pollers - for _, be := range ah.cp.backendGroup.Backends { + // create the individual backend pollers. 
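// For orientation (a sketch, not part of this change): the pollers below rely on the
// Primaries()/Fallbacks() split added to BackendGroup above, which partitions
// bg.Backends using the FallbackBackends map built from the group's "fallbacks"
// config key. With a group configured like the fallback.toml test fixture added
// later in this patch:
//
//	bg.Primaries() // => [normal], polled every interval
//	bg.Fallbacks() // => [fallback], polled only while no primary is a healthy candidate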
+	log.Info("total number of primary candidates", "primaries", len(ah.cp.backendGroup.Primaries()))
+	log.Info("total number of fallback candidates", "fallbacks", len(ah.cp.backendGroup.Fallbacks()))
+
+	for _, be := range ah.cp.backendGroup.Primaries() {
 		go func(be *Backend) {
 			for {
 				timer := time.NewTimer(ah.cp.interval)
 				ah.cp.UpdateBackend(ah.ctx, be)
+				select {
+				case <-timer.C:
+				case <-ah.ctx.Done():
+					timer.Stop()
+					return
+				}
+			}
+		}(be)
+	}
+
+	for _, be := range ah.cp.backendGroup.Fallbacks() {
+		go func(be *Backend) {
+			for {
+				timer := time.NewTimer(ah.cp.interval)
+
+				healthyCandidates := ah.cp.FilterCandidates(ah.cp.backendGroup.Primaries())
+
+				log.Info("number of healthy primary candidates", "healthy_candidates", len(healthyCandidates))
+				if len(healthyCandidates) == 0 {
+					log.Info("zero healthy candidates, querying fallback backend",
+						"backend_name", be.Name)
+					ah.cp.UpdateBackend(ah.ctx, be)
+				}

 				select {
 				case <-timer.C:
@@ -143,6 +169,7 @@ func (ah *PollerAsyncHandler) Init() {
 		go func() {
 			for {
 				timer := time.NewTimer(ah.cp.interval)
+				log.Info("updating backend group consensus")
 				ah.cp.UpdateBackendGroupConsensus(ah.ctx)

 				select {
@@ -609,6 +636,13 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
 	}
 }

+func (cp *ConsensusPoller) GetLastUpdate(be *Backend) time.Time {
+	bs := cp.backendState[be]
+	defer bs.backendStateMux.Unlock()
+	bs.backendStateMux.Lock()
+	return bs.lastUpdate
+}
+
 func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
 	latestBlockNumber hexutil.Uint64, latestBlockHash string,
 	safeBlockNumber hexutil.Uint64,
@@ -627,7 +661,21 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
 	return changed
 }

-// getConsensusCandidates find out what backends are the candidates to be in the consensus group
+// getConsensusCandidates will search for candidates in the primary group;
+// if there are none, it will search for candidates in the fallback group
+func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
+
+	healthyPrimaries := cp.FilterCandidates(cp.backendGroup.Primaries())
+
+	RecordHealthyCandidates(cp.backendGroup, len(healthyPrimaries))
+	if len(healthyPrimaries) > 0 {
+		return healthyPrimaries
+	}
+
+	return cp.FilterCandidates(cp.backendGroup.Fallbacks())
+}
+
+// FilterCandidates finds out what backends are the candidates to be in the consensus group
 // and create a copy of current their state
 //
 // a candidate is a serving node within the following conditions:
@@ -637,10 +685,12 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
 //   - in sync
 //   - updated recently
 //   - not lagging latest block
-func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
+func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*backendState {
+
 	candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))

-	for _, be := range cp.backendGroup.Backends {
+	for _, be := range backends {
+
 		bs := cp.getBackendState(be)
 		if be.forcedCandidate {
 			candidates[be] = bs
diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go
index 1b37ef7..654b7a5 100644
--- a/proxyd/proxyd/integration_tests/consensus_test.go
+++ b/proxyd/proxyd/integration_tests/consensus_test.go
@@ -108,6 +108,9 @@ func TestConsensus(t *testing.T) {
 	}

 	override := func(node string, method string, block string, response string) {
+		if _, ok := nodes[node]; !ok {
+
t.Fatalf("node %s does not exist in the nodes map", node) + } nodes[node].handler.AddOverride(&ms.MethodTemplate{ Method: method, Block: block, diff --git a/proxyd/proxyd/integration_tests/fallback_test.go b/proxyd/proxyd/integration_tests/fallback_test.go new file mode 100644 index 0000000..c5b3e48 --- /dev/null +++ b/proxyd/proxyd/integration_tests/fallback_test.go @@ -0,0 +1,374 @@ +package integration_tests + +import ( + "context" + "fmt" + "net/http" + "os" + "path" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + + "github.com/ethereum-optimism/optimism/proxyd" + ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler" + "github.com/stretchr/testify/require" +) + +func setup_failover(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func(), []time.Time, []time.Time) { + // setup mock servers + node1 := NewMockBackend(nil) + node2 := NewMockBackend(nil) + + dir, err := os.Getwd() + require.NoError(t, err) + + responses := path.Join(dir, "testdata/consensus_responses.yml") + + h1 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: responses, + } + h2 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: responses, + } + + require.NoError(t, os.Setenv("NODE1_URL", node1.URL())) + require.NoError(t, os.Setenv("NODE2_URL", node2.URL())) + + node1.SetHandler(http.HandlerFunc(h1.Handler)) + node2.SetHandler(http.HandlerFunc(h2.Handler)) + + // setup proxyd + config := ReadConfig("fallback") + svr, shutdown, err := proxyd.Start(config) + require.NoError(t, err) + + // expose the proxyd client + client := NewProxydClient("http://127.0.0.1:8545") + + // expose the backend group + bg := svr.BackendGroups["node"] + require.NotNil(t, bg) + require.NotNil(t, bg.Consensus) + require.Equal(t, 2, len(bg.Backends)) // should match config + + // convenient mapping to access the nodes by name + nodes := map[string]nodeContext{ + "normal": { + mockBackend: node1, + backend: bg.Backends[0], + handler: &h1, + }, + "fallback": { + mockBackend: node2, + backend: bg.Backends[1], + handler: &h2, + }, + } + normalTimestamps := []time.Time{} + fallbackTimestamps := []time.Time{} + + return nodes, bg, client, shutdown, normalTimestamps, fallbackTimestamps +} + +func TestFallback(t *testing.T) { + nodes, bg, client, shutdown, normalTimestamps, fallbackTimestamps := setup_failover(t) + defer nodes["normal"].mockBackend.Close() + defer nodes["fallback"].mockBackend.Close() + defer shutdown() + + ctx := context.Background() + + // Use Update to Advance the Candidate iteration + update := func() { + for _, be := range bg.Primaries() { + bg.Consensus.UpdateBackend(ctx, be) + } + + for _, be := range bg.Fallbacks() { + healthyCandidates := bg.Consensus.FilterCandidates(bg.Primaries()) + if len(healthyCandidates) == 0 { + bg.Consensus.UpdateBackend(ctx, be) + } + } + + bg.Consensus.UpdateBackendGroupConsensus(ctx) + } + + override := func(node string, method string, block string, response string) { + if _, ok := nodes[node]; !ok { + t.Fatalf("node %s does not exist in the nodes map", node) + } + nodes[node].handler.AddOverride(&ms.MethodTemplate{ + Method: method, + Block: block, + Response: response, + }) + } + + overrideBlock := func(node string, blockRequest string, blockResponse string) { + override(node, + "eth_getBlockByNumber", + blockRequest, + buildResponse(map[string]string{ + "number": blockResponse, + "hash": "hash_" + blockResponse, + })) + } + + 
overrideBlockHash := func(node string, blockRequest string, number string, hash string) { + override(node, + "eth_getBlockByNumber", + blockRequest, + buildResponse(map[string]string{ + "number": number, + "hash": hash, + })) + } + + overridePeerCount := func(node string, count int) { + override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String())) + } + + overrideNotInSync := func(node string) { + override(node, "eth_syncing", "", buildResponse(map[string]string{ + "startingblock": "0x0", + "currentblock": "0x0", + "highestblock": "0x100", + })) + } + + containsNode := func(backends []*proxyd.Backend, name string) bool { + for _, be := range backends { + // Note: Currently checks for name but would like to expose fallback better + if be.Name == name { + return true + } + } + return false + } + + // TODO: Improvement instead of simple array, + // ensure normal and backend are returned in strict order + recordLastUpdates := func(backends []*proxyd.Backend) []time.Time { + lastUpdated := []time.Time{} + for _, be := range backends { + lastUpdated = append(lastUpdated, bg.Consensus.GetLastUpdate(be)) + } + return lastUpdated + } + + // convenient methods to manipulate state and mock responses + reset := func() { + for _, node := range nodes { + node.handler.ResetOverrides() + node.mockBackend.Reset() + } + bg.Consensus.ClearListeners() + bg.Consensus.Reset() + + normalTimestamps = []time.Time{} + fallbackTimestamps = []time.Time{} + } + + /* + triggerFirstNormalFailure: will trigger consensus group into fallback mode + old consensus group should be returned one time, and fallback group should be enabled + Fallback will be returned subsequent update + */ + triggerFirstNormalFailure := func() { + overridePeerCount("normal", 0) + update() + require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback")) + require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal")) + require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + nodes["fallback"].mockBackend.Reset() + } + + t.Run("Test fallback Mode will not be exited, unless state changes", func(t *testing.T) { + reset() + triggerFirstNormalFailure() + for i := 0; i < 10; i++ { + update() + require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal")) + require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback")) + require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + } + }) + + t.Run("Test Healthy mode will not be exited unless state changes", func(t *testing.T) { + reset() + for i := 0; i < 10; i++ { + update() + require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback")) + require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal")) + + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false}) + + require.Equal(t, 200, statusCode) + require.Nil(t, err, "error not nil") + require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String()) + require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String()) + } + // TODO: Remove these, just here so compiler doesn't complain + overrideNotInSync("normal") + overrideBlock("normal", "safe", "0xb1") + overrideBlockHash("fallback", "0x102", "0x102", "wrong_hash") + }) + + t.Run("trigger normal failure, subsequent update return failover in consensus group, and fallback mode enabled", func(t *testing.T) { + reset() + 
triggerFirstNormalFailure()
+		update()
+		require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
+		require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
+		require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
+	})
+
+	t.Run("trigger healthy -> fallback, update -> healthy", func(t *testing.T) {
+		reset()
+		update()
+		require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
+		require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
+		require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
+
+		triggerFirstNormalFailure()
+		update()
+		require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
+		require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
+		require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
+
+		overridePeerCount("normal", 5)
+		update()
+		require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
+		require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
+		require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
+	})
+
+	t.Run("Ensure fallback is not updated when in normal mode", func(t *testing.T) {
+		reset()
+		for i := 0; i < 10; i++ {
+			update()
+			ts := recordLastUpdates(bg.Backends)
+			normalTimestamps = append(normalTimestamps, ts[0])
+			fallbackTimestamps = append(fallbackTimestamps, ts[1])
+
+			require.False(t, normalTimestamps[i].IsZero())
+			require.True(t, fallbackTimestamps[i].IsZero())
+
+			require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
+			require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
+
+			// consensus at block 0x101
+			require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
+			require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
+			require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
+		}
+	})
+
+	/*
+		Set Normal backend to Fail -> both backends should be updated
+	*/
+	t.Run("Ensure both nodes are queried in fallback mode", func(t *testing.T) {
+		reset()
+		triggerFirstNormalFailure()
+		for i := 0; i < 10; i++ {
+			update()
+			ts := recordLastUpdates(bg.Backends)
+			normalTimestamps = append(normalTimestamps, ts[0])
+			fallbackTimestamps = append(fallbackTimestamps, ts[1])
+
+			// Both Nodes should be updated again
+			require.False(t, normalTimestamps[i].IsZero())
+			require.False(t, fallbackTimestamps[i].IsZero(),
+				fmt.Sprintf("Error: Fallback timestamp: %v was not queried on iteration %d", fallbackTimestamps[i], i),
+			)
+			if i > 0 {
+				require.Greater(t, normalTimestamps[i], normalTimestamps[i-1])
+				require.Greater(t, fallbackTimestamps[i], fallbackTimestamps[i-1])
+			}
+		}
+	})
+
+	t.Run("Ensure both nodes are queried in fallback mode", func(t *testing.T) {
+		reset()
+		triggerFirstNormalFailure()
+		for i := 0; i < 10; i++ {
+			update()
+			ts := recordLastUpdates(bg.Backends)
+			normalTimestamps = append(normalTimestamps, ts[0])
+			fallbackTimestamps = append(fallbackTimestamps, ts[1])
+
+			// Both Nodes should be updated again
+			require.False(t, normalTimestamps[i].IsZero())
+			require.False(t, fallbackTimestamps[i].IsZero(),
+				fmt.Sprintf("Error: Fallback timestamp: %v was not queried on iteration %d", fallbackTimestamps[i], i),
+			)
+			if i > 0 {
+				require.Greater(t, normalTimestamps[i], normalTimestamps[i-1])
+				require.Greater(t, fallbackTimestamps[i], fallbackTimestamps[i-1])
+			}
+		}
+	})
+	t.Run("Healthy -> Fallback -> Healthy with timestamps", func(t *testing.T) {
+		reset()
+		for i := 0; i < 10; i++ {
+			update()
+
ts := recordLastUpdates(bg.Backends) + normalTimestamps = append(normalTimestamps, ts[0]) + fallbackTimestamps = append(fallbackTimestamps, ts[1]) + + // Normal is queried, fallback is not + require.False(t, normalTimestamps[i].IsZero()) + require.True(t, fallbackTimestamps[i].IsZero(), + fmt.Sprintf("Error: Fallback timestamp: %v was not queried on iteratio %d", fallbackTimestamps[i], i), + ) + if i > 0 { + require.Greater(t, normalTimestamps[i], normalTimestamps[i-1]) + // Fallbacks should be zeros + require.Equal(t, fallbackTimestamps[i], fallbackTimestamps[i-1]) + } + } + + offset := 10 + triggerFirstNormalFailure() + for i := 0; i < 10; i++ { + update() + ts := recordLastUpdates(bg.Backends) + normalTimestamps = append(normalTimestamps, ts[0]) + fallbackTimestamps = append(fallbackTimestamps, ts[1]) + + // Both Nodes should be updated again + require.False(t, normalTimestamps[i+offset].IsZero()) + require.False(t, fallbackTimestamps[i+offset].IsZero()) + + require.Greater(t, normalTimestamps[i+offset], normalTimestamps[i+offset-1]) + require.Greater(t, fallbackTimestamps[i+offset], fallbackTimestamps[i+offset-1]) + } + + overridePeerCount("normal", 5) + offset = 20 + for i := 0; i < 10; i++ { + update() + ts := recordLastUpdates(bg.Backends) + normalTimestamps = append(normalTimestamps, ts[0]) + fallbackTimestamps = append(fallbackTimestamps, ts[1]) + + // Normal Node will be updated + require.False(t, normalTimestamps[i+offset].IsZero()) + require.Greater(t, normalTimestamps[i+offset], normalTimestamps[i+offset-1]) + + // fallback should not be updating + if offset+i > 21 { + require.Equal(t, fallbackTimestamps[i+offset], fallbackTimestamps[i+offset-1]) + } + } + }) +} diff --git a/proxyd/proxyd/integration_tests/testdata/fallback.toml b/proxyd/proxyd/integration_tests/testdata/fallback.toml new file mode 100644 index 0000000..c801ca3 --- /dev/null +++ b/proxyd/proxyd/integration_tests/testdata/fallback.toml @@ -0,0 +1,31 @@ +[server] +rpc_port = 8545 + +[backend] +response_timeout_seconds = 1 +max_degraded_latency_threshold = "30ms" + +[backends] +[backends.normal] +rpc_url = "$NODE1_URL" + +[backends.fallback] +rpc_url = "$NODE2_URL" + +[backend_groups] +[backend_groups.node] +backends = ["normal", "fallback"] +consensus_aware = true +consensus_handler = "noop" # allow more control over the consensus poller for tests +consensus_ban_period = "1m" +consensus_max_update_threshold = "2m" +consensus_max_block_lag = 8 +consensus_min_peer_count = 4 +fallbacks = ["fallback"] + +[rpc_method_mappings] +eth_call = "node" +eth_chainId = "node" +eth_blockNumber = "node" +eth_getBlockByNumber = "node" +consensus_getReceipts = "node" diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 90a79ab..4046af0 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -410,6 +410,24 @@ var ( }, []string{ "backend_name", }) + + healthyPrimaryCandidates = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "healthy_candidates", + Help: "Record the number of healthy primary candidates", + }, []string{ + "backend_group_name", + }) + + backendGroupFallbackBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_group_fallback_backenend", + Help: "Bool gauge for if a backend is a fallback for a backend group", + }, []string{ + "backend_group", + "backend_name", + "fallback", + }) ) func RecordRedisError(source string) { @@ -541,6 +559,10 @@ func RecordConsensusBackendBanned(b *Backend, banned bool) { 
consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned)) } +func RecordHealthyCandidates(b *BackendGroup, candidates int) { + healthyPrimaryCandidates.WithLabelValues(b.Name).Set(float64(candidates)) +} + func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) { consensusPeerCountBackend.WithLabelValues(b.Name).Set(float64(peerCount)) } @@ -567,6 +589,10 @@ func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) { networkErrorRateBackend.WithLabelValues(b.Name).Set(rate) } +func RecordBackendGroupFallbacks(bg *BackendGroup, name string, fallback bool) { + backendGroupFallbackBackend.WithLabelValues(bg.Name, name, strconv.FormatBool(fallback)).Set(boolToFloat64(fallback)) +} + func boolToFloat64(b bool) float64 { if b { return 1 diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index 73a3117..402909b 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -187,17 +187,47 @@ func Start(config *Config) (*Server, func(), error) { backendGroups := make(map[string]*BackendGroup) for bgName, bg := range config.BackendGroups { backends := make([]*Backend, 0) + fallbackBackends := make(map[string]bool) + fallbackCount := 0 for _, bName := range bg.Backends { if backendsByName[bName] == nil { return nil, nil, fmt.Errorf("backend %s is not defined", bName) } backends = append(backends, backendsByName[bName]) + + for _, fb := range bg.Fallbacks { + if bName == fb { + fallbackBackends[bName] = true + log.Info("configured backend as fallback", + "backend_name", bName, + "backend_group", bgName, + ) + fallbackCount++ + } + } + + if _, ok := fallbackBackends[bName]; !ok { + fallbackBackends[bName] = false + log.Info("configured backend as primary", + "backend_name", bName, + "backend_group", bgName, + ) + } + } + + if fallbackCount != len(bg.Fallbacks) { + return nil, nil, + fmt.Errorf( + "error: number of fallbacks instantiated (%d) did not match configured (%d) for backend group %s", + fallbackCount, len(bg.Fallbacks), bgName, + ) } backendGroups[bgName] = &BackendGroup{ - Name: bgName, - Backends: backends, - WeightedRouting: bg.WeightedRouting, + Name: bgName, + Backends: backends, + WeightedRouting: bg.WeightedRouting, + FallbackBackends: fallbackBackends, } } @@ -350,6 +380,15 @@ func Start(config *Config) (*Server, func(), error) { copts = append(copts, WithPollerInterval(time.Duration(bgcfg.ConsensusPollerInterval))) } + for _, be := range bgcfg.Backends { + if fallback, ok := bg.FallbackBackends[be]; !ok { + log.Crit("error backend not found in backend fallback configurations", "backend_name", be) + } else { + log.Debug("configuring new backend for group", "backend_group", bgName, "backend_name", be, "fallback", fallback) + RecordBackendGroupFallbacks(bg, be, fallback) + } + } + var tracker ConsensusTracker if bgcfg.ConsensusHA { if bgcfg.ConsensusHARedis.URL == "" {
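
For readers who want the group wiring from Start() in isolation, the following is a minimal, self-contained sketch (not proxyd code; markFallbacks and the example names are illustrative) of what the "fallbacks" key does: every backend listed there is flagged as a fallback, every other backend in the group is a primary, and a fallback name that does not resolve to a group member is a configuration error.

package main

import "fmt"

// markFallbacks returns a backend-name -> isFallback map for one backend group.
// backends is the group's ordered backend list; fallbacks is the group's
// "fallbacks" configuration key.
func markFallbacks(backends, fallbacks []string) (map[string]bool, error) {
	isFallback := make(map[string]bool, len(backends))
	matched := 0
	for _, name := range backends {
		isFallback[name] = false
		for _, fb := range fallbacks {
			if name == fb {
				isFallback[name] = true
				matched++
			}
		}
	}
	if matched != len(fallbacks) {
		// Mirrors the validation above: every configured fallback must also
		// appear in the group's backend list.
		return nil, fmt.Errorf("number of fallbacks resolved (%d) did not match configured (%d)", matched, len(fallbacks))
	}
	return isFallback, nil
}

func main() {
	// Mirrors the fallback.toml fixture: backends = ["normal", "fallback"], fallbacks = ["fallback"].
	flags, err := markFallbacks([]string{"normal", "fallback"}, []string{"fallback"})
	if err != nil {
		panic(err)
	}
	fmt.Println(flags) // map[fallback:true normal:false]
}

This is the same partition that BackendGroup.Primaries() and BackendGroup.Fallbacks() read back, and that the consensus poller uses to decide whether the fallback set needs to be polled at all.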