consensus for {safe,finalized} and rewrite tags

This commit is contained in:
Felipe Andrade 2023-05-25 21:37:47 -07:00
parent d5a476c5e8
commit af863d39de
8 changed files with 423 additions and 77 deletions

@ -556,7 +556,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
backends = bg.loadBalancedConsensusGroup() backends = bg.loadBalancedConsensusGroup()
// We also rewrite block tags to enforce compliance with consensus // We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{latest: bg.Consensus.GetConsensusBlockNumber()} rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
}
for i, req := range rpcReqs { for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID} res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}

@ -46,6 +46,10 @@ type backendState struct {
latestBlockNumber hexutil.Uint64 latestBlockNumber hexutil.Uint64
latestBlockHash string latestBlockHash string
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
peerCount uint64 peerCount uint64
inSync bool inSync bool
@ -65,9 +69,19 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
return g return g
} }
// GetConsensusBlockNumber returns the agreed block number in a consensus // GetLatestBlockNumber returns the `latest` agreed block number in a consensus
func (ct *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 { func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return ct.tracker.GetConsensusBlockNumber() return ct.tracker.GetLatestBlockNumber()
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
return ct.tracker.GetFinalizedBlockNumber()
}
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
return ct.tracker.GetSafeBlockNumber()
} }
func (cp *ConsensusPoller) Shutdown() { func (cp *ConsensusPoller) Shutdown() {
@ -261,7 +275,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend", "name", be.Name, "err", err)
} }
changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash) finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
}
safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
}
changed, updateDelay := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber)
if changed { if changed {
RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber)
@ -272,21 +298,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
"inSync", inSync, "inSync", inSync,
"latestBlockNumber", latestBlockNumber, "latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash, "latestBlockHash", latestBlockHash,
"finalizedBlockNumber", finalizedBlockNumber,
"safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay) "updateDelay", updateDelay)
} }
} }
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends // UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var highestBlock hexutil.Uint64 var highestLatestBlock hexutil.Uint64
var lowestBlock hexutil.Uint64
var lowestBlockHash string
currentConsensusBlockNumber := cp.GetConsensusBlockNumber() var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64
currentConsensusBlockNumber := cp.GetLatestBlockNumber()
// find the highest block, in order to use it defining the highest non-lagging ancestor block // find the highest block, in order to use it defining the highest non-lagging ancestor block
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
@ -298,14 +330,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
continue continue
} }
if backendLatestBlockNumber > highestBlock { if backendLatestBlockNumber > highestLatestBlock {
highestBlock = backendLatestBlockNumber highestLatestBlock = backendLatestBlockNumber
} }
} }
// find the highest common ancestor block // find the highest common ancestor block
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
@ -318,23 +350,31 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
} }
// check if backend is lagging behind the highest block // check if backend is lagging behind the highest block
if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag { if backendLatestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-backendLatestBlockNumber) > cp.maxBlockLag {
continue continue
} }
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock { if lowestLatestBlock == 0 || backendLatestBlockNumber < lowestLatestBlock {
lowestBlock = backendLatestBlockNumber lowestLatestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash lowestLatestBlockHash = backendLatestBlockHash
}
if lowestFinalizedBlock == 0 || backendFinalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = backendFinalizedBlockNumber
}
if lowestSafeBlock == 0 || backendSafeBlockNumber < lowestSafeBlock {
lowestSafeBlock = backendSafeBlockNumber
} }
} }
// no block to propose (i.e. initializing consensus) // no block to propose (i.e. initializing consensus)
if lowestBlock == 0 { if lowestLatestBlock == 0 {
return return
} }
proposedBlock := lowestBlock proposedBlock := lowestLatestBlock
proposedBlockHash := lowestBlockHash proposedBlockHash := lowestLatestBlockHash
hasConsensus := false hasConsensus := false
// check if everybody agrees on the same block hash // check if everybody agrees on the same block hash
@ -342,8 +382,8 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
if lowestBlock > currentConsensusBlockNumber { if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestBlock", lowestBlock) log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
} }
broken := false broken := false
@ -362,7 +402,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
- in sync - in sync
*/ */
peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be)
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bannedUntil) isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
@ -410,7 +450,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
} }
cp.tracker.SetConsensusBlockNumber(proposedBlock) cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.consensusGroupMux.Lock() cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends cp.consensusGroup = consensusBackends
cp.consensusGroupMux.Unlock() cp.consensusGroupMux.Unlock()
@ -512,27 +554,38 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil return res, nil
} }
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64,
lastUpdate time.Time, bannedUntil time.Time) {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
peerCount = bs.peerCount peerCount = bs.peerCount
inSync = bs.inSync inSync = bs.inSync
blockNumber = bs.latestBlockNumber latestBlockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash latestBlockHash = bs.latestBlockHash
finalizedBlockNumber = bs.finalizedBlockNumber
safeBlockNumber = bs.safeBlockNumber
lastUpdate = bs.lastUpdate lastUpdate = bs.lastUpdate
bannedUntil = bs.bannedUntil bannedUntil = bs.bannedUntil
return return
} }
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) {
bs := cp.backendState[be] bs := cp.backendState[be]
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash changed = bs.latestBlockHash != latestBlockHash
bs.peerCount = peerCount bs.peerCount = peerCount
bs.inSync = inSync bs.inSync = inSync
bs.latestBlockNumber = blockNumber bs.latestBlockNumber = latestBlockNumber
bs.latestBlockHash = blockHash bs.latestBlockHash = latestBlockHash
bs.finalizedBlockNumber = finalizedBlockNumber
bs.safeBlockNumber = safeBlockNumber
updateDelay = time.Since(bs.lastUpdate) updateDelay = time.Since(bs.lastUpdate)
bs.lastUpdate = time.Now() bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock() bs.backendStateMux.Unlock()

@ -13,35 +13,68 @@ import (
// ConsensusTracker abstracts how we store and retrieve the current consensus // ConsensusTracker abstracts how we store and retrieve the current consensus
// allowing it to be stored locally in-memory or in a shared Redis cluster // allowing it to be stored locally in-memory or in a shared Redis cluster
type ConsensusTracker interface { type ConsensusTracker interface {
GetConsensusBlockNumber() hexutil.Uint64 GetLatestBlockNumber() hexutil.Uint64
SetConsensusBlockNumber(blockNumber hexutil.Uint64) SetLatestBlockNumber(blockNumber hexutil.Uint64)
GetFinalizedBlockNumber() hexutil.Uint64
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
GetSafeBlockNumber() hexutil.Uint64
SetSafeBlockNumber(blockNumber hexutil.Uint64)
} }
// InMemoryConsensusTracker store and retrieve in memory, async-safe // InMemoryConsensusTracker store and retrieve in memory, async-safe
type InMemoryConsensusTracker struct { type InMemoryConsensusTracker struct {
consensusBlockNumber hexutil.Uint64 latestBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
mutex sync.Mutex mutex sync.Mutex
} }
func NewInMemoryConsensusTracker() ConsensusTracker { func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{ return &InMemoryConsensusTracker{
consensusBlockNumber: 0,
mutex: sync.Mutex{}, mutex: sync.Mutex{},
} }
} }
func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 { func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock() defer ct.mutex.Unlock()
ct.mutex.Lock() ct.mutex.Lock()
return ct.consensusBlockNumber return ct.latestBlockNumber
} }
func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) { func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock() defer ct.mutex.Unlock()
ct.mutex.Lock() ct.mutex.Lock()
ct.consensusBlockNumber = blockNumber ct.latestBlockNumber = blockNumber
}
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.finalizedBlockNumber
}
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.finalizedBlockNumber = blockNumber
}
func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.safeBlockNumber
}
func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.safeBlockNumber = blockNumber
} }
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe // RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
@ -59,14 +92,29 @@ func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace st
} }
} }
func (ct *RedisConsensusTracker) key() string { func (ct *RedisConsensusTracker) key(tag string) string {
return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup) return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag)
} }
func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 { func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val())) return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("latest")).Val()))
} }
func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) { func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key(), blockNumber, 0) ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
}
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
}
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
} }

