trie: remove node ordering slice in sync batch (#19929)

When we flush a batch of trie nodes into database during the state
sync, we should guarantee that all children should be flushed before
parent.

Actually the trie nodes commit order is strict by: children -> parent.
But when we flush all ready nodes into db, we don't need the order
anymore since

    (1) they are all ready nodes (no more dependency)
    (2) underlying database provides write atomicity
This commit is contained in:
gary rong 2019-10-29 02:50:11 +09:00 committed by Felix Lange
parent 9c81387bef
commit ecdbb402ee
4 changed files with 63 additions and 46 deletions

@ -136,7 +136,7 @@ func TestEmptyStateSync(t *testing.T) {
func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) } func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) }
func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) } func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) }
func testIterativeStateSync(t *testing.T, batch int) { func testIterativeStateSync(t *testing.T, count int) {
// Create a random state to copy // Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState() srcDb, srcRoot, srcAccounts := makeTestState()
@ -144,7 +144,7 @@ func testIterativeStateSync(t *testing.T, batch int) {
dstDb := rawdb.NewMemoryDatabase() dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb)) sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))
queue := append([]common.Hash{}, sched.Missing(batch)...) queue := append([]common.Hash{}, sched.Missing(count)...)
for len(queue) > 0 { for len(queue) > 0 {
results := make([]trie.SyncResult, len(queue)) results := make([]trie.SyncResult, len(queue))
for i, hash := range queue { for i, hash := range queue {
@ -157,10 +157,12 @@ func testIterativeStateSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(dstDb); err != nil { batch := dstDb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
queue = append(queue[:0], sched.Missing(batch)...) batch.Write()
queue = append(queue[:0], sched.Missing(count)...)
} }
// Cross check that the two states are in sync // Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts) checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
@ -190,9 +192,11 @@ func TestIterativeDelayedStateSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(dstDb); err != nil { batch := dstDb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
queue = append(queue[len(results):], sched.Missing(0)...) queue = append(queue[len(results):], sched.Missing(0)...)
} }
// Cross check that the two states are in sync // Cross check that the two states are in sync
@ -205,7 +209,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {
func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) } func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) }
func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) } func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) }
func testIterativeRandomStateSync(t *testing.T, batch int) { func testIterativeRandomStateSync(t *testing.T, count int) {
// Create a random state to copy // Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState() srcDb, srcRoot, srcAccounts := makeTestState()
@ -214,7 +218,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb)) sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))
queue := make(map[common.Hash]struct{}) queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) { for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{} queue[hash] = struct{}{}
} }
for len(queue) > 0 { for len(queue) > 0 {
@ -231,11 +235,13 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(dstDb); err != nil { batch := dstDb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
queue = make(map[common.Hash]struct{}) queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) { for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{} queue[hash] = struct{}{}
} }
} }
@ -277,9 +283,11 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(dstDb); err != nil { batch := dstDb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
for _, hash := range sched.Missing(0) { for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{} queue[hash] = struct{}{}
} }
@ -316,9 +324,11 @@ func TestIncompleteStateSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(dstDb); err != nil { batch := dstDb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
for _, result := range results { for _, result := range results {
added = append(added, result.Hash) added = append(added, result.Hash)
} }

@ -347,7 +347,7 @@ func (s *stateSync) commit(force bool) error {
} }
start := time.Now() start := time.Now()
b := s.d.stateDB.NewBatch() b := s.d.stateDB.NewBatch()
if written, err := s.sched.Commit(b); written == 0 || err != nil { if err := s.sched.Commit(b); err != nil {
return err return err
} }
if err := b.Write(); err != nil { if err := b.Write(); err != nil {

@ -57,14 +57,12 @@ type SyncResult struct {
// persisted data items. // persisted data items.
type syncMemBatch struct { type syncMemBatch struct {
batch map[common.Hash][]byte // In-memory membatch of recently completed items batch map[common.Hash][]byte // In-memory membatch of recently completed items
order []common.Hash // Order of completion to prevent out-of-order data loss
} }
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch { func newSyncMemBatch() *syncMemBatch {
return &syncMemBatch{ return &syncMemBatch{
batch: make(map[common.Hash][]byte), batch: make(map[common.Hash][]byte),
order: make([]common.Hash, 0, 256),
} }
} }
@ -223,20 +221,18 @@ func (s *Sync) Process(results []SyncResult) (bool, int, error) {
} }
// Commit flushes the data stored in the internal membatch out to persistent // Commit flushes the data stored in the internal membatch out to persistent
// storage, returning the number of items written and any occurred error. // storage, returning any occurred error.
func (s *Sync) Commit(dbw ethdb.KeyValueWriter) (int, error) { func (s *Sync) Commit(dbw ethdb.Batch) error {
// Dump the membatch into a database dbw // Dump the membatch into a database dbw
for i, key := range s.membatch.order { for key, value := range s.membatch.batch {
if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { if err := dbw.Put(key[:], value); err != nil {
return i, err return err
} }
s.bloom.Add(key[:]) s.bloom.Add(key[:])
} }
written := len(s.membatch.order) // TODO(karalabe): could an order change improve write performance?
// Drop the membatch data and return // Drop the membatch data and return
s.membatch = newSyncMemBatch() s.membatch = newSyncMemBatch()
return written, nil return nil
} }
// Pending returns the number of state entries currently pending for download. // Pending returns the number of state entries currently pending for download.
@ -330,7 +326,6 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
func (s *Sync) commit(req *request) (err error) { func (s *Sync) commit(req *request) (err error) {
// Write the node content to the membatch // Write the node content to the membatch
s.membatch.batch[req.hash] = req.data s.membatch.batch[req.hash] = req.data
s.membatch.order = append(s.membatch.order, req.hash)
delete(s.requests, req.hash) delete(s.requests, req.hash)

@ -105,7 +105,7 @@ func TestEmptySync(t *testing.T) {
func TestIterativeSyncIndividual(t *testing.T) { testIterativeSync(t, 1) } func TestIterativeSyncIndividual(t *testing.T) { testIterativeSync(t, 1) }
func TestIterativeSyncBatched(t *testing.T) { testIterativeSync(t, 100) } func TestIterativeSyncBatched(t *testing.T) { testIterativeSync(t, 100) }
func testIterativeSync(t *testing.T, batch int) { func testIterativeSync(t *testing.T, count int) {
// Create a random trie to copy // Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie() srcDb, srcTrie, srcData := makeTestTrie()
@ -114,7 +114,7 @@ func testIterativeSync(t *testing.T, batch int) {
triedb := NewDatabase(diskdb) triedb := NewDatabase(diskdb)
sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
queue := append([]common.Hash{}, sched.Missing(batch)...) queue := append([]common.Hash{}, sched.Missing(count)...)
for len(queue) > 0 { for len(queue) > 0 {
results := make([]SyncResult, len(queue)) results := make([]SyncResult, len(queue))
for i, hash := range queue { for i, hash := range queue {
@ -127,10 +127,12 @@ func testIterativeSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(diskdb); err != nil { batch := diskdb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
queue = append(queue[:0], sched.Missing(batch)...) batch.Write()
queue = append(queue[:0], sched.Missing(count)...)
} }
// Cross check that the two tries are in sync // Cross check that the two tries are in sync
checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData) checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
@ -161,9 +163,11 @@ func TestIterativeDelayedSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(diskdb); err != nil { batch := diskdb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
queue = append(queue[len(results):], sched.Missing(10000)...) queue = append(queue[len(results):], sched.Missing(10000)...)
} }
// Cross check that the two tries are in sync // Cross check that the two tries are in sync
@ -176,7 +180,7 @@ func TestIterativeDelayedSync(t *testing.T) {
func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) } func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) }
func TestIterativeRandomSyncBatched(t *testing.T) { testIterativeRandomSync(t, 100) } func TestIterativeRandomSyncBatched(t *testing.T) { testIterativeRandomSync(t, 100) }
func testIterativeRandomSync(t *testing.T, batch int) { func testIterativeRandomSync(t *testing.T, count int) {
// Create a random trie to copy // Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie() srcDb, srcTrie, srcData := makeTestTrie()
@ -186,7 +190,7 @@ func testIterativeRandomSync(t *testing.T, batch int) {
sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))
queue := make(map[common.Hash]struct{}) queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) { for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{} queue[hash] = struct{}{}
} }
for len(queue) > 0 { for len(queue) > 0 {
@ -203,11 +207,13 @@ func testIterativeRandomSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(diskdb); err != nil { batch := diskdb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
queue = make(map[common.Hash]struct{}) queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) { for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{} queue[hash] = struct{}{}
} }
} }
@ -248,9 +254,11 @@ func TestIterativeRandomDelayedSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(diskdb); err != nil { batch := diskdb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
for _, result := range results { for _, result := range results {
delete(queue, result.Hash) delete(queue, result.Hash)
} }
@ -293,9 +301,11 @@ func TestDuplicateAvoidanceSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(diskdb); err != nil { batch := diskdb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
queue = append(queue[:0], sched.Missing(0)...) queue = append(queue[:0], sched.Missing(0)...)
} }
// Cross check that the two tries are in sync // Cross check that the two tries are in sync
@ -329,9 +339,11 @@ func TestIncompleteSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
if index, err := sched.Commit(diskdb); err != nil { batch := diskdb.NewBatch()
t.Fatalf("failed to commit data #%d: %v", index, err) if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
} }
batch.Write()
for _, result := range results { for _, result := range results {
added = append(added, result.Hash) added = append(added, result.Hash)
} }