From fc85777a219acd12620fe9b76a3b7e585800300c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 May 2019 15:48:30 +0300 Subject: [PATCH] core: concurrent database reinit from freezer dump * core: reinit chain from freezer in batches * core/rawdb: concurrent database reinit from freezer dump * core/rawdb: reinit from freezer in sequential order --- common/prque/prque.go | 25 ++++++- common/prque/sstack.go | 18 +++-- core/blockchain.go | 43 ++--------- core/rawdb/accessors_indexes.go | 3 +- core/rawdb/freezer_reinit.go | 127 ++++++++++++++++++++++++++++++++ 5 files changed, 171 insertions(+), 45 deletions(-) create mode 100644 core/rawdb/freezer_reinit.go diff --git a/common/prque/prque.go b/common/prque/prque.go index 9fd31a2e5d..3cc5a1adaf 100755 --- a/common/prque/prque.go +++ b/common/prque/prque.go @@ -1,5 +1,20 @@ +// CookieJar - A contestant's algorithm toolbox +// Copyright (c) 2013 Peter Szilagyi. All rights reserved. +// +// CookieJar is dual licensed: use of this source code is governed by a BSD +// license that can be found in the LICENSE file. Alternatively, the CookieJar +// toolbox may be used in accordance with the terms and conditions contained +// in a signed written agreement between you and the author(s). + // This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque". +// Package prque implements a priority queue data structure supporting arbitrary +// value types and int64 priorities. +// +// If you would like to use a min-priority queue, simply negate the priorities. +// +// Internally the queue is based on the standard heap package working on a +// sortable version of the block based stack. package prque import ( @@ -11,8 +26,8 @@ type Prque struct { cont *sstack } -// Creates a new priority queue. -func New(setIndex setIndexCallback) *Prque { +// New creates a new priority queue. +func New(setIndex SetIndexCallback) *Prque { return &Prque{newSstack(setIndex)} } @@ -21,6 +36,12 @@ func (p *Prque) Push(data interface{}, priority int64) { heap.Push(p.cont, &item{data, priority}) } +// Peek returns the value with the greates priority but does not pop it off. +func (p *Prque) Peek() (interface{}, int64) { + item := p.cont.blocks[0][0] + return item.value, item.priority +} + // Pops the value with the greates priority off the stack and returns it. // Currently no shrinking is done. func (p *Prque) Pop() (interface{}, int64) { diff --git a/common/prque/sstack.go b/common/prque/sstack.go index 4875dae99d..8518af54ff 100755 --- a/common/prque/sstack.go +++ b/common/prque/sstack.go @@ -1,3 +1,11 @@ +// CookieJar - A contestant's algorithm toolbox +// Copyright (c) 2013 Peter Szilagyi. All rights reserved. +// +// CookieJar is dual licensed: use of this source code is governed by a BSD +// license that can be found in the LICENSE file. Alternatively, the CookieJar +// toolbox may be used in accordance with the terms and conditions contained +// in a signed written agreement between you and the author(s). + // This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque". package prque @@ -14,16 +22,16 @@ type item struct { priority int64 } -// setIndexCallback is called when the element is moved to a new index. -// Providing setIndexCallback is optional, it is needed only if the application needs +// SetIndexCallback is called when the element is moved to a new index. +// Providing SetIndexCallback is optional, it is needed only if the application needs // to delete elements other than the top one. -type setIndexCallback func(a interface{}, i int) +type SetIndexCallback func(data interface{}, index int) // Internal sortable stack data structure. Implements the Push and Pop ops for // the stack (heap) functionality and the Len, Less and Swap methods for the // sortability requirements of the heaps. type sstack struct { - setIndex setIndexCallback + setIndex SetIndexCallback size int capacity int offset int @@ -33,7 +41,7 @@ type sstack struct { } // Creates a new, empty stack. -func newSstack(setIndex setIndexCallback) *sstack { +func newSstack(setIndex SetIndexCallback) *sstack { result := new(sstack) result.setIndex = setIndex result.active = make([]*item, blockSize) diff --git a/core/blockchain.go b/core/blockchain.go index 60707281c2..7119596928 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -220,47 +220,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } // Initialize the chain with ancient data if it isn't empty. if bc.empty() { - if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { - var ( - start = time.Now() - logged time.Time - ) - for i := uint64(0); i < frozen; i++ { - // Inject hash<->number mapping. - hash := rawdb.ReadCanonicalHash(bc.db, i) - if hash == (common.Hash{}) { - return nil, errors.New("broken ancient database") - } - rawdb.WriteHeaderNumber(bc.db, hash, i) - - // Inject txlookup indexes. - block := rawdb.ReadBlock(bc.db, hash, i) - if block == nil { - return nil, errors.New("broken ancient database") - } - rawdb.WriteTxLookupEntries(bc.db, block) - - // If we've spent too much time already, notify the user of what we're doing - if time.Since(logged) > 8*time.Second { - log.Info("Initializing chain from ancient data", "number", i, "hash", hash, "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start))) - logged = time.Now() - } - } - hash := rawdb.ReadCanonicalHash(bc.db, frozen-1) - rawdb.WriteHeadHeaderHash(bc.db, hash) - rawdb.WriteHeadFastBlockHash(bc.db, hash) - - // The first thing the node will do is reconstruct the verification data for - // the head block (ethash cache or clique voting snapshot). Might as well do - // it in advance. - bc.engine.VerifyHeader(bc, rawdb.ReadHeader(bc.db, hash, frozen-1), true) - - log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) - } + rawdb.InitDatabaseFromFreezer(bc.db) } if err := bc.loadLastState(); err != nil { return nil, err } + // The first thing the node will do is reconstruct the verification data for + // the head block (ethash cache or clique voting snapshot). Might as well do + // it in advance. + bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true) + if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { var ( needRewind bool diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index ed1f1bca6e..38f8fe10ea 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -55,8 +55,9 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 { // WriteTxLookupEntries stores a positional metadata for every transaction from // a block, enabling hash based transaction and receipt lookups. func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { + number := block.Number().Bytes() for _, tx := range block.Transactions() { - if err := db.Put(txLookupKey(tx.Hash()), block.Number().Bytes()); err != nil { + if err := db.Put(txLookupKey(tx.Hash()), number); err != nil { log.Crit("Failed to store transaction lookup entry", "err", err) } } diff --git a/core/rawdb/freezer_reinit.go b/core/rawdb/freezer_reinit.go new file mode 100644 index 0000000000..ea4dd33d1d --- /dev/null +++ b/core/rawdb/freezer_reinit.go @@ -0,0 +1,127 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "errors" + "runtime" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// InitDatabaseFromFreezer reinitializes an empty database from a previous batch +// of frozen ancient blocks. The method iterates over all the frozen blocks and +// injects into the database the block hash->number mappings and the transaction +// lookup entries. +func InitDatabaseFromFreezer(db ethdb.Database) error { + // If we can't access the freezer or it's empty, abort + frozen, err := db.Ancients() + if err != nil || frozen == 0 { + return err + } + // Blocks previously frozen, iterate over- and hash them concurrently + var ( + number = ^uint64(0) // -1 + results = make(chan *types.Block, 4*runtime.NumCPU()) + ) + abort := make(chan struct{}) + defer close(abort) + + for i := 0; i < runtime.NumCPU(); i++ { + go func() { + for { + // Fetch the next task number, terminating if everything's done + n := atomic.AddUint64(&number, 1) + if n >= frozen { + return + } + // Retrieve the block from the freezer (no need for the hash, we pull by + // number from the freezer). If successful, pre-cache the block hash and + // the individual transaction hashes for storing into the database. + block := ReadBlock(db, common.Hash{}, n) + if block != nil { + block.Hash() + for _, tx := range block.Transactions() { + tx.Hash() + } + } + // Feed the block to the aggregator, or abort on interrupt + select { + case results <- block: + case <-abort: + return + } + } + }() + } + // Reassemble the blocks into a contiguous stream and push them out to disk + var ( + queue = prque.New(nil) + next = int64(0) + + batch = db.NewBatch() + start = time.Now() + logged time.Time + ) + for i := uint64(0); i < frozen; i++ { + // Retrieve the next result and bail if it's nil + block := <-results + if block == nil { + return errors.New("broken ancient database") + } + // Push the block into the import queue and process contiguous ranges + queue.Push(block, -int64(block.NumberU64())) + for !queue.Empty() { + // If the next available item is gapped, return + if _, priority := queue.Peek(); -priority != next { + break + } + // Next block available, pop it off and index it + block = queue.PopItem().(*types.Block) + next++ + + // Inject hash<->number mapping and txlookup indexes + WriteHeaderNumber(batch, block.Hash(), block.NumberU64()) + WriteTxLookupEntries(batch, block) + + // If enough data was accumulated in memory or we're at the last block, dump to disk + if batch.ValueSize() > ethdb.IdealBatchSize || uint64(next) == frozen { + if err := batch.Write(); err != nil { + return err + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Initializing chain from ancient data", "number", block.Number(), "hash", block.Hash(), "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + } + hash := ReadCanonicalHash(db, frozen-1) + WriteHeadHeaderHash(db, hash) + WriteHeadFastBlockHash(db, hash) + + log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) + return nil +}