Merge branch 'develop' into felipe/fix-consensus-shutdown

mergify[bot] 2023-05-18 04:26:41 +00:00 committed by GitHub
commit 64334e75df
6 changed files with 73 additions and 31 deletions

@@ -2,6 +2,7 @@ package proxyd

 import (
     "context"
+    "strings"
     "time"

     "github.com/go-redis/redis/v8"
@@ -43,16 +44,24 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
 }

 type redisCache struct {
-    rdb *redis.Client
+    rdb    *redis.Client
+    prefix string
 }

-func newRedisCache(rdb *redis.Client) *redisCache {
-    return &redisCache{rdb}
+func newRedisCache(rdb *redis.Client, prefix string) *redisCache {
+    return &redisCache{rdb, prefix}
+}
+
+func (c *redisCache) namespaced(key string) string {
+    if c.prefix == "" {
+        return key
+    }
+    return strings.Join([]string{c.prefix, key}, ":")
 }

 func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
     start := time.Now()
-    val, err := c.rdb.Get(ctx, key).Result()
+    val, err := c.rdb.Get(ctx, c.namespaced(key)).Result()
     redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds()))
     if err == redis.Nil {
@@ -66,7 +75,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {

 func (c *redisCache) Put(ctx context.Context, key string, value string) error {
     start := time.Now()
-    err := c.rdb.SetEX(ctx, key, value, redisTTL).Err()
+    err := c.rdb.SetEX(ctx, c.namespaced(key), value, redisTTL).Err()
     redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))
     if err != nil {
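
The namespacing added above composes Redis keys as "prefix:key" and leaves keys untouched when no prefix is configured, so two proxyd instances can share one Redis without their cache entries colliding. A minimal, self-contained sketch of the same logic (the prefix and key values are made up for illustration):

package main

import (
    "fmt"
    "strings"
)

// namespaced mirrors redisCache.namespaced from the hunk above:
// join the configured prefix and the key with ":", or return the
// key unchanged when no prefix is set.
func namespaced(prefix, key string) string {
    if prefix == "" {
        return key
    }
    return strings.Join([]string{prefix, key}, ":")
}

func main() {
    fmt.Println(namespaced("proxyd", "block:0x1")) // proxyd:block:0x1
    fmt.Println(namespaced("", "block:0x1"))       // block:0x1
}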

@@ -32,7 +32,8 @@ type CacheConfig struct {
 }

 type RedisConfig struct {
-    URL string `toml:"url"`
+    URL       string `toml:"url"`
+    Namespace string `toml:"namespace"`
 }

 type MetricsConfig struct {
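
A hedged sketch of how the new field could be decoded, assuming github.com/BurntSushi/toml (which proxyd's config loading appears to use); the inline TOML fragment is hypothetical and mirrors the integration-test config change later in this commit:

package main

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// RedisConfig mirrors the struct from the hunk above.
type RedisConfig struct {
    URL       string `toml:"url"`
    Namespace string `toml:"namespace"`
}

func main() {
    // Hypothetical config fragment, decoded the same way a real
    // [redis] section would be.
    raw := "url = \"redis://localhost:6379\"\nnamespace = \"proxyd\"\n"

    var cfg RedisConfig
    if _, err := toml.Decode(raw, &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.URL, cfg.Namespace) // redis://localhost:6379 proxyd
}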

@@ -17,11 +17,14 @@ const (
     PollerInterval = 1 * time.Second
 )

+type OnConsensusBroken func()
+
 // ConsensusPoller checks the consensus state for each member of a BackendGroup,
 // resolves the highest common block for multiple nodes, and reconciles the consensus
 // in case of block hash divergence to minimize re-orgs
 type ConsensusPoller struct {
     cancelFunc context.CancelFunc
+    listeners  []OnConsensusBroken

     backendGroup *BackendGroup
     backendState map[*Backend]*backendState
@@ -44,6 +47,7 @@ type backendState struct {
     latestBlockNumber hexutil.Uint64
     latestBlockHash   string
     peerCount         uint64
+    inSync            bool

     lastUpdate time.Time
@@ -149,6 +153,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
     }
 }

+func WithListener(listener OnConsensusBroken) ConsensusOpt {
+    return func(cp *ConsensusPoller) {
+        cp.AddListener(listener)
+    }
+}
+
+func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) {
+    cp.listeners = append(cp.listeners, listener)
+}
+
 func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
     return func(cp *ConsensusPoller) {
         cp.banPeriod = banPeriod
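
WithListener and AddListener above are a plain callback registry plus a functional option. A self-contained sketch of the pattern, with a stripped-down stand-in for ConsensusPoller (the stub type and function names below are illustrative, not proxyd's):

package main

import "fmt"

// OnConsensusBroken mirrors the callback type introduced above.
type OnConsensusBroken func()

// pollerStub is an illustrative stand-in for ConsensusPoller,
// keeping only the listener plumbing from this diff.
type pollerStub struct {
    listeners []OnConsensusBroken
}

func (p *pollerStub) AddListener(l OnConsensusBroken) {
    p.listeners = append(p.listeners, l)
}

// broadcast mirrors the "if broken" branch later in this file:
// every registered listener fires once per broken-consensus event.
func (p *pollerStub) broadcast() {
    for _, l := range p.listeners {
        l()
    }
}

func main() {
    p := &pollerStub{}
    p.AddListener(func() { fmt.Println("invalidate cache") })
    p.AddListener(func() { fmt.Println("bump a metric") })
    p.broadcast()
}

In the poller itself, this broadcast runs inside UpdateBackendGroupConsensus when broken consensus is detected, as the -392,7 hunk below shows.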
@@ -215,7 +229,13 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
     RecordConsensusBackendBanned(be, banned)

     if banned {
-        log.Debug("skipping backend banned", "backend", be.Name)
+        log.Debug("skipping backend - banned", "backend", be.Name)
         return
     }

+    // if backend exhausted rate limit we'll skip it for now
+    if be.IsRateLimited() {
+        log.Debug("skipping backend - rate limited", "backend", be.Name)
+        return
+    }
+
@@ -228,24 +248,16 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
     // if backend is not in sync we'll check again after ban
     inSync, err := cp.isInSync(ctx, be)
-    if err != nil || !inSync {
-        log.Warn("backend banned - not in sync", "backend", be.Name)
-        cp.Ban(be)
-        return
-    }
-    RecordConsensusBackendInSync(be, inSync)
-
-    // if backend exhausted rate limit we'll skip it for now
-    if be.IsRateLimited() {
-        return
+    RecordConsensusBackendInSync(be, err == nil && inSync)
+    if err != nil {
+        log.Warn("error updating backend sync state", "name", be.Name, "err", err)
     }

     var peerCount uint64
     if !be.skipPeerCountCheck {
         peerCount, err = cp.getPeerCount(ctx, be)
         if err != nil {
-            log.Warn("error updating backend", "name", be.Name, "err", err)
-            return
+            log.Warn("error updating backend peer count", "name", be.Name, "err", err)
         }
         RecordConsensusBackendPeerCount(be, peerCount)
     }
@@ -253,10 +265,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
     latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
     if err != nil {
         log.Warn("error updating backend", "name", be.Name, "err", err)
-        return
     }

-    changed, updateDelay := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash)
+    changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash)

     if changed {
         RecordBackendLatestBlock(be, latestBlockNumber)
@@ -264,6 +275,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
         log.Debug("backend state updated",
             "name", be.Name,
             "peerCount", peerCount,
+            "inSync", inSync,
             "latestBlockNumber", latestBlockNumber,
             "latestBlockHash", latestBlockHash,
             "updateDelay", updateDelay)
@@ -280,11 +292,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
     // find the highest block, in order to use it when defining the highest non-lagging ancestor block
     for _, be := range cp.backendGroup.Backends {
-        peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
+        peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)

         if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
             continue
         }
+        if !inSync {
+            continue
+        }
         if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
             continue
         }
@@ -296,11 +311,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
     // find the highest common ancestor block
     for _, be := range cp.backendGroup.Backends {
-        peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
+        peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)

         if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
             continue
         }
+        if !inSync {
+            continue
+        }
         if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
             continue
         }
@@ -349,14 +367,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
        - with minimum peer count
        - updated recently
        - not lagging
+       - in sync
     */
-    peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
+    peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)

     notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
     isBanned := time.Now().Before(bannedUntil)
     notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
     lagging := latestBlockNumber < proposedBlock
-    if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging {
+    if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
         filteredBackendsNames = append(filteredBackendsNames, be.Name)
         continue
     }
@@ -392,7 +411,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
     }

     if broken {
-        // propagate event to other interested parts, such as cache invalidator
+        for _, l := range cp.listeners {
+            l()
+        }
         log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
     }
@@ -498,23 +519,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
     return res, nil
 }

-func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
+func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
     bs := cp.backendState[be]
+    defer bs.backendStateMux.Unlock()
     bs.backendStateMux.Lock()
     peerCount = bs.peerCount
+    inSync = bs.inSync
     blockNumber = bs.latestBlockNumber
     blockHash = bs.latestBlockHash
     lastUpdate = bs.lastUpdate
     bannedUntil = bs.bannedUntil
-    bs.backendStateMux.Unlock()
     return
 }

-func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
+func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
     bs := cp.backendState[be]
     bs.backendStateMux.Lock()
     changed = bs.latestBlockHash != blockHash
     bs.peerCount = peerCount
+    bs.inSync = inSync
     bs.latestBlockNumber = blockNumber
     bs.latestBlockHash = blockHash
     updateDelay = time.Since(bs.lastUpdate)
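
A subtlety in the new getBackendState: the deferred Unlock is registered before Lock is called. That is valid Go, because a deferred call executes when the function returns, not where it is written; the mutex only needs to be held by the time the function exits. A minimal sketch of the same shape:

package main

import (
    "fmt"
    "sync"
)

type counter struct {
    mu sync.Mutex
    n  int
}

// get uses the same locking shape as getBackendState above: the
// deferred Unlock is queued first, then the mutex is locked; the
// Unlock runs at return, when the lock is held.
func (c *counter) get() int {
    defer c.mu.Unlock()
    c.mu.Lock()
    return c.n
}

func main() {
    c := &counter{n: 42}
    fmt.Println(c.get()) // 42
}

The more conventional ordering (Lock, then defer Unlock) behaves identically here, since nothing can fail between the two statements.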

@@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) {
         consensusGroup := bg.Consensus.GetConsensusGroup()
         require.NotContains(t, consensusGroup, be)
         require.False(t, bg.Consensus.IsBanned(be))
+        require.Equal(t, 1, len(consensusGroup))
     })
@@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) {
         be := backend(bg, "node1")
         require.NotNil(t, be)
         require.NotContains(t, consensusGroup, be)
         require.False(t, bg.Consensus.IsBanned(be))
+        require.Equal(t, 1, len(consensusGroup))
     })
@@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) {
         consensusGroup := bg.Consensus.GetConsensusGroup()
         require.NotContains(t, consensusGroup, be)
         require.False(t, bg.Consensus.IsBanned(be))
+        require.Equal(t, 1, len(consensusGroup))
     })
@@ -286,6 +289,11 @@ func TestConsensus(t *testing.T) {
         h2.ResetOverrides()
         bg.Consensus.Unban()

+        listenerCalled := false
+        bg.Consensus.AddListener(func() {
+            listenerCalled = true
+        })
+
         for _, be := range bg.Backends {
             bg.Consensus.UpdateBackend(ctx, be)
         }
@@ -331,7 +339,7 @@ func TestConsensus(t *testing.T) {
         // should resolve to 0x1, since 0x2 is out of consensus at the moment
         require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())

-        // later, when impl events, listen to broken consensus event
+        require.True(t, listenerCalled)
     })

     t.Run("broken consensus with depth 2", func(t *testing.T) {

@@ -6,6 +6,7 @@ response_timeout_seconds = 1

 [redis]
 url = "$REDIS_URL"
+namespace = "proxyd"

 [cache]
 enabled = true

@@ -236,7 +236,7 @@ func Start(config *Config) (*Server, func(), error) {
         log.Warn("redis is not configured, using in-memory cache")
         cache = newMemoryCache()
     } else {
-        cache = newRedisCache(redisClient)
+        cache = newRedisCache(redisClient, config.Redis.Namespace)
     }
     // Ideally, the BlockSyncRPCURL should be the sequencer or an HA replica that's not far behind
     ethClient, err := ethclient.Dial(blockSyncRPCURL)