From 745757ac6bb10c296ab30874ddde774f4fcdec1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 29 Apr 2021 17:33:45 +0300 Subject: [PATCH] core, eth: abort snapshot generation on snap sync and resume later --- core/blockchain.go | 3 +- core/rawdb/accessors_snapshot.go | 20 +++++++++ core/rawdb/database.go | 6 +-- core/rawdb/schema.go | 3 ++ core/state/snapshot/generate.go | 10 ----- core/state/snapshot/journal.go | 15 ++++--- core/state/snapshot/snapshot.go | 68 +++++++++++++++++++++++++++---- eth/downloader/downloader.go | 10 +++++ eth/downloader/downloader_test.go | 6 +++ eth/protocols/snap/sync.go | 5 --- 10 files changed, 115 insertions(+), 31 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 49aa1c3e8..c3562902d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -640,7 +640,8 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() - // Destroy any existing state snapshot and regenerate it in the background + // Destroy any existing state snapshot and regenerate it in the background, + // also resuming the normal maintenance of any previously paused snapshot. if bc.snaps != nil { bc.snaps.Rebuild(block.Root()) } diff --git a/core/rawdb/accessors_snapshot.go b/core/rawdb/accessors_snapshot.go index c3616ba3a..88446e079 100644 --- a/core/rawdb/accessors_snapshot.go +++ b/core/rawdb/accessors_snapshot.go @@ -24,6 +24,26 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// ReadSnapshotDisabled retrieves if the snapshot maintenance is disabled. +func ReadSnapshotDisabled(db ethdb.KeyValueReader) bool { + disabled, _ := db.Has(snapshotDisabledKey) + return disabled +} + +// WriteSnapshotDisabled stores the snapshot pause flag. +func WriteSnapshotDisabled(db ethdb.KeyValueWriter) { + if err := db.Put(snapshotDisabledKey, []byte("42")); err != nil { + log.Crit("Failed to store snapshot disabled flag", "err", err) + } +} + +// DeleteSnapshotDisabled deletes the flag keeping the snapshot maintenance disabled. +func DeleteSnapshotDisabled(db ethdb.KeyValueWriter) { + if err := db.Delete(snapshotDisabledKey); err != nil { + log.Crit("Failed to remove snapshot disabled flag", "err", err) + } +} + // ReadSnapshotRoot retrieves the root of the block whose state is contained in // the persisted snapshot. func ReadSnapshotRoot(db ethdb.KeyValueReader) common.Hash { diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 94759eb98..3a0a26c61 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -371,9 +371,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { var accounted bool for _, meta := range [][]byte{ databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, lastPivotKey, - fastTrieProgressKey, snapshotRootKey, snapshotJournalKey, snapshotGeneratorKey, - snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, - badBlockKey, + fastTrieProgressKey, snapshotDisabledKey, snapshotRootKey, snapshotJournalKey, + snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, + uncleanShutdownKey, badBlockKey, } { if bytes.Equal(key, meta) { metadata.Add(size) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 7a9738910..2505ce90b 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -45,6 +45,9 @@ var ( // fastTrieProgressKey tracks the number of trie entries imported during fast sync. fastTrieProgressKey = []byte("TrieSync") + // snapshotDisabledKey flags that the snapshot should not be maintained due to initial sync. + snapshotDisabledKey = []byte("SnapshotDisabled") + // snapshotRootKey tracks the hash of the last snapshot. snapshotRootKey = []byte("SnapshotRoot") diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go index 8992d3f91..7e29e51b2 100644 --- a/core/state/snapshot/generate.go +++ b/core/state/snapshot/generate.go @@ -141,16 +141,6 @@ func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) { log.Info(msg, ctx...) } -// ClearSnapshotMarker sets the snapshot marker to zero, meaning that snapshots -// are not usable. -func ClearSnapshotMarker(diskdb ethdb.KeyValueStore) { - batch := diskdb.NewBatch() - journalProgress(batch, []byte{}, nil) - if err := batch.Write(); err != nil { - log.Crit("Failed to write initialized state marker", "err", err) - } -} - // generateSnapshot regenerates a brand new snapshot based on an existing state // database and head block asynchronously. The snapshot is returned immediately // and generation is continued in the background until done. diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index f8cec4d4e..5cfb9a9f2 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -126,12 +126,17 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou } // loadSnapshot loads a pre-existing state snapshot backed by a key-value store. -func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, error) { +func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) { + // If snapshotting is disabled (initial sync in progress), don't do anything, + // wait for the chain to permit us to do something meaningful + if rawdb.ReadSnapshotDisabled(diskdb) { + return nil, true, nil + } // Retrieve the block number and hash of the snapshot, failing if no snapshot // is present in the database (or crashed mid-update). baseRoot := rawdb.ReadSnapshotRoot(diskdb) if baseRoot == (common.Hash{}) { - return nil, errors.New("missing or corrupted snapshot") + return nil, false, errors.New("missing or corrupted snapshot") } base := &diskLayer{ diskdb: diskdb, @@ -142,7 +147,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, snapshot, generator, err := loadAndParseJournal(diskdb, base) if err != nil { log.Warn("Failed to load new-format journal", "error", err) - return nil, err + return nil, false, err } // Entire snapshot journal loaded, sanity check the head. If the loaded // snapshot is not matched with current state root, print a warning log @@ -157,7 +162,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, // it's not in recovery mode, returns the error here for // rebuilding the entire snapshot forcibly. if !recovery { - return nil, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root) + return nil, false, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root) } // It's in snapshot recovery, the assumption is held that // the disk layer is always higher than chain head. It can @@ -187,7 +192,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, storage: common.StorageSize(generator.Storage), }) } - return snapshot, nil + return snapshot, false, nil } // loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 9ecbd4a6c..cb8ec7a70 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -148,11 +148,11 @@ type snapshot interface { StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool) } -// SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent -// base layer backed by a key-value store, on top of which arbitrarily many in- -// memory diff layers are topped. The memory diffs can form a tree with branching, -// but the disk layer is singleton and common to all. If a reorg goes deeper than -// the disk layer, everything needs to be deleted. +// Tree is an Ethereum state snapshot tree. It consists of one persistent base +// layer backed by a key-value store, on top of which arbitrarily many in-memory +// diff layers are topped. The memory diffs can form a tree with branching, but +// the disk layer is singleton and common to all. If a reorg goes deeper than the +// disk layer, everything needs to be deleted. // // The goal of a state snapshot is twofold: to allow direct access to account and // storage data to avoid expensive multi-level trie lookups; and to allow sorted, @@ -186,7 +186,11 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root comm defer snap.waitBuild() } // Attempt to load a previously persisted snapshot and rebuild one if failed - head, err := loadSnapshot(diskdb, triedb, cache, root, recovery) + head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery) + if disabled { + log.Warn("Snapshot maintenance disabled (syncing)") + return snap, nil + } if err != nil { if rebuild { log.Warn("Failed to load snapshot, regenerating", "err", err) @@ -224,6 +228,55 @@ func (t *Tree) waitBuild() { } } +// Disable interrupts any pending snapshot generator, deletes all the snapshot +// layers in memory and marks snapshots disabled globally. In order to resume +// the snapshot functionality, the caller must invoke Rebuild. +func (t *Tree) Disable() { + // Interrupt any live snapshot layers + t.lock.Lock() + defer t.lock.Unlock() + + for _, layer := range t.layers { + switch layer := layer.(type) { + case *diskLayer: + // If the base layer is generating, abort it + if layer.genAbort != nil { + abort := make(chan *generatorStats) + layer.genAbort <- abort + <-abort + } + // Layer should be inactive now, mark it as stale + layer.lock.Lock() + layer.stale = true + layer.lock.Unlock() + + case *diffLayer: + // If the layer is a simple diff, simply mark as stale + layer.lock.Lock() + atomic.StoreUint32(&layer.stale, 1) + layer.lock.Unlock() + + default: + panic(fmt.Sprintf("unknown layer type: %T", layer)) + } + } + t.layers = map[common.Hash]snapshot{} + + // Delete all snapshot liveness information from the database + batch := t.diskdb.NewBatch() + + rawdb.WriteSnapshotDisabled(batch) + rawdb.DeleteSnapshotRoot(batch) + rawdb.DeleteSnapshotJournal(batch) + rawdb.DeleteSnapshotGenerator(batch) + rawdb.DeleteSnapshotRecoveryNumber(batch) + // Note, we don't delete the sync progress + + if err := batch.Write(); err != nil { + log.Crit("Failed to disable snapshots", "err", err) + } +} + // Snapshot retrieves a snapshot belonging to the given block root, or nil if no // snapshot is maintained for that block. func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot { @@ -626,8 +679,9 @@ func (t *Tree) Rebuild(root common.Hash) { defer t.lock.Unlock() // Firstly delete any recovery flag in the database. Because now we are - // building a brand new snapshot. + // building a brand new snapshot. Also reenable the snapshot feature. rawdb.DeleteSnapshotRecoveryNumber(t.diskdb) + rawdb.DeleteSnapshotDisabled(t.diskdb) // Iterate over and mark all layers stale for _, layer := range t.layers { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index b8cb48914..6f59b29a5 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -214,6 +215,9 @@ type BlockChain interface { // InsertReceiptChain inserts a batch of receipts into the local chain. InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error) + + // Snapshots returns the blockchain snapshot tree to paused it during sync. + Snapshots() *snapshot.Tree } // New creates a new downloader to fetch hashes and blocks from remote peers. @@ -393,6 +397,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // but until snap becomes prevalent, we should support both. TODO(karalabe). if mode == SnapSync { if !d.snapSync { + // Snap sync uses the snapshot namespace to store potentially flakey data until + // sync completely heals and finishes. Pause snapshot maintenance in the mean + // time to prevent access. + if snapshots := d.blockchain.Snapshots(); snapshots != nil { // Only nil in tests + snapshots.Disable() + } log.Warn("Enabling snapshot sync prototype") d.snapSync = true } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 1140a444c..794160993 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" @@ -409,6 +410,11 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } +// Snapshots implements the BlockChain interface for the downloader, but is a noop. +func (dl *downloadTester) Snapshots() *snapshot.Tree { + return nil +} + type downloadTesterPeer struct { dl *downloadTester id string diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index d9c0cb9b1..237314916 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -551,11 +551,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { log.Debug("Snapshot sync already completed") return nil } - // If sync is still not finished, we need to ensure that any marker is wiped. - // Otherwise, it may happen that requests for e.g. genesis-data is delivered - // from the snapshot data, instead of from the trie - snapshot.ClearSnapshotMarker(s.db) - defer func() { // Persist any progress, independent of failure for _, task := range s.tasks { s.forwardAccountTask(task)