forked from cerc-io/plugeth
a1b8892384
This avoids copying the input []byte while decoding trie nodes. In most cases, particularly when the input slice is provided by the underlying database, this optimization is safe to use. For cases where the origin of the input slice is unclear, the copying version is retained. The new code performs better even when the input must be copied, because it is now only copied once in decodeNode.
876 lines
30 KiB
Go
876 lines
30 KiB
Go
// Copyright 2018 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package trie
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"reflect"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/fastcache"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
)
|
|
|
|
var (
|
|
memcacheCleanHitMeter = metrics.NewRegisteredMeter("trie/memcache/clean/hit", nil)
|
|
memcacheCleanMissMeter = metrics.NewRegisteredMeter("trie/memcache/clean/miss", nil)
|
|
memcacheCleanReadMeter = metrics.NewRegisteredMeter("trie/memcache/clean/read", nil)
|
|
memcacheCleanWriteMeter = metrics.NewRegisteredMeter("trie/memcache/clean/write", nil)
|
|
|
|
memcacheDirtyHitMeter = metrics.NewRegisteredMeter("trie/memcache/dirty/hit", nil)
|
|
memcacheDirtyMissMeter = metrics.NewRegisteredMeter("trie/memcache/dirty/miss", nil)
|
|
memcacheDirtyReadMeter = metrics.NewRegisteredMeter("trie/memcache/dirty/read", nil)
|
|
memcacheDirtyWriteMeter = metrics.NewRegisteredMeter("trie/memcache/dirty/write", nil)
|
|
|
|
memcacheFlushTimeTimer = metrics.NewRegisteredResettingTimer("trie/memcache/flush/time", nil)
|
|
memcacheFlushNodesMeter = metrics.NewRegisteredMeter("trie/memcache/flush/nodes", nil)
|
|
memcacheFlushSizeMeter = metrics.NewRegisteredMeter("trie/memcache/flush/size", nil)
|
|
|
|
memcacheGCTimeTimer = metrics.NewRegisteredResettingTimer("trie/memcache/gc/time", nil)
|
|
memcacheGCNodesMeter = metrics.NewRegisteredMeter("trie/memcache/gc/nodes", nil)
|
|
memcacheGCSizeMeter = metrics.NewRegisteredMeter("trie/memcache/gc/size", nil)
|
|
|
|
memcacheCommitTimeTimer = metrics.NewRegisteredResettingTimer("trie/memcache/commit/time", nil)
|
|
memcacheCommitNodesMeter = metrics.NewRegisteredMeter("trie/memcache/commit/nodes", nil)
|
|
memcacheCommitSizeMeter = metrics.NewRegisteredMeter("trie/memcache/commit/size", nil)
|
|
)
|
|
|
|
// Database is an intermediate write layer between the trie data structures and
|
|
// the disk database. The aim is to accumulate trie writes in-memory and only
|
|
// periodically flush a couple tries to disk, garbage collecting the remainder.
|
|
//
|
|
// Note, the trie Database is **not** thread safe in its mutations, but it **is**
|
|
// thread safe in providing individual, independent node access. The rationale
|
|
// behind this split design is to provide read access to RPC handlers and sync
|
|
// servers even while the trie is executing expensive garbage collection.
|
|
type Database struct {
|
|
diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes
|
|
|
|
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
|
|
dirties map[common.Hash]*cachedNode // Data and references relationships of dirty trie nodes
|
|
oldest common.Hash // Oldest tracked node, flush-list head
|
|
newest common.Hash // Newest tracked node, flush-list tail
|
|
|
|
gctime time.Duration // Time spent on garbage collection since last commit
|
|
gcnodes uint64 // Nodes garbage collected since last commit
|
|
gcsize common.StorageSize // Data storage garbage collected since last commit
|
|
|
|
flushtime time.Duration // Time spent on data flushing since last commit
|
|
flushnodes uint64 // Nodes flushed since last commit
|
|
flushsize common.StorageSize // Data storage flushed since last commit
|
|
|
|
dirtiesSize common.StorageSize // Storage size of the dirty node cache (exc. metadata)
|
|
childrenSize common.StorageSize // Storage size of the external children tracking
|
|
preimages *preimageStore // The store for caching preimages
|
|
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// rawNode is a simple binary blob used to differentiate between collapsed trie
|
|
// nodes and already encoded RLP binary blobs (while at the same time store them
|
|
// in the same cache fields).
|
|
type rawNode []byte
|
|
|
|
func (n rawNode) cache() (hashNode, bool) { panic("this should never end up in a live trie") }
|
|
func (n rawNode) fstring(ind string) string { panic("this should never end up in a live trie") }
|
|
|
|
func (n rawNode) EncodeRLP(w io.Writer) error {
|
|
_, err := w.Write(n)
|
|
return err
|
|
}
|
|
|
|
// rawFullNode represents only the useful data content of a full node, with the
|
|
// caches and flags stripped out to minimize its data storage. This type honors
|
|
// the same RLP encoding as the original parent.
|
|
type rawFullNode [17]node
|
|
|
|
func (n rawFullNode) cache() (hashNode, bool) { panic("this should never end up in a live trie") }
|
|
func (n rawFullNode) fstring(ind string) string { panic("this should never end up in a live trie") }
|
|
|
|
func (n rawFullNode) EncodeRLP(w io.Writer) error {
|
|
eb := rlp.NewEncoderBuffer(w)
|
|
n.encode(eb)
|
|
return eb.Flush()
|
|
}
|
|
|
|
// rawShortNode represents only the useful data content of a short node, with the
|
|
// caches and flags stripped out to minimize its data storage. This type honors
|
|
// the same RLP encoding as the original parent.
|
|
type rawShortNode struct {
|
|
Key []byte
|
|
Val node
|
|
}
|
|
|
|
func (n rawShortNode) cache() (hashNode, bool) { panic("this should never end up in a live trie") }
|
|
func (n rawShortNode) fstring(ind string) string { panic("this should never end up in a live trie") }
|
|
|
|
// cachedNode is all the information we know about a single cached trie node
|
|
// in the memory database write layer.
|
|
type cachedNode struct {
|
|
node node // Cached collapsed trie node, or raw rlp data
|
|
size uint16 // Byte size of the useful cached data
|
|
|
|
parents uint32 // Number of live nodes referencing this one
|
|
children map[common.Hash]uint16 // External children referenced by this node
|
|
|
|
flushPrev common.Hash // Previous node in the flush-list
|
|
flushNext common.Hash // Next node in the flush-list
|
|
}
|
|
|
|
// cachedNodeSize is the raw size of a cachedNode data structure without any
|
|
// node data included. It's an approximate size, but should be a lot better
|
|
// than not counting them.
|
|
var cachedNodeSize = int(reflect.TypeOf(cachedNode{}).Size())
|
|
|
|
// cachedNodeChildrenSize is the raw size of an initialized but empty external
|
|
// reference map.
|
|
const cachedNodeChildrenSize = 48
|
|
|
|
// rlp returns the raw rlp encoded blob of the cached trie node, either directly
|
|
// from the cache, or by regenerating it from the collapsed node.
|
|
func (n *cachedNode) rlp() []byte {
|
|
if node, ok := n.node.(rawNode); ok {
|
|
return node
|
|
}
|
|
return nodeToBytes(n.node)
|
|
}
|
|
|
|
// obj returns the decoded and expanded trie node, either directly from the cache,
|
|
// or by regenerating it from the rlp encoded blob.
|
|
func (n *cachedNode) obj(hash common.Hash) node {
|
|
if node, ok := n.node.(rawNode); ok {
|
|
// The raw-blob format nodes are loaded from either from
|
|
// clean cache or the database, they are all in their own
|
|
// copy and safe to use unsafe decoder.
|
|
return mustDecodeNodeUnsafe(hash[:], node)
|
|
}
|
|
return expandNode(hash[:], n.node)
|
|
}
|
|
|
|
// forChilds invokes the callback for all the tracked children of this node,
|
|
// both the implicit ones from inside the node as well as the explicit ones
|
|
// from outside the node.
|
|
func (n *cachedNode) forChilds(onChild func(hash common.Hash)) {
|
|
for child := range n.children {
|
|
onChild(child)
|
|
}
|
|
if _, ok := n.node.(rawNode); !ok {
|
|
forGatherChildren(n.node, onChild)
|
|
}
|
|
}
|
|
|
|
// forGatherChildren traverses the node hierarchy of a collapsed storage node and
|
|
// invokes the callback for all the hashnode children.
|
|
func forGatherChildren(n node, onChild func(hash common.Hash)) {
|
|
switch n := n.(type) {
|
|
case *rawShortNode:
|
|
forGatherChildren(n.Val, onChild)
|
|
case rawFullNode:
|
|
for i := 0; i < 16; i++ {
|
|
forGatherChildren(n[i], onChild)
|
|
}
|
|
case hashNode:
|
|
onChild(common.BytesToHash(n))
|
|
case valueNode, nil, rawNode:
|
|
default:
|
|
panic(fmt.Sprintf("unknown node type: %T", n))
|
|
}
|
|
}
|
|
|
|
// simplifyNode traverses the hierarchy of an expanded memory node and discards
|
|
// all the internal caches, returning a node that only contains the raw data.
|
|
func simplifyNode(n node) node {
|
|
switch n := n.(type) {
|
|
case *shortNode:
|
|
// Short nodes discard the flags and cascade
|
|
return &rawShortNode{Key: n.Key, Val: simplifyNode(n.Val)}
|
|
|
|
case *fullNode:
|
|
// Full nodes discard the flags and cascade
|
|
node := rawFullNode(n.Children)
|
|
for i := 0; i < len(node); i++ {
|
|
if node[i] != nil {
|
|
node[i] = simplifyNode(node[i])
|
|
}
|
|
}
|
|
return node
|
|
|
|
case valueNode, hashNode, rawNode:
|
|
return n
|
|
|
|
default:
|
|
panic(fmt.Sprintf("unknown node type: %T", n))
|
|
}
|
|
}
|
|
|
|
// expandNode traverses the node hierarchy of a collapsed storage node and converts
|
|
// all fields and keys into expanded memory form.
|
|
func expandNode(hash hashNode, n node) node {
|
|
switch n := n.(type) {
|
|
case *rawShortNode:
|
|
// Short nodes need key and child expansion
|
|
return &shortNode{
|
|
Key: compactToHex(n.Key),
|
|
Val: expandNode(nil, n.Val),
|
|
flags: nodeFlag{
|
|
hash: hash,
|
|
},
|
|
}
|
|
|
|
case rawFullNode:
|
|
// Full nodes need child expansion
|
|
node := &fullNode{
|
|
flags: nodeFlag{
|
|
hash: hash,
|
|
},
|
|
}
|
|
for i := 0; i < len(node.Children); i++ {
|
|
if n[i] != nil {
|
|
node.Children[i] = expandNode(nil, n[i])
|
|
}
|
|
}
|
|
return node
|
|
|
|
case valueNode, hashNode:
|
|
return n
|
|
|
|
default:
|
|
panic(fmt.Sprintf("unknown node type: %T", n))
|
|
}
|
|
}
|
|
|
|
// Config defines all necessary options for database.
|
|
type Config struct {
|
|
Cache int // Memory allowance (MB) to use for caching trie nodes in memory
|
|
Journal string // Journal of clean cache to survive node restarts
|
|
Preimages bool // Flag whether the preimage of trie key is recorded
|
|
}
|
|
|
|
// NewDatabase creates a new trie database to store ephemeral trie content before
|
|
// its written out to disk or garbage collected. No read cache is created, so all
|
|
// data retrievals will hit the underlying disk database.
|
|
func NewDatabase(diskdb ethdb.KeyValueStore) *Database {
|
|
return NewDatabaseWithConfig(diskdb, nil)
|
|
}
|
|
|
|
// NewDatabaseWithConfig creates a new trie database to store ephemeral trie content
|
|
// before its written out to disk or garbage collected. It also acts as a read cache
|
|
// for nodes loaded from disk.
|
|
func NewDatabaseWithConfig(diskdb ethdb.KeyValueStore, config *Config) *Database {
|
|
var cleans *fastcache.Cache
|
|
if config != nil && config.Cache > 0 {
|
|
if config.Journal == "" {
|
|
cleans = fastcache.New(config.Cache * 1024 * 1024)
|
|
} else {
|
|
cleans = fastcache.LoadFromFileOrNew(config.Journal, config.Cache*1024*1024)
|
|
}
|
|
}
|
|
var preimage *preimageStore
|
|
if config != nil && config.Preimages {
|
|
preimage = newPreimageStore(diskdb)
|
|
}
|
|
db := &Database{
|
|
diskdb: diskdb,
|
|
cleans: cleans,
|
|
dirties: map[common.Hash]*cachedNode{{}: {
|
|
children: make(map[common.Hash]uint16),
|
|
}},
|
|
preimages: preimage,
|
|
}
|
|
return db
|
|
}
|
|
|
|
// DiskDB retrieves the persistent storage backing the trie database.
|
|
func (db *Database) DiskDB() ethdb.KeyValueStore {
|
|
return db.diskdb
|
|
}
|
|
|
|
// insert inserts a simplified trie node into the memory database.
|
|
// All nodes inserted by this function will be reference tracked
|
|
// and in theory should only used for **trie nodes** insertion.
|
|
func (db *Database) insert(hash common.Hash, size int, node node) {
|
|
// If the node's already cached, skip
|
|
if _, ok := db.dirties[hash]; ok {
|
|
return
|
|
}
|
|
memcacheDirtyWriteMeter.Mark(int64(size))
|
|
|
|
// Create the cached entry for this node
|
|
entry := &cachedNode{
|
|
node: node,
|
|
size: uint16(size),
|
|
flushPrev: db.newest,
|
|
}
|
|
entry.forChilds(func(child common.Hash) {
|
|
if c := db.dirties[child]; c != nil {
|
|
c.parents++
|
|
}
|
|
})
|
|
db.dirties[hash] = entry
|
|
|
|
// Update the flush-list endpoints
|
|
if db.oldest == (common.Hash{}) {
|
|
db.oldest, db.newest = hash, hash
|
|
} else {
|
|
db.dirties[db.newest].flushNext, db.newest = hash, hash
|
|
}
|
|
db.dirtiesSize += common.StorageSize(common.HashLength + entry.size)
|
|
}
|
|
|
|
// node retrieves a cached trie node from memory, or returns nil if none can be
|
|
// found in the memory cache.
|
|
func (db *Database) node(hash common.Hash) node {
|
|
// Retrieve the node from the clean cache if available
|
|
if db.cleans != nil {
|
|
if enc := db.cleans.Get(nil, hash[:]); enc != nil {
|
|
memcacheCleanHitMeter.Mark(1)
|
|
memcacheCleanReadMeter.Mark(int64(len(enc)))
|
|
|
|
// The returned value from cache is in its own copy,
|
|
// safe to use mustDecodeNodeUnsafe for decoding.
|
|
return mustDecodeNodeUnsafe(hash[:], enc)
|
|
}
|
|
}
|
|
// Retrieve the node from the dirty cache if available
|
|
db.lock.RLock()
|
|
dirty := db.dirties[hash]
|
|
db.lock.RUnlock()
|
|
|
|
if dirty != nil {
|
|
memcacheDirtyHitMeter.Mark(1)
|
|
memcacheDirtyReadMeter.Mark(int64(dirty.size))
|
|
return dirty.obj(hash)
|
|
}
|
|
memcacheDirtyMissMeter.Mark(1)
|
|
|
|
// Content unavailable in memory, attempt to retrieve from disk
|
|
enc, err := db.diskdb.Get(hash[:])
|
|
if err != nil || enc == nil {
|
|
return nil
|
|
}
|
|
if db.cleans != nil {
|
|
db.cleans.Set(hash[:], enc)
|
|
memcacheCleanMissMeter.Mark(1)
|
|
memcacheCleanWriteMeter.Mark(int64(len(enc)))
|
|
}
|
|
// The returned value from database is in its own copy,
|
|
// safe to use mustDecodeNodeUnsafe for decoding.
|
|
return mustDecodeNodeUnsafe(hash[:], enc)
|
|
}
|
|
|
|
// Node retrieves an encoded cached trie node from memory. If it cannot be found
|
|
// cached, the method queries the persistent database for the content.
|
|
func (db *Database) Node(hash common.Hash) ([]byte, error) {
|
|
// It doesn't make sense to retrieve the metaroot
|
|
if hash == (common.Hash{}) {
|
|
return nil, errors.New("not found")
|
|
}
|
|
// Retrieve the node from the clean cache if available
|
|
if db.cleans != nil {
|
|
if enc := db.cleans.Get(nil, hash[:]); enc != nil {
|
|
memcacheCleanHitMeter.Mark(1)
|
|
memcacheCleanReadMeter.Mark(int64(len(enc)))
|
|
return enc, nil
|
|
}
|
|
}
|
|
// Retrieve the node from the dirty cache if available
|
|
db.lock.RLock()
|
|
dirty := db.dirties[hash]
|
|
db.lock.RUnlock()
|
|
|
|
if dirty != nil {
|
|
memcacheDirtyHitMeter.Mark(1)
|
|
memcacheDirtyReadMeter.Mark(int64(dirty.size))
|
|
return dirty.rlp(), nil
|
|
}
|
|
memcacheDirtyMissMeter.Mark(1)
|
|
|
|
// Content unavailable in memory, attempt to retrieve from disk
|
|
enc := rawdb.ReadTrieNode(db.diskdb, hash)
|
|
if len(enc) != 0 {
|
|
if db.cleans != nil {
|
|
db.cleans.Set(hash[:], enc)
|
|
memcacheCleanMissMeter.Mark(1)
|
|
memcacheCleanWriteMeter.Mark(int64(len(enc)))
|
|
}
|
|
return enc, nil
|
|
}
|
|
return nil, errors.New("not found")
|
|
}
|
|
|
|
// Nodes retrieves the hashes of all the nodes cached within the memory database.
|
|
// This method is extremely expensive and should only be used to validate internal
|
|
// states in test code.
|
|
func (db *Database) Nodes() []common.Hash {
|
|
db.lock.RLock()
|
|
defer db.lock.RUnlock()
|
|
|
|
var hashes = make([]common.Hash, 0, len(db.dirties))
|
|
for hash := range db.dirties {
|
|
if hash != (common.Hash{}) { // Special case for "root" references/nodes
|
|
hashes = append(hashes, hash)
|
|
}
|
|
}
|
|
return hashes
|
|
}
|
|
|
|
// Reference adds a new reference from a parent node to a child node.
|
|
// This function is used to add reference between internal trie node
|
|
// and external node(e.g. storage trie root), all internal trie nodes
|
|
// are referenced together by database itself.
|
|
func (db *Database) Reference(child common.Hash, parent common.Hash) {
|
|
db.lock.Lock()
|
|
defer db.lock.Unlock()
|
|
|
|
db.reference(child, parent)
|
|
}
|
|
|
|
// reference is the private locked version of Reference.
|
|
func (db *Database) reference(child common.Hash, parent common.Hash) {
|
|
// If the node does not exist, it's a node pulled from disk, skip
|
|
node, ok := db.dirties[child]
|
|
if !ok {
|
|
return
|
|
}
|
|
// If the reference already exists, only duplicate for roots
|
|
if db.dirties[parent].children == nil {
|
|
db.dirties[parent].children = make(map[common.Hash]uint16)
|
|
db.childrenSize += cachedNodeChildrenSize
|
|
} else if _, ok = db.dirties[parent].children[child]; ok && parent != (common.Hash{}) {
|
|
return
|
|
}
|
|
node.parents++
|
|
db.dirties[parent].children[child]++
|
|
if db.dirties[parent].children[child] == 1 {
|
|
db.childrenSize += common.HashLength + 2 // uint16 counter
|
|
}
|
|
}
|
|
|
|
// Dereference removes an existing reference from a root node.
|
|
func (db *Database) Dereference(root common.Hash) {
|
|
// Sanity check to ensure that the meta-root is not removed
|
|
if root == (common.Hash{}) {
|
|
log.Error("Attempted to dereference the trie cache meta root")
|
|
return
|
|
}
|
|
db.lock.Lock()
|
|
defer db.lock.Unlock()
|
|
|
|
nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
|
|
db.dereference(root, common.Hash{})
|
|
|
|
db.gcnodes += uint64(nodes - len(db.dirties))
|
|
db.gcsize += storage - db.dirtiesSize
|
|
db.gctime += time.Since(start)
|
|
|
|
memcacheGCTimeTimer.Update(time.Since(start))
|
|
memcacheGCSizeMeter.Mark(int64(storage - db.dirtiesSize))
|
|
memcacheGCNodesMeter.Mark(int64(nodes - len(db.dirties)))
|
|
|
|
log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start),
|
|
"gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)
|
|
}
|
|
|
|
// dereference is the private locked version of Dereference.
|
|
func (db *Database) dereference(child common.Hash, parent common.Hash) {
|
|
// Dereference the parent-child
|
|
node := db.dirties[parent]
|
|
|
|
if node.children != nil && node.children[child] > 0 {
|
|
node.children[child]--
|
|
if node.children[child] == 0 {
|
|
delete(node.children, child)
|
|
db.childrenSize -= (common.HashLength + 2) // uint16 counter
|
|
}
|
|
}
|
|
// If the child does not exist, it's a previously committed node.
|
|
node, ok := db.dirties[child]
|
|
if !ok {
|
|
return
|
|
}
|
|
// If there are no more references to the child, delete it and cascade
|
|
if node.parents > 0 {
|
|
// This is a special cornercase where a node loaded from disk (i.e. not in the
|
|
// memcache any more) gets reinjected as a new node (short node split into full,
|
|
// then reverted into short), causing a cached node to have no parents. That is
|
|
// no problem in itself, but don't make maxint parents out of it.
|
|
node.parents--
|
|
}
|
|
if node.parents == 0 {
|
|
// Remove the node from the flush-list
|
|
switch child {
|
|
case db.oldest:
|
|
db.oldest = node.flushNext
|
|
db.dirties[node.flushNext].flushPrev = common.Hash{}
|
|
case db.newest:
|
|
db.newest = node.flushPrev
|
|
db.dirties[node.flushPrev].flushNext = common.Hash{}
|
|
default:
|
|
db.dirties[node.flushPrev].flushNext = node.flushNext
|
|
db.dirties[node.flushNext].flushPrev = node.flushPrev
|
|
}
|
|
// Dereference all children and delete the node
|
|
node.forChilds(func(hash common.Hash) {
|
|
db.dereference(hash, child)
|
|
})
|
|
delete(db.dirties, child)
|
|
db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
|
|
if node.children != nil {
|
|
db.childrenSize -= cachedNodeChildrenSize
|
|
}
|
|
}
|
|
}
|
|
|
|
// Cap iteratively flushes old but still referenced trie nodes until the total
|
|
// memory usage goes below the given threshold.
|
|
//
|
|
// Note, this method is a non-synchronized mutator. It is unsafe to call this
|
|
// concurrently with other mutators.
|
|
func (db *Database) Cap(limit common.StorageSize) error {
|
|
// Create a database batch to flush persistent data out. It is important that
|
|
// outside code doesn't see an inconsistent state (referenced data removed from
|
|
// memory cache during commit but not yet in persistent storage). This is ensured
|
|
// by only uncaching existing data when the database write finalizes.
|
|
nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
|
|
batch := db.diskdb.NewBatch()
|
|
|
|
// db.dirtiesSize only contains the useful data in the cache, but when reporting
|
|
// the total memory consumption, the maintenance metadata is also needed to be
|
|
// counted.
|
|
size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*cachedNodeSize)
|
|
size += db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2))
|
|
|
|
// If the preimage cache got large enough, push to disk. If it's still small
|
|
// leave for later to deduplicate writes.
|
|
if db.preimages != nil {
|
|
db.preimages.commit(false)
|
|
}
|
|
// Keep committing nodes from the flush-list until we're below allowance
|
|
oldest := db.oldest
|
|
for size > limit && oldest != (common.Hash{}) {
|
|
// Fetch the oldest referenced node and push into the batch
|
|
node := db.dirties[oldest]
|
|
rawdb.WriteTrieNode(batch, oldest, node.rlp())
|
|
|
|
// If we exceeded the ideal batch size, commit and reset
|
|
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
|
if err := batch.Write(); err != nil {
|
|
log.Error("Failed to write flush list to disk", "err", err)
|
|
return err
|
|
}
|
|
batch.Reset()
|
|
}
|
|
// Iterate to the next flush item, or abort if the size cap was achieved. Size
|
|
// is the total size, including the useful cached data (hash -> blob), the
|
|
// cache item metadata, as well as external children mappings.
|
|
size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize)
|
|
if node.children != nil {
|
|
size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
|
|
}
|
|
oldest = node.flushNext
|
|
}
|
|
// Flush out any remainder data from the last batch
|
|
if err := batch.Write(); err != nil {
|
|
log.Error("Failed to write flush list to disk", "err", err)
|
|
return err
|
|
}
|
|
// Write successful, clear out the flushed data
|
|
db.lock.Lock()
|
|
defer db.lock.Unlock()
|
|
|
|
for db.oldest != oldest {
|
|
node := db.dirties[db.oldest]
|
|
delete(db.dirties, db.oldest)
|
|
db.oldest = node.flushNext
|
|
|
|
db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
|
|
if node.children != nil {
|
|
db.childrenSize -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
|
|
}
|
|
}
|
|
if db.oldest != (common.Hash{}) {
|
|
db.dirties[db.oldest].flushPrev = common.Hash{}
|
|
}
|
|
db.flushnodes += uint64(nodes - len(db.dirties))
|
|
db.flushsize += storage - db.dirtiesSize
|
|
db.flushtime += time.Since(start)
|
|
|
|
memcacheFlushTimeTimer.Update(time.Since(start))
|
|
memcacheFlushSizeMeter.Mark(int64(storage - db.dirtiesSize))
|
|
memcacheFlushNodesMeter.Mark(int64(nodes - len(db.dirties)))
|
|
|
|
log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start),
|
|
"flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Commit iterates over all the children of a particular node, writes them out
|
|
// to disk, forcefully tearing down all references in both directions. As a side
|
|
// effect, all pre-images accumulated up to this point are also written.
|
|
//
|
|
// Note, this method is a non-synchronized mutator. It is unsafe to call this
|
|
// concurrently with other mutators.
|
|
func (db *Database) Commit(node common.Hash, report bool, callback func(common.Hash)) error {
|
|
// Create a database batch to flush persistent data out. It is important that
|
|
// outside code doesn't see an inconsistent state (referenced data removed from
|
|
// memory cache during commit but not yet in persistent storage). This is ensured
|
|
// by only uncaching existing data when the database write finalizes.
|
|
start := time.Now()
|
|
batch := db.diskdb.NewBatch()
|
|
|
|
// Move all of the accumulated preimages into a write batch
|
|
if db.preimages != nil {
|
|
db.preimages.commit(true)
|
|
}
|
|
// Move the trie itself into the batch, flushing if enough data is accumulated
|
|
nodes, storage := len(db.dirties), db.dirtiesSize
|
|
|
|
uncacher := &cleaner{db}
|
|
if err := db.commit(node, batch, uncacher, callback); err != nil {
|
|
log.Error("Failed to commit trie from trie database", "err", err)
|
|
return err
|
|
}
|
|
// Trie mostly committed to disk, flush any batch leftovers
|
|
if err := batch.Write(); err != nil {
|
|
log.Error("Failed to write trie to disk", "err", err)
|
|
return err
|
|
}
|
|
// Uncache any leftovers in the last batch
|
|
db.lock.Lock()
|
|
defer db.lock.Unlock()
|
|
|
|
batch.Replay(uncacher)
|
|
batch.Reset()
|
|
|
|
// Reset the storage counters and bumped metrics
|
|
memcacheCommitTimeTimer.Update(time.Since(start))
|
|
memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize))
|
|
memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties)))
|
|
|
|
logger := log.Info
|
|
if !report {
|
|
logger = log.Debug
|
|
}
|
|
logger("Persisted trie from memory database", "nodes", nodes-len(db.dirties)+int(db.flushnodes), "size", storage-db.dirtiesSize+db.flushsize, "time", time.Since(start)+db.flushtime,
|
|
"gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)
|
|
|
|
// Reset the garbage collection statistics
|
|
db.gcnodes, db.gcsize, db.gctime = 0, 0, 0
|
|
db.flushnodes, db.flushsize, db.flushtime = 0, 0, 0
|
|
|
|
return nil
|
|
}
|
|
|
|
// commit is the private locked version of Commit.
|
|
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner, callback func(common.Hash)) error {
|
|
// If the node does not exist, it's a previously committed node
|
|
node, ok := db.dirties[hash]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
var err error
|
|
node.forChilds(func(child common.Hash) {
|
|
if err == nil {
|
|
err = db.commit(child, batch, uncacher, callback)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// If we've reached an optimal batch size, commit and start over
|
|
rawdb.WriteTrieNode(batch, hash, node.rlp())
|
|
if callback != nil {
|
|
callback(hash)
|
|
}
|
|
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
|
if err := batch.Write(); err != nil {
|
|
return err
|
|
}
|
|
db.lock.Lock()
|
|
batch.Replay(uncacher)
|
|
batch.Reset()
|
|
db.lock.Unlock()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cleaner is a database batch replayer that takes a batch of write operations
|
|
// and cleans up the trie database from anything written to disk.
|
|
type cleaner struct {
|
|
db *Database
|
|
}
|
|
|
|
// Put reacts to database writes and implements dirty data uncaching. This is the
|
|
// post-processing step of a commit operation where the already persisted trie is
|
|
// removed from the dirty cache and moved into the clean cache. The reason behind
|
|
// the two-phase commit is to ensure data availability while moving from memory
|
|
// to disk.
|
|
func (c *cleaner) Put(key []byte, rlp []byte) error {
|
|
hash := common.BytesToHash(key)
|
|
|
|
// If the node does not exist, we're done on this path
|
|
node, ok := c.db.dirties[hash]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
// Node still exists, remove it from the flush-list
|
|
switch hash {
|
|
case c.db.oldest:
|
|
c.db.oldest = node.flushNext
|
|
c.db.dirties[node.flushNext].flushPrev = common.Hash{}
|
|
case c.db.newest:
|
|
c.db.newest = node.flushPrev
|
|
c.db.dirties[node.flushPrev].flushNext = common.Hash{}
|
|
default:
|
|
c.db.dirties[node.flushPrev].flushNext = node.flushNext
|
|
c.db.dirties[node.flushNext].flushPrev = node.flushPrev
|
|
}
|
|
// Remove the node from the dirty cache
|
|
delete(c.db.dirties, hash)
|
|
c.db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
|
|
if node.children != nil {
|
|
c.db.childrenSize -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
|
|
}
|
|
// Move the flushed node into the clean cache to prevent insta-reloads
|
|
if c.db.cleans != nil {
|
|
c.db.cleans.Set(hash[:], rlp)
|
|
memcacheCleanWriteMeter.Mark(int64(len(rlp)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cleaner) Delete(key []byte) error {
|
|
panic("not implemented")
|
|
}
|
|
|
|
// Update inserts the dirty nodes in provided nodeset into database and
|
|
// link the account trie with multiple storage tries if necessary.
|
|
func (db *Database) Update(nodes *MergedNodeSet) error {
|
|
db.lock.Lock()
|
|
defer db.lock.Unlock()
|
|
|
|
// Insert dirty nodes into the database. In the same tree, it must be
|
|
// ensured that children are inserted first, then parent so that children
|
|
// can be linked with their parent correctly. The order of writing between
|
|
// different tries(account trie, storage tries) is not required.
|
|
for owner, subset := range nodes.sets {
|
|
for _, path := range subset.paths {
|
|
n, ok := subset.nodes[path]
|
|
if !ok {
|
|
return fmt.Errorf("missing node %x %v", owner, path)
|
|
}
|
|
db.insert(n.hash, int(n.size), n.node)
|
|
}
|
|
}
|
|
// Link up the account trie and storage trie if the node points
|
|
// to an account trie leaf.
|
|
if set, present := nodes.sets[common.Hash{}]; present {
|
|
for _, n := range set.leaves {
|
|
var account types.StateAccount
|
|
if err := rlp.DecodeBytes(n.blob, &account); err != nil {
|
|
return err
|
|
}
|
|
if account.Root != emptyRoot {
|
|
db.reference(account.Root, n.parent)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Size returns the current storage size of the memory cache in front of the
|
|
// persistent database layer.
|
|
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
|
|
db.lock.RLock()
|
|
defer db.lock.RUnlock()
|
|
|
|
// db.dirtiesSize only contains the useful data in the cache, but when reporting
|
|
// the total memory consumption, the maintenance metadata is also needed to be
|
|
// counted.
|
|
var metadataSize = common.StorageSize((len(db.dirties) - 1) * cachedNodeSize)
|
|
var metarootRefs = common.StorageSize(len(db.dirties[common.Hash{}].children) * (common.HashLength + 2))
|
|
var preimageSize common.StorageSize
|
|
if db.preimages != nil {
|
|
preimageSize = db.preimages.size()
|
|
}
|
|
return db.dirtiesSize + db.childrenSize + metadataSize - metarootRefs, preimageSize
|
|
}
|
|
|
|
// saveCache saves clean state cache to given directory path
|
|
// using specified CPU cores.
|
|
func (db *Database) saveCache(dir string, threads int) error {
|
|
if db.cleans == nil {
|
|
return nil
|
|
}
|
|
log.Info("Writing clean trie cache to disk", "path", dir, "threads", threads)
|
|
|
|
start := time.Now()
|
|
err := db.cleans.SaveToFileConcurrent(dir, threads)
|
|
if err != nil {
|
|
log.Error("Failed to persist clean trie cache", "error", err)
|
|
return err
|
|
}
|
|
log.Info("Persisted the clean trie cache", "path", dir, "elapsed", common.PrettyDuration(time.Since(start)))
|
|
return nil
|
|
}
|
|
|
|
// SaveCache atomically saves fast cache data to the given dir using all
|
|
// available CPU cores.
|
|
func (db *Database) SaveCache(dir string) error {
|
|
return db.saveCache(dir, runtime.GOMAXPROCS(0))
|
|
}
|
|
|
|
// SaveCachePeriodically atomically saves fast cache data to the given dir with
|
|
// the specified interval. All dump operation will only use a single CPU core.
|
|
func (db *Database) SaveCachePeriodically(dir string, interval time.Duration, stopCh <-chan struct{}) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
db.saveCache(dir, 1)
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// CommitPreimages flushes the dangling preimages to disk. It is meant to be
|
|
// called when closing the blockchain object, so that preimages are persisted
|
|
// to the database.
|
|
func (db *Database) CommitPreimages() error {
|
|
db.lock.Lock()
|
|
defer db.lock.Unlock()
|
|
|
|
if db.preimages == nil {
|
|
return nil
|
|
}
|
|
return db.preimages.commit(true)
|
|
}
|