core/rawdb: freezer index repair (#29792)
This pull request removes the `fsync` of index files in freezer.ModifyAncients function for performance gain. Originally, fsync is added after each freezer write operation to ensure the written data is truly transferred into disk. Unfortunately, it turns out `fsync` can be relatively slow, especially on macOS (see https://github.com/ethereum/go-ethereum/issues/28754 for more information). In this pull request, fsync for index file is removed as it turns out index file can be recovered even after a unclean shutdown. But fsync for data file is still kept, as we have no meaningful way to validate the data correctness after unclean shutdown. --- **But why do we need the `fsync` in the first place?** As it's necessary for freezer to survive/recover after the machine crash (e.g. power failure). In linux, whenever the file write is performed, the file metadata update and data update are not necessarily performed at the same time. Typically, the metadata will be flushed/journalled ahead of the file data. Therefore, we make the pessimistic assumption that the file is first extended with invalid "garbage" data (normally zero bytes) and that afterwards the correct data replaces the garbage. We have observed that the index file of the freezer often contain garbage entry with zero value (filenumber = 0, offset = 0) after a machine power failure. It proves that the index file is extended without the data being flushed. And this corruption can destroy the whole freezer data eventually. Performing fsync after each write operation can reduce the time window for data to be transferred to the disk and ensure the correctness of the data in the disk to the greatest extent. --- **How can we maintain this guarantee without relying on fsync?** Because the items in the index file are strictly in order, we can leverage this characteristic to detect the corruption and truncate them when freezer is opened. Specifically these validation rules are performed for each index file: For two consecutive index items: - If their file numbers are the same, then the offset of the latter one MUST not be less than that of the former. - If the file number of the latter one is equal to that of the former plus one, then the offset of the latter one MUST not be 0. - If their file numbers are not equal, and the latter's file number is not equal to the former plus 1, the latter one is valid And also, for the first non-head item, it must refer to the earliest data file, or the next file if the earliest file is not sufficient to place the first item(very special case, only theoretical possible in tests) With these validation rules, we can detect the invalid item in index file with greatest possibility. --- But unfortunately, these scenarios are not covered and could still lead to a freezer corruption if it occurs: **All items in index file are in zero value** It's impossible to distinguish if they are truly zero (e.g. all the data entries maintained in freezer are zero size) or just the garbage left by OS. In this case, these index items will be kept by truncating the entire data file, namely the freezer is corrupted. However, we can consider that the probability of this situation occurring is quite low, and even if it occurs, the freezer can be considered to be close to an empty state. Rerun the state sync should be acceptable. **Index file is integral while relative data file is corrupted** It might be possible the data file is corrupted whose file size is extended correctly with garbage filled (e.g. zero bytes). In this case, it's impossible to detect the corruption by index validation. We can either choose to `fsync` the data file, or blindly believe that if index file is integral then the data file could be integral with very high chance. In this pull request, the first option is taken.
This commit is contained in:
parent
90970ed3cd
commit
eff0bed91b
@ -180,10 +180,10 @@ func (batch *freezerTableBatch) maybeCommit() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// commit writes the batched items to the backing freezerTable.
|
// commit writes the batched items to the backing freezerTable. Note index
|
||||||
|
// file isn't fsync'd after the file write, the recent write can be lost
|
||||||
|
// after the power failure.
|
||||||
func (batch *freezerTableBatch) commit() error {
|
func (batch *freezerTableBatch) commit() error {
|
||||||
// Write data. The head file is fsync'd after write to ensure the
|
|
||||||
// data is truly transferred to disk.
|
|
||||||
_, err := batch.t.head.Write(batch.dataBuffer)
|
_, err := batch.t.head.Write(batch.dataBuffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -194,15 +194,10 @@ func (batch *freezerTableBatch) commit() error {
|
|||||||
dataSize := int64(len(batch.dataBuffer))
|
dataSize := int64(len(batch.dataBuffer))
|
||||||
batch.dataBuffer = batch.dataBuffer[:0]
|
batch.dataBuffer = batch.dataBuffer[:0]
|
||||||
|
|
||||||
// Write indices. The index file is fsync'd after write to ensure the
|
|
||||||
// data indexes are truly transferred to disk.
|
|
||||||
_, err = batch.t.index.Write(batch.indexBuffer)
|
_, err = batch.t.index.Write(batch.indexBuffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := batch.t.index.Sync(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
indexSize := int64(len(batch.indexBuffer))
|
indexSize := int64(len(batch.indexBuffer))
|
||||||
batch.indexBuffer = batch.indexBuffer[:0]
|
batch.indexBuffer = batch.indexBuffer[:0]
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package rawdb
|
package rawdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
@ -26,6 +27,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
@ -219,7 +221,13 @@ func (t *freezerTable) repair() error {
|
|||||||
return err
|
return err
|
||||||
} // New file can't trigger this path
|
} // New file can't trigger this path
|
||||||
}
|
}
|
||||||
// Retrieve the file sizes and prepare for truncation
|
// Validate the index file as it might contain some garbage data after the
|
||||||
|
// power failures.
|
||||||
|
if err := t.repairIndex(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Retrieve the file sizes and prepare for truncation. Note the file size
|
||||||
|
// might be changed after index validation.
|
||||||
if stat, err = t.index.Stat(); err != nil {
|
if stat, err = t.index.Stat(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -364,6 +372,133 @@ func (t *freezerTable) repair() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// repairIndex validates the integrity of the index file. According to the design,
|
||||||
|
// the initial entry in the file denotes the earliest data file along with the
|
||||||
|
// count of deleted items. Following this, all subsequent entries in the file must
|
||||||
|
// be in order. This function identifies any corrupted entries and truncates items
|
||||||
|
// occurring after the corruption point.
|
||||||
|
//
|
||||||
|
// corruption can occur because of the power failure. In the Linux kernel, the
|
||||||
|
// file metadata update and data update are not necessarily performed at the
|
||||||
|
// same time. Typically, the metadata will be flushed/journalled ahead of the file
|
||||||
|
// data. Therefore, we make the pessimistic assumption that the file is first
|
||||||
|
// extended with invalid "garbage" data (normally zero bytes) and that afterwards
|
||||||
|
// the correct data replaces the garbage. As all the items in index file are
|
||||||
|
// supposed to be in-order, the leftover garbage must be truncated before the
|
||||||
|
// index data is utilized.
|
||||||
|
//
|
||||||
|
// It's important to note an exception that's unfortunately undetectable: when
|
||||||
|
// all index entries in the file are zero. Distinguishing whether they represent
|
||||||
|
// leftover garbage or if all items in the table have zero size is impossible.
|
||||||
|
// In such instances, the file will remain unchanged to prevent potential data
|
||||||
|
// loss or misinterpretation.
|
||||||
|
func (t *freezerTable) repairIndex() error {
|
||||||
|
// Retrieve the file sizes and prepare for validation
|
||||||
|
stat, err := t.index.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
size := stat.Size()
|
||||||
|
|
||||||
|
// Move the read cursor to the beginning of the file
|
||||||
|
_, err = t.index.Seek(0, io.SeekStart)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fr := bufio.NewReader(t.index)
|
||||||
|
|
||||||
|
var (
|
||||||
|
start = time.Now()
|
||||||
|
buff = make([]byte, indexEntrySize)
|
||||||
|
prev indexEntry
|
||||||
|
head indexEntry
|
||||||
|
|
||||||
|
read = func() (indexEntry, error) {
|
||||||
|
n, err := io.ReadFull(fr, buff)
|
||||||
|
if err != nil {
|
||||||
|
return indexEntry{}, err
|
||||||
|
}
|
||||||
|
if n != indexEntrySize {
|
||||||
|
return indexEntry{}, fmt.Errorf("failed to read from index, n: %d", n)
|
||||||
|
}
|
||||||
|
var entry indexEntry
|
||||||
|
entry.unmarshalBinary(buff)
|
||||||
|
return entry, nil
|
||||||
|
}
|
||||||
|
truncate = func(offset int64) error {
|
||||||
|
if t.readonly {
|
||||||
|
return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
|
||||||
|
}
|
||||||
|
if err := truncateFreezerFile(t.index, offset); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
)
|
||||||
|
for offset := int64(0); offset < size; offset += indexEntrySize {
|
||||||
|
entry, err := read()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if offset == 0 {
|
||||||
|
head = entry
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Ensure that the first non-head index refers to the earliest file,
|
||||||
|
// or the next file if the earliest file has no space to place the
|
||||||
|
// first item.
|
||||||
|
if offset == indexEntrySize {
|
||||||
|
if entry.filenum != head.filenum && entry.filenum != head.filenum+1 {
|
||||||
|
log.Error("Corrupted index item detected", "earliest", head.filenum, "filenumber", entry.filenum)
|
||||||
|
return truncate(offset)
|
||||||
|
}
|
||||||
|
prev = entry
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// ensure two consecutive index items are in order
|
||||||
|
if err := t.checkIndexItems(prev, entry); err != nil {
|
||||||
|
log.Error("Corrupted index item detected", "err", err)
|
||||||
|
return truncate(offset)
|
||||||
|
}
|
||||||
|
prev = entry
|
||||||
|
}
|
||||||
|
// Move the read cursor to the end of the file. While theoretically, the
|
||||||
|
// cursor should reach the end by reading all the items in the file, perform
|
||||||
|
// the seek operation anyway as a precaution.
|
||||||
|
_, err = t.index.Seek(0, io.SeekEnd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkIndexItems validates the correctness of two consecutive index items based
|
||||||
|
// on the following rules:
|
||||||
|
//
|
||||||
|
// - The file number of two consecutive index items must either be the same or
|
||||||
|
// increase monotonically. If the file number decreases or skips in a
|
||||||
|
// non-sequential manner, the index item is considered invalid.
|
||||||
|
//
|
||||||
|
// - For index items with the same file number, the data offset must be in
|
||||||
|
// non-decreasing order. Note: Two index items with the same file number
|
||||||
|
// and the same data offset are permitted if the entry size is zero.
|
||||||
|
//
|
||||||
|
// - The first index item in a new data file must not have a zero data offset.
|
||||||
|
func (t *freezerTable) checkIndexItems(a, b indexEntry) error {
|
||||||
|
if b.filenum != a.filenum && b.filenum != a.filenum+1 {
|
||||||
|
return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
|
||||||
|
}
|
||||||
|
if b.filenum == a.filenum && b.offset < a.offset {
|
||||||
|
return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset)
|
||||||
|
}
|
||||||
|
if b.filenum == a.filenum+1 && b.offset == 0 {
|
||||||
|
return fmt.Errorf("index items with zero offset, file number: %d", b.filenum)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// preopen opens all files that the freezer will need. This method should be called from an init-context,
|
// preopen opens all files that the freezer will need. This method should be called from an init-context,
|
||||||
// since it assumes that it doesn't have to bother with locking
|
// since it assumes that it doesn't have to bother with locking
|
||||||
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
|
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
|
||||||
|
@ -1367,3 +1367,69 @@ func TestRandom(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIndexValidation(t *testing.T) {
|
||||||
|
const (
|
||||||
|
items = 30
|
||||||
|
dataSize = 10
|
||||||
|
)
|
||||||
|
garbage := indexEntry{
|
||||||
|
filenum: 100,
|
||||||
|
offset: 200,
|
||||||
|
}
|
||||||
|
var cases = []struct {
|
||||||
|
offset int64
|
||||||
|
data []byte
|
||||||
|
expItems int
|
||||||
|
}{
|
||||||
|
// extend index file with zero bytes at the end
|
||||||
|
{
|
||||||
|
offset: (items + 1) * indexEntrySize,
|
||||||
|
data: make([]byte, indexEntrySize),
|
||||||
|
expItems: 30,
|
||||||
|
},
|
||||||
|
// write garbage in the first non-head item
|
||||||
|
{
|
||||||
|
offset: indexEntrySize,
|
||||||
|
data: garbage.append(nil),
|
||||||
|
expItems: 0,
|
||||||
|
},
|
||||||
|
// write garbage in the first non-head item
|
||||||
|
{
|
||||||
|
offset: (items/2 + 1) * indexEntrySize,
|
||||||
|
data: garbage.append(nil),
|
||||||
|
expItems: items / 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
fn := fmt.Sprintf("t-%d", rand.Uint64())
|
||||||
|
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
writeChunks(t, f, items, dataSize)
|
||||||
|
|
||||||
|
// write corrupted data
|
||||||
|
f.index.WriteAt(c.data, c.offset)
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
// reopen the table, corruption should be truncated
|
||||||
|
f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for i := 0; i < c.expItems; i++ {
|
||||||
|
exp := getChunk(10, i)
|
||||||
|
got, err := f.Retrieve(uint64(i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read from table, %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(exp, got) {
|
||||||
|
t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if f.items.Load() != uint64(c.expItems) {
|
||||||
|
t.Fatalf("Unexpected item number, want: %d, got: %d", c.expItems, f.items.Load())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -202,7 +202,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
|
|||||||
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
|
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
|
||||||
force = true
|
force = true
|
||||||
}
|
}
|
||||||
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
|
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// To remove outdated history objects from the end, we set the 'tail' parameter
|
// To remove outdated history objects from the end, we set the 'tail' parameter
|
||||||
@ -267,7 +267,7 @@ func (dl *diskLayer) setBufferSize(size int) error {
|
|||||||
if dl.stale {
|
if dl.stale {
|
||||||
return errSnapshotStale
|
return errSnapshotStale
|
||||||
}
|
}
|
||||||
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
|
return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// size returns the approximate size of cached nodes in the disk layer.
|
// size returns the approximate size of cached nodes in the disk layer.
|
||||||
|
@ -194,9 +194,9 @@ func (b *nodebuffer) empty() bool {
|
|||||||
|
|
||||||
// setSize sets the buffer size to the provided number, and invokes a flush
|
// setSize sets the buffer size to the provided number, and invokes a flush
|
||||||
// operation if the current memory usage exceeds the new limit.
|
// operation if the current memory usage exceeds the new limit.
|
||||||
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
|
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
|
||||||
b.limit = uint64(size)
|
b.limit = uint64(size)
|
||||||
return b.flush(db, clean, id, false)
|
return b.flush(db, freezer, clean, id, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// allocBatch returns a database batch with pre-allocated buffer.
|
// allocBatch returns a database batch with pre-allocated buffer.
|
||||||
@ -214,7 +214,7 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
|
|||||||
|
|
||||||
// flush persists the in-memory dirty trie node into the disk if the configured
|
// flush persists the in-memory dirty trie node into the disk if the configured
|
||||||
// memory threshold is reached. Note, all data must be written atomically.
|
// memory threshold is reached. Note, all data must be written atomically.
|
||||||
func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
|
func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
|
||||||
if b.size <= b.limit && !force {
|
if b.size <= b.limit && !force {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -227,6 +227,13 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
|
|||||||
start = time.Now()
|
start = time.Now()
|
||||||
batch = b.allocBatch(db)
|
batch = b.allocBatch(db)
|
||||||
)
|
)
|
||||||
|
// Explicitly sync the state freezer, ensuring that all written
|
||||||
|
// data is transferred to disk before updating the key-value store.
|
||||||
|
if freezer != nil {
|
||||||
|
if err := freezer.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
nodes := writeNodes(batch, b.nodes, clean)
|
nodes := writeNodes(batch, b.nodes, clean)
|
||||||
rawdb.WritePersistentStateID(batch, id)
|
rawdb.WritePersistentStateID(batch, id)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user