release lock on errors, validate if local is behind remote before acquiring the lock
This commit is contained in:
parent
fb7d24ce0c
commit
fa1912f594
@ -52,6 +52,18 @@ func NewInMemoryConsensusTracker() ConsensusTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ct *InMemoryConsensusTracker) Valid() bool {
|
||||||
|
return ct.GetLatestBlockNumber() > 0 &&
|
||||||
|
ct.GetSafeBlockNumber() > 0 &&
|
||||||
|
ct.GetFinalizedBlockNumber() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *InMemoryConsensusTracker) Behind(other *InMemoryConsensusTracker) bool {
|
||||||
|
return ct.GetLatestBlockNumber() < other.GetLatestBlockNumber() ||
|
||||||
|
ct.GetSafeBlockNumber() < other.GetSafeBlockNumber() ||
|
||||||
|
ct.GetFinalizedBlockNumber() < other.GetFinalizedBlockNumber()
|
||||||
|
}
|
||||||
|
|
||||||
func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
|
func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
|
||||||
defer ct.mutex.Unlock()
|
defer ct.mutex.Unlock()
|
||||||
ct.mutex.Lock()
|
ct.mutex.Lock()
|
||||||
@ -179,8 +191,15 @@ func (ct *RedisConsensusTracker) stateHeartbeat() {
|
|||||||
|
|
||||||
val, err := ct.client.Get(ct.ctx, key).Result()
|
val, err := ct.client.Get(ct.ctx, key).Result()
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
log.Error("failed to read the mutex", "err", err)
|
log.Error("failed to read the lock", "err", err)
|
||||||
ct.leader = false
|
if ct.leader {
|
||||||
|
ok, err := ct.redlock.Unlock()
|
||||||
|
if err != nil || !ok {
|
||||||
|
log.Error("failed to release the lock after error", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ct.leader = false
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if val != "" {
|
if val != "" {
|
||||||
@ -189,6 +208,11 @@ func (ct *RedisConsensusTracker) stateHeartbeat() {
|
|||||||
ok, err := ct.redlock.Extend()
|
ok, err := ct.redlock.Extend()
|
||||||
if err != nil || !ok {
|
if err != nil || !ok {
|
||||||
log.Error("failed to extend lock", "err", err, "mutex", ct.redlock.Name(), "val", ct.redlock.Value())
|
log.Error("failed to extend lock", "err", err, "mutex", ct.redlock.Name(), "val", ct.redlock.Value())
|
||||||
|
ok, err := ct.redlock.Unlock()
|
||||||
|
if err != nil || !ok {
|
||||||
|
log.Error("failed to release the lock after error", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
ct.leader = false
|
ct.leader = false
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -224,12 +248,15 @@ func (ct *RedisConsensusTracker) stateHeartbeat() {
|
|||||||
log.Debug("updated state from remote", "state", val, "leader", leaderName)
|
log.Debug("updated state from remote", "state", val, "leader", leaderName)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if ct.local.GetLatestBlockNumber() == 0 ||
|
if !ct.local.Valid() {
|
||||||
ct.local.GetSafeBlockNumber() == 0 ||
|
log.Warn("local state is not valid or behind remote, skipping")
|
||||||
ct.local.GetFinalizedBlockNumber() == 0 {
|
|
||||||
log.Warn("lock not found, but local state is missing, skipping")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if ct.remote.Valid() && ct.local.Behind(ct.remote) {
|
||||||
|
log.Warn("local state is behind remote, skipping")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("lock not found, creating a new one")
|
log.Info("lock not found, creating a new one")
|
||||||
|
|
||||||
mutex := rs.NewMutex(key,
|
mutex := rs.NewMutex(key,
|
||||||
|
Loading…
Reference in New Issue
Block a user