diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index 218dbc4..d140fd3 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -105,6 +105,7 @@ type BackendGroupConfig struct { ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"` ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"` + ConsensusMaxBlockLag uint64 `toml:"consensus_max_block_lag"` ConsensusMinPeerCount int `toml:"consensus_min_peer_count"` } diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index c486d1c..e26d6b9 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -35,6 +35,7 @@ type ConsensusPoller struct { banPeriod time.Duration maxUpdateThreshold time.Duration + maxBlockLag uint64 } type backendState struct { @@ -160,6 +161,12 @@ func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt { } } +func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.maxBlockLag = maxBlockLag + } +} + func WithMinPeerCount(minPeerCount uint64) ConsensusOpt { return func(cp *ConsensusPoller) { cp.minPeerCount = minPeerCount @@ -181,6 +188,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller banPeriod: 5 * time.Minute, maxUpdateThreshold: 30 * time.Second, + maxBlockLag: 50, minPeerCount: 3, } @@ -264,11 +272,29 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { + var highestBlock hexutil.Uint64 var lowestBlock hexutil.Uint64 var lowestBlockHash string currentConsensusBlockNumber := cp.GetConsensusBlockNumber() + // find the highest block, in order to use it defining the highest non-lagging ancestor block + for _, be := range cp.backendGroup.Backends { + peerCount, 
backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) + + if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { + continue + } + if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { + continue + } + + if backendLatestBlockNumber > highestBlock { + highestBlock = backendLatestBlockNumber + } + } + + // find the highest common ancestor block for _, be := range cp.backendGroup.Backends { peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) @@ -279,6 +305,11 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { continue } + // check if backend is lagging behind the highest block + if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag { + continue + } + if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock { lowestBlock = backendLatestBlockNumber lowestBlockHash = backendLatestBlockHash @@ -317,12 +348,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - not banned - with minimum peer count - updated recently + - not lagging */ - peerCount, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) + + peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount - if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers { + lagging := latestBlockNumber < proposedBlock + if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { filteredBackendsNames = append(filteredBackendsNames, be.Name) continue } diff --git a/proxyd/proxyd/example.config.toml b/proxyd/proxyd/example.config.toml index cb41614..413cd61 100644 --- a/proxyd/proxyd/example.config.toml +++ 
b/proxyd/proxyd/example.config.toml @@ -93,6 +93,8 @@ backends = ["infura"] # consensus_ban_period = "1m" # Maximum delay for update the backend, default 30s # consensus_max_update_threshold = "20s" +# Maximum block lag, default 50 +# consensus_max_block_lag = 10 # Minimum peer count, default 3 # consensus_min_peer_count = 4 diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 51d1f27..729fade 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -97,6 +97,115 @@ func TestConsensus(t *testing.T) { require.Equal(t, 1, len(consensusGroup)) }) + t.Run("prevent using a backend lagging behind", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x100", "hash0x100"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x100", + Response: buildGetBlockResponse("0x100", "hash0x100"), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // since we ignored node1, the consensus should be at 0x100 + require.Equal(t, "0x100", bg.Consensus.GetConsensusBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + + be := backend(bg, "node1") + require.NotNil(t, be) + require.NotContains(t, consensusGroup, be) + require.Equal(t, 1, len(consensusGroup)) + }) + + t.Run("prevent using a backend lagging behind - at limit", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: 
buildGetBlockResponse("0x1", "hash1"), + }) + + // 0x1 + 50 = 0x33 + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x33", "hash0x100"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x100", + Response: buildGetBlockResponse("0x33", "hash0x100"), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // node1 is exactly at the lag limit, so it is not ignored and the consensus stays at 0x1 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + + require.Equal(t, 2, len(consensusGroup)) + }) + + t.Run("prevent using a backend lagging behind - one before limit", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + + // 0x1 + 49 = 0x32 + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x32", "hash0x100"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x100", + Response: buildGetBlockResponse("0x32", "hash0x100"), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + + consensusGroup := bg.Consensus.GetConsensusGroup() + + require.Equal(t, 2, len(consensusGroup)) + }) + + t.Run("prevent using a backend not in sync", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() diff --git a/proxyd/proxyd/integration_tests/testdata/consensus.toml b/proxyd/proxyd/integration_tests/testdata/consensus.toml index d26b9dc..3f92a3d 100644 --- 
a/proxyd/proxyd/integration_tests/testdata/consensus.toml +++ b/proxyd/proxyd/integration_tests/testdata/consensus.toml @@ -18,6 +18,7 @@ consensus_aware = true consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_ban_period = "1m" consensus_max_update_threshold = "2m" +consensus_max_block_lag = 50 consensus_min_peer_count = 4 [rpc_method_mappings] diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index cd02074..fa0371b 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -328,6 +328,9 @@ func Start(config *Config) (*Server, func(), error) { if bgcfg.ConsensusMaxUpdateThreshold > 0 { copts = append(copts, WithMaxUpdateThreshold(time.Duration(bgcfg.ConsensusMaxUpdateThreshold))) } + if bgcfg.ConsensusMaxBlockLag > 0 { + copts = append(copts, WithMaxBlockLag(bgcfg.ConsensusMaxBlockLag)) + } if bgcfg.ConsensusMinPeerCount > 0 { copts = append(copts, WithMinPeerCount(uint64(bgcfg.ConsensusMinPeerCount))) }