core, trie, eth: refactor stacktrie constructor (#28350)

This change enhances the stacktrie constructor by introducing an option struct. It also simplifies the `Hash` and `Commit` operations, getting rid of the special handling round root node.
This commit is contained in:
rjl493456442 2023-10-17 20:09:25 +08:00 committed by GitHub
parent aeb0abf80a
commit 1b1611b8d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 121 additions and 120 deletions

View File

@ -362,21 +362,15 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou
}
func stackTrieGenerate(db ethdb.KeyValueWriter, scheme string, owner common.Hash, in chan trieKV, out chan common.Hash) {
var nodeWriter trie.NodeWriteFunc
options := trie.NewStackTrieOptions()
if db != nil {
nodeWriter = func(path []byte, hash common.Hash, blob []byte) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(db, owner, path, hash, blob, scheme)
}
})
}
t := trie.NewStackTrie(nodeWriter)
t := trie.NewStackTrie(options)
for leaf := range in {
t.Update(leaf.key[:], leaf.value)
}
var root common.Hash
if db == nil {
root = t.Hash()
} else {
root, _ = t.Commit()
}
out <- root
out <- t.Commit()
}

View File

@ -964,10 +964,12 @@ func (s *StateDB) fastDeleteStorage(addrHash common.Hash, root common.Hash) (boo
nodes = trienode.NewNodeSet(addrHash)
slots = make(map[common.Hash][]byte)
)
stack := trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
nodes.AddNode(path, trienode.NewDeleted())
size += common.StorageSize(len(path))
})
stack := trie.NewStackTrie(options)
for iter.Next() {
if size > storageDeleteLimit {
return true, size, nil, nil, nil

View File

@ -738,9 +738,11 @@ func (s *Syncer) loadSyncStatus() {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
task.genTrie = trie.NewStackTrie(func(path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, val, s.scheme)
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme)
})
task.genTrie = trie.NewStackTrie(options)
for accountHash, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
subtask := subtask // closure for subtask.genBatch in the stacktrie writer callback
@ -752,9 +754,11 @@ func (s *Syncer) loadSyncStatus() {
},
}
owner := accountHash // local assignment for stacktrie writer closure
subtask.genTrie = trie.NewStackTrie(func(path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, val, s.scheme)
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme)
})
subtask.genTrie = trie.NewStackTrie(options)
}
}
}
@ -806,14 +810,16 @@ func (s *Syncer) loadSyncStatus() {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme)
})
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
SubTasks: make(map[common.Hash][]*storageTask),
genBatch: batch,
genTrie: trie.NewStackTrie(func(path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, val, s.scheme)
}),
genTrie: trie.NewStackTrie(options),
})
log.Debug("Created account sync task", "from", next, "last", last)
next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
@ -2006,14 +2012,16 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
},
}
owner := account // local assignment for stacktrie writer closure
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrie(func(path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}),
genTrie: trie.NewStackTrie(options),
})
for r.Next() {
batch := ethdb.HookedBatch{
@ -2022,14 +2030,16 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrie(func(path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}),
genTrie: trie.NewStackTrie(options),
})
}
for _, task := range tasks {
@ -2075,9 +2085,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if i < len(res.hashes)-1 || res.subTask == nil {
// no need to make local reassignment of account: this closure does not outlive the loop
tr := trie.NewStackTrie(func(path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, account, path, hash, val, s.scheme)
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme)
})
tr := trie.NewStackTrie(options)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
}
@ -2099,9 +2111,8 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// Large contracts could have generated new trie nodes, flush them to disk
if res.subTask != nil {
if res.subTask.done {
if root, err := res.subTask.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack slots", "err", err)
} else if root == res.subTask.root {
root := res.subTask.genTrie.Commit()
if root == res.subTask.root {
// If the chunk's root is an overflown but full delivery, clear the heal request
for i, account := range res.mainTask.res.hashes {
if account == res.accounts[len(res.accounts)-1] {
@ -2317,9 +2328,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
// flush after finalizing task.done. It's fine even if we crash and lose this
// write as it will only cause more data to be downloaded during heal.
if task.done {
if _, err := task.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack account", "err", err)
}
task.genTrie.Commit()
}
if task.genBatch.ValueSize() > ethdb.IdealBatchSize || task.done {
if err := task.genBatch.Write(); err != nil {

View File

@ -140,9 +140,11 @@ func (f *fuzzer) fuzz() int {
trieA = trie.NewEmpty(dbA)
spongeB = &spongeDb{sponge: sha3.NewLegacyKeccak256()}
dbB = trie.NewDatabase(rawdb.NewDatabase(spongeB), nil)
trieB = trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
options = trie.NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(spongeB, common.Hash{}, path, hash, blob, dbB.Scheme())
})
trieB = trie.NewStackTrie(options)
vals []kv
useful bool
maxElements = 10000
@ -204,19 +206,20 @@ func (f *fuzzer) fuzz() int {
// Ensure all the nodes are persisted correctly
var (
nodeset = make(map[string][]byte) // path -> blob
trieC = trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
nodeset = make(map[string][]byte) // path -> blob
optionsC = trie.NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
if crypto.Keccak256Hash(blob) != hash {
panic("invalid node blob")
}
nodeset[string(path)] = common.CopyBytes(blob)
})
trieC = trie.NewStackTrie(optionsC)
checked int
)
for _, kv := range vals {
trieC.MustUpdate(kv.k, kv.v)
}
rootC, _ := trieC.Commit()
rootC := trieC.Commit()
if rootA != rootC {
panic(fmt.Sprintf("roots differ: (trie) %x != %x (stacktrie)", rootA, rootC))
}

View File

@ -17,7 +17,6 @@
package trie
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
@ -26,28 +25,40 @@ import (
)
var (
ErrCommitDisabled = errors.New("no database for committing")
stPool = sync.Pool{New: func() any { return new(stNode) }}
_ = types.TrieHasher((*StackTrie)(nil))
stPool = sync.Pool{New: func() any { return new(stNode) }}
_ = types.TrieHasher((*StackTrie)(nil))
)
// NodeWriteFunc is used to provide all information of a dirty node for committing
// so that callers can flush nodes into database with desired scheme.
type NodeWriteFunc = func(path []byte, hash common.Hash, blob []byte)
// StackTrieOptions contains the configured options for manipulating the stackTrie.
type StackTrieOptions struct {
Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes
}
// NewStackTrieOptions initializes an empty options for stackTrie.
func NewStackTrieOptions() *StackTrieOptions { return &StackTrieOptions{} }
// WithWriter configures trie node writer within the options.
func (o *StackTrieOptions) WithWriter(writer func(path []byte, hash common.Hash, blob []byte)) *StackTrieOptions {
o.Writer = writer
return o
}
// StackTrie is a trie implementation that expects keys to be inserted
// in order. Once it determines that a subtree will no longer be inserted
// into, it will hash it and free up the memory it uses.
type StackTrie struct {
writeFn NodeWriteFunc // function for committing nodes, can be nil
options *StackTrieOptions
root *stNode
h *hasher
}
// NewStackTrie allocates and initializes an empty trie.
func NewStackTrie(writeFn NodeWriteFunc) *StackTrie {
func NewStackTrie(options *StackTrieOptions) *StackTrie {
if options == nil {
options = NewStackTrieOptions()
}
return &StackTrie{
writeFn: writeFn,
options: options,
root: stPool.Get().(*stNode),
h: newHasher(false),
}
@ -59,7 +70,9 @@ func (t *StackTrie) Update(key, value []byte) error {
if len(value) == 0 {
panic("deletion not supported")
}
t.insert(t.root, k[:len(k)-1], value, nil)
k = k[:len(k)-1] // chop the termination flag
t.insert(t.root, k, value, nil)
return nil
}
@ -71,8 +84,9 @@ func (t *StackTrie) MustUpdate(key, value []byte) {
}
}
// Reset resets the stack trie object to empty state.
func (t *StackTrie) Reset() {
t.writeFn = nil
t.options = NewStackTrieOptions()
t.root = stPool.Get().(*stNode)
}
@ -138,7 +152,7 @@ func (n *stNode) getDiffIndex(key []byte) int {
// Helper function to that inserts a (key, value) pair into
// the trie.
func (t *StackTrie) insert(st *stNode, key, value []byte, prefix []byte) {
func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) {
switch st.typ {
case branchNode: /* Branch */
idx := int(key[0])
@ -147,7 +161,7 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, prefix []byte) {
for i := idx - 1; i >= 0; i-- {
if st.children[i] != nil {
if st.children[i].typ != hashedNode {
t.hash(st.children[i], append(prefix, byte(i)))
t.hash(st.children[i], append(path, byte(i)))
}
break
}
@ -157,7 +171,7 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, prefix []byte) {
if st.children[idx] == nil {
st.children[idx] = newLeaf(key[1:], value)
} else {
t.insert(st.children[idx], key[1:], value, append(prefix, key[0]))
t.insert(st.children[idx], key[1:], value, append(path, key[0]))
}
case extNode: /* Ext */
@ -172,7 +186,7 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, prefix []byte) {
if diffidx == len(st.key) {
// Ext key and key segment are identical, recurse into
// the child node.
t.insert(st.children[0], key[diffidx:], value, append(prefix, key[:diffidx]...))
t.insert(st.children[0], key[diffidx:], value, append(path, key[:diffidx]...))
return
}
// Save the original part. Depending if the break is
@ -185,14 +199,14 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, prefix []byte) {
// extension. The path prefix of the newly-inserted
// extension should also contain the different byte.
n = newExt(st.key[diffidx+1:], st.children[0])
t.hash(n, append(prefix, st.key[:diffidx+1]...))
t.hash(n, append(path, st.key[:diffidx+1]...))
} else {
// Break on the last byte, no need to insert
// an extension node: reuse the current node.
// The path prefix of the original part should
// still be same.
n = st.children[0]
t.hash(n, append(prefix, st.key...))
t.hash(n, append(path, st.key...))
}
var p *stNode
if diffidx == 0 {
@ -257,7 +271,7 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, prefix []byte) {
// is hashed directly in order to free up some memory.
origIdx := st.key[diffidx]
p.children[origIdx] = newLeaf(st.key[diffidx+1:], st.val)
t.hash(p.children[origIdx], append(prefix, st.key[:diffidx+1]...))
t.hash(p.children[origIdx], append(path, st.key[:diffidx+1]...))
newIdx := key[diffidx]
p.children[newIdx] = newLeaf(key[diffidx+1:], value)
@ -292,8 +306,7 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, prefix []byte) {
//
// This method also sets 'st.type' to hashedNode, and clears 'st.key'.
func (t *StackTrie) hash(st *stNode, path []byte) {
// The switch below sets this to the RLP-encoding of this node.
var encodedNode []byte
var blob []byte // RLP-encoded node blob
switch st.typ {
case hashedNode:
@ -323,11 +336,13 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
stPool.Put(child.reset()) // Release child back to pool.
}
nodes.encode(t.h.encbuf)
encodedNode = t.h.encodedBytes()
blob = t.h.encodedBytes()
case extNode:
// recursively hash and commit child as the first step
t.hash(st.children[0], append(path, st.key...))
// encode the extension node
n := shortNode{Key: hexToCompactInPlace(st.key)}
if len(st.children[0].val) < 32 {
n.Val = rawNode(st.children[0].val)
@ -335,7 +350,7 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
n.Val = hashNode(st.children[0].val)
}
n.encode(t.h.encbuf)
encodedNode = t.h.encodedBytes()
blob = t.h.encodedBytes()
stPool.Put(st.children[0].reset()) // Release child back to pool.
st.children[0] = nil
@ -345,7 +360,7 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
n := shortNode{Key: hexToCompactInPlace(st.key), Val: valueNode(st.val)}
n.encode(t.h.encbuf)
encodedNode = t.h.encodedBytes()
blob = t.h.encodedBytes()
default:
panic("invalid node type")
@ -353,60 +368,38 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
st.typ = hashedNode
st.key = st.key[:0]
if len(encodedNode) < 32 {
st.val = common.CopyBytes(encodedNode)
// Skip committing the non-root node if the size is smaller than 32 bytes.
if len(blob) < 32 && len(path) > 0 {
st.val = common.CopyBytes(blob)
return
}
// Write the hash to the 'val'. We allocate a new val here to not mutate
// input values
st.val = t.h.hashData(encodedNode)
if t.writeFn != nil {
t.writeFn(path, common.BytesToHash(st.val), encodedNode)
// input values.
st.val = t.h.hashData(blob)
// Commit the trie node if the writer is configured.
if t.options.Writer != nil {
t.options.Writer(path, common.BytesToHash(st.val), blob)
}
}
// Hash returns the hash of the current node.
func (t *StackTrie) Hash() (h common.Hash) {
st := t.root
t.hash(st, nil)
if len(st.val) == 32 {
copy(h[:], st.val)
return h
}
// If the node's RLP isn't 32 bytes long, the node will not
// be hashed, and instead contain the rlp-encoding of the
// node. For the top level node, we need to force the hashing.
t.h.sha.Reset()
t.h.sha.Write(st.val)
t.h.sha.Read(h[:])
return h
}
// Commit will firstly hash the entire trie if it's still not hashed
// and then commit all nodes to the associated database. Actually most
// of the trie nodes MAY have been committed already. The main purpose
// here is to commit the root node.
// Hash will firstly hash the entire trie if it's still not hashed and then commit
// all nodes to the associated database. Actually most of the trie nodes have been
// committed already. The main purpose here is to commit the nodes on right boundary.
//
// The associated database is expected, otherwise the whole commit
// functionality should be disabled.
func (t *StackTrie) Commit() (h common.Hash, err error) {
if t.writeFn == nil {
return common.Hash{}, ErrCommitDisabled
}
st := t.root
t.hash(st, nil)
if len(st.val) == 32 {
copy(h[:], st.val)
return h, nil
}
// If the node's RLP isn't 32 bytes long, the node will not
// be hashed (and committed), and instead contain the rlp-encoding of the
// node. For the top level node, we need to force the hashing+commit.
t.h.sha.Reset()
t.h.sha.Write(st.val)
t.h.sha.Read(h[:])
t.writeFn(nil, h, st.val)
return h, nil
// For stack trie, Hash and Commit are functionally identical.
func (t *StackTrie) Hash() common.Hash {
n := t.root
t.hash(n, nil)
return common.BytesToHash(n.val)
}
// Commit will firstly hash the entire trie if it's still not hashed and then commit
// all nodes to the associated database. Actually most of the trie nodes have been
// committed already. The main purpose here is to commit the nodes on right boundary.
//
// For stack trie, Hash and Commit are functionally identical.
func (t *StackTrie) Commit() common.Hash {
return t.Hash()
}

View File

@ -912,9 +912,12 @@ func TestCommitSequenceStackTrie(t *testing.T) {
trie := NewEmpty(db)
// Another sponge is used for the stacktrie commits
stackTrieSponge := &spongeDb{sponge: sha3.NewLegacyKeccak256(), id: "b"}
stTrie := NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
options := NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(stackTrieSponge, common.Hash{}, path, hash, blob, db.Scheme())
})
stTrie := NewStackTrie(options)
// Fill the trie with elements
for i := 0; i < count; i++ {
// For the stack trie, we need to do inserts in proper order
@ -937,10 +940,7 @@ func TestCommitSequenceStackTrie(t *testing.T) {
db.Update(root, types.EmptyRootHash, 0, trienode.NewWithNodeSet(nodes), nil)
db.Commit(root, false)
// And flush stacktrie -> disk
stRoot, err := stTrie.Commit()
if err != nil {
t.Fatalf("Failed to commit stack trie %v", err)
}
stRoot := stTrie.Commit()
if stRoot != root {
t.Fatalf("root wrong, got %x exp %x", stRoot, root)
}
@ -971,9 +971,12 @@ func TestCommitSequenceSmallRoot(t *testing.T) {
trie := NewEmpty(db)
// Another sponge is used for the stacktrie commits
stackTrieSponge := &spongeDb{sponge: sha3.NewLegacyKeccak256(), id: "b"}
stTrie := NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
options := NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(stackTrieSponge, common.Hash{}, path, hash, blob, db.Scheme())
})
stTrie := NewStackTrie(options)
// Add a single small-element to the trie(s)
key := make([]byte, 5)
key[0] = 1
@ -985,10 +988,7 @@ func TestCommitSequenceSmallRoot(t *testing.T) {
db.Update(root, types.EmptyRootHash, 0, trienode.NewWithNodeSet(nodes), nil)
db.Commit(root, false)
// And flush stacktrie -> disk
stRoot, err := stTrie.Commit()
if err != nil {
t.Fatalf("Failed to commit stack trie %v", err)
}
stRoot := stTrie.Commit()
if stRoot != root {
t.Fatalf("root wrong, got %x exp %x", stRoot, root)
}