Merge pull request #22777 from karalabe/snapshots-abort-resume-on-sync
core, eth: abort snapshot generation on snap sync and resume later
This commit is contained in:
		
						commit
						8681a2536c
					
				| @ -640,7 +640,8 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { | ||||
| 	headBlockGauge.Update(int64(block.NumberU64())) | ||||
| 	bc.chainmu.Unlock() | ||||
| 
 | ||||
| 	// Destroy any existing state snapshot and regenerate it in the background
 | ||||
| 	// Destroy any existing state snapshot and regenerate it in the background,
 | ||||
| 	// also resuming the normal maintenance of any previously paused snapshot.
 | ||||
| 	if bc.snaps != nil { | ||||
| 		bc.snaps.Rebuild(block.Root()) | ||||
| 	} | ||||
|  | ||||
| @ -24,6 +24,26 @@ import ( | ||||
| 	"github.com/ethereum/go-ethereum/log" | ||||
| ) | ||||
| 
 | ||||
| // ReadSnapshotDisabled retrieves if the snapshot maintenance is disabled.
 | ||||
| func ReadSnapshotDisabled(db ethdb.KeyValueReader) bool { | ||||
| 	disabled, _ := db.Has(snapshotDisabledKey) | ||||
| 	return disabled | ||||
| } | ||||
| 
 | ||||
| // WriteSnapshotDisabled stores the snapshot pause flag.
 | ||||
| func WriteSnapshotDisabled(db ethdb.KeyValueWriter) { | ||||
| 	if err := db.Put(snapshotDisabledKey, []byte("42")); err != nil { | ||||
| 		log.Crit("Failed to store snapshot disabled flag", "err", err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // DeleteSnapshotDisabled deletes the flag keeping the snapshot maintenance disabled.
 | ||||
| func DeleteSnapshotDisabled(db ethdb.KeyValueWriter) { | ||||
| 	if err := db.Delete(snapshotDisabledKey); err != nil { | ||||
| 		log.Crit("Failed to remove snapshot disabled flag", "err", err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // ReadSnapshotRoot retrieves the root of the block whose state is contained in
 | ||||
| // the persisted snapshot.
 | ||||
| func ReadSnapshotRoot(db ethdb.KeyValueReader) common.Hash { | ||||
|  | ||||
| @ -371,9 +371,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { | ||||
| 			var accounted bool | ||||
| 			for _, meta := range [][]byte{ | ||||
| 				databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, lastPivotKey, | ||||
| 				fastTrieProgressKey, snapshotRootKey, snapshotJournalKey, snapshotGeneratorKey, | ||||
| 				snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, | ||||
| 				badBlockKey, | ||||
| 				fastTrieProgressKey, snapshotDisabledKey, snapshotRootKey, snapshotJournalKey, | ||||
| 				snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, | ||||
| 				uncleanShutdownKey, badBlockKey, | ||||
| 			} { | ||||
| 				if bytes.Equal(key, meta) { | ||||
| 					metadata.Add(size) | ||||
|  | ||||
| @ -45,6 +45,9 @@ var ( | ||||
| 	// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
 | ||||
| 	fastTrieProgressKey = []byte("TrieSync") | ||||
| 
 | ||||
| 	// snapshotDisabledKey flags that the snapshot should not be maintained due to initial sync.
 | ||||
| 	snapshotDisabledKey = []byte("SnapshotDisabled") | ||||
| 
 | ||||
| 	// snapshotRootKey tracks the hash of the last snapshot.
 | ||||
| 	snapshotRootKey = []byte("SnapshotRoot") | ||||
| 
 | ||||
|  | ||||
| @ -141,16 +141,6 @@ func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) { | ||||
| 	log.Info(msg, ctx...) | ||||
| } | ||||
| 
 | ||||
| // ClearSnapshotMarker sets the snapshot marker to zero, meaning that snapshots
 | ||||
| // are not usable.
 | ||||
| func ClearSnapshotMarker(diskdb ethdb.KeyValueStore) { | ||||
| 	batch := diskdb.NewBatch() | ||||
| 	journalProgress(batch, []byte{}, nil) | ||||
| 	if err := batch.Write(); err != nil { | ||||
| 		log.Crit("Failed to write initialized state marker", "err", err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // generateSnapshot regenerates a brand new snapshot based on an existing state
 | ||||
| // database and head block asynchronously. The snapshot is returned immediately
 | ||||
| // and generation is continued in the background until done.
 | ||||
|  | ||||
| @ -126,12 +126,17 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou | ||||
| } | ||||
| 
 | ||||
| // loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
 | ||||
| func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, error) { | ||||
| func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) { | ||||
| 	// If snapshotting is disabled (initial sync in progress), don't do anything,
 | ||||
| 	// wait for the chain to permit us to do something meaningful
 | ||||
| 	if rawdb.ReadSnapshotDisabled(diskdb) { | ||||
| 		return nil, true, nil | ||||
| 	} | ||||
| 	// Retrieve the block number and hash of the snapshot, failing if no snapshot
 | ||||
| 	// is present in the database (or crashed mid-update).
 | ||||
| 	baseRoot := rawdb.ReadSnapshotRoot(diskdb) | ||||
| 	if baseRoot == (common.Hash{}) { | ||||
| 		return nil, errors.New("missing or corrupted snapshot") | ||||
| 		return nil, false, errors.New("missing or corrupted snapshot") | ||||
| 	} | ||||
| 	base := &diskLayer{ | ||||
| 		diskdb: diskdb, | ||||
| @ -142,7 +147,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, | ||||
| 	snapshot, generator, err := loadAndParseJournal(diskdb, base) | ||||
| 	if err != nil { | ||||
| 		log.Warn("Failed to load new-format journal", "error", err) | ||||
| 		return nil, err | ||||
| 		return nil, false, err | ||||
| 	} | ||||
| 	// Entire snapshot journal loaded, sanity check the head. If the loaded
 | ||||
| 	// snapshot is not matched with current state root, print a warning log
 | ||||
| @ -157,7 +162,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, | ||||
| 		// it's not in recovery mode, returns the error here for
 | ||||
| 		// rebuilding the entire snapshot forcibly.
 | ||||
| 		if !recovery { | ||||
| 			return nil, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root) | ||||
| 			return nil, false, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root) | ||||
| 		} | ||||
| 		// It's in snapshot recovery, the assumption is held that
 | ||||
| 		// the disk layer is always higher than chain head. It can
 | ||||
| @ -187,7 +192,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, | ||||
| 			storage:  common.StorageSize(generator.Storage), | ||||
| 		}) | ||||
| 	} | ||||
| 	return snapshot, nil | ||||
| 	return snapshot, false, nil | ||||
| } | ||||
| 
 | ||||
| // loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new
 | ||||
|  | ||||
| @ -148,11 +148,11 @@ type snapshot interface { | ||||
| 	StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool) | ||||
| } | ||||
| 
 | ||||
| // SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent
 | ||||
| // base layer backed by a key-value store, on top of which arbitrarily many in-
 | ||||
| // memory diff layers are topped. The memory diffs can form a tree with branching,
 | ||||
| // but the disk layer is singleton and common to all. If a reorg goes deeper than
 | ||||
| // the disk layer, everything needs to be deleted.
 | ||||
| // Tree is an Ethereum state snapshot tree. It consists of one persistent base
 | ||||
| // layer backed by a key-value store, on top of which arbitrarily many in-memory
 | ||||
| // diff layers are topped. The memory diffs can form a tree with branching, but
 | ||||
| // the disk layer is singleton and common to all. If a reorg goes deeper than the
 | ||||
| // disk layer, everything needs to be deleted.
 | ||||
| //
 | ||||
| // The goal of a state snapshot is twofold: to allow direct access to account and
 | ||||
| // storage data to avoid expensive multi-level trie lookups; and to allow sorted,
 | ||||
| @ -186,7 +186,11 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root comm | ||||
| 		defer snap.waitBuild() | ||||
| 	} | ||||
| 	// Attempt to load a previously persisted snapshot and rebuild one if failed
 | ||||
| 	head, err := loadSnapshot(diskdb, triedb, cache, root, recovery) | ||||
| 	head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery) | ||||
| 	if disabled { | ||||
| 		log.Warn("Snapshot maintenance disabled (syncing)") | ||||
| 		return snap, nil | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		if rebuild { | ||||
| 			log.Warn("Failed to load snapshot, regenerating", "err", err) | ||||
| @ -224,6 +228,55 @@ func (t *Tree) waitBuild() { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Disable interrupts any pending snapshot generator, deletes all the snapshot
 | ||||
| // layers in memory and marks snapshots disabled globally. In order to resume
 | ||||
| // the snapshot functionality, the caller must invoke Rebuild.
 | ||||
| func (t *Tree) Disable() { | ||||
| 	// Interrupt any live snapshot layers
 | ||||
| 	t.lock.Lock() | ||||
| 	defer t.lock.Unlock() | ||||
| 
 | ||||
| 	for _, layer := range t.layers { | ||||
| 		switch layer := layer.(type) { | ||||
| 		case *diskLayer: | ||||
| 			// If the base layer is generating, abort it
 | ||||
| 			if layer.genAbort != nil { | ||||
| 				abort := make(chan *generatorStats) | ||||
| 				layer.genAbort <- abort | ||||
| 				<-abort | ||||
| 			} | ||||
| 			// Layer should be inactive now, mark it as stale
 | ||||
| 			layer.lock.Lock() | ||||
| 			layer.stale = true | ||||
| 			layer.lock.Unlock() | ||||
| 
 | ||||
| 		case *diffLayer: | ||||
| 			// If the layer is a simple diff, simply mark as stale
 | ||||
| 			layer.lock.Lock() | ||||
| 			atomic.StoreUint32(&layer.stale, 1) | ||||
| 			layer.lock.Unlock() | ||||
| 
 | ||||
| 		default: | ||||
| 			panic(fmt.Sprintf("unknown layer type: %T", layer)) | ||||
| 		} | ||||
| 	} | ||||
| 	t.layers = map[common.Hash]snapshot{} | ||||
| 
 | ||||
| 	// Delete all snapshot liveness information from the database
 | ||||
| 	batch := t.diskdb.NewBatch() | ||||
| 
 | ||||
| 	rawdb.WriteSnapshotDisabled(batch) | ||||
| 	rawdb.DeleteSnapshotRoot(batch) | ||||
| 	rawdb.DeleteSnapshotJournal(batch) | ||||
| 	rawdb.DeleteSnapshotGenerator(batch) | ||||
| 	rawdb.DeleteSnapshotRecoveryNumber(batch) | ||||
| 	// Note, we don't delete the sync progress
 | ||||
| 
 | ||||
| 	if err := batch.Write(); err != nil { | ||||
| 		log.Crit("Failed to disable snapshots", "err", err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Snapshot retrieves a snapshot belonging to the given block root, or nil if no
 | ||||
| // snapshot is maintained for that block.
 | ||||
| func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot { | ||||
| @ -626,8 +679,9 @@ func (t *Tree) Rebuild(root common.Hash) { | ||||
| 	defer t.lock.Unlock() | ||||
| 
 | ||||
| 	// Firstly delete any recovery flag in the database. Because now we are
 | ||||
| 	// building a brand new snapshot.
 | ||||
| 	// building a brand new snapshot. Also reenable the snapshot feature.
 | ||||
| 	rawdb.DeleteSnapshotRecoveryNumber(t.diskdb) | ||||
| 	rawdb.DeleteSnapshotDisabled(t.diskdb) | ||||
| 
 | ||||
| 	// Iterate over and mark all layers stale
 | ||||
| 	for _, layer := range t.layers { | ||||
|  | ||||
| @ -28,6 +28,7 @@ import ( | ||||
| 	"github.com/ethereum/go-ethereum" | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| 	"github.com/ethereum/go-ethereum/core/rawdb" | ||||
| 	"github.com/ethereum/go-ethereum/core/state/snapshot" | ||||
| 	"github.com/ethereum/go-ethereum/core/types" | ||||
| 	"github.com/ethereum/go-ethereum/eth/protocols/eth" | ||||
| 	"github.com/ethereum/go-ethereum/eth/protocols/snap" | ||||
| @ -214,6 +215,9 @@ type BlockChain interface { | ||||
| 
 | ||||
| 	// InsertReceiptChain inserts a batch of receipts into the local chain.
 | ||||
| 	InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error) | ||||
| 
 | ||||
| 	// Snapshots returns the blockchain snapshot tree to paused it during sync.
 | ||||
| 	Snapshots() *snapshot.Tree | ||||
| } | ||||
| 
 | ||||
| // New creates a new downloader to fetch hashes and blocks from remote peers.
 | ||||
| @ -393,6 +397,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode | ||||
| 	// but until snap becomes prevalent, we should support both. TODO(karalabe).
 | ||||
| 	if mode == SnapSync { | ||||
| 		if !d.snapSync { | ||||
| 			// Snap sync uses the snapshot namespace to store potentially flakey data until
 | ||||
| 			// sync completely heals and finishes. Pause snapshot maintenance in the mean
 | ||||
| 			// time to prevent access.
 | ||||
| 			if snapshots := d.blockchain.Snapshots(); snapshots != nil { // Only nil in tests
 | ||||
| 				snapshots.Disable() | ||||
| 			} | ||||
| 			log.Warn("Enabling snapshot sync prototype") | ||||
| 			d.snapSync = true | ||||
| 		} | ||||
|  | ||||
| @ -29,6 +29,7 @@ import ( | ||||
| 	"github.com/ethereum/go-ethereum" | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| 	"github.com/ethereum/go-ethereum/core/rawdb" | ||||
| 	"github.com/ethereum/go-ethereum/core/state/snapshot" | ||||
| 	"github.com/ethereum/go-ethereum/core/types" | ||||
| 	"github.com/ethereum/go-ethereum/eth/protocols/eth" | ||||
| 	"github.com/ethereum/go-ethereum/ethdb" | ||||
| @ -409,6 +410,11 @@ func (dl *downloadTester) dropPeer(id string) { | ||||
| 	dl.downloader.UnregisterPeer(id) | ||||
| } | ||||
| 
 | ||||
| // Snapshots implements the BlockChain interface for the downloader, but is a noop.
 | ||||
| func (dl *downloadTester) Snapshots() *snapshot.Tree { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| type downloadTesterPeer struct { | ||||
| 	dl            *downloadTester | ||||
| 	id            string | ||||
|  | ||||
| @ -551,11 +551,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { | ||||
| 		log.Debug("Snapshot sync already completed") | ||||
| 		return nil | ||||
| 	} | ||||
| 	// If sync is still not finished, we need to ensure that any marker is wiped.
 | ||||
| 	// Otherwise, it may happen that requests for e.g. genesis-data is delivered
 | ||||
| 	// from the snapshot data, instead of from the trie
 | ||||
| 	snapshot.ClearSnapshotMarker(s.db) | ||||
| 
 | ||||
| 	defer func() { // Persist any progress, independent of failure
 | ||||
| 		for _, task := range s.tasks { | ||||
| 			s.forwardAccountTask(task) | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user