276 lines
9.1 KiB
Go
276 lines
9.1 KiB
Go
|
// Copyright 2022 The go-ethereum Authors
|
||
|
// This file is part of the go-ethereum library.
|
||
|
//
|
||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||
|
// the Free Software Foundation, either version 3 of the License, or
|
||
|
// (at your option) any later version.
|
||
|
//
|
||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
// GNU Lesser General Public License for more details.
|
||
|
//
|
||
|
// You should have received a copy of the GNU Lesser General Public License
|
||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
package pathdb
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/VictoriaMetrics/fastcache"
|
||
|
"github.com/cerc-io/ipld-eth-statedb/trie_by_cid/trie/trienode"
|
||
|
"github.com/ethereum/go-ethereum/common"
|
||
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||
|
"github.com/ethereum/go-ethereum/crypto"
|
||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||
|
"github.com/ethereum/go-ethereum/log"
|
||
|
)
|
||
|
|
||
|
// nodebuffer is a collection of modified trie nodes to aggregate the disk
|
||
|
// write. The content of the nodebuffer must be checked before diving into
|
||
|
// disk (since it basically is not-yet-written data).
|
||
|
type nodebuffer struct {
|
||
|
layers uint64 // The number of diff layers aggregated inside
|
||
|
size uint64 // The size of aggregated writes
|
||
|
limit uint64 // The maximum memory allowance in bytes
|
||
|
nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path
|
||
|
}
|
||
|
|
||
|
// newNodeBuffer initializes the node buffer with the provided nodes.
|
||
|
func newNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *nodebuffer {
|
||
|
if nodes == nil {
|
||
|
nodes = make(map[common.Hash]map[string]*trienode.Node)
|
||
|
}
|
||
|
var size uint64
|
||
|
for _, subset := range nodes {
|
||
|
for path, n := range subset {
|
||
|
size += uint64(len(n.Blob) + len(path))
|
||
|
}
|
||
|
}
|
||
|
return &nodebuffer{
|
||
|
layers: layers,
|
||
|
nodes: nodes,
|
||
|
size: size,
|
||
|
limit: uint64(limit),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// node retrieves the trie node with given node info.
|
||
|
func (b *nodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) {
|
||
|
subset, ok := b.nodes[owner]
|
||
|
if !ok {
|
||
|
return nil, nil
|
||
|
}
|
||
|
n, ok := subset[string(path)]
|
||
|
if !ok {
|
||
|
return nil, nil
|
||
|
}
|
||
|
if n.Hash != hash {
|
||
|
dirtyFalseMeter.Mark(1)
|
||
|
log.Error("Unexpected trie node in node buffer", "owner", owner, "path", path, "expect", hash, "got", n.Hash)
|
||
|
return nil, newUnexpectedNodeError("dirty", hash, n.Hash, owner, path, n.Blob)
|
||
|
}
|
||
|
return n, nil
|
||
|
}
|
||
|
|
||
|
// commit merges the dirty nodes into the nodebuffer. This operation won't take
|
||
|
// the ownership of the nodes map which belongs to the bottom-most diff layer.
|
||
|
// It will just hold the node references from the given map which are safe to
|
||
|
// copy.
|
||
|
func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *nodebuffer {
|
||
|
var (
|
||
|
delta int64
|
||
|
overwrite int64
|
||
|
overwriteSize int64
|
||
|
)
|
||
|
for owner, subset := range nodes {
|
||
|
current, exist := b.nodes[owner]
|
||
|
if !exist {
|
||
|
// Allocate a new map for the subset instead of claiming it directly
|
||
|
// from the passed map to avoid potential concurrent map read/write.
|
||
|
// The nodes belong to original diff layer are still accessible even
|
||
|
// after merging, thus the ownership of nodes map should still belong
|
||
|
// to original layer and any mutation on it should be prevented.
|
||
|
current = make(map[string]*trienode.Node)
|
||
|
for path, n := range subset {
|
||
|
current[path] = n
|
||
|
delta += int64(len(n.Blob) + len(path))
|
||
|
}
|
||
|
b.nodes[owner] = current
|
||
|
continue
|
||
|
}
|
||
|
for path, n := range subset {
|
||
|
if orig, exist := current[path]; !exist {
|
||
|
delta += int64(len(n.Blob) + len(path))
|
||
|
} else {
|
||
|
delta += int64(len(n.Blob) - len(orig.Blob))
|
||
|
overwrite++
|
||
|
overwriteSize += int64(len(orig.Blob) + len(path))
|
||
|
}
|
||
|
current[path] = n
|
||
|
}
|
||
|
b.nodes[owner] = current
|
||
|
}
|
||
|
b.updateSize(delta)
|
||
|
b.layers++
|
||
|
gcNodesMeter.Mark(overwrite)
|
||
|
gcBytesMeter.Mark(overwriteSize)
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
// revert is the reverse operation of commit. It also merges the provided nodes
|
||
|
// into the nodebuffer, the difference is that the provided node set should
|
||
|
// revert the changes made by the last state transition.
|
||
|
func (b *nodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
|
||
|
// Short circuit if no embedded state transition to revert.
|
||
|
if b.layers == 0 {
|
||
|
return errStateUnrecoverable
|
||
|
}
|
||
|
b.layers--
|
||
|
|
||
|
// Reset the entire buffer if only a single transition left.
|
||
|
if b.layers == 0 {
|
||
|
b.reset()
|
||
|
return nil
|
||
|
}
|
||
|
var delta int64
|
||
|
for owner, subset := range nodes {
|
||
|
current, ok := b.nodes[owner]
|
||
|
if !ok {
|
||
|
panic(fmt.Sprintf("non-existent subset (%x)", owner))
|
||
|
}
|
||
|
for path, n := range subset {
|
||
|
orig, ok := current[path]
|
||
|
if !ok {
|
||
|
// There is a special case in MPT that one child is removed from
|
||
|
// a fullNode which only has two children, and then a new child
|
||
|
// with different position is immediately inserted into the fullNode.
|
||
|
// In this case, the clean child of the fullNode will also be
|
||
|
// marked as dirty because of node collapse and expansion.
|
||
|
//
|
||
|
// In case of database rollback, don't panic if this "clean"
|
||
|
// node occurs which is not present in buffer.
|
||
|
var nhash common.Hash
|
||
|
if owner == (common.Hash{}) {
|
||
|
_, nhash = rawdb.ReadAccountTrieNode(db, []byte(path))
|
||
|
} else {
|
||
|
_, nhash = rawdb.ReadStorageTrieNode(db, owner, []byte(path))
|
||
|
}
|
||
|
// Ignore the clean node in the case described above.
|
||
|
if nhash == n.Hash {
|
||
|
continue
|
||
|
}
|
||
|
panic(fmt.Sprintf("non-existent node (%x %v) blob: %v", owner, path, crypto.Keccak256Hash(n.Blob).Hex()))
|
||
|
}
|
||
|
current[path] = n
|
||
|
delta += int64(len(n.Blob)) - int64(len(orig.Blob))
|
||
|
}
|
||
|
}
|
||
|
b.updateSize(delta)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// updateSize updates the total cache size by the given delta.
|
||
|
func (b *nodebuffer) updateSize(delta int64) {
|
||
|
size := int64(b.size) + delta
|
||
|
if size >= 0 {
|
||
|
b.size = uint64(size)
|
||
|
return
|
||
|
}
|
||
|
s := b.size
|
||
|
b.size = 0
|
||
|
log.Error("Invalid pathdb buffer size", "prev", common.StorageSize(s), "delta", common.StorageSize(delta))
|
||
|
}
|
||
|
|
||
|
// reset cleans up the disk cache.
|
||
|
func (b *nodebuffer) reset() {
|
||
|
b.layers = 0
|
||
|
b.size = 0
|
||
|
b.nodes = make(map[common.Hash]map[string]*trienode.Node)
|
||
|
}
|
||
|
|
||
|
// empty returns an indicator if nodebuffer contains any state transition inside.
|
||
|
func (b *nodebuffer) empty() bool {
|
||
|
return b.layers == 0
|
||
|
}
|
||
|
|
||
|
// setSize sets the buffer size to the provided number, and invokes a flush
|
||
|
// operation if the current memory usage exceeds the new limit.
|
||
|
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
|
||
|
b.limit = uint64(size)
|
||
|
return b.flush(db, clean, id, false)
|
||
|
}
|
||
|
|
||
|
// flush persists the in-memory dirty trie node into the disk if the configured
|
||
|
// memory threshold is reached. Note, all data must be written atomically.
|
||
|
func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
|
||
|
if b.size <= b.limit && !force {
|
||
|
return nil
|
||
|
}
|
||
|
// Ensure the target state id is aligned with the internal counter.
|
||
|
head := rawdb.ReadPersistentStateID(db)
|
||
|
if head+b.layers != id {
|
||
|
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
|
||
|
}
|
||
|
var (
|
||
|
start = time.Now()
|
||
|
batch = db.NewBatchWithSize(int(b.size))
|
||
|
)
|
||
|
nodes := writeNodes(batch, b.nodes, clean)
|
||
|
rawdb.WritePersistentStateID(batch, id)
|
||
|
|
||
|
// Flush all mutations in a single batch
|
||
|
size := batch.ValueSize()
|
||
|
if err := batch.Write(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
commitBytesMeter.Mark(int64(size))
|
||
|
commitNodesMeter.Mark(int64(nodes))
|
||
|
commitTimeTimer.UpdateSince(start)
|
||
|
log.Debug("Persisted pathdb nodes", "nodes", len(b.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
|
||
|
b.reset()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// writeNodes writes the trie nodes into the provided database batch.
|
||
|
// Note this function will also inject all the newly written nodes
|
||
|
// into clean cache.
|
||
|
func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache) (total int) {
|
||
|
for owner, subset := range nodes {
|
||
|
for path, n := range subset {
|
||
|
if n.IsDeleted() {
|
||
|
if owner == (common.Hash{}) {
|
||
|
rawdb.DeleteAccountTrieNode(batch, []byte(path))
|
||
|
} else {
|
||
|
rawdb.DeleteStorageTrieNode(batch, owner, []byte(path))
|
||
|
}
|
||
|
if clean != nil {
|
||
|
clean.Del(cacheKey(owner, []byte(path)))
|
||
|
}
|
||
|
} else {
|
||
|
if owner == (common.Hash{}) {
|
||
|
rawdb.WriteAccountTrieNode(batch, []byte(path), n.Blob)
|
||
|
} else {
|
||
|
rawdb.WriteStorageTrieNode(batch, owner, []byte(path), n.Blob)
|
||
|
}
|
||
|
if clean != nil {
|
||
|
clean.Set(cacheKey(owner, []byte(path)), n.Blob)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
total += len(subset)
|
||
|
}
|
||
|
return total
|
||
|
}
|
||
|
|
||
|
// cacheKey constructs the unique key of clean cache.
|
||
|
func cacheKey(owner common.Hash, path []byte) []byte {
|
||
|
if owner == (common.Hash{}) {
|
||
|
return path
|
||
|
}
|
||
|
return append(owner.Bytes(), path...)
|
||
|
}
|