From 5e9f5ca5d302298b933668af539ad1e213bdfa6e Mon Sep 17 00:00:00 2001 From: gary rong Date: Mon, 18 Jan 2021 21:39:43 +0800 Subject: [PATCH] core/state/snapshot: write snapshot generator in batch (#22163) * core/state/snapshot: write snapshot generator in batch * core: refactor the tests * core: update tests * core: update tests --- core/blockchain_snapshot_test.go | 1033 ++++++++++++++++++------------ core/state/snapshot/generate.go | 35 +- core/state/snapshot/journal.go | 2 +- 3 files changed, 653 insertions(+), 417 deletions(-) diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index f35dae167..cb634a451 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -28,27 +28,19 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/snapshot" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" ) -// snapshotTest is a test case for snapshot recovery. It can be used for -// simulating these scenarios: -// (i) Geth restarts normally with valid legacy snapshot -// (ii) Geth restarts normally with valid new-format snapshot -// (iii) Geth restarts after the crash, with broken legacy snapshot -// (iv) Geth restarts after the crash, with broken new-format snapshot -// (v) Geth restarts normally, but it's requested to be rewound to a lower point via SetHead -// (vi) Geth restarts normally with a stale snapshot -type snapshotTest struct { - legacy bool // Flag whether the loaded snapshot is in legacy format - crash bool // Flag whether the Geth restarts from the previous crash - restartCrash int // Number of blocks to insert after the normal stop, then the crash happens - gapped int // Number of blocks to insert without enabling snapshot - setHead uint64 // Block number to set head back to - +// snapshotTestBasic wraps the common testing fields in the snapshot tests. +type snapshotTestBasic struct { + legacy bool // Wether write the snapshot journal in legacy format chainBlocks int // Number of blocks to generate for the canonical chain snapshotBlock uint64 // Block number of the relevant snapshot disk layer commitBlock uint64 // Block number for which to commit the state to disk @@ -58,56 +50,418 @@ type snapshotTest struct { expHeadFastBlock uint64 // Block number of the expected head fast sync block expHeadBlock uint64 // Block number of the expected head full block expSnapshotBottom uint64 // The block height corresponding to the snapshot disk layer + + // share fields, set in runtime + datadir string + db ethdb.Database + gendb ethdb.Database + engine consensus.Engine } -func (tt *snapshotTest) dump() string { +func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Block) { + // Create a temporary persistent database + datadir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("Failed to create temporary datadir: %v", err) + } + os.RemoveAll(datadir) + + db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "") + if err != nil { + t.Fatalf("Failed to create persistent database: %v", err) + } + // Initialize a fresh chain + var ( + genesis = new(Genesis).MustCommit(db) + engine = ethash.NewFullFaker() + gendb = rawdb.NewMemoryDatabase() + + // Snapshot is enabled, the first snapshot is created from the Genesis. 
+ // The snapshot memory allowance is 256MB, it means no snapshot flush + // will happen during the block insertion. + cacheConfig = defaultCacheConfig + ) + chain, err := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to create chain: %v", err) + } + blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, gendb, basic.chainBlocks, func(i int, b *BlockGen) {}) + + // Insert the blocks with configured settings. + var breakpoints []uint64 + if basic.commitBlock > basic.snapshotBlock { + breakpoints = append(breakpoints, basic.snapshotBlock, basic.commitBlock) + } else { + breakpoints = append(breakpoints, basic.commitBlock, basic.snapshotBlock) + } + var startPoint uint64 + for _, point := range breakpoints { + if _, err := chain.InsertChain(blocks[startPoint:point]); err != nil { + t.Fatalf("Failed to import canonical chain start: %v", err) + } + startPoint = point + + if basic.commitBlock > 0 && basic.commitBlock == point { + chain.stateCache.TrieDB().Commit(blocks[point-1].Root(), true, nil) + } + if basic.snapshotBlock > 0 && basic.snapshotBlock == point { + if basic.legacy { + // Here we commit the snapshot disk root to simulate + // committing the legacy snapshot. + rawdb.WriteSnapshotRoot(db, blocks[point-1].Root()) + } else { + // Flushing the entire snap tree into the disk, the + // relavant (a) snapshot root and (b) snapshot generator + // will be persisted atomically. + chain.snaps.Cap(blocks[point-1].Root(), 0) + diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root() + if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) { + t.Fatalf("Failed to flush disk layer change, want %x, got %x", blockRoot, diskRoot) + } + } + } + } + if _, err := chain.InsertChain(blocks[startPoint:]); err != nil { + t.Fatalf("Failed to import canonical chain tail: %v", err) + } + + // Set runtime fields + basic.datadir = datadir + basic.db = db + basic.gendb = gendb + basic.engine = engine + + // Ugly hack, notify the chain to flush the journal in legacy format + // if it's requested. 
+ if basic.legacy { + chain.writeLegacyJournal = true + } + return chain, blocks +} + +func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks []*types.Block) { + // Iterate over all the remaining blocks and ensure there are no gaps + verifyNoGaps(t, chain, true, blocks) + verifyCutoff(t, chain, true, blocks, basic.expCanonicalBlocks) + + if head := chain.CurrentHeader(); head.Number.Uint64() != basic.expHeadHeader { + t.Errorf("Head header mismatch: have %d, want %d", head.Number, basic.expHeadHeader) + } + if head := chain.CurrentFastBlock(); head.NumberU64() != basic.expHeadFastBlock { + t.Errorf("Head fast block mismatch: have %d, want %d", head.NumberU64(), basic.expHeadFastBlock) + } + if head := chain.CurrentBlock(); head.NumberU64() != basic.expHeadBlock { + t.Errorf("Head block mismatch: have %d, want %d", head.NumberU64(), basic.expHeadBlock) + } + + // Check the disk layer, ensure they are matched + block := chain.GetBlockByNumber(basic.expSnapshotBottom) + if block == nil { + t.Errorf("The correspnding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom) + } else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) { + t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot()) + } + + // Check the snapshot, ensure it's integrated + if err := snapshot.VerifyState(chain.snaps, block.Root()); err != nil { + t.Errorf("The disk layer is not integrated %v", err) + } +} + +func (basic *snapshotTestBasic) dump() string { buffer := new(strings.Builder) fmt.Fprint(buffer, "Chain:\n G") - for i := 0; i < tt.chainBlocks; i++ { + for i := 0; i < basic.chainBlocks; i++ { fmt.Fprintf(buffer, "->C%d", i+1) } fmt.Fprint(buffer, " (HEAD)\n\n") fmt.Fprintf(buffer, "Commit: G") - if tt.commitBlock > 0 { - fmt.Fprintf(buffer, ", C%d", tt.commitBlock) + if basic.commitBlock > 0 { + fmt.Fprintf(buffer, ", C%d", basic.commitBlock) } fmt.Fprint(buffer, "\n") fmt.Fprintf(buffer, "Snapshot: G") - if tt.snapshotBlock > 0 { - fmt.Fprintf(buffer, ", C%d", tt.snapshotBlock) + if basic.snapshotBlock > 0 { + fmt.Fprintf(buffer, ", C%d", basic.snapshotBlock) } fmt.Fprint(buffer, "\n") - if tt.crash { - fmt.Fprintf(buffer, "\nCRASH\n\n") - } else { - fmt.Fprintf(buffer, "\nSetHead(%d)\n\n", tt.setHead) - } + //if crash { + // fmt.Fprintf(buffer, "\nCRASH\n\n") + //} else { + // fmt.Fprintf(buffer, "\nSetHead(%d)\n\n", basic.setHead) + //} fmt.Fprintf(buffer, "------------------------------\n\n") fmt.Fprint(buffer, "Expected in leveldb:\n G") - for i := 0; i < tt.expCanonicalBlocks; i++ { + for i := 0; i < basic.expCanonicalBlocks; i++ { fmt.Fprintf(buffer, "->C%d", i+1) } fmt.Fprintf(buffer, "\n\n") - fmt.Fprintf(buffer, "Expected head header : C%d\n", tt.expHeadHeader) - fmt.Fprintf(buffer, "Expected head fast block: C%d\n", tt.expHeadFastBlock) - if tt.expHeadBlock == 0 { + fmt.Fprintf(buffer, "Expected head header : C%d\n", basic.expHeadHeader) + fmt.Fprintf(buffer, "Expected head fast block: C%d\n", basic.expHeadFastBlock) + if basic.expHeadBlock == 0 { fmt.Fprintf(buffer, "Expected head block : G\n") } else { - fmt.Fprintf(buffer, "Expected head block : C%d\n", tt.expHeadBlock) + fmt.Fprintf(buffer, "Expected head block : C%d\n", basic.expHeadBlock) } - if tt.expSnapshotBottom == 0 { + if basic.expSnapshotBottom == 0 { fmt.Fprintf(buffer, "Expected snapshot disk : G\n") } else { - fmt.Fprintf(buffer, "Expected snapshot disk : C%d\n", tt.expSnapshotBottom) + fmt.Fprintf(buffer, "Expected snapshot disk 
: C%d\n", basic.expSnapshotBottom) } return buffer.String() } +func (basic *snapshotTestBasic) teardown() { + basic.db.Close() + basic.gendb.Close() + os.RemoveAll(basic.datadir) +} + +// snapshotTest is a test case type for normal snapshot recovery. +// It can be used for testing that restart Geth normally. +type snapshotTest struct { + snapshotTestBasic +} + +func (snaptest *snapshotTest) test(t *testing.T) { + // It's hard to follow the test case, visualize the input + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + // fmt.Println(tt.dump()) + chain, blocks := snaptest.prepare(t) + + // Restart the chain normally + chain.Stop() + newchain, err := NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + defer newchain.Stop() + + snaptest.verify(t, newchain, blocks) +} + +// crashSnapshotTest is a test case type for innormal snapshot recovery. +// It can be used for testing that restart Geth after the crash. +type crashSnapshotTest struct { + snapshotTestBasic +} + +func (snaptest *crashSnapshotTest) test(t *testing.T) { + // It's hard to follow the test case, visualize the input + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + // fmt.Println(tt.dump()) + chain, blocks := snaptest.prepare(t) + + // Pull the plug on the database, simulating a hard crash + db := chain.db + db.Close() + + // Start a new blockchain back up and see where the repair leads us + newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "") + if err != nil { + t.Fatalf("Failed to reopen persistent database: %v", err) + } + defer newdb.Close() + + // The interesting thing is: instead of starting the blockchain after + // the crash, we do restart twice here: one after the crash and one + // after the normal stop. It's used to ensure the broken snapshot + // can be detected all the time. + newchain, err := NewBlockChain(newdb, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + newchain.Stop() + + newchain, err = NewBlockChain(newdb, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + defer newchain.Stop() + + snaptest.verify(t, newchain, blocks) +} + +// gappedSnapshotTest is a test type used to test this scenario: +// - have a complete snapshot +// - restart without enabling the snapshot +// - insert a few blocks +// - restart with enabling the snapshot again +type gappedSnapshotTest struct { + snapshotTestBasic + gapped int // Number of blocks to insert without enabling snapshot +} + +func (snaptest *gappedSnapshotTest) test(t *testing.T) { + // It's hard to follow the test case, visualize the input + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + // fmt.Println(tt.dump()) + chain, blocks := snaptest.prepare(t) + + // Insert blocks without enabling snapshot if gapping is required. 
+ chain.Stop() + gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], snaptest.engine, snaptest.gendb, snaptest.gapped, func(i int, b *BlockGen) {}) + + // Insert a few more blocks without enabling snapshot + var cacheConfig = &CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 0, + } + newchain, err := NewBlockChain(snaptest.db, cacheConfig, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + newchain.InsertChain(gappedBlocks) + newchain.Stop() + + // Restart the chain with enabling the snapshot + newchain, err = NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + defer newchain.Stop() + + snaptest.verify(t, newchain, blocks) +} + +// setHeadSnapshotTest is the test type used to test this scenario: +// - have a complete snapshot +// - set the head to a lower point +// - restart +type setHeadSnapshotTest struct { + snapshotTestBasic + setHead uint64 // Block number to set head back to +} + +func (snaptest *setHeadSnapshotTest) test(t *testing.T) { + // It's hard to follow the test case, visualize the input + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + // fmt.Println(tt.dump()) + chain, blocks := snaptest.prepare(t) + + // Rewind the chain if setHead operation is required. + chain.SetHead(snaptest.setHead) + chain.Stop() + + newchain, err := NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + defer newchain.Stop() + + snaptest.verify(t, newchain, blocks) +} + +// restartCrashSnapshotTest is the test type used to test this scenario: +// - have a complete snapshot +// - restart chain +// - insert more blocks with enabling the snapshot +// - commit the snapshot +// - crash +// - restart again +type restartCrashSnapshotTest struct { + snapshotTestBasic + newBlocks int +} + +func (snaptest *restartCrashSnapshotTest) test(t *testing.T) { + // It's hard to follow the test case, visualize the input + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + // fmt.Println(tt.dump()) + chain, blocks := snaptest.prepare(t) + + // Firstly, stop the chain properly, with all snapshot journal + // and state committed. + chain.Stop() + + newchain, err := NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + newBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], snaptest.engine, snaptest.gendb, snaptest.newBlocks, func(i int, b *BlockGen) {}) + newchain.InsertChain(newBlocks) + + // Commit the entire snapshot into the disk if requested. Note only + // (a) snapshot root and (b) snapshot generator will be committed, + // the diff journal is not. 
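+	// Cap with a zero diff-layer allowance flattens the whole snapshot tree
+	// into the disk layer, so the new root and generator are persisted together.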
+ newchain.Snapshots().Cap(newBlocks[len(newBlocks)-1].Root(), 0) + + // Simulate the blockchain crash + // Don't call chain.Stop here, so that no snapshot + // journal and latest state will be committed + + // Restart the chain after the crash + newchain, err = NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + defer newchain.Stop() + + snaptest.verify(t, newchain, blocks) +} + +// wipeCrashSnapshotTest is the test type used to test this scenario: +// - have a complete snapshot +// - restart, insert more blocks without enabling the snapshot +// - restart again with enabling the snapshot +// - crash +type wipeCrashSnapshotTest struct { + snapshotTestBasic + newBlocks int +} + +func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) { + // It's hard to follow the test case, visualize the input + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + // fmt.Println(tt.dump()) + chain, blocks := snaptest.prepare(t) + + // Firstly, stop the chain properly, with all snapshot journal + // and state committed. + chain.Stop() + + config := &CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 0, + } + newchain, err := NewBlockChain(snaptest.db, config, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + newBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], snaptest.engine, snaptest.gendb, snaptest.newBlocks, func(i int, b *BlockGen) {}) + newchain.InsertChain(newBlocks) + newchain.Stop() + + // Restart the chain, the wiper should starts working + config = &CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 256, + SnapshotWait: false, // Don't wait rebuild + } + newchain, err = NewBlockChain(snaptest.db, config, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + // Simulate the blockchain crash. + + newchain, err = NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to recreate chain: %v", err) + } + snaptest.verify(t, newchain, blocks) +} + // Tests a Geth restart with valid snapshot. Before the shutdown, all snapshot // journal will be persisted correctly. In this case no snapshot recovery is // required. @@ -129,20 +483,21 @@ func TestRestartWithNewSnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : C8 // Expected snapshot disk : G - testSnapshot(t, &snapshotTest{ - legacy: false, - crash: false, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 0, - commitBlock: 0, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 8, - expSnapshotBottom: 0, // Initial disk layer built from genesis - }) + test := &snapshotTest{ + snapshotTestBasic{ + legacy: false, + chainBlocks: 8, + snapshotBlock: 0, + commitBlock: 0, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 8, + expSnapshotBottom: 0, // Initial disk layer built from genesis + }, + } + test.test(t) + test.teardown() } // Tests a Geth restart with valid but "legacy" snapshot. 
Before the shutdown, @@ -166,20 +521,22 @@ func TestRestartWithLegacySnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : C8 // Expected snapshot disk : G - testSnapshot(t, &snapshotTest{ - legacy: true, - crash: false, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 0, - commitBlock: 0, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 8, - expSnapshotBottom: 0, // Initial disk layer built from genesis - }) + t.Skip("Legacy format testing is not supported") + test := &snapshotTest{ + snapshotTestBasic{ + legacy: true, + chainBlocks: 8, + snapshotBlock: 0, + commitBlock: 0, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 8, + expSnapshotBottom: 0, // Initial disk layer built from genesis + }, + } + test.test(t) + test.teardown() } // Tests a Geth was crashed and restarts with a broken snapshot. In this case the @@ -205,20 +562,21 @@ func TestNoCommitCrashWithNewSnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : G // Expected snapshot disk : C4 - testSnapshot(t, &snapshotTest{ - legacy: false, - crash: true, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 4, - commitBlock: 0, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 0, - expSnapshotBottom: 4, // Last committed disk layer, wait recovery - }) + test := &crashSnapshotTest{ + snapshotTestBasic{ + legacy: false, + chainBlocks: 8, + snapshotBlock: 4, + commitBlock: 0, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 0, + expSnapshotBottom: 4, // Last committed disk layer, wait recovery + }, + } + test.test(t) + test.teardown() } // Tests a Geth was crashed and restarts with a broken snapshot. In this case the @@ -244,20 +602,21 @@ func TestLowCommitCrashWithNewSnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : C2 // Expected snapshot disk : C4 - testSnapshot(t, &snapshotTest{ - legacy: false, - crash: true, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 4, - commitBlock: 2, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 2, - expSnapshotBottom: 4, // Last committed disk layer, wait recovery - }) + test := &crashSnapshotTest{ + snapshotTestBasic{ + legacy: false, + chainBlocks: 8, + snapshotBlock: 4, + commitBlock: 2, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 2, + expSnapshotBottom: 4, // Last committed disk layer, wait recovery + }, + } + test.test(t) + test.teardown() } // Tests a Geth was crashed and restarts with a broken snapshot. 
In this case @@ -283,20 +642,21 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : G // Expected snapshot disk : C4 - testSnapshot(t, &snapshotTest{ - legacy: false, - crash: true, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 4, - commitBlock: 6, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 0, - expSnapshotBottom: 4, // Last committed disk layer, wait recovery - }) + test := &crashSnapshotTest{ + snapshotTestBasic{ + legacy: false, + chainBlocks: 8, + snapshotBlock: 4, + commitBlock: 6, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 0, + expSnapshotBottom: 4, // Last committed disk layer, wait recovery + }, + } + test.test(t) + test.teardown() } // Tests a Geth was crashed and restarts with a broken and "legacy format" @@ -321,20 +681,22 @@ func TestNoCommitCrashWithLegacySnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : G // Expected snapshot disk : G - testSnapshot(t, &snapshotTest{ - legacy: true, - crash: true, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 4, - commitBlock: 0, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 0, - expSnapshotBottom: 0, // Rebuilt snapshot from the latest HEAD(genesis) - }) + t.Skip("Legacy format testing is not supported") + test := &crashSnapshotTest{ + snapshotTestBasic{ + legacy: true, + chainBlocks: 8, + snapshotBlock: 4, + commitBlock: 0, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 0, + expSnapshotBottom: 0, // Rebuilt snapshot from the latest HEAD(genesis) + }, + } + test.test(t) + test.teardown() } // Tests a Geth was crashed and restarts with a broken and "legacy format" @@ -359,20 +721,22 @@ func TestLowCommitCrashWithLegacySnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : C2 // Expected snapshot disk : C2 - testSnapshot(t, &snapshotTest{ - legacy: true, - crash: true, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 4, - commitBlock: 2, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 2, - expSnapshotBottom: 2, // Rebuilt snapshot from the latest HEAD - }) + t.Skip("Legacy format testing is not supported") + test := &crashSnapshotTest{ + snapshotTestBasic{ + legacy: true, + chainBlocks: 8, + snapshotBlock: 4, + commitBlock: 2, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 2, + expSnapshotBottom: 2, // Rebuilt snapshot from the latest HEAD + }, + } + test.test(t) + test.teardown() } // Tests a Geth was crashed and restarts with a broken and "legacy format" @@ -402,20 +766,22 @@ func TestHighCommitCrashWithLegacySnapshot(t *testing.T) { // Expected head fast block: C8 // Expected head block : G // Expected snapshot disk : G - testSnapshot(t, &snapshotTest{ - legacy: true, - crash: true, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 4, - commitBlock: 6, - expCanonicalBlocks: 8, - expHeadHeader: 8, - expHeadFastBlock: 8, - expHeadBlock: 0, - expSnapshotBottom: 0, // Rebuilt snapshot from the latest HEAD(genesis) - }) + t.Skip("Legacy format testing is not supported") + test := &crashSnapshotTest{ + snapshotTestBasic{ + legacy: true, + chainBlocks: 8, + snapshotBlock: 4, + commitBlock: 6, + expCanonicalBlocks: 8, + expHeadHeader: 8, + expHeadFastBlock: 8, + expHeadBlock: 0, + expSnapshotBottom: 0, // Rebuilt snapshot from the latest 
HEAD(genesis) + }, + } + test.test(t) + test.teardown() } // Tests a Geth was running with snapshot enabled. Then restarts without @@ -439,20 +805,22 @@ func TestGappedNewSnapshot(t *testing.T) { // Expected head fast block: C10 // Expected head block : C10 // Expected snapshot disk : C10 - testSnapshot(t, &snapshotTest{ - legacy: false, - crash: false, - gapped: 2, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 0, - commitBlock: 0, - expCanonicalBlocks: 10, - expHeadHeader: 10, - expHeadFastBlock: 10, - expHeadBlock: 10, - expSnapshotBottom: 10, // Rebuilt snapshot from the latest HEAD - }) + test := &gappedSnapshotTest{ + snapshotTestBasic: snapshotTestBasic{ + legacy: false, + chainBlocks: 8, + snapshotBlock: 0, + commitBlock: 0, + expCanonicalBlocks: 10, + expHeadHeader: 10, + expHeadFastBlock: 10, + expHeadBlock: 10, + expSnapshotBottom: 10, // Rebuilt snapshot from the latest HEAD + }, + gapped: 2, + } + test.test(t) + test.teardown() } // Tests a Geth was running with leagcy snapshot enabled. Then restarts @@ -476,20 +844,23 @@ func TestGappedLegacySnapshot(t *testing.T) { // Expected head fast block: C10 // Expected head block : C10 // Expected snapshot disk : C10 - testSnapshot(t, &snapshotTest{ - legacy: true, - crash: false, - gapped: 2, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 0, - commitBlock: 0, - expCanonicalBlocks: 10, - expHeadHeader: 10, - expHeadFastBlock: 10, - expHeadBlock: 10, - expSnapshotBottom: 10, // Rebuilt snapshot from the latest HEAD - }) + t.Skip("Legacy format testing is not supported") + test := &gappedSnapshotTest{ + snapshotTestBasic: snapshotTestBasic{ + legacy: true, + chainBlocks: 8, + snapshotBlock: 0, + commitBlock: 0, + expCanonicalBlocks: 10, + expHeadHeader: 10, + expHeadFastBlock: 10, + expHeadBlock: 10, + expSnapshotBottom: 10, // Rebuilt snapshot from the latest HEAD + }, + gapped: 2, + } + test.test(t) + test.teardown() } // Tests the Geth was running with snapshot enabled and resetHead is applied. 
@@ -513,20 +884,22 @@ func TestSetHeadWithNewSnapshot(t *testing.T) { // Expected head fast block: C4 // Expected head block : C4 // Expected snapshot disk : G - testSnapshot(t, &snapshotTest{ - legacy: false, - crash: false, - gapped: 0, - setHead: 4, - chainBlocks: 8, - snapshotBlock: 0, - commitBlock: 0, - expCanonicalBlocks: 4, - expHeadHeader: 4, - expHeadFastBlock: 4, - expHeadBlock: 4, - expSnapshotBottom: 0, // The initial disk layer is built from the genesis - }) + test := &setHeadSnapshotTest{ + snapshotTestBasic: snapshotTestBasic{ + legacy: false, + chainBlocks: 8, + snapshotBlock: 0, + commitBlock: 0, + expCanonicalBlocks: 4, + expHeadHeader: 4, + expHeadFastBlock: 4, + expHeadBlock: 4, + expSnapshotBottom: 0, // The initial disk layer is built from the genesis + }, + setHead: 4, + } + test.test(t) + test.teardown() } // Tests the Geth was running with snapshot(legacy-format) enabled and resetHead @@ -550,20 +923,23 @@ func TestSetHeadWithLegacySnapshot(t *testing.T) { // Expected head fast block: C4 // Expected head block : C4 // Expected snapshot disk : G - testSnapshot(t, &snapshotTest{ - legacy: true, - crash: false, - gapped: 0, - setHead: 4, - chainBlocks: 8, - snapshotBlock: 0, - commitBlock: 0, - expCanonicalBlocks: 4, - expHeadHeader: 4, - expHeadFastBlock: 4, - expHeadBlock: 4, - expSnapshotBottom: 0, // The initial disk layer is built from the genesis - }) + t.Skip("Legacy format testing is not supported") + test := &setHeadSnapshotTest{ + snapshotTestBasic: snapshotTestBasic{ + legacy: true, + chainBlocks: 8, + snapshotBlock: 0, + commitBlock: 0, + expCanonicalBlocks: 4, + expHeadHeader: 4, + expHeadFastBlock: 4, + expHeadBlock: 4, + expSnapshotBottom: 0, // The initial disk layer is built from the genesis + }, + setHead: 4, + } + test.test(t) + test.teardown() } // Tests the Geth was running with snapshot(legacy-format) enabled and upgrades @@ -589,209 +965,60 @@ func TestRecoverSnapshotFromCrashWithLegacyDiffJournal(t *testing.T) { // Expected head fast block: C10 // Expected head block : C8 // Expected snapshot disk : C10 - testSnapshot(t, &snapshotTest{ - legacy: true, - crash: false, - restartCrash: 2, - gapped: 0, - setHead: 0, - chainBlocks: 8, - snapshotBlock: 0, - commitBlock: 0, - expCanonicalBlocks: 10, - expHeadHeader: 10, - expHeadFastBlock: 10, - expHeadBlock: 8, // The persisted state in the first running - expSnapshotBottom: 10, // The persisted disk layer in the second running - }) + t.Skip("Legacy format testing is not supported") + test := &restartCrashSnapshotTest{ + snapshotTestBasic: snapshotTestBasic{ + legacy: true, + chainBlocks: 8, + snapshotBlock: 0, + commitBlock: 0, + expCanonicalBlocks: 10, + expHeadHeader: 10, + expHeadFastBlock: 10, + expHeadBlock: 8, // The persisted state in the first running + expSnapshotBottom: 10, // The persisted disk layer in the second running + }, + newBlocks: 2, + } + test.test(t) + test.teardown() } -func testSnapshot(t *testing.T, tt *snapshotTest) { - // It's hard to follow the test case, visualize the input - // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) - // fmt.Println(tt.dump()) - - // Create a temporary persistent database - datadir, err := ioutil.TempDir("", "") - if err != nil { - t.Fatalf("Failed to create temporary datadir: %v", err) - } - os.RemoveAll(datadir) - - db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "") - if err != nil { - t.Fatalf("Failed to create persistent database: %v", err) - } - 
defer db.Close() // Might double close, should be fine - - // Initialize a fresh chain - var ( - genesis = new(Genesis).MustCommit(db) - engine = ethash.NewFullFaker() - gendb = rawdb.NewMemoryDatabase() - - // Snapshot is enabled, the first snapshot is created from the Genesis. - // The snapshot memory allowance is 256MB, it means no snapshot flush - // will happen during the block insertion. - cacheConfig = defaultCacheConfig - ) - chain, err := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to create chain: %v", err) - } - blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, gendb, tt.chainBlocks, func(i int, b *BlockGen) {}) - - // Insert the blocks with configured settings. - var breakpoints []uint64 - if tt.commitBlock > tt.snapshotBlock { - breakpoints = append(breakpoints, tt.snapshotBlock, tt.commitBlock) - } else { - breakpoints = append(breakpoints, tt.commitBlock, tt.snapshotBlock) - } - var startPoint uint64 - for _, point := range breakpoints { - if _, err := chain.InsertChain(blocks[startPoint:point]); err != nil { - t.Fatalf("Failed to import canonical chain start: %v", err) - } - startPoint = point - - if tt.commitBlock > 0 && tt.commitBlock == point { - chain.stateCache.TrieDB().Commit(blocks[point-1].Root(), true, nil) - } - if tt.snapshotBlock > 0 && tt.snapshotBlock == point { - if tt.legacy { - // Here we commit the snapshot disk root to simulate - // committing the legacy snapshot. - rawdb.WriteSnapshotRoot(db, blocks[point-1].Root()) - } else { - chain.snaps.Cap(blocks[point-1].Root(), 0) - diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root() - if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) { - t.Fatalf("Failed to flush disk layer change, want %x, got %x", blockRoot, diskRoot) - } - } - } - } - if _, err := chain.InsertChain(blocks[startPoint:]); err != nil { - t.Fatalf("Failed to import canonical chain tail: %v", err) - } - // Set the flag for writing legacy journal if necessary - if tt.legacy { - chain.writeLegacyJournal = true - } - // Pull the plug on the database, simulating a hard crash - if tt.crash { - db.Close() - - // Start a new blockchain back up and see where the repair leads us - db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "") - if err != nil { - t.Fatalf("Failed to reopen persistent database: %v", err) - } - defer db.Close() - - // The interesting thing is: instead of start the blockchain after - // the crash, we do restart twice here: one after the crash and one - // after the normal stop. It's used to ensure the broken snapshot - // can be detected all the time. - chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - chain.Stop() - - chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - defer chain.Stop() - } else if tt.gapped > 0 { - // Insert blocks without enabling snapshot if gapping is required. 
- chain.Stop() - gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {}) - - // Insert a few more blocks without enabling snapshot - var cacheConfig = &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieTimeLimit: 5 * time.Minute, - SnapshotLimit: 0, - } - chain, err = NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - chain.InsertChain(gappedBlocks) - chain.Stop() - - chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - defer chain.Stop() - } else if tt.setHead != 0 { - // Rewind the chain if setHead operation is required. - chain.SetHead(tt.setHead) - chain.Stop() - - chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - defer chain.Stop() - } else if tt.restartCrash != 0 { - // Firstly, stop the chain properly, with all snapshot journal - // and state committed. - chain.Stop() - - // Restart chain, forcibly flush the disk layer journal with new format - newBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.restartCrash, func(i int, b *BlockGen) {}) - chain, err = NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - chain.InsertChain(newBlocks) - chain.Snapshots().Cap(newBlocks[len(newBlocks)-1].Root(), 0) - - // Simulate the blockchain crash - // Don't call chain.Stop here, so that no snapshot - // journal and latest state will be committed - - // Restart the chain after the crash - chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - defer chain.Stop() - } else { - chain.Stop() - - // Restart the chain normally - chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("Failed to recreate chain: %v", err) - } - defer chain.Stop() - } - - // Iterate over all the remaining blocks and ensure there are no gaps - verifyNoGaps(t, chain, true, blocks) - verifyCutoff(t, chain, true, blocks, tt.expCanonicalBlocks) - - if head := chain.CurrentHeader(); head.Number.Uint64() != tt.expHeadHeader { - t.Errorf("Head header mismatch: have %d, want %d", head.Number, tt.expHeadHeader) - } - if head := chain.CurrentFastBlock(); head.NumberU64() != tt.expHeadFastBlock { - t.Errorf("Head fast block mismatch: have %d, want %d", head.NumberU64(), tt.expHeadFastBlock) - } - if head := chain.CurrentBlock(); head.NumberU64() != tt.expHeadBlock { - t.Errorf("Head block mismatch: have %d, want %d", head.NumberU64(), tt.expHeadBlock) - } - // Check the disk layer, ensure they are matched - block := chain.GetBlockByNumber(tt.expSnapshotBottom) - if block == nil { - t.Errorf("The correspnding block[%d] of snapshot disk layer is missing", tt.expSnapshotBottom) - } else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) { - t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot()) +// Tests the Geth was running with a complete snapshot and then imports a few +// more new 
blocks on top without enabling the snapshot. After the restart, +// crash happens. Check everything is ok after the restart. +func TestRecoverSnapshotFromWipingCrash(t *testing.T) { + // Chain: + // G->C1->C2->C3->C4->C5->C6->C7->C8 (HEAD) + // + // Commit: G + // Snapshot: G + // + // SetHead(0) + // + // ------------------------------ + // + // Expected in leveldb: + // G->C1->C2->C3->C4->C5->C6->C7->C8->C9->C10 + // + // Expected head header : C10 + // Expected head fast block: C10 + // Expected head block : C8 + // Expected snapshot disk : C10 + test := &wipeCrashSnapshotTest{ + snapshotTestBasic: snapshotTestBasic{ + legacy: false, + chainBlocks: 8, + snapshotBlock: 4, + commitBlock: 0, + expCanonicalBlocks: 10, + expHeadHeader: 10, + expHeadFastBlock: 10, + expHeadBlock: 10, + expSnapshotBottom: 10, + }, + newBlocks: 2, } + test.test(t) + test.teardown() } diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go index fcc6b44cb..2b41dd551 100644 --- a/core/state/snapshot/generate.go +++ b/core/state/snapshot/generate.go @@ -101,18 +101,26 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache i wiper = wipeSnapshot(diskdb, true) } // Create a new disk layer with an initialized state marker at zero - rawdb.WriteSnapshotRoot(diskdb, root) - + var ( + stats = &generatorStats{wiping: wiper, start: time.Now()} + batch = diskdb.NewBatch() + genMarker = []byte{} // Initialized but empty! + ) + rawdb.WriteSnapshotRoot(batch, root) + journalProgress(batch, genMarker, stats) + if err := batch.Write(); err != nil { + log.Crit("Failed to write initialized state marker", "error", err) + } base := &diskLayer{ diskdb: diskdb, triedb: triedb, root: root, cache: fastcache.New(cache * 1024 * 1024), - genMarker: []byte{}, // Initialized but empty! + genMarker: genMarker, genPending: make(chan struct{}), genAbort: make(chan chan *generatorStats), } - go base.generate(&generatorStats{wiping: wiper, start: time.Now()}) + go base.generate(stats) log.Debug("Start snapshot generation", "root", root) return base } @@ -137,10 +145,12 @@ func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorSta panic(err) // Cannot happen, here to catch dev errors } var logstr string - switch len(marker) { - case 0: + switch { + case marker == nil: logstr = "done" - case common.HashLength: + case bytes.Equal(marker, []byte{}): + logstr = "empty" + case len(marker) == common.HashLength: logstr = fmt.Sprintf("%#x", marker) default: logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:]) @@ -307,13 +317,12 @@ func (dl *diskLayer) generate(stats *generatorStats) { abort <- stats return } - // Snapshot fully generated, set the marker to nil - if batch.ValueSize() > 0 { - // Ensure the generator entry is in sync with the data - journalProgress(batch, nil, stats) + // Snapshot fully generated, set the marker to nil. + // Note even there is nothing to commit, persist the + // generator anyway to mark the snapshot is complete. 
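+	// A nil marker is journalled as "done"; the empty marker written at
+	// startup only records that generation has begun.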
+ journalProgress(batch, nil, stats) + batch.Write() - batch.Write() - } log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots, "storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start))) diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 178ba0890..d7e454cce 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -441,6 +441,6 @@ func (dl *diffLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) { if err := rlp.Encode(buffer, storage); err != nil { return common.Hash{}, err } - log.Debug("Legacy journalled disk layer", "root", dl.root, "parent", dl.parent.Root()) + log.Debug("Legacy journalled diff layer", "root", dl.root, "parent", dl.parent.Root()) return base, nil }
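For readers skimming the patch, the essence of the generate.go change is that the snapshot root and the generator progress record now reach disk through a single batch instead of two independent writes, so a crash can no longer persist one without the other. A minimal sketch of that pattern, using the rawdb accessors referenced in the patch but with a hypothetical helper name and an already RLP-encoded generator entry:

package snapshotsketch

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// writeSnapshotMarkers queues both records on one batch and flushes them with
// a single Write call, mirroring what generateSnapshot does after this patch.
func writeSnapshotMarkers(db ethdb.KeyValueStore, root common.Hash, generator []byte) error {
	batch := db.NewBatch()
	rawdb.WriteSnapshotRoot(batch, root)           // root of the disk layer being generated
	rawdb.WriteSnapshotGenerator(batch, generator) // RLP-encoded generator progress entry
	return batch.Write()
}

In the patch itself the generator entry is produced and written by journalProgress, which also distinguishes a nil marker (generation done) from an empty one (generation just started).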