cmd/geth, core/rawdb: seamless freezer consistency, friendly removedb

This commit is contained in:
Péter Szilágyi 2019-05-16 14:30:11 +03:00
parent 536b3b416c
commit 9eba3a9fff
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
10 changed files with 122 additions and 313 deletions

@ -18,10 +18,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
@ -171,21 +168,6 @@ Remove blockchain and state databases`,
The arguments are interpreted as block numbers or hashes. The arguments are interpreted as block numbers or hashes.
Use "ethereum dump 0" to dump the genesis block.`, Use "ethereum dump 0" to dump the genesis block.`,
} }
migrateAncientCommand = cli.Command{
Action: utils.MigrateFlags(migrateAncient),
Name: "migrate-ancient",
Usage: "migrate ancient database forcibly",
ArgsUsage: " ",
Flags: []cli.Flag{
utils.DataDirFlag,
utils.AncientFlag,
utils.CacheFlag,
utils.TestnetFlag,
utils.RinkebyFlag,
utils.GoerliFlag,
},
Category: "BLOCKCHAIN COMMANDS",
}
inspectCommand = cli.Command{ inspectCommand = cli.Command{
Action: utils.MigrateFlags(inspect), Action: utils.MigrateFlags(inspect),
Name: "inspect", Name: "inspect",
@ -460,50 +442,62 @@ func copyDb(ctx *cli.Context) error {
func removeDB(ctx *cli.Context) error { func removeDB(ctx *cli.Context) error {
stack, config := makeConfigNode(ctx) stack, config := makeConfigNode(ctx)
for i, name := range []string{"chaindata", "lightchaindata"} { // Remove the full node state database
// Ensure the database exists in the first place path := stack.ResolvePath("chaindata")
logger := log.New("database", name) if common.FileExist(path) {
confirmAndRemoveDB(path, "full node state database")
var ( } else {
dbdirs []string log.Info("Full node state database missing", "path", path)
freezer string
)
dbdir := stack.ResolvePath(name)
if !common.FileExist(dbdir) {
logger.Info("Database doesn't exist, skipping", "path", dbdir)
continue
} }
dbdirs = append(dbdirs, dbdir) // Remove the full node ancient database
if i == 0 { path = config.Eth.DatabaseFreezer
freezer = config.Eth.DatabaseFreezer
switch { switch {
case freezer == "": case path == "":
freezer = filepath.Join(dbdir, "ancient") path = filepath.Join(stack.ResolvePath("chaindata"), "ancient")
case !filepath.IsAbs(freezer): case !filepath.IsAbs(path):
freezer = config.Node.ResolvePath(freezer) path = config.Node.ResolvePath(path)
} }
if common.FileExist(freezer) { if common.FileExist(path) {
dbdirs = append(dbdirs, freezer) confirmAndRemoveDB(path, "full node ancient database")
} else {
log.Info("Full node ancient database missing", "path", path)
} }
// Remove the light node database
path = stack.ResolvePath("lightchaindata")
if common.FileExist(path) {
confirmAndRemoveDB(path, "light node database")
} else {
log.Info("Light node database missing", "path", path)
} }
for i := len(dbdirs) - 1; i >= 0; i-- { return nil
// Confirm removal and execute }
fmt.Println(dbdirs[i])
confirm, err := console.Stdin.PromptConfirm("Remove this database?") // confirmAndRemoveDB prompts the user for a last confirmation and removes the
// folder if accepted.
func confirmAndRemoveDB(database string, kind string) {
confirm, err := console.Stdin.PromptConfirm(fmt.Sprintf("Remove %s (%s)?", kind, database))
switch { switch {
case err != nil: case err != nil:
utils.Fatalf("%v", err) utils.Fatalf("%v", err)
case !confirm: case !confirm:
logger.Warn("Database deletion aborted") log.Info("Database deletion skipped", "path", database)
default: default:
start := time.Now() start := time.Now()
os.RemoveAll(dbdirs[i]) filepath.Walk(database, func(path string, info os.FileInfo, err error) error {
logger.Info("Database successfully deleted", "elapsed", common.PrettyDuration(time.Since(start))) // If we're at the top level folder, recurse into
} if path == database {
}
}
return nil return nil
} }
// Delete all the files, but not subfolders
if !info.IsDir() {
os.Remove(path)
return nil
}
return filepath.SkipDir
})
log.Info("Database successfully deleted", "path", database, "elapsed", common.PrettyDuration(time.Since(start)))
}
}
func dump(ctx *cli.Context) error { func dump(ctx *cli.Context) error {
stack := makeFullNode(ctx) stack := makeFullNode(ctx)
@ -533,47 +527,6 @@ func dump(ctx *cli.Context) error {
return nil return nil
} }
func migrateAncient(ctx *cli.Context) error {
node, config := makeConfigNode(ctx)
defer node.Close()
dbdir := config.Node.ResolvePath("chaindata")
kvdb, err := rawdb.NewLevelDBDatabase(dbdir, 128, 1024, "")
if err != nil {
return err
}
defer kvdb.Close()
freezer := config.Eth.DatabaseFreezer
switch {
case freezer == "":
freezer = filepath.Join(dbdir, "ancient")
case !filepath.IsAbs(freezer):
freezer = config.Node.ResolvePath(freezer)
}
stored := rawdb.ReadAncientPath(kvdb)
if stored != freezer && stored != "" {
confirm, err := console.Stdin.PromptConfirm(fmt.Sprintf("Are you sure to migrate ancient database from %s to %s?", stored, freezer))
switch {
case err != nil:
utils.Fatalf("%v", err)
case !confirm:
log.Warn("Ancient database migration aborted")
default:
if err := rename(stored, freezer); err != nil {
// Renaming a file can fail if the source and destination
// are on different file systems.
if err := moveAncient(stored, freezer); err != nil {
utils.Fatalf("Migrate ancient database failed, %v", err)
}
}
rawdb.WriteAncientPath(kvdb, freezer)
log.Info("Ancient database successfully migrated")
}
}
return nil
}
func inspect(ctx *cli.Context) error { func inspect(ctx *cli.Context) error {
node, _ := makeConfigNode(ctx) node, _ := makeConfigNode(ctx)
defer node.Close() defer node.Close()
@ -589,84 +542,3 @@ func hashish(x string) bool {
_, err := strconv.Atoi(x) _, err := strconv.Atoi(x)
return err != nil return err != nil
} }
// copyFileSynced copies data from source file to destination
// and synces the dest file forcibly.
func copyFileSynced(src string, dest string, info os.FileInfo) error {
srcf, err := os.Open(src)
if err != nil {
return err
}
defer srcf.Close()
destf, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, info.Mode().Perm())
if err != nil {
return err
}
// The maximum size of ancient file is 2GB, 4MB buffer is suitable here.
buff := make([]byte, 4*1024*1024)
for {
rn, err := srcf.Read(buff)
if err != nil && err != io.EOF {
return err
}
if rn == 0 {
break
}
if wn, err := destf.Write(buff[:rn]); err != nil || wn != rn {
return err
}
}
if err1 := destf.Sync(); err == nil {
err = err1
}
if err1 := destf.Close(); err == nil {
err = err1
}
return err
}
// copyDirSynced recursively copies files under the specified dir
// to dest and synces the dest dir forcibly.
func copyDirSynced(src string, dest string, info os.FileInfo) error {
if err := os.MkdirAll(dest, os.ModePerm); err != nil {
return err
}
defer os.Chmod(dest, info.Mode())
objects, err := ioutil.ReadDir(src)
if err != nil {
return err
}
for _, obj := range objects {
// All files in ancient database should be flatten files.
if !obj.Mode().IsRegular() {
continue
}
subsrc, subdest := filepath.Join(src, obj.Name()), filepath.Join(dest, obj.Name())
if err := copyFileSynced(subsrc, subdest, obj); err != nil {
return err
}
}
return syncDir(dest)
}
// moveAncient migrates ancient database from source to destination.
func moveAncient(src string, dest string) error {
srcinfo, err := os.Stat(src)
if err != nil {
return err
}
if !srcinfo.IsDir() {
return errors.New("ancient directory expected")
}
if destinfo, err := os.Lstat(dest); !os.IsNotExist(err) {
if destinfo.Mode()&os.ModeSymlink != 0 {
return errors.New("symbolic link datadir is not supported")
}
}
if err := copyDirSynced(src, dest, srcinfo); err != nil {
return err
}
return os.RemoveAll(src)
}

@ -204,7 +204,6 @@ func init() {
copydbCommand, copydbCommand,
removedbCommand, removedbCommand,
dumpCommand, dumpCommand,
migrateAncientCommand,
inspectCommand, inspectCommand,
// See accountcmd.go: // See accountcmd.go:
accountCommand, accountCommand,

@ -1,51 +0,0 @@
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license.
//
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package main
import (
"os"
"syscall"
)
func rename(oldpath, newpath string) error {
return os.Rename(oldpath, newpath)
}
func isErrInvalid(err error) bool {
if err == os.ErrInvalid {
return true
}
// Go < 1.8
if syserr, ok := err.(*os.SyscallError); ok && syserr.Err == syscall.EINVAL {
return true
}
// Go >= 1.8 returns *os.PathError instead
if patherr, ok := err.(*os.PathError); ok && patherr.Err == syscall.EINVAL {
return true
}
return false
}
func syncDir(name string) error {
// As per fsync manpage, Linux seems to expect fsync on directory, however
// some system don't support this, so we will ignore syscall.EINVAL.
//
// From fsync(2):
// Calling fsync() does not necessarily ensure that the entry in the
// directory containing the file has also reached disk. For that an
// explicit fsync() on a file descriptor for the directory is also needed.
f, err := os.Open(name)
if err != nil {
return err
}
defer f.Close()
if err := f.Sync(); err != nil && !isErrInvalid(err) {
return err
}
return nil
}

@ -1,43 +0,0 @@
// Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license.
package main
import (
"syscall"
"unsafe"
)
var (
modkernel32 = syscall.NewLazyDLL("kernel32.dll")
procMoveFileExW = modkernel32.NewProc("MoveFileExW")
)
const _MOVEFILE_REPLACE_EXISTING = 1
func moveFileEx(from *uint16, to *uint16, flags uint32) error {
r1, _, e1 := syscall.Syscall(procMoveFileExW.Addr(), 3, uintptr(unsafe.Pointer(from)), uintptr(unsafe.Pointer(to)), uintptr(flags))
if r1 == 0 {
if e1 != 0 {
return error(e1)
}
return syscall.EINVAL
}
return nil
}
func rename(oldpath, newpath string) error {
from, err := syscall.UTF16PtrFromString(oldpath)
if err != nil {
return err
}
to, err := syscall.UTF16PtrFromString(newpath)
if err != nil {
return err
}
return moveFileEx(from, to, _MOVEFILE_REPLACE_EXISTING)
}
func syncDir(name string) error { return nil }

@ -142,7 +142,7 @@ func (p *terminalPrompter) PromptPassword(prompt string) (passwd string, err err
// PromptConfirm displays the given prompt to the user and requests a boolean // PromptConfirm displays the given prompt to the user and requests a boolean
// choice to be made, returning that choice. // choice to be made, returning that choice.
func (p *terminalPrompter) PromptConfirm(prompt string) (bool, error) { func (p *terminalPrompter) PromptConfirm(prompt string) (bool, error) {
input, err := p.Prompt(prompt + " [y/N] ") input, err := p.Prompt(prompt + " [y/n] ")
if len(input) > 0 && strings.ToUpper(input[:1]) == "Y" { if len(input) > 0 && strings.ToUpper(input[:1]) == "Y" {
return true, nil return true, nil
} }

@ -1717,10 +1717,7 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
} }
// Abort ancient receipt chain insertion deliberately // Abort ancient receipt chain insertion deliberately
ancient.terminateInsert = func(hash common.Hash, number uint64) bool { ancient.terminateInsert = func(hash common.Hash, number uint64) bool {
if number == blocks[len(blocks)/2].NumberU64() { return number == blocks[len(blocks)/2].NumberU64()
return true
}
return false
} }
previousFastBlock := ancient.CurrentFastBlock() previousFastBlock := ancient.CurrentFastBlock()
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil { if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil {

@ -80,20 +80,6 @@ func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.Cha
} }
} }
// ReadAncientPath retrieves ancient database path which is recorded during the
// first node setup or forcibly changed by user.
func ReadAncientPath(db ethdb.KeyValueReader) string {
data, _ := db.Get(ancientKey)
return string(data)
}
// WriteAncientPath writes ancient database path into the key-value database.
func WriteAncientPath(db ethdb.KeyValueWriter, path string) {
if err := db.Put(ancientKey, []byte(path)); err != nil {
log.Crit("Failed to store ancient path", "err", err)
}
}
// ReadPreimage retrieves a single preimage of the provided hash. // ReadPreimage retrieves a single preimage of the provided hash.
func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte {
data, _ := db.Get(preimageKey(hash)) data, _ := db.Get(preimageKey(hash))

@ -18,6 +18,7 @@ package rawdb
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"os" "os"
"time" "time"
@ -104,10 +105,74 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
// value data store with a freezer moving immutable chain segments into cold // value data store with a freezer moving immutable chain segments into cold
// storage. // storage.
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string) (ethdb.Database, error) { func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string) (ethdb.Database, error) {
// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace) frdb, err := newFreezer(freezer, namespace)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Since the freezer can be stored separately from the user's key-value database,
// there's a fairly high probability that the user requests invalid combinations
// of the freezer and database. Ensure that we don't shoot ourselves in the foot
// by serving up conflicting data, leading to both datastores getting corrupted.
//
// - If both the freezer and key-value store is empty (no genesis), we just
// initialized a new empty freezer, so everything's fine.
// - If the key-value store is empty, but the freezer is not, we need to make
// sure the user's genesis matches the freezer. That will be checked in the
// blockchain, since we don't have the genesis block here (nor should we at
// this point care, the key-value/freezer combo is valid).
// - If neither the key-value store nor the freezer is empty, cross validate
// the genesis hashes to make sure they are compatible. If they are, also
// ensure that there's no gap between the freezer and sunsequently leveldb.
// - If the key-value store is not empty, but the freezer is we might just be
// upgrading to the freezer release, or we might have had a small chain and
// not frozen anything yet. Ensure that no blocks are missing yet from the
// key-value store, since that would mean we already had an old freezer.
// If the genesis hash is empty, we have a new key-value store, so nothing to
// validate in this method. If, however, the genesis hash is not nil, compare
// it to the freezer content.
if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 {
if frozen, _ := frdb.Ancients(); frozen > 0 {
// If the freezer already contains something, ensure that the genesis blocks
// match, otherwise we might mix up freezers across chains and destroy both
// the freezer and the key-value store.
if frgenesis, _ := frdb.Ancient(freezerHashTable, 0); !bytes.Equal(kvgenesis, frgenesis) {
return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis)
}
// Key-value store and freezer belong to the same network. Ensure that they
// are contiguous, otherwise we might end up with a non-functional freezer.
if kvhash, _ := db.Get(headerHashKey(frozen)); len(kvhash) == 0 {
// Subsequent header after the freezer limit is missing from the database.
// Reject startup is the database has a more recent head.
if *ReadHeaderNumber(db, ReadHeadHeaderHash(db)) > frozen-1 {
return nil, fmt.Errorf("gap (#%d) in the chain between ancients and leveldb", frozen)
}
// Database contains only older data than the freezer, this happens if the
// state was wiped and reinited from an existing freezer.
} else {
// Key-value store continues where the freezer left off, all is fine. We might
// have duplicate blocks (crash after freezer write but before kay-value store
// deletion, but that's fine).
}
} else {
// If the freezer is empty, ensure nothing was moved yet from the key-value
// store, otherwise we'll end up missing data. We check block #1 to decide
// if we froze anything previously or not, but do take care of databases with
// only the genesis block.
if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) {
// Key-value store contains more data than the genesis block, make sure we
// didn't freeze anything yet.
if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 {
return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path")
}
// Block #1 is still in the database, we're allowed to init a new feezer
} else {
// The head header is still the genesis, we're allowed to init a new feezer
}
}
}
// Freezer is consistent with the key-value database, permit combining the two
go frdb.freeze(db) go frdb.freeze(db)
return &freezerdb{ return &freezerdb{
@ -151,19 +216,6 @@ func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer
kvdb.Close() kvdb.Close()
return nil, err return nil, err
} }
// Make sure we always use the same ancient store.
//
// | stored == nil | stored != nil
// ----------------+------------------+----------------------
// freezer == nil | non-freezer mode | ancient store missing
// freezer != nil | initialize | ensure consistency
stored := ReadAncientPath(kvdb)
if stored == "" && freezer != "" {
WriteAncientPath(kvdb, freezer)
} else if stored != freezer {
log.Warn("Ancient path mismatch", "stored", stored, "given", freezer)
log.Crit("Please use a consistent ancient path or migrate it via the command line tool `geth migrate-ancient`")
}
return frdb, nil return frdb, nil
} }
@ -243,7 +295,7 @@ func InspectDatabase(db ethdb.Database) error {
trieSize += size trieSize += size
default: default:
var accounted bool var accounted bool
for _, meta := range [][]byte{databaseVerisionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey, ancientKey} { for _, meta := range [][]byte{databaseVerisionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey} {
if bytes.Equal(key, meta) { if bytes.Equal(key, meta) {
metadata += size metadata += size
accounted = true accounted = true

@ -259,7 +259,7 @@ func (t *freezerTable) preopen() (err error) {
// The repair might have already opened (some) files // The repair might have already opened (some) files
t.releaseFilesAfter(0, false) t.releaseFilesAfter(0, false)
// Open all except head in RDONLY // Open all except head in RDONLY
for i := uint32(t.tailId); i < t.headId; i++ { for i := t.tailId; i < t.headId; i++ {
if _, err = t.openFile(i, os.O_RDONLY); err != nil { if _, err = t.openFile(i, os.O_RDONLY); err != nil {
return err return err
} }

@ -41,9 +41,6 @@ var (
// fastTrieProgressKey tracks the number of trie entries imported during fast sync. // fastTrieProgressKey tracks the number of trie entries imported during fast sync.
fastTrieProgressKey = []byte("TrieSync") fastTrieProgressKey = []byte("TrieSync")
// ancientKey tracks the absolute path of ancient database.
ancientKey = []byte("AncientPath")
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td