From 351a5903b0ccb9c77b5f0983fdd17c3d4de7acf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 26 Nov 2019 09:48:29 +0200 Subject: [PATCH] core/rawdb, core/state/snapshot: runtime snapshot generation --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 13 +- core/blockchain.go | 24 +- core/rawdb/database.go | 4 +- core/rawdb/schema.go | 19 +- core/state/snapshot/difflayer.go | 206 ++++++++- core/state/snapshot/difflayer_journal.go | 137 ------ core/state/snapshot/difflayer_test.go | 54 +-- core/state/snapshot/disklayer.go | 57 ++- core/state/snapshot/disklayer_test.go | 433 ++++++++++++++++++ core/state/snapshot/generate.go | 330 +++++++------ core/state/snapshot/journal.go | 257 +++++++++++ core/state/snapshot/snapshot.go | 270 ++++++++--- core/state/snapshot/snapshot_test.go | 49 +- core/state/snapshot/wipe.go | 130 ++++++ .../{generate_test.go => wipe_test.go} | 38 +- core/state/statedb.go | 4 +- eth/backend.go | 6 +- eth/config.go | 2 + trie/iterator.go | 2 - 21 files changed, 1551 insertions(+), 486 deletions(-) delete mode 100644 core/state/snapshot/difflayer_journal.go create mode 100644 core/state/snapshot/disklayer_test.go create mode 100644 core/state/snapshot/journal.go create mode 100644 core/state/snapshot/wipe.go rename core/state/snapshot/{generate_test.go => wipe_test.go} (77%) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 99ef78238..36187e484 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -106,6 +106,7 @@ var ( utils.CacheDatabaseFlag, utils.CacheTrieFlag, utils.CacheGCFlag, + utils.CacheSnapshotFlag, utils.CacheNoPrefetchFlag, utils.ListenPortFlag, utils.MaxPeersFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 6f3197b9c..f2f3b5756 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -137,6 +137,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.CacheDatabaseFlag, utils.CacheTrieFlag, utils.CacheGCFlag, + utils.CacheSnapshotFlag, utils.CacheNoPrefetchFlag, }, }, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index bdadebd85..22fe677fa 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -383,14 +383,19 @@ var ( } CacheTrieFlag = cli.IntFlag{ Name: "cache.trie", - Usage: "Percentage of cache memory allowance to use for trie caching (default = 25% full mode, 50% archive mode)", - Value: 25, + Usage: "Percentage of cache memory allowance to use for trie caching (default = 15% full mode, 30% archive mode)", + Value: 15, } CacheGCFlag = cli.IntFlag{ Name: "cache.gc", Usage: "Percentage of cache memory allowance to use for trie pruning (default = 25% full mode, 0% archive mode)", Value: 25, } + CacheSnapshotFlag = cli.IntFlag{ + Name: "cache.snapshot", + Usage: "Percentage of cache memory allowance to use for snapshot caching (default = 10% full mode, 20% archive mode)", + Value: 10, + } CacheNoPrefetchFlag = cli.BoolFlag{ Name: "cache.noprefetch", Usage: "Disable heuristic state prefetch during block import (less CPU and disk IO, more time waiting for data)", @@ -1463,6 +1468,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) { cfg.TrieDirtyCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100 } + if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheSnapshotFlag.Name) { + cfg.SnapshotCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheSnapshotFlag.Name) / 100 + } if ctx.GlobalIsSet(DocRootFlag.Name) { cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name) } @@ -1724,6 +1732,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai TrieDirtyLimit: eth.DefaultConfig.TrieDirtyCache, TrieDirtyDisabled: ctx.GlobalString(GCModeFlag.Name) == "archive", TrieTimeLimit: eth.DefaultConfig.TrieTimeout, + SnapshotLimit: eth.DefaultConfig.SnapshotCache, } if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) { cache.TrieCleanLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100 diff --git a/core/blockchain.go b/core/blockchain.go index 6fb722d2d..3932baf55 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -62,8 +62,8 @@ var ( storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil) storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil) - snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/accountreads", nil) - snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storagereads", nil) + snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil) + snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil) snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil) blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil) @@ -120,6 +120,7 @@ type CacheConfig struct { TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node) TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory } // BlockChain represents the canonical chain given a database with a genesis @@ -194,6 +195,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par TrieCleanLimit: 256, TrieDirtyLimit: 256, TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 256, } } bodyCache, _ := lru.New(bodyCacheLimit) @@ -300,10 +302,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } // Load any existing snapshot, regenerating it if loading failed - head := bc.CurrentBlock() - if bc.snaps, err = snapshot.New(bc.db, "snapshot.rlp", head.Root()); err != nil { - return nil, err - } + bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), "snapshot.rlp", bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root()) + // Take ownership of this particular state go bc.update() return bc, nil @@ -497,6 +497,9 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() + // Destroy any existing state snapshot and regenerate it in the background + bc.snaps.Rebuild(block.Root()) + log.Info("Committed new head block", "number", block.Number(), "hash", hash) return nil } @@ -851,7 +854,8 @@ func (bc *BlockChain) Stop() { bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. - if err := bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil { + snapBase, err := bc.snaps.Journal(bc.CurrentBlock().Root(), "snapshot.rlp") + if err != nil { log.Error("Failed to journal state snapshot", "err", err) } // Ensure the state of a recent block is also stored to disk before exiting. @@ -872,6 +876,12 @@ func (bc *BlockChain) Stop() { } } } + if snapBase != (common.Hash{}) { + log.Info("Writing snapshot state to disk", "root", snapBase) + if err := triedb.Commit(snapBase, true); err != nil { + log.Error("Failed to commit recent state trie", "err", err) + } + } for !bc.triegc.Empty() { triedb.Dereference(bc.triegc.PopItem().(common.Hash)) } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 7abd07359..b74d8e2e3 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -282,9 +282,9 @@ func InspectDatabase(db ethdb.Database) error { receiptSize += size case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength): txlookupSize += size - case bytes.HasPrefix(key, StateSnapshotPrefix) && len(key) == (len(StateSnapshotPrefix)+common.HashLength): + case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength): accountSnapSize += size - case bytes.HasPrefix(key, StateSnapshotPrefix) && len(key) == (len(StateSnapshotPrefix)+2*common.HashLength): + case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength): storageSnapSize += size case bytes.HasPrefix(key, preimagePrefix) && len(key) == (len(preimagePrefix)+common.HashLength): preimageSize += size diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index d20658792..1b8e53eb6 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -53,9 +53,10 @@ var ( blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts - txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata - bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits - StateSnapshotPrefix = []byte("s") // StateSnapshotPrefix + account hash [+ storage hash] -> account/storage trie value + txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata + bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits + SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value + SnapshotStoragePrefix = []byte("s") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -149,19 +150,19 @@ func txLookupKey(hash common.Hash) []byte { return append(txLookupPrefix, hash.Bytes()...) } -// accountSnapshotKey = StateSnapshotPrefix + hash +// accountSnapshotKey = SnapshotAccountPrefix + hash func accountSnapshotKey(hash common.Hash) []byte { - return append(StateSnapshotPrefix, hash.Bytes()...) + return append(SnapshotAccountPrefix, hash.Bytes()...) } -// storageSnapshotKey = StateSnapshotPrefix + account hash + storage hash +// storageSnapshotKey = SnapshotStoragePrefix + account hash + storage hash func storageSnapshotKey(accountHash, storageHash common.Hash) []byte { - return append(append(StateSnapshotPrefix, accountHash.Bytes()...), storageHash.Bytes()...) + return append(append(SnapshotStoragePrefix, accountHash.Bytes()...), storageHash.Bytes()...) } -// storageSnapshotsKey = StateSnapshotPrefix + account hash + storage hash +// storageSnapshotsKey = SnapshotStoragePrefix + account hash + storage hash func storageSnapshotsKey(accountHash common.Hash) []byte { - return append(StateSnapshotPrefix, accountHash.Bytes()...) + return append(SnapshotStoragePrefix, accountHash.Bytes()...) } // bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index 7e8487ea8..0743e4759 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -17,13 +17,52 @@ package snapshot import ( + "encoding/binary" "fmt" + "math" "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "github.com/steakknife/bloomfilter" +) + +var ( + // aggregatorMemoryLimit is the maximum size of the bottom-most diff layer + // that aggregates the writes from above until it's flushed into the disk + // layer. + // + // Note, bumping this up might drastically increase the size of the bloom + // filters that's stored in every diff layer. Don't do that without fully + // understanding all the implications. + aggregatorMemoryLimit = uint64(4 * 1024 * 1024) + + // aggregatorItemLimit is an approximate number of items that will end up + // in the agregator layer before it's flushed out to disk. A plain account + // weighs around 14B (+hash), a storage slot 32B (+hash), so 50 is a very + // rough average of what we might see. + aggregatorItemLimit = aggregatorMemoryLimit / 55 + + // bloomTargetError is the target false positive rate when the aggregator + // layer is at its fullest. The actual value will probably move around up + // and down from this number, it's mostly a ballpark figure. + // + // Note, dropping this down might drastically increase the size of the bloom + // filters that's stored in every diff layer. Don't do that without fully + // understanding all the implications. + bloomTargetError = 0.02 + + // bloomSize is the ideal bloom filter size given the maximum number of items + // it's expected to hold and the target false positive error rate. + bloomSize = math.Ceil(float64(aggregatorItemLimit) * math.Log(bloomTargetError) / math.Log(1/math.Pow(2, math.Log(2)))) + + // bloomFuncs is the ideal number of bits a single entry should set in the + // bloom filter to keep its size to a minimum (given it's size and maximum + // entry count). + bloomFuncs = math.Round((bloomSize / float64(aggregatorItemLimit)) * math.Log(2)) ) // diffLayer represents a collection of modifications made to a state snapshot @@ -33,8 +72,9 @@ import ( // The goal of a diff layer is to act as a journal, tracking recent modifications // made to the state, that have not yet graduated into a semi-immutable state. type diffLayer struct { - parent snapshot // Parent snapshot modified by this one, never nil - memory uint64 // Approximate guess as to how much memory we use + origin *diskLayer // Base disk layer to directly use on bloom misses + parent snapshot // Parent snapshot modified by this one, never nil + memory uint64 // Approximate guess as to how much memory we use root common.Hash // Root hash to which this snapshot diff belongs to stale bool // Signals that the layer became stale (state progressed) @@ -44,9 +84,39 @@ type diffLayer struct { storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrival. one per account (nil means deleted) + diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer + lock sync.RWMutex } +// accountBloomHasher is a wrapper around a common.Hash to satisfy the interface +// API requirements of the bloom library used. It's used to convert an account +// hash into a 64 bit mini hash. +type accountBloomHasher common.Hash + +func (h accountBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") } +func (h accountBloomHasher) Sum(b []byte) []byte { panic("not implemented") } +func (h accountBloomHasher) Reset() { panic("not implemented") } +func (h accountBloomHasher) BlockSize() int { panic("not implemented") } +func (h accountBloomHasher) Size() int { return 8 } +func (h accountBloomHasher) Sum64() uint64 { + return binary.BigEndian.Uint64(h[:8]) +} + +// storageBloomHasher is a wrapper around a [2]common.Hash to satisfy the interface +// API requirements of the bloom library used. It's used to convert an account +// hash into a 64 bit mini hash. +type storageBloomHasher [2]common.Hash + +func (h storageBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") } +func (h storageBloomHasher) Sum(b []byte) []byte { panic("not implemented") } +func (h storageBloomHasher) Reset() { panic("not implemented") } +func (h storageBloomHasher) BlockSize() int { panic("not implemented") } +func (h storageBloomHasher) Size() int { return 8 } +func (h storageBloomHasher) Sum64() uint64 { + return binary.BigEndian.Uint64(h[0][:8]) ^ binary.BigEndian.Uint64(h[1][:8]) +} + // newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low // level persistent database or a hierarchical diff already. func newDiffLayer(parent snapshot, root common.Hash, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { @@ -57,9 +127,18 @@ func newDiffLayer(parent snapshot, root common.Hash, accounts map[common.Hash][] accountData: accounts, storageData: storage, } - // Determine mem size + switch parent := parent.(type) { + case *diskLayer: + dl.rebloom(parent) + case *diffLayer: + dl.rebloom(parent.origin) + default: + panic("unknown parent type") + } + // Determine memory size and track the dirty writes for _, data := range accounts { - dl.memory += uint64(len(data)) + dl.memory += uint64(common.HashLength + len(data)) + snapshotDirtyAccountWriteMeter.Mark(int64(len(data))) } // Fill the storage hashes and sort them for the iterator dl.storageList = make(map[common.Hash][]common.Hash) @@ -80,16 +159,56 @@ func newDiffLayer(parent snapshot, root common.Hash, accounts map[common.Hash][] if account, ok := accounts[accountHash]; account == nil || !ok { log.Error(fmt.Sprintf("storage in %#x exists, but account nil (exists: %v)", accountHash, ok)) } - // Determine mem size + // Determine memory size and track the dirty writes for _, data := range slots { - dl.memory += uint64(len(data)) + dl.memory += uint64(common.HashLength + len(data)) + snapshotDirtyStorageWriteMeter.Mark(int64(len(data))) } } dl.memory += uint64(len(dl.storageList) * common.HashLength) - return dl } +// rebloom discards the layer's current bloom and rebuilds it from scratch based +// on the parent's and the local diffs. +func (dl *diffLayer) rebloom(origin *diskLayer) { + dl.lock.Lock() + defer dl.lock.Unlock() + + defer func(start time.Time) { + snapshotBloomIndexTimer.Update(time.Since(start)) + }(time.Now()) + + // Inject the new origin that triggered the rebloom + dl.origin = origin + + // Retrieve the parent bloom or create a fresh empty one + if parent, ok := dl.parent.(*diffLayer); ok { + parent.lock.RLock() + dl.diffed, _ = parent.diffed.Copy() + parent.lock.RUnlock() + } else { + dl.diffed, _ = bloomfilter.New(uint64(bloomSize), uint64(bloomFuncs)) + } + // Iterate over all the accounts and storage slots and index them + for hash := range dl.accountData { + dl.diffed.Add(accountBloomHasher(hash)) + } + for accountHash, slots := range dl.storageData { + for storageHash := range slots { + dl.diffed.Add(storageBloomHasher{accountHash, storageHash}) + } + } + // Calculate the current false positive rate and update the error rate meter. + // This is a bit cheating because subsequent layers will overwrite it, but it + // should be fine, we're only interested in ballpark figures. + k := float64(dl.diffed.K()) + n := float64(dl.diffed.N()) + m := float64(dl.diffed.M()) + + snapshotBloomErrorGauge.Update(math.Pow(1.0-math.Exp((-k)*(n+0.5)/(m-1)), k)) +} + // Root returns the root hash for which this snapshot was made. func (dl *diffLayer) Root() common.Hash { return dl.root @@ -124,6 +243,26 @@ func (dl *diffLayer) Account(hash common.Hash) (*Account, error) { // AccountRLP directly retrieves the account RLP associated with a particular // hash in the snapshot slim data format. func (dl *diffLayer) AccountRLP(hash common.Hash) ([]byte, error) { + // Check the bloom filter first whether there's even a point in reaching into + // all the maps in all the layers below + dl.lock.RLock() + hit := dl.diffed.Contains(accountBloomHasher(hash)) + dl.lock.RUnlock() + + // If the bloom filter misses, don't even bother with traversing the memory + // diff layers, reach straight into the bottom persistent disk layer + if !hit { + snapshotBloomAccountMissMeter.Mark(1) + return dl.origin.AccountRLP(hash) + } + // The bloom filter hit, start poking in the internal maps + return dl.accountRLP(hash) +} + +// accountRLP is an internal version of AccountRLP that skips the bloom filter +// checks and uses the internal maps to try and retrieve the data. It's meant +// to be used if a higher layer's bloom filter hit already. +func (dl *diffLayer) accountRLP(hash common.Hash) ([]byte, error) { dl.lock.RLock() defer dl.lock.RUnlock() @@ -135,9 +274,17 @@ func (dl *diffLayer) AccountRLP(hash common.Hash) ([]byte, error) { // If the account is known locally, return it. Note, a nil account means it was // deleted, and is a different notion than an unknown account! if data, ok := dl.accountData[hash]; ok { + snapshotDirtyAccountHitMeter.Mark(1) + snapshotDirtyAccountReadMeter.Mark(int64(len(data))) + snapshotBloomAccountTrueHitMeter.Mark(1) return data, nil } // Account unknown to this diff, resolve from parent + if diff, ok := dl.parent.(*diffLayer); ok { + return diff.accountRLP(hash) + } + // Failed to resolve through diff layers, mark a bloom error and use the disk + snapshotBloomAccountFalseHitMeter.Mark(1) return dl.parent.AccountRLP(hash) } @@ -145,6 +292,26 @@ func (dl *diffLayer) AccountRLP(hash common.Hash) ([]byte, error) { // within a particular account. If the slot is unknown to this diff, it's parent // is consulted. func (dl *diffLayer) Storage(accountHash, storageHash common.Hash) ([]byte, error) { + // Check the bloom filter first whether there's even a point in reaching into + // all the maps in all the layers below + dl.lock.RLock() + hit := dl.diffed.Contains(storageBloomHasher{accountHash, storageHash}) + dl.lock.RUnlock() + + // If the bloom filter misses, don't even bother with traversing the memory + // diff layers, reach straight into the bottom persistent disk layer + if !hit { + snapshotBloomStorageMissMeter.Mark(1) + return dl.origin.Storage(accountHash, storageHash) + } + // The bloom filter hit, start poking in the internal maps + return dl.storage(accountHash, storageHash) +} + +// storage is an internal version of Storage that skips the bloom filter checks +// and uses the internal maps to try and retrieve the data. It's meant to be +// used if a higher layer's bloom filter hit already. +func (dl *diffLayer) storage(accountHash, storageHash common.Hash) ([]byte, error) { dl.lock.RLock() defer dl.lock.RUnlock() @@ -157,13 +324,23 @@ func (dl *diffLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro // account means it was deleted, and is a different notion than an unknown account! if storage, ok := dl.storageData[accountHash]; ok { if storage == nil { + snapshotDirtyStorageHitMeter.Mark(1) + snapshotBloomStorageTrueHitMeter.Mark(1) return nil, nil } if data, ok := storage[storageHash]; ok { + snapshotDirtyStorageHitMeter.Mark(1) + snapshotDirtyStorageReadMeter.Mark(int64(len(data))) + snapshotBloomStorageTrueHitMeter.Mark(1) return data, nil } } - // Account - or slot within - unknown to this diff, resolve from parent + // Storage slot unknown to this diff, resolve from parent + if diff, ok := dl.parent.(*diffLayer); ok { + return diff.storage(accountHash, storageHash) + } + // Failed to resolve through diff layers, mark a bloom error and use the disk + snapshotBloomStorageFalseHitMeter.Mark(1) return dl.parent.Storage(accountHash, storageHash) } @@ -224,22 +401,11 @@ func (dl *diffLayer) flatten() snapshot { storageData: parent.storageData, accountList: parent.accountList, accountData: parent.accountData, + diffed: dl.diffed, memory: parent.memory + dl.memory, } } -// Journal commits an entire diff hierarchy to disk into a single journal file. -// This is meant to be used during shutdown to persist the snapshot without -// flattening everything down (bad for reorgs). -func (dl *diffLayer) Journal() error { - writer, err := dl.journal() - if err != nil { - return err - } - writer.Close() - return nil -} - // AccountList returns a sorted list of all accounts in this difflayer. func (dl *diffLayer) AccountList() []common.Hash { dl.lock.Lock() diff --git a/core/state/snapshot/difflayer_journal.go b/core/state/snapshot/difflayer_journal.go deleted file mode 100644 index 5490531be..000000000 --- a/core/state/snapshot/difflayer_journal.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package snapshot - -import ( - "bufio" - "fmt" - "io" - "os" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rlp" -) - -// journalAccount is an account entry in a diffLayer's disk journal. -type journalAccount struct { - Hash common.Hash - Blob []byte -} - -// journalStorage is an account's storage map in a diffLayer's disk journal. -type journalStorage struct { - Hash common.Hash - Keys []common.Hash - Vals [][]byte -} - -// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new -// diff and verifying that it can be linked to the requested parent. -func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) { - // Read the next diff journal entry - var root common.Hash - if err := r.Decode(&root); err != nil { - // The first read may fail with EOF, marking the end of the journal - if err == io.EOF { - return parent, nil - } - return nil, fmt.Errorf("load diff root: %v", err) - } - var accounts []journalAccount - if err := r.Decode(&accounts); err != nil { - return nil, fmt.Errorf("load diff accounts: %v", err) - } - accountData := make(map[common.Hash][]byte) - for _, entry := range accounts { - accountData[entry.Hash] = entry.Blob - } - var storage []journalStorage - if err := r.Decode(&storage); err != nil { - return nil, fmt.Errorf("load diff storage: %v", err) - } - storageData := make(map[common.Hash]map[common.Hash][]byte) - for _, entry := range storage { - slots := make(map[common.Hash][]byte) - for i, key := range entry.Keys { - slots[key] = entry.Vals[i] - } - storageData[entry.Hash] = slots - } - return loadDiffLayer(newDiffLayer(parent, root, accountData, storageData), r) -} - -// journal is the internal version of Journal that also returns the journal file -// so subsequent layers know where to write to. -func (dl *diffLayer) journal() (io.WriteCloser, error) { - // If we've reached the bottom, open the journal - var writer io.WriteCloser - if parent, ok := dl.parent.(*diskLayer); ok { - file, err := os.Create(parent.journal) - if err != nil { - return nil, err - } - writer = file - } - // If we haven't reached the bottom yet, journal the parent first - if writer == nil { - file, err := dl.parent.(*diffLayer).journal() - if err != nil { - return nil, err - } - writer = file - } - dl.lock.RLock() - defer dl.lock.RUnlock() - - if dl.stale { - writer.Close() - return nil, ErrSnapshotStale - } - // Everything below was journalled, persist this layer too - buf := bufio.NewWriter(writer) - if err := rlp.Encode(buf, dl.root); err != nil { - buf.Flush() - writer.Close() - return nil, err - } - accounts := make([]journalAccount, 0, len(dl.accountData)) - for hash, blob := range dl.accountData { - accounts = append(accounts, journalAccount{Hash: hash, Blob: blob}) - } - if err := rlp.Encode(buf, accounts); err != nil { - buf.Flush() - writer.Close() - return nil, err - } - storage := make([]journalStorage, 0, len(dl.storageData)) - for hash, slots := range dl.storageData { - keys := make([]common.Hash, 0, len(slots)) - vals := make([][]byte, 0, len(slots)) - for key, val := range slots { - keys = append(keys, key) - vals = append(vals, val) - } - storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals}) - } - if err := rlp.Encode(buf, storage); err != nil { - buf.Flush() - writer.Close() - return nil, err - } - buf.Flush() - return writer, nil -} diff --git a/core/state/snapshot/difflayer_test.go b/core/state/snapshot/difflayer_test.go index 7cd1e8062..9029bb04b 100644 --- a/core/state/snapshot/difflayer_test.go +++ b/core/state/snapshot/difflayer_test.go @@ -24,7 +24,9 @@ import ( "path" "testing" + "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/rlp" ) @@ -61,7 +63,7 @@ func TestMergeBasics(t *testing.T) { } } // Add some (identical) layers on top - parent := newDiffLayer(emptyLayer{}, common.Hash{}, accounts, storage) + parent := newDiffLayer(emptyLayer(), common.Hash{}, accounts, storage) child := newDiffLayer(parent, common.Hash{}, accounts, storage) child = newDiffLayer(child, common.Hash{}, accounts, storage) child = newDiffLayer(child, common.Hash{}, accounts, storage) @@ -122,7 +124,7 @@ func TestMergeDelete(t *testing.T) { } // Add some flip-flopping layers on top - parent := newDiffLayer(emptyLayer{}, common.Hash{}, flip(), storage) + parent := newDiffLayer(emptyLayer(), common.Hash{}, flip(), storage) child := parent.Update(common.Hash{}, flop(), storage) child = child.Update(common.Hash{}, flip(), storage) child = child.Update(common.Hash{}, flop(), storage) @@ -165,7 +167,7 @@ func TestInsertAndMerge(t *testing.T) { { var accounts = make(map[common.Hash][]byte) var storage = make(map[common.Hash]map[common.Hash][]byte) - parent = newDiffLayer(emptyLayer{}, common.Hash{}, accounts, storage) + parent = newDiffLayer(emptyLayer(), common.Hash{}, accounts, storage) } { var accounts = make(map[common.Hash][]byte) @@ -186,34 +188,11 @@ func TestInsertAndMerge(t *testing.T) { } } -type emptyLayer struct{} - -func (emptyLayer) Update(blockRoot common.Hash, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { - panic("implement me") -} - -func (emptyLayer) Journal() error { - panic("implement me") -} - -func (emptyLayer) Stale() bool { - panic("implement me") -} - -func (emptyLayer) Root() common.Hash { - return common.Hash{} -} - -func (emptyLayer) Account(hash common.Hash) (*Account, error) { - return nil, nil -} - -func (emptyLayer) AccountRLP(hash common.Hash) ([]byte, error) { - return nil, nil -} - -func (emptyLayer) Storage(accountHash, storageHash common.Hash) ([]byte, error) { - return nil, nil +func emptyLayer() *diskLayer { + return &diskLayer{ + diskdb: memorydb.New(), + cache: fastcache.New(500 * 1024), + } } // BenchmarkSearch checks how long it takes to find a non-existing key @@ -234,7 +213,7 @@ func BenchmarkSearch(b *testing.B) { return newDiffLayer(parent, common.Hash{}, accounts, storage) } var layer snapshot - layer = emptyLayer{} + layer = emptyLayer() for i := 0; i < 128; i++ { layer = fill(layer) } @@ -272,7 +251,7 @@ func BenchmarkSearchSlot(b *testing.B) { return newDiffLayer(parent, common.Hash{}, accounts, storage) } var layer snapshot - layer = emptyLayer{} + layer = emptyLayer() for i := 0; i < 128; i++ { layer = fill(layer) } @@ -313,7 +292,7 @@ func BenchmarkFlatten(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() var layer snapshot - layer = emptyLayer{} + layer = emptyLayer() for i := 1; i < 128; i++ { layer = fill(layer) } @@ -357,17 +336,14 @@ func BenchmarkJournal(b *testing.B) { } return newDiffLayer(parent, common.Hash{}, accounts, storage) } - var layer snapshot - layer = &diskLayer{ - journal: path.Join(os.TempDir(), "difflayer_journal.tmp"), - } + layer := snapshot(new(diskLayer)) for i := 1; i < 128; i++ { layer = fill(layer) } b.ResetTimer() for i := 0; i < b.N; i++ { - f, _ := layer.(*diffLayer).journal() + f, _, _ := layer.Journal(path.Join(os.TempDir(), "difflayer_journal.tmp")) f.Close() } } diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index 474182f1d..b1934d273 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -17,6 +17,7 @@ package snapshot import ( + "bytes" "sync" "github.com/VictoriaMetrics/fastcache" @@ -24,17 +25,21 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" ) // diskLayer is a low level persistent snapshot built on top of a key-value store. type diskLayer struct { - journal string // Path of the snapshot journal to use on shutdown - db ethdb.KeyValueStore // Key-value store containing the base snapshot - cache *fastcache.Cache // Cache to avoid hitting the disk for direct access + diskdb ethdb.KeyValueStore // Key-value store containing the base snapshot + triedb *trie.Database // Trie node cache for reconstuction purposes + cache *fastcache.Cache // Cache to avoid hitting the disk for direct access root common.Hash // Root hash of the base snapshot stale bool // Signals that the layer became stale (state progressed) + genMarker []byte // Marker for the state that's indexed during initial layer generation + genAbort chan chan *generatorStats // Notification channel to abort generating the snapshot in this layer + lock sync.RWMutex } @@ -80,18 +85,26 @@ func (dl *diskLayer) AccountRLP(hash common.Hash) ([]byte, error) { if dl.stale { return nil, ErrSnapshotStale } + // If the layer is being generated, ensure the requested hash has already been + // covered by the generator. + if dl.genMarker != nil && bytes.Compare(hash[:], dl.genMarker) > 0 { + return nil, ErrNotCoveredYet + } + // If we're in the disk layer, all diff layers missed + snapshotDirtyAccountMissMeter.Mark(1) + // Try to retrieve the account from the memory cache - if blob := dl.cache.Get(nil, hash[:]); blob != nil { - snapshotCleanHitMeter.Mark(1) - snapshotCleanReadMeter.Mark(int64(len(blob))) + if blob, found := dl.cache.HasGet(nil, hash[:]); found { + snapshotCleanAccountHitMeter.Mark(1) + snapshotCleanAccountReadMeter.Mark(int64(len(blob))) return blob, nil } // Cache doesn't contain account, pull from disk and cache for later - blob := rawdb.ReadAccountSnapshot(dl.db, hash) + blob := rawdb.ReadAccountSnapshot(dl.diskdb, hash) dl.cache.Set(hash[:], blob) - snapshotCleanMissMeter.Mark(1) - snapshotCleanWriteMeter.Mark(int64(len(blob))) + snapshotCleanAccountMissMeter.Mark(1) + snapshotCleanAccountWriteMeter.Mark(int64(len(blob))) return blob, nil } @@ -109,18 +122,26 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro } key := append(accountHash[:], storageHash[:]...) + // If the layer is being generated, ensure the requested hash has already been + // covered by the generator. + if dl.genMarker != nil && bytes.Compare(key, dl.genMarker) > 0 { + return nil, ErrNotCoveredYet + } + // If we're in the disk layer, all diff layers missed + snapshotDirtyStorageMissMeter.Mark(1) + // Try to retrieve the storage slot from the memory cache - if blob := dl.cache.Get(nil, key); blob != nil { - snapshotCleanHitMeter.Mark(1) - snapshotCleanReadMeter.Mark(int64(len(blob))) + if blob, found := dl.cache.HasGet(nil, key); found { + snapshotCleanStorageHitMeter.Mark(1) + snapshotCleanStorageReadMeter.Mark(int64(len(blob))) return blob, nil } // Cache doesn't contain storage slot, pull from disk and cache for later - blob := rawdb.ReadStorageSnapshot(dl.db, accountHash, storageHash) + blob := rawdb.ReadStorageSnapshot(dl.diskdb, accountHash, storageHash) dl.cache.Set(key, blob) - snapshotCleanMissMeter.Mark(1) - snapshotCleanWriteMeter.Mark(int64(len(blob))) + snapshotCleanStorageMissMeter.Mark(1) + snapshotCleanStorageWriteMeter.Mark(int64(len(blob))) return blob, nil } @@ -131,9 +152,3 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro func (dl *diskLayer) Update(blockHash common.Hash, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { return newDiffLayer(dl, blockHash, accounts, storage) } - -// Journal commits an entire diff hierarchy to disk into a single journal file. -func (dl *diskLayer) Journal() error { - // There's no journalling a disk layer - return nil -} diff --git a/core/state/snapshot/disklayer_test.go b/core/state/snapshot/disklayer_test.go new file mode 100644 index 000000000..30b690454 --- /dev/null +++ b/core/state/snapshot/disklayer_test.go @@ -0,0 +1,433 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snapshot + +import ( + "bytes" + "testing" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb/memorydb" +) + +// reverse reverses the contents of a byte slice. It's used to update random accs +// with deterministic changes. +func reverse(blob []byte) []byte { + res := make([]byte, len(blob)) + for i, b := range blob { + res[len(blob)-1-i] = b + } + return res +} + +// Tests that merging something into a disk layer persists it into the database +// and invalidates any previously written and cached values. +func TestDiskMerge(t *testing.T) { + // Create some accounts in the disk layer + db := memorydb.New() + + var ( + accNoModNoCache = common.Hash{0x1} + accNoModCache = common.Hash{0x2} + accModNoCache = common.Hash{0x3} + accModCache = common.Hash{0x4} + accDelNoCache = common.Hash{0x5} + accDelCache = common.Hash{0x6} + conNoModNoCache = common.Hash{0x7} + conNoModNoCacheSlot = common.Hash{0x70} + conNoModCache = common.Hash{0x8} + conNoModCacheSlot = common.Hash{0x80} + conModNoCache = common.Hash{0x9} + conModNoCacheSlot = common.Hash{0x90} + conModCache = common.Hash{0xa} + conModCacheSlot = common.Hash{0xa0} + conDelNoCache = common.Hash{0xb} + conDelNoCacheSlot = common.Hash{0xb0} + conDelCache = common.Hash{0xc} + conDelCacheSlot = common.Hash{0xc0} + conNukeNoCache = common.Hash{0xd} + conNukeNoCacheSlot = common.Hash{0xd0} + conNukeCache = common.Hash{0xe} + conNukeCacheSlot = common.Hash{0xe0} + baseRoot = randomHash() + diffRoot = randomHash() + ) + + rawdb.WriteAccountSnapshot(db, accNoModNoCache, accNoModNoCache[:]) + rawdb.WriteAccountSnapshot(db, accNoModCache, accNoModCache[:]) + rawdb.WriteAccountSnapshot(db, accModNoCache, accModNoCache[:]) + rawdb.WriteAccountSnapshot(db, accModCache, accModCache[:]) + rawdb.WriteAccountSnapshot(db, accDelNoCache, accDelNoCache[:]) + rawdb.WriteAccountSnapshot(db, accDelCache, accDelCache[:]) + + rawdb.WriteAccountSnapshot(db, conNoModNoCache, conNoModNoCache[:]) + rawdb.WriteStorageSnapshot(db, conNoModNoCache, conNoModNoCacheSlot, conNoModNoCacheSlot[:]) + rawdb.WriteAccountSnapshot(db, conNoModCache, conNoModCache[:]) + rawdb.WriteStorageSnapshot(db, conNoModCache, conNoModCacheSlot, conNoModCacheSlot[:]) + rawdb.WriteAccountSnapshot(db, conModNoCache, conModNoCache[:]) + rawdb.WriteStorageSnapshot(db, conModNoCache, conModNoCacheSlot, conModNoCacheSlot[:]) + rawdb.WriteAccountSnapshot(db, conModCache, conModCache[:]) + rawdb.WriteStorageSnapshot(db, conModCache, conModCacheSlot, conModCacheSlot[:]) + rawdb.WriteAccountSnapshot(db, conDelNoCache, conDelNoCache[:]) + rawdb.WriteStorageSnapshot(db, conDelNoCache, conDelNoCacheSlot, conDelNoCacheSlot[:]) + rawdb.WriteAccountSnapshot(db, conDelCache, conDelCache[:]) + rawdb.WriteStorageSnapshot(db, conDelCache, conDelCacheSlot, conDelCacheSlot[:]) + + rawdb.WriteAccountSnapshot(db, conNukeNoCache, conNukeNoCache[:]) + rawdb.WriteStorageSnapshot(db, conNukeNoCache, conNukeNoCacheSlot, conNukeNoCacheSlot[:]) + rawdb.WriteAccountSnapshot(db, conNukeCache, conNukeCache[:]) + rawdb.WriteStorageSnapshot(db, conNukeCache, conNukeCacheSlot, conNukeCacheSlot[:]) + + rawdb.WriteSnapshotRoot(db, baseRoot) + + // Create a disk layer based on the above and cache in some data + snaps := &Tree{ + layers: map[common.Hash]snapshot{ + baseRoot: &diskLayer{ + diskdb: db, + cache: fastcache.New(500 * 1024), + root: baseRoot, + }, + }, + } + base := snaps.Snapshot(baseRoot) + base.AccountRLP(accNoModCache) + base.AccountRLP(accModCache) + base.AccountRLP(accDelCache) + base.Storage(conNoModCache, conNoModCacheSlot) + base.Storage(conModCache, conModCacheSlot) + base.Storage(conDelCache, conDelCacheSlot) + base.Storage(conNukeCache, conNukeCacheSlot) + + // Modify or delete some accounts, flatten everything onto disk + if err := snaps.Update(diffRoot, baseRoot, map[common.Hash][]byte{ + accModNoCache: reverse(accModNoCache[:]), + accModCache: reverse(accModCache[:]), + accDelNoCache: nil, + accDelCache: nil, + conNukeNoCache: nil, + conNukeCache: nil, + }, map[common.Hash]map[common.Hash][]byte{ + conModNoCache: {conModNoCacheSlot: reverse(conModNoCacheSlot[:])}, + conModCache: {conModCacheSlot: reverse(conModCacheSlot[:])}, + conDelNoCache: {conDelNoCacheSlot: nil}, + conDelCache: {conDelCacheSlot: nil}, + }); err != nil { + t.Fatalf("failed to update snapshot tree: %v", err) + } + if err := snaps.Cap(diffRoot, 0); err != nil { + t.Fatalf("failed to flatten snapshot tree: %v", err) + } + // Retrieve all the data through the disk layer and validate it + base = snaps.Snapshot(diffRoot) + if _, ok := base.(*diskLayer); !ok { + t.Fatalf("update not flattend into the disk layer") + } + + // assertAccount ensures that an account matches the given blob. + assertAccount := func(account common.Hash, data []byte) { + t.Helper() + blob, err := base.AccountRLP(account) + if err != nil { + t.Errorf("account access (%x) failed: %v", account, err) + } else if !bytes.Equal(blob, data) { + t.Errorf("account access (%x) mismatch: have %x, want %x", account, blob, data) + } + } + assertAccount(accNoModNoCache, accNoModNoCache[:]) + assertAccount(accNoModCache, accNoModCache[:]) + assertAccount(accModNoCache, reverse(accModNoCache[:])) + assertAccount(accModCache, reverse(accModCache[:])) + assertAccount(accDelNoCache, nil) + assertAccount(accDelCache, nil) + + // assertStorage ensures that a storage slot matches the given blob. + assertStorage := func(account common.Hash, slot common.Hash, data []byte) { + t.Helper() + blob, err := base.Storage(account, slot) + if err != nil { + t.Errorf("storage access (%x:%x) failed: %v", account, slot, err) + } else if !bytes.Equal(blob, data) { + t.Errorf("storage access (%x:%x) mismatch: have %x, want %x", account, slot, blob, data) + } + } + assertStorage(conNoModNoCache, conNoModNoCacheSlot, conNoModNoCacheSlot[:]) + assertStorage(conNoModCache, conNoModCacheSlot, conNoModCacheSlot[:]) + assertStorage(conModNoCache, conModNoCacheSlot, reverse(conModNoCacheSlot[:])) + assertStorage(conModCache, conModCacheSlot, reverse(conModCacheSlot[:])) + assertStorage(conDelNoCache, conDelNoCacheSlot, nil) + assertStorage(conDelCache, conDelCacheSlot, nil) + assertStorage(conNukeNoCache, conNukeNoCacheSlot, nil) + assertStorage(conNukeCache, conNukeCacheSlot, nil) + + // Retrieve all the data directly from the database and validate it + + // assertDatabaseAccount ensures that an account from the database matches the given blob. + assertDatabaseAccount := func(account common.Hash, data []byte) { + t.Helper() + if blob := rawdb.ReadAccountSnapshot(db, account); !bytes.Equal(blob, data) { + t.Errorf("account database access (%x) mismatch: have %x, want %x", account, blob, data) + } + } + assertDatabaseAccount(accNoModNoCache, accNoModNoCache[:]) + assertDatabaseAccount(accNoModCache, accNoModCache[:]) + assertDatabaseAccount(accModNoCache, reverse(accModNoCache[:])) + assertDatabaseAccount(accModCache, reverse(accModCache[:])) + assertDatabaseAccount(accDelNoCache, nil) + assertDatabaseAccount(accDelCache, nil) + + // assertDatabaseStorage ensures that a storage slot from the database matches the given blob. + assertDatabaseStorage := func(account common.Hash, slot common.Hash, data []byte) { + t.Helper() + if blob := rawdb.ReadStorageSnapshot(db, account, slot); !bytes.Equal(blob, data) { + t.Errorf("storage database access (%x:%x) mismatch: have %x, want %x", account, slot, blob, data) + } + } + assertDatabaseStorage(conNoModNoCache, conNoModNoCacheSlot, conNoModNoCacheSlot[:]) + assertDatabaseStorage(conNoModCache, conNoModCacheSlot, conNoModCacheSlot[:]) + assertDatabaseStorage(conModNoCache, conModNoCacheSlot, reverse(conModNoCacheSlot[:])) + assertDatabaseStorage(conModCache, conModCacheSlot, reverse(conModCacheSlot[:])) + assertDatabaseStorage(conDelNoCache, conDelNoCacheSlot, nil) + assertDatabaseStorage(conDelCache, conDelCacheSlot, nil) + assertDatabaseStorage(conNukeNoCache, conNukeNoCacheSlot, nil) + assertDatabaseStorage(conNukeCache, conNukeCacheSlot, nil) +} + +// Tests that merging something into a disk layer persists it into the database +// and invalidates any previously written and cached values, discarding anything +// after the in-progress generation marker. +func TestDiskPartialMerge(t *testing.T) { + // Iterate the test a few times to ensure we pick various internal orderings + // for the data slots as well as the progress marker. + for i := 0; i < 1024; i++ { + // Create some accounts in the disk layer + db := memorydb.New() + + var ( + accNoModNoCache = randomHash() + accNoModCache = randomHash() + accModNoCache = randomHash() + accModCache = randomHash() + accDelNoCache = randomHash() + accDelCache = randomHash() + conNoModNoCache = randomHash() + conNoModNoCacheSlot = randomHash() + conNoModCache = randomHash() + conNoModCacheSlot = randomHash() + conModNoCache = randomHash() + conModNoCacheSlot = randomHash() + conModCache = randomHash() + conModCacheSlot = randomHash() + conDelNoCache = randomHash() + conDelNoCacheSlot = randomHash() + conDelCache = randomHash() + conDelCacheSlot = randomHash() + conNukeNoCache = randomHash() + conNukeNoCacheSlot = randomHash() + conNukeCache = randomHash() + conNukeCacheSlot = randomHash() + baseRoot = randomHash() + diffRoot = randomHash() + genMarker = append(randomHash().Bytes(), randomHash().Bytes()...) + ) + + // insertAccount injects an account into the database if it's after the + // generator marker, drops the op otherwise. This is needed to seed the + // database with a valid starting snapshot. + insertAccount := func(account common.Hash, data []byte) { + if bytes.Compare(account[:], genMarker) <= 0 { + rawdb.WriteAccountSnapshot(db, account, data[:]) + } + } + insertAccount(accNoModNoCache, accNoModNoCache[:]) + insertAccount(accNoModCache, accNoModCache[:]) + insertAccount(accModNoCache, accModNoCache[:]) + insertAccount(accModCache, accModCache[:]) + insertAccount(accDelNoCache, accDelNoCache[:]) + insertAccount(accDelCache, accDelCache[:]) + + // insertStorage injects a storage slot into the database if it's after + // the generator marker, drops the op otherwise. This is needed to seed + // the database with a valid starting snapshot. + insertStorage := func(account common.Hash, slot common.Hash, data []byte) { + if bytes.Compare(append(account[:], slot[:]...), genMarker) <= 0 { + rawdb.WriteStorageSnapshot(db, account, slot, data[:]) + } + } + insertAccount(conNoModNoCache, conNoModNoCache[:]) + insertStorage(conNoModNoCache, conNoModNoCacheSlot, conNoModNoCacheSlot[:]) + insertAccount(conNoModCache, conNoModCache[:]) + insertStorage(conNoModCache, conNoModCacheSlot, conNoModCacheSlot[:]) + insertAccount(conModNoCache, conModNoCache[:]) + insertStorage(conModNoCache, conModNoCacheSlot, conModNoCacheSlot[:]) + insertAccount(conModCache, conModCache[:]) + insertStorage(conModCache, conModCacheSlot, conModCacheSlot[:]) + insertAccount(conDelNoCache, conDelNoCache[:]) + insertStorage(conDelNoCache, conDelNoCacheSlot, conDelNoCacheSlot[:]) + insertAccount(conDelCache, conDelCache[:]) + insertStorage(conDelCache, conDelCacheSlot, conDelCacheSlot[:]) + + insertAccount(conNukeNoCache, conNukeNoCache[:]) + insertStorage(conNukeNoCache, conNukeNoCacheSlot, conNukeNoCacheSlot[:]) + insertAccount(conNukeCache, conNukeCache[:]) + insertStorage(conNukeCache, conNukeCacheSlot, conNukeCacheSlot[:]) + + rawdb.WriteSnapshotRoot(db, baseRoot) + + // Create a disk layer based on the above using a random progress marker + // and cache in some data. + snaps := &Tree{ + layers: map[common.Hash]snapshot{ + baseRoot: &diskLayer{ + diskdb: db, + cache: fastcache.New(500 * 1024), + root: baseRoot, + }, + }, + } + snaps.layers[baseRoot].(*diskLayer).genMarker = genMarker + base := snaps.Snapshot(baseRoot) + + // assertAccount ensures that an account matches the given blob if it's + // already covered by the disk snapshot, and errors out otherwise. + assertAccount := func(account common.Hash, data []byte) { + t.Helper() + blob, err := base.AccountRLP(account) + if bytes.Compare(account[:], genMarker) > 0 && err != ErrNotCoveredYet { + t.Fatalf("test %d: post-marker (%x) account access (%x) succeded: %x", i, genMarker, account, blob) + } + if bytes.Compare(account[:], genMarker) <= 0 && !bytes.Equal(blob, data) { + t.Fatalf("test %d: pre-marker (%x) account access (%x) mismatch: have %x, want %x", i, genMarker, account, blob, data) + } + } + assertAccount(accNoModCache, accNoModCache[:]) + assertAccount(accModCache, accModCache[:]) + assertAccount(accDelCache, accDelCache[:]) + + // assertStorage ensures that a storage slot matches the given blob if + // it's already covered by the disk snapshot, and errors out otherwise. + assertStorage := func(account common.Hash, slot common.Hash, data []byte) { + t.Helper() + blob, err := base.Storage(account, slot) + if bytes.Compare(append(account[:], slot[:]...), genMarker) > 0 && err != ErrNotCoveredYet { + t.Fatalf("test %d: post-marker (%x) storage access (%x:%x) succeded: %x", i, genMarker, account, slot, blob) + } + if bytes.Compare(append(account[:], slot[:]...), genMarker) <= 0 && !bytes.Equal(blob, data) { + t.Fatalf("test %d: pre-marker (%x) storage access (%x:%x) mismatch: have %x, want %x", i, genMarker, account, slot, blob, data) + } + } + assertStorage(conNoModCache, conNoModCacheSlot, conNoModCacheSlot[:]) + assertStorage(conModCache, conModCacheSlot, conModCacheSlot[:]) + assertStorage(conDelCache, conDelCacheSlot, conDelCacheSlot[:]) + assertStorage(conNukeCache, conNukeCacheSlot, conNukeCacheSlot[:]) + + // Modify or delete some accounts, flatten everything onto disk + if err := snaps.Update(diffRoot, baseRoot, map[common.Hash][]byte{ + accModNoCache: reverse(accModNoCache[:]), + accModCache: reverse(accModCache[:]), + accDelNoCache: nil, + accDelCache: nil, + conNukeNoCache: nil, + conNukeCache: nil, + }, map[common.Hash]map[common.Hash][]byte{ + conModNoCache: {conModNoCacheSlot: reverse(conModNoCacheSlot[:])}, + conModCache: {conModCacheSlot: reverse(conModCacheSlot[:])}, + conDelNoCache: {conDelNoCacheSlot: nil}, + conDelCache: {conDelCacheSlot: nil}, + }); err != nil { + t.Fatalf("test %d: failed to update snapshot tree: %v", i, err) + } + if err := snaps.Cap(diffRoot, 0); err != nil { + t.Fatalf("test %d: failed to flatten snapshot tree: %v", i, err) + } + // Retrieve all the data through the disk layer and validate it + base = snaps.Snapshot(diffRoot) + if _, ok := base.(*diskLayer); !ok { + t.Fatalf("test %d: update not flattend into the disk layer", i) + } + assertAccount(accNoModNoCache, accNoModNoCache[:]) + assertAccount(accNoModCache, accNoModCache[:]) + assertAccount(accModNoCache, reverse(accModNoCache[:])) + assertAccount(accModCache, reverse(accModCache[:])) + assertAccount(accDelNoCache, nil) + assertAccount(accDelCache, nil) + + assertStorage(conNoModNoCache, conNoModNoCacheSlot, conNoModNoCacheSlot[:]) + assertStorage(conNoModCache, conNoModCacheSlot, conNoModCacheSlot[:]) + assertStorage(conModNoCache, conModNoCacheSlot, reverse(conModNoCacheSlot[:])) + assertStorage(conModCache, conModCacheSlot, reverse(conModCacheSlot[:])) + assertStorage(conDelNoCache, conDelNoCacheSlot, nil) + assertStorage(conDelCache, conDelCacheSlot, nil) + assertStorage(conNukeNoCache, conNukeNoCacheSlot, nil) + assertStorage(conNukeCache, conNukeCacheSlot, nil) + + // Retrieve all the data directly from the database and validate it + + // assertDatabaseAccount ensures that an account inside the database matches + // the given blob if it's already covered by the disk snapshot, and does not + // exist otherwise. + assertDatabaseAccount := func(account common.Hash, data []byte) { + t.Helper() + blob := rawdb.ReadAccountSnapshot(db, account) + if bytes.Compare(account[:], genMarker) > 0 && blob != nil { + t.Fatalf("test %d: post-marker (%x) account database access (%x) succeded: %x", i, genMarker, account, blob) + } + if bytes.Compare(account[:], genMarker) <= 0 && !bytes.Equal(blob, data) { + t.Fatalf("test %d: pre-marker (%x) account database access (%x) mismatch: have %x, want %x", i, genMarker, account, blob, data) + } + } + assertDatabaseAccount(accNoModNoCache, accNoModNoCache[:]) + assertDatabaseAccount(accNoModCache, accNoModCache[:]) + assertDatabaseAccount(accModNoCache, reverse(accModNoCache[:])) + assertDatabaseAccount(accModCache, reverse(accModCache[:])) + assertDatabaseAccount(accDelNoCache, nil) + assertDatabaseAccount(accDelCache, nil) + + // assertDatabaseStorage ensures that a storage slot inside the database + // matches the given blob if it's already covered by the disk snapshot, + // and does not exist otherwise. + assertDatabaseStorage := func(account common.Hash, slot common.Hash, data []byte) { + t.Helper() + blob := rawdb.ReadStorageSnapshot(db, account, slot) + if bytes.Compare(append(account[:], slot[:]...), genMarker) > 0 && blob != nil { + t.Fatalf("test %d: post-marker (%x) storage database access (%x:%x) succeded: %x", i, genMarker, account, slot, blob) + } + if bytes.Compare(append(account[:], slot[:]...), genMarker) <= 0 && !bytes.Equal(blob, data) { + t.Fatalf("test %d: pre-marker (%x) storage database access (%x:%x) mismatch: have %x, want %x", i, genMarker, account, slot, blob, data) + } + } + assertDatabaseStorage(conNoModNoCache, conNoModNoCacheSlot, conNoModNoCacheSlot[:]) + assertDatabaseStorage(conNoModCache, conNoModCacheSlot, conNoModCacheSlot[:]) + assertDatabaseStorage(conModNoCache, conModNoCacheSlot, reverse(conModNoCacheSlot[:])) + assertDatabaseStorage(conModCache, conModCacheSlot, reverse(conModCacheSlot[:])) + assertDatabaseStorage(conDelNoCache, conDelNoCacheSlot, nil) + assertDatabaseStorage(conDelCache, conDelCacheSlot, nil) + assertDatabaseStorage(conNukeNoCache, conNukeNoCacheSlot, nil) + assertDatabaseStorage(conNukeCache, conNukeCacheSlot, nil) + } +} + +// Tests that merging something into a disk layer persists it into the database +// and invalidates any previously written and cached values, discarding anything +// after the in-progress generation marker. +// +// This test case is a tiny specialized case of TestDiskPartialMerge, which tests +// some very specific cornercases that random tests won't ever trigger. +func TestDiskMidAccountPartialMerge(t *testing.T) { +} diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go index 445a6ebd9..0f9e5fae5 100644 --- a/core/state/snapshot/generate.go +++ b/core/state/snapshot/generate.go @@ -18,12 +18,13 @@ package snapshot import ( "bytes" - "fmt" + "encoding/binary" "math/big" "time" "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" @@ -40,103 +41,122 @@ var ( emptyCode = crypto.Keccak256Hash(nil) ) -// wipeSnapshot iterates over the entire key-value database and deletes all the -// data associated with the snapshot (accounts, storage, metadata). After all is -// done, the snapshot range of the database is compacted to free up unused data -// blocks. -func wipeSnapshot(db ethdb.KeyValueStore) error { - // Batch deletions together to avoid holding an iterator for too long - var ( - batch = db.NewBatch() - items int - ) - // Iterate over the snapshot key-range and delete all of them - log.Info("Deleting previous snapshot leftovers") - start, logged := time.Now(), time.Now() - - it := db.NewIteratorWithStart(rawdb.StateSnapshotPrefix) - for it.Next() { - // Skip any keys with the correct prefix but wrong lenth (trie nodes) - key := it.Key() - if !bytes.HasPrefix(key, rawdb.StateSnapshotPrefix) { - break - } - if len(key) != len(rawdb.StateSnapshotPrefix)+common.HashLength && len(key) != len(rawdb.StateSnapshotPrefix)+2*common.HashLength { - continue - } - // Delete the key and periodically recreate the batch and iterator - batch.Delete(key) - items++ - - if items%10000 == 0 { - // Batch too large (or iterator too long lived, flush and recreate) - it.Release() - if err := batch.Write(); err != nil { - return err - } - batch.Reset() - it = db.NewIteratorWithStart(key) - - if time.Since(logged) > 8*time.Second { - log.Info("Deleting previous snapshot leftovers", "wiped", items, "elapsed", time.Since(start)) - logged = time.Now() - } - } - } - it.Release() - - rawdb.DeleteSnapshotRoot(batch) - if err := batch.Write(); err != nil { - return err - } - log.Info("Deleted previous snapshot leftovers", "wiped", items, "elapsed", time.Since(start)) - - // Compact the snapshot section of the database to get rid of unused space - log.Info("Compacting snapshot area in database") - start = time.Now() - - end := common.CopyBytes(rawdb.StateSnapshotPrefix) - end[len(end)-1]++ - - if err := db.Compact(rawdb.StateSnapshotPrefix, end); err != nil { - return err - } - log.Info("Compacted snapshot area in database", "elapsed", time.Since(start)) - - return nil +// generatorStats is a collection of statistics gathered by the snapshot generator +// for logging purposes. +type generatorStats struct { + wiping chan struct{} // Notification channel if wiping is in progress + origin uint64 // Origin prefix where generation started + start time.Time // Timestamp when generation started + accounts uint64 // Number of accounts indexed + slots uint64 // Number of storage slots indexed + storage common.StorageSize // Account and storage slot size } -// generateSnapshot regenerates a brand new snapshot based on an existing state database and head block. -func generateSnapshot(db ethdb.KeyValueStore, journal string, root common.Hash) (snapshot, error) { - // Wipe any previously existing snapshot from the database - if err := wipeSnapshot(db); err != nil { - return nil, err - } - // Iterate the entire storage trie and re-generate the state snapshot - var ( - accountCount int - storageCount int - storageNodes int - accountSize common.StorageSize - storageSize common.StorageSize - logged time.Time - ) - batch := db.NewBatch() - triedb := trie.NewDatabase(db) +// Log creates an contextual log with the given message and the context pulled +// from the internally maintained statistics. +func (gs *generatorStats) Log(msg string, marker []byte) { + var ctx []interface{} - accTrie, err := trie.NewSecure(root, triedb) - if err != nil { - return nil, err + // Figure out whether we're after or within an account + switch len(marker) { + case common.HashLength: + ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...) + case 2 * common.HashLength: + ctx = append(ctx, []interface{}{ + "in", common.BytesToHash(marker[:common.HashLength]), + "at", common.BytesToHash(marker[common.HashLength:]), + }...) } - accIt := trie.NewIterator(accTrie.NodeIterator(nil)) + // Add the usual measurements + ctx = append(ctx, []interface{}{ + "accounts", gs.accounts, + "slots", gs.slots, + "storage", gs.storage, + "elapsed", common.PrettyDuration(time.Since(gs.start)), + }...) + // Calculate the estimated indexing time based on current stats + if len(marker) > 0 { + if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 { + left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8]) + + speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero + ctx = append(ctx, []interface{}{ + "eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond), + }...) + } + } + log.Info(msg, ctx...) +} + +// generateSnapshot regenerates a brand new snapshot based on an existing state +// database and head block asynchronously. The snapshot is returned immediately +// and generation is continued in the background until done. +func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, wiper chan struct{}) *diskLayer { + // Wipe any previously existing snapshot from the database if no wiper is + // currenty in progress. + if wiper == nil { + wiper = wipeSnapshot(diskdb, true) + } + // Create a new disk layer with an initialized state marker at zero + rawdb.WriteSnapshotRoot(diskdb, root) + + base := &diskLayer{ + diskdb: diskdb, + triedb: triedb, + root: root, + cache: fastcache.New(cache * 1024 * 1024), + genMarker: []byte{}, // Initialized but empty! + genAbort: make(chan chan *generatorStats), + } + go base.generate(&generatorStats{wiping: wiper, start: time.Now()}) + return base +} + +// generate is a background thread that iterates over the state and storage tries, +// constructing the state snapshot. All the arguments are purely for statistics +// gethering and logging, since the method surfs the blocks as they arrive, often +// being restarted. +func (dl *diskLayer) generate(stats *generatorStats) { + // If a database wipe is in operation, wait until it's done + if stats.wiping != nil { + stats.Log("Wiper running, state snapshotting paused", dl.genMarker) + select { + // If wiper is done, resume normal mode of operation + case <-stats.wiping: + stats.wiping = nil + stats.start = time.Now() + + // If generator was aboted during wipe, return + case abort := <-dl.genAbort: + abort <- stats + return + } + } + // Create an account and state iterator pointing to the current generator marker + accTrie, err := trie.NewSecure(dl.root, dl.triedb) + if err != nil { + // The account trie is missing (GC), surf the chain until one becomes available + stats.Log("Trie missing, state snapshotting paused", dl.genMarker) + + abort := <-dl.genAbort + abort <- stats + return + } + stats.Log("Resuming state snapshot generation", dl.genMarker) + + var accMarker []byte + if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that + accMarker = dl.genMarker[:common.HashLength] + } + accIt := trie.NewIterator(accTrie.NodeIterator(accMarker)) + batch := dl.diskdb.NewBatch() + + // Iterate from the previous marker and continue generating the state snapshot + logged := time.Now() for accIt.Next() { - var ( - curStorageCount int - curStorageNodes int - curAccountSize common.StorageSize - curStorageSize common.StorageSize - accountHash = common.BytesToHash(accIt.Key) - ) + // Retrieve the current account and flatten it into the internal format + accountHash := common.BytesToHash(accIt.Key) + var acc struct { Nonce uint64 Balance *big.Int @@ -144,63 +164,97 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, root common.Hash) CodeHash []byte } if err := rlp.DecodeBytes(accIt.Value, &acc); err != nil { - return nil, err + log.Crit("Invalid account encountered during snapshot creation", "err", err) } data := AccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash) - curAccountSize += common.StorageSize(1 + common.HashLength + len(data)) - rawdb.WriteAccountSnapshot(batch, accountHash, data) - if batch.ValueSize() > ethdb.IdealBatchSize { - batch.Write() - batch.Reset() + // If the account is not yet in-progress, write it out + if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) { + rawdb.WriteAccountSnapshot(batch, accountHash, data) + stats.storage += common.StorageSize(1 + common.HashLength + len(data)) + stats.accounts++ } - if acc.Root != emptyRoot { - storeTrie, err := trie.NewSecure(acc.Root, triedb) - if err != nil { - return nil, err - } - storeIt := trie.NewIterator(storeTrie.NodeIterator(nil)) - for storeIt.Next() { - curStorageSize += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value)) - curStorageCount++ + // If we've exceeded our batch allowance or termination was requested, flush to disk + var abort chan *generatorStats + select { + case abort = <-dl.genAbort: + default: + } + if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + // Only write and set the marker if we actually did something useful + if batch.ValueSize() > 0 { + batch.Write() + batch.Reset() + dl.lock.Lock() + dl.genMarker = accountHash[:] + dl.lock.Unlock() + } + if abort != nil { + stats.Log("Aborting state snapshot generation", accountHash[:]) + abort <- stats + return + } + } + // If the account is in-progress, continue where we left off (otherwise iterate all) + if acc.Root != emptyRoot { + storeTrie, err := trie.NewSecure(acc.Root, dl.triedb) + if err != nil { + log.Crit("Storage trie inaccessible for snapshot generation", "err", err) + } + var storeMarker []byte + if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength { + storeMarker = dl.genMarker[common.HashLength:] + } + storeIt := trie.NewIterator(storeTrie.NodeIterator(storeMarker)) + for storeIt.Next() { rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(storeIt.Key), storeIt.Value) - if batch.ValueSize() > ethdb.IdealBatchSize { - batch.Write() - batch.Reset() + stats.storage += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value)) + stats.slots++ + + // If we've exceeded our batch allowance or termination was requested, flush to disk + var abort chan *generatorStats + select { + case abort = <-dl.genAbort: + default: + } + if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + // Only write and set the marker if we actually did something useful + if batch.ValueSize() > 0 { + batch.Write() + batch.Reset() + + dl.lock.Lock() + dl.genMarker = append(accountHash[:], storeIt.Key...) + dl.lock.Unlock() + } + if abort != nil { + stats.Log("Aborting state snapshot generation", append(accountHash[:], storeIt.Key...)) + abort <- stats + return + } } } - curStorageNodes = storeIt.Nodes } - accountCount++ - storageCount += curStorageCount - accountSize += curAccountSize - storageSize += curStorageSize - storageNodes += curStorageNodes - if time.Since(logged) > 8*time.Second { - fmt.Printf("%#x: %9s + %9s (%6d slots, %6d nodes), total %9s (%d accs, %d nodes) + %9s (%d slots, %d nodes)\n", accIt.Key, curAccountSize.TerminalString(), curStorageSize.TerminalString(), curStorageCount, curStorageNodes, accountSize.TerminalString(), accountCount, accIt.Nodes, storageSize.TerminalString(), storageCount, storageNodes) + stats.Log("Generating state snapshot", accIt.Key) logged = time.Now() } + // Some account processed, unmark the marker + accMarker = nil } - fmt.Printf("Totals: %9s (%d accs, %d nodes) + %9s (%d slots, %d nodes)\n", accountSize.TerminalString(), accountCount, accIt.Nodes, storageSize.TerminalString(), storageCount, storageNodes) - - // Update the snapshot block marker and write any remainder data - rawdb.WriteSnapshotRoot(batch, root) - batch.Write() - batch.Reset() - - // Compact the snapshot section of the database to get rid of unused space - log.Info("Compacting snapshot in chain database") - if err := db.Compact([]byte{'s'}, []byte{'s' + 1}); err != nil { - return nil, err + // Snapshot fully generated, set the marker to nil + if batch.ValueSize() > 0 { + batch.Write() } - // New snapshot generated, construct a brand new base layer - cache := fastcache.New(512 * 1024 * 1024) - return &diskLayer{ - journal: journal, - db: db, - cache: cache, - root: root, - }, nil + log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots, + "storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start))) + + dl.lock.Lock() + dl.genMarker = nil + dl.lock.Unlock() + + // Someone will be looking for us, wait it out + abort := <-dl.genAbort + abort <- nil } diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go new file mode 100644 index 000000000..1c6c63a0b --- /dev/null +++ b/core/state/snapshot/journal.go @@ -0,0 +1,257 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snapshot + +import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "time" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +// journalGenerator is a disk layer entry containing the generator progress marker. +type journalGenerator struct { + Wiping bool // Whether the database was in progress of being wiped + Done bool // Whether the generator finished creating the snapshot + Marker []byte + Accounts uint64 + Slots uint64 + Storage uint64 +} + +// journalAccount is an account entry in a diffLayer's disk journal. +type journalAccount struct { + Hash common.Hash + Blob []byte +} + +// journalStorage is an account's storage map in a diffLayer's disk journal. +type journalStorage struct { + Hash common.Hash + Keys []common.Hash + Vals [][]byte +} + +// loadSnapshot loads a pre-existing state snapshot backed by a key-value store. +func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cache int, root common.Hash) (snapshot, error) { + // Retrieve the block number and hash of the snapshot, failing if no snapshot + // is present in the database (or crashed mid-update). + baseRoot := rawdb.ReadSnapshotRoot(diskdb) + if baseRoot == (common.Hash{}) { + return nil, errors.New("missing or corrupted snapshot") + } + base := &diskLayer{ + diskdb: diskdb, + triedb: triedb, + cache: fastcache.New(cache * 1024 * 1024), + root: baseRoot, + } + // Open the journal, it must exist since even for 0 layer it stores whether + // we've already generated the snapshot or are in progress only + file, err := os.Open(journal) + if err != nil { + return nil, err + } + r := rlp.NewStream(file, 0) + + // Read the snapshot generation progress for the disk layer + var generator journalGenerator + if err := r.Decode(&generator); err != nil { + return nil, fmt.Errorf("failed to load snapshot progress marker: %v", err) + } + // Load all the snapshot diffs from the journal + snapshot, err := loadDiffLayer(base, r) + if err != nil { + return nil, err + } + // Entire snapshot journal loaded, sanity check the head and return + // Journal doesn't exist, don't worry if it's not supposed to + if head := snapshot.Root(); head != root { + return nil, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root) + } + // Everything loaded correctly, resume any suspended operations + if !generator.Done { + // If the generator was still wiping, restart one from scratch (fine for + // now as it's rare and the wiper deletes the stuff it touches anyway, so + // restarting won't incur a lot of extra database hops. + var wiper chan struct{} + if generator.Wiping { + log.Info("Resuming previous snapshot wipe") + wiper = wipeSnapshot(diskdb, false) + } + // Whether or not wiping was in progress, load any generator progress too + base.genMarker = generator.Marker + if base.genMarker == nil { + base.genMarker = []byte{} + } + base.genAbort = make(chan chan *generatorStats) + + var origin uint64 + if len(generator.Marker) >= 8 { + origin = binary.BigEndian.Uint64(generator.Marker) + } + go base.generate(&generatorStats{ + wiping: wiper, + origin: origin, + start: time.Now(), + accounts: generator.Accounts, + slots: generator.Slots, + storage: common.StorageSize(generator.Storage), + }) + } + return snapshot, nil +} + +// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new +// diff and verifying that it can be linked to the requested parent. +func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) { + // Read the next diff journal entry + var root common.Hash + if err := r.Decode(&root); err != nil { + // The first read may fail with EOF, marking the end of the journal + if err == io.EOF { + return parent, nil + } + return nil, fmt.Errorf("load diff root: %v", err) + } + var accounts []journalAccount + if err := r.Decode(&accounts); err != nil { + return nil, fmt.Errorf("load diff accounts: %v", err) + } + accountData := make(map[common.Hash][]byte) + for _, entry := range accounts { + accountData[entry.Hash] = entry.Blob + } + var storage []journalStorage + if err := r.Decode(&storage); err != nil { + return nil, fmt.Errorf("load diff storage: %v", err) + } + storageData := make(map[common.Hash]map[common.Hash][]byte) + for _, entry := range storage { + slots := make(map[common.Hash][]byte) + for i, key := range entry.Keys { + slots[key] = entry.Vals[i] + } + storageData[entry.Hash] = slots + } + return loadDiffLayer(newDiffLayer(parent, root, accountData, storageData), r) +} + +// Journal is the internal version of Journal that also returns the journal file +// so subsequent layers know where to write to. +func (dl *diskLayer) Journal(path string) (io.WriteCloser, common.Hash, error) { + // If the snapshot is currenty being generated, abort it + var stats *generatorStats + if dl.genAbort != nil { + abort := make(chan *generatorStats) + dl.genAbort <- abort + + if stats = <-abort; stats != nil { + stats.Log("Journalling in-progress snapshot", dl.genMarker) + } + } + // Ensure the layer didn't get stale + dl.lock.RLock() + defer dl.lock.RUnlock() + + if dl.stale { + return nil, common.Hash{}, ErrSnapshotStale + } + // We've reached the bottom, open the journal + file, err := os.Create(path) + if err != nil { + return nil, common.Hash{}, err + } + // Write out the generator marker + entry := journalGenerator{ + Done: dl.genMarker == nil, + Marker: dl.genMarker, + } + if stats != nil { + entry.Wiping = (stats.wiping != nil) + entry.Accounts = stats.accounts + entry.Slots = stats.slots + entry.Storage = uint64(stats.storage) + } + if err := rlp.Encode(file, entry); err != nil { + file.Close() + return nil, common.Hash{}, err + } + return file, dl.root, nil +} + +// Journal is the internal version of Journal that also returns the journal file +// so subsequent layers know where to write to. +func (dl *diffLayer) Journal(path string) (io.WriteCloser, common.Hash, error) { + // Journal the parent first + writer, base, err := dl.parent.Journal(path) + if err != nil { + return nil, common.Hash{}, err + } + // Ensure the layer didn't get stale + dl.lock.RLock() + defer dl.lock.RUnlock() + + if dl.stale { + writer.Close() + return nil, common.Hash{}, ErrSnapshotStale + } + // Everything below was journalled, persist this layer too + buf := bufio.NewWriter(writer) + if err := rlp.Encode(buf, dl.root); err != nil { + buf.Flush() + writer.Close() + return nil, common.Hash{}, err + } + accounts := make([]journalAccount, 0, len(dl.accountData)) + for hash, blob := range dl.accountData { + accounts = append(accounts, journalAccount{Hash: hash, Blob: blob}) + } + if err := rlp.Encode(buf, accounts); err != nil { + buf.Flush() + writer.Close() + return nil, common.Hash{}, err + } + storage := make([]journalStorage, 0, len(dl.storageData)) + for hash, slots := range dl.storageData { + keys := make([]common.Hash, 0, len(slots)) + vals := make([][]byte, 0, len(slots)) + for key, val := range slots { + keys = append(keys, key) + vals = append(vals, val) + } + storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals}) + } + if err := rlp.Encode(buf, storage); err != nil { + buf.Flush() + writer.Close() + return nil, common.Hash{}, err + } + buf.Flush() + return writer, base, nil +} diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index d35d69839..744d56c1b 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -18,31 +18,67 @@ package snapshot import ( + "bytes" "errors" "fmt" - "os" + "io" "sync" - "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" ) var ( - snapshotCleanHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/hit", nil) - snapshotCleanMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/miss", nil) - snapshotCleanReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/read", nil) - snapshotCleanWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/write", nil) + snapshotCleanAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/hit", nil) + snapshotCleanAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/miss", nil) + snapshotCleanAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/read", nil) + snapshotCleanAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/account/write", nil) + + snapshotCleanStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/hit", nil) + snapshotCleanStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/miss", nil) + snapshotCleanStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/read", nil) + snapshotCleanStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/storage/write", nil) + + snapshotDirtyAccountHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/hit", nil) + snapshotDirtyAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/miss", nil) + snapshotDirtyAccountReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/read", nil) + snapshotDirtyAccountWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/account/write", nil) + + snapshotDirtyStorageHitMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/hit", nil) + snapshotDirtyStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/miss", nil) + snapshotDirtyStorageReadMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/read", nil) + snapshotDirtyStorageWriteMeter = metrics.NewRegisteredMeter("state/snapshot/dirty/storage/write", nil) + + snapshotFlushAccountItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/item", nil) + snapshotFlushAccountSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/account/size", nil) + snapshotFlushStorageItemMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/item", nil) + snapshotFlushStorageSizeMeter = metrics.NewRegisteredMeter("state/snapshot/flush/storage/size", nil) + + snapshotBloomIndexTimer = metrics.NewRegisteredResettingTimer("state/snapshot/bloom/index", nil) + snapshotBloomErrorGauge = metrics.NewRegisteredGaugeFloat64("state/snapshot/bloom/error", nil) + + snapshotBloomAccountTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/truehit", nil) + snapshotBloomAccountFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/falsehit", nil) + snapshotBloomAccountMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/account/miss", nil) + + snapshotBloomStorageTrueHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/truehit", nil) + snapshotBloomStorageFalseHitMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/falsehit", nil) + snapshotBloomStorageMissMeter = metrics.NewRegisteredMeter("state/snapshot/bloom/storage/miss", nil) // ErrSnapshotStale is returned from data accessors if the underlying snapshot // layer had been invalidated due to the chain progressing forward far enough // to not maintain the layer's original state. ErrSnapshotStale = errors.New("snapshot stale") + // ErrNotCoveredYet is returned from data accessors if the underlying snapshot + // is being generated currently and the requested data item is not yet in the + // range of accounts covered. + ErrNotCoveredYet = errors.New("not covered yet") + // errSnapshotCycle is returned if a snapshot is attempted to be inserted // that forms a cycle in the snapshot tree. errSnapshotCycle = errors.New("snapshot cycle") @@ -79,7 +115,7 @@ type snapshot interface { // Journal commits an entire diff hierarchy to disk into a single journal file. // This is meant to be used during shutdown to persist the snapshot without // flattening everything down (bad for reorgs). - Journal() error + Journal(path string) (io.WriteCloser, common.Hash, error) // Stale return whether this layer has become stale (was flattened across) or // if it's still live. @@ -96,7 +132,10 @@ type snapshot interface { // storage data to avoid expensive multi-level trie lookups; and to allow sorted, // cheap iteration of the account/storage tries for sync aid. type Tree struct { - layers map[common.Hash]snapshot // Collection of all known layers // TODO(karalabe): split Clique overlaps + diskdb ethdb.KeyValueStore // Persistent database to store the snapshot + triedb *trie.Database // In-memory cache to access the trie through + cache int // Megabytes permitted to use for read caches + layers map[common.Hash]snapshot // Collection of all known layers lock sync.RWMutex } @@ -105,20 +144,24 @@ type Tree struct { // of the snapshot matches the expected one. // // If the snapshot is missing or inconsistent, the entirety is deleted and will -// be reconstructed from scratch based on the tries in the key-value store. -func New(db ethdb.KeyValueStore, journal string, root common.Hash) (*Tree, error) { - // Attempt to load a previously persisted snapshot - head, err := loadSnapshot(db, journal, root) - if err != nil { - log.Warn("Failed to load snapshot, regenerating", "err", err) - if head, err = generateSnapshot(db, journal, root); err != nil { - return nil, err - } - } - // Existing snapshot loaded or one regenerated, seed all the layers +// be reconstructed from scratch based on the tries in the key-value store, on a +// background thread. +func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cache int, root common.Hash) *Tree { + // Create a new, empty snapshot tree snap := &Tree{ + diskdb: diskdb, + triedb: triedb, + cache: cache, layers: make(map[common.Hash]snapshot), } + // Attempt to load a previously persisted snapshot and rebuild one if failed + head, err := loadSnapshot(diskdb, triedb, journal, cache, root) + if err != nil { + log.Warn("Failed to load snapshot, regenerating", "err", err) + snap.Rebuild(root) + return snap + } + // Existing snapshot loaded, seed all the layers for head != nil { snap.layers[head.Root()] = head @@ -131,7 +174,7 @@ func New(db ethdb.KeyValueStore, journal string, root common.Hash) (*Tree, error panic(fmt.Sprintf("unknown data layer: %T", self)) } } - return snap, nil + return snap } // Snapshot retrieves a snapshot belonging to the given block root, or nil if no @@ -173,7 +216,7 @@ func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, accounts ma // Cap traverses downwards the snapshot tree from a head block hash until the // number of allowed layers are crossed. All layers beyond the permitted number // are flattened downwards. -func (t *Tree) Cap(root common.Hash, layers int, memory uint64) error { +func (t *Tree) Cap(root common.Hash, layers int) error { // Retrieve the head snapshot to cap from snap := t.Snapshot(root) if snap == nil { @@ -190,6 +233,8 @@ func (t *Tree) Cap(root common.Hash, layers int, memory uint64) error { // Flattening the bottom-most diff layer requires special casing since there's // no child to rewire to the grandparent. In that case we can fake a temporary // child for the capping and then remove it. + var persisted *diskLayer + switch layers { case 0: // If full commit was requested, flatten the diffs and merge onto disk @@ -210,7 +255,7 @@ func (t *Tree) Cap(root common.Hash, layers int, memory uint64) error { ) diff.lock.RLock() bottom = diff.flatten().(*diffLayer) - if bottom.memory >= memory { + if bottom.memory >= aggregatorMemoryLimit { base = diffToDisk(bottom) } diff.lock.RUnlock() @@ -225,7 +270,7 @@ func (t *Tree) Cap(root common.Hash, layers int, memory uint64) error { default: // Many layers requested to be retained, cap normally - t.cap(diff, layers, memory) + persisted = t.cap(diff, layers) } // Remove any layer that is stale or links into a stale layer children := make(map[common.Hash][]common.Hash) @@ -248,13 +293,28 @@ func (t *Tree) Cap(root common.Hash, layers int, memory uint64) error { remove(root) } } + // If the disk layer was modified, regenerate all the cummulative blooms + if persisted != nil { + var rebloom func(root common.Hash) + rebloom = func(root common.Hash) { + if diff, ok := t.layers[root].(*diffLayer); ok { + diff.rebloom(persisted) + } + for _, child := range children[root] { + rebloom(child) + } + } + rebloom(persisted.root) + } return nil } // cap traverses downwards the diff tree until the number of allowed layers are // crossed. All diffs beyond the permitted number are flattened downwards. If the // layer limit is reached, memory cap is also enforced (but not before). -func (t *Tree) cap(diff *diffLayer, layers int, memory uint64) { +// +// The method returns the new disk layer if diffs were persistend into it. +func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer { // Dive until we run out of layers or reach the persistent database for ; layers > 2; layers-- { // If we still have diff layers below, continue down @@ -262,14 +322,14 @@ func (t *Tree) cap(diff *diffLayer, layers int, memory uint64) { diff = parent } else { // Diff stack too shallow, return without modifications - return + return nil } } // We're out of layers, flatten anything below, stopping if it's the disk or if // the memory limit is not yet exceeded. switch parent := diff.parent.(type) { case *diskLayer: - return + return nil case *diffLayer: // Flatten the parent into the grandparent. The flattening internally obtains a @@ -281,8 +341,14 @@ func (t *Tree) cap(diff *diffLayer, layers int, memory uint64) { defer diff.lock.Unlock() diff.parent = flattened - if flattened.memory < memory { - return + if flattened.memory < aggregatorMemoryLimit { + // Accumulator layer is smaller than the limit, so we can abort, unless + // there's a snapshot being generated currently. In that case, the trie + // will move fron underneath the generator so we **must** merge all the + // partial data down into the snapshot and restart the generation. + if flattened.parent.(*diskLayer).genAbort == nil { + return nil + } } default: panic(fmt.Sprintf("unknown data layer: %T", parent)) @@ -296,6 +362,7 @@ func (t *Tree) cap(diff *diffLayer, layers int, memory uint64) { t.layers[base.root] = base diff.parent = base + return base } // diffToDisk merges a bottom-most diff into the persistent disk layer underneath @@ -303,8 +370,15 @@ func (t *Tree) cap(diff *diffLayer, layers int, memory uint64) { func diffToDisk(bottom *diffLayer) *diskLayer { var ( base = bottom.parent.(*diskLayer) - batch = base.db.NewBatch() + batch = base.diskdb.NewBatch() + stats *generatorStats ) + // If the disk layer is running a snapshot generator, abort it + if base.genAbort != nil { + abort := make(chan *generatorStats) + base.genAbort <- abort + stats = <-abort + } // Start by temporarily deleting the current snapshot block marker. This // ensures that in the case of a crash, the entire snapshot is invalidated. rawdb.DeleteSnapshotRoot(batch) @@ -319,6 +393,10 @@ func diffToDisk(bottom *diffLayer) *diskLayer { // Push all the accounts into the database for hash, data := range bottom.accountData { + // Skip any account not covered yet by the snapshot + if base.genMarker != nil && bytes.Compare(hash[:], base.genMarker) > 0 { + continue + } if len(data) > 0 { // Account was updated, push to disk rawdb.WriteAccountSnapshot(batch, hash, data) @@ -335,19 +413,35 @@ func diffToDisk(bottom *diffLayer) *diskLayer { rawdb.DeleteAccountSnapshot(batch, hash) base.cache.Set(hash[:], nil) - it := rawdb.IterateStorageSnapshots(base.db, hash) + it := rawdb.IterateStorageSnapshots(base.diskdb, hash) for it.Next() { if key := it.Key(); len(key) == 65 { // TODO(karalabe): Yuck, we should move this into the iterator batch.Delete(key) base.cache.Del(key[1:]) + + snapshotFlushStorageItemMeter.Mark(1) + snapshotFlushStorageSizeMeter.Mark(int64(len(data))) } } it.Release() } + snapshotFlushAccountItemMeter.Mark(1) + snapshotFlushAccountSizeMeter.Mark(int64(len(data))) } // Push all the storage slots into the database for accountHash, storage := range bottom.storageData { + // Skip any account not covered yet by the snapshot + if base.genMarker != nil && bytes.Compare(accountHash[:], base.genMarker) > 0 { + continue + } + // Generation might be mid-account, track that case too + midAccount := base.genMarker != nil && bytes.Equal(accountHash[:], base.genMarker[:common.HashLength]) + for storageHash, data := range storage { + // Skip any slot not covered yet by the snapshot + if midAccount && bytes.Compare(storageHash[:], base.genMarker[common.HashLength:]) > 0 { + continue + } if len(data) > 0 { rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, data) base.cache.Set(append(accountHash[:], storageHash[:]...), data) @@ -355,6 +449,8 @@ func diffToDisk(bottom *diffLayer) *diskLayer { rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash) base.cache.Set(append(accountHash[:], storageHash[:]...), nil) } + snapshotFlushStorageItemMeter.Mark(1) + snapshotFlushStorageSizeMeter.Mark(int64(len(data))) } if batch.ValueSize() > ethdb.IdealBatchSize { if err := batch.Write(); err != nil { @@ -368,65 +464,91 @@ func diffToDisk(bottom *diffLayer) *diskLayer { if err := batch.Write(); err != nil { log.Crit("Failed to write leftover snapshot", "err", err) } - return &diskLayer{ - root: bottom.root, - cache: base.cache, - db: base.db, - journal: base.journal, + res := &diskLayer{ + root: bottom.root, + cache: base.cache, + diskdb: base.diskdb, + triedb: base.triedb, + genMarker: base.genMarker, } + // If snapshot generation hasn't finished yet, port over all the starts and + // continue where the previous round left off. + // + // Note, the `base.genAbort` comparison is not used normally, it's checked + // to allow the tests to play with the marker without triggering this path. + if base.genMarker != nil && base.genAbort != nil { + res.genMarker = base.genMarker + res.genAbort = make(chan chan *generatorStats) + go res.generate(stats) + } + return res } // Journal commits an entire diff hierarchy to disk into a single journal file. // This is meant to be used during shutdown to persist the snapshot without // flattening everything down (bad for reorgs). -func (t *Tree) Journal(blockRoot common.Hash) error { +// +// The method returns the root hash of the base layer that needs to be persisted +// to disk as a trie too to allow continuing any pending generation op. +func (t *Tree) Journal(root common.Hash, path string) (common.Hash, error) { // Retrieve the head snapshot to journal from var snap snapshot - snap := t.Snapshot(blockRoot) + snap := t.Snapshot(root) if snap == nil { - return fmt.Errorf("snapshot [%#x] missing", blockRoot) + return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root) } // Run the journaling t.lock.Lock() defer t.lock.Unlock() - return snap.(snapshot).Journal() + writer, base, err := snap.(snapshot).Journal(path) + if err != nil { + return common.Hash{}, err + } + return base, writer.Close() } -// loadSnapshot loads a pre-existing state snapshot backed by a key-value store. -func loadSnapshot(db ethdb.KeyValueStore, journal string, root common.Hash) (snapshot, error) { - // Retrieve the block number and hash of the snapshot, failing if no snapshot - // is present in the database (or crashed mid-update). - baseRoot := rawdb.ReadSnapshotRoot(db) - if baseRoot == (common.Hash{}) { - return nil, errors.New("missing or corrupted snapshot") - } - base := &diskLayer{ - journal: journal, - db: db, - cache: fastcache.New(512 * 1024 * 1024), - root: baseRoot, - } - // Load all the snapshot diffs from the journal, failing if their chain is broken - // or does not lead from the disk snapshot to the specified head. - if _, err := os.Stat(journal); os.IsNotExist(err) { - // Journal doesn't exist, don't worry if it's not supposed to - if baseRoot != root { - return nil, fmt.Errorf("snapshot journal missing, head doesn't match snapshot: have %#x, want %#x", baseRoot, root) +// Rebuild wipes all available snapshot data from the persistent database and +// discard all caches and diff layers. Afterwards, it starts a new snapshot +// generator with the given root hash. +func (t *Tree) Rebuild(root common.Hash) { + t.lock.Lock() + defer t.lock.Unlock() + + // Track whether there's a wipe currently running and keep it alive if so + var wiper chan struct{} + + // Iterate over and mark all layers stale + for _, layer := range t.layers { + switch layer := layer.(type) { + case *diskLayer: + // If the base layer is generating, abort it and save + if layer.genAbort != nil { + abort := make(chan *generatorStats) + layer.genAbort <- abort + + if stats := <-abort; stats != nil { + wiper = stats.wiping + } + } + // Layer should be inactive now, mark it as stale + layer.lock.Lock() + layer.stale = true + layer.lock.Unlock() + + case *diffLayer: + // If the layer is a simple diff, simply mark as stale + layer.lock.Lock() + layer.stale = true + layer.lock.Unlock() + + default: + panic(fmt.Sprintf("unknown layer type: %T", layer)) } - return base, nil } - file, err := os.Open(journal) - if err != nil { - return nil, err + // Start generating a new snapshot from scratch on a backgroung thread. The + // generator will run a wiper first if there's not one running right now. + log.Info("Rebuilding state snapshot") + t.layers = map[common.Hash]snapshot{ + root: generateSnapshot(t.diskdb, t.triedb, t.cache, root, wiper), } - snapshot, err := loadDiffLayer(base, rlp.NewStream(file, 0)) - if err != nil { - return nil, err - } - // Entire snapshot journal loaded, sanity check the head and return - // Journal doesn't exist, don't worry if it's not supposed to - if head := snapshot.Root(); head != root { - return nil, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root) - } - return snapshot, nil } diff --git a/core/state/snapshot/snapshot_test.go b/core/state/snapshot/snapshot_test.go index 9c872a895..44b8f3cef 100644 --- a/core/state/snapshot/snapshot_test.go +++ b/core/state/snapshot/snapshot_test.go @@ -31,9 +31,9 @@ import ( func TestDiskLayerExternalInvalidationFullFlatten(t *testing.T) { // Create an empty base layer and a snapshot tree out of it base := &diskLayer{ - db: rawdb.NewMemoryDatabase(), - root: common.HexToHash("0x01"), - cache: fastcache.New(1024 * 500), + diskdb: rawdb.NewMemoryDatabase(), + root: common.HexToHash("0x01"), + cache: fastcache.New(1024 * 500), } snaps := &Tree{ layers: map[common.Hash]snapshot{ @@ -54,7 +54,7 @@ func TestDiskLayerExternalInvalidationFullFlatten(t *testing.T) { t.Errorf("pre-cap layer count mismatch: have %d, want %d", n, 2) } // Commit the diff layer onto the disk and ensure it's persisted - if err := snaps.Cap(common.HexToHash("0x02"), 0, 0); err != nil { + if err := snaps.Cap(common.HexToHash("0x02"), 0); err != nil { t.Fatalf("failed to merge diff layer onto disk: %v", err) } // Since the base layer was modified, ensure that data retrievald on the external reference fail @@ -76,9 +76,9 @@ func TestDiskLayerExternalInvalidationFullFlatten(t *testing.T) { func TestDiskLayerExternalInvalidationPartialFlatten(t *testing.T) { // Create an empty base layer and a snapshot tree out of it base := &diskLayer{ - db: rawdb.NewMemoryDatabase(), - root: common.HexToHash("0x01"), - cache: fastcache.New(1024 * 500), + diskdb: rawdb.NewMemoryDatabase(), + root: common.HexToHash("0x01"), + cache: fastcache.New(1024 * 500), } snaps := &Tree{ layers: map[common.Hash]snapshot{ @@ -102,7 +102,10 @@ func TestDiskLayerExternalInvalidationPartialFlatten(t *testing.T) { t.Errorf("pre-cap layer count mismatch: have %d, want %d", n, 3) } // Commit the diff layer onto the disk and ensure it's persisted - if err := snaps.Cap(common.HexToHash("0x03"), 2, 0); err != nil { + defer func(memcap uint64) { aggregatorMemoryLimit = memcap }(aggregatorMemoryLimit) + aggregatorMemoryLimit = 0 + + if err := snaps.Cap(common.HexToHash("0x03"), 2); err != nil { t.Fatalf("failed to merge diff layer onto disk: %v", err) } // Since the base layer was modified, ensure that data retrievald on the external reference fail @@ -124,9 +127,9 @@ func TestDiskLayerExternalInvalidationPartialFlatten(t *testing.T) { func TestDiffLayerExternalInvalidationFullFlatten(t *testing.T) { // Create an empty base layer and a snapshot tree out of it base := &diskLayer{ - db: rawdb.NewMemoryDatabase(), - root: common.HexToHash("0x01"), - cache: fastcache.New(1024 * 500), + diskdb: rawdb.NewMemoryDatabase(), + root: common.HexToHash("0x01"), + cache: fastcache.New(1024 * 500), } snaps := &Tree{ layers: map[common.Hash]snapshot{ @@ -150,7 +153,7 @@ func TestDiffLayerExternalInvalidationFullFlatten(t *testing.T) { ref := snaps.Snapshot(common.HexToHash("0x02")) // Flatten the diff layer into the bottom accumulator - if err := snaps.Cap(common.HexToHash("0x03"), 1, 1024*1024); err != nil { + if err := snaps.Cap(common.HexToHash("0x03"), 1); err != nil { t.Fatalf("failed to flatten diff layer into accumulator: %v", err) } // Since the accumulator diff layer was modified, ensure that data retrievald on the external reference fail @@ -172,9 +175,9 @@ func TestDiffLayerExternalInvalidationFullFlatten(t *testing.T) { func TestDiffLayerExternalInvalidationPartialFlatten(t *testing.T) { // Create an empty base layer and a snapshot tree out of it base := &diskLayer{ - db: rawdb.NewMemoryDatabase(), - root: common.HexToHash("0x01"), - cache: fastcache.New(1024 * 500), + diskdb: rawdb.NewMemoryDatabase(), + root: common.HexToHash("0x01"), + cache: fastcache.New(1024 * 500), } snaps := &Tree{ layers: map[common.Hash]snapshot{ @@ -202,14 +205,14 @@ func TestDiffLayerExternalInvalidationPartialFlatten(t *testing.T) { // Doing a Cap operation with many allowed layers should be a no-op exp := len(snaps.layers) - if err := snaps.Cap(common.HexToHash("0x04"), 2000, 1024*1024); err != nil { + if err := snaps.Cap(common.HexToHash("0x04"), 2000); err != nil { t.Fatalf("failed to flatten diff layer into accumulator: %v", err) } if got := len(snaps.layers); got != exp { t.Errorf("layers modified, got %d exp %d", got, exp) } // Flatten the diff layer into the bottom accumulator - if err := snaps.Cap(common.HexToHash("0x04"), 2, 1024*1024); err != nil { + if err := snaps.Cap(common.HexToHash("0x04"), 2); err != nil { t.Fatalf("failed to flatten diff layer into accumulator: %v", err) } // Since the accumulator diff layer was modified, ensure that data retrievald on the external reference fail @@ -236,9 +239,9 @@ func TestPostCapBasicDataAccess(t *testing.T) { } // Create a starting base layer and a snapshot tree out of it base := &diskLayer{ - db: rawdb.NewMemoryDatabase(), - root: common.HexToHash("0x01"), - cache: fastcache.New(1024 * 500), + diskdb: rawdb.NewMemoryDatabase(), + root: common.HexToHash("0x01"), + cache: fastcache.New(1024 * 500), } snaps := &Tree{ layers: map[common.Hash]snapshot{ @@ -280,11 +283,11 @@ func TestPostCapBasicDataAccess(t *testing.T) { t.Error(err) } // Cap to a bad root should fail - if err := snaps.Cap(common.HexToHash("0x1337"), 0, 1024); err == nil { + if err := snaps.Cap(common.HexToHash("0x1337"), 0); err == nil { t.Errorf("expected error, got none") } // Now, merge the a-chain - snaps.Cap(common.HexToHash("0xa3"), 0, 1024) + snaps.Cap(common.HexToHash("0xa3"), 0) // At this point, a2 got merged into a1. Thus, a1 is now modified, and as a1 is // the parent of b2, b2 should no longer be able to iterate into parent. @@ -308,7 +311,7 @@ func TestPostCapBasicDataAccess(t *testing.T) { } // Now, merge it again, just for fun. It should now error, since a3 // is a disk layer - if err := snaps.Cap(common.HexToHash("0xa3"), 0, 1024); err == nil { + if err := snaps.Cap(common.HexToHash("0xa3"), 0); err == nil { t.Error("expected error capping the disk layer, got none") } } diff --git a/core/state/snapshot/wipe.go b/core/state/snapshot/wipe.go new file mode 100644 index 000000000..052af6f1f --- /dev/null +++ b/core/state/snapshot/wipe.go @@ -0,0 +1,130 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snapshot + +import ( + "bytes" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// wipeSnapshot starts a goroutine to iterate over the entire key-value database +// and delete all the data associated with the snapshot (accounts, storage, +// metadata). After all is done, the snapshot range of the database is compacted +// to free up unused data blocks. +func wipeSnapshot(db ethdb.KeyValueStore, full bool) chan struct{} { + // Wipe the snapshot root marker synchronously + if full { + rawdb.DeleteSnapshotRoot(db) + } + // Wipe everything else asynchronously + wiper := make(chan struct{}, 1) + go func() { + if err := wipeContent(db); err != nil { + log.Error("Failed to wipe state snapshot", "err", err) // Database close will trigger this + return + } + close(wiper) + }() + return wiper +} + +// wipeContent iterates over the entire key-value database and deletes all the +// data associated with the snapshot (accounts, storage), but not the root hash +// as the wiper is meant to run on a background thread but the root needs to be +// removed in sync to avoid data races. After all is done, the snapshot range of +// the database is compacted to free up unused data blocks. +func wipeContent(db ethdb.KeyValueStore) error { + if err := wipeKeyRange(db, "accounts", rawdb.SnapshotAccountPrefix, len(rawdb.SnapshotAccountPrefix)+common.HashLength); err != nil { + return err + } + if err := wipeKeyRange(db, "storage", rawdb.SnapshotStoragePrefix, len(rawdb.SnapshotStoragePrefix)+2*common.HashLength); err != nil { + return err + } + // Compact the snapshot section of the database to get rid of unused space + start := time.Now() + + log.Info("Compacting snapshot account area ") + end := common.CopyBytes(rawdb.SnapshotAccountPrefix) + end[len(end)-1]++ + + if err := db.Compact(rawdb.SnapshotAccountPrefix, end); err != nil { + return err + } + log.Info("Compacting snapshot storage area ") + end = common.CopyBytes(rawdb.SnapshotStoragePrefix) + end[len(end)-1]++ + + if err := db.Compact(rawdb.SnapshotStoragePrefix, end); err != nil { + return err + } + log.Info("Compacted snapshot area in database", "elapsed", common.PrettyDuration(time.Since(start))) + + return nil +} + +// wipeKeyRange deletes a range of keys from the database starting with prefix +// and having a specific total key length. +func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, keylen int) error { + // Batch deletions together to avoid holding an iterator for too long + var ( + batch = db.NewBatch() + items int + ) + // Iterate over the key-range and delete all of them + start, logged := time.Now(), time.Now() + + it := db.NewIteratorWithStart(prefix) + for it.Next() { + // Skip any keys with the correct prefix but wrong lenth (trie nodes) + key := it.Key() + if !bytes.HasPrefix(key, prefix) { + break + } + if len(key) != keylen { + continue + } + // Delete the key and periodically recreate the batch and iterator + batch.Delete(key) + items++ + + if items%10000 == 0 { + // Batch too large (or iterator too long lived, flush and recreate) + it.Release() + if err := batch.Write(); err != nil { + return err + } + batch.Reset() + it = db.NewIteratorWithStart(key) + + if time.Since(logged) > 8*time.Second { + log.Info("Deleting state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + } + it.Release() + if err := batch.Write(); err != nil { + return err + } + log.Info("Deleted state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start))) + return nil +} diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/wipe_test.go similarity index 77% rename from core/state/snapshot/generate_test.go rename to core/state/snapshot/wipe_test.go index 180db920a..f12769a95 100644 --- a/core/state/snapshot/generate_test.go +++ b/core/state/snapshot/wipe_test.go @@ -59,17 +59,31 @@ func TestWipe(t *testing.T) { // Randomize the suffix, dedup and inject it under the snapshot namespace keysuffix := make([]byte, keysize) rand.Read(keysuffix) - db.Put(append(rawdb.StateSnapshotPrefix, keysuffix...), randomHash().Bytes()) + + if rand.Int31n(2) == 0 { + db.Put(append(rawdb.SnapshotAccountPrefix, keysuffix...), randomHash().Bytes()) + } else { + db.Put(append(rawdb.SnapshotStoragePrefix, keysuffix...), randomHash().Bytes()) + } } // Sanity check that all the keys are present var items int - it := db.NewIteratorWithPrefix(rawdb.StateSnapshotPrefix) + it := db.NewIteratorWithPrefix(rawdb.SnapshotAccountPrefix) defer it.Release() for it.Next() { key := it.Key() - if len(key) == len(rawdb.StateSnapshotPrefix)+32 || len(key) == len(rawdb.StateSnapshotPrefix)+64 { + if len(key) == len(rawdb.SnapshotAccountPrefix)+common.HashLength { + items++ + } + } + it = db.NewIteratorWithPrefix(rawdb.SnapshotStoragePrefix) + defer it.Release() + + for it.Next() { + key := it.Key() + if len(key) == len(rawdb.SnapshotStoragePrefix)+2*common.HashLength { items++ } } @@ -80,16 +94,24 @@ func TestWipe(t *testing.T) { t.Errorf("snapshot block marker mismatch: have %#x, want ", hash) } // Wipe all snapshot entries from the database - if err := wipeSnapshot(db); err != nil { - t.Fatalf("failed to wipe snapshot: %v", err) - } + <-wipeSnapshot(db, true) + // Iterate over the database end ensure no snapshot information remains - it = db.NewIteratorWithPrefix(rawdb.StateSnapshotPrefix) + it = db.NewIteratorWithPrefix(rawdb.SnapshotAccountPrefix) defer it.Release() for it.Next() { key := it.Key() - if len(key) == len(rawdb.StateSnapshotPrefix)+32 || len(key) == len(rawdb.StateSnapshotPrefix)+64 { + if len(key) == len(rawdb.SnapshotAccountPrefix)+common.HashLength { + t.Errorf("snapshot entry remained after wipe: %x", key) + } + } + it = db.NewIteratorWithPrefix(rawdb.SnapshotStoragePrefix) + defer it.Release() + + for it.Next() { + key := it.Key() + if len(key) == len(rawdb.SnapshotStoragePrefix)+2*common.HashLength { t.Errorf("snapshot entry remained after wipe: %x", key) } } diff --git a/core/state/statedb.go b/core/state/statedb.go index f11bd2adb..1528b45aa 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -845,8 +845,8 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { if err := s.snaps.Update(root, parent, s.snapAccounts, s.snapStorage); err != nil { log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) } - if err := s.snaps.Cap(root, 16, 4*1024*1024); err != nil { - log.Warn("Failed to cap snapshot tree", "root", root, "layers", 16, "memory", 4*1024*1024, "err", err) + if err := s.snaps.Cap(root, 128); err != nil { + log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err) } } s.snap, s.snapAccounts, s.snapStorage = nil, nil, nil diff --git a/eth/backend.go b/eth/backend.go index bda307d95..ed79340f5 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -127,7 +127,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { config.Miner.GasPrice = new(big.Int).Set(DefaultConfig.Miner.GasPrice) } if config.NoPruning && config.TrieDirtyCache > 0 { - config.TrieCleanCache += config.TrieDirtyCache + config.TrieCleanCache += config.TrieDirtyCache * 3 / 5 + config.SnapshotCache += config.TrieDirtyCache * 3 / 5 config.TrieDirtyCache = 0 } log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) @@ -184,6 +185,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { TrieDirtyLimit: config.TrieDirtyCache, TrieDirtyDisabled: config.NoPruning, TrieTimeLimit: config.TrieTimeout, + SnapshotLimit: config.SnapshotCache, } ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve) @@ -204,7 +206,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) // Permit the downloader to use the trie cache allowance during fast sync - cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit checkpoint := config.Checkpoint if checkpoint == nil { checkpoint = params.TrustedCheckpoints[genesisHash] diff --git a/eth/config.go b/eth/config.go index 2eaf21fbc..160ce8aa5 100644 --- a/eth/config.go +++ b/eth/config.go @@ -50,6 +50,7 @@ var DefaultConfig = Config{ TrieCleanCache: 256, TrieDirtyCache: 256, TrieTimeout: 60 * time.Minute, + SnapshotCache: 256, Miner: miner.Config{ GasFloor: 8000000, GasCeil: 8000000, @@ -125,6 +126,7 @@ type Config struct { TrieCleanCache int TrieDirtyCache int TrieTimeout time.Duration + SnapshotCache int // Mining options Miner miner.Config diff --git a/trie/iterator.go b/trie/iterator.go index 88189c542..bb4025d8f 100644 --- a/trie/iterator.go +++ b/trie/iterator.go @@ -29,7 +29,6 @@ import ( type Iterator struct { nodeIt NodeIterator - Nodes int // Number of nodes iterated over Key []byte // Current data key on which the iterator is positioned on Value []byte // Current data value on which the iterator is positioned on Err error @@ -47,7 +46,6 @@ func NewIterator(it NodeIterator) *Iterator { // Next moves the iterator forward one key-value entry. func (it *Iterator) Next() bool { for it.nodeIt.Next(true) { - it.Nodes++ if it.nodeIt.Leaf() { it.Key = it.nodeIt.LeafKey() it.Value = it.nodeIt.LeafBlob()