From 4773dcbc81aac9d330df29446283361f5a7062c7 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 22 Sep 2023 14:31:10 +0800 Subject: [PATCH] trie: remove internal nodes between shortNode and child in path mode (#28163) * trie: remove internal nodes between shortNode and child in path mode * trie: address comments * core/rawdb, trie: address comments * core/rawdb: delete unused func * trie: change comments * trie: add missing tests * trie: fix lint --- core/rawdb/accessors_trie.go | 20 +++++ trie/sync.go | 95 ++++++++++++++++---- trie/sync_test.go | 163 +++++++++++++++++++++++++++++------ 3 files changed, 238 insertions(+), 40 deletions(-) diff --git a/core/rawdb/accessors_trie.go b/core/rawdb/accessors_trie.go index f5c2f8899..ea437b811 100644 --- a/core/rawdb/accessors_trie.go +++ b/core/rawdb/accessors_trie.go @@ -89,6 +89,16 @@ func HasAccountTrieNode(db ethdb.KeyValueReader, path []byte, hash common.Hash) return h.hash(data) == hash } +// ExistsAccountTrieNode checks the presence of the account trie node with the +// specified node path, regardless of the node hash. +func ExistsAccountTrieNode(db ethdb.KeyValueReader, path []byte) bool { + has, err := db.Has(accountTrieNodeKey(path)) + if err != nil { + return false + } + return has +} + // WriteAccountTrieNode writes the provided account trie node into database. func WriteAccountTrieNode(db ethdb.KeyValueWriter, path []byte, node []byte) { if err := db.Put(accountTrieNodeKey(path), node); err != nil { @@ -127,6 +137,16 @@ func HasStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path [ return h.hash(data) == hash } +// ExistsStorageTrieNode checks the presence of the storage trie node with the +// specified account hash and node path, regardless of the node hash. +func ExistsStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path []byte) bool { + has, err := db.Has(storageTrieNodeKey(accountHash, path)) + if err != nil { + return false + } + return has +} + // WriteStorageTrieNode writes the provided storage trie node into database. func WriteStorageTrieNode(db ethdb.KeyValueWriter, accountHash common.Hash, path []byte, node []byte) { if err := db.Put(storageTrieNodeKey(accountHash, path), node); err != nil { diff --git a/trie/sync.go b/trie/sync.go index 4f5584599..9da070607 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) // ErrNotRequested is returned by the trie sync when it's requested to process a @@ -42,6 +43,16 @@ var ErrAlreadyProcessed = errors.New("already processed") // memory if the node was configured with a significant number of peers. const maxFetchesPerDepth = 16384 +var ( + // deletionGauge is the metric to track how many trie node deletions + // are performed in total during the sync process. + deletionGauge = metrics.NewRegisteredGauge("trie/sync/delete", nil) + + // lookupGauge is the metric to track how many trie node lookups are + // performed to determine if node needs to be deleted. + lookupGauge = metrics.NewRegisteredGauge("trie/sync/lookup", nil) +) + // SyncPath is a path tuple identifying a particular trie node either in a single // trie (account) or a layered trie (account -> storage). // @@ -93,9 +104,10 @@ type LeafCallback func(keys [][]byte, path []byte, leaf []byte, parent common.Ha // nodeRequest represents a scheduled or already in-flight trie node retrieval request. type nodeRequest struct { - hash common.Hash // Hash of the trie node to retrieve - path []byte // Merkle path leading to this node for prioritization - data []byte // Data content of the node, cached until all subtrees complete + hash common.Hash // Hash of the trie node to retrieve + path []byte // Merkle path leading to this node for prioritization + data []byte // Data content of the node, cached until all subtrees complete + deletes [][]byte // List of internal path segments for trie nodes to delete parent *nodeRequest // Parent state node referencing this entry deps int // Number of dependencies before allowed to commit this node @@ -125,18 +137,20 @@ type CodeSyncResult struct { // syncMemBatch is an in-memory buffer of successfully downloaded but not yet // persisted data items. type syncMemBatch struct { - nodes map[string][]byte // In-memory membatch of recently completed nodes - hashes map[string]common.Hash // Hashes of recently completed nodes - codes map[common.Hash][]byte // In-memory membatch of recently completed codes - size uint64 // Estimated batch-size of in-memory data. + nodes map[string][]byte // In-memory membatch of recently completed nodes + hashes map[string]common.Hash // Hashes of recently completed nodes + deletes map[string]struct{} // List of paths for trie node to delete + codes map[common.Hash][]byte // In-memory membatch of recently completed codes + size uint64 // Estimated batch-size of in-memory data. } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. func newSyncMemBatch() *syncMemBatch { return &syncMemBatch{ - nodes: make(map[string][]byte), - hashes: make(map[string]common.Hash), - codes: make(map[common.Hash][]byte), + nodes: make(map[string][]byte), + hashes: make(map[string]common.Hash), + deletes: make(map[string]struct{}), + codes: make(map[common.Hash][]byte), } } @@ -347,16 +361,23 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error { // Commit flushes the data stored in the internal membatch out to persistent // storage, returning any occurred error. func (s *Sync) Commit(dbw ethdb.Batch) error { - // Dump the membatch into a database dbw + // Flush the pending node writes into database batch. for path, value := range s.membatch.nodes { owner, inner := ResolvePath([]byte(path)) rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme) } + // Flush the pending node deletes into the database batch. + // Please note that each written and deleted node has a + // unique path, ensuring no duplication occurs. + for path := range s.membatch.deletes { + owner, inner := ResolvePath([]byte(path)) + rawdb.DeleteTrieNode(dbw, owner, inner, common.Hash{} /* unused */, s.scheme) + } + // Flush the pending code writes into database batch. for hash, value := range s.membatch.codes { rawdb.WriteCode(dbw, hash, value) } - // Drop the membatch data and return - s.membatch = newSyncMemBatch() + s.membatch = newSyncMemBatch() // reset the batch return nil } @@ -425,6 +446,39 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { node: node.Val, path: append(append([]byte(nil), req.path...), key...), }} + // Mark all internal nodes between shortNode and its **in disk** + // child as invalid. This is essential in the case of path mode + // scheme; otherwise, state healing might overwrite existing child + // nodes silently while leaving a dangling parent node within the + // range of this internal path on disk. This would break the + // guarantee for state healing. + // + // While it's possible for this shortNode to overwrite a previously + // existing full node, the other branches of the fullNode can be + // retained as they remain untouched and complete. + // + // This step is only necessary for path mode, as there is no deletion + // in hash mode at all. + if _, ok := node.Val.(hashNode); ok && s.scheme == rawdb.PathScheme { + owner, inner := ResolvePath(req.path) + for i := 1; i < len(key); i++ { + // While checking for a non-existent item in Pebble can be less efficient + // without a bloom filter, the relatively low frequency of lookups makes + // the performance impact negligible. + var exists bool + if owner == (common.Hash{}) { + exists = rawdb.ExistsAccountTrieNode(s.database, append(inner, key[:i]...)) + } else { + exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...)) + } + if exists { + req.deletes = append(req.deletes, key[:i]) + deletionGauge.Inc(1) + log.Debug("Detected dangling node", "owner", owner, "path", append(inner, key[:i]...)) + } + } + lookupGauge.Inc(int64(len(key) - 1)) + } case *fullNode: for i := 0; i < 17; i++ { if node.Children[i] != nil { @@ -509,10 +563,19 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { // Write the node content to the membatch s.membatch.nodes[string(req.path)] = req.data s.membatch.hashes[string(req.path)] = req.hash + // The size tracking refers to the db-batch, not the in-memory data. - // Therefore, we ignore the req.path, and account only for the hash+data - // which eventually is written to db. - s.membatch.size += common.HashLength + uint64(len(req.data)) + if s.scheme == rawdb.PathScheme { + s.membatch.size += uint64(len(req.path) + len(req.data)) + } else { + s.membatch.size += common.HashLength + uint64(len(req.data)) + } + // Delete the internal nodes which are marked as invalid + for _, segment := range req.deletes { + path := append(req.path, segment...) + s.membatch.deletes[string(path)] = struct{}{} + s.membatch.size += uint64(len(path)) + } delete(s.nodeReqs, string(req.path)) s.fetches[len(req.path)]-- diff --git a/trie/sync_test.go b/trie/sync_test.go index dd3506559..3b7986ef6 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -70,31 +70,53 @@ func makeTestTrie(scheme string) (ethdb.Database, *Database, *StateTrie, map[str // checkTrieContents cross references a reconstructed trie with an expected data // content map. -func checkTrieContents(t *testing.T, db ethdb.Database, scheme string, root []byte, content map[string][]byte) { +func checkTrieContents(t *testing.T, db ethdb.Database, scheme string, root []byte, content map[string][]byte, rawTrie bool) { // Check root availability and trie contents ndb := newTestDatabase(db, scheme) - trie, err := NewStateTrie(TrieID(common.BytesToHash(root)), ndb) - if err != nil { - t.Fatalf("failed to create trie at %x: %v", root, err) - } - if err := checkTrieConsistency(db, scheme, common.BytesToHash(root)); err != nil { + if err := checkTrieConsistency(db, scheme, common.BytesToHash(root), rawTrie); err != nil { t.Fatalf("inconsistent trie at %x: %v", root, err) } + type reader interface { + MustGet(key []byte) []byte + } + var r reader + if rawTrie { + trie, err := New(TrieID(common.BytesToHash(root)), ndb) + if err != nil { + t.Fatalf("failed to create trie at %x: %v", root, err) + } + r = trie + } else { + trie, err := NewStateTrie(TrieID(common.BytesToHash(root)), ndb) + if err != nil { + t.Fatalf("failed to create trie at %x: %v", root, err) + } + r = trie + } for key, val := range content { - if have := trie.MustGet([]byte(key)); !bytes.Equal(have, val) { + if have := r.MustGet([]byte(key)); !bytes.Equal(have, val) { t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val) } } } // checkTrieConsistency checks that all nodes in a trie are indeed present. -func checkTrieConsistency(db ethdb.Database, scheme string, root common.Hash) error { +func checkTrieConsistency(db ethdb.Database, scheme string, root common.Hash, rawTrie bool) error { ndb := newTestDatabase(db, scheme) - trie, err := NewStateTrie(TrieID(root), ndb) - if err != nil { - return nil // Consider a non existent state consistent + var it NodeIterator + if rawTrie { + trie, err := New(TrieID(root), ndb) + if err != nil { + return nil // Consider a non existent state consistent + } + it = trie.MustNodeIterator(nil) + } else { + trie, err := NewStateTrie(TrieID(root), ndb) + if err != nil { + return nil // Consider a non existent state consistent + } + it = trie.MustNodeIterator(nil) } - it := trie.MustNodeIterator(nil) for it.Next(true) { } return it.Error() @@ -205,7 +227,7 @@ func testIterativeSync(t *testing.T, count int, bypath bool, scheme string) { } } // Cross check that the two tries are in sync - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData, false) } // Tests that the trie scheduler can correctly reconstruct the state even if only @@ -271,7 +293,7 @@ func testIterativeDelayedSync(t *testing.T, scheme string) { } } // Cross check that the two tries are in sync - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData, false) } // Tests that given a root hash, a trie can sync iteratively on a single thread, @@ -341,7 +363,7 @@ func testIterativeRandomSync(t *testing.T, count int, scheme string) { } } // Cross check that the two tries are in sync - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData, false) } // Tests that the trie scheduler can correctly reconstruct the state even if only @@ -413,7 +435,7 @@ func testIterativeRandomDelayedSync(t *testing.T, scheme string) { } } // Cross check that the two tries are in sync - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData, false) } // Tests that a trie sync will not request nodes multiple times, even if they @@ -484,7 +506,7 @@ func testDuplicateAvoidanceSync(t *testing.T, scheme string) { } } // Cross check that the two tries are in sync - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData, false) } // Tests that at any point in time during a sync, only complete sub-tries are in @@ -569,7 +591,7 @@ func testIncompleteSync(t *testing.T, scheme string) { nodeHash := addedHashes[i] value := rawdb.ReadTrieNode(diskdb, owner, inner, nodeHash, scheme) rawdb.DeleteTrieNode(diskdb, owner, inner, nodeHash, scheme) - if err := checkTrieConsistency(diskdb, srcDb.Scheme(), root); err == nil { + if err := checkTrieConsistency(diskdb, srcDb.Scheme(), root, false); err == nil { t.Fatalf("trie inconsistency not caught, missing: %x", path) } rawdb.WriteTrieNode(diskdb, owner, inner, nodeHash, value, scheme) @@ -643,7 +665,7 @@ func testSyncOrdering(t *testing.T, scheme string) { } } // Cross check that the two tries are in sync - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData, false) // Check that the trie nodes have been requested path-ordered for i := 0; i < len(reqs)-1; i++ { @@ -664,7 +686,7 @@ func syncWith(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database // The code requests are ignored here since there is no code // at the testing trie. - paths, nodes, _ := sched.Missing(1) + paths, nodes, _ := sched.Missing(0) var elements []trieElement for i := 0; i < len(paths); i++ { elements = append(elements, trieElement{ @@ -698,7 +720,7 @@ func syncWith(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database } batch.Write() - paths, nodes, _ = sched.Missing(1) + paths, nodes, _ = sched.Missing(0) elements = elements[:0] for i := 0; i < len(paths); i++ { elements = append(elements, trieElement{ @@ -724,7 +746,7 @@ func testSyncMovingTarget(t *testing.T, scheme string) { // Create a destination trie and sync with the scheduler diskdb := rawdb.NewMemoryDatabase() syncWith(t, srcTrie.Hash(), diskdb, srcDb) - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), srcData, false) // Push more modifications into the src trie, to see if dest trie can still // sync with it(overwrite stale states) @@ -748,7 +770,7 @@ func testSyncMovingTarget(t *testing.T, scheme string) { srcTrie, _ = NewStateTrie(TrieID(root), srcDb) syncWith(t, srcTrie.Hash(), diskdb, srcDb) - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), diff) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), diff, false) // Revert added modifications from the src trie, to see if dest trie can still // sync with it(overwrite reverted states) @@ -772,5 +794,98 @@ func testSyncMovingTarget(t *testing.T, scheme string) { srcTrie, _ = NewStateTrie(TrieID(root), srcDb) syncWith(t, srcTrie.Hash(), diskdb, srcDb) - checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), reverted) + checkTrieContents(t, diskdb, srcDb.Scheme(), srcTrie.Hash().Bytes(), reverted, false) +} + +// Tests if state syncer can correctly catch up the pivot move. +func TestPivotMove(t *testing.T) { + testPivotMove(t, rawdb.HashScheme, true) + testPivotMove(t, rawdb.HashScheme, false) + testPivotMove(t, rawdb.PathScheme, true) + testPivotMove(t, rawdb.PathScheme, false) +} + +func testPivotMove(t *testing.T, scheme string, tiny bool) { + var ( + srcDisk = rawdb.NewMemoryDatabase() + srcTrieDB = newTestDatabase(srcDisk, scheme) + srcTrie, _ = New(TrieID(types.EmptyRootHash), srcTrieDB) + + deleteFn = func(key []byte, tr *Trie, states map[string][]byte) { + tr.Delete(key) + delete(states, string(key)) + } + writeFn = func(key []byte, val []byte, tr *Trie, states map[string][]byte) { + if val == nil { + if tiny { + val = randBytes(4) + } else { + val = randBytes(32) + } + } + tr.Update(key, val) + states[string(key)] = common.CopyBytes(val) + } + copyStates = func(states map[string][]byte) map[string][]byte { + cpy := make(map[string][]byte) + for k, v := range states { + cpy[k] = v + } + return cpy + } + ) + stateA := make(map[string][]byte) + writeFn([]byte{0x01, 0x23}, nil, srcTrie, stateA) + writeFn([]byte{0x01, 0x24}, nil, srcTrie, stateA) + writeFn([]byte{0x12, 0x33}, nil, srcTrie, stateA) + writeFn([]byte{0x12, 0x34}, nil, srcTrie, stateA) + writeFn([]byte{0x02, 0x34}, nil, srcTrie, stateA) + writeFn([]byte{0x13, 0x44}, nil, srcTrie, stateA) + + rootA, nodesA, _ := srcTrie.Commit(false) + if err := srcTrieDB.Update(rootA, types.EmptyRootHash, 0, trienode.NewWithNodeSet(nodesA), nil); err != nil { + panic(err) + } + if err := srcTrieDB.Commit(rootA, false); err != nil { + panic(err) + } + // Create a destination trie and sync with the scheduler + destDisk := rawdb.NewMemoryDatabase() + syncWith(t, rootA, destDisk, srcTrieDB) + checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateA, true) + + // Delete element to collapse trie + stateB := copyStates(stateA) + srcTrie, _ = New(TrieID(rootA), srcTrieDB) + deleteFn([]byte{0x02, 0x34}, srcTrie, stateB) + deleteFn([]byte{0x13, 0x44}, srcTrie, stateB) + writeFn([]byte{0x01, 0x24}, nil, srcTrie, stateB) + + rootB, nodesB, _ := srcTrie.Commit(false) + if err := srcTrieDB.Update(rootB, rootA, 0, trienode.NewWithNodeSet(nodesB), nil); err != nil { + panic(err) + } + if err := srcTrieDB.Commit(rootB, false); err != nil { + panic(err) + } + syncWith(t, rootB, destDisk, srcTrieDB) + checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateB, true) + + // Add elements to expand trie + stateC := copyStates(stateB) + srcTrie, _ = New(TrieID(rootB), srcTrieDB) + + writeFn([]byte{0x01, 0x24}, stateA[string([]byte{0x01, 0x24})], srcTrie, stateC) + writeFn([]byte{0x02, 0x34}, nil, srcTrie, stateC) + writeFn([]byte{0x13, 0x44}, nil, srcTrie, stateC) + + rootC, nodesC, _ := srcTrie.Commit(false) + if err := srcTrieDB.Update(rootC, rootB, 0, trienode.NewWithNodeSet(nodesC), nil); err != nil { + panic(err) + } + if err := srcTrieDB.Commit(rootC, false); err != nil { + panic(err) + } + syncWith(t, rootC, destDisk, srcTrieDB) + checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateC, true) }