From 2856a2c9e1cbd07ccf72c7a3fb211a077f37ee61 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Mon, 8 May 2023 16:29:05 -0700 Subject: [PATCH 1/5] proxyd: add limit to consensus block lag --- proxyd/proxyd/config.go | 1 + proxyd/proxyd/consensus_poller.go | 21 ++++++++-- proxyd/proxyd/example.config.toml | 2 + .../integration_tests/consensus_test.go | 38 +++++++++++++++++++ .../integration_tests/testdata/consensus.toml | 1 + proxyd/proxyd/proxyd.go | 3 ++ 6 files changed, 63 insertions(+), 3 deletions(-) diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index 218dbc4..d140fd3 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -105,6 +105,7 @@ type BackendGroupConfig struct { ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"` ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"` + ConsensusMaxBlockLag uint64 `toml:"consensus_max_block_lag"` ConsensusMinPeerCount int `toml:"consensus_min_peer_count"` } diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 34fe708..6a47cd2 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -35,6 +35,7 @@ type ConsensusPoller struct { banPeriod time.Duration maxUpdateThreshold time.Duration + maxBlockLag uint64 } type backendState struct { @@ -160,6 +161,12 @@ func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt { } } +func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.maxBlockLag = maxBlockLag + } +} + func WithMinPeerCount(minPeerCount uint64) ConsensusOpt { return func(cp *ConsensusPoller) { cp.minPeerCount = minPeerCount @@ -181,6 +188,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller banPeriod: 5 * time.Minute, maxUpdateThreshold: 30 * time.Second, + maxBlockLag: 50, minPeerCount: 3, } @@ -270,7 +278,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { continue } - if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock { + // find the highest common ancestor, ignoring backends that are too far lagging behind + // when the backend is too far ahead from current lowest, the current lowest is ignored + // when the backend if too far behind, the backend itself is ignored + if lowestBlock == 0 || + backendLatestBlockNumber > lowestBlock && uint64(backendLatestBlockNumber-lowestBlock) > cp.maxBlockLag || + backendLatestBlockNumber < lowestBlock && uint64(lowestBlock-backendLatestBlockNumber) < cp.maxBlockLag { lowestBlock = backendLatestBlockNumber lowestBlockHash = backendLatestBlockHash } @@ -308,12 +321,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - not banned - with minimum peer count - updated recently + - not lagging */ - peerCount, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) + peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount - if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers { + lagging := (latestBlockNumber < proposedBlock) && uint64(proposedBlock-latestBlockNumber) >= cp.maxBlockLag + if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { filteredBackendsNames = append(filteredBackendsNames, be.Name) continue } diff --git a/proxyd/proxyd/example.config.toml b/proxyd/proxyd/example.config.toml index cb41614..413cd61 100644 --- a/proxyd/proxyd/example.config.toml +++ b/proxyd/proxyd/example.config.toml @@ -93,6 +93,8 @@ backends = ["infura"] # consensus_ban_period = "1m" # Maximum delay for update the backend, default 30s # consensus_max_update_threshold = "20s" +# Maximum block lag, default 50 +# consensus_max_block_lag = 10 # Minimum peer count, default 3 # consensus_min_peer_count = 4 diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 51d1f27..da162b0 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -97,6 +97,44 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) }) + t.Run("prevent using a backend lagging behind", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x100", "hash0x100"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x100", + Response: buildGetBlockResponse("0x100", "hash0x100"), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // since we ignored node1, the consensus should be at 0x100 + require.Equal(t, "0x100", bg.Consensus.GetConsensusBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + + be := backend(bg, "node1") + require.NotNil(t, be) + require.NotContains(t, consensusGroup, be) + require.Equal(t, 1, len(consensusGroup)) + }) + t.Run("prevent using a backend not in sync", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() diff --git a/proxyd/proxyd/integration_tests/testdata/consensus.toml b/proxyd/proxyd/integration_tests/testdata/consensus.toml index d26b9dc..3f92a3d 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus.toml +++ b/proxyd/proxyd/integration_tests/testdata/consensus.toml @@ -18,6 +18,7 @@ consensus_aware = true consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_ban_period = "1m" consensus_max_update_threshold = "2m" +consensus_max_block_lag = 50 consensus_min_peer_count = 4 [rpc_method_mappings] diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index cd02074..fa0371b 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -328,6 +328,9 @@ func Start(config *Config) (*Server, func(), error) { if bgcfg.ConsensusMaxUpdateThreshold > 0 { copts = append(copts, WithMaxUpdateThreshold(time.Duration(bgcfg.ConsensusMaxUpdateThreshold))) } + if bgcfg.ConsensusMaxBlockLag > 0 { + copts = append(copts, WithMaxBlockLag(bgcfg.ConsensusMaxBlockLag)) + } if bgcfg.ConsensusMinPeerCount > 0 { copts = append(copts, WithMinPeerCount(uint64(bgcfg.ConsensusMinPeerCount))) } From 3fd2abea8430a1ef8dcb568b1e69ba158fb7e522 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Mon, 8 May 2023 19:15:48 -0700 Subject: [PATCH 2/5] test on edge --- .../integration_tests/consensus_test.go | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index da162b0..02da59b 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -135,6 +135,41 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) }) + t.Run("prevent using a backend lagging behind 2", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x33", "hash0x100"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x100", + Response: buildGetBlockResponse("0x33", "hash0x100"), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // since we ignored node1, the consensus should be at 0x100 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + + require.Equal(t, 2, len(consensusGroup)) + }) + t.Run("prevent using a backend not in sync", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() From 8d51c200794513de7a9b92e51a91da4c53f1cc7e Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Tue, 9 May 2023 14:11:06 -0700 Subject: [PATCH 3/5] use a global lag comparison, non order dependent --- proxyd/proxyd/consensus_poller.go | 33 ++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 6a47cd2..6308b22 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -263,11 +263,29 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { + var highestBlock hexutil.Uint64 var lowestBlock hexutil.Uint64 var lowestBlockHash string currentConsensusBlockNumber := cp.GetConsensusBlockNumber() + // find the highest block, in order to use it defining the highest non-lagging ancestor block + for _, be := range cp.backendGroup.Backends { + peerCount, backendLatestBlockNumber, _, lastUpdate := cp.getBackendState(be) + + if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { + continue + } + if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { + continue + } + + if backendLatestBlockNumber > highestBlock { + highestBlock = backendLatestBlockNumber + } + } + + // find the highest common ancestor block for _, be := range cp.backendGroup.Backends { peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) @@ -278,12 +296,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { continue } - // find the highest common ancestor, ignoring backends that are too far lagging behind - // when the backend is too far ahead from current lowest, the current lowest is ignored - // when the backend if too far behind, the backend itself is ignored - if lowestBlock == 0 || - backendLatestBlockNumber > lowestBlock && uint64(backendLatestBlockNumber-lowestBlock) > cp.maxBlockLag || - backendLatestBlockNumber < lowestBlock && uint64(lowestBlock-backendLatestBlockNumber) < cp.maxBlockLag { + // check if backend is lagging behind the highest block + if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag { + continue + } + + if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock { lowestBlock = backendLatestBlockNumber lowestBlockHash = backendLatestBlockHash } @@ -323,11 +341,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - updated recently - not lagging */ + peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount - lagging := (latestBlockNumber < proposedBlock) && uint64(proposedBlock-latestBlockNumber) >= cp.maxBlockLag + lagging := latestBlockNumber < proposedBlock if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { filteredBackendsNames = append(filteredBackendsNames, be.Name) continue From c7caf2af4dc300e8f0b104b6b40a36549f039d29 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Tue, 9 May 2023 14:17:29 -0700 Subject: [PATCH 4/5] rebase --- proxyd/proxyd/consensus_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index 6308b22..6290634 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -271,7 +271,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { // find the highest block, in order to use it defining the highest non-lagging ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, backendLatestBlockNumber, _, lastUpdate := cp.getBackendState(be) + peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue From 53e4a369abe70c0fdc446fbe12d9b082f43435a2 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Tue, 9 May 2023 14:20:23 -0700 Subject: [PATCH 5/5] test edge cases --- .../integration_tests/consensus_test.go | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 02da59b..729fade 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -135,7 +135,7 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) }) - t.Run("prevent using a backend lagging behind 2", func(t *testing.T) { + t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() bg.Consensus.Unban() @@ -146,6 +146,7 @@ func TestConsensus(t *testing.T) { Response: buildGetBlockResponse("0x1", "hash1"), }) + // 0x1 + 50 = 0x33 h2.AddOverride(&ms.MethodTemplate{ Method: "eth_getBlockByNumber", Block: "latest", @@ -170,6 +171,41 @@ func TestConsensus(t *testing.T) { require.Equal(t, 2, len(consensusGroup)) }) + t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + + // 0x1 + 49 = 0x32 + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x32", "hash0x100"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x100", + Response: buildGetBlockResponse("0x32", "hash0x100"), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + + require.Equal(t, 2, len(consensusGroup)) + }) + t.Run("prevent using a backend not in sync", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides()