From 5c4b805efc25bb88c4d6a5ae6013a53099315b25 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 11 May 2023 15:33:47 -0700 Subject: [PATCH] ban logic --- proxyd/proxyd/consensus_poller.go | 46 +++++++++++-------- .../integration_tests/consensus_test.go | 3 ++ 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 3639a25..d79e67a 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -44,6 +44,7 @@ type backendState struct { latestBlockNumber hexutil.Uint64 latestBlockHash string peerCount uint64 + inSync bool lastUpdate time.Time @@ -219,11 +220,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { return } - // if backend it not in sync we'll check again after ban - inSync, err := cp.isInSync(ctx, be) - RecordConsensusBackendInSync(be, err == nil && inSync) - if err != nil || !inSync { - log.Warn("skipping backend - not in sync", "backend", be.Name) + // if backend exhausted rate limit we'll skip it for now + if be.IsRateLimited() { + log.Debug("skipping backend - rate limited", "backend", be.Name) return } @@ -234,17 +233,18 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { return } - // if backend exhausted rate limit we'll skip it for now - if be.IsRateLimited() { - return + // if backend is not in sync we keep polling it; the consensus update filters it out + inSync, err := cp.isInSync(ctx, be) + RecordConsensusBackendInSync(be, err == nil && inSync) + if err != nil { + log.Warn("error updating backend sync state", "name", be.Name, "err", err) } var peerCount uint64 if !be.skipPeerCountCheck { peerCount, err = cp.getPeerCount(ctx, be) if err != nil { - log.Warn("error updating backend", "name", be.Name, "err", err) - return + log.Warn("error updating backend peer count", "name", be.Name, "err", err) } RecordConsensusBackendPeerCount(be, peerCount) } @@ -252,10 +252,9 @@ func (cp *ConsensusPoller) 
UpdateBackend(ctx context.Context, be *Backend) { latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") if err != nil { log.Warn("error updating backend", "name", be.Name, "err", err) - return } - changed, updateDelay := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash) + changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash) if changed { RecordBackendLatestBlock(be, latestBlockNumber) @@ -263,6 +262,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Debug("backend state updated", "name", be.Name, "peerCount", peerCount, + "inSync", inSync, "latestBlockNumber", latestBlockNumber, "latestBlockHash", latestBlockHash, "updateDelay", updateDelay) @@ -279,11 +279,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // find the highest block, in order to use it defining the highest non-lagging ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } + if !inSync { + continue + } if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { continue } @@ -295,11 +298,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // find the highest common ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue } + if !inSync { + continue + } if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { continue } @@ -350,12 +356,12 @@ func (cp *ConsensusPoller) 
UpdateBackendGroupConsensus(ctx context.Context) { - not lagging */ - peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) + peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { + if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { filteredBackendsNames = append(filteredBackendsNames, be.Name) continue } @@ -497,23 +503,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { +func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { bs := cp.backendState[be] + defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() peerCount = bs.peerCount + inSync = bs.inSync blockNumber = bs.latestBlockNumber blockHash = bs.latestBlockHash lastUpdate = bs.lastUpdate bannedUntil = bs.bannedUntil - bs.backendStateMux.Unlock() return } -func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { +func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { bs := cp.backendState[be] bs.backendStateMux.Lock() changed = bs.latestBlockHash != blockHash 
bs.peerCount = peerCount + bs.inSync = inSync bs.latestBlockNumber = blockNumber bs.latestBlockHash = blockHash updateDelay = time.Since(bs.lastUpdate) diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 729fade..41f7e22 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) @@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) { be := backend(bg, "node1") require.NotNil(t, be) require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) }) @@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) { consensusGroup := bg.Consensus.GetConsensusGroup() require.NotContains(t, consensusGroup, be) + require.False(t, bg.Consensus.IsBanned(be)) require.Equal(t, 1, len(consensusGroup)) })