@ -61,7 +61,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.Unban() bg.Consensus.Unban()
// unknown consensus at init // unknown consensus at init
require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x0", bg.Consensus.GetLatestBlockNumber().String())
// first poll // first poll
for _, be := range bg.Backends { for _, be := range bg.Backends {
@ -70,7 +70,9 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// consensus at block 0x1 // consensus at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String())
}) })
t.Run("prevent using a backend with low peer count", func(t *testing.T) { t.Run("prevent using a backend with low peer count", func(t *testing.T) {
@ -108,6 +110,16 @@ func TestConsensus(t *testing.T) {
Block: "latest", Block: "latest",
Response: buildGetBlockResponse("0x1", "hash1"), Response: buildGetBlockResponse("0x1", "hash1"),
}) })
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "finalized",
Response: buildGetBlockResponse("0x1", "hash1"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x1", "hash1"),
})
h2.AddOverride(&ms.MethodTemplate{ h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber", Method: "eth_getBlockByNumber",
@ -126,7 +138,9 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// since we ignored node1, the consensus should be at 0x100 // since we ignored node1, the consensus should be at 0x100
require.Equal(t, "0x100", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x100", bg.Consensus.GetLatestBlockNumber().String())
require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
@ -166,7 +180,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// since we ignored node1, the consensus should be at 0x100 // since we ignored node1, the consensus should be at 0x100
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
@ -201,7 +215,7 @@ func TestConsensus(t *testing.T) {
} }
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
@ -249,7 +263,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on node2 to 0x2 // advance latest on node2 to 0x2
h2.AddOverride(&ms.MethodTemplate{ h2.AddOverride(&ms.MethodTemplate{
@ -265,7 +279,7 @@ func TestConsensus(t *testing.T) {
// consensus should stick to 0x1, since node1 is still lagging there // consensus should stick to 0x1, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on node1 to 0x2 // advance latest on node1 to 0x2
h1.AddOverride(&ms.MethodTemplate{ h1.AddOverride(&ms.MethodTemplate{
@ -281,7 +295,82 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should stick to 0x2, since now all nodes are at 0x2 // should stick to 0x2, since now all nodes are at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String())
})
t.Run("should use lowest safe and finalized", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "finalized",
Response: buildGetBlockResponse("0x559", "hash559"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x558", "hash558"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x555", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0x551", bg.Consensus.GetSafeBlockNumber().String())
})
t.Run("advance safe and finalized", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "finalized",
Response: buildGetBlockResponse("0x556", "hash556"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x552", "hash552"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "finalized",
Response: buildGetBlockResponse("0x559", "hash559"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x558", "hash558"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x556", bg.Consensus.GetFinalizedBlockNumber().String())
require.Equal(t, "0x552", bg.Consensus.GetSafeBlockNumber().String())
}) })
t.Run("broken consensus", func(t *testing.T) { t.Run("broken consensus", func(t *testing.T) {
@ -300,7 +389,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x2 // advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{ h1.AddOverride(&ms.MethodTemplate{
@ -321,7 +410,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2 // at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash // make node2 diverge on hash
h2.AddOverride(&ms.MethodTemplate{ h2.AddOverride(&ms.MethodTemplate{
@ -337,7 +426,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, since 0x2 is out of consensus at the moment // should resolve to 0x1, since 0x2 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
require.True(t, listenerCalled) require.True(t, listenerCalled)
}) })
@ -353,7 +442,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x2 // advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{ h1.AddOverride(&ms.MethodTemplate{
@ -374,7 +463,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2 // at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x2", bg.Consensus.GetLatestBlockNumber().String())
// advance latest on both nodes to 0x3 // advance latest on both nodes to 0x3
h1.AddOverride(&ms.MethodTemplate{ h1.AddOverride(&ms.MethodTemplate{
@ -395,7 +484,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x3 // at 0x3
require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x3", bg.Consensus.GetLatestBlockNumber().String())
// make node2 diverge on hash for blocks 0x2 and 0x3 // make node2 diverge on hash for blocks 0x2 and 0x3
h2.AddOverride(&ms.MethodTemplate{ h2.AddOverride(&ms.MethodTemplate{
@ -416,7 +505,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1 // should resolve to 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
}) })
t.Run("fork in advanced block", func(t *testing.T) { t.Run("fork in advanced block", func(t *testing.T) {
@ -430,7 +519,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1 // all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
// make nodes 1 and 2 advance in forks // make nodes 1 and 2 advance in forks
h1.AddOverride(&ms.MethodTemplate{ h1.AddOverride(&ms.MethodTemplate{
@ -471,7 +560,7 @@ func TestConsensus(t *testing.T) {
bg.Consensus.UpdateBackendGroupConsensus(ctx) bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, the highest common ancestor // should resolve to 0x1, the highest common ancestor
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String()) require.Equal(t, "0x1", bg.Consensus.GetLatestBlockNumber().String())
}) })
t.Run("load balancing should hit both backends", func(t *testing.T) { t.Run("load balancing should hit both backends", func(t *testing.T) {
@ -607,7 +696,7 @@ func TestConsensus(t *testing.T) {
require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests())) require.Equal(t, totalRequests, len(node1.Requests())+len(node2.Requests()))
}) })
t.Run("rewrite request of eth_getBlockByNumber", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber for latest", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()
bg.Consensus.Unban() bg.Consensus.Unban()
@ -643,6 +732,88 @@ func TestConsensus(t *testing.T) {
require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0]) require.Equal(t, "0x2", jsonMap["params"].([]interface{})[0])
}) })
t.Run("rewrite request of eth_getBlockByNumber for finalized", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x20", "hash20"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "finalized",
Response: buildGetBlockResponse("0x5", "hash5"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"finalized"})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var jsonMap map[string]interface{}
err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0x5", jsonMap["params"].([]interface{})[0])
})
t.Run("rewrite request of eth_getBlockByNumber for safe", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
bg.Consensus.Unban()
// establish the consensus and ban node2 for now
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildGetBlockResponse("0x20", "hash20"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "safe",
Response: buildGetBlockResponse("0x1", "hash1"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "net_peerCount",
Block: "",
Response: buildPeerCountResponse(1),
})
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, 1, len(bg.Consensus.GetConsensusGroup()))
node1.Reset()
_, statusCode, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"safe"})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var jsonMap map[string]interface{}
err = json.Unmarshal(node1.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "0x1", jsonMap["params"].([]interface{})[0])
})
t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) { t.Run("rewrite request of eth_getBlockByNumber - out of range", func(t *testing.T) {
h1.ResetOverrides() h1.ResetOverrides()
h2.ResetOverrides() h2.ResetOverrides()

@ -63,3 +63,69 @@
"number": "0x3" "number": "0x3"
} }
} }
- method: eth_getBlockByNumber
block: finalized
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_finalized",
"number": "0x555"
}
}
- method: eth_getBlockByNumber
block: 0x555
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_finalized",
"number": "0x555"
}
}
- method: eth_getBlockByNumber
block: safe
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_safe",
"number": "0x551"
}
}
- method: eth_getBlockByNumber
block: 0x555
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_safe",
"number": "0x551"
}
}
- method: eth_getBlockByNumber
block: 0x5
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash5",
"number": "0x5"
}
}
- method: eth_getBlockByNumber
block: 0x20
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash20",
"number": "0x20"
}
}

