order params chronologically: latest, safe, finalized
This commit is contained in:
parent
04058779a9
commit
2bf2329dd3
@ -564,8 +564,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
|
|||||||
// We also rewrite block tags to enforce compliance with consensus
|
// We also rewrite block tags to enforce compliance with consensus
|
||||||
rctx := RewriteContext{
|
rctx := RewriteContext{
|
||||||
latest: bg.Consensus.GetLatestBlockNumber(),
|
latest: bg.Consensus.GetLatestBlockNumber(),
|
||||||
finalized: bg.Consensus.GetFinalizedBlockNumber(),
|
|
||||||
safe: bg.Consensus.GetSafeBlockNumber(),
|
safe: bg.Consensus.GetSafeBlockNumber(),
|
||||||
|
finalized: bg.Consensus.GetFinalizedBlockNumber(),
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, req := range rpcReqs {
|
for i, req := range rpcReqs {
|
||||||
|
@ -43,11 +43,10 @@ type ConsensusPoller struct {
|
|||||||
type backendState struct {
|
type backendState struct {
|
||||||
backendStateMux sync.Mutex
|
backendStateMux sync.Mutex
|
||||||
|
|
||||||
latestBlockNumber hexutil.Uint64
|
latestBlockNumber hexutil.Uint64
|
||||||
latestBlockHash string
|
latestBlockHash string
|
||||||
|
|
||||||
finalizedBlockNumber hexutil.Uint64
|
|
||||||
safeBlockNumber hexutil.Uint64
|
safeBlockNumber hexutil.Uint64
|
||||||
|
finalizedBlockNumber hexutil.Uint64
|
||||||
|
|
||||||
peerCount uint64
|
peerCount uint64
|
||||||
inSync bool
|
inSync bool
|
||||||
@ -77,16 +76,16 @@ func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
|
|||||||
return ct.tracker.GetLatestBlockNumber()
|
return ct.tracker.GetLatestBlockNumber()
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
|
|
||||||
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
|
|
||||||
return ct.tracker.GetFinalizedBlockNumber()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
|
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
|
||||||
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
|
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
|
||||||
return ct.tracker.GetSafeBlockNumber()
|
return ct.tracker.GetSafeBlockNumber()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
|
||||||
|
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
|
||||||
|
return ct.tracker.GetFinalizedBlockNumber()
|
||||||
|
}
|
||||||
|
|
||||||
func (cp *ConsensusPoller) Shutdown() {
|
func (cp *ConsensusPoller) Shutdown() {
|
||||||
cp.asyncHandler.Shutdown()
|
cp.asyncHandler.Shutdown()
|
||||||
}
|
}
|
||||||
@ -289,15 +288,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
|||||||
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
|
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// just for readability
|
|
||||||
oldFinalized := bs.finalizedBlockNumber
|
|
||||||
oldSafe := bs.safeBlockNumber
|
|
||||||
|
|
||||||
RecordConsensusBackendUpdateDelay(be, bs.lastUpdate)
|
RecordConsensusBackendUpdateDelay(be, bs.lastUpdate)
|
||||||
|
|
||||||
changed := cp.setBackendState(be, peerCount, inSync,
|
changed := cp.setBackendState(be, peerCount, inSync,
|
||||||
latestBlockNumber, latestBlockHash,
|
latestBlockNumber, latestBlockHash,
|
||||||
finalizedBlockNumber, safeBlockNumber)
|
safeBlockNumber, finalizedBlockNumber)
|
||||||
|
|
||||||
RecordBackendLatestBlock(be, latestBlockNumber)
|
RecordBackendLatestBlock(be, latestBlockNumber)
|
||||||
RecordBackendSafeBlock(be, safeBlockNumber)
|
RecordBackendSafeBlock(be, safeBlockNumber)
|
||||||
@ -310,25 +305,25 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
|||||||
"inSync", inSync,
|
"inSync", inSync,
|
||||||
"latestBlockNumber", latestBlockNumber,
|
"latestBlockNumber", latestBlockNumber,
|
||||||
"latestBlockHash", latestBlockHash,
|
"latestBlockHash", latestBlockHash,
|
||||||
"finalizedBlockNumber", finalizedBlockNumber,
|
|
||||||
"safeBlockNumber", safeBlockNumber,
|
"safeBlockNumber", safeBlockNumber,
|
||||||
|
"finalizedBlockNumber", finalizedBlockNumber,
|
||||||
"lastUpdate", bs.lastUpdate)
|
"lastUpdate", bs.lastUpdate)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sanity check for latest, safe and finalized block tags
|
// sanity check for latest, safe and finalized block tags
|
||||||
expectedBlockTags := cp.checkExpectedBlockTags(
|
expectedBlockTags := cp.checkExpectedBlockTags(
|
||||||
finalizedBlockNumber, oldFinalized,
|
latestBlockNumber,
|
||||||
safeBlockNumber, oldSafe,
|
bs.safeBlockNumber, safeBlockNumber,
|
||||||
latestBlockNumber)
|
bs.finalizedBlockNumber, finalizedBlockNumber)
|
||||||
|
|
||||||
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
|
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
|
||||||
|
|
||||||
if !expectedBlockTags {
|
if !expectedBlockTags {
|
||||||
log.Warn("backend banned - unexpected block tags",
|
log.Warn("backend banned - unexpected block tags",
|
||||||
"backend", be.Name,
|
"backend", be.Name,
|
||||||
"oldFinalized", oldFinalized,
|
"oldFinalized", bs.finalizedBlockNumber,
|
||||||
"finalizedBlockNumber", finalizedBlockNumber,
|
"finalizedBlockNumber", finalizedBlockNumber,
|
||||||
"oldSafe", oldSafe,
|
"oldSafe", bs.safeBlockNumber,
|
||||||
"safeBlockNumber", safeBlockNumber,
|
"safeBlockNumber", safeBlockNumber,
|
||||||
"latestBlockNumber", latestBlockNumber,
|
"latestBlockNumber", latestBlockNumber,
|
||||||
)
|
)
|
||||||
@ -340,9 +335,10 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
|||||||
// - finalized block number should never decrease
|
// - finalized block number should never decrease
|
||||||
// - safe block number should never decrease
|
// - safe block number should never decrease
|
||||||
// - finalized block should be <= safe block <= latest block
|
// - finalized block should be <= safe block <= latest block
|
||||||
func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint64, oldFinalized hexutil.Uint64,
|
func (cp *ConsensusPoller) checkExpectedBlockTags(
|
||||||
currentSafe hexutil.Uint64, oldSafe hexutil.Uint64,
|
currentLatest hexutil.Uint64,
|
||||||
currentLatest hexutil.Uint64) bool {
|
oldSafe hexutil.Uint64, currentSafe hexutil.Uint64,
|
||||||
|
oldFinalized hexutil.Uint64, currentFinalized hexutil.Uint64) bool {
|
||||||
return currentFinalized >= oldFinalized &&
|
return currentFinalized >= oldFinalized &&
|
||||||
currentSafe >= oldSafe &&
|
currentSafe >= oldSafe &&
|
||||||
currentFinalized <= currentSafe &&
|
currentFinalized <= currentSafe &&
|
||||||
@ -594,8 +590,8 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
|
|||||||
|
|
||||||
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
|
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
|
||||||
latestBlockNumber hexutil.Uint64, latestBlockHash string,
|
latestBlockNumber hexutil.Uint64, latestBlockHash string,
|
||||||
finalizedBlockNumber hexutil.Uint64,
|
safeBlockNumber hexutil.Uint64,
|
||||||
safeBlockNumber hexutil.Uint64) bool {
|
finalizedBlockNumber hexutil.Uint64) bool {
|
||||||
bs := cp.backendState[be]
|
bs := cp.backendState[be]
|
||||||
bs.backendStateMux.Lock()
|
bs.backendStateMux.Lock()
|
||||||
changed := bs.latestBlockHash != latestBlockHash
|
changed := bs.latestBlockHash != latestBlockHash
|
||||||
|
@ -15,17 +15,17 @@ import (
|
|||||||
type ConsensusTracker interface {
|
type ConsensusTracker interface {
|
||||||
GetLatestBlockNumber() hexutil.Uint64
|
GetLatestBlockNumber() hexutil.Uint64
|
||||||
SetLatestBlockNumber(blockNumber hexutil.Uint64)
|
SetLatestBlockNumber(blockNumber hexutil.Uint64)
|
||||||
GetFinalizedBlockNumber() hexutil.Uint64
|
|
||||||
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
|
|
||||||
GetSafeBlockNumber() hexutil.Uint64
|
GetSafeBlockNumber() hexutil.Uint64
|
||||||
SetSafeBlockNumber(blockNumber hexutil.Uint64)
|
SetSafeBlockNumber(blockNumber hexutil.Uint64)
|
||||||
|
GetFinalizedBlockNumber() hexutil.Uint64
|
||||||
|
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InMemoryConsensusTracker store and retrieve in memory, async-safe
|
// InMemoryConsensusTracker store and retrieve in memory, async-safe
|
||||||
type InMemoryConsensusTracker struct {
|
type InMemoryConsensusTracker struct {
|
||||||
latestBlockNumber hexutil.Uint64
|
latestBlockNumber hexutil.Uint64
|
||||||
finalizedBlockNumber hexutil.Uint64
|
|
||||||
safeBlockNumber hexutil.Uint64
|
safeBlockNumber hexutil.Uint64
|
||||||
|
finalizedBlockNumber hexutil.Uint64
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,20 +49,6 @@ func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uin
|
|||||||
ct.latestBlockNumber = blockNumber
|
ct.latestBlockNumber = blockNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
|
|
||||||
defer ct.mutex.Unlock()
|
|
||||||
ct.mutex.Lock()
|
|
||||||
|
|
||||||
return ct.finalizedBlockNumber
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
|
|
||||||
defer ct.mutex.Unlock()
|
|
||||||
ct.mutex.Lock()
|
|
||||||
|
|
||||||
ct.finalizedBlockNumber = blockNumber
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
|
func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
|
||||||
defer ct.mutex.Unlock()
|
defer ct.mutex.Unlock()
|
||||||
ct.mutex.Lock()
|
ct.mutex.Lock()
|
||||||
@ -77,6 +63,20 @@ func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint6
|
|||||||
ct.safeBlockNumber = blockNumber
|
ct.safeBlockNumber = blockNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
|
||||||
|
defer ct.mutex.Unlock()
|
||||||
|
ct.mutex.Lock()
|
||||||
|
|
||||||
|
return ct.finalizedBlockNumber
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
|
||||||
|
defer ct.mutex.Unlock()
|
||||||
|
ct.mutex.Lock()
|
||||||
|
|
||||||
|
ct.finalizedBlockNumber = blockNumber
|
||||||
|
}
|
||||||
|
|
||||||
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
|
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
|
||||||
type RedisConsensusTracker struct {
|
type RedisConsensusTracker struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@ -104,13 +104,6 @@ func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64
|
|||||||
ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
|
ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
|
|
||||||
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
|
|
||||||
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
|
|
||||||
}
|
|
||||||
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
|
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
|
||||||
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
|
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
|
||||||
}
|
}
|
||||||
@ -118,3 +111,11 @@ func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
|
|||||||
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
|
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
|
||||||
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
|
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
|
||||||
|
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
|
||||||
|
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
|
||||||
|
}
|
||||||
|
@ -267,8 +267,8 @@ func TestConsensus(t *testing.T) {
|
|||||||
overrideBlock("node2", "safe", "0xe2")
|
overrideBlock("node2", "safe", "0xe2")
|
||||||
update()
|
update()
|
||||||
|
|
||||||
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
|
||||||
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
||||||
|
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("advance safe and finalized", func(t *testing.T) {
|
t.Run("advance safe and finalized", func(t *testing.T) {
|
||||||
@ -279,8 +279,8 @@ func TestConsensus(t *testing.T) {
|
|||||||
overrideBlock("node2", "safe", "0xe2")
|
overrideBlock("node2", "safe", "0xe2")
|
||||||
update()
|
update()
|
||||||
|
|
||||||
require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String())
|
|
||||||
require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String())
|
require.Equal(t, "0xe2", bg.Consensus.GetSafeBlockNumber().String())
|
||||||
|
require.Equal(t, "0xc2", bg.Consensus.GetFinalizedBlockNumber().String())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ban backend if error rate is too high", func(t *testing.T) {
|
t.Run("ban backend if error rate is too high", func(t *testing.T) {
|
||||||
@ -317,8 +317,8 @@ func TestConsensus(t *testing.T) {
|
|||||||
overrideBlock("node1", "safe", "0xa1")
|
overrideBlock("node1", "safe", "0xa1")
|
||||||
update()
|
update()
|
||||||
|
|
||||||
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
|
||||||
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
||||||
|
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
||||||
|
|
||||||
consensusGroup := bg.Consensus.GetConsensusGroup()
|
consensusGroup := bg.Consensus.GetConsensusGroup()
|
||||||
require.NotContains(t, consensusGroup, nodes["node1"].backend)
|
require.NotContains(t, consensusGroup, nodes["node1"].backend)
|
||||||
@ -349,8 +349,8 @@ func TestConsensus(t *testing.T) {
|
|||||||
update()
|
update()
|
||||||
|
|
||||||
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
|
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
|
||||||
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
|
||||||
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
||||||
|
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
||||||
|
|
||||||
consensusGroup := bg.Consensus.GetConsensusGroup()
|
consensusGroup := bg.Consensus.GetConsensusGroup()
|
||||||
require.NotContains(t, consensusGroup, nodes["node1"].backend)
|
require.NotContains(t, consensusGroup, nodes["node1"].backend)
|
||||||
@ -365,8 +365,8 @@ func TestConsensus(t *testing.T) {
|
|||||||
update()
|
update()
|
||||||
|
|
||||||
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
|
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
|
||||||
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
|
||||||
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
|
||||||
|
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
|
||||||
|
|
||||||
consensusGroup := bg.Consensus.GetConsensusGroup()
|
consensusGroup := bg.Consensus.GetConsensusGroup()
|
||||||
require.NotContains(t, consensusGroup, nodes["node1"].backend)
|
require.NotContains(t, consensusGroup, nodes["node1"].backend)
|
||||||
@ -397,8 +397,8 @@ func TestConsensus(t *testing.T) {
|
|||||||
require.Equal(t, 1, len(consensusGroup))
|
require.Equal(t, 1, len(consensusGroup))
|
||||||
|
|
||||||
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
|
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
|
||||||
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
|
|
||||||
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
|
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
|
||||||
|
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("latest dropped below safe, then recovered", func(t *testing.T) {
|
t.Run("latest dropped below safe, then recovered", func(t *testing.T) {
|
||||||
@ -424,8 +424,8 @@ func TestConsensus(t *testing.T) {
|
|||||||
require.Equal(t, 1, len(consensusGroup))
|
require.Equal(t, 1, len(consensusGroup))
|
||||||
|
|
||||||
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
|
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
|
||||||
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
|
|
||||||
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
|
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
|
||||||
|
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
|
t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user