Merge branch 'develop' into felipe/moar-consensus-metrics

This commit is contained in:
mergify[bot] 2023-05-10 19:36:37 +00:00 committed by GitHub
commit fdfbd9ce49
6 changed files with 152 additions and 2 deletions

@ -105,6 +105,7 @@ type BackendGroupConfig struct {
ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"` ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"`
ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"` ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"`
ConsensusMaxBlockLag uint64 `toml:"consensus_max_block_lag"`
ConsensusMinPeerCount int `toml:"consensus_min_peer_count"` ConsensusMinPeerCount int `toml:"consensus_min_peer_count"`
} }

@ -35,6 +35,7 @@ type ConsensusPoller struct {
banPeriod time.Duration banPeriod time.Duration
maxUpdateThreshold time.Duration maxUpdateThreshold time.Duration
maxBlockLag uint64
} }
type backendState struct { type backendState struct {
@ -160,6 +161,12 @@ func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt {
} }
} }
func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.maxBlockLag = maxBlockLag
}
}
func WithMinPeerCount(minPeerCount uint64) ConsensusOpt { func WithMinPeerCount(minPeerCount uint64) ConsensusOpt {
return func(cp *ConsensusPoller) { return func(cp *ConsensusPoller) {
cp.minPeerCount = minPeerCount cp.minPeerCount = minPeerCount
@ -181,6 +188,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
banPeriod: 5 * time.Minute, banPeriod: 5 * time.Minute,
maxUpdateThreshold: 30 * time.Second, maxUpdateThreshold: 30 * time.Second,
maxBlockLag: 50,
minPeerCount: 3, minPeerCount: 3,
} }
@ -264,11 +272,29 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var highestBlock hexutil.Uint64
var lowestBlock hexutil.Uint64 var lowestBlock hexutil.Uint64
var lowestBlockHash string var lowestBlockHash string
currentConsensusBlockNumber := cp.GetConsensusBlockNumber() currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
// find the highest block, in order to use it defining the highest non-lagging ancestor block
for _, be := range cp.backendGroup.Backends {
peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
if backendLatestBlockNumber > highestBlock {
highestBlock = backendLatestBlockNumber
}
}
// find the highest common ancestor block
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
@ -279,6 +305,11 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
continue continue
} }
// check if backend is lagging behind the highest block
if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag {
continue
}
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock { if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash lowestBlockHash = backendLatestBlockHash
@ -317,12 +348,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
- not banned - not banned
- with minimum peer count - with minimum peer count
- updated recently - updated recently
- not lagging
*/ */
peerCount, _, _, lastUpdate, bannedUntil := cp.getBackendState(be)
peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bannedUntil) isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers { lagging := latestBlockNumber < proposedBlock
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging {
filteredBackendsNames = append(filteredBackendsNames, be.Name) filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue continue
} }

@ -93,6 +93,8 @@ backends = ["infura"]
# consensus_ban_period = "1m" # consensus_ban_period = "1m"
# Maximum delay for update the backend, default 30s # Maximum delay for update the backend, default 30s
# consensus_max_update_threshold = "20s" # consensus_max_update_threshold = "20s"
# Maximum block lag, default 50
# consensus_max_block_lag = 10
# Minimum peer count, default 3 # Minimum peer count, default 3
# consensus_min_peer_count = 4 # consensus_min_peer_count = 4

@ -97,6 +97,115 @@ func TestConsensus(t *testing.T) {
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })
t.Run("prevent using a backend lagging behind", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x100", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x100", "hash0x100"),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// since we ignored node1, the consensus should be at 0x100
require.Equal(t, "0x100", bg.Consensus.GetConsensusBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
be := backend(bg, "node1")
require.NotNil(t, be)
require.NotContains(t, consensusGroup, be)
require.Equal(t, 1, len(consensusGroup))
})
t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
// 0x1 + 50 = 0x33
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x33", "hash0x100"),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// since we ignored node1, the consensus should be at 0x100
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
})
t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"),
})
// 0x1 + 49 = 0x32
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x32", "hash0x100"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x100",
Response: buildGetBlockResponse("0x32", "hash0x100"),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup()
require.Equal(t, 2, len(consensusGroup))
})
t.Run("prevent using a backend not in sync", func(t *testing.T) { t.Run("prevent using a backend not in sync", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()

@ -18,6 +18,7 @@ consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m" consensus_ban_period = "1m"
consensus_max_update_threshold = "2m" consensus_max_update_threshold = "2m"
consensus_max_block_lag = 50
consensus_min_peer_count = 4 consensus_min_peer_count = 4
[rpc_method_mappings] [rpc_method_mappings]

@ -328,6 +328,9 @@ func Start(config *Config) (*Server, func(), error) {
if bgcfg.ConsensusMaxUpdateThreshold > 0 { if bgcfg.ConsensusMaxUpdateThreshold > 0 {
copts = append(copts, WithMaxUpdateThreshold(time.Duration(bgcfg.ConsensusMaxUpdateThreshold))) copts = append(copts, WithMaxUpdateThreshold(time.Duration(bgcfg.ConsensusMaxUpdateThreshold)))
} }
if bgcfg.ConsensusMaxBlockLag > 0 {
copts = append(copts, WithMaxBlockLag(bgcfg.ConsensusMaxBlockLag))
}
if bgcfg.ConsensusMinPeerCount > 0 { if bgcfg.ConsensusMinPeerCount > 0 {
copts = append(copts, WithMinPeerCount(uint64(bgcfg.ConsensusMinPeerCount))) copts = append(copts, WithMinPeerCount(uint64(bgcfg.ConsensusMinPeerCount)))
} }