@ -10,6 +10,8 @@ import (
type RewriteContext struct { type RewriteContext struct {
latest hexutil.Uint64 latest hexutil.Uint64
finalized hexutil.Uint64
safe hexutil.Uint64
} }
type RewriteResult uint8 type RewriteResult uint8
@ -180,11 +182,13 @@ func rewriteTag(rctx RewriteContext, current string) (string, bool, error) {
} }
switch *bnh.BlockNumber { switch *bnh.BlockNumber {
case rpc.SafeBlockNumber, case rpc.PendingBlockNumber,
rpc.FinalizedBlockNumber,
rpc.PendingBlockNumber,
rpc.EarliestBlockNumber: rpc.EarliestBlockNumber:
return current, false, nil return current, false, nil
case rpc.FinalizedBlockNumber:
return rctx.finalized.String(), true, nil
case rpc.SafeBlockNumber:
return rctx.safe.String(), true, nil
case rpc.LatestBlockNumber: case rpc.LatestBlockNumber:
return rctx.latest.String(), true, nil return rctx.latest.String(), true, nil
default: default:

@ -326,33 +326,33 @@ func TestRewriteRequest(t *testing.T) {
{ {
name: "eth_getBlockByNumber finalized", name: "eth_getBlockByNumber finalized",
args: args{ args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100)}, rctx: RewriteContext{latest: hexutil.Uint64(100), finalized: hexutil.Uint64(55)},
req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"finalized"})},
res: nil, res: nil,
}, },
expected: RewriteNone, expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) { check: func(t *testing.T, args args) {
var p []string var p []string
err := json.Unmarshal(args.req.Params, &p) err := json.Unmarshal(args.req.Params, &p)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 1, len(p)) require.Equal(t, 1, len(p))
require.Equal(t, "finalized", p[0]) require.Equal(t, hexutil.Uint64(55).String(), p[0])
}, },
}, },
{ {
name: "eth_getBlockByNumber safe", name: "eth_getBlockByNumber safe",
args: args{ args: args{
rctx: RewriteContext{latest: hexutil.Uint64(100)}, rctx: RewriteContext{latest: hexutil.Uint64(100), safe: hexutil.Uint64(50)},
req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})}, req: &RPCReq{Method: "eth_getBlockByNumber", Params: mustMarshalJSON([]string{"safe"})},
res: nil, res: nil,
}, },
expected: RewriteNone, expected: RewriteOverrideRequest,
check: func(t *testing.T, args args) { check: func(t *testing.T, args args) {
var p []string var p []string
err := json.Unmarshal(args.req.Params, &p) err := json.Unmarshal(args.req.Params, &p)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 1, len(p)) require.Equal(t, 1, len(p))
require.Equal(t, "safe", p[0]) require.Equal(t, hexutil.Uint64(50).String(), p[0])
}, },
}, },
{ {

@ -95,7 +95,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
resBody := "" resBody := ""
if batched { if batched {
resBody = "[" + strings.Join(responses, ",") + "]" resBody = "[" + strings.Join(responses, ",") + "]"
} else { } else if len(responses) > 0 {
resBody = responses[0] resBody = responses[0]
} }