From e025bcc4b4b8c3a457d784023f59302b39ff3057 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Wed, 26 Apr 2023 16:31:03 -0700 Subject: [PATCH 1/3] proxyd: emit event on consensus broken --- proxyd/proxyd/cache.go | 33 +++++++++++++++++++ proxyd/proxyd/consensus_poller.go | 18 +++++++++- .../integration_tests/consensus_test.go | 7 +++- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/proxyd/proxyd/cache.go b/proxyd/proxyd/cache.go index 73b7fd8..3a3731e 100644 --- a/proxyd/proxyd/cache.go +++ b/proxyd/proxyd/cache.go @@ -12,6 +12,7 @@ import ( type Cache interface { Get(ctx context.Context, key string) (string, error) Put(ctx context.Context, key string, value string) error + Clear(ctx context.Context) error } const ( @@ -42,6 +43,11 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { return nil } +func (c *cache) Clear(ctx context.Context) error { + c.lru.Purge() + return nil +} + type redisCache struct { rdb *redis.Client } @@ -75,6 +81,29 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error { return err } +func (c *redisCache) Clear(ctx context.Context) error { + patterns := []string{"lvc:*", "method:*"} + + for _, p := range patterns { + scmd := c.rdb.Keys(ctx, p) + err := scmd.Err() + if err != nil { + RecordRedisError("CacheClear") + return err + } + keys, _ := scmd.Result() + + icmd := c.rdb.Del(ctx, keys...) + err = icmd.Err() + if err != nil { + RecordRedisError("CacheClear") + return err + } + } + + return nil +} + type cacheWithCompression struct { cache Cache } @@ -103,6 +132,10 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string return c.cache.Put(ctx, key, string(encodedVal)) } +func (c *cacheWithCompression) Clear(ctx context.Context) error { + return c.cache.Clear(ctx) +} + type GetLatestBlockNumFn func(ctx context.Context) (uint64, error) type GetLatestGasPriceFn func(ctx context.Context) (uint64, error) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index e26d6b9..b4d3b20 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -17,11 +17,14 @@ const ( PollerInterval = 1 * time.Second ) +type OnConsensusBroken func() + // ConsensusPoller checks the consensus state for each member of a BackendGroup // resolves the highest common block for multiple nodes, and reconciles the consensus // in case of block hash divergence to minimize re-orgs type ConsensusPoller struct { cancelFunc context.CancelFunc + listeners []OnConsensusBroken backendGroup *BackendGroup backendState map[*Backend]*backendState @@ -149,6 +152,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt { } } +func WithListener(listener OnConsensusBroken) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.AddListener(listener) + } +} + +func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) { + cp.listeners = append(cp.listeners, listener) +} + func WithBanPeriod(banPeriod time.Duration) ConsensusOpt { return func(cp *ConsensusPoller) { cp.banPeriod = banPeriod @@ -349,6 +362,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - with minimum peer count - updated recently - not lagging + - in sync */ peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) @@ -392,7 +406,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } if broken { - // propagate event to other interested parts, such as cache invalidator + for _, l := range cp.listeners { + l() + } log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 729fade..59f3664 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -286,6 +286,11 @@ func TestConsensus(t *testing.T) { h2.ResetOverrides() bg.Consensus.Unban() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true + }) + for _, be := range bg.Backends { bg.Consensus.UpdateBackend(ctx, be) } @@ -331,7 +336,7 @@ func TestConsensus(t *testing.T) { // should resolve to 0x1, since 0x2 is out of consensus at the moment require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) - // later, when impl events, listen to broken consensus event + require.True(t, listenerCalled) }) t.Run("broken consensus with depth 2", func(t *testing.T) { From 491369d32f695455f16d11cb70e6874da07dc08f Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 11 May 2023 15:44:12 -0700 Subject: [PATCH 2/3] lint --- proxyd/proxyd/consensus_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index b4d3b20..fbb8a17 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -362,7 +362,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - with minimum peer count - updated recently - not lagging - - in sync + - in sync */ peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) From ef42dde6e1c41b17d777435ab8d5a80ce9152a29 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 11 May 2023 15:45:33 -0700 Subject: [PATCH 3/3] revert cache.go --- proxyd/proxyd/cache.go | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/proxyd/proxyd/cache.go b/proxyd/proxyd/cache.go index 3a3731e..73b7fd8 100644 --- a/proxyd/proxyd/cache.go +++ b/proxyd/proxyd/cache.go @@ -12,7 +12,6 @@ import ( type Cache interface { Get(ctx context.Context, key string) (string, error) Put(ctx context.Context, key string, value string) error - Clear(ctx context.Context) error } const ( @@ -43,11 +42,6 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { return nil } -func (c *cache) Clear(ctx context.Context) error { - c.lru.Purge() - return nil -} - type redisCache struct { rdb *redis.Client } @@ -81,29 +75,6 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error { return err } -func (c *redisCache) Clear(ctx context.Context) error { - patterns := []string{"lvc:*", "method:*"} - - for _, p := range patterns { - scmd := c.rdb.Keys(ctx, p) - err := scmd.Err() - if err != nil { - RecordRedisError("CacheClear") - return err - } - keys, _ := scmd.Result() - - icmd := c.rdb.Del(ctx, keys...) - err = icmd.Err() - if err != nil { - RecordRedisError("CacheClear") - return err - } - } - - return nil -} - type cacheWithCompression struct { cache Cache } @@ -132,10 +103,6 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string return c.cache.Put(ctx, key, string(encodedVal)) } -func (c *cacheWithCompression) Clear(ctx context.Context) error { - return c.cache.Clear(ctx) -} - type GetLatestBlockNumFn func(ctx context.Context) (uint64, error) type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)