Merge branch 'develop' into felipe/fix-cache-for-eth2-block-tags
This commit is contained in:
commit
96ebb5a1c4
@ -203,23 +203,22 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
|
|||||||
|
|
||||||
// UpdateBackend refreshes the consensus state of a single backend
|
// UpdateBackend refreshes the consensus state of a single backend
|
||||||
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
||||||
bs := cp.backendState[be]
|
if cp.IsBanned(be) {
|
||||||
if time.Now().Before(bs.bannedUntil) {
|
log.Debug("skipping backend banned", "backend", be.Name)
|
||||||
log.Debug("skipping backend banned", "backend", be.Name, "bannedUntil", bs.bannedUntil)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// if backend it not online or not in a health state we'll only resume checkin it after ban
|
// if backend it not online or not in a health state we'll only resume checkin it after ban
|
||||||
if !be.Online() || !be.IsHealthy() {
|
if !be.Online() || !be.IsHealthy() {
|
||||||
log.Warn("backend banned - not online or not healthy", "backend", be.Name, "bannedUntil", bs.bannedUntil)
|
log.Warn("backend banned - not online or not healthy", "backend", be.Name)
|
||||||
bs.bannedUntil = time.Now().Add(cp.banPeriod)
|
cp.Ban(be)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if backend it not in sync we'll check again after ban
|
// if backend it not in sync we'll check again after ban
|
||||||
inSync, err := cp.isInSync(ctx, be)
|
inSync, err := cp.isInSync(ctx, be)
|
||||||
if err != nil || !inSync {
|
if err != nil || !inSync {
|
||||||
log.Warn("backend banned - not in sync", "backend", be.Name, "bannedUntil", bs.bannedUntil)
|
log.Warn("backend banned - not in sync", "backend", be.Name)
|
||||||
bs.bannedUntil = time.Now().Add(cp.banPeriod)
|
cp.Ban(be)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if backend exhausted rate limit we'll skip it for now
|
// if backend exhausted rate limit we'll skip it for now
|
||||||
@ -246,7 +245,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
|||||||
|
|
||||||
if changed {
|
if changed {
|
||||||
RecordBackendLatestBlock(be, latestBlockNumber)
|
RecordBackendLatestBlock(be, latestBlockNumber)
|
||||||
log.Debug("backend state updated", "name", be.Name, "state", bs)
|
log.Debug("backend state updated",
|
||||||
|
"name", be.Name,
|
||||||
|
"peerCount", peerCount,
|
||||||
|
"latestBlockNumber", latestBlockNumber,
|
||||||
|
"latestBlockHash", latestBlockHash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -258,7 +261,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
|
|||||||
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
|
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
|
||||||
|
|
||||||
for _, be := range cp.backendGroup.Backends {
|
for _, be := range cp.backendGroup.Backends {
|
||||||
peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate := cp.getBackendState(be)
|
peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
|
||||||
|
|
||||||
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
|
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
|
||||||
continue
|
continue
|
||||||
@ -306,10 +309,10 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
|
|||||||
- with minimum peer count
|
- with minimum peer count
|
||||||
- updated recently
|
- updated recently
|
||||||
*/
|
*/
|
||||||
bs := cp.backendState[be]
|
peerCount, _, _, lastUpdate, bannedUntil := cp.getBackendState(be)
|
||||||
notUpdated := bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
|
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
|
||||||
isBanned := time.Now().Before(bs.bannedUntil)
|
isBanned := time.Now().Before(bannedUntil)
|
||||||
notEnoughPeers := !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount
|
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
|
||||||
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers {
|
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers {
|
||||||
filteredBackendsNames = append(filteredBackendsNames, be.Name)
|
filteredBackendsNames = append(filteredBackendsNames, be.Name)
|
||||||
continue
|
continue
|
||||||
@ -359,6 +362,22 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
|
|||||||
log.Debug("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
|
log.Debug("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsBanned checks if a specific backend is banned
|
||||||
|
func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
|
||||||
|
bs := cp.backendState[be]
|
||||||
|
defer bs.backendStateMux.Unlock()
|
||||||
|
bs.backendStateMux.Lock()
|
||||||
|
return time.Now().Before(bs.bannedUntil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ban bans a specific backend
|
||||||
|
func (cp *ConsensusPoller) Ban(be *Backend) {
|
||||||
|
bs := cp.backendState[be]
|
||||||
|
defer bs.backendStateMux.Unlock()
|
||||||
|
bs.backendStateMux.Lock()
|
||||||
|
bs.bannedUntil = time.Now().Add(cp.banPeriod)
|
||||||
|
}
|
||||||
|
|
||||||
// Unban remove any bans from the backends
|
// Unban remove any bans from the backends
|
||||||
func (cp *ConsensusPoller) Unban() {
|
func (cp *ConsensusPoller) Unban() {
|
||||||
for _, be := range cp.backendGroup.Backends {
|
for _, be := range cp.backendGroup.Backends {
|
||||||
@ -432,13 +451,14 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time) {
|
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
|
||||||
bs := cp.backendState[be]
|
bs := cp.backendState[be]
|
||||||
bs.backendStateMux.Lock()
|
bs.backendStateMux.Lock()
|
||||||
peerCount = bs.peerCount
|
peerCount = bs.peerCount
|
||||||
blockNumber = bs.latestBlockNumber
|
blockNumber = bs.latestBlockNumber
|
||||||
blockHash = bs.latestBlockHash
|
blockHash = bs.latestBlockHash
|
||||||
lastUpdate = bs.lastUpdate
|
lastUpdate = bs.lastUpdate
|
||||||
|
bannedUntil = bs.bannedUntil
|
||||||
bs.backendStateMux.Unlock()
|
bs.backendStateMux.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -576,7 +576,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = context.WithValue(r.Context(), ContextKeyAuth, s.authenticatedPaths[authorization]) // nolint:staticcheck
|
ctx = context.WithValue(ctx, ContextKeyAuth, s.authenticatedPaths[authorization]) // nolint:staticcheck
|
||||||
}
|
}
|
||||||
|
|
||||||
return context.WithValue(
|
return context.WithValue(
|
||||||
|
Loading…
Reference in New Issue
Block a user