Merge branch 'develop' into felipe/event-dispatcher

This commit is contained in:
mergify[bot] 2023-05-18 04:23:04 +00:00 committed by GitHub
commit 3fe6123648
6 changed files with 50 additions and 29 deletions

@ -2,6 +2,7 @@ package proxyd
import ( import (
"context" "context"
"strings"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
@ -44,15 +45,23 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
type redisCache struct { type redisCache struct {
rdb *redis.Client rdb *redis.Client
prefix string
} }
func newRedisCache(rdb *redis.Client) *redisCache { func newRedisCache(rdb *redis.Client, prefix string) *redisCache {
return &redisCache{rdb} return &redisCache{rdb, prefix}
}
func (c *redisCache) namespaced(key string) string {
if c.prefix == "" {
return key
}
return strings.Join([]string{c.prefix, key}, ":")
} }
func (c *redisCache) Get(ctx context.Context, key string) (string, error) { func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
start := time.Now() start := time.Now()
val, err := c.rdb.Get(ctx, key).Result() val, err := c.rdb.Get(ctx, c.namespaced(key)).Result()
redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds())) redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds()))
if err == redis.Nil { if err == redis.Nil {
@ -66,7 +75,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
func (c *redisCache) Put(ctx context.Context, key string, value string) error { func (c *redisCache) Put(ctx context.Context, key string, value string) error {
start := time.Now() start := time.Now()
err := c.rdb.SetEX(ctx, key, value, redisTTL).Err() err := c.rdb.SetEX(ctx, c.namespaced(key), value, redisTTL).Err()
redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds())) redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))
if err != nil { if err != nil {

@ -33,6 +33,7 @@ type CacheConfig struct {
type RedisConfig struct { type RedisConfig struct {
URL string `toml:"url"` URL string `toml:"url"`
Namespace string `toml:"namespace"`
} }
type MetricsConfig struct { type MetricsConfig struct {

@ -47,6 +47,7 @@ type backendState struct {
latestBlockNumber hexutil.Uint64 latestBlockNumber hexutil.Uint64
latestBlockHash string latestBlockHash string
peerCount uint64 peerCount uint64
inSync bool
lastUpdate time.Time lastUpdate time.Time
@ -228,7 +229,13 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
RecordConsensusBackendBanned(be, banned) RecordConsensusBackendBanned(be, banned)
if banned { if banned {
log.Debug("skipping backend banned", "backend", be.Name) log.Debug("skipping backend - banned", "backend", be.Name)
return
}
// if backend exhausted rate limit we'll skip it for now
if be.IsRateLimited() {
log.Debug("skipping backend - rate limited", "backend", be.Name)
return return
} }
@ -241,24 +248,16 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// if backend it not in sync we'll check again after ban // if backend it not in sync we'll check again after ban
inSync, err := cp.isInSync(ctx, be) inSync, err := cp.isInSync(ctx, be)
if err != nil || !inSync { RecordConsensusBackendInSync(be, err == nil && inSync)
log.Warn("backend banned - not in sync", "backend", be.Name) if err != nil {
cp.Ban(be) log.Warn("error updating backend sync state", "name", be.Name, "err", err)
return
}
RecordConsensusBackendInSync(be, inSync)
// if backend exhausted rate limit we'll skip it for now
if be.IsRateLimited() {
return
} }
var peerCount uint64 var peerCount uint64
if !be.skipPeerCountCheck { if !be.skipPeerCountCheck {
peerCount, err = cp.getPeerCount(ctx, be) peerCount, err = cp.getPeerCount(ctx, be)
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend peer count", "name", be.Name, "err", err)
return
} }
RecordConsensusBackendPeerCount(be, peerCount) RecordConsensusBackendPeerCount(be, peerCount)
} }
@ -266,10 +265,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend", "name", be.Name, "err", err)
return
} }
changed, updateDelay := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash) changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash)
if changed { if changed {
RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber)
@ -277,6 +275,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Debug("backend state updated", log.Debug("backend state updated",
"name", be.Name, "name", be.Name,
"peerCount", peerCount, "peerCount", peerCount,
"inSync", inSync,
"latestBlockNumber", latestBlockNumber, "latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash, "latestBlockHash", latestBlockHash,
"updateDelay", updateDelay) "updateDelay", updateDelay)
@ -293,11 +292,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// find the highest block, in order to use it defining the highest non-lagging ancestor block // find the highest block, in order to use it defining the highest non-lagging ancestor block
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
} }
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue continue
} }
@ -309,11 +311,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// find the highest common ancestor block // find the highest common ancestor block
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
} }
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue continue
} }
@ -365,12 +370,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
- in sync - in sync
*/ */
peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bannedUntil) isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
lagging := latestBlockNumber < proposedBlock lagging := latestBlockNumber < proposedBlock
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
filteredBackendsNames = append(filteredBackendsNames, be.Name) filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue continue
} }
@ -514,23 +519,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil return res, nil
} }
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
peerCount = bs.peerCount peerCount = bs.peerCount
inSync = bs.inSync
blockNumber = bs.latestBlockNumber blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash blockHash = bs.latestBlockHash
lastUpdate = bs.lastUpdate lastUpdate = bs.lastUpdate
bannedUntil = bs.bannedUntil bannedUntil = bs.bannedUntil
bs.backendStateMux.Unlock()
return return
} }
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
bs := cp.backendState[be] bs := cp.backendState[be]
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash changed = bs.latestBlockHash != blockHash
bs.peerCount = peerCount bs.peerCount = peerCount
bs.inSync = inSync
bs.latestBlockNumber = blockNumber bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash bs.latestBlockHash = blockHash
updateDelay = time.Since(bs.lastUpdate) updateDelay = time.Since(bs.lastUpdate)

@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) {
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, be) require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })
@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) {
be := backend(bg, "node1") be := backend(bg, "node1")
require.NotNil(t, be) require.NotNil(t, be)
require.NotContains(t, consensusGroup, be) require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })
@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) {
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, be) require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })

@ -6,6 +6,7 @@ response_timeout_seconds = 1
[redis] [redis]
url = "$REDIS_URL" url = "$REDIS_URL"
namespace = "proxyd"
[cache] [cache]
enabled = true enabled = true

@ -236,7 +236,7 @@ func Start(config *Config) (*Server, func(), error) {
log.Warn("redis is not configured, using in-memory cache") log.Warn("redis is not configured, using in-memory cache")
cache = newMemoryCache() cache = newMemoryCache()
} else { } else {
cache = newRedisCache(redisClient) cache = newRedisCache(redisClient, config.Redis.Namespace)
} }
// Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind // Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
ethClient, err := ethclient.Dial(blockSyncRPCURL) ethClient, err := ethclient.Dial(blockSyncRPCURL)