diff --git a/proxyd/proxyd/cache.go b/proxyd/proxyd/cache.go index 73b7fd8..3a3731e 100644 --- a/proxyd/proxyd/cache.go +++ b/proxyd/proxyd/cache.go @@ -12,6 +12,7 @@ import ( type Cache interface { Get(ctx context.Context, key string) (string, error) Put(ctx context.Context, key string, value string) error + Clear(ctx context.Context) error } const ( @@ -42,6 +43,11 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { return nil } +func (c *cache) Clear(ctx context.Context) error { + c.lru.Purge() + return nil +} + type redisCache struct { rdb *redis.Client } @@ -75,6 +81,29 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error { return err } +func (c *redisCache) Clear(ctx context.Context) error { + patterns := []string{"lvc:*", "method:*"} + + for _, p := range patterns { + scmd := c.rdb.Keys(ctx, p) + err := scmd.Err() + if err != nil { + RecordRedisError("CacheClear") + return err + } + keys, _ := scmd.Result() + + icmd := c.rdb.Del(ctx, keys...) + err = icmd.Err() + if err != nil { + RecordRedisError("CacheClear") + return err + } + } + + return nil +} + type cacheWithCompression struct { cache Cache } @@ -103,6 +132,10 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string return c.cache.Put(ctx, key, string(encodedVal)) } +func (c *cacheWithCompression) Clear(ctx context.Context) error { + return c.cache.Clear(ctx) +} + type GetLatestBlockNumFn func(ctx context.Context) (uint64, error) type GetLatestGasPriceFn func(ctx context.Context) (uint64, error) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index e26d6b9..b4d3b20 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -17,11 +17,14 @@ const ( PollerInterval = 1 * time.Second ) +type OnConsensusBroken func() + // ConsensusPoller checks the consensus state for each member of a BackendGroup // resolves the highest common block for multiple nodes, and reconciles the consensus // in case of block hash divergence to minimize re-orgs type ConsensusPoller struct { cancelFunc context.CancelFunc + listeners []OnConsensusBroken backendGroup *BackendGroup backendState map[*Backend]*backendState @@ -149,6 +152,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt { } } +func WithListener(listener OnConsensusBroken) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.AddListener(listener) + } +} + +func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) { + cp.listeners = append(cp.listeners, listener) +} + func WithBanPeriod(banPeriod time.Duration) ConsensusOpt { return func(cp *ConsensusPoller) { cp.banPeriod = banPeriod @@ -349,6 +362,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - with minimum peer count - updated recently - not lagging + - in sync */ peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) @@ -392,7 +406,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } if broken { - // propagate event to other interested parts, such as cache invalidator + for _, l := range cp.listeners { + l() + } log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 729fade..59f3664 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -286,6 +286,11 @@ func TestConsensus(t *testing.T) { h2.ResetOverrides() bg.Consensus.Unban() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true + }) + for _, be := range bg.Backends { bg.Consensus.UpdateBackend(ctx, be) } @@ -331,7 +336,7 @@ func TestConsensus(t *testing.T) { // should resolve to 0x1, since 0x2 is out of consensus at the moment require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) - // later, when impl events, listen to broken consensus event + require.True(t, listenerCalled) }) t.Run("broken consensus with depth 2", func(t *testing.T) {