nits
This commit is contained in:
parent
fa1912f594
commit
3911deccbd
@ -33,10 +33,13 @@ type ConsensusTrackerState struct {
|
|||||||
Finalized hexutil.Uint64 `json:"finalized"`
|
Finalized hexutil.Uint64 `json:"finalized"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ConsensusTrackerState) update(o *ConsensusTrackerState) {
|
func (ct *InMemoryConsensusTracker) update(o *ConsensusTrackerState) {
|
||||||
s.Latest = o.Latest
|
ct.mutex.Lock()
|
||||||
s.Safe = o.Safe
|
defer ct.mutex.Unlock()
|
||||||
s.Finalized = o.Finalized
|
|
||||||
|
ct.state.Latest = o.Latest
|
||||||
|
ct.state.Safe = o.Safe
|
||||||
|
ct.state.Finalized = o.Finalized
|
||||||
}
|
}
|
||||||
|
|
||||||
// InMemoryConsensusTracker store and retrieve in memory, async-safe
|
// InMemoryConsensusTracker store and retrieve in memory, async-safe
|
||||||
@ -169,14 +172,12 @@ func NewRedisConsensusTracker(ctx context.Context,
|
|||||||
func (ct *RedisConsensusTracker) Init() {
|
func (ct *RedisConsensusTracker) Init() {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
// follow same context as backend group poller
|
|
||||||
ctx := ct.backendGroup.Consensus.ctx
|
|
||||||
timer := time.NewTimer(ct.heartbeatInterval)
|
timer := time.NewTimer(ct.heartbeatInterval)
|
||||||
ct.stateHeartbeat()
|
ct.stateHeartbeat()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
case <-ctx.Done():
|
case <-ct.ctx.Done():
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -242,9 +243,8 @@ func (ct *RedisConsensusTracker) stateHeartbeat() {
|
|||||||
log.Error("failed to unmarshal the remote state", "err", err)
|
log.Error("failed to unmarshal the remote state", "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ct.remote.mutex.Lock()
|
|
||||||
defer ct.remote.mutex.Unlock()
|
ct.remote.update(state)
|
||||||
ct.remote.state.update(state)
|
|
||||||
log.Debug("updated state from remote", "state", val, "leader", leaderName)
|
log.Debug("updated state from remote", "state", val, "leader", leaderName)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -320,15 +320,12 @@ func (ct *RedisConsensusTracker) postPayload(mutexVal string) {
|
|||||||
ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("state:%s", mutexVal)), jsonState, ct.lockPeriod)
|
ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("state:%s", mutexVal)), jsonState, ct.lockPeriod)
|
||||||
|
|
||||||
leader, _ := os.LookupEnv("HOSTNAME")
|
leader, _ := os.LookupEnv("HOSTNAME")
|
||||||
if leader == "" {
|
|
||||||
|
|
||||||
}
|
|
||||||
ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("leader:%s", mutexVal)), leader, ct.lockPeriod)
|
ct.client.Set(ct.ctx, ct.key(fmt.Sprintf("leader:%s", mutexVal)), leader, ct.lockPeriod)
|
||||||
|
|
||||||
log.Debug("posted state", "state", string(jsonState), "leader", leader)
|
log.Debug("posted state", "state", string(jsonState), "leader", leader)
|
||||||
|
|
||||||
ct.leaderName = leader
|
ct.leaderName = leader
|
||||||
ct.remote.state.update(ct.local.state)
|
ct.remote.update(ct.local.state)
|
||||||
|
|
||||||
RecordGroupConsensusHALatestBlock(ct.backendGroup, leader, ct.local.state.Latest)
|
RecordGroupConsensusHALatestBlock(ct.backendGroup, leader, ct.local.state.Latest)
|
||||||
RecordGroupConsensusHASafeBlock(ct.backendGroup, leader, ct.local.state.Safe)
|
RecordGroupConsensusHASafeBlock(ct.backendGroup, leader, ct.local.state.Safe)
|
||||||
|
Loading…
Reference in New Issue
Block a user