eth/catalyst, eth/downloader: expose more sync information (#28584)

This change exposes more information from the sync module internally: payload-rejection logs now carry the concrete reason, and the skeleton syncer reports distinct reorg/gap/fork errors instead of a generic denial.
Author: rjl493456442
Date:   2023-11-28 15:38:30 +08:00 (committed by GitHub)
Parent: 5b57727d6d
Commit: 71817f318e
4 changed files with 37 additions and 36 deletions

@@ -611,7 +611,8 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (engine.PayloadS
 	// Although we don't want to trigger a sync, if there is one already in
 	// progress, try to extend if with the current payload request to relieve
 	// some strain from the forkchoice update.
-	if err := api.eth.Downloader().BeaconExtend(api.eth.SyncMode(), block.Header()); err == nil {
+	err := api.eth.Downloader().BeaconExtend(api.eth.SyncMode(), block.Header())
+	if err == nil {
 		log.Debug("Payload accepted for sync extension", "number", block.NumberU64(), "hash", block.Hash())
 		return engine.PayloadStatusV1{Status: engine.SYNCING}, nil
 	}
@@ -623,12 +624,12 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (engine.PayloadS
 		// In full sync mode, failure to import a well-formed block can only mean
 		// that the parent state is missing and the syncer rejected extending the
 		// current cycle with the new payload.
-		log.Warn("Ignoring payload with missing parent", "number", block.NumberU64(), "hash", block.Hash(), "parent", block.ParentHash())
+		log.Warn("Ignoring payload with missing parent", "number", block.NumberU64(), "hash", block.Hash(), "parent", block.ParentHash(), "reason", err)
 	} else {
 		// In non-full sync mode (i.e. snap sync) all payloads are rejected until
 		// snap sync terminates as snap sync relies on direct database injections
 		// and cannot afford concurrent out-if-band modifications via imports.
-		log.Warn("Ignoring payload while snap syncing", "number", block.NumberU64(), "hash", block.Hash())
+		log.Warn("Ignoring payload while snap syncing", "number", block.NumberU64(), "hash", block.Hash(), "reason", err)
 	}
 	return engine.PayloadStatusV1{Status: engine.SYNCING}, nil
 }
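The only mechanical change in this hunk is hoisting err out of the if-initializer: a variable declared in an if statement's init clause is scoped to that statement, so the two Warn calls further down could not otherwise attach it as a "reason" field. A minimal standalone sketch of the pattern (doExtend and the messages are illustrative stand-ins, not geth API):

    package main

    import (
        "errors"
        "fmt"
    )

    // doExtend stands in for BeaconExtend: it either succeeds or explains why not.
    func doExtend(ok bool) error {
        if ok {
            return nil
        }
        return errors.New("chain gapped")
    }

    func main() {
        // Declared in the enclosing scope, err stays visible after the if-block,
        // so every failure path below can report it as a "reason" field.
        err := doExtend(false)
        if err == nil {
            fmt.Println("payload accepted for sync extension")
            return
        }
        fmt.Println("ignoring payload, reason:", err)
    }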

@@ -69,9 +69,17 @@ var errSyncReorged = errors.New("sync reorged")
 // might still be propagating.
 var errTerminated = errors.New("terminated")
 
-// errReorgDenied is returned if an attempt is made to extend the beacon chain
-// with a new header, but it does not link up to the existing sync.
-var errReorgDenied = errors.New("non-forced head reorg denied")
+// errChainReorged is an internal helper error to signal that the header chain
+// of the current sync cycle was (partially) reorged.
+var errChainReorged = errors.New("chain reorged")
+
+// errChainGapped is an internal helper error to signal that the header chain
+// of the current sync cycle is gaped with the one advertised by consensus client.
+var errChainGapped = errors.New("chain gapped")
+
+// errChainForked is an internal helper error to signal that the header chain
+// of the current sync cycle is forked with the one advertised by consensus client.
+var errChainForked = errors.New("chain forked")
 
 func init() {
 	// Tuning parameters is nice, but the scratch space must be assignable in
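Splitting the single errReorgDenied into three sentinels means callers no longer learn just that an extension was denied, but why: the announced head reorged, gapped, or forked relative to the current cycle. A sketch of how such sentinels are typically consumed, using only the standard errors package (the classify helper and its messages are invented for illustration):

    package main

    import (
        "errors"
        "fmt"
    )

    // Stand-ins for the skeleton's internal helper errors.
    var (
        errChainReorged = errors.New("chain reorged")
        errChainGapped  = errors.New("chain gapped")
        errChainForked  = errors.New("chain forked")
    )

    // classify is a hypothetical caller that reacts differently per sentinel.
    func classify(err error) string {
        switch {
        case errors.Is(err, errChainReorged):
            return "restart sync from the new head"
        case errors.Is(err, errChainGapped):
            return "wait for the gap to be backfilled"
        case errors.Is(err, errChainForked):
            return "ancestor mismatch, tear down and resync"
        default:
            return "unknown failure"
        }
    }

    func main() {
        // Wrapping with %w keeps the sentinel matchable by errors.Is.
        err := fmt.Errorf("%w, head: %d, newHead: %d", errChainGapped, 49, 100)
        fmt.Println(classify(err)) // wait for the gap to be backfilled
    }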
@@ -271,9 +279,9 @@ func (s *skeleton) startup() {
 			newhead, err := s.sync(head)
 			switch {
 			case err == errSyncLinked:
-				// Sync cycle linked up to the genesis block. Tear down the loop
-				// and restart it so, it can properly notify the backfiller. Don't
-				// account a new head.
+				// Sync cycle linked up to the genesis block, or the existent chain
+				// segment. Tear down the loop and restart it so, it can properly
+				// notify the backfiller. Don't account a new head.
 				head = nil
 
 			case err == errSyncMerged:
@@ -457,15 +465,16 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
 				// we don't seamlessly integrate reorgs to keep things simple. If the
 				// network starts doing many mini reorgs, it might be worthwhile handling
 				// a limited depth without an error.
-				if reorged := s.processNewHead(event.header, event.final, event.force); reorged {
+				if err := s.processNewHead(event.header, event.final); err != nil {
 					// If a reorg is needed, and we're forcing the new head, signal
 					// the syncer to tear down and start over. Otherwise, drop the
 					// non-force reorg.
 					if event.force {
 						event.errc <- nil // forced head reorg accepted
+						log.Info("Restarting sync cycle", "reason", err)
 						return event.header, errSyncReorged
 					}
-					event.errc <- errReorgDenied
+					event.errc <- err
 					continue
 				}
 				event.errc <- nil // head extension accepted
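The event.errc channel is the reply path of a request/response handshake between the goroutine announcing a new head and the sync loop: the loop now answers with the concrete processNewHead error instead of the generic errReorgDenied, while a forced reorg is still acknowledged with nil before the cycle restarts. A stripped-down sketch of that handshake (headEvent and the goroutines are simplified stand-ins for the skeleton's internals):

    package main

    import (
        "errors"
        "fmt"
    )

    var errChainGapped = errors.New("chain gapped")

    // headEvent mimics the skeleton's head-announcement event: a payload
    // plus a channel on which the sync loop reports acceptance or rejection.
    type headEvent struct {
        number uint64
        force  bool
        errc   chan error
    }

    func main() {
        events := make(chan headEvent)

        // The "sync loop": validates each announced head and answers on errc.
        go func() {
            for ev := range events {
                if ev.number != 50 { // pretend only head 50 links up
                    if ev.force {
                        ev.errc <- nil // forced reorg accepted, cycle restarts
                        continue
                    }
                    ev.errc <- errChainGapped // non-forced extension rejected
                    continue
                }
                ev.errc <- nil // head extension accepted
            }
        }()

        // The "announcer": sends a head and blocks on the reply.
        errc := make(chan error)
        events <- headEvent{number: 100, force: false, errc: errc}
        fmt.Println("extension result:", <-errc) // extension result: chain gapped
    }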
@@ -610,7 +619,7 @@ func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) {
 // accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
 // the syncer will tear itself down and restart with a fresh head. It is simpler
 // to reconstruct the sync state than to mutate it and hope for the best.
-func (s *skeleton) processNewHead(head *types.Header, final *types.Header, force bool) bool {
+func (s *skeleton) processNewHead(head *types.Header, final *types.Header) error {
 	// If a new finalized block was announced, update the sync process independent
 	// of what happens with the sync head below
 	if final != nil {
@@ -631,26 +640,17 @@ func (s *skeleton) processNewHead(head *types.Header, final *types.Header, force
 		// once more, ignore it instead of tearing down sync for a noop.
 		if lastchain.Head == lastchain.Tail {
 			if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
-				return false
+				return nil
 			}
 		}
 		// Not a noop / double head announce, abort with a reorg
-		if force {
-			log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "head", lastchain.Head, "newHead", number)
-		}
-		return true
+		return fmt.Errorf("%w, tail: %d, head: %d, newHead: %d", errChainReorged, lastchain.Tail, lastchain.Head, number)
 	}
 	if lastchain.Head+1 < number {
-		if force {
-			log.Warn("Beacon chain gapped", "head", lastchain.Head, "newHead", number)
-		}
-		return true
+		return fmt.Errorf("%w, head: %d, newHead: %d", errChainGapped, lastchain.Head, number)
 	}
 	if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash {
-		if force {
-			log.Warn("Beacon chain forked", "ancestor", number-1, "hash", parent.Hash(), "want", head.ParentHash)
-		}
-		return true
+		return fmt.Errorf("%w, ancestor: %d, hash: %s, want: %s", errChainForked, number-1, parent.Hash(), head.ParentHash)
 	}
 	// New header seems to be in the last subchain range. Unwind any extra headers
 	// from the chain tip and insert the new head. We won't delete any trimmed
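Each failure path now folds its diagnostic numbers into the returned error via fmt.Errorf with the %w verb, so the caller's single log line carries what the three removed log.Warn calls used to print, while the sentinel stays recoverable. A small demonstration of both properties (the values are made up):

    package main

    import (
        "errors"
        "fmt"
    )

    var errChainReorged = errors.New("chain reorged")

    func main() {
        // Wrap the sentinel together with the diagnostic fields.
        err := fmt.Errorf("%w, tail: %d, head: %d, newHead: %d", errChainReorged, 10, 49, 30)

        fmt.Println(err)                                   // chain reorged, tail: 10, head: 49, newHead: 30
        fmt.Println(errors.Unwrap(err) == errChainReorged) // true: %w records the wrapped error
    }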
@@ -666,7 +666,7 @@ func (s *skeleton) processNewHead(head *types.Header, final *types.Header, force
 	if err := batch.Write(); err != nil {
 		log.Crit("Failed to write skeleton sync status", "err", err)
 	}
-	return false
+	return nil
 }
 
 // assignTasks attempts to match idle peers to pending header retrievals.

@@ -434,7 +434,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 			newstate: []*subchain{
 				{Head: 49, Tail: 49},
 			},
-			err: errReorgDenied,
+			err: errChainReorged,
 		},
 		// Initialize a sync and try to extend it with a number-wise sequential
 		// header, but a hash wise non-linking one.
@@ -444,7 +444,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 			newstate: []*subchain{
 				{Head: 49, Tail: 49},
 			},
-			err: errReorgDenied,
+			err: errChainForked,
 		},
 		// Initialize a sync and try to extend it with a non-linking future block.
 		{
@@ -453,7 +453,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 			newstate: []*subchain{
 				{Head: 49, Tail: 49},
 			},
-			err: errReorgDenied,
+			err: errChainGapped,
 		},
 		// Initialize a sync and try to extend it with a past canonical block.
 		{
@@ -462,7 +462,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 			newstate: []*subchain{
 				{Head: 50, Tail: 50},
 			},
-			err: errReorgDenied,
+			err: errChainReorged,
 		},
 		// Initialize a sync and try to extend it with a past sidechain block.
 		{
@@ -471,7 +471,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 			newstate: []*subchain{
 				{Head: 50, Tail: 50},
 			},
-			err: errReorgDenied,
+			err: errChainReorged,
 		},
 	}
 	for i, tt := range tests {
@@ -487,7 +487,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 		skeleton.Sync(tt.head, nil, true)
 		<-wait
-		if err := skeleton.Sync(tt.extend, nil, false); err != tt.err {
+		if err := skeleton.Sync(tt.extend, nil, false); !errors.Is(err, tt.err) {
 			t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
 		}
 		skeleton.Terminate()
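The test assertion switches from err != tt.err to !errors.Is(err, tt.err) because the skeleton now hands back wrapped errors: plain equality compares the wrapper value, not the sentinel inside it. A minimal sketch of the difference, using only stdlib behaviour (the numbers are invented):

    package main

    import (
        "errors"
        "fmt"
    )

    var errChainGapped = errors.New("chain gapped")

    func main() {
        err := fmt.Errorf("%w, head: %d, newHead: %d", errChainGapped, 49, 100)
        fmt.Println(err == errChainGapped)          // false: wrapping yields a distinct value
        fmt.Println(errors.Is(err, errChainGapped)) // true: Is walks the Unwrap chain
    }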

@@ -31,24 +31,24 @@ import (
 var (
 	accounts = map[common.Address]*types.StateAccount{
-		common.Address{1}: {
+		{1}: {
 			Nonce: 100,
 			Balance: big.NewInt(100),
 			CodeHash: common.Hash{0x1}.Bytes(),
 		},
-		common.Address{2}: {
+		{2}: {
 			Nonce: 200,
 			Balance: big.NewInt(200),
 			CodeHash: common.Hash{0x2}.Bytes(),
 		},
 	}
 	storages = map[common.Address]map[common.Hash][]byte{
-		common.Address{1}: {
+		{1}: {
 			common.Hash{10}: []byte{10},
 			common.Hash{11}: []byte{11},
 			common.MaxHash: []byte{0xff},
 		},
-		common.Address{2}: {
+		{2}: {
 			common.Hash{20}: []byte{20},
 			common.Hash{21}: []byte{21},
 			common.MaxHash: []byte{0xff},
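This final hunk is a pure style cleanup: inside a composite literal for map[common.Address]*types.StateAccount, Go permits eliding the key's type, so common.Address{1} shortens to {1} with no change in behaviour. A self-contained illustration, with a local type standing in for common.Address:

    package main

    import "fmt"

    // Addr stands in for common.Address (a fixed-size byte array).
    type Addr [20]byte

    func main() {
        // Within a map composite literal, keys that are themselves composite
        // literals may omit their type; the compiler infers it from the map type.
        balances := map[Addr]uint64{
            {1}: 100, // equivalent to Addr{1}: 100
            {2}: 200,
        }
        fmt.Println(len(balances)) // 2
    }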