infra/proxyd/consensus_tracker.go

package proxyd

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v9"
	"github.com/redis/go-redis/v9"
)

// ConsensusTracker abstracts how we store and retrieve the current consensus,
// allowing it to be kept locally in memory or in a shared Redis cluster.
type ConsensusTracker interface {
	GetLatestBlockNumber() hexutil.Uint64
	SetLatestBlockNumber(blockNumber hexutil.Uint64)
	GetSafeBlockNumber() hexutil.Uint64
	SetSafeBlockNumber(blockNumber hexutil.Uint64)
	GetFinalizedBlockNumber() hexutil.Uint64
	SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
}
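
// As a rough illustration of the interface (a hypothetical usage sketch, not
// part of proxyd itself; the block numbers are made up), a caller that only
// needs a process-local view could use the in-memory implementation directly:
//
//	tracker := NewInMemoryConsensusTracker()
//	tracker.SetLatestBlockNumber(hexutil.Uint64(100))
//	tracker.SetSafeBlockNumber(hexutil.Uint64(90))
//	tracker.SetFinalizedBlockNumber(hexutil.Uint64(80))
//	latest := tracker.GetLatestBlockNumber() // hexutil.Uint64(100)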

// DTO to hold the current consensus state
type ConsensusTrackerState struct {
	Latest    hexutil.Uint64 `json:"latest"`
	Safe      hexutil.Uint64 `json:"safe"`
	Finalized hexutil.Uint64 `json:"finalized"`
}
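
// When marshaled (see postPayload below), this state is stored in Redis as a
// small JSON document; hexutil.Uint64 encodes as a hex quantity string. The
// block numbers here are illustrative values only:
//
//	{"latest":"0x12d68f","safe":"0x12d650","finalized":"0x12d600"}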

// update copies the given state into the tracker under its mutex.
func (ct *InMemoryConsensusTracker) update(o *ConsensusTrackerState) {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()

	ct.state.Latest = o.Latest
	ct.state.Safe = o.Safe
	ct.state.Finalized = o.Finalized
}

// InMemoryConsensusTracker stores and retrieves the consensus state in memory
// and is safe for concurrent use.
type InMemoryConsensusTracker struct {
	mutex sync.Mutex
	state *ConsensusTrackerState
}

func NewInMemoryConsensusTracker() ConsensusTracker {
	return &InMemoryConsensusTracker{
		mutex: sync.Mutex{},
		state: &ConsensusTrackerState{},
	}
}

// Valid reports whether every tracked block number has been set at least once.
func (ct *InMemoryConsensusTracker) Valid() bool {
	return ct.GetLatestBlockNumber() > 0 &&
		ct.GetSafeBlockNumber() > 0 &&
		ct.GetFinalizedBlockNumber() > 0
}

// Behind reports whether any of this tracker's block numbers is lower than the
// corresponding block number in other.
func (ct *InMemoryConsensusTracker) Behind(other *InMemoryConsensusTracker) bool {
	return ct.GetLatestBlockNumber() < other.GetLatestBlockNumber() ||
		ct.GetSafeBlockNumber() < other.GetSafeBlockNumber() ||
		ct.GetFinalizedBlockNumber() < other.GetFinalizedBlockNumber()
}
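
// A rough sketch of how Valid and Behind are used further down by the Redis
// tracker (the values are illustrative, not taken from a real chain):
//
//	local := NewInMemoryConsensusTracker().(*InMemoryConsensusTracker)
//	remote := NewInMemoryConsensusTracker().(*InMemoryConsensusTracker)
//	local.SetLatestBlockNumber(hexutil.Uint64(100))
//	remote.SetLatestBlockNumber(hexutil.Uint64(120))
//	// local.Valid() is false here (safe/finalized were never set),
//	// and local.Behind(remote) is true because 100 < 120.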

func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()
	return ct.state.Latest
}

func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()
	ct.state.Latest = blockNumber
}

func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()
	return ct.state.Safe
}

func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()
	ct.state.Safe = blockNumber
}

func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()
	return ct.state.Finalized
}

func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
	ct.mutex.Lock()
	defer ct.mutex.Unlock()
	ct.state.Finalized = blockNumber
}

// RedisConsensusTracker stores and retrieves the consensus state in a shared
// Redis cluster, with leader election.
type RedisConsensusTracker struct {
	ctx               context.Context
	client            *redis.Client
	namespace         string
	backendGroup      *BackendGroup
	redlock           *redsync.Mutex
	lockPeriod        time.Duration
	heartbeatInterval time.Duration
	leader            bool
	leaderName        string

	// holds the state collected by local pollers
	local *InMemoryConsensusTracker

	// holds a copy of the remote shared state;
	// when leader, the remote is updated with the local state
	remote *InMemoryConsensusTracker
}

type RedisConsensusTrackerOpt func(cp *RedisConsensusTracker)

func WithLockPeriod(lockPeriod time.Duration) RedisConsensusTrackerOpt {
	return func(ct *RedisConsensusTracker) {
		ct.lockPeriod = lockPeriod
	}
}

func WithHeartbeatInterval(heartbeatInterval time.Duration) RedisConsensusTrackerOpt {
	return func(ct *RedisConsensusTracker) {
		ct.heartbeatInterval = heartbeatInterval
	}
}

func NewRedisConsensusTracker(ctx context.Context,
	redisClient *redis.Client,
	bg *BackendGroup,
	namespace string,
	opts ...RedisConsensusTrackerOpt) ConsensusTracker {

	tracker := &RedisConsensusTracker{
		ctx:               ctx,
		client:            redisClient,
		backendGroup:      bg,
		namespace:         namespace,
		lockPeriod:        30 * time.Second,
		heartbeatInterval: 2 * time.Second,
		local:             NewInMemoryConsensusTracker().(*InMemoryConsensusTracker),
		remote:            NewInMemoryConsensusTracker().(*InMemoryConsensusTracker),
	}

	for _, opt := range opts {
		opt(tracker)
	}

	return tracker
}
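
// A construction sketch (hypothetical wiring: ctx, rdb and group stand in for
// a context, *redis.Client and *BackendGroup configured elsewhere, and
// "op-mainnet" is an illustrative namespace). The defaults above are a 30s
// lock period and a 2s heartbeat; the options override them:
//
//	tracker := NewRedisConsensusTracker(ctx, rdb, group, "op-mainnet",
//		WithLockPeriod(10*time.Second),
//		WithHeartbeatInterval(time.Second),
//	)
//	tracker.(*RedisConsensusTracker).Init()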

// Init starts the heartbeat loop in a goroutine: it runs stateHeartbeat once
// per heartbeatInterval until ctx is cancelled.
func (ct *RedisConsensusTracker) Init() {
	go func() {
		for {
			timer := time.NewTimer(ct.heartbeatInterval)
			ct.stateHeartbeat()

			select {
			case <-timer.C:
				continue
			case <-ct.ctx.Done():
				timer.Stop()
				return
			}
		}
	}()
}
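
// Rough summary of stateHeartbeat below (a reading aid; the code is
// authoritative). Each tick falls into one of three cases:
//
//	1. the lock exists and we hold it   -> extend the lock and re-publish state
//	2. the lock exists, held by another -> read the leader and state keys, follow
//	3. no lock exists                   -> try to take it and become the leader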

func (ct *RedisConsensusTracker) stateHeartbeat() {
	pool := goredis.NewPool(ct.client)
	rs := redsync.New(pool)
	key := ct.key("mutex")

	val, err := ct.client.Get(ct.ctx, key).Result()
	if err != nil && err != redis.Nil {
		log.Error("failed to read the lock", "err", err)
		RecordGroupConsensusError(ct.backendGroup, "read_lock", err)
		if ct.leader {
			ok, err := ct.redlock.Unlock()
			if err != nil || !ok {
				log.Error("failed to release the lock after error", "err", err)
				RecordGroupConsensusError(ct.backendGroup, "leader_release_lock", err)
				return
			}
			ct.leader = false
		}
		return
	}
	if val != "" {
		if ct.leader {
			log.Debug("extending lock")
			ok, err := ct.redlock.Extend()
			if err != nil || !ok {
				log.Error("failed to extend lock", "err", err, "mutex", ct.redlock.Name(), "val", ct.redlock.Value())
				RecordGroupConsensusError(ct.backendGroup, "leader_extend_lock", err)
				ok, err := ct.redlock.Unlock()
				if err != nil || !ok {
					log.Error("failed to release the lock after error", "err", err)
					RecordGroupConsensusError(ct.backendGroup, "leader_release_lock", err)
					return
				}
				ct.leader = false
				return
			}
			ct.postPayload(val)
		} else {
			// retrieve current leader
			leaderName, err := ct.client.Get(ct.ctx, ct.key(fmt.Sprintf("leader:%s", val))).Result()
			if err != nil && err != redis.Nil {
				log.Error("failed to read the remote leader", "err", err)
				RecordGroupConsensusError(ct.backendGroup, "read_leader", err)
				return
			}
			ct.leaderName = leaderName
			log.Debug("following", "val", val, "leader", leaderName)

			// retrieve payload
			val, err := ct.client.Get(ct.ctx, ct.key(fmt.Sprintf("state:%s", val))).Result()
			if err != nil && err != redis.Nil {
				log.Error("failed to read the remote state", "err", err)
				RecordGroupConsensusError(ct.backendGroup, "read_state", err)
				return
			}
			if val == "" {
				log.Error("remote state is missing (recent leader election maybe?)")
				RecordGroupConsensusError(ct.backendGroup, "read_state_missing", err)
				return
			}
			state := &ConsensusTrackerState{}
			err = json.Unmarshal([]byte(val), state)
			if err != nil {
				log.Error("failed to unmarshal the remote state", "err", err)
				RecordGroupConsensusError(ct.backendGroup, "read_unmarshal_state", err)
				return
			}
			ct.remote.update(state)
			log.Debug("updated state from remote", "state", val, "leader", leaderName)

			RecordGroupConsensusHALatestBlock(ct.backendGroup, leaderName, ct.remote.state.Latest)
			RecordGroupConsensusHASafeBlock(ct.backendGroup, leaderName, ct.remote.state.Safe)
			RecordGroupConsensusHAFinalizedBlock(ct.backendGroup, leaderName, ct.remote.state.Finalized)
		}
	} else {
		if !ct.local.Valid() {
			log.Warn("local state is not valid, skipping")
			return
		}
		if ct.remote.Valid() && ct.local.Behind(ct.remote) {
			log.Warn("local state is behind remote, skipping")
			return
		}

		log.Info("lock not found, creating a new one")

		mutex := rs.NewMutex(key,
			redsync.WithExpiry(ct.lockPeriod),
			redsync.WithFailFast(true),
			redsync.WithTries(1))

		// nosemgrep: missing-unlock-before-return
		// this lock is held indefinitely, and it is extended until the leader dies
		if err := mutex.Lock(); err != nil {
			log.Debug("failed to obtain lock", "err", err)
			ct.leader = false
			return
		}

		log.Info("lock acquired", "mutex", mutex.Name(), "val", mutex.Value())
		ct.redlock = mutex
		ct.leader = true
		ct.postPayload(mutex.Value())
	}
}
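
// Roughly, failover works on the TTLs set up above: the leader extends a lock
// whose expiry is lockPeriod (30s by default) once per heartbeatInterval (2s
// by default). If the leader stops extending it, the mutex key expires after
// at most lockPeriod, and on a later heartbeat another instance whose local
// state is valid and not behind the last remote snapshot takes the lock.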

// key namespaces a Redis key for this tracker.
func (ct *RedisConsensusTracker) key(tag string) string {
	return fmt.Sprintf("consensus:%s:%s", ct.namespace, tag)
}
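
// With namespace "op-mainnet" (an illustrative value), the keys used by this
// tracker look like:
//
//	consensus:op-mainnet:mutex                 redsync lock, expiry lockPeriod
//	consensus:op-mainnet:leader:<mutex value>  leader hostname, TTL lockPeriod
//	consensus:op-mainnet:state:<mutex value>   JSON ConsensusTrackerState, TTL lockPeriod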

func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
	return ct.remote.GetLatestBlockNumber()
}

func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
	ct.local.SetLatestBlockNumber(blockNumber)
}

func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
	return ct.remote.GetSafeBlockNumber()
}

func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
	ct.local.SetSafeBlockNumber(blockNumber)
}

func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
	return ct.remote.GetFinalizedBlockNumber()
}

func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
	ct.local.SetFinalizedBlockNumber(blockNumber)
}
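
// Note the asymmetry above: setters write to ct.local (fed by this instance's
// pollers), while getters read from ct.remote (the shared view). Only the
// leader copies local into remote, via postPayload below; followers copy it
// from Redis in stateHeartbeat.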

// postPayload publishes the local state and the leader name to Redis, keyed by
// the current mutex value, and mirrors the local state into ct.remote.
func (ct *RedisConsensusTracker) postPayload(mutexVal string) {
	jsonState, err := json.Marshal(ct.local.state)
	if err != nil {
		log.Error("failed to marshal local", "err", err)
		RecordGroupConsensusError(ct.backendGroup, "leader_marshal_local_state", err)
		ct.leader = false
		return
	}
	err = ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("state:%s", mutexVal)), jsonState, ct.lockPeriod).Err()
	if err != nil {
		log.Error("failed to post the state", "err", err)
		RecordGroupConsensusError(ct.backendGroup, "leader_post_state", err)
		ct.leader = false
		return
	}

	leader, _ := os.LookupEnv("HOSTNAME")
	err = ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("leader:%s", mutexVal)), leader, ct.lockPeriod).Err()
	if err != nil {
		log.Error("failed to post the leader", "err", err)
		RecordGroupConsensusError(ct.backendGroup, "leader_post_leader", err)
		ct.leader = false
		return
	}

	log.Debug("posted state", "state", string(jsonState), "leader", leader)

	ct.leaderName = leader
	ct.remote.update(ct.local.state)

	RecordGroupConsensusHALatestBlock(ct.backendGroup, leader, ct.remote.state.Latest)
	RecordGroupConsensusHASafeBlock(ct.backendGroup, leader, ct.remote.state.Safe)
	RecordGroupConsensusHAFinalizedBlock(ct.backendGroup, leader, ct.remote.state.Finalized)
}
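
// After a successful leader heartbeat, Redis holds something like the
// following (namespace, hostname and block numbers are illustrative):
//
//	GET consensus:op-mainnet:mutex                 -> "<random redsync value>"
//	GET consensus:op-mainnet:leader:<mutex value>  -> "proxyd-0"
//	GET consensus:op-mainnet:state:<mutex value>   -> {"latest":"0x12d68f","safe":"0x12d650","finalized":"0x12d600"}
//
// with the state and leader keys set with a TTL of lockPeriod.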