Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
12 changed files with 302 additions and 197 deletions
Showing only changes of commit 65ed1a6871 - Show all commits

View File

@ -36,27 +36,31 @@ func (buf *encBuffer) size() int {
return len(buf.str) + buf.lhsize
}
// toBytes creates the encoder output.
func (w *encBuffer) toBytes() []byte {
// makeBytes creates the encoder output.
func (w *encBuffer) makeBytes() []byte {
out := make([]byte, w.size())
w.copyTo(out)
return out
}
func (w *encBuffer) copyTo(dst []byte) {
strpos := 0
pos := 0
for _, head := range w.lheads {
// write string data before header
n := copy(out[pos:], w.str[strpos:head.offset])
n := copy(dst[pos:], w.str[strpos:head.offset])
pos += n
strpos += n
// write the header
enc := head.encode(out[pos:])
enc := head.encode(dst[pos:])
pos += len(enc)
}
// copy string data after the last list header
copy(out[pos:], w.str[strpos:])
return out
copy(dst[pos:], w.str[strpos:])
}
// toWriter writes the encoder output to w.
func (buf *encBuffer) toWriter(w io.Writer) (err error) {
// writeTo writes the encoder output to w.
func (buf *encBuffer) writeTo(w io.Writer) (err error) {
strpos := 0
for _, head := range buf.lheads {
// write string data before header
@ -252,6 +256,19 @@ func (r *encReader) next() []byte {
}
}
func encBufferFromWriter(w io.Writer) *encBuffer {
switch w := w.(type) {
case EncoderBuffer:
return w.buf
case *EncoderBuffer:
return w.buf
case *encBuffer:
return w
default:
return nil
}
}
// EncoderBuffer is a buffer for incremental encoding.
//
// The zero value is NOT ready for use. To get a usable buffer,
@ -279,14 +296,10 @@ func (w *EncoderBuffer) Reset(dst io.Writer) {
// If the destination writer has an *encBuffer, use it.
// Note that w.ownBuffer is left false here.
if dst != nil {
if outer, ok := dst.(*encBuffer); ok {
if outer := encBufferFromWriter(dst); outer != nil {
*w = EncoderBuffer{outer, nil, false}
return
}
if outer, ok := dst.(EncoderBuffer); ok {
*w = EncoderBuffer{outer.buf, nil, false}
return
}
}
// Get a fresh buffer.
@ -303,7 +316,7 @@ func (w *EncoderBuffer) Reset(dst io.Writer) {
func (w *EncoderBuffer) Flush() error {
var err error
if w.dst != nil {
err = w.buf.toWriter(w.dst)
err = w.buf.writeTo(w.dst)
}
// Release the internal buffer.
if w.ownBuffer {
@ -315,7 +328,15 @@ func (w *EncoderBuffer) Flush() error {
// ToBytes returns the encoded bytes.
func (w *EncoderBuffer) ToBytes() []byte {
return w.buf.toBytes()
return w.buf.makeBytes()
}
// AppendToBytes appends the encoded bytes to dst.
func (w *EncoderBuffer) AppendToBytes(dst []byte) []byte {
size := w.buf.size()
out := append(dst, make([]byte, size)...)
w.buf.copyTo(out[len(dst):])
return out
}
// Write appends b directly to the encoder output.

View File

@ -56,20 +56,16 @@ type Encoder interface {
// Please see package-level documentation of encoding rules.
func Encode(w io.Writer, val interface{}) error {
// Optimization: reuse *encBuffer when called by EncodeRLP.
if buf, ok := w.(*encBuffer); ok {
if buf := encBufferFromWriter(w); buf != nil {
return buf.encode(val)
}
if ebuf, ok := w.(EncoderBuffer); ok {
return ebuf.buf.encode(val)
}
buf := getEncBuffer()
defer encBufferPool.Put(buf)
if err := buf.encode(val); err != nil {
return err
}
return buf.toWriter(w)
return buf.writeTo(w)
}
// EncodeToBytes returns the RLP encoding of val.
@ -81,7 +77,7 @@ func EncodeToBytes(val interface{}) ([]byte, error) {
if err := buf.encode(val); err != nil {
return nil, err
}
return buf.toBytes(), nil
return buf.makeBytes(), nil
}
// EncodeToReader returns a reader from which the RLP encoding of val

View File

@ -399,6 +399,21 @@ func TestEncodeToBytes(t *testing.T) {
runEncTests(t, EncodeToBytes)
}
func TestEncodeAppendToBytes(t *testing.T) {
buffer := make([]byte, 20)
runEncTests(t, func(val interface{}) ([]byte, error) {
w := NewEncoderBuffer(nil)
defer w.Flush()
err := Encode(w, val)
if err != nil {
return nil, err
}
output := w.AppendToBytes(buffer[:0])
return output, nil
})
}
func TestEncodeToReader(t *testing.T) {
runEncTests(t, func(val interface{}) ([]byte, error) {
_, r, err := EncodeToReader(val)

View File

@ -44,7 +44,6 @@ type leaf struct {
// By 'some level' of parallelism, it's still the case that all leaves will be
// processed sequentially - onleaf will never be called in parallel or out of order.
type committer struct {
tmp sliceBuffer
sha crypto.KeccakState
onleaf LeafCallback
@ -55,7 +54,6 @@ type committer struct {
var committerPool = sync.Pool{
New: func() interface{} {
return &committer{
tmp: make(sliceBuffer, 0, 550), // cap is as large as a full fullNode.
sha: sha3.NewLegacyKeccak256().(crypto.KeccakState),
}
},

View File

@ -113,16 +113,9 @@ func (n rawFullNode) cache() (hashNode, bool) { panic("this should never end u
func (n rawFullNode) fstring(ind string) string { panic("this should never end up in a live trie") }
func (n rawFullNode) EncodeRLP(w io.Writer) error {
var nodes [17]node
for i, child := range n {
if child != nil {
nodes[i] = child
} else {
nodes[i] = nilValueNode
}
}
return rlp.Encode(w, nodes)
eb := rlp.NewEncoderBuffer(w)
n.encode(eb)
return eb.Flush()
}
// rawShortNode represents only the useful data content of a short node, with the
@ -164,11 +157,7 @@ func (n *cachedNode) rlp() []byte {
if node, ok := n.node.(rawNode); ok {
return node
}
blob, err := rlp.EncodeToBytes(n.node)
if err != nil {
panic(err)
}
return blob
return nodeToBytes(n.node)
}
// obj returns the decoded and expanded trie node, either directly from the cache,

View File

@ -24,22 +24,12 @@ import (
"golang.org/x/crypto/sha3"
)
type sliceBuffer []byte
func (b *sliceBuffer) Write(data []byte) (n int, err error) {
*b = append(*b, data...)
return len(data), nil
}
func (b *sliceBuffer) Reset() {
*b = (*b)[:0]
}
// hasher is a type used for the trie Hash operation. A hasher has some
// internal preallocated temp space
type hasher struct {
sha crypto.KeccakState
tmp sliceBuffer
tmp []byte
encbuf rlp.EncoderBuffer
parallel bool // Whether to use paralallel threads when hashing
}
@ -47,8 +37,9 @@ type hasher struct {
var hasherPool = sync.Pool{
New: func() interface{} {
return &hasher{
tmp: make(sliceBuffer, 0, 550), // cap is as large as a full fullNode.
sha: sha3.NewLegacyKeccak256().(crypto.KeccakState),
tmp: make([]byte, 0, 550), // cap is as large as a full fullNode.
sha: sha3.NewLegacyKeccak256().(crypto.KeccakState),
encbuf: rlp.NewEncoderBuffer(nil),
}
},
}
@ -153,30 +144,41 @@ func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached
// into compact form for RLP encoding.
// If the rlp data is smaller than 32 bytes, `nil` is returned.
func (h *hasher) shortnodeToHash(n *shortNode, force bool) node {
h.tmp.Reset()
if err := rlp.Encode(&h.tmp, n); err != nil {
panic("encode error: " + err.Error())
}
n.encode(h.encbuf)
enc := h.encodedBytes()
if len(h.tmp) < 32 && !force {
if len(enc) < 32 && !force {
return n // Nodes smaller than 32 bytes are stored inside their parent
}
return h.hashData(h.tmp)
return h.hashData(enc)
}
// shortnodeToHash is used to creates a hashNode from a set of hashNodes, (which
// may contain nil values)
func (h *hasher) fullnodeToHash(n *fullNode, force bool) node {
h.tmp.Reset()
// Generate the RLP encoding of the node
if err := n.EncodeRLP(&h.tmp); err != nil {
panic("encode error: " + err.Error())
}
n.encode(h.encbuf)
enc := h.encodedBytes()
if len(h.tmp) < 32 && !force {
if len(enc) < 32 && !force {
return n // Nodes smaller than 32 bytes are stored inside their parent
}
return h.hashData(h.tmp)
return h.hashData(enc)
}
// encodedBytes returns the result of the last encoding operation on h.encbuf.
// This also resets the encoder buffer.
//
// All node encoding must be done like this:
//
// node.encode(h.encbuf)
// enc := h.encodedBytes()
//
// This convention exists because node.encode can only be inlined/escape-analyzed when
// called on a concrete receiver type.
func (h *hasher) encodedBytes() []byte {
h.tmp = h.encbuf.AppendToBytes(h.tmp[:0])
h.encbuf.Reset(nil)
return h.tmp
}
// hashData hashes the provided data

View File

@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
)
// Iterator is a key-value trie iterator that traverses a Trie.
@ -214,8 +213,7 @@ func (it *nodeIterator) LeafProof() [][]byte {
// Gather nodes that end up as hash nodes (or the root)
node, hashed := hasher.proofHash(item.node)
if _, ok := hashed.(hashNode); ok || i == 0 {
enc, _ := rlp.EncodeToBytes(node)
proofs = append(proofs, enc)
proofs = append(proofs, nodeToBytes(node))
}
}
return proofs

View File

@ -28,8 +28,9 @@ import (
var indices = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f", "[17]"}
type node interface {
fstring(string) string
cache() (hashNode, bool)
encode(w rlp.EncoderBuffer)
fstring(string) string
}
type (
@ -52,16 +53,9 @@ var nilValueNode = valueNode(nil)
// EncodeRLP encodes a full node into the consensus RLP format.
func (n *fullNode) EncodeRLP(w io.Writer) error {
var nodes [17]node
for i, child := range &n.Children {
if child != nil {
nodes[i] = child
} else {
nodes[i] = nilValueNode
}
}
return rlp.Encode(w, nodes)
eb := rlp.NewEncoderBuffer(w)
n.encode(eb)
return eb.Flush()
}
func (n *fullNode) copy() *fullNode { copy := *n; return &copy }

87
trie/node_enc.go Normal file
View File

@ -0,0 +1,87 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package trie
import (
"github.com/ethereum/go-ethereum/rlp"
)
func nodeToBytes(n node) []byte {
w := rlp.NewEncoderBuffer(nil)
n.encode(w)
result := w.ToBytes()
w.Flush()
return result
}
func (n *fullNode) encode(w rlp.EncoderBuffer) {
offset := w.List()
for _, c := range n.Children {
if c != nil {
c.encode(w)
} else {
w.Write(rlp.EmptyString)
}
}
w.ListEnd(offset)
}
func (n *shortNode) encode(w rlp.EncoderBuffer) {
offset := w.List()
w.WriteBytes(n.Key)
if n.Val != nil {
n.Val.encode(w)
} else {
w.Write(rlp.EmptyString)
}
w.ListEnd(offset)
}
func (n hashNode) encode(w rlp.EncoderBuffer) {
w.WriteBytes(n)
}
func (n valueNode) encode(w rlp.EncoderBuffer) {
w.WriteBytes(n)
}
func (n rawFullNode) encode(w rlp.EncoderBuffer) {
offset := w.List()
for _, c := range n {
if c != nil {
c.encode(w)
} else {
w.Write(rlp.EmptyString)
}
}
w.ListEnd(offset)
}
func (n *rawShortNode) encode(w rlp.EncoderBuffer) {
offset := w.List()
w.WriteBytes(n.Key)
if n.Val != nil {
n.Val.encode(w)
} else {
w.Write(rlp.EmptyString)
}
w.ListEnd(offset)
}
func (n rawNode) encode(w rlp.EncoderBuffer) {
w.Write(n)
}

View File

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
// Prove constructs a merkle proof for key. The result contains all encoded nodes
@ -79,7 +78,7 @@ func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) e
if hash, ok := hn.(hashNode); ok || i == 0 {
// If the node's database encoding is a hash (or is the
// root node), it becomes a proof element.
enc, _ := rlp.EncodeToBytes(n)
enc := nodeToBytes(n)
if !ok {
hash = hasher.hashData(enc)
}

View File

@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
var ErrCommitDisabled = errors.New("no database for committing")
@ -224,6 +223,7 @@ func (st *StackTrie) insert(key, value []byte) {
switch st.nodeType {
case branchNode: /* Branch */
idx := int(key[0])
// Unresolve elder siblings
for i := idx - 1; i >= 0; i-- {
if st.children[i] != nil {
@ -233,12 +233,14 @@ func (st *StackTrie) insert(key, value []byte) {
break
}
}
// Add new child
if st.children[idx] == nil {
st.children[idx] = newLeaf(key[1:], value, st.db)
} else {
st.children[idx].insert(key[1:], value)
}
case extNode: /* Ext */
// Compare both key chunks and see where they differ
diffidx := st.getDiffIndex(key)
@ -326,10 +328,9 @@ func (st *StackTrie) insert(key, value []byte) {
p = st.children[0]
}
// Create the two child leaves: the one containing the
// original value and the one containing the new value
// The child leave will be hashed directly in order to
// free up some memory.
// Create the two child leaves: one containing the original
// value and another containing the new value. The child leaf
// is hashed directly in order to free up some memory.
origIdx := st.key[diffidx]
p.children[origIdx] = newLeaf(st.key[diffidx+1:], st.val, st.db)
p.children[origIdx].hash()
@ -341,19 +342,22 @@ func (st *StackTrie) insert(key, value []byte) {
// over to the children.
st.key = st.key[:diffidx]
st.val = nil
case emptyNode: /* Empty */
st.nodeType = leafNode
st.key = key
st.val = value
case hashedNode:
panic("trying to insert into hash")
default:
panic("invalid type")
}
}
// hash() hashes the node 'st' and converts it into 'hashedNode', if possible.
// Possible outcomes:
// hash converts st into a 'hashedNode', if possible. Possible outcomes:
//
// 1. The rlp-encoded value was >= 32 bytes:
// - Then the 32-byte `hash` will be accessible in `st.val`.
// - And the 'st.type' will be 'hashedNode'
@ -361,119 +365,116 @@ func (st *StackTrie) insert(key, value []byte) {
// - Then the <32 byte rlp-encoded value will be accessible in 'st.val'.
// - And the 'st.type' will be 'hashedNode' AGAIN
//
// This method will also:
// set 'st.type' to hashedNode
// clear 'st.key'
// This method also sets 'st.type' to hashedNode, and clears 'st.key'.
func (st *StackTrie) hash() {
/* Shortcut if node is already hashed */
if st.nodeType == hashedNode {
return
}
// The 'hasher' is taken from a pool, but we don't actually
// claim an instance until all children are done with their hashing,
// and we actually need one
var h *hasher
h := newHasher(false)
defer returnHasherToPool(h)
st.hashRec(h)
}
func (st *StackTrie) hashRec(hasher *hasher) {
// The switch below sets this to the RLP-encoding of this node.
var encodedNode []byte
switch st.nodeType {
case branchNode:
var nodes [17]node
for i, child := range st.children {
if child == nil {
nodes[i] = nilValueNode
continue
}
child.hash()
if len(child.val) < 32 {
nodes[i] = rawNode(child.val)
} else {
nodes[i] = hashNode(child.val)
}
st.children[i] = nil // Reclaim mem from subtree
returnToPool(child)
}
nodes[16] = nilValueNode
h = newHasher(false)
defer returnHasherToPool(h)
h.tmp.Reset()
if err := rlp.Encode(&h.tmp, nodes); err != nil {
panic(err)
}
case extNode:
st.children[0].hash()
h = newHasher(false)
defer returnHasherToPool(h)
h.tmp.Reset()
var valuenode node
if len(st.children[0].val) < 32 {
valuenode = rawNode(st.children[0].val)
} else {
valuenode = hashNode(st.children[0].val)
}
n := struct {
Key []byte
Val node
}{
Key: hexToCompact(st.key),
Val: valuenode,
}
if err := rlp.Encode(&h.tmp, n); err != nil {
panic(err)
}
returnToPool(st.children[0])
st.children[0] = nil // Reclaim mem from subtree
case leafNode:
h = newHasher(false)
defer returnHasherToPool(h)
h.tmp.Reset()
st.key = append(st.key, byte(16))
sz := hexToCompactInPlace(st.key)
n := [][]byte{st.key[:sz], st.val}
if err := rlp.Encode(&h.tmp, n); err != nil {
panic(err)
}
case hashedNode:
return
case emptyNode:
st.val = emptyRoot.Bytes()
st.key = st.key[:0]
st.nodeType = hashedNode
return
case branchNode:
var nodes rawFullNode
for i, child := range st.children {
if child == nil {
nodes[i] = nilValueNode
continue
}
child.hashRec(hasher)
if len(child.val) < 32 {
nodes[i] = rawNode(child.val)
} else {
nodes[i] = hashNode(child.val)
}
// Release child back to pool.
st.children[i] = nil
returnToPool(child)
}
nodes.encode(hasher.encbuf)
encodedNode = hasher.encodedBytes()
case extNode:
st.children[0].hashRec(hasher)
sz := hexToCompactInPlace(st.key)
n := rawShortNode{Key: st.key[:sz]}
if len(st.children[0].val) < 32 {
n.Val = rawNode(st.children[0].val)
} else {
n.Val = hashNode(st.children[0].val)
}
n.encode(hasher.encbuf)
encodedNode = hasher.encodedBytes()
// Release child back to pool.
returnToPool(st.children[0])
st.children[0] = nil
case leafNode:
st.key = append(st.key, byte(16))
sz := hexToCompactInPlace(st.key)
n := rawShortNode{Key: st.key[:sz], Val: valueNode(st.val)}
n.encode(hasher.encbuf)
encodedNode = hasher.encodedBytes()
default:
panic("Invalid node type")
panic("invalid node type")
}
st.key = st.key[:0]
st.nodeType = hashedNode
if len(h.tmp) < 32 {
st.val = common.CopyBytes(h.tmp)
st.key = st.key[:0]
if len(encodedNode) < 32 {
st.val = common.CopyBytes(encodedNode)
return
}
// Write the hash to the 'val'. We allocate a new val here to not mutate
// input values
st.val = make([]byte, 32)
h.sha.Reset()
h.sha.Write(h.tmp)
h.sha.Read(st.val)
st.val = hasher.hashData(encodedNode)
if st.db != nil {
// TODO! Is it safe to Put the slice here?
// Do all db implementations copy the value provided?
st.db.Put(st.val, h.tmp)
st.db.Put(st.val, encodedNode)
}
}
// Hash returns the hash of the current node
// Hash returns the hash of the current node.
func (st *StackTrie) Hash() (h common.Hash) {
st.hash()
if len(st.val) != 32 {
// If the node's RLP isn't 32 bytes long, the node will not
// be hashed, and instead contain the rlp-encoding of the
// node. For the top level node, we need to force the hashing.
ret := make([]byte, 32)
h := newHasher(false)
defer returnHasherToPool(h)
h.sha.Reset()
h.sha.Write(st.val)
h.sha.Read(ret)
return common.BytesToHash(ret)
hasher := newHasher(false)
defer returnHasherToPool(hasher)
st.hashRec(hasher)
if len(st.val) == 32 {
copy(h[:], st.val)
return h
}
return common.BytesToHash(st.val)
// If the node's RLP isn't 32 bytes long, the node will not
// be hashed, and instead contain the rlp-encoding of the
// node. For the top level node, we need to force the hashing.
hasher.sha.Reset()
hasher.sha.Write(st.val)
hasher.sha.Read(h[:])
return h
}
// Commit will firstly hash the entrie trie if it's still not hashed
@ -483,23 +484,26 @@ func (st *StackTrie) Hash() (h common.Hash) {
//
// The associated database is expected, otherwise the whole commit
// functionality should be disabled.
func (st *StackTrie) Commit() (common.Hash, error) {
func (st *StackTrie) Commit() (h common.Hash, err error) {
if st.db == nil {
return common.Hash{}, ErrCommitDisabled
}
st.hash()
if len(st.val) != 32 {
// If the node's RLP isn't 32 bytes long, the node will not
// be hashed (and committed), and instead contain the rlp-encoding of the
// node. For the top level node, we need to force the hashing+commit.
ret := make([]byte, 32)
h := newHasher(false)
defer returnHasherToPool(h)
h.sha.Reset()
h.sha.Write(st.val)
h.sha.Read(ret)
st.db.Put(ret, st.val)
return common.BytesToHash(ret), nil
hasher := newHasher(false)
defer returnHasherToPool(hasher)
st.hashRec(hasher)
if len(st.val) == 32 {
copy(h[:], st.val)
return h, nil
}
return common.BytesToHash(st.val), nil
// If the node's RLP isn't 32 bytes long, the node will not
// be hashed (and committed), and instead contain the rlp-encoding of the
// node. For the top level node, we need to force the hashing+commit.
hasher.sha.Reset()
hasher.sha.Write(st.val)
hasher.sha.Read(h[:])
st.db.Put(h[:], st.val)
return h, nil
}

View File

@ -414,8 +414,9 @@ func runRandTest(rt randTest) bool {
values := make(map[string]string) // tracks content of the trie
for i, step := range rt {
fmt.Printf("{op: %d, key: common.Hex2Bytes(\"%x\"), value: common.Hex2Bytes(\"%x\")}, // step %d\n",
step.op, step.key, step.value, i)
// fmt.Printf("{op: %d, key: common.Hex2Bytes(\"%x\"), value: common.Hex2Bytes(\"%x\")}, // step %d\n",
// step.op, step.key, step.value, i)
switch step.op {
case opUpdate:
tr.Update(step.key, step.value)
@ -885,7 +886,8 @@ func TestCommitSequenceSmallRoot(t *testing.T) {
if stRoot != root {
t.Fatalf("root wrong, got %x exp %x", stRoot, root)
}
fmt.Printf("root: %x\n", stRoot)
t.Logf("root: %x\n", stRoot)
if got, exp := stackTrieSponge.sponge.Sum(nil), s.sponge.Sum(nil); !bytes.Equal(got, exp) {
t.Fatalf("test, disk write sequence wrong:\ngot %x exp %x\n", got, exp)
}