core/state/snapshot: update generator marker in sync with flushes
This commit is contained in:
parent
97fc1c3b1d
commit
7b7b327ff2
@ -675,7 +675,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
|
|||||||
if _, err := chain.InsertChain(blocks[startPoint:]); err != nil {
|
if _, err := chain.InsertChain(blocks[startPoint:]); err != nil {
|
||||||
t.Fatalf("Failed to import canonical chain tail: %v", err)
|
t.Fatalf("Failed to import canonical chain tail: %v", err)
|
||||||
}
|
}
|
||||||
// Set the flag for writing legacy journal if ncessary
|
// Set the flag for writing legacy journal if necessary
|
||||||
if tt.legacy {
|
if tt.legacy {
|
||||||
chain.writeLegacyJournal = true
|
chain.writeLegacyJournal = true
|
||||||
}
|
}
|
||||||
@ -708,7 +708,6 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
|
|||||||
} else if tt.gapped > 0 {
|
} else if tt.gapped > 0 {
|
||||||
// Insert blocks without enabling snapshot if gapping is required.
|
// Insert blocks without enabling snapshot if gapping is required.
|
||||||
chain.Stop()
|
chain.Stop()
|
||||||
|
|
||||||
gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {})
|
gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {})
|
||||||
|
|
||||||
// Insert a few more blocks without enabling snapshot
|
// Insert a few more blocks without enabling snapshot
|
||||||
@ -766,6 +765,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
|
|||||||
defer chain.Stop()
|
defer chain.Stop()
|
||||||
} else {
|
} else {
|
||||||
chain.Stop()
|
chain.Stop()
|
||||||
|
|
||||||
// Restart the chain normally
|
// Restart the chain normally
|
||||||
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
|
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -19,6 +19,7 @@ package snapshot
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -116,6 +117,38 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache i
|
|||||||
return base
|
return base
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// journalProgress persists the generator stats into the database to resume later.
|
||||||
|
func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) {
|
||||||
|
// Write out the generator marker. Note it's a standalone disk layer generator
|
||||||
|
// which is not mixed with journal. It's ok if the generator is persisted while
|
||||||
|
// journal is not.
|
||||||
|
entry := journalGenerator{
|
||||||
|
Done: marker == nil,
|
||||||
|
Marker: marker,
|
||||||
|
}
|
||||||
|
if stats != nil {
|
||||||
|
entry.Wiping = (stats.wiping != nil)
|
||||||
|
entry.Accounts = stats.accounts
|
||||||
|
entry.Slots = stats.slots
|
||||||
|
entry.Storage = uint64(stats.storage)
|
||||||
|
}
|
||||||
|
blob, err := rlp.EncodeToBytes(entry)
|
||||||
|
if err != nil {
|
||||||
|
panic(err) // Cannot happen, here to catch dev errors
|
||||||
|
}
|
||||||
|
var logstr string
|
||||||
|
switch len(marker) {
|
||||||
|
case 0:
|
||||||
|
logstr = "done"
|
||||||
|
case common.HashLength:
|
||||||
|
logstr = fmt.Sprintf("%#x", marker)
|
||||||
|
default:
|
||||||
|
logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
|
||||||
|
}
|
||||||
|
log.Debug("Journalled generator progress", "progress", logstr)
|
||||||
|
rawdb.WriteSnapshotGenerator(db, blob)
|
||||||
|
}
|
||||||
|
|
||||||
// generate is a background thread that iterates over the state and storage tries,
|
// generate is a background thread that iterates over the state and storage tries,
|
||||||
// constructing the state snapshot. All the arguments are purely for statistics
|
// constructing the state snapshot. All the arguments are purely for statistics
|
||||||
// gethering and logging, since the method surfs the blocks as they arrive, often
|
// gethering and logging, since the method surfs the blocks as they arrive, often
|
||||||
@ -187,11 +220,15 @@ func (dl *diskLayer) generate(stats *generatorStats) {
|
|||||||
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
|
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
|
||||||
// Only write and set the marker if we actually did something useful
|
// Only write and set the marker if we actually did something useful
|
||||||
if batch.ValueSize() > 0 {
|
if batch.ValueSize() > 0 {
|
||||||
|
// Ensure the generator entry is in sync with the data
|
||||||
|
marker := accountHash[:]
|
||||||
|
journalProgress(batch, marker, stats)
|
||||||
|
|
||||||
batch.Write()
|
batch.Write()
|
||||||
batch.Reset()
|
batch.Reset()
|
||||||
|
|
||||||
dl.lock.Lock()
|
dl.lock.Lock()
|
||||||
dl.genMarker = accountHash[:]
|
dl.genMarker = marker
|
||||||
dl.lock.Unlock()
|
dl.lock.Unlock()
|
||||||
}
|
}
|
||||||
if abort != nil {
|
if abort != nil {
|
||||||
@ -228,11 +265,15 @@ func (dl *diskLayer) generate(stats *generatorStats) {
|
|||||||
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
|
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
|
||||||
// Only write and set the marker if we actually did something useful
|
// Only write and set the marker if we actually did something useful
|
||||||
if batch.ValueSize() > 0 {
|
if batch.ValueSize() > 0 {
|
||||||
|
// Ensure the generator entry is in sync with the data
|
||||||
|
marker := append(accountHash[:], storeIt.Key...)
|
||||||
|
journalProgress(batch, marker, stats)
|
||||||
|
|
||||||
batch.Write()
|
batch.Write()
|
||||||
batch.Reset()
|
batch.Reset()
|
||||||
|
|
||||||
dl.lock.Lock()
|
dl.lock.Lock()
|
||||||
dl.genMarker = append(accountHash[:], storeIt.Key...)
|
dl.genMarker = marker
|
||||||
dl.lock.Unlock()
|
dl.lock.Unlock()
|
||||||
}
|
}
|
||||||
if abort != nil {
|
if abort != nil {
|
||||||
@ -264,6 +305,9 @@ func (dl *diskLayer) generate(stats *generatorStats) {
|
|||||||
}
|
}
|
||||||
// Snapshot fully generated, set the marker to nil
|
// Snapshot fully generated, set the marker to nil
|
||||||
if batch.ValueSize() > 0 {
|
if batch.ValueSize() > 0 {
|
||||||
|
// Ensure the generator entry is in sync with the data
|
||||||
|
journalProgress(batch, nil, stats)
|
||||||
|
|
||||||
batch.Write()
|
batch.Write()
|
||||||
}
|
}
|
||||||
log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
|
log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
|
||||||
|
@ -276,8 +276,8 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
|
|||||||
return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r)
|
return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Journal writes the persistent layer generator stats into a buffer to be stored
|
// Journal terminates any in-progress snapshot generation, also implicitly pushing
|
||||||
// in the database as the snapshot journal.
|
// the progress into the database.
|
||||||
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
|
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
|
||||||
// If the snapshot is currently being generated, abort it
|
// If the snapshot is currently being generated, abort it
|
||||||
var stats *generatorStats
|
var stats *generatorStats
|
||||||
@ -296,25 +296,10 @@ func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
|
|||||||
if dl.stale {
|
if dl.stale {
|
||||||
return common.Hash{}, ErrSnapshotStale
|
return common.Hash{}, ErrSnapshotStale
|
||||||
}
|
}
|
||||||
// Write out the generator marker. Note it's a standalone disk layer generator
|
// Ensure the generator stats is written even if none was ran this cycle
|
||||||
// which is not mixed with journal. It's ok if the generator is persisted while
|
journalProgress(dl.diskdb, dl.genMarker, stats)
|
||||||
// journal is not.
|
|
||||||
entry := journalGenerator{
|
log.Debug("Journalled disk layer", "root", dl.root)
|
||||||
Done: dl.genMarker == nil,
|
|
||||||
Marker: dl.genMarker,
|
|
||||||
}
|
|
||||||
if stats != nil {
|
|
||||||
entry.Wiping = (stats.wiping != nil)
|
|
||||||
entry.Accounts = stats.accounts
|
|
||||||
entry.Slots = stats.slots
|
|
||||||
entry.Storage = uint64(stats.storage)
|
|
||||||
}
|
|
||||||
blob, err := rlp.EncodeToBytes(entry)
|
|
||||||
if err != nil {
|
|
||||||
return common.Hash{}, err
|
|
||||||
}
|
|
||||||
log.Debug("Journalled disk layer", "root", dl.root, "complete", dl.genMarker == nil)
|
|
||||||
rawdb.WriteSnapshotGenerator(dl.diskdb, blob)
|
|
||||||
return dl.root, nil
|
return dl.root, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,6 +386,7 @@ func (dl *diskLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
|
|||||||
entry.Slots = stats.slots
|
entry.Slots = stats.slots
|
||||||
entry.Storage = uint64(stats.storage)
|
entry.Storage = uint64(stats.storage)
|
||||||
}
|
}
|
||||||
|
log.Debug("Legacy journalled disk layer", "root", dl.root)
|
||||||
if err := rlp.Encode(buffer, entry); err != nil {
|
if err := rlp.Encode(buffer, entry); err != nil {
|
||||||
return common.Hash{}, err
|
return common.Hash{}, err
|
||||||
}
|
}
|
||||||
@ -455,6 +441,6 @@ func (dl *diffLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
|
|||||||
if err := rlp.Encode(buffer, storage); err != nil {
|
if err := rlp.Encode(buffer, storage); err != nil {
|
||||||
return common.Hash{}, err
|
return common.Hash{}, err
|
||||||
}
|
}
|
||||||
log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root())
|
log.Debug("Legacy journalled disk layer", "root", dl.root, "parent", dl.parent.Root())
|
||||||
return base, nil
|
return base, nil
|
||||||
}
|
}
|
||||||
|
@ -512,22 +512,8 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
|
|||||||
// Update the snapshot block marker and write any remainder data
|
// Update the snapshot block marker and write any remainder data
|
||||||
rawdb.WriteSnapshotRoot(batch, bottom.root)
|
rawdb.WriteSnapshotRoot(batch, bottom.root)
|
||||||
|
|
||||||
// Write out the generator marker
|
// Write out the generator progress marker and report
|
||||||
entry := journalGenerator{
|
journalProgress(batch, base.genMarker, stats)
|
||||||
Done: base.genMarker == nil,
|
|
||||||
Marker: base.genMarker,
|
|
||||||
}
|
|
||||||
if stats != nil {
|
|
||||||
entry.Wiping = (stats.wiping != nil)
|
|
||||||
entry.Accounts = stats.accounts
|
|
||||||
entry.Slots = stats.slots
|
|
||||||
entry.Storage = uint64(stats.storage)
|
|
||||||
}
|
|
||||||
blob, err := rlp.EncodeToBytes(entry)
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Sprintf("Failed to RLP encode generator %v", err))
|
|
||||||
}
|
|
||||||
rawdb.WriteSnapshotGenerator(batch, blob)
|
|
||||||
|
|
||||||
// Flush all the updates in the single db operation. Ensure the
|
// Flush all the updates in the single db operation. Ensure the
|
||||||
// disk layer transition is atomic.
|
// disk layer transition is atomic.
|
||||||
|
Loading…
Reference in New Issue
Block a user