From 8e45b5d5ca1a7202a631c66d23666725da5560eb Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 11 May 2023 15:19:52 -0700 Subject: [PATCH 1/6] feat(proxyd): prevent banning out-of-sync backend --- proxyd/proxyd/consensus_poller.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index e26d6b9..3639a25 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -215,7 +215,15 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { RecordConsensusBackendBanned(be, banned) if banned { - log.Debug("skipping backend banned", "backend", be.Name) + log.Debug("skipping backend - banned", "backend", be.Name) + return + } + + // if backend it not in sync we'll check again after ban + inSync, err := cp.isInSync(ctx, be) + RecordConsensusBackendInSync(be, err == nil && inSync) + if err != nil || !inSync { + log.Warn("skipping backend - not in sync", "backend", be.Name) return } @@ -226,15 +234,6 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { return } - // if backend it not in sync we'll check again after ban - inSync, err := cp.isInSync(ctx, be) - if err != nil || !inSync { - log.Warn("backend banned - not in sync", "backend", be.Name) - cp.Ban(be) - return - } - RecordConsensusBackendInSync(be, inSync) - // if backend exhausted rate limit we'll skip it for now if be.IsRateLimited() { return From 5c4b805efc25bb88c4d6a5ae6013a53099315b25 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 11 May 2023 15:33:47 -0700 Subject: [PATCH 2/6] ban logic --- proxyd/proxyd/consensus_poller.go | 46 +++++++++++-------- .../integration_tests/consensus_test.go | 3 ++ 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 3639a25..d79e67a 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -44,6 +44,7 @@ type backendState struct { latestBlockNumber hexutil.Uint64 latestBlockHash string peerCount uint64 + inSync bool lastUpdate time.Time @@ -219,11 +220,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { return } - // if backend it not in sync we'll check again after ban - inSync, err := cp.isInSync(ctx, be) - RecordConsensusBackendInSync(be, err == nil && inSync) - if err != nil || !inSync { - log.Warn("skipping backend - not in sync", "backend", be.Name) + // if backend exhausted rate limit we'll skip it for now + if be.IsRateLimited() { + log.Debug("skipping backend - rate limited", "backend", be.Name) return } @@ -234,17 +233,18 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { return } - // if backend exhausted rate limit we'll skip it for now - if be.IsRateLimited() { - return + // if backend it not in sync we'll check again after ban + inSync, err := cp.isInSync(ctx, be) + RecordConsensusBackendInSync(be, err == nil && inSync) + if err != nil { + log.Warn("error updating backend sync state", "name", be.Name, "err", err) } var peerCount uint64 if !be.skipPeerCountCheck { peerCount, err = cp.getPeerCount(ctx, be) if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) - return + log.Warn("error updating backend peer count", "name", be.Name, "err", err) } RecordConsensusBackendPeerCount(be, peerCount) } @@ -252,10 +252,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") if err != nil { log.Warn("error updating backend", "name", be.Name, "err", err) - return } - changed, updateDelay := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash) + changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash) if changed { RecordBackendLatestBlock(be, latestBlockNumber) @@ -263,6 +262,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Debug("backend state updated", "name", be.Name, "peerCount", peerCount, + "inSync", inSync, "latestBlockNumber", latestBlockNumber, "latestBlockHash", latestBlockHash, "updateDelay", updateDelay) @@ -279,11 +279,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // find the highest block, in order to use it defining the highest non-lagging ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } + if !inSync { + continue + } if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { continue } @@ -295,11 +298,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // find the highest common ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } + if !inSync { + continue + } if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { continue } @@ -350,12 +356,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - not lagging */ - peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) + peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { + if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { filteredBackendsNames = append(filteredBackendsNames, be.Name) continue } @@ -497,23 +503,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { +func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { bs := cp.backendState[be] + defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() peerCount = bs.peerCount + inSync = bs.inSync blockNumber = bs.latestBlockNumber blockHash = bs.latestBlockHash lastUpdate = bs.lastUpdate bannedUntil = bs.bannedUntil - bs.backendStateMux.Unlock() return } -func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { +func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { bs := cp.backendState[be] bs.backendStateMux.Lock() changed = bs.latestBlockHash != blockHash bs.peerCount = peerCount + bs.inSync = inSync bs.latestBlockNumber = blockNumber bs.latestBlockHash = blockHash updateDelay = time.Since(bs.lastUpdate) diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 729fade..41f7e22 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) @@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) { be := backend(bg, "node1") require.NotNil(t, be) require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) @@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) From e025bcc4b4b8c3a457d784023f59302b39ff3057 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Wed, 26 Apr 2023 16:31:03 -0700 Subject: [PATCH 3/6] proxyd: emit event on consensus broken --- proxyd/proxyd/cache.go | 33 +++++++++++++++++++ proxyd/proxyd/consensus_poller.go | 18 +++++++++- .../integration_tests/consensus_test.go | 7 +++- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/proxyd/proxyd/cache.go b/proxyd/proxyd/cache.go index 73b7fd8..3a3731e 100644 --- a/proxyd/proxyd/cache.go +++ b/proxyd/proxyd/cache.go @@ -12,6 +12,7 @@ import ( type Cache interface { Get(ctx context.Context, key string) (string, error) Put(ctx context.Context, key string, value string) error + Clear(ctx context.Context) error } const ( @@ -42,6 +43,11 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { return nil } +func (c *cache) Clear(ctx context.Context) error { + c.lru.Purge() + return nil +} + type redisCache struct { rdb *redis.Client } @@ -75,6 +81,29 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error { return err } +func (c *redisCache) Clear(ctx context.Context) error { + patterns := []string{"lvc:*", "method:*"} + + for _, p := range patterns { + scmd := c.rdb.Keys(ctx, p) + err := scmd.Err() + if err != nil { + RecordRedisError("CacheClear") + return err + } + keys, _ := scmd.Result() + + icmd := c.rdb.Del(ctx, keys...) + err = icmd.Err() + if err != nil { + RecordRedisError("CacheClear") + return err + } + } + + return nil +} + type cacheWithCompression struct { cache Cache } @@ -103,6 +132,10 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string return c.cache.Put(ctx, key, string(encodedVal)) } +func (c *cacheWithCompression) Clear(ctx context.Context) error { + return c.cache.Clear(ctx) +} + type GetLatestBlockNumFn func(ctx context.Context) (uint64, error) type GetLatestGasPriceFn func(ctx context.Context) (uint64, error) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index e26d6b9..b4d3b20 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -17,11 +17,14 @@ const ( PollerInterval = 1 * time.Second ) +type OnConsensusBroken func() + // ConsensusPoller checks the consensus state for each member of a BackendGroup // resolves the highest common block for multiple nodes, and reconciles the consensus // in case of block hash divergence to minimize re-orgs type ConsensusPoller struct { cancelFunc context.CancelFunc + listeners []OnConsensusBroken backendGroup *BackendGroup backendState map[*Backend]*backendState @@ -149,6 +152,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt { } } +func WithListener(listener OnConsensusBroken) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.AddListener(listener) + } +} + +func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) { + cp.listeners = append(cp.listeners, listener) +} + func WithBanPeriod(banPeriod time.Duration) ConsensusOpt { return func(cp *ConsensusPoller) { cp.banPeriod = banPeriod @@ -349,6 +362,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - with minimum peer count - updated recently - not lagging + - in sync */ peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) @@ -392,7 +406,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } if broken { - // propagate event to other interested parts, such as cache invalidator + for _, l := range cp.listeners { + l() + } log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 729fade..59f3664 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -286,6 +286,11 @@ func TestConsensus(t *testing.T) { h2.ResetOverrides() bg.Consensus.Unban() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true + }) + for _, be := range bg.Backends { bg.Consensus.UpdateBackend(ctx, be) } @@ -331,7 +336,7 @@ func TestConsensus(t *testing.T) { // should resolve to 0x1, since 0x2 is out of consensus at the moment require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) - // later, when impl events, listen to broken consensus event + require.True(t, listenerCalled) }) t.Run("broken consensus with depth 2", func(t *testing.T) { From 491369d32f695455f16d11cb70e6874da07dc08f Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 11 May 2023 15:44:12 -0700 Subject: [PATCH 4/6] lint --- proxyd/proxyd/consensus_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index b4d3b20..fbb8a17 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -362,7 +362,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - with minimum peer count - updated recently - not lagging - - in sync + - in sync */ peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) From ef42dde6e1c41b17d777435ab8d5a80ce9152a29 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 11 May 2023 15:45:33 -0700 Subject: [PATCH 5/6] revert cache.go --- proxyd/proxyd/cache.go | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/proxyd/proxyd/cache.go b/proxyd/proxyd/cache.go index 3a3731e..73b7fd8 100644 --- a/proxyd/proxyd/cache.go +++ b/proxyd/proxyd/cache.go @@ -12,7 +12,6 @@ import ( type Cache interface { Get(ctx context.Context, key string) (string, error) Put(ctx context.Context, key string, value string) error - Clear(ctx context.Context) error } const ( @@ -43,11 +42,6 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { return nil } -func (c *cache) Clear(ctx context.Context) error { - c.lru.Purge() - return nil -} - type redisCache struct { rdb *redis.Client } @@ -81,29 +75,6 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error { return err } -func (c *redisCache) Clear(ctx context.Context) error { - patterns := []string{"lvc:*", "method:*"} - - for _, p := range patterns { - scmd := c.rdb.Keys(ctx, p) - err := scmd.Err() - if err != nil { - RecordRedisError("CacheClear") - return err - } - keys, _ := scmd.Result() - - icmd := c.rdb.Del(ctx, keys...) - err = icmd.Err() - if err != nil { - RecordRedisError("CacheClear") - return err - } - } - - return nil -} - type cacheWithCompression struct { cache Cache } @@ -132,10 +103,6 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string return c.cache.Put(ctx, key, string(encodedVal)) } -func (c *cacheWithCompression) Clear(ctx context.Context) error { - return c.cache.Clear(ctx) -} - type GetLatestBlockNumFn func(ctx context.Context) (uint64, error) type GetLatestGasPriceFn func(ctx context.Context) (uint64, error) From 29f7aa88ab91b9f8332bc7a0de637048f92b42a8 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Sat, 13 May 2023 22:33:09 -0700 Subject: [PATCH 6/6] feat(proxyd): redis namespace --- proxyd/proxyd/cache.go | 19 ++++++++++++++----- proxyd/proxyd/config.go | 3 ++- .../integration_tests/testdata/caching.toml | 1 + proxyd/proxyd/proxyd.go | 2 +- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/proxyd/proxyd/cache.go b/proxyd/proxyd/cache.go index 73b7fd8..9e399df 100644 --- a/proxyd/proxyd/cache.go +++ b/proxyd/proxyd/cache.go @@ -2,6 +2,7 @@ package proxyd import ( "context" + "strings" "time" "github.com/go-redis/redis/v8" @@ -43,16 +44,24 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { } type redisCache struct { - rdb *redis.Client + rdb *redis.Client + prefix string } -func newRedisCache(rdb *redis.Client) *redisCache { - return &redisCache{rdb} +func newRedisCache(rdb *redis.Client, prefix string) *redisCache { + return &redisCache{rdb, prefix} +} + +func (c *redisCache) namespaced(key string) string { + if c.prefix == "" { + return key + } + return strings.Join([]string{c.prefix, key}, ":") } func (c *redisCache) Get(ctx context.Context, key string) (string, error) { start := time.Now() - val, err := c.rdb.Get(ctx, key).Result() + val, err := c.rdb.Get(ctx, c.namespaced(key)).Result() redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds())) if err == redis.Nil { @@ -66,7 +75,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) { func (c *redisCache) Put(ctx context.Context, key string, value string) error { start := time.Now() - err := c.rdb.SetEX(ctx, key, value, redisTTL).Err() + err := c.rdb.SetEX(ctx, c.namespaced(key), value, redisTTL).Err() redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds())) if err != nil { diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index d140fd3..4f36931 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -32,7 +32,8 @@ type CacheConfig struct { } type RedisConfig struct { - URL string `toml:"url"` + URL string `toml:"url"` + Namespace string `toml:"namespace"` } type MetricsConfig struct { diff --git a/proxyd/proxyd/integration_tests/testdata/caching.toml b/proxyd/proxyd/integration_tests/testdata/caching.toml index cd14ff3..ef9d6de 100644 --- a/proxyd/proxyd/integration_tests/testdata/caching.toml +++ b/proxyd/proxyd/integration_tests/testdata/caching.toml @@ -6,6 +6,7 @@ response_timeout_seconds = 1 [redis] url = "$REDIS_URL" +namespace = "proxyd" [cache] enabled = true diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index fa0371b..4fc286e 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -236,7 +236,7 @@ func Start(config *Config) (*Server, func(), error) { log.Warn("redis is not configured, using in-memory cache") cache = newMemoryCache() } else { - cache = newRedisCache(redisClient) + cache = newRedisCache(redisClient, config.Redis.Namespace) } // Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind ethClient, err := ethclient.Dial(blockSyncRPCURL)