core/state/snapshot: fix journal recovery from generating old journal (#21775)
* core/state/snapshot: print a warning if the journal fails to resolve

* core/state/snapshot: fix snapshot recovery

  When we meet a snapshot journal consisting of:
  - a disk layer generator in the new format
  - a diff layer journal in the old format

  the base layer should be returned without error. The broken diff layers
  can be reconstructed later, but we definitely don't want to be forced to
  regenerate the huge disk layer.

* core: add tests
commit e6402677c2
parent 3eebf34038
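The gist of the fix is that loadAndParseJournal now degrades gracefully: if the diff-layer journal is missing, carries an unexpected version, or does not continue the disk layer, the already-loaded disk layer (plus its generator) is returned without error and the diff layers are simply left to be rebuilt later. Below is a minimal, self-contained sketch of that control flow; the type and helper names (snapshotLayer, diskLayer, journalReader, loadJournal) and the journal version constant are illustrative stand-ins, not the real go-ethereum ones.

package snapshotsketch

import "log"

// journalVersion is the version the loader expects (assumed value).
const journalVersion uint64 = 0

// Hypothetical stand-ins for the real snapshot types.
type snapshotLayer interface{ Root() [32]byte }

type diskLayer struct{ root [32]byte }

func (dl *diskLayer) Root() [32]byte { return dl.root }

// generator records the progress of background snapshot generation.
type generator struct{ Done bool }

// journalReader abstracts "decode the next field of the diff journal".
type journalReader interface {
	Version() (uint64, error)
	DiskRoot() ([32]byte, error)
	DiffLayers(parent snapshotLayer) (snapshotLayer, error)
}

// loadJournal mirrors the fixed behaviour: structural problems with the
// diff journal are only logged, and the bare disk layer is returned
// without an error, so just the cheap diff layers are rebuilt later.
func loadJournal(base *diskLayer, gen generator, r journalReader) (snapshotLayer, generator, error) {
	version, err := r.Version()
	if err != nil {
		log.Printf("failed to resolve journal version: %v", err)
		return base, gen, nil // keep the disk layer, drop the diffs
	}
	if version != journalVersion {
		log.Printf("discarding journal with wrong version: want %d, got %d", journalVersion, version)
		return base, gen, nil
	}
	root, err := r.DiskRoot()
	if err != nil {
		return nil, generator{}, err // the journal itself is unreadable
	}
	if root != base.root {
		log.Printf("journal does not continue the disk layer, dropping diffs")
		return base, gen, nil
	}
	// From here on the journal is trusted, so decoding failures are real errors.
	head, err := r.DiffLayers(base)
	if err != nil {
		return nil, generator{}, err
	}
	return head, gen, nil
}

The real implementation reads the journal with an RLP stream and returns the assembled diff-layer head on success; the sketch only captures the decision of when to fall back to the bare disk layer, as the diff below does.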
core/blockchain_snapshot_test.go
@@ -43,10 +43,11 @@ import (
 // (v) Geth restarts normally, but it's requested to be rewound to a lower point via SetHead
 // (vi) Geth restarts normally with a stale snapshot
 type snapshotTest struct {
 	legacy       bool   // Flag whether the loaded snapshot is in legacy format
 	crash        bool   // Flag whether the Geth restarts from the previous crash
+	restartCrash int    // Number of blocks to insert after the normal stop, then the crash happens
 	gapped       int    // Number of blocks to insert without enabling snapshot
 	setHead      uint64 // Block number to set head back to
 
 	chainBlocks   int    // Number of blocks to generate for the canonical chain
 	snapshotBlock uint64 // Block number of the relevant snapshot disk layer
@@ -565,10 +566,50 @@ func TestSetHeadWithLegacySnapshot(t *testing.T) {
 	})
 }
 
+// Tests the Geth was running with snapshot(legacy-format) enabled and upgrades
+// the disk layer journal(journal generator) to latest format. After that the Geth
+// is restarted from a crash. In this case Geth will find the new-format disk layer
+// journal but with legacy-format diff journal(the new-format is never committed),
+// and the invalid diff journal is expected to be dropped.
+func TestRecoverSnapshotFromCrashWithLegacyDiffJournal(t *testing.T) {
+	// Chain:
+	//   G->C1->C2->C3->C4->C5->C6->C7->C8 (HEAD)
+	//
+	// Commit:   G
+	// Snapshot: G
+	//
+	// SetHead(0)
+	//
+	// ------------------------------
+	//
+	// Expected in leveldb:
+	//   G->C1->C2->C3->C4->C5->C6->C7->C8->C9->C10
+	//
+	// Expected head header    : C10
+	// Expected head fast block: C10
+	// Expected head block     : C8
+	// Expected snapshot disk  : C10
+	testSnapshot(t, &snapshotTest{
+		legacy:             true,
+		crash:              false,
+		restartCrash:       2,
+		gapped:             0,
+		setHead:            0,
+		chainBlocks:        8,
+		snapshotBlock:      0,
+		commitBlock:        0,
+		expCanonicalBlocks: 10,
+		expHeadHeader:      10,
+		expHeadFastBlock:   10,
+		expHeadBlock:       8,  // The persisted state in the first running
+		expSnapshotBottom:  10, // The persisted disk layer in the second running
+	})
+}
+
 func testSnapshot(t *testing.T, tt *snapshotTest) {
 	// It's hard to follow the test case, visualize the input
-	//log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
-	//fmt.Println(tt.dump())
+	// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
+	// fmt.Println(tt.dump())
 
 	// Create a temporary persistent database
 	datadir, err := ioutil.TempDir("", "")
@@ -694,6 +735,30 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
 		chain.SetHead(tt.setHead)
 		chain.Stop()
 
+		chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
+		if err != nil {
+			t.Fatalf("Failed to recreate chain: %v", err)
+		}
+		defer chain.Stop()
+	} else if tt.restartCrash != 0 {
+		// Firstly, stop the chain properly, with all snapshot journal
+		// and state committed.
+		chain.Stop()
+
+		// Restart chain, forcibly flush the disk layer journal with new format
+		newBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.restartCrash, func(i int, b *BlockGen) {})
+		chain, err = NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
+		if err != nil {
+			t.Fatalf("Failed to recreate chain: %v", err)
+		}
+		chain.InsertChain(newBlocks)
+		chain.Snapshot().Cap(newBlocks[len(newBlocks)-1].Root(), 0)
+
+		// Simulate the blockchain crash
+		// Don't call chain.Stop here, so that no snapshot
+		// journal and latest state will be committed
+
+		// Restart the chain after the crash
 		chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
 		if err != nil {
 			t.Fatalf("Failed to recreate chain: %v", err)
core/state/snapshot/journal.go
@@ -103,8 +103,9 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
 	// Retrieve the diff layer journal. It's possible that the journal is
 	// not existent, e.g. the disk layer is generating while that the Geth
 	// crashes without persisting the diff journal.
-	// So if there is no journal, or the journal is not matched with disk
-	// layer, we just discard all diffs and try to recover them later.
+	// So if there is no journal, or the journal is invalid (e.g. the journal
+	// is not matched with the disk layer, or it's the legacy-format journal,
+	// etc.), we just discard all diffs and try to recover them later.
 	journal := rawdb.ReadSnapshotJournal(db)
 	if len(journal) == 0 {
 		log.Warn("Loaded snapshot journal", "diskroot", base.root, "diffs", "missing")
@@ -115,13 +116,16 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
 	// Firstly, resolve the first element as the journal version
 	version, err := r.Uint()
 	if err != nil {
-		return nil, journalGenerator{}, err
+		log.Warn("Failed to resolve the journal version", "error", err)
+		return base, generator, nil
 	}
 	if version != journalVersion {
-		return nil, journalGenerator{}, fmt.Errorf("journal version mismatch, want %d got %v", journalVersion, version)
+		log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version)
+		return base, generator, nil
 	}
 	// Secondly, resolve the disk layer root, ensure it's continuous
-	// with disk layer.
+	// with disk layer. Note now we can ensure it's the snapshot journal
+	// correct version, so we expect everything can be resolved properly.
 	var root common.Hash
 	if err := r.Decode(&root); err != nil {
 		return nil, journalGenerator{}, errors.New("missing disk layer root")
@@ -159,7 +163,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
 	var legacy bool
 	snapshot, generator, err := loadAndParseJournal(diskdb, base)
 	if err != nil {
-		log.Debug("Failed to load new-format journal", "error", err)
+		log.Warn("Failed to load new-format journal", "error", err)
 		snapshot, generator, err = loadAndParseLegacyJournal(diskdb, base)
 		legacy = true
 	}