diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index 7582b4c4d..a9fc035db 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -20,7 +20,6 @@ import ( "bytes" "encoding/json" "errors" - "fmt" "os" "time" @@ -32,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -223,15 +221,7 @@ func verifyState(ctx *cli.Context) error { return err } log.Info("Verified the state", "root", root) - if err := checkDanglingDiskStorage(chaindb); err != nil { - log.Error("Dangling snap disk-storage check failed", "root", root, "err", err) - return err - } - if err := checkDanglingMemStorage(chaindb); err != nil { - log.Error("Dangling snap mem-storage check failed", "root", root, "err", err) - return err - } - return nil + return snapshot.CheckDanglingStorage(chaindb) } // checkDanglingStorage iterates the snap storage data, and verifies that all @@ -240,56 +230,7 @@ func checkDanglingStorage(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) - if err := checkDanglingDiskStorage(chaindb); err != nil { - return err - } - return checkDanglingMemStorage(chaindb) - -} - -// checkDanglingDiskStorage checks if there is any 'dangling' storage data in the -// disk-backed snapshot layer. -func checkDanglingDiskStorage(chaindb ethdb.Database) error { - log.Info("Checking dangling snapshot disk storage") - var ( - lastReport = time.Now() - start = time.Now() - lastKey []byte - it = rawdb.NewKeyLengthIterator(chaindb.NewIterator(rawdb.SnapshotStoragePrefix, nil), 1+2*common.HashLength) - ) - defer it.Release() - for it.Next() { - k := it.Key() - accKey := k[1:33] - if bytes.Equal(accKey, lastKey) { - // No need to look up for every slot - continue - } - lastKey = common.CopyBytes(accKey) - if time.Since(lastReport) > time.Second*8 { - log.Info("Iterating snap storage", "at", fmt.Sprintf("%#x", accKey), "elapsed", common.PrettyDuration(time.Since(start))) - lastReport = time.Now() - } - if data := rawdb.ReadAccountSnapshot(chaindb, common.BytesToHash(accKey)); len(data) == 0 { - log.Error("Dangling storage - missing account", "account", fmt.Sprintf("%#x", accKey), "storagekey", fmt.Sprintf("%#x", k)) - return fmt.Errorf("dangling snapshot storage account %#x", accKey) - } - } - log.Info("Verified the snapshot disk storage", "time", common.PrettyDuration(time.Since(start)), "err", it.Error()) - return nil -} - -// checkDanglingMemStorage checks if there is any 'dangling' storage in the journalled -// snapshot difflayers. -func checkDanglingMemStorage(chaindb ethdb.Database) error { - start := time.Now() - log.Info("Checking dangling snapshot difflayer journalled storage") - if err := snapshot.CheckJournalStorage(chaindb); err != nil { - return err - } - log.Info("Verified the snapshot journalled storage", "time", common.PrettyDuration(time.Since(start))) - return nil + return snapshot.CheckDanglingStorage(utils.MakeChainDatabase(ctx, stack, true)) } // traverseState is a helper function used for pruning verification. diff --git a/core/state/snapshot/context.go b/core/state/snapshot/context.go new file mode 100644 index 000000000..67d7e41a0 --- /dev/null +++ b/core/state/snapshot/context.go @@ -0,0 +1,241 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snapshot + +import ( + "bytes" + "encoding/binary" + "errors" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/log" +) + +const ( + snapAccount = "account" // Identifier of account snapshot generation + snapStorage = "storage" // Identifier of storage snapshot generation +) + +// generatorStats is a collection of statistics gathered by the snapshot generator +// for logging purposes. +type generatorStats struct { + origin uint64 // Origin prefix where generation started + start time.Time // Timestamp when generation started + accounts uint64 // Number of accounts indexed(generated or recovered) + slots uint64 // Number of storage slots indexed(generated or recovered) + dangling uint64 // Number of dangling storage slots + storage common.StorageSize // Total account and storage slot size(generation or recovery) +} + +// Log creates an contextual log with the given message and the context pulled +// from the internally maintained statistics. +func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) { + var ctx []interface{} + if root != (common.Hash{}) { + ctx = append(ctx, []interface{}{"root", root}...) + } + // Figure out whether we're after or within an account + switch len(marker) { + case common.HashLength: + ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...) + case 2 * common.HashLength: + ctx = append(ctx, []interface{}{ + "in", common.BytesToHash(marker[:common.HashLength]), + "at", common.BytesToHash(marker[common.HashLength:]), + }...) + } + // Add the usual measurements + ctx = append(ctx, []interface{}{ + "accounts", gs.accounts, + "slots", gs.slots, + "storage", gs.storage, + "dangling", gs.dangling, + "elapsed", common.PrettyDuration(time.Since(gs.start)), + }...) + // Calculate the estimated indexing time based on current stats + if len(marker) > 0 { + if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 { + left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8]) + + speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero + ctx = append(ctx, []interface{}{ + "eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond), + }...) + } + } + log.Info(msg, ctx...) +} + +// generatorContext carries a few global values to be shared by all generation functions. +type generatorContext struct { + stats *generatorStats // Generation statistic collection + db ethdb.KeyValueStore // Key-value store containing the snapshot data + account *holdableIterator // Iterator of account snapshot data + storage *holdableIterator // Iterator of storage snapshot data + batch ethdb.Batch // Database batch for writing batch data atomically + logged time.Time // The timestamp when last generation progress was displayed +} + +// newGeneratorContext initializes the context for generation. +func newGeneratorContext(stats *generatorStats, db ethdb.KeyValueStore, accMarker []byte, storageMarker []byte) *generatorContext { + ctx := &generatorContext{ + stats: stats, + db: db, + batch: db.NewBatch(), + logged: time.Now(), + } + ctx.openIterator(snapAccount, accMarker) + ctx.openIterator(snapStorage, storageMarker) + return ctx +} + +// openIterator constructs global account and storage snapshot iterators +// at the interrupted position. These iterators should be reopened from time +// to time to avoid blocking leveldb compaction for a long time. +func (ctx *generatorContext) openIterator(kind string, start []byte) { + if kind == snapAccount { + iter := ctx.db.NewIterator(rawdb.SnapshotAccountPrefix, start) + ctx.account = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+common.HashLength)) + return + } + iter := ctx.db.NewIterator(rawdb.SnapshotStoragePrefix, start) + ctx.storage = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+2*common.HashLength)) +} + +// reopenIterator releases the specified snapshot iterator and re-open it +// in the next position. It's aimed for not blocking leveldb compaction. +func (ctx *generatorContext) reopenIterator(kind string) { + // Shift iterator one more step, so that we can reopen + // the iterator at the right position. + var iter = ctx.account + if kind == snapStorage { + iter = ctx.storage + } + hasNext := iter.Next() + if !hasNext { + // Iterator exhausted, release forever and create an already exhausted virtual iterator + iter.Release() + if kind == snapAccount { + ctx.account = newHoldableIterator(memorydb.New().NewIterator(nil, nil)) + return + } + ctx.storage = newHoldableIterator(memorydb.New().NewIterator(nil, nil)) + return + } + next := iter.Key() + iter.Release() + ctx.openIterator(kind, next[1:]) +} + +// close releases all the held resources. +func (ctx *generatorContext) close() { + ctx.account.Release() + ctx.storage.Release() +} + +// iterator returns the corresponding iterator specified by the kind. +func (ctx *generatorContext) iterator(kind string) *holdableIterator { + if kind == snapAccount { + return ctx.account + } + return ctx.storage +} + +// removeStorageBefore deletes all storage entries which are located before +// the specified account. When the iterator touches the storage entry which +// is located in or outside the given account, it stops and holds the current +// iterated element locally. +func (ctx *generatorContext) removeStorageBefore(account common.Hash) { + var ( + count uint64 + start = time.Now() + iter = ctx.storage + ) + for iter.Next() { + key := iter.Key() + if bytes.Compare(key[1:1+common.HashLength], account.Bytes()) >= 0 { + iter.Hold() + break + } + count++ + ctx.batch.Delete(key) + if ctx.batch.ValueSize() > ethdb.IdealBatchSize { + ctx.batch.Write() + ctx.batch.Reset() + } + } + ctx.stats.dangling += count + snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds()) +} + +// removeStorageAt deletes all storage entries which are located in the specified +// account. When the iterator touches the storage entry which is outside the given +// account, it stops and holds the current iterated element locally. An error will +// be returned if the initial position of iterator is not in the given account. +func (ctx *generatorContext) removeStorageAt(account common.Hash) error { + var ( + count int64 + start = time.Now() + iter = ctx.storage + ) + for iter.Next() { + key := iter.Key() + cmp := bytes.Compare(key[1:1+common.HashLength], account.Bytes()) + if cmp < 0 { + return errors.New("invalid iterator position") + } + if cmp > 0 { + iter.Hold() + break + } + count++ + ctx.batch.Delete(key) + if ctx.batch.ValueSize() > ethdb.IdealBatchSize { + ctx.batch.Write() + ctx.batch.Reset() + } + } + snapWipedStorageMeter.Mark(count) + snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds()) + return nil +} + +// removeStorageLeft deletes all storage entries which are located after +// the current iterator position. +func (ctx *generatorContext) removeStorageLeft() { + var ( + count uint64 + start = time.Now() + iter = ctx.storage + ) + for iter.Next() { + count++ + ctx.batch.Delete(iter.Key()) + if ctx.batch.ValueSize() > ethdb.IdealBatchSize { + ctx.batch.Write() + ctx.batch.Reset() + } + } + ctx.stats.dangling += count + snapDanglingStorageMeter.Mark(int64(count)) + snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds()) +} diff --git a/core/state/snapshot/dangling.go b/core/state/snapshot/dangling.go new file mode 100644 index 000000000..ca73da793 --- /dev/null +++ b/core/state/snapshot/dangling.go @@ -0,0 +1,155 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snapshot + +import ( + "bytes" + "errors" + "fmt" + "io" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// CheckDanglingStorage iterates the snap storage data, and verifies that all +// storage also has corresponding account data. +func CheckDanglingStorage(chaindb ethdb.KeyValueStore) error { + if err := checkDanglingDiskStorage(chaindb); err != nil { + return err + } + return checkDanglingMemStorage(chaindb) +} + +// checkDanglingDiskStorage checks if there is any 'dangling' storage data in the +// disk-backed snapshot layer. +func checkDanglingDiskStorage(chaindb ethdb.KeyValueStore) error { + var ( + lastReport = time.Now() + start = time.Now() + lastKey []byte + it = rawdb.NewKeyLengthIterator(chaindb.NewIterator(rawdb.SnapshotStoragePrefix, nil), 1+2*common.HashLength) + ) + log.Info("Checking dangling snapshot disk storage") + + defer it.Release() + for it.Next() { + k := it.Key() + accKey := k[1:33] + if bytes.Equal(accKey, lastKey) { + // No need to look up for every slot + continue + } + lastKey = common.CopyBytes(accKey) + if time.Since(lastReport) > time.Second*8 { + log.Info("Iterating snap storage", "at", fmt.Sprintf("%#x", accKey), "elapsed", common.PrettyDuration(time.Since(start))) + lastReport = time.Now() + } + if data := rawdb.ReadAccountSnapshot(chaindb, common.BytesToHash(accKey)); len(data) == 0 { + log.Warn("Dangling storage - missing account", "account", fmt.Sprintf("%#x", accKey), "storagekey", fmt.Sprintf("%#x", k)) + return fmt.Errorf("dangling snapshot storage account %#x", accKey) + } + } + log.Info("Verified the snapshot disk storage", "time", common.PrettyDuration(time.Since(start)), "err", it.Error()) + return nil +} + +// checkDanglingMemStorage checks if there is any 'dangling' storage in the journalled +// snapshot difflayers. +func checkDanglingMemStorage(db ethdb.KeyValueStore) error { + var ( + start = time.Now() + journal = rawdb.ReadSnapshotJournal(db) + ) + if len(journal) == 0 { + log.Warn("Loaded snapshot journal", "diffs", "missing") + return nil + } + r := rlp.NewStream(bytes.NewReader(journal), 0) + // Firstly, resolve the first element as the journal version + version, err := r.Uint() + if err != nil { + log.Warn("Failed to resolve the journal version", "error", err) + return nil + } + if version != journalVersion { + log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version) + return nil + } + // Secondly, resolve the disk layer root, ensure it's continuous + // with disk layer. Note now we can ensure it's the snapshot journal + // correct version, so we expect everything can be resolved properly. + var root common.Hash + if err := r.Decode(&root); err != nil { + return errors.New("missing disk layer root") + } + // The diff journal is not matched with disk, discard them. + // It can happen that Geth crashes without persisting the latest + // diff journal. + // Load all the snapshot diffs from the journal + if err := checkDanglingJournalStorage(r); err != nil { + return err + } + log.Info("Verified the snapshot journalled storage", "time", common.PrettyDuration(time.Since(start))) + return nil +} + +// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new +// diff and verifying that it can be linked to the requested parent. +func checkDanglingJournalStorage(r *rlp.Stream) error { + for { + // Read the next diff journal entry + var root common.Hash + if err := r.Decode(&root); err != nil { + // The first read may fail with EOF, marking the end of the journal + if err == io.EOF { + return nil + } + return fmt.Errorf("load diff root: %v", err) + } + var destructs []journalDestruct + if err := r.Decode(&destructs); err != nil { + return fmt.Errorf("load diff destructs: %v", err) + } + var accounts []journalAccount + if err := r.Decode(&accounts); err != nil { + return fmt.Errorf("load diff accounts: %v", err) + } + accountData := make(map[common.Hash][]byte) + for _, entry := range accounts { + if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that + accountData[entry.Hash] = entry.Blob + } else { + accountData[entry.Hash] = nil + } + } + var storage []journalStorage + if err := r.Decode(&storage); err != nil { + return fmt.Errorf("load diff storage: %v", err) + } + for _, entry := range storage { + if _, ok := accountData[entry.Hash]; !ok { + log.Error("Dangling storage - missing account", "account", fmt.Sprintf("%#x", entry.Hash), "root", root) + return fmt.Errorf("dangling journal snapshot storage account %#x", entry.Hash) + } + } + } +} diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go index 39d30a20c..769989aec 100644 --- a/core/state/snapshot/generate.go +++ b/core/state/snapshot/generate.go @@ -18,7 +18,6 @@ package snapshot import ( "bytes" - "encoding/binary" "errors" "fmt" "math/big" @@ -27,13 +26,11 @@ import ( "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -47,14 +44,14 @@ var ( // accountCheckRange is the upper limit of the number of accounts involved in // each range check. This is a value estimated based on experience. If this - // value is too large, the failure rate of range prove will increase. Otherwise - // the value is too small, the efficiency of the state recovery will decrease. + // range is too large, the failure rate of range proof will increase. Otherwise, + // if the range is too small, the efficiency of the state recovery will decrease. accountCheckRange = 128 // storageCheckRange is the upper limit of the number of storage slots involved // in each range check. This is a value estimated based on experience. If this - // value is too large, the failure rate of range prove will increase. Otherwise - // the value is too small, the efficiency of the state recovery will decrease. + // range is too large, the failure rate of range proof will increase. Otherwise, + // if the range is too small, the efficiency of the state recovery will decrease. storageCheckRange = 1024 // errMissingTrie is returned if the target trie is missing while the generation @@ -62,85 +59,6 @@ var ( errMissingTrie = errors.New("missing trie") ) -// Metrics in generation -var ( - snapGeneratedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/generated", nil) - snapRecoveredAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/recovered", nil) - snapWipedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/wiped", nil) - snapMissallAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/missall", nil) - snapGeneratedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/generated", nil) - snapRecoveredStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/recovered", nil) - snapWipedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/wiped", nil) - snapMissallStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/missall", nil) - snapSuccessfulRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/success", nil) - snapFailedRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/failure", nil) - - // snapAccountProveCounter measures time spent on the account proving - snapAccountProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/prove", nil) - // snapAccountTrieReadCounter measures time spent on the account trie iteration - snapAccountTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/trieread", nil) - // snapAccountSnapReadCounter measues time spent on the snapshot account iteration - snapAccountSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/snapread", nil) - // snapAccountWriteCounter measures time spent on writing/updating/deleting accounts - snapAccountWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/write", nil) - // snapStorageProveCounter measures time spent on storage proving - snapStorageProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/prove", nil) - // snapStorageTrieReadCounter measures time spent on the storage trie iteration - snapStorageTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/trieread", nil) - // snapStorageSnapReadCounter measures time spent on the snapshot storage iteration - snapStorageSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/snapread", nil) - // snapStorageWriteCounter measures time spent on writing/updating/deleting storages - snapStorageWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/write", nil) -) - -// generatorStats is a collection of statistics gathered by the snapshot generator -// for logging purposes. -type generatorStats struct { - origin uint64 // Origin prefix where generation started - start time.Time // Timestamp when generation started - accounts uint64 // Number of accounts indexed(generated or recovered) - slots uint64 // Number of storage slots indexed(generated or recovered) - storage common.StorageSize // Total account and storage slot size(generation or recovery) -} - -// Log creates an contextual log with the given message and the context pulled -// from the internally maintained statistics. -func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) { - var ctx []interface{} - if root != (common.Hash{}) { - ctx = append(ctx, []interface{}{"root", root}...) - } - // Figure out whether we're after or within an account - switch len(marker) { - case common.HashLength: - ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...) - case 2 * common.HashLength: - ctx = append(ctx, []interface{}{ - "in", common.BytesToHash(marker[:common.HashLength]), - "at", common.BytesToHash(marker[common.HashLength:]), - }...) - } - // Add the usual measurements - ctx = append(ctx, []interface{}{ - "accounts", gs.accounts, - "slots", gs.slots, - "storage", gs.storage, - "elapsed", common.PrettyDuration(time.Since(gs.start)), - }...) - // Calculate the estimated indexing time based on current stats - if len(marker) > 0 { - if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 { - left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8]) - - speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero - ctx = append(ctx, []interface{}{ - "eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond), - }...) - } - } - log.Info(msg, ctx...) -} - // generateSnapshot regenerates a brand new snapshot based on an existing state // database and head block asynchronously. The snapshot is returned immediately // and generation is continued in the background until done. @@ -248,25 +166,35 @@ func (result *proofResult) forEach(callback func(key []byte, val []byte) error) // // The proof result will be returned if the range proving is finished, otherwise // the error will be returned to abort the entire procedure. -func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) { +func (dl *diskLayer) proveRange(ctx *generatorContext, root common.Hash, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) { var ( keys [][]byte vals [][]byte proof = rawdb.NewMemoryDatabase() diskMore = false + iter = ctx.iterator(kind) + start = time.Now() + min = append(prefix, origin...) ) - iter := dl.diskdb.NewIterator(prefix, origin) - defer iter.Release() - - var start = time.Now() for iter.Next() { + // Ensure the iterated item is always equal or larger than the given origin. key := iter.Key() - if len(key) != len(prefix)+common.HashLength { - continue + if bytes.Compare(key, min) < 0 { + return nil, errors.New("invalid iteration position") } + // Ensure the iterated item still fall in the specified prefix. If + // not which means the items in the specified area are all visited. + // Move the iterator a step back since we iterate one extra element + // out. + if !bytes.Equal(key[:len(prefix)], prefix) { + iter.Hold() + break + } + // Break if we've reached the max size, and signal that we're not + // done yet. Move the iterator a step back since we iterate one + // extra element out. if len(keys) == max { - // Break if we've reached the max size, and signal that we're not - // done yet. + iter.Hold() diskMore = true break } @@ -282,7 +210,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix // generation to heal the invalid data. // // Here append the original value to ensure that the number of key and - // value are the same. + // value are aligned. vals = append(vals, common.CopyBytes(iter.Value())) log.Error("Failed to convert account state data", "err", err) } else { @@ -291,13 +219,13 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix } } // Update metrics for database iteration and merkle proving - if kind == "storage" { + if kind == snapStorage { snapStorageSnapReadCounter.Inc(time.Since(start).Nanoseconds()) } else { snapAccountSnapReadCounter.Inc(time.Since(start).Nanoseconds()) } defer func(start time.Time) { - if kind == "storage" { + if kind == snapStorage { snapStorageProveCounter.Inc(time.Since(start).Nanoseconds()) } else { snapAccountProveCounter.Inc(time.Since(start).Nanoseconds()) @@ -322,7 +250,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix // Snap state is chunked, generate edge proofs for verification. tr, err := trie.New(root, dl.triedb) if err != nil { - stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker) + ctx.stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker) return nil, errMissingTrie } // Firstly find out the key of last iterated element. @@ -371,19 +299,23 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix // onStateCallback is a function that is called by generateRange, when processing a range of // accounts or storage slots. For each element, the callback is invoked. -// If 'delete' is true, then this element (and potential slots) needs to be deleted from the snapshot. -// If 'write' is true, then this element needs to be updated with the 'val'. -// If 'write' is false, then this element is already correct, and needs no update. However, -// for accounts, the storage trie of the account needs to be checked. +// +// - If 'delete' is true, then this element (and potential slots) needs to be deleted from the snapshot. +// - If 'write' is true, then this element needs to be updated with the 'val'. +// - If 'write' is false, then this element is already correct, and needs no update. // The 'val' is the canonical encoding of the value (not the slim format for accounts) +// +// However, for accounts, the storage trie of the account needs to be checked. Also, +// dangling storages(storage exists but the corresponding account is missing) need to +// be cleaned up. type onStateCallback func(key []byte, val []byte, write bool, delete bool) error // generateRange generates the state segment with particular prefix. Generation can // either verify the correctness of existing state through range-proof and skip // generation, or iterate trie to regenerate state on demand. -func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) { +func (dl *diskLayer) generateRange(ctx *generatorContext, root common.Hash, prefix []byte, kind string, origin []byte, max int, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) { // Use range prover to check the validity of the flat state in the range - result, err := dl.proveRange(stats, root, prefix, kind, origin, max, valueConvertFn) + result, err := dl.proveRange(ctx, root, prefix, kind, origin, max, valueConvertFn) if err != nil { return false, nil, err } @@ -414,18 +346,17 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, snapFailedRangeProofMeter.Mark(1) // Special case, the entire trie is missing. In the original trie scheme, - // all the duplicated subtries will be filter out(only one copy of data + // all the duplicated subtries will be filtered out (only one copy of data // will be stored). While in the snapshot model, all the storage tries // belong to different contracts will be kept even they are duplicated. // Track it to a certain extent remove the noise data used for statistics. if origin == nil && last == nil { meter := snapMissallAccountMeter - if kind == "storage" { + if kind == snapStorage { meter = snapMissallStorageMeter } meter.Mark(1) } - // We use the snap data to build up a cache which can be used by the // main account trie as a primary lookup when resolving hashes var snapNodeCache ethdb.KeyValueStore @@ -439,15 +370,16 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, root, _, _ := snapTrie.Commit(nil) snapTrieDb.Commit(root, false, nil) } + // Construct the trie for state iteration, reuse the trie + // if it's already opened with some nodes resolved. tr := result.tr if tr == nil { tr, err = trie.New(root, dl.triedb) if err != nil { - stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker) + ctx.stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker) return false, nil, errMissingTrie } } - var ( trieMore bool nodeIt = tr.NodeIterator(origin) @@ -466,6 +398,7 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, internal time.Duration ) nodeIt.AddResolver(snapNodeCache) + for iter.Next() { if last != nil && bytes.Compare(iter.Key, last) > 0 { trieMore = true @@ -519,7 +452,7 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, internal += time.Since(istart) // Update metrics for counting trie iteration - if kind == "storage" { + if kind == snapStorage { snapStorageTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds()) } else { snapAccountTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds()) @@ -534,66 +467,69 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, // checkAndFlush checks if an interruption signal is received or the // batch size has exceeded the allowance. -func (dl *diskLayer) checkAndFlush(current []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error { +func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error { var abort chan *generatorStats select { case abort = <-dl.genAbort: default: } - if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { if bytes.Compare(current, dl.genMarker) < 0 { log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker)) } // Flush out the batch anyway no matter it's empty or not. // It's possible that all the states are recovered and the // generation indeed makes progress. - journalProgress(batch, current, stats) + journalProgress(ctx.batch, current, ctx.stats) - if err := batch.Write(); err != nil { + if err := ctx.batch.Write(); err != nil { return err } - batch.Reset() + ctx.batch.Reset() dl.lock.Lock() dl.genMarker = current dl.lock.Unlock() if abort != nil { - stats.Log("Aborting state snapshot generation", dl.root, current) + ctx.stats.Log("Aborting state snapshot generation", dl.root, current) return newAbortErr(abort) // bubble up an error for interruption } + // Don't hold the iterators too long, release them to let compactor works + ctx.reopenIterator(snapAccount) + ctx.reopenIterator(snapStorage) } - if time.Since(*logged) > 8*time.Second { - stats.Log("Generating state snapshot", dl.root, current) - *logged = time.Now() + if time.Since(ctx.logged) > 8*time.Second { + ctx.stats.Log("Generating state snapshot", dl.root, current) + ctx.logged = time.Now() } return nil } // generateStorages generates the missing storage slots of the specific contract. // It's supposed to restart the generation from the given origin position. -func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error { +func generateStorages(ctx *generatorContext, dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte) error { onStorage := func(key []byte, val []byte, write bool, delete bool) error { defer func(start time.Time) { snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds()) }(time.Now()) if delete { - rawdb.DeleteStorageSnapshot(batch, account, common.BytesToHash(key)) + rawdb.DeleteStorageSnapshot(ctx.batch, account, common.BytesToHash(key)) snapWipedStorageMeter.Mark(1) return nil } if write { - rawdb.WriteStorageSnapshot(batch, account, common.BytesToHash(key), val) + rawdb.WriteStorageSnapshot(ctx.batch, account, common.BytesToHash(key), val) snapGeneratedStorageMeter.Mark(1) } else { snapRecoveredStorageMeter.Mark(1) } - stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val)) - stats.slots++ + ctx.stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val)) + ctx.stats.slots++ // If we've exceeded our batch allowance or termination was requested, flush to disk - if err := dl.checkAndFlush(append(account[:], key...), batch, stats, logged); err != nil { + if err := dl.checkAndFlush(ctx, append(account[:], key...)); err != nil { return err } return nil @@ -601,7 +537,7 @@ func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Has // Loop for re-generating the missing storage slots. var origin = common.CopyBytes(storeMarker) for { - exhausted, last, err := dl.generateRange(storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), "storage", origin, storageCheckRange, stats, onStorage, nil) + exhausted, last, err := dl.generateRange(ctx, storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), snapStorage, origin, storageCheckRange, onStorage, nil) if err != nil { return err // The procedure it aborted, either by external signal or internal error. } @@ -619,23 +555,19 @@ func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Has // generateAccounts generates the missing snapshot accounts as well as their // storage slots in the main trie. It's supposed to restart the generation // from the given origin position. -func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error { +func generateAccounts(ctx *generatorContext, dl *diskLayer, accMarker []byte) error { onAccount := func(key []byte, val []byte, write bool, delete bool) error { - var ( - start = time.Now() - accountHash = common.BytesToHash(key) - ) - if delete { - rawdb.DeleteAccountSnapshot(batch, accountHash) - snapWipedAccountMeter.Mark(1) + // Make sure to clear all dangling storages before this account + account := common.BytesToHash(key) + ctx.removeStorageBefore(account) - // Ensure that any previous snapshot storage values are cleared - prefix := append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...) - keyLen := len(rawdb.SnapshotStoragePrefix) + 2*common.HashLength - if err := wipeKeyRange(dl.diskdb, "storage", prefix, nil, nil, keyLen, snapWipedStorageMeter, false); err != nil { - return err - } + start := time.Now() + if delete { + rawdb.DeleteAccountSnapshot(ctx.batch, account) + snapWipedAccountMeter.Mark(1) snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds()) + + ctx.removeStorageAt(account) return nil } // Retrieve the current account and flatten it into the internal format @@ -649,7 +581,7 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats log.Crit("Invalid account encountered during snapshot creation", "err", err) } // If the account is not yet in-progress, write it out - if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) { + if accMarker == nil || !bytes.Equal(account[:], accMarker) { dataLen := len(val) // Approximate size, saves us a round of RLP-encoding if !write { if bytes.Equal(acc.CodeHash, emptyCode[:]) { @@ -662,44 +594,34 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats } else { data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash) dataLen = len(data) - rawdb.WriteAccountSnapshot(batch, accountHash, data) + rawdb.WriteAccountSnapshot(ctx.batch, account, data) snapGeneratedAccountMeter.Mark(1) } - stats.storage += common.StorageSize(1 + common.HashLength + dataLen) - stats.accounts++ + ctx.stats.storage += common.StorageSize(1 + common.HashLength + dataLen) + ctx.stats.accounts++ } - marker := accountHash[:] // If the snap generation goes here after interrupted, genMarker may go backward // when last genMarker is consisted of accountHash and storageHash + marker := account[:] if accMarker != nil && bytes.Equal(marker, accMarker) && len(dl.genMarker) > common.HashLength { marker = dl.genMarker[:] } // If we've exceeded our batch allowance or termination was requested, flush to disk - if err := dl.checkAndFlush(marker, batch, stats, logged); err != nil { + if err := dl.checkAndFlush(ctx, marker); err != nil { return err } + snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds()) // let's count flush time as well + // If the iterated account is the contract, create a further loop to // verify or regenerate the contract storage. if acc.Root == emptyRoot { - // If the root is empty, we still need to ensure that any previous snapshot - // storage values are cleared - // TODO: investigate if this can be avoided, this will be very costly since it - // affects every single EOA account - // - Perhaps we can avoid if where codeHash is emptyCode - prefix := append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...) - keyLen := len(rawdb.SnapshotStoragePrefix) + 2*common.HashLength - if err := wipeKeyRange(dl.diskdb, "storage", prefix, nil, nil, keyLen, snapWipedStorageMeter, false); err != nil { - return err - } - snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds()) + ctx.removeStorageAt(account) } else { - snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds()) - var storeMarker []byte - if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength { + if accMarker != nil && bytes.Equal(account[:], accMarker) && len(dl.genMarker) > common.HashLength { storeMarker = dl.genMarker[common.HashLength:] } - if err := generateStorages(dl, accountHash, acc.Root, storeMarker, batch, stats, logged); err != nil { + if err := generateStorages(ctx, dl, account, acc.Root, storeMarker); err != nil { return err } } @@ -707,25 +629,26 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats accMarker = nil return nil } - // Always reset the initial account range as 1 whenever recover from the interruption. + // Always reset the initial account range as 1 whenever recover from the + // interruption. TODO(rjl493456442) can we remove it? var accountRange = accountCheckRange if len(accMarker) > 0 { accountRange = 1 } - // Global loop for re-generating the account snapshots + all layered storage snapshots. origin := common.CopyBytes(accMarker) for { - exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", origin, accountRange, stats, onAccount, FullAccountRLP) + exhausted, last, err := dl.generateRange(ctx, dl.root, rawdb.SnapshotAccountPrefix, snapAccount, origin, accountRange, onAccount, FullAccountRLP) if err != nil { return err // The procedure it aborted, either by external signal or internal error. } - // Abort the procedure if the entire snapshot is generated - if exhausted { + origin = increaseKey(last) + + // Last step, cleanup the storages after the last account. + // All the left storages should be treated as dangling. + if origin == nil || exhausted { + ctx.removeStorageLeft() break } - if origin = increaseKey(last); origin == nil { - break // special case, the last is 0xffffffff...fff - } accountRange = accountCheckRange } return nil @@ -736,19 +659,27 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats // gathering and logging, since the method surfs the blocks as they arrive, often // being restarted. func (dl *diskLayer) generate(stats *generatorStats) { - var accMarker []byte + var ( + accMarker []byte + abort chan *generatorStats + ) if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that accMarker = dl.genMarker[:common.HashLength] } - var ( - batch = dl.diskdb.NewBatch() - logged = time.Now() - abort chan *generatorStats - ) stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker) - // Generate the snapshot accounts from the point where they left off. - if err := generateAccounts(dl, accMarker, batch, stats, &logged); err != nil { + // Initialize the global generator context. The snapshot iterators are + // opened at the interrupted position because the assumption is held + // that all the snapshot data are generated correctly before the marker. + // Even if the snapshot data is updated during the interruption (before + // or at the marker), the assumption is still held. + // For the account or storage slot at the interruption, they will be + // processed twice by the generator(they are already processed in the + // last run) but it's fine. + ctx := newGeneratorContext(stats, dl.diskdb, accMarker, dl.genMarker) + defer ctx.close() + + if err := generateAccounts(ctx, dl, accMarker); err != nil { // Extract the received interruption signal if exists if aerr, ok := err.(*abortErr); ok { abort = aerr.abort @@ -763,18 +694,18 @@ func (dl *diskLayer) generate(stats *generatorStats) { // Snapshot fully generated, set the marker to nil. // Note even there is nothing to commit, persist the // generator anyway to mark the snapshot is complete. - journalProgress(batch, nil, stats) - if err := batch.Write(); err != nil { + journalProgress(ctx.batch, nil, stats) + if err := ctx.batch.Write(); err != nil { log.Error("Failed to flush batch", "err", err) abort = <-dl.genAbort abort <- stats return } - batch.Reset() + ctx.batch.Reset() log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots, - "storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start))) + "storage", stats.storage, "dangling", stats.dangling, "elapsed", common.PrettyDuration(time.Since(stats.start))) dl.lock.Lock() dl.genMarker = nil diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/generate_test.go index 582da6a2e..7e1d2b96f 100644 --- a/core/state/snapshot/generate_test.go +++ b/core/state/snapshot/generate_test.go @@ -148,8 +148,10 @@ func TestGenerateExistentState(t *testing.T) { func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) { t.Helper() + accIt := snap.AccountIterator(common.Hash{}) defer accIt.Release() + snapRoot, err := generateTrieRoot(nil, accIt, common.Hash{}, stackTrieGenerate, func(db ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *generateStats) (common.Hash, error) { storageIt, _ := snap.StorageIterator(accountHash, common.Hash{}) @@ -168,6 +170,9 @@ func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) { if snapRoot != trieRoot { t.Fatalf("snaproot: %#x != trieroot #%x", snapRoot, trieRoot) } + if err := CheckDanglingStorage(snap.diskdb); err != nil { + t.Fatalf("Detected dangling storages %v", err) + } } type testHelper struct { @@ -831,3 +836,122 @@ func TestGenerateWithIncompleteStorage(t *testing.T) { snap.genAbort <- stop <-stop } + +func incKey(key []byte) []byte { + for i := len(key) - 1; i >= 0; i-- { + key[i]++ + if key[i] != 0x0 { + break + } + } + return key +} + +func decKey(key []byte) []byte { + for i := len(key) - 1; i >= 0; i-- { + key[i]-- + if key[i] != 0xff { + break + } + } + return key +} + +func populateDangling(disk ethdb.KeyValueStore) { + populate := func(accountHash common.Hash, keys []string, vals []string) { + for i, key := range keys { + rawdb.WriteStorageSnapshot(disk, accountHash, hashData([]byte(key)), []byte(vals[i])) + } + } + // Dangling storages of the "first" account + populate(common.Hash{}, []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages of the "last" account + populate(common.HexToHash("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages around the account 1 + hash := decKey(hashData([]byte("acc-1")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + hash = incKey(hashData([]byte("acc-1")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages around the account 2 + hash = decKey(hashData([]byte("acc-2")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + hash = incKey(hashData([]byte("acc-2")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages around the account 3 + hash = decKey(hashData([]byte("acc-3")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + hash = incKey(hashData([]byte("acc-3")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages of the random account + populate(randomHash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + populate(randomHash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + populate(randomHash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) +} + +// Tests that snapshot generation with dangling storages. Dangling storage means +// the storage data is existent while the corresponding account data is missing. +// +// This test will populate some dangling storages to see if they can be cleaned up. +func TestGenerateCompleteSnapshotWithDanglingStorage(t *testing.T) { + var helper = newHelper() + stRoot := helper.makeStorageTrie([]string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + helper.addAccount("acc-1", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()}) + helper.addAccount("acc-2", &Account{Balance: big.NewInt(1), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()}) + helper.addAccount("acc-3", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()}) + + helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + helper.addSnapStorage("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + populateDangling(helper.diskdb) + + root, snap := helper.Generate() + select { + case <-snap.genPending: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + stop := make(chan *generatorStats) + snap.genAbort <- stop + <-stop +} + +// Tests that snapshot generation with dangling storages. Dangling storage means +// the storage data is existent while the corresponding account data is missing. +// +// This test will populate some dangling storages to see if they can be cleaned up. +func TestGenerateBrokenSnapshotWithDanglingStorage(t *testing.T) { + var helper = newHelper() + stRoot := helper.makeStorageTrie([]string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + helper.addTrieAccount("acc-1", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()}) + helper.addTrieAccount("acc-2", &Account{Balance: big.NewInt(2), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()}) + helper.addTrieAccount("acc-3", &Account{Balance: big.NewInt(3), Root: stRoot, CodeHash: emptyCode.Bytes()}) + + populateDangling(helper.diskdb) + + root, snap := helper.Generate() + select { + case <-snap.genPending: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + stop := make(chan *generatorStats) + snap.genAbort <- stop + <-stop +} diff --git a/core/state/snapshot/holdable_iterator.go b/core/state/snapshot/holdable_iterator.go new file mode 100644 index 000000000..c3ce4d6fc --- /dev/null +++ b/core/state/snapshot/holdable_iterator.go @@ -0,0 +1,97 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package snapshot + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" +) + +// holdableIterator is a wrapper of underlying database iterator. It extends +// the basic iterator interface by adding Hold which can hold the element +// locally where the iterator is currently located and serve it up next time. +type holdableIterator struct { + it ethdb.Iterator + key []byte + val []byte + atHeld bool +} + +// newHoldableIterator initializes the holdableIterator with the given iterator. +func newHoldableIterator(it ethdb.Iterator) *holdableIterator { + return &holdableIterator{it: it} +} + +// Hold holds the element locally where the iterator is currently located which +// can be served up next time. +func (it *holdableIterator) Hold() { + if it.it.Key() == nil { + return // nothing to hold + } + it.key = common.CopyBytes(it.it.Key()) + it.val = common.CopyBytes(it.it.Value()) + it.atHeld = false +} + +// Next moves the iterator to the next key/value pair. It returns whether the +// iterator is exhausted. +func (it *holdableIterator) Next() bool { + if !it.atHeld && it.key != nil { + it.atHeld = true + } else if it.atHeld { + it.atHeld = false + it.key = nil + it.val = nil + } + if it.key != nil { + return true // shifted to locally held value + } + return it.it.Next() +} + +// Error returns any accumulated error. Exhausting all the key/value pairs +// is not considered to be an error. +func (it *holdableIterator) Error() error { return it.it.Error() } + +// Release releases associated resources. Release should always succeed and can +// be called multiple times without causing error. +func (it *holdableIterator) Release() { + it.atHeld = false + it.key = nil + it.val = nil + it.it.Release() +} + +// Key returns the key of the current key/value pair, or nil if done. The caller +// should not modify the contents of the returned slice, and its contents may +// change on the next call to Next. +func (it *holdableIterator) Key() []byte { + if it.key != nil { + return it.key + } + return it.it.Key() +} + +// Value returns the value of the current key/value pair, or nil if done. The +// caller should not modify the contents of the returned slice, and its contents +// may change on the next call to Next. +func (it *holdableIterator) Value() []byte { + if it.val != nil { + return it.val + } + return it.it.Value() +} diff --git a/core/state/snapshot/holdable_iterator_test.go b/core/state/snapshot/holdable_iterator_test.go new file mode 100644 index 000000000..397dbf103 --- /dev/null +++ b/core/state/snapshot/holdable_iterator_test.go @@ -0,0 +1,163 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package snapshot + +import ( + "bytes" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" +) + +func TestIteratorHold(t *testing.T) { + // Create the key-value data store + var ( + content = map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"} + order = []string{"k1", "k2", "k3"} + db = rawdb.NewMemoryDatabase() + ) + for key, val := range content { + if err := db.Put([]byte(key), []byte(val)); err != nil { + t.Fatalf("failed to insert item %s:%s into database: %v", key, val, err) + } + } + // Iterate over the database with the given configs and verify the results + it, idx := newHoldableIterator(db.NewIterator(nil, nil)), 0 + + // Nothing should be affected for calling Discard on non-initialized iterator + it.Hold() + + for it.Next() { + if len(content) <= idx { + t.Errorf("more items than expected: checking idx=%d (key %q), expecting len=%d", idx, it.Key(), len(order)) + break + } + if !bytes.Equal(it.Key(), []byte(order[idx])) { + t.Errorf("item %d: key mismatch: have %s, want %s", idx, string(it.Key()), order[idx]) + } + if !bytes.Equal(it.Value(), []byte(content[order[idx]])) { + t.Errorf("item %d: value mismatch: have %s, want %s", idx, string(it.Value()), content[order[idx]]) + } + // Should be safe to call discard multiple times + it.Hold() + it.Hold() + + // Shift iterator to the discarded element + it.Next() + if !bytes.Equal(it.Key(), []byte(order[idx])) { + t.Errorf("item %d: key mismatch: have %s, want %s", idx, string(it.Key()), order[idx]) + } + if !bytes.Equal(it.Value(), []byte(content[order[idx]])) { + t.Errorf("item %d: value mismatch: have %s, want %s", idx, string(it.Value()), content[order[idx]]) + } + + // Discard/Next combo should work always + it.Hold() + it.Next() + if !bytes.Equal(it.Key(), []byte(order[idx])) { + t.Errorf("item %d: key mismatch: have %s, want %s", idx, string(it.Key()), order[idx]) + } + if !bytes.Equal(it.Value(), []byte(content[order[idx]])) { + t.Errorf("item %d: value mismatch: have %s, want %s", idx, string(it.Value()), content[order[idx]]) + } + idx++ + } + if err := it.Error(); err != nil { + t.Errorf("iteration failed: %v", err) + } + if idx != len(order) { + t.Errorf("iteration terminated prematurely: have %d, want %d", idx, len(order)) + } + db.Close() +} + +func TestReopenIterator(t *testing.T) { + var ( + content = map[common.Hash]string{ + common.HexToHash("a1"): "v1", + common.HexToHash("a2"): "v2", + common.HexToHash("a3"): "v3", + common.HexToHash("a4"): "v4", + common.HexToHash("a5"): "v5", + common.HexToHash("a6"): "v6", + } + order = []common.Hash{ + common.HexToHash("a1"), + common.HexToHash("a2"), + common.HexToHash("a3"), + common.HexToHash("a4"), + common.HexToHash("a5"), + common.HexToHash("a6"), + } + db = rawdb.NewMemoryDatabase() + ) + for key, val := range content { + rawdb.WriteAccountSnapshot(db, key, []byte(val)) + } + checkVal := func(it *holdableIterator, index int) { + if !bytes.Equal(it.Key(), append(rawdb.SnapshotAccountPrefix, order[index].Bytes()...)) { + t.Fatalf("Unexpected data entry key, want %v got %v", order[index], it.Key()) + } + if !bytes.Equal(it.Value(), []byte(content[order[index]])) { + t.Fatalf("Unexpected data entry key, want %v got %v", []byte(content[order[index]]), it.Value()) + } + } + // Iterate over the database with the given configs and verify the results + ctx, idx := newGeneratorContext(&generatorStats{}, db, nil, nil), -1 + + idx++ + ctx.account.Next() + checkVal(ctx.account, idx) + + ctx.reopenIterator(snapAccount) + idx++ + ctx.account.Next() + checkVal(ctx.account, idx) + + // reopen twice + ctx.reopenIterator(snapAccount) + ctx.reopenIterator(snapAccount) + idx++ + ctx.account.Next() + checkVal(ctx.account, idx) + + // reopen iterator with held value + ctx.account.Next() + ctx.account.Hold() + ctx.reopenIterator(snapAccount) + idx++ + ctx.account.Next() + checkVal(ctx.account, idx) + + // reopen twice iterator with held value + ctx.account.Next() + ctx.account.Hold() + ctx.reopenIterator(snapAccount) + ctx.reopenIterator(snapAccount) + idx++ + ctx.account.Next() + checkVal(ctx.account, idx) + + // shift to the end and reopen + ctx.account.Next() // the end + ctx.reopenIterator(snapAccount) + ctx.account.Next() + if ctx.account.Key() != nil { + t.Fatal("Unexpected iterated entry") + } +} diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 8acc441aa..6836a5740 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -345,78 +345,3 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root()) return base, nil } - -// CheckJournalStorage performs consistency-checks on the journalled -// difflayers. -func CheckJournalStorage(db ethdb.KeyValueStore) error { - journal := rawdb.ReadSnapshotJournal(db) - if len(journal) == 0 { - log.Warn("Loaded snapshot journal", "diffs", "missing") - return nil - } - r := rlp.NewStream(bytes.NewReader(journal), 0) - // Firstly, resolve the first element as the journal version - version, err := r.Uint() - if err != nil { - log.Warn("Failed to resolve the journal version", "error", err) - return nil - } - if version != journalVersion { - log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version) - return nil - } - // Secondly, resolve the disk layer root, ensure it's continuous - // with disk layer. Note now we can ensure it's the snapshot journal - // correct version, so we expect everything can be resolved properly. - var root common.Hash - if err := r.Decode(&root); err != nil { - return errors.New("missing disk layer root") - } - // The diff journal is not matched with disk, discard them. - // It can happen that Geth crashes without persisting the latest - // diff journal. - // Load all the snapshot diffs from the journal - return checkDanglingJournalStorage(r) -} - -// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new -// diff and verifying that it can be linked to the requested parent. -func checkDanglingJournalStorage(r *rlp.Stream) error { - for { - // Read the next diff journal entry - var root common.Hash - if err := r.Decode(&root); err != nil { - // The first read may fail with EOF, marking the end of the journal - if err == io.EOF { - return nil - } - return fmt.Errorf("load diff root: %v", err) - } - var destructs []journalDestruct - if err := r.Decode(&destructs); err != nil { - return fmt.Errorf("load diff destructs: %v", err) - } - var accounts []journalAccount - if err := r.Decode(&accounts); err != nil { - return fmt.Errorf("load diff accounts: %v", err) - } - accountData := make(map[common.Hash][]byte) - for _, entry := range accounts { - if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that - accountData[entry.Hash] = entry.Blob - } else { - accountData[entry.Hash] = nil - } - } - var storage []journalStorage - if err := r.Decode(&storage); err != nil { - return fmt.Errorf("load diff storage: %v", err) - } - for _, entry := range storage { - if _, ok := accountData[entry.Hash]; !ok { - log.Error("Dangling storage - missing account", "account", fmt.Sprintf("%#x", entry.Hash), "root", root) - return fmt.Errorf("dangling journal snapshot storage account %#x", entry.Hash) - } - } - } -} diff --git a/core/state/snapshot/metrics.go b/core/state/snapshot/metrics.go new file mode 100644 index 000000000..42fa6fafa --- /dev/null +++ b/core/state/snapshot/metrics.go @@ -0,0 +1,53 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package snapshot + +import "github.com/ethereum/go-ethereum/metrics" + +// Metrics in generation +var ( + snapGeneratedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/generated", nil) + snapRecoveredAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/recovered", nil) + snapWipedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/wiped", nil) + snapMissallAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/missall", nil) + snapGeneratedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/generated", nil) + snapRecoveredStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/recovered", nil) + snapWipedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/wiped", nil) + snapMissallStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/missall", nil) + snapDanglingStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/dangling", nil) + snapSuccessfulRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/success", nil) + snapFailedRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/failure", nil) + + // snapAccountProveCounter measures time spent on the account proving + snapAccountProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/prove", nil) + // snapAccountTrieReadCounter measures time spent on the account trie iteration + snapAccountTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/trieread", nil) + // snapAccountSnapReadCounter measues time spent on the snapshot account iteration + snapAccountSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/snapread", nil) + // snapAccountWriteCounter measures time spent on writing/updating/deleting accounts + snapAccountWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/write", nil) + // snapStorageProveCounter measures time spent on storage proving + snapStorageProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/prove", nil) + // snapStorageTrieReadCounter measures time spent on the storage trie iteration + snapStorageTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/trieread", nil) + // snapStorageSnapReadCounter measures time spent on the snapshot storage iteration + snapStorageSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/snapread", nil) + // snapStorageWriteCounter measures time spent on writing/updating storages + snapStorageWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/write", nil) + // snapStorageCleanCounter measures time spent on deleting storages + snapStorageCleanCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/clean", nil) +) diff --git a/core/state/snapshot/wipe.go b/core/state/snapshot/wipe.go deleted file mode 100644 index b774c37a4..000000000 --- a/core/state/snapshot/wipe.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package snapshot - -import ( - "bytes" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" -) - -// wipeKeyRange deletes a range of keys from the database starting with prefix -// and having a specific total key length. The start and limit is optional for -// specifying a particular key range for deletion. -// -// Origin is included for wiping and limit is excluded if they are specified. -func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, origin []byte, limit []byte, keylen int, meter metrics.Meter, report bool) error { - // Batch deletions together to avoid holding an iterator for too long - var ( - batch = db.NewBatch() - items int - ) - // Iterate over the key-range and delete all of them - start, logged := time.Now(), time.Now() - - it := db.NewIterator(prefix, origin) - var stop []byte - if limit != nil { - stop = append(prefix, limit...) - } - for it.Next() { - // Skip any keys with the correct prefix but wrong length (trie nodes) - key := it.Key() - if !bytes.HasPrefix(key, prefix) { - break - } - if len(key) != keylen { - continue - } - if stop != nil && bytes.Compare(key, stop) >= 0 { - break - } - // Delete the key and periodically recreate the batch and iterator - batch.Delete(key) - items++ - - if items%10000 == 0 { - // Batch too large (or iterator too long lived, flush and recreate) - it.Release() - if err := batch.Write(); err != nil { - return err - } - batch.Reset() - seekPos := key[len(prefix):] - it = db.NewIterator(prefix, seekPos) - - if time.Since(logged) > 8*time.Second && report { - log.Info("Deleting state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start))) - logged = time.Now() - } - } - } - it.Release() - if err := batch.Write(); err != nil { - return err - } - if meter != nil { - meter.Mark(int64(items)) - } - if report { - log.Info("Deleted state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start))) - } - return nil -} diff --git a/core/state/snapshot/wipe_test.go b/core/state/snapshot/wipe_test.go deleted file mode 100644 index c5b340136..000000000 --- a/core/state/snapshot/wipe_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package snapshot - -import ( - "math/rand" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/ethdb/memorydb" -) - -// Tests that given a database with random data content, all parts of a snapshot -// can be crrectly wiped without touching anything else. -func TestWipe(t *testing.T) { - // Create a database with some random snapshot data - db := memorydb.New() - for i := 0; i < 128; i++ { - rawdb.WriteAccountSnapshot(db, randomHash(), randomHash().Bytes()) - } - // Add some random non-snapshot data too to make wiping harder - for i := 0; i < 500; i++ { - // Generate keys with wrong length for a state snapshot item - keysuffix := make([]byte, 31) - rand.Read(keysuffix) - db.Put(append(rawdb.SnapshotAccountPrefix, keysuffix...), randomHash().Bytes()) - keysuffix = make([]byte, 33) - rand.Read(keysuffix) - db.Put(append(rawdb.SnapshotAccountPrefix, keysuffix...), randomHash().Bytes()) - } - count := func() (items int) { - it := db.NewIterator(rawdb.SnapshotAccountPrefix, nil) - defer it.Release() - for it.Next() { - if len(it.Key()) == len(rawdb.SnapshotAccountPrefix)+common.HashLength { - items++ - } - } - return items - } - // Sanity check that all the keys are present - if items := count(); items != 128 { - t.Fatalf("snapshot size mismatch: have %d, want %d", items, 128) - } - // Wipe the accounts - if err := wipeKeyRange(db, "accounts", rawdb.SnapshotAccountPrefix, nil, nil, - len(rawdb.SnapshotAccountPrefix)+common.HashLength, snapWipedAccountMeter, true); err != nil { - t.Fatal(err) - } - // Iterate over the database end ensure no snapshot information remains - if items := count(); items != 0 { - t.Fatalf("snapshot size mismatch: have %d, want %d", items, 0) - } - // Iterate over the database and ensure miscellaneous items are present - items := 0 - it := db.NewIterator(nil, nil) - defer it.Release() - for it.Next() { - items++ - } - if items != 1000 { - t.Fatalf("misc item count mismatch: have %d, want %d", items, 1000) - } -} diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index 95ec9bb8a..e94570cb3 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -169,6 +169,7 @@ func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator { values = append(values, db.db[key]) } return &iterator{ + index: -1, keys: keys, values: values, } @@ -279,7 +280,7 @@ func (b *batch) Replay(w ethdb.KeyValueWriter) error { // value store. Internally it is a deep copy of the entire iterated state, // sorted by keys. type iterator struct { - inited bool + index int keys []string values [][]byte } @@ -287,17 +288,12 @@ type iterator struct { // Next moves the iterator to the next key/value pair. It returns whether the // iterator is exhausted. func (it *iterator) Next() bool { - // If the iterator was not yet initialized, do it now - if !it.inited { - it.inited = true - return len(it.keys) > 0 + // Short circuit if iterator is already exhausted in the forward direction. + if it.index >= len(it.keys) { + return false } - // Iterator already initialize, advance it - if len(it.keys) > 0 { - it.keys = it.keys[1:] - it.values = it.values[1:] - } - return len(it.keys) > 0 + it.index += 1 + return it.index < len(it.keys) } // Error returns any accumulated error. Exhausting all the key/value pairs @@ -310,26 +306,28 @@ func (it *iterator) Error() error { // should not modify the contents of the returned slice, and its contents may // change on the next call to Next. func (it *iterator) Key() []byte { - if len(it.keys) > 0 { - return []byte(it.keys[0]) + // Short circuit if iterator is not in a valid position + if it.index < 0 || it.index >= len(it.keys) { + return nil } - return nil + return []byte(it.keys[it.index]) } // Value returns the value of the current key/value pair, or nil if done. The // caller should not modify the contents of the returned slice, and its contents // may change on the next call to Next. func (it *iterator) Value() []byte { - if len(it.values) > 0 { - return it.values[0] + // Short circuit if iterator is not in a valid position + if it.index < 0 || it.index >= len(it.keys) { + return nil } - return nil + return it.values[it.index] } // Release releases associated resources. Release should always succeed and can // be called multiple times without causing error. func (it *iterator) Release() { - it.keys, it.values = nil, nil + it.index, it.keys, it.values = -1, nil, nil } // snapshot wraps a batch of key-value entries deep copied from the in-memory diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 52d009103..dac9e8247 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -129,17 +129,15 @@ func (r *reporter) send() error { switch metric := i.(type) { case metrics.Counter: - v := metric.Count() - l := r.cache[name] + count := metric.Count() pts = append(pts, client.Point{ Measurement: fmt.Sprintf("%s%s.count", namespace, name), Tags: r.tags, Fields: map[string]interface{}{ - "value": v - l, + "value": count, }, Time: now, }) - r.cache[name] = v case metrics.Gauge: ms := metric.Snapshot() pts = append(pts, client.Point{