diff --git a/proxyd/proxyd/cache.go b/proxyd/proxyd/cache.go index 73b7fd8..9e399df 100644 --- a/proxyd/proxyd/cache.go +++ b/proxyd/proxyd/cache.go @@ -2,6 +2,7 @@ package proxyd import ( "context" + "strings" "time" "github.com/go-redis/redis/v8" @@ -43,16 +44,24 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { } type redisCache struct { - rdb *redis.Client + rdb *redis.Client + prefix string } -func newRedisCache(rdb *redis.Client) *redisCache { - return &redisCache{rdb} +func newRedisCache(rdb *redis.Client, prefix string) *redisCache { + return &redisCache{rdb, prefix} +} + +func (c *redisCache) namespaced(key string) string { + if c.prefix == "" { + return key + } + return strings.Join([]string{c.prefix, key}, ":") } func (c *redisCache) Get(ctx context.Context, key string) (string, error) { start := time.Now() - val, err := c.rdb.Get(ctx, key).Result() + val, err := c.rdb.Get(ctx, c.namespaced(key)).Result() redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds())) if err == redis.Nil { @@ -66,7 +75,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) { func (c *redisCache) Put(ctx context.Context, key string, value string) error { start := time.Now() - err := c.rdb.SetEX(ctx, key, value, redisTTL).Err() + err := c.rdb.SetEX(ctx, c.namespaced(key), value, redisTTL).Err() redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds())) if err != nil { diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index d140fd3..4f36931 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -32,7 +32,8 @@ type CacheConfig struct { } type RedisConfig struct { - URL string `toml:"url"` + URL string `toml:"url"` + Namespace string `toml:"namespace"` } type MetricsConfig struct { diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index e26d6b9..f077a0f 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -17,11 +17,14 @@ const ( PollerInterval = 1 * time.Second ) +type OnConsensusBroken func() + // ConsensusPoller checks the consensus state for each member of a BackendGroup // resolves the highest common block for multiple nodes, and reconciles the consensus // in case of block hash divergence to minimize re-orgs type ConsensusPoller struct { cancelFunc context.CancelFunc + listeners []OnConsensusBroken backendGroup *BackendGroup backendState map[*Backend]*backendState @@ -44,6 +47,7 @@ type backendState struct { latestBlockNumber hexutil.Uint64 latestBlockHash string peerCount uint64 + inSync bool lastUpdate time.Time @@ -149,6 +153,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt { } } +func WithListener(listener OnConsensusBroken) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.AddListener(listener) + } +} + +func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) { + cp.listeners = append(cp.listeners, listener) +} + func WithBanPeriod(banPeriod time.Duration) ConsensusOpt { return func(cp *ConsensusPoller) { cp.banPeriod = banPeriod @@ -215,7 +229,13 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { RecordConsensusBackendBanned(be, banned) if banned { - log.Debug("skipping backend banned", "backend", be.Name) + log.Debug("skipping backend - banned", "backend", be.Name) + return + } + + // if backend exhausted rate limit we'll skip it for now + if be.IsRateLimited() { + log.Debug("skipping backend - rate limited", "backend", be.Name) return } @@ -228,24 +248,16 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { // if backend it not in sync we'll check again after ban inSync, err := cp.isInSync(ctx, be) - if err != nil || !inSync { - log.Warn("backend banned - not in sync", "backend", be.Name) - cp.Ban(be) - return - } - RecordConsensusBackendInSync(be, inSync) - - // if backend exhausted rate limit we'll skip it for now - if be.IsRateLimited() { - return + RecordConsensusBackendInSync(be, err == nil && inSync) + if err != nil { + log.Warn("error updating backend sync state", "name", be.Name, "err", err) } var peerCount uint64 if !be.skipPeerCountCheck { peerCount, err = cp.getPeerCount(ctx, be) if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) - return + log.Warn("error updating backend peer count", "name", be.Name, "err", err) } RecordConsensusBackendPeerCount(be, peerCount) } @@ -253,10 +265,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") if err != nil { log.Warn("error updating backend", "name", be.Name, "err", err) - return } - changed, updateDelay := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash) + changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash) if changed { RecordBackendLatestBlock(be, latestBlockNumber) @@ -264,6 +275,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Debug("backend state updated", "name", be.Name, "peerCount", peerCount, + "inSync", inSync, "latestBlockNumber", latestBlockNumber, "latestBlockHash", latestBlockHash, "updateDelay", updateDelay) @@ -280,11 +292,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // find the highest block, in order to use it defining the highest non-lagging ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } + if !inSync { + continue + } if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { continue } @@ -296,11 +311,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // find the highest common ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } + if !inSync { + continue + } if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { continue } @@ -349,14 +367,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - with minimum peer count - updated recently - not lagging + - in sync */ - peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) + peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { + if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { filteredBackendsNames = append(filteredBackendsNames, be.Name) continue } @@ -392,7 +411,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } if broken { - // propagate event to other interested parts, such as cache invalidator + for _, l := range cp.listeners { + l() + } log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) } @@ -498,23 +519,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { +func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { bs := cp.backendState[be] + defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() peerCount = bs.peerCount + inSync = bs.inSync blockNumber = bs.latestBlockNumber blockHash = bs.latestBlockHash lastUpdate = bs.lastUpdate bannedUntil = bs.bannedUntil - bs.backendStateMux.Unlock() return } -func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { +func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { bs := cp.backendState[be] bs.backendStateMux.Lock() changed = bs.latestBlockHash != blockHash bs.peerCount = peerCount + bs.inSync = inSync bs.latestBlockNumber = blockNumber bs.latestBlockHash = blockHash updateDelay = time.Since(bs.lastUpdate) diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 729fade..9320429 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) @@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) { be := backend(bg, "node1") require.NotNil(t, be) require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) @@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) @@ -286,6 +289,11 @@ func TestConsensus(t *testing.T) { h2.ResetOverrides() bg.Consensus.Unban() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true + }) + for _, be := range bg.Backends { bg.Consensus.UpdateBackend(ctx, be) } @@ -331,7 +339,7 @@ func TestConsensus(t *testing.T) { // should resolve to 0x1, since 0x2 is out of consensus at the moment require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) - // later, when impl events, listen to broken consensus event + require.True(t, listenerCalled) }) t.Run("broken consensus with depth 2", func(t *testing.T) { diff --git a/proxyd/proxyd/integration_tests/testdata/caching.toml b/proxyd/proxyd/integration_tests/testdata/caching.toml index cd14ff3..ef9d6de 100644 --- a/proxyd/proxyd/integration_tests/testdata/caching.toml +++ b/proxyd/proxyd/integration_tests/testdata/caching.toml @@ -6,6 +6,7 @@ response_timeout_seconds = 1 [redis] url = "$REDIS_URL" +namespace = "proxyd" [cache] enabled = true diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index fa0371b..4fc286e 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -236,7 +236,7 @@ func Start(config *Config) (*Server, func(), error) { log.Warn("redis is not configured, using in-memory cache") cache = newMemoryCache() } else { - cache = newRedisCache(redisClient) + cache = newRedisCache(redisClient, config.Redis.Namespace) } // Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind ethClient, err := ethclient.Dial(blockSyncRPCURL)