feat: proxyd: allow consensus-proxyd to fall back to a different list of backends if the primary list is experiencing downtime (#10668)

Co-authored-by: Jacob Elias <jacob@jacobs-apple-macbook-pro.phoenix-vibes.ts.net>
This commit is contained in:
Jacob Elias 2024-06-04 18:19:54 -05:00 committed by GitHub
parent 2f424fd5c6
commit 584322fced
9 changed files with 564 additions and 12 deletions

@@ -19,3 +19,7 @@ test:
lint:
go vet ./...
.PHONY: test
test-fallback:
go test -v ./... -test.run ^TestFallback$
.PHONY: test-fallback

@@ -705,12 +705,35 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
}
type BackendGroup struct {
Name string
Backends []*Backend
WeightedRouting bool
Consensus *ConsensusPoller
Name string
Backends []*Backend
WeightedRouting bool
Consensus *ConsensusPoller
FallbackBackends map[string]bool
}
func (bg *BackendGroup) Fallbacks() []*Backend {
fallbacks := []*Backend{}
for _, a := range bg.Backends {
if fallback, ok := bg.FallbackBackends[a.Name]; ok && fallback {
fallbacks = append(fallbacks, a)
}
}
return fallbacks
}
func (bg *BackendGroup) Primaries() []*Backend {
primaries := []*Backend{}
for _, a := range bg.Backends {
fallback, ok := bg.FallbackBackends[a.Name]
if ok && !fallback {
primaries = append(primaries, a)
}
}
return primaries
}
// NOTE: BackendGroup Forward contains the logic for consensus-aware load balancing
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
if len(rpcReqs) == 0 {
return nil, "", nil

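For orientation, a minimal sketch (not part of the diff) of how the new FallbackBackends map partitions a group. It assumes the snippet lives inside the proxyd package; the backend names are illustrative only.

// Illustrative sketch: two backends, one of them marked as a fallback.
bg := &BackendGroup{
	Name:     "node",
	Backends: []*Backend{{Name: "normal"}, {Name: "fallback"}},
	FallbackBackends: map[string]bool{
		"normal":   false, // primary: always polled
		"fallback": true,  // fallback: only polled when no primary is healthy
	},
}
// bg.Primaries() yields the "normal" backend; bg.Fallbacks() yields "fallback".
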
@@ -126,6 +126,8 @@ type BackendGroupConfig struct {
ConsensusHAHeartbeatInterval TOMLDuration `toml:"consensus_ha_heartbeat_interval"`
ConsensusHALockPeriod TOMLDuration `toml:"consensus_ha_lock_period"`
ConsensusHARedis RedisConfig `toml:"consensus_ha_redis"`
Fallbacks []string `toml:"fallbacks"`
}
type BackendGroupsConfig map[string]*BackendGroupConfig

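The new fallbacks key names which members of a backend group should only be used while the primaries are unhealthy. As a hedged sketch of the invariant Start() enforces later in this diff (the helper name validateFallbacks is hypothetical and fmt is assumed to be imported), every entry in fallbacks must also appear in the group's backends list:

// Hypothetical helper shown for illustration; Start() below performs the
// equivalent check by counting matches against len(bg.Fallbacks).
func validateFallbacks(backends, fallbacks []string) error {
	known := make(map[string]bool, len(backends))
	for _, b := range backends {
		known[b] = true
	}
	for _, f := range fallbacks {
		if !known[f] {
			return fmt.Errorf("fallback %q is not a backend of this group", f)
		}
	}
	return nil
}
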
@@ -122,12 +122,38 @@ func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAs
}
}
func (ah *PollerAsyncHandler) Init() {
// create the individual backend pollers
for _, be := range ah.cp.backendGroup.Backends {
// create the individual backend pollers.
log.Info("total number of primary candidates", "primaries", len(ah.cp.backendGroup.Primaries()))
log.Info("total number of fallback candidates", "fallbacks", len(ah.cp.backendGroup.Fallbacks()))
for _, be := range ah.cp.backendGroup.Primaries() {
go func(be *Backend) {
for {
timer := time.NewTimer(ah.cp.interval)
ah.cp.UpdateBackend(ah.ctx, be)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}(be)
}
for _, be := range ah.cp.backendGroup.Fallbacks() {
go func(be *Backend) {
for {
timer := time.NewTimer(ah.cp.interval)
healthyCandidates := ah.cp.FilterCandidates(ah.cp.backendGroup.Primaries())
log.Info("number of healthy primary candidates", "healthy_candidates", len(healthyCandidates))
if len(healthyCandidates) == 0 {
log.Info("zero healthy candidates, querying fallback backend",
"backend_name", be.Name)
ah.cp.UpdateBackend(ah.ctx, be)
}
select {
case <-timer.C:
@@ -143,6 +169,7 @@ func (ah *PollerAsyncHandler) Init() {
go func() {
for {
timer := time.NewTimer(ah.cp.interval)
log.Info("updating backend group consensus")
ah.cp.UpdateBackendGroupConsensus(ah.ctx)
select {
@@ -609,6 +636,13 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
}
}
func (cp *ConsensusPoller) GetLastUpdate(be *Backend) time.Time {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
defer bs.backendStateMux.Unlock()
return bs.lastUpdate
}
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
safeBlockNumber hexutil.Uint64,
@@ -627,7 +661,21 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
return changed
}
// getConsensusCandidates find out what backends are the candidates to be in the consensus group
// getConsensusCandidates will search for candidates in the primary group,
// if there are none it will search for candidates in the fallback group
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
healthyPrimaries := cp.FilterCandidates(cp.backendGroup.Primaries())
RecordHealthyCandidates(cp.backendGroup, len(healthyPrimaries))
if len(healthyPrimaries) > 0 {
return healthyPrimaries
}
return cp.FilterCandidates(cp.backendGroup.Fallbacks())
}
// FilterCandidates finds out which backends are candidates to be in the consensus group
// and creates a copy of their current state
//
// a candidate is a serving node within the following conditions:
@@ -637,10 +685,12 @@ func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync
// - in sync
// - updated recently
// - not lagging latest block
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*backendState {
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
for _, be := range backends {
bs := cp.getBackendState(be)
if be.forcedCandidate {
candidates[be] = bs

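With the split above, fallbacks are only consulted once FilterCandidates finds no healthy primaries. A minimal sketch (hypothetical helper, not part of the diff) of how a caller could tell whether a group is currently serving from its fallbacks, mirroring the containsNode check in the integration test below:

// Hypothetical helper: reports whether the current consensus group contains
// any backend that the group configuration marks as a fallback.
func inFallbackMode(bg *BackendGroup) bool {
	if bg.Consensus == nil {
		return false
	}
	isFallback := make(map[string]bool)
	for _, be := range bg.Fallbacks() {
		isFallback[be.Name] = true
	}
	for _, be := range bg.Consensus.GetConsensusGroup() {
		if isFallback[be.Name] {
			return true
		}
	}
	return false
}
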
@@ -108,6 +108,9 @@ func TestConsensus(t *testing.T) {
}
override := func(node string, method string, block string, response string) {
if _, ok := nodes[node]; !ok {
t.Fatalf("node %s does not exist in the nodes map", node)
}
nodes[node].handler.AddOverride(&ms.MethodTemplate{
Method: method,
Block: block,

@@ -0,0 +1,374 @@
package integration_tests
import (
"context"
"fmt"
"net/http"
"os"
"path"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/proxyd"
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/stretchr/testify/require"
)
func setup_failover(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func(), []time.Time, []time.Time) {
// setup mock servers
node1 := NewMockBackend(nil)
node2 := NewMockBackend(nil)
dir, err := os.Getwd()
require.NoError(t, err)
responses := path.Join(dir, "testdata/consensus_responses.yml")
h1 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
h2 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
require.NoError(t, os.Setenv("NODE1_URL", node1.URL()))
require.NoError(t, os.Setenv("NODE2_URL", node2.URL()))
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(http.HandlerFunc(h2.Handler))
// setup proxyd
config := ReadConfig("fallback")
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
// expose the proxyd client
client := NewProxydClient("http://127.0.0.1:8545")
// expose the backend group
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.NotNil(t, bg.Consensus)
require.Equal(t, 2, len(bg.Backends)) // should match config
// convenient mapping to access the nodes by name
nodes := map[string]nodeContext{
"normal": {
mockBackend: node1,
backend: bg.Backends[0],
handler: &h1,
},
"fallback": {
mockBackend: node2,
backend: bg.Backends[1],
handler: &h2,
},
}
normalTimestamps := []time.Time{}
fallbackTimestamps := []time.Time{}
return nodes, bg, client, shutdown, normalTimestamps, fallbackTimestamps
}
func TestFallback(t *testing.T) {
nodes, bg, client, shutdown, normalTimestamps, fallbackTimestamps := setup_failover(t)
defer nodes["normal"].mockBackend.Close()
defer nodes["fallback"].mockBackend.Close()
defer shutdown()
ctx := context.Background()
// Use Update to Advance the Candidate iteration
update := func() {
for _, be := range bg.Primaries() {
bg.Consensus.UpdateBackend(ctx, be)
}
for _, be := range bg.Fallbacks() {
healthyCandidates := bg.Consensus.FilterCandidates(bg.Primaries())
if len(healthyCandidates) == 0 {
bg.Consensus.UpdateBackend(ctx, be)
}
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
}
override := func(node string, method string, block string, response string) {
if _, ok := nodes[node]; !ok {
t.Fatalf("node %s does not exist in the nodes map", node)
}
nodes[node].handler.AddOverride(&ms.MethodTemplate{
Method: method,
Block: block,
Response: response,
})
}
overrideBlock := func(node string, blockRequest string, blockResponse string) {
override(node,
"eth_getBlockByNumber",
blockRequest,
buildResponse(map[string]string{
"number": blockResponse,
"hash": "hash_" + blockResponse,
}))
}
overrideBlockHash := func(node string, blockRequest string, number string, hash string) {
override(node,
"eth_getBlockByNumber",
blockRequest,
buildResponse(map[string]string{
"number": number,
"hash": hash,
}))
}
overridePeerCount := func(node string, count int) {
override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String()))
}
overrideNotInSync := func(node string) {
override(node, "eth_syncing", "", buildResponse(map[string]string{
"startingblock": "0x0",
"currentblock": "0x0",
"highestblock": "0x100",
}))
}
containsNode := func(backends []*proxyd.Backend, name string) bool {
for _, be := range backends {
// Note: currently checks by name; ideally the fallback role would be exposed more directly
if be.Name == name {
return true
}
}
return false
}
// TODO: improvement: instead of a simple array,
// ensure normal and fallback are returned in strict order
recordLastUpdates := func(backends []*proxyd.Backend) []time.Time {
lastUpdated := []time.Time{}
for _, be := range backends {
lastUpdated = append(lastUpdated, bg.Consensus.GetLastUpdate(be))
}
return lastUpdated
}
// convenient methods to manipulate state and mock responses
reset := func() {
for _, node := range nodes {
node.handler.ResetOverrides()
node.mockBackend.Reset()
}
bg.Consensus.ClearListeners()
bg.Consensus.Reset()
normalTimestamps = []time.Time{}
fallbackTimestamps = []time.Time{}
}
/*
triggerFirstNormalFailure: forces the consensus group into fallback mode.
The old consensus group may be returned one more time, then the fallback group is enabled
and the fallback backend is returned on subsequent updates.
*/
triggerFirstNormalFailure := func() {
overridePeerCount("normal", 0)
update()
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
nodes["fallback"].mockBackend.Reset()
}
t.Run("Test fallback mode will not be exited unless state changes", func(t *testing.T) {
reset()
triggerFirstNormalFailure()
for i := 0; i < 10; i++ {
update()
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
}
})
t.Run("Test Healthy mode will not be exited unless state changes", func(t *testing.T) {
reset()
for i := 0; i < 10; i++ {
update()
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x101", false})
require.Equal(t, 200, statusCode)
require.Nil(t, err, "error not nil")
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
}
// TODO: Remove these, just here so compiler doesn't complain
overrideNotInSync("normal")
overrideBlock("normal", "safe", "0xb1")
overrideBlockHash("fallback", "0x102", "0x102", "wrong_hash")
})
t.Run("trigger normal failure; subsequent updates return the fallback in the consensus group, and fallback mode is enabled", func(t *testing.T) {
reset()
triggerFirstNormalFailure()
update()
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
})
t.Run("trigger healthy -> fallback, update -> healthy", func(t *testing.T) {
reset()
update()
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
triggerFirstNormalFailure()
update()
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
overridePeerCount("normal", 5)
update()
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
})
t.Run("Ensure fallback is not updated when in normal mode", func(t *testing.T) {
reset()
for i := 0; i < 10; i++ {
update()
ts := recordLastUpdates(bg.Backends)
normalTimestamps = append(normalTimestamps, ts[0])
fallbackTimestamps = append(fallbackTimestamps, ts[1])
require.False(t, normalTimestamps[i].IsZero())
require.True(t, fallbackTimestamps[i].IsZero())
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
// consensus at block 0x101
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
}
})
/*
Set Normal backend to Fail -> both backends should be updated
*/
t.Run("Ensure both nodes are queried in fallback mode", func(t *testing.T) {
reset()
triggerFirstNormalFailure()
for i := 0; i < 10; i++ {
update()
ts := recordLastUpdates(bg.Backends)
normalTimestamps = append(normalTimestamps, ts[0])
fallbackTimestamps = append(fallbackTimestamps, ts[1])
// Both Nodes should be updated again
require.False(t, normalTimestamps[i].IsZero())
require.False(t, fallbackTimestamps[i].IsZero(),
fmt.Sprintf("Error: Fallback timestamp: %v was not queried on iteration %d", fallbackTimestamps[i], i),
)
if i > 0 {
require.Greater(t, normalTimestamps[i], normalTimestamps[i-1])
require.Greater(t, fallbackTimestamps[i], fallbackTimestamps[i-1])
}
}
})
t.Run("Healthy -> Fallback -> Healthy with timestamps", func(t *testing.T) {
reset()
for i := 0; i < 10; i++ {
update()
ts := recordLastUpdates(bg.Backends)
normalTimestamps = append(normalTimestamps, ts[0])
fallbackTimestamps = append(fallbackTimestamps, ts[1])
// Normal is queried, fallback is not
require.False(t, normalTimestamps[i].IsZero())
require.True(t, fallbackTimestamps[i].IsZero(),
fmt.Sprintf("Error: Fallback timestamp: %v should not have been updated on iteration %d", fallbackTimestamps[i], i),
)
if i > 0 {
require.Greater(t, normalTimestamps[i], normalTimestamps[i-1])
// Fallbacks should be zeros
require.Equal(t, fallbackTimestamps[i], fallbackTimestamps[i-1])
}
}
offset := 10
triggerFirstNormalFailure()
for i := 0; i < 10; i++ {
update()
ts := recordLastUpdates(bg.Backends)
normalTimestamps = append(normalTimestamps, ts[0])
fallbackTimestamps = append(fallbackTimestamps, ts[1])
// Both Nodes should be updated again
require.False(t, normalTimestamps[i+offset].IsZero())
require.False(t, fallbackTimestamps[i+offset].IsZero())
require.Greater(t, normalTimestamps[i+offset], normalTimestamps[i+offset-1])
require.Greater(t, fallbackTimestamps[i+offset], fallbackTimestamps[i+offset-1])
}
overridePeerCount("normal", 5)
offset = 20
for i := 0; i < 10; i++ {
update()
ts := recordLastUpdates(bg.Backends)
normalTimestamps = append(normalTimestamps, ts[0])
fallbackTimestamps = append(fallbackTimestamps, ts[1])
// Normal Node will be updated
require.False(t, normalTimestamps[i+offset].IsZero())
require.Greater(t, normalTimestamps[i+offset], normalTimestamps[i+offset-1])
// fallback should not be updating
if offset+i > 21 {
require.Equal(t, fallbackTimestamps[i+offset], fallbackTimestamps[i+offset-1])
}
}
})
}

@@ -0,0 +1,31 @@
[server]
rpc_port = 8545
[backend]
response_timeout_seconds = 1
max_degraded_latency_threshold = "30ms"
[backends]
[backends.normal]
rpc_url = "$NODE1_URL"
[backends.fallback]
rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["normal", "fallback"]
consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m"
consensus_max_update_threshold = "2m"
consensus_max_block_lag = 8
consensus_min_peer_count = 4
fallbacks = ["fallback"]
[rpc_method_mappings]
eth_call = "node"
eth_chainId = "node"
eth_blockNumber = "node"
eth_getBlockByNumber = "node"
consensus_getReceipts = "node"

@@ -410,6 +410,24 @@ var (
}, []string{
"backend_name",
})
healthyPrimaryCandidates = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "healthy_candidates",
Help: "Record the number of healthy primary candidates",
}, []string{
"backend_group_name",
})
backendGroupFallbackBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_group_fallback_backend",
Help: "Bool gauge indicating whether a backend is a fallback for a backend group",
}, []string{
"backend_group",
"backend_name",
"fallback",
})
)
func RecordRedisError(source string) {
@@ -541,6 +559,10 @@ func RecordConsensusBackendBanned(b *Backend, banned bool) {
consensusBannedBackends.WithLabelValues(b.Name).Set(boolToFloat64(banned))
}
func RecordHealthyCandidates(b *BackendGroup, candidates int) {
healthyPrimaryCandidates.WithLabelValues(b.Name).Set(float64(candidates))
}
func RecordConsensusBackendPeerCount(b *Backend, peerCount uint64) {
consensusPeerCountBackend.WithLabelValues(b.Name).Set(float64(peerCount))
}
@@ -567,6 +589,10 @@ func RecordBackendNetworkErrorRateSlidingWindow(b *Backend, rate float64) {
networkErrorRateBackend.WithLabelValues(b.Name).Set(rate)
}
func RecordBackendGroupFallbacks(bg *BackendGroup, name string, fallback bool) {
backendGroupFallbackBackend.WithLabelValues(bg.Name, name, strconv.FormatBool(fallback)).Set(boolToFloat64(fallback))
}
func boolToFloat64(b bool) float64 {
if b {
return 1

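A hedged usage example for the two new helpers (the group and backend names are illustrative): Start() below records the fallback gauge once per backend, and getConsensusCandidates records the healthy-candidate count whenever consensus candidates are computed.

// Illustrative calls, assuming bg is a *BackendGroup named "node":
RecordHealthyCandidates(bg, 2)                    // two primaries currently pass the health filter
RecordBackendGroupFallbacks(bg, "fallback", true) // backend "fallback" is a fallback of group "node"
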
@@ -187,17 +187,47 @@ func Start(config *Config) (*Server, func(), error) {
backendGroups := make(map[string]*BackendGroup)
for bgName, bg := range config.BackendGroups {
backends := make([]*Backend, 0)
fallbackBackends := make(map[string]bool)
fallbackCount := 0
for _, bName := range bg.Backends {
if backendsByName[bName] == nil {
return nil, nil, fmt.Errorf("backend %s is not defined", bName)
}
backends = append(backends, backendsByName[bName])
for _, fb := range bg.Fallbacks {
if bName == fb {
fallbackBackends[bName] = true
log.Info("configured backend as fallback",
"backend_name", bName,
"backend_group", bgName,
)
fallbackCount++
}
}
if _, ok := fallbackBackends[bName]; !ok {
fallbackBackends[bName] = false
log.Info("configured backend as primary",
"backend_name", bName,
"backend_group", bgName,
)
}
}
if fallbackCount != len(bg.Fallbacks) {
return nil, nil,
fmt.Errorf(
"error: number of fallbacks instantiated (%d) did not match configured (%d) for backend group %s",
fallbackCount, len(bg.Fallbacks), bgName,
)
}
backendGroups[bgName] = &BackendGroup{
Name: bgName,
Backends: backends,
WeightedRouting: bg.WeightedRouting,
Name: bgName,
Backends: backends,
WeightedRouting: bg.WeightedRouting,
FallbackBackends: fallbackBackends,
}
}
@@ -350,6 +380,15 @@ func Start(config *Config) (*Server, func(), error) {
copts = append(copts, WithPollerInterval(time.Duration(bgcfg.ConsensusPollerInterval)))
}
for _, be := range bgcfg.Backends {
if fallback, ok := bg.FallbackBackends[be]; !ok {
log.Crit("error backend not found in backend fallback configurations", "backend_name", be)
} else {
log.Debug("configuring new backend for group", "backend_group", bgName, "backend_name", be, "fallback", fallback)
RecordBackendGroupFallbacks(bg, be, fallback)
}
}
var tracker ConsensusTracker
if bgcfg.ConsensusHA {
if bgcfg.ConsensusHARedis.URL == "" {