diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index d79e67a..f077a0f 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -17,11 +17,14 @@ const ( PollerInterval = 1 * time.Second ) +type OnConsensusBroken func() + // ConsensusPoller checks the consensus state for each member of a BackendGroup // resolves the highest common block for multiple nodes, and reconciles the consensus // in case of block hash divergence to minimize re-orgs type ConsensusPoller struct { cancelFunc context.CancelFunc + listeners []OnConsensusBroken backendGroup *BackendGroup backendState map[*Backend]*backendState @@ -150,6 +153,16 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt { } } +func WithListener(listener OnConsensusBroken) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.AddListener(listener) + } +} + +func (cp *ConsensusPoller) AddListener(listener OnConsensusBroken) { + cp.listeners = append(cp.listeners, listener) +} + func WithBanPeriod(banPeriod time.Duration) ConsensusOpt { return func(cp *ConsensusPoller) { cp.banPeriod = banPeriod @@ -354,6 +367,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - with minimum peer count - updated recently - not lagging + - in sync */ peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) @@ -397,7 +411,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } if broken { - // propagate event to other interested parts, such as cache invalidator + for _, l := range cp.listeners { + l() + } log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 41f7e22..9320429 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -289,6 +289,11 @@ func TestConsensus(t *testing.T) { h2.ResetOverrides() bg.Consensus.Unban() + listenerCalled := false + bg.Consensus.AddListener(func() { + listenerCalled = true + }) + for _, be := range bg.Backends { bg.Consensus.UpdateBackend(ctx, be) } @@ -334,7 +339,7 @@ func TestConsensus(t *testing.T) { // should resolve to 0x1, since 0x2 is out of consensus at the moment require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) - // later, when impl events, listen to broken consensus event + require.True(t, listenerCalled) }) t.Run("broken consensus with depth 2", func(t *testing.T) {