// stm: #integration package itests import ( "bytes" "context" "fmt" "testing" "time" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" miner8 "github.com/filecoin-project/go-state-types/builtin/v8/miner" "github.com/filecoin-project/go-state-types/exitcode" miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" power6 "github.com/filecoin-project/specs-actors/v6/actors/builtin/power" "github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/blockstore/splitstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" ) // Startup a node with hotstore and discard coldstore. Compact once and return func TestHotstoreCompactsOnce(t *testing.T) { ctx := context.Background() // disable sync checking because efficient itests require that the node is out of sync : / splitstore.CheckSyncGap = false opts := []interface{}{kit.MockProofs(), kit.SplitstoreDiscard()} full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...) bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0] _ = full _ = genesisMiner _ = bm waitForCompaction(ctx, t, 1, full) require.NoError(t, genesisMiner.Stop(ctx)) } // create some unreachable state // and check that compaction carries it away func TestHotstoreCompactCleansGarbage(t *testing.T) { ctx := context.Background() // disable sync checking because efficient itests require that the node is out of sync : / splitstore.CheckSyncGap = false opts := []interface{}{kit.MockProofs(), kit.SplitstoreDiscard()} full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...) bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0] _ = full _ = genesisMiner // create garbage g := NewGarbager(ctx, t, full) // state garbageS, eS := g.Drop(ctx) // message garbageM, eM := g.Message(ctx) e := eM if eS > eM { e = eS } assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") // calculate next compaction where we should actually see cleanup // pause, check for compacting and get compaction info // we do this to remove the (very unlikely) race where compaction index // and compaction epoch are in the middle of update, or a whole compaction // runs between the two for { bm.Pause() if splitStoreCompacting(ctx, t, full) { bm.Restart() time.Sleep(3 * time.Second) } else { break } } lastCompactionEpoch := splitStoreBaseEpoch(ctx, t, full) garbageCompactionIndex := splitStoreCompactionIndex(ctx, t, full) + 1 boundary := lastCompactionEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary for e > boundary { boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary garbageCompactionIndex++ } bm.Restart() // wait for compaction to occur waitForCompaction(ctx, t, garbageCompactionIndex, full) // check that garbage is cleaned up assert.False(t, g.Exists(ctx, garbageS), "Garbage state still exists in blockstore") assert.False(t, g.Exists(ctx, garbageM), "Garbage message still exists in blockstore") } // Create unreachable state // Check that it moves to coldstore // Prune coldstore and check that it is deleted func TestColdStorePrune(t *testing.T) { ctx := context.Background() // disable sync checking because efficient itests require that the node is out of sync : / splitstore.CheckSyncGap = false opts := []interface{}{kit.MockProofs(), kit.SplitstoreUniversal(), kit.FsRepo()} full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...) bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0] _ = full _ = genesisMiner // create garbage g := NewGarbager(ctx, t, full) // state garbageS, eS := g.Drop(ctx) // message garbageM, eM := g.Message(ctx) e := eM if eS > eM { e = eS } assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") // calculate next compaction where we should actually see cleanup // pause, check for compacting and get compaction info // we do this to remove the (very unlikely) race where compaction index // and compaction epoch are in the middle of update, or a whole compaction // runs between the two for { bm.Pause() if splitStoreCompacting(ctx, t, full) { bm.Restart() time.Sleep(3 * time.Second) } else { break } } lastCompactionEpoch := splitStoreBaseEpoch(ctx, t, full) garbageCompactionIndex := splitStoreCompactionIndex(ctx, t, full) + 1 boundary := lastCompactionEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary for e > boundary { boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary garbageCompactionIndex++ } bm.Restart() // wait for compaction to occur waitForCompaction(ctx, t, garbageCompactionIndex, full) bm.Pause() // This data should now be moved to the coldstore. // Access it without hotview to keep it there while checking that it still exists // Only state compute uses hot view so garbager Exists backed by ChainReadObj is all good assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") bm.Restart() // wait for compaction to finsih and pause to make sure it doesn't start to avoid racing for { bm.Pause() if splitStoreCompacting(ctx, t, full) { bm.Restart() time.Sleep(1 * time.Second) } else { break } } pruneOpts := api.PruneOpts{RetainState: int64(0), MovingGC: false} require.NoError(t, full.ChainPrune(ctx, pruneOpts)) bm.Restart() waitForPrune(ctx, t, 1, full) assert.False(g.t, g.Exists(ctx, garbageS), "Garbage state should be removed from cold store after prune but it's still there") assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message should be on the cold store after prune") } func TestMessagesMode(t *testing.T) { ctx := context.Background() // disable sync checking because efficient itests require that the node is out of sync : / splitstore.CheckSyncGap = false opts := []interface{}{kit.MockProofs(), kit.SplitstoreMessges(), kit.FsRepo()} full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...) bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0] _ = full _ = genesisMiner // create garbage g := NewGarbager(ctx, t, full) // state garbageS, eS := g.Drop(ctx) // message garbageM, eM := g.Message(ctx) e := eM if eS > eM { e = eS } assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") // calculate next compaction where we should actually see cleanup // pause, check for compacting and get compaction info // we do this to remove the (very unlikely) race where compaction index // and compaction epoch are in the middle of update, or a whole compaction // runs between the two for { bm.Pause() if splitStoreCompacting(ctx, t, full) { bm.Restart() time.Sleep(3 * time.Second) } else { break } } lastCompactionEpoch := splitStoreBaseEpoch(ctx, t, full) garbageCompactionIndex := splitStoreCompactionIndex(ctx, t, full) + 1 boundary := lastCompactionEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary for e > boundary { boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary garbageCompactionIndex++ } bm.Restart() // wait for compaction to occur waitForCompaction(ctx, t, garbageCompactionIndex, full) bm.Pause() // Messages should be moved to the coldstore // State should be gced // Access it without hotview to keep it there while checking that it still exists // Only state compute uses hot view so garbager Exists backed by ChainReadObj is all good assert.False(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore") assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore") } func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) { for { if splitStoreCompactionIndex(ctx, t, n) >= cIdx { break } time.Sleep(1 * time.Second) } } func waitForPrune(ctx context.Context, t *testing.T, pIdx int64, n *kit.TestFullNode) { for { if splitStorePruneIndex(ctx, t, n) >= pIdx { break } time.Sleep(1 * time.Second) } } func splitStoreCompacting(ctx context.Context, t *testing.T, n *kit.TestFullNode) bool { info, err := n.ChainBlockstoreInfo(ctx) require.NoError(t, err) compactingRaw, ok := info["compacting"] require.True(t, ok, "compactions not on blockstore info") compacting, ok := compactingRaw.(bool) require.True(t, ok, "compacting key on blockstore info wrong type") return compacting } func splitStoreBaseEpoch(ctx context.Context, t *testing.T, n *kit.TestFullNode) abi.ChainEpoch { info, err := n.ChainBlockstoreInfo(ctx) require.NoError(t, err) baseRaw, ok := info["base epoch"] require.True(t, ok, "'base epoch' not on blockstore info") base, ok := baseRaw.(abi.ChainEpoch) require.True(t, ok, "base epoch key on blockstore info wrong type") return base } func splitStoreCompactionIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode) int64 { info, err := n.ChainBlockstoreInfo(ctx) require.NoError(t, err) compact, ok := info["compactions"] require.True(t, ok, "compactions not on blockstore info") compactionIndex, ok := compact.(int64) require.True(t, ok, "compaction key on blockstore info wrong type") return compactionIndex } func splitStorePruneIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode) int64 { info, err := n.ChainBlockstoreInfo(ctx) require.NoError(t, err) prune, ok := info["prunes"] require.True(t, ok, "prunes not on blockstore info") pruneIndex, ok := prune.(int64) require.True(t, ok, "prune key on blockstore info wrong type") return pruneIndex } // Create on chain unreachable garbage for a network to exercise splitstore // one garbage cid created at a time // // It works by rewriting an internally maintained miner actor's peer ID type Garbager struct { t *testing.T node *kit.TestFullNode latest trashID // internal tracking maddr4Data address.Address } type trashID uint8 func NewGarbager(ctx context.Context, t *testing.T, n *kit.TestFullNode) *Garbager { // create miner actor for writing garbage g := &Garbager{ t: t, node: n, latest: 0, maddr4Data: address.Undef, } g.createMiner4Data(ctx) g.newPeerID(ctx) return g } // drop returns the cid referencing the dropped garbage and the chain epoch of the drop func (g *Garbager) Drop(ctx context.Context) (cid.Cid, abi.ChainEpoch) { // record existing with mInfoCidAtEpoch c := g.mInfoCid(ctx) // update trashID and create newPeerID, dropping miner info cid c in the process // wait for message and return the chain height that the drop occurred at g.latest++ return c, g.newPeerID(ctx) } // message returns the cid referencing a message and the chain epoch it went on chain func (g *Garbager) Message(ctx context.Context) (cid.Cid, abi.ChainEpoch) { mw := g.createMiner(ctx) return mw.Message, mw.Height } // exists checks whether the cid is reachable through the node func (g *Garbager) Exists(ctx context.Context, c cid.Cid) bool { // check chain get / blockstore get _, err := g.node.ChainReadObj(ctx, c) if ipld.IsNotFound(err) { return false } else if err != nil { g.t.Fatalf("ChainReadObj failure on existence check: %s", err) } else { return true } g.t.Fatal("unreachable") return false } func (g *Garbager) newPeerID(ctx context.Context) abi.ChainEpoch { dataStr := fmt.Sprintf("Garbager-Data-%d", g.latest) dataID := []byte(dataStr) params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: dataID}) require.NoError(g.t, err) msg := &types.Message{ To: g.maddr4Data, From: g.node.DefaultKey.Address, Method: builtin.MethodsMiner.ChangePeerID, Params: params, Value: types.NewInt(0), } signed, err2 := g.node.MpoolPushMessage(ctx, msg, nil) require.NoError(g.t, err2) mw, err2 := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) require.NoError(g.t, err2) require.Equal(g.t, exitcode.Ok, mw.Receipt.ExitCode) return mw.Height } func (g *Garbager) mInfoCid(ctx context.Context) cid.Cid { ts, err := g.node.ChainHead(ctx) require.NoError(g.t, err) act, err := g.node.StateGetActor(ctx, g.maddr4Data, ts.Key()) require.NoError(g.t, err) raw, err := g.node.ChainReadObj(ctx, act.Head) require.NoError(g.t, err) var mSt miner8.State require.NoError(g.t, mSt.UnmarshalCBOR(bytes.NewReader(raw))) // return infoCid return mSt.Info } func (g *Garbager) createMiner4Data(ctx context.Context) { require.True(g.t, g.maddr4Data == address.Undef, "garbager miner actor already created") mw := g.createMiner(ctx) var retval power6.CreateMinerReturn require.NoError(g.t, retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return))) g.maddr4Data = retval.IDAddress } func (g *Garbager) createMiner(ctx context.Context) *lapi.MsgLookup { owner, err := g.node.WalletDefaultAddress(ctx) require.NoError(g.t, err) worker := owner params, err := actors.SerializeParams(&power6.CreateMinerParams{ Owner: owner, Worker: worker, WindowPoStProofType: abi.RegisteredPoStProof_StackedDrgWindow32GiBV1, }) require.NoError(g.t, err) createStorageMinerMsg := &types.Message{ To: power.Address, From: worker, Value: big.Zero(), Method: power.Methods.CreateMiner, Params: params, } signed, err := g.node.MpoolPushMessage(ctx, createStorageMinerMsg, nil) require.NoError(g.t, err) mw, err := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, lapi.LookbackNoLimit, true) require.NoError(g.t, err) require.True(g.t, mw.Receipt.ExitCode == 0, "garbager's internal create miner message failed") return mw }