From af863d39de25d252479ed6e7266dc7c893c1c843 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 25 May 2023 21:37:47 -0700 Subject: [PATCH] consensus for {safe,finalized} and rewrite tags --- proxyd/proxyd/backend.go | 6 +- proxyd/proxyd/consensus_poller.go | 117 +++++++--- proxyd/proxyd/consensus_tracker.go | 78 +++++-- .../integration_tests/consensus_test.go | 207 ++++++++++++++++-- .../testdata/consensus_responses.yml | 66 ++++++ proxyd/proxyd/rewriter.go | 12 +- proxyd/proxyd/rewriter_test.go | 12 +- .../tools/mockserver/handler/handler.go | 2 +- 8 files changed, 423 insertions(+), 77 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 8e8db75..3946995 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -556,7 +556,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch backends = bg.loadBalancedConsensusGroup() // We also rewrite block tags to enforce compliance with consensus - rctx := RewriteContext{latest: bg.Consensus.GetConsensusBlockNumber()} + rctx := RewriteContext{ + latest: bg.Consensus.GetLatestBlockNumber(), + finalized: bg.Consensus.GetFinalizedBlockNumber(), + safe: bg.Consensus.GetSafeBlockNumber(), + } for i, req := range rpcReqs { res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID} diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index a170aca..5c5edcd 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -46,8 +46,12 @@ type backendState struct { latestBlockNumber hexutil.Uint64 latestBlockHash string - peerCount uint64 - inSync bool + + finalizedBlockNumber hexutil.Uint64 + safeBlockNumber hexutil.Uint64 + + peerCount uint64 + inSync bool lastUpdate time.Time @@ -65,9 +69,19 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { return g } -// GetConsensusBlockNumber returns the agreed block number in a consensus -func (ct *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 { - return ct.tracker.GetConsensusBlockNumber() +// GetLatestBlockNumber returns the `latest` agreed block number in a consensus +func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 { + return ct.tracker.GetLatestBlockNumber() +} + +// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus +func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 { + return ct.tracker.GetFinalizedBlockNumber() +} + +// GetSafeBlockNumber returns the `safe` agreed block number in a consensus +func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 { + return ct.tracker.GetSafeBlockNumber() } func (cp *ConsensusPoller) Shutdown() { @@ -261,7 +275,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { log.Warn("error updating backend", "name", be.Name, "err", err) } - changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash) + finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + } + + safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + } + + changed, updateDelay := cp.setBackendState(be, peerCount, inSync, + latestBlockNumber, latestBlockHash, + finalizedBlockNumber, safeBlockNumber) if changed { RecordBackendLatestBlock(be, latestBlockNumber) @@ -272,21 +298,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { "inSync", inSync, "latestBlockNumber", latestBlockNumber, "latestBlockHash", latestBlockHash, + "finalizedBlockNumber", finalizedBlockNumber, + "safeBlockNumber", safeBlockNumber, "updateDelay", updateDelay) } } // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - var highestBlock hexutil.Uint64 - var lowestBlock hexutil.Uint64 - var lowestBlockHash string + var highestLatestBlock hexutil.Uint64 - currentConsensusBlockNumber := cp.GetConsensusBlockNumber() + var lowestLatestBlock hexutil.Uint64 + var lowestLatestBlockHash string + + var lowestFinalizedBlock hexutil.Uint64 + var lowestSafeBlock hexutil.Uint64 + + currentConsensusBlockNumber := cp.GetLatestBlockNumber() // find the highest block, in order to use it defining the highest non-lagging ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue @@ -298,14 +330,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { continue } - if backendLatestBlockNumber > highestBlock { - highestBlock = backendLatestBlockNumber + if backendLatestBlockNumber > highestLatestBlock { + highestLatestBlock = backendLatestBlockNumber } } // find the highest common ancestor block for _, be := range cp.backendGroup.Backends { - peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) + peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be) if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { continue @@ -318,23 +350,31 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } // check if backend is lagging behind the highest block - if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag { + if backendLatestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-backendLatestBlockNumber) > cp.maxBlockLag { continue } - if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock { - lowestBlock = backendLatestBlockNumber - lowestBlockHash = backendLatestBlockHash + if lowestLatestBlock == 0 || backendLatestBlockNumber < lowestLatestBlock { + lowestLatestBlock = backendLatestBlockNumber + lowestLatestBlockHash = backendLatestBlockHash + } + + if lowestFinalizedBlock == 0 || backendFinalizedBlockNumber < lowestFinalizedBlock { + lowestFinalizedBlock = backendFinalizedBlockNumber + } + + if lowestSafeBlock == 0 || backendSafeBlockNumber < lowestSafeBlock { + lowestSafeBlock = backendSafeBlockNumber } } // no block to propose (i.e. initializing consensus) - if lowestBlock == 0 { + if lowestLatestBlock == 0 { return } - proposedBlock := lowestBlock - proposedBlockHash := lowestBlockHash + proposedBlock := lowestLatestBlock + proposedBlockHash := lowestLatestBlockHash hasConsensus := false // check if everybody agrees on the same block hash @@ -342,8 +382,8 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) - if lowestBlock > currentConsensusBlockNumber { - log.Debug("validating consensus on block", "lowestBlock", lowestBlock) + if lowestLatestBlock > currentConsensusBlockNumber { + log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) } broken := false @@ -362,7 +402,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { - in sync */ - peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) + peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount @@ -410,7 +450,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) } - cp.tracker.SetConsensusBlockNumber(proposedBlock) + cp.tracker.SetLatestBlockNumber(proposedBlock) + cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) + cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.consensusGroupMux.Lock() cp.consensusGroup = consensusBackends cp.consensusGroupMux.Unlock() @@ -512,27 +554,38 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } -func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { +func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, + latestBlockNumber hexutil.Uint64, latestBlockHash string, + finalizedBlockNumber hexutil.Uint64, + safeBlockNumber hexutil.Uint64, + lastUpdate time.Time, bannedUntil time.Time) { bs := cp.backendState[be] defer bs.backendStateMux.Unlock() bs.backendStateMux.Lock() peerCount = bs.peerCount inSync = bs.inSync - blockNumber = bs.latestBlockNumber - blockHash = bs.latestBlockHash + latestBlockNumber = bs.latestBlockNumber + latestBlockHash = bs.latestBlockHash + finalizedBlockNumber = bs.finalizedBlockNumber + safeBlockNumber = bs.safeBlockNumber lastUpdate = bs.lastUpdate bannedUntil = bs.bannedUntil return } -func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { +func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, + latestBlockNumber hexutil.Uint64, latestBlockHash string, + finalizedBlockNumber hexutil.Uint64, + safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) { bs := cp.backendState[be] bs.backendStateMux.Lock() - changed = bs.latestBlockHash != blockHash + changed = bs.latestBlockHash != latestBlockHash bs.peerCount = peerCount bs.inSync = inSync - bs.latestBlockNumber = blockNumber - bs.latestBlockHash = blockHash + bs.latestBlockNumber = latestBlockNumber + bs.latestBlockHash = latestBlockHash + bs.finalizedBlockNumber = finalizedBlockNumber + bs.safeBlockNumber = safeBlockNumber updateDelay = time.Since(bs.lastUpdate) bs.lastUpdate = time.Now() bs.backendStateMux.Unlock() diff --git a/proxyd/proxyd/consensus_tracker.go b/proxyd/proxyd/consensus_tracker.go index 3bd10e7..83d2ec7 100644 --- a/proxyd/proxyd/consensus_tracker.go +++ b/proxyd/proxyd/consensus_tracker.go @@ -13,35 +13,68 @@ import ( // ConsensusTracker abstracts how we store and retrieve the current consensus // allowing it to be stored locally in-memory or in a shared Redis cluster type ConsensusTracker interface { - GetConsensusBlockNumber() hexutil.Uint64 - SetConsensusBlockNumber(blockNumber hexutil.Uint64) + GetLatestBlockNumber() hexutil.Uint64 + SetLatestBlockNumber(blockNumber hexutil.Uint64) + GetFinalizedBlockNumber() hexutil.Uint64 + SetFinalizedBlockNumber(blockNumber hexutil.Uint64) + GetSafeBlockNumber() hexutil.Uint64 + SetSafeBlockNumber(blockNumber hexutil.Uint64) } // InMemoryConsensusTracker store and retrieve in memory, async-safe type InMemoryConsensusTracker struct { - consensusBlockNumber hexutil.Uint64 + latestBlockNumber hexutil.Uint64 + finalizedBlockNumber hexutil.Uint64 + safeBlockNumber hexutil.Uint64 mutex sync.Mutex } func NewInMemoryConsensusTracker() ConsensusTracker { return &InMemoryConsensusTracker{ - consensusBlockNumber: 0, - mutex: sync.Mutex{}, + mutex: sync.Mutex{}, } } -func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 { +func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 { defer ct.mutex.Unlock() ct.mutex.Lock() - return ct.consensusBlockNumber + return ct.latestBlockNumber } -func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) { +func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) { defer ct.mutex.Unlock() ct.mutex.Lock() - ct.consensusBlockNumber = blockNumber + ct.latestBlockNumber = blockNumber +} + +func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + return ct.finalizedBlockNumber +} + +func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + ct.finalizedBlockNumber = blockNumber +} + +func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + return ct.safeBlockNumber +} + +func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + ct.safeBlockNumber = blockNumber } // RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe @@ -59,14 +92,29 @@ func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace st } } -func (ct *RedisConsensusTracker) key() string { - return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup) +func (ct *RedisConsensusTracker) key(tag string) string { + return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag) } -func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 { - return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val())) +func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("latest")).Val())) } -func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) { - ct.client.Set(ct.ctx, ct.key(), blockNumber, 0) +func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0) +} + +func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val())) +} + +func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0) +} +func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 { + return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val())) +} + +func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) { + ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0) } diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go index 9320429..d95903e 100644 --- a/proxyd/proxyd/integration_tests/consensus_test.go +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -61,7 +61,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.Unban() // unknown consensus at init - require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String()) // first poll for _, be := range bg.Backends { @@ -70,7 +70,9 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // consensus at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) }) t.Run("prevent using a backend with low peer count", func(t *testing.T) { @@ -108,6 +110,16 @@ func TestConsensus(t *testing.T) { Block: "latest", Response: buildGetBlockResponse("0x1", "hash1"), }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x1", "hash1"), + }) h2.AddOverride(&ms.MethodTemplate{ Method: "eth_getBlockByNumber", @@ -126,7 +138,9 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x100", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x100", bg.Consensus.GetLatestBlockNumber().String()) + require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() @@ -166,7 +180,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // since we ignored node1, the consensus should be at 0x100 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() @@ -201,7 +215,7 @@ func TestConsensus(t *testing.T) { } bg.Consensus.UpdateBackendGroupConsensus(ctx) - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) consensusGroup := bg.Consensus.GetConsensusGroup() @@ -249,7 +263,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on node2 to 0x2 h2.AddOverride(&ms.MethodTemplate{ @@ -265,7 +279,7 @@ func TestConsensus(t *testing.T) { // consensus should stick to 0x1, since node1 is still lagging there bg.Consensus.UpdateBackendGroupConsensus(ctx) - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on node1 to 0x2 h1.AddOverride(&ms.MethodTemplate{ @@ -281,7 +295,82 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should stick to 0x2, since now all nodes are at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) + }) + + t.Run("should use lowest safe and finalized", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x559", "hash559"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x558", "hash558"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String()) + }) + + t.Run("advance safe and finalized", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x556", "hash556"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x552", "hash552"), + }) + + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x559", "hash559"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x558", "hash558"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, "0x556", bg.Consensus.GetFinalizedBlockNumber().String()) + require.Equal(t, "0x552", bg.Consensus.GetSafeBlockNumber().String()) }) t.Run("broken consensus", func(t *testing.T) { @@ -300,7 +389,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on both nodes to 0x2 h1.AddOverride(&ms.MethodTemplate{ @@ -321,7 +410,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) // make node2 diverge on hash h2.AddOverride(&ms.MethodTemplate{ @@ -337,7 +426,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should resolve to 0x1, since 0x2 is out of consensus at the moment - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) require.True(t, listenerCalled) }) @@ -353,7 +442,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on both nodes to 0x2 h1.AddOverride(&ms.MethodTemplate{ @@ -374,7 +463,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // at 0x2 - require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String()) // advance latest on both nodes to 0x3 h1.AddOverride(&ms.MethodTemplate{ @@ -395,7 +484,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // at 0x3 - require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x3", bg.Consensus.GetLatestBlockNumber().String()) // make node2 diverge on hash for blocks 0x2 and 0x3 h2.AddOverride(&ms.MethodTemplate{ @@ -416,7 +505,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should resolve to 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) }) t.Run("fork in advanced block", func(t *testing.T) { @@ -430,7 +519,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // all nodes start at block 0x1 - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) // make nodes 1 and 2 advance in forks h1.AddOverride(&ms.MethodTemplate{ @@ -471,7 +560,7 @@ func TestConsensus(t *testing.T) { bg.Consensus.UpdateBackendGroupConsensus(ctx) // should resolve to 0x1, the highest common ancestor - require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) + require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String()) }) t.Run("load balancing should hit both backends", func(t *testing.T) { @@ -607,7 +696,7 @@ func TestConsensus(t *testing.T) { require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests())) }) - t.Run("rewrite request of eth_getBlockByNumber", func(t *testing.T) { + t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() bg.Consensus.Unban() @@ -643,6 +732,88 @@ func TestConsensus(t *testing.T) { require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0]) }) + t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + // establish the consensus and ban node2 for now + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x20", "hash20"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "finalized", + Response: buildGetBlockResponse("0x5", "hash5"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "net_peerCount", + Block: "", + Response: buildPeerCountResponse(1), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + + node1.Reset() + + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"}) + require.NoError(t, err) + require.Equal(t, 200, statusCode) + + var jsonMap map[string]interface{} + err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) + require.NoError(t, err) + require.Equal(t, "0x5", jsonMap["params"].([]interface{})[0]) + }) + + t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + bg.Consensus.Unban() + + // establish the consensus and ban node2 for now + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildGetBlockResponse("0x20", "hash20"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "safe", + Response: buildGetBlockResponse("0x1", "hash1"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "net_peerCount", + Block: "", + Response: buildPeerCountResponse(1), + }) + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup())) + + node1.Reset() + + _, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"}) + require.NoError(t, err) + require.Equal(t, 200, statusCode) + + var jsonMap map[string]interface{} + err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap) + require.NoError(t, err) + require.Equal(t, "0x1", jsonMap["params"].([]interface{})[0]) + }) + t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) { h1.ResetOverrides() h2.ResetOverrides() diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml index 83579e7..ef5b692 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -63,3 +63,69 @@ "number": "0x3" } } +- method: eth_getBlockByNumber + block: finalized + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_finalized", + "number": "0x555" + } + } +- method: eth_getBlockByNumber + block: 0x555 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_finalized", + "number": "0x555" + } + } +- method: eth_getBlockByNumber + block: safe + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_safe", + "number": "0x551" + } + } +- method: eth_getBlockByNumber + block: 0x555 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash_safe", + "number": "0x551" + } + } +- method: eth_getBlockByNumber + block: 0x5 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash5", + "number": "0x5" + } + } +- method: eth_getBlockByNumber + block: 0x20 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash20", + "number": "0x20" + } + } diff --git a/proxyd/proxyd/rewriter.go b/proxyd/proxyd/rewriter.go index be539bc..ab098d8 100644 --- a/proxyd/proxyd/rewriter.go +++ b/proxyd/proxyd/rewriter.go @@ -9,7 +9,9 @@ import ( ) type RewriteContext struct { - latest hexutil.Uint64 + latest hexutil.Uint64 + finalized hexutil.Uint64 + safe hexutil.Uint64 } type RewriteResult uint8 @@ -180,11 +182,13 @@ func rewriteTag(rctx RewriteContext, current string) (string, bool, error) { } switch *bnh.BlockNumber { - case rpc.SafeBlockNumber, - rpc.FinalizedBlockNumber, - rpc.PendingBlockNumber, + case rpc.PendingBlockNumber, rpc.EarliestBlockNumber: return current, false, nil + case rpc.FinalizedBlockNumber: + return rctx.finalized.String(), true, nil + case rpc.SafeBlockNumber: + return rctx.safe.String(), true, nil case rpc.LatestBlockNumber: return rctx.latest.String(), true, nil default: diff --git a/proxyd/proxyd/rewriter_test.go b/proxyd/proxyd/rewriter_test.go index ae5c9c8..eb7f18f 100644 --- a/proxyd/proxyd/rewriter_test.go +++ b/proxyd/proxyd/rewriter_test.go @@ -326,33 +326,33 @@ func TestRewriteRequest(t *testing.T) { { name: "eth_getBlockByNumber finalized", args: args{ - rctx: RewriteContext{latest: hexutil.Uint64(100)}, + rctx: RewriteContext{latest: hexutil.Uint64(100), finalized: hexutil.Uint64(55)}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})}, res: nil, }, - expected: RewriteNone, + expected: RewriteOverrideRequest, check: func(t *testing.T, args args) { var p []string err := json.Unmarshal(args.req.Params, &p) require.Nil(t, err) require.Equal(t, 1, len(p)) - require.Equal(t, "finalized", p[0]) + require.Equal(t, hexutil.Uint64(55).String(), p[0]) }, }, { name: "eth_getBlockByNumber safe", args: args{ - rctx: RewriteContext{latest: hexutil.Uint64(100)}, + rctx: RewriteContext{latest: hexutil.Uint64(100), safe: hexutil.Uint64(50)}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})}, res: nil, }, - expected: RewriteNone, + expected: RewriteOverrideRequest, check: func(t *testing.T, args args) { var p []string err := json.Unmarshal(args.req.Params, &p) require.Nil(t, err) require.Equal(t, 1, len(p)) - require.Equal(t, "safe", p[0]) + require.Equal(t, hexutil.Uint64(50).String(), p[0]) }, }, { diff --git a/proxyd/proxyd/tools/mockserver/handler/handler.go b/proxyd/proxyd/tools/mockserver/handler/handler.go index 04f30e7..8a78a7e 100644 --- a/proxyd/proxyd/tools/mockserver/handler/handler.go +++ b/proxyd/proxyd/tools/mockserver/handler/handler.go @@ -95,7 +95,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) { resBody := "" if batched { resBody = "[" + strings.Join(responses, ",") + "]" - } else { + } else if len(responses) > 0 { resBody = responses[0] }