proxyd: emit event on consensus broken

This commit is contained in:
Felipe Andrade 2023-04-26 16:31:03 -07:00
parent 21284bd651
commit e025bcc4b4
3 changed files with 56 additions and 2 deletions

@ -12,6 +12,7 @@ import (
type Cache interface { type Cache interface {
Get(ctx context.Context, key string) (string, error) Get(ctx context.Context, key string) (string, error)
Put(ctx context.Context, key string, value string) error Put(ctx context.Context, key string, value string) error
Clear(ctx context.Context) error
} }
const ( const (
@ -42,6 +43,11 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
return nil return nil
} }
func (c *cache) Clear(ctx context.Context) error {
c.lru.Purge()
return nil
}
type redisCache struct { type redisCache struct {
rdb *redis.Client rdb *redis.Client
} }
@ -75,6 +81,29 @@ func (c *redisCache) Put(ctx context.Context, key string, value string) error {
return err return err
} }
func (c *redisCache) Clear(ctx context.Context) error {
patterns := []string{"lvc:*", "method:*"}
for _, p := range patterns {
scmd := c.rdb.Keys(ctx, p)
err := scmd.Err()
if err != nil {
RecordRedisError("CacheClear")
return err
}
keys, _ := scmd.Result()
icmd := c.rdb.Del(ctx, keys...)
err = icmd.Err()
if err != nil {
RecordRedisError("CacheClear")
return err
}
}
return nil
}
type cacheWithCompression struct { type cacheWithCompression struct {
cache Cache cache Cache
} }
@ -103,6 +132,10 @@ func (c *cacheWithCompression) Put(ctx context.Context, key string, value string
return c.cache.Put(ctx, key, string(encodedVal)) return c.cache.Put(ctx, key, string(encodedVal))
} }
func (c *cacheWithCompression) Clear(ctx context.Context) error {
return c.cache.Clear(ctx)
}
type GetLatestBlockNumFn func(ctx context.Context) (uint64, error) type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)
type GetLatestGasPriceFn func(ctx context.Context) (uint64, error) type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)

@ -17,11 +17,14 @@ const (
PollerInterval = 1 * time.Second PollerInterval = 1 * time.Second
) )
type OnConsensusBroken func()
// ConsensusPoller checks the consensus state for each member of a BackendGroup // ConsensusPoller checks the consensus state for each member of a BackendGroup
// resolves the highest common block for multiple nodes, and reconciles the consensus // resolves the highest common block for multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs // in case of block hash divergence to minimize re-orgs
type ConsensusPoller struct { type ConsensusPoller struct {
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
listeners []OnConsensusBroken
backendGroup *BackendGroup backendGroup *BackendGroup
backendState map[*Backend]*backendState backendState map[*Backend]*backendState
@ -149,6 +152,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
} }
} }
func WithListener(listener OnConsensusBroken) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.AddListener(listener)
}
}
func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
cp.listeners = append(cp.listeners, listener)
}
func WithBanPeriod(banPeriod time.Duration) ConsensusOpt { func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) { return func(cp *ConsensusPoller) {
cp.banPeriod = banPeriod cp.banPeriod = banPeriod
@ -349,6 +362,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
- with minimum peer count - with minimum peer count
- updated recently - updated recently
- not lagging - not lagging
- in sync
*/ */
peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
@ -392,7 +406,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
} }
if broken { if broken {
// propagate event to other interested parts, such as cache invalidator for _, l := range cp.listeners {
l()
}
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
} }

@ -286,6 +286,11 @@ func TestConsensus(t *testing.T) {
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban() bg.Consensus.Unban()
listenerCalled := false
bg.Consensus.AddListener(func() {
listenerCalled = true
})
for _, be := range bg.Backends { for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be) bg.Consensus.UpdateBackend(ctx, be)
} }
@ -331,7 +336,7 @@ func TestConsensus(t *testing.T) {
// should resolve to 0x1, since 0x2 is out of consensus at the moment // should resolve to 0x1, since 0x2 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
// later, when impl events, listen to broken consensus event require.True(t, listenerCalled)
}) })
t.Run("broken consensus with depth 2", func(t *testing.T) { t.Run("broken consensus with depth 2", func(t *testing.T) {