diff --git a/core/state/sync_test.go b/core/state/sync_test.go index de098dce0..f4a221bd9 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -136,7 +136,7 @@ func TestEmptyStateSync(t *testing.T) { func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) } func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) } -func testIterativeStateSync(t *testing.T, batch int) { +func testIterativeStateSync(t *testing.T, count int) { // Create a random state to copy srcDb, srcRoot, srcAccounts := makeTestState() @@ -144,7 +144,7 @@ func testIterativeStateSync(t *testing.T, batch int) { dstDb := rawdb.NewMemoryDatabase() sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb)) - queue := append([]common.Hash{}, sched.Missing(batch)...) + queue := append([]common.Hash{}, sched.Missing(count)...) for len(queue) > 0 { results := make([]trie.SyncResult, len(queue)) for i, hash := range queue { @@ -157,10 +157,12 @@ func testIterativeStateSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } - queue = append(queue[:0], sched.Missing(batch)...) + batch.Write() + queue = append(queue[:0], sched.Missing(count)...) } // Cross check that the two states are in sync checkStateAccounts(t, dstDb, srcRoot, srcAccounts) @@ -190,9 +192,11 @@ func TestIterativeDelayedStateSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = append(queue[len(results):], sched.Missing(0)...) } // Cross check that the two states are in sync @@ -205,7 +209,7 @@ func TestIterativeDelayedStateSync(t *testing.T) { func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) } func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) } -func testIterativeRandomStateSync(t *testing.T, batch int) { +func testIterativeRandomStateSync(t *testing.T, count int) { // Create a random state to copy srcDb, srcRoot, srcAccounts := makeTestState() @@ -214,7 +218,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb)) queue := make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } for len(queue) > 0 { @@ -231,11 +235,13 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } } @@ -277,9 +283,11 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, hash := range sched.Missing(0) { queue[hash] = struct{}{} } @@ -316,9 +324,11 @@ func TestIncompleteStateSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, result := range results { added = append(added, result.Hash) } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index b422557d5..f875b3a84 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -347,7 +347,7 @@ func (s *stateSync) commit(force bool) error { } start := time.Now() b := s.d.stateDB.NewBatch() - if written, err := s.sched.Commit(b); written == 0 || err != nil { + if err := s.sched.Commit(b); err != nil { return err } if err := b.Write(); err != nil { diff --git a/trie/sync.go b/trie/sync.go index 6f40b45a1..e5a0c1749 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -57,14 +57,12 @@ type SyncResult struct { // persisted data items. type syncMemBatch struct { batch map[common.Hash][]byte // In-memory membatch of recently completed items - order []common.Hash // Order of completion to prevent out-of-order data loss } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. func newSyncMemBatch() *syncMemBatch { return &syncMemBatch{ batch: make(map[common.Hash][]byte), - order: make([]common.Hash, 0, 256), } } @@ -223,20 +221,18 @@ func (s *Sync) Process(results []SyncResult) (bool, int, error) { } // Commit flushes the data stored in the internal membatch out to persistent -// storage, returning the number of items written and any occurred error. -func (s *Sync) Commit(dbw ethdb.KeyValueWriter) (int, error) { +// storage, returning any occurred error. +func (s *Sync) Commit(dbw ethdb.Batch) error { // Dump the membatch into a database dbw - for i, key := range s.membatch.order { - if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { - return i, err + for key, value := range s.membatch.batch { + if err := dbw.Put(key[:], value); err != nil { + return err } s.bloom.Add(key[:]) } - written := len(s.membatch.order) // TODO(karalabe): could an order change improve write performance? - // Drop the membatch data and return s.membatch = newSyncMemBatch() - return written, nil + return nil } // Pending returns the number of state entries currently pending for download. @@ -330,7 +326,6 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { func (s *Sync) commit(req *request) (err error) { // Write the node content to the membatch s.membatch.batch[req.hash] = req.data - s.membatch.order = append(s.membatch.order, req.hash) delete(s.requests, req.hash) diff --git a/trie/sync_test.go b/trie/sync_test.go index 0621bb435..6025b87fc 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -105,7 +105,7 @@ func TestEmptySync(t *testing.T) { func TestIterativeSyncIndividual(t *testing.T) { testIterativeSync(t, 1) } func TestIterativeSyncBatched(t *testing.T) { testIterativeSync(t, 100) } -func testIterativeSync(t *testing.T, batch int) { +func testIterativeSync(t *testing.T, count int) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() @@ -114,7 +114,7 @@ func testIterativeSync(t *testing.T, batch int) { triedb := NewDatabase(diskdb) sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) - queue := append([]common.Hash{}, sched.Missing(batch)...) + queue := append([]common.Hash{}, sched.Missing(count)...) for len(queue) > 0 { results := make([]SyncResult, len(queue)) for i, hash := range queue { @@ -127,10 +127,12 @@ func testIterativeSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } - queue = append(queue[:0], sched.Missing(batch)...) + batch.Write() + queue = append(queue[:0], sched.Missing(count)...) } // Cross check that the two tries are in sync checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData) @@ -161,9 +163,11 @@ func TestIterativeDelayedSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = append(queue[len(results):], sched.Missing(10000)...) } // Cross check that the two tries are in sync @@ -176,7 +180,7 @@ func TestIterativeDelayedSync(t *testing.T) { func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) } func TestIterativeRandomSyncBatched(t *testing.T) { testIterativeRandomSync(t, 100) } -func testIterativeRandomSync(t *testing.T, batch int) { +func testIterativeRandomSync(t *testing.T, count int) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() @@ -186,7 +190,7 @@ func testIterativeRandomSync(t *testing.T, batch int) { sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) queue := make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } for len(queue) > 0 { @@ -203,11 +207,13 @@ func testIterativeRandomSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } } @@ -248,9 +254,11 @@ func TestIterativeRandomDelayedSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, result := range results { delete(queue, result.Hash) } @@ -293,9 +301,11 @@ func TestDuplicateAvoidanceSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = append(queue[:0], sched.Missing(0)...) } // Cross check that the two tries are in sync @@ -329,9 +339,11 @@ func TestIncompleteSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, result := range results { added = append(added, result.Hash) }