lotus/itests/splitstore_test.go

409 lines
13 KiB
Go
Raw Normal View History

2022-08-29 14:25:30 +00:00
// stm: #integration
package itests
import (
"bytes"
"context"
"fmt"
"testing"
"time"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
miner8 "github.com/filecoin-project/go-state-types/builtin/v8/miner"
"github.com/filecoin-project/go-state-types/exitcode"
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
power6 "github.com/filecoin-project/specs-actors/v6/actors/builtin/power"
"github.com/filecoin-project/lotus/api"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore/splitstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
)
// TestHotstoreCompactsOnce starts a node whose splitstore runs with a hotstore
// and a discarding coldstore, waits until the first compaction has completed,
// and then stops the genesis miner.
func TestHotstoreCompactsOnce(t *testing.T) {
	ctx := context.Background()
	// Sync-gap checking is switched off: these itests intentionally keep the
	// node out of sync so that they run quickly.
	splitstore.CheckSyncGap = false

	full, genesisMiner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.SplitstoreDiscard())
	_ = ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]

	waitForCompaction(ctx, t, 1, full)
	require.NoError(t, genesisMiner.Stop(ctx))
}
// TestHotstoreCompactCleansGarbage makes one piece of state unreachable and
// checks that a subsequent compaction removes it from the blockstore.
func TestHotstoreCompactCleansGarbage(t *testing.T) {
	ctx := context.Background()
	// Sync-gap checking is switched off: these itests intentionally keep the
	// node out of sync so that they run quickly.
	splitstore.CheckSyncGap = false

	full, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.SplitstoreDiscard())
	blockMiner := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]

	// Produce a single unreachable object and remember the epoch it was dropped at.
	garbager := NewGarbager(ctx, t, full)
	trash, dropEpoch := garbager.Drop(ctx)

	// Leave mining paused only at a moment when no compaction is running. This
	// keeps the compaction index and base epoch read below consistent with each
	// other: no update, and no whole compaction, can land between the two reads.
	blockMiner.Pause()
	for splitStoreCompacting(ctx, t, full) {
		blockMiner.Restart()
		time.Sleep(3 * time.Second)
		blockMiner.Pause()
	}

	// Starting from the last base epoch, step forward by
	// (CompactionThreshold - CompactionBoundary) epochs, counting one more
	// compaction per step, until the boundary reaches the drop epoch. That
	// compaction is the one expected to clean up the garbage.
	baseEpoch := splitStoreBaseEpoch(ctx, t, full)
	targetIdx := splitStoreCompactionIndex(ctx, t, full) + 1
	boundary := baseEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary
	for ; dropEpoch > boundary; targetIdx++ {
		boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary
	}
	blockMiner.Restart()

	waitForCompaction(ctx, t, targetIdx, full)
	assert.False(t, garbager.Exists(ctx, trash), "Garbage still exists in blockstore")
}
// TestColdStorePrune makes one piece of state unreachable, checks that it is
// still readable after the compaction that should move it to the coldstore,
// then runs a manual ChainPrune and checks that the garbage is gone.
func TestColdStorePrune(t *testing.T) {
	ctx := context.Background()
	// Sync-gap checking is switched off: these itests intentionally keep the
	// node out of sync so that they run quickly.
	splitstore.CheckSyncGap = false

	full, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.SplitstoreUniversal(), kit.FsRepo())
	blockMiner := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]

	// Produce a single unreachable object and remember the epoch it was dropped at.
	garbager := NewGarbager(ctx, t, full)
	trash, dropEpoch := garbager.Drop(ctx)
	assert.True(t, garbager.Exists(ctx, trash), "Garbage not found in splitstore")

	// Leave mining paused only at a moment when no compaction is running. This
	// keeps the compaction index and base epoch read below consistent with each
	// other: no update, and no whole compaction, can land between the two reads.
	blockMiner.Pause()
	for splitStoreCompacting(ctx, t, full) {
		blockMiner.Restart()
		time.Sleep(3 * time.Second)
		blockMiner.Pause()
	}

	// Starting from the last base epoch, step forward by
	// (CompactionThreshold - CompactionBoundary) epochs, counting one more
	// compaction per step, until the boundary reaches the drop epoch.
	baseEpoch := splitStoreBaseEpoch(ctx, t, full)
	targetIdx := splitStoreCompactionIndex(ctx, t, full) + 1
	boundary := baseEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary
	for ; dropEpoch > boundary; targetIdx++ {
		boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary
	}
	blockMiner.Restart()

	waitForCompaction(ctx, t, targetIdx, full)

	// The garbage is expected to have moved to the coldstore by now. Reading it
	// through ChainReadObj (not a hot view; only state compute uses the hot
	// view) confirms it still exists without pulling it back into the hotstore.
	blockMiner.Pause()
	assert.True(t, garbager.Exists(ctx, trash), "Garbage not found in splitstore")
	blockMiner.Restart()

	// Wait for any running compaction to finish and keep mining paused so that
	// a new one cannot start and race with the prune request.
	blockMiner.Pause()
	for splitStoreCompacting(ctx, t, full) {
		blockMiner.Restart()
		time.Sleep(1 * time.Second)
		blockMiner.Pause()
	}

	pruneOpts := api.PruneOpts{RetainState: int64(0), MovingGC: false}
	require.NoError(t, full.ChainPrune(ctx, pruneOpts))
	blockMiner.Restart()

	waitForPrune(ctx, t, 1, full)
	assert.False(t, garbager.Exists(ctx, trash), "Garbage should be removed from cold store after prune but it's still there")
}
// TestAutoPrune is like TestColdStorePrune, but with automatic pruning enabled
// (kit.SplitstoreAutoPrune) instead of an explicit ChainPrune call. It waits
// for the first prune and checks that the garbage has been removed.
func TestAutoPrune(t *testing.T) {
	ctx := context.Background()
	// Sync-gap checking is switched off: these itests intentionally keep the
	// node out of sync so that they run quickly.
	splitstore.CheckSyncGap = false

	full, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.SplitstoreUniversal(), kit.SplitstoreAutoPrune(), kit.FsRepo())
	blockMiner := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]

	// Produce a single unreachable object and remember the epoch it was dropped at.
	garbager := NewGarbager(ctx, t, full)
	trash, dropEpoch := garbager.Drop(ctx)
	assert.True(t, garbager.Exists(ctx, trash), "Garbage not found in splitstore")

	// Leave mining paused only at a moment when no compaction is running. This
	// keeps the compaction index and base epoch read below consistent with each
	// other: no update, and no whole compaction, can land between the two reads.
	blockMiner.Pause()
	for splitStoreCompacting(ctx, t, full) {
		blockMiner.Restart()
		time.Sleep(3 * time.Second)
		blockMiner.Pause()
	}

	// Starting from the last base epoch, step forward by
	// (CompactionThreshold - CompactionBoundary) epochs, counting one more
	// compaction per step, until the boundary reaches the drop epoch.
	baseEpoch := splitStoreBaseEpoch(ctx, t, full)
	targetIdx := splitStoreCompactionIndex(ctx, t, full) + 1
	boundary := baseEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary
	for ; dropEpoch > boundary; targetIdx++ {
		boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary
	}
	blockMiner.Restart()

	waitForCompaction(ctx, t, targetIdx, full)

	// The garbage is expected to have moved to the coldstore by now. Reading it
	// through ChainReadObj (not a hot view; only state compute uses the hot
	// view) confirms it still exists without pulling it back into the hotstore.
	blockMiner.Pause()
	assert.True(t, garbager.Exists(ctx, trash), "Garbage not found in splitstore")
	blockMiner.Restart()

	waitForPrune(ctx, t, 1, full)
	assert.False(t, garbager.Exists(ctx, trash), "Garbage should be removed from cold store through auto prune but it's still there")
}
// waitForCompaction polls the node's splitstore once per second until its
// compaction counter reaches at least cIdx. If ctx is cancelled or times out
// first, the test fails instead of polling forever.
func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) {
	for splitStoreCompactionIndex(ctx, t, n) < cIdx {
		// Wait between polls, but stay responsive to cancellation. A bare
		// time.Sleep would ignore ctx entirely.
		select {
		case <-ctx.Done():
			t.Fatalf("context done while waiting for compaction index %d: %s", cIdx, ctx.Err())
		case <-time.After(1 * time.Second):
		}
	}
}
// waitForPrune polls the node's splitstore once per second until its prune
// counter reaches at least pIdx. If ctx is cancelled or times out first, the
// test fails instead of polling forever.
func waitForPrune(ctx context.Context, t *testing.T, pIdx int64, n *kit.TestFullNode) {
	for splitStorePruneIndex(ctx, t, n) < pIdx {
		// Wait between polls, but stay responsive to cancellation. A bare
		// time.Sleep would ignore ctx entirely.
		select {
		case <-ctx.Done():
			t.Fatalf("context done while waiting for prune index %d: %s", pIdx, ctx.Err())
		case <-time.After(1 * time.Second):
		}
	}
}
// splitStoreCompacting reports the "compacting" flag from the node's
// ChainBlockstoreInfo. The test fails if the call errors, if the key is
// missing, or if its value is not a bool.
func splitStoreCompacting(ctx context.Context, t *testing.T, n *kit.TestFullNode) bool {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)
	compactingRaw, ok := info["compacting"]
	// The message names the key actually looked up. It previously said
	// "compactions", a copy-paste from splitStoreCompactionIndex.
	require.True(t, ok, "'compacting' not on blockstore info")
	compacting, ok := compactingRaw.(bool)
	require.True(t, ok, "compacting key on blockstore info wrong type")
	return compacting
}
// splitStoreBaseEpoch returns the "base epoch" entry of the node's
// ChainBlockstoreInfo as an abi.ChainEpoch. The test fails if the entry is
// absent or holds a value of another type.
func splitStoreBaseEpoch(ctx context.Context, t *testing.T, n *kit.TestFullNode) abi.ChainEpoch {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)

	raw, present := info["base epoch"]
	require.True(t, present, "'base epoch' not on blockstore info")

	epoch, isEpoch := raw.(abi.ChainEpoch)
	require.True(t, isEpoch, "base epoch key on blockstore info wrong type")

	return epoch
}
// splitStoreCompactionIndex returns the "compactions" counter from the node's
// ChainBlockstoreInfo. The test fails if the counter is absent or is not an
// int64.
func splitStoreCompactionIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode) int64 {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)

	raw, present := info["compactions"]
	require.True(t, present, "compactions not on blockstore info")

	count, isInt64 := raw.(int64)
	require.True(t, isInt64, "compaction key on blockstore info wrong type")

	return count
}
// splitStorePruneIndex returns the "prunes" counter from the node's
// ChainBlockstoreInfo. The test fails if the counter is absent or is not an
// int64.
func splitStorePruneIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode) int64 {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)

	raw, present := info["prunes"]
	require.True(t, present, "prunes not on blockstore info")

	count, isInt64 := raw.(int64)
	require.True(t, isInt64, "prune key on blockstore info wrong type")

	return count
}
type (
	// Garbager creates on-chain garbage that is unreachable from the chain
	// head, one CID at a time, so that a network's splitstore has something to
	// collect. It does this by repeatedly rewriting the peer ID of a miner
	// actor that it creates and owns, which drops the previous miner info
	// object.
	Garbager struct {
		t      *testing.T
		node   *kit.TestFullNode
		latest trashID

		// internal tracking: address of the miner actor garbage is written through
		maddr4Data address.Address
	}

	// trashID is a small counter identifying the current garbage generation.
	trashID uint8
)
// NewGarbager returns a Garbager bound to node n. Before returning, it creates
// the miner actor used to write garbage and sets that actor's initial peer ID,
// waiting for both messages to be executed on chain.
func NewGarbager(ctx context.Context, t *testing.T, n *kit.TestFullNode) *Garbager {
	garbager := &Garbager{
		node:       n,
		t:          t,
		maddr4Data: address.Undef, // filled in by createMiner
	}
	garbager.createMiner(ctx)
	_ = garbager.newPeerID(ctx)
	return garbager
}
// Drop turns the miner info object currently referenced by the Garbager's
// miner actor into garbage. It bumps the garbage counter and writes a new peer
// ID, which replaces that info object. It returns the CID of the replaced info
// object and the height reported by StateWaitMsg for the peer-ID change.
func (g *Garbager) Drop(ctx context.Context) (cid.Cid, abi.ChainEpoch) {
	dropped := g.mInfoCid(ctx)

	g.latest++
	dropEpoch := g.newPeerID(ctx)

	return dropped, dropEpoch
}
// Exists reports whether c can be read through the node's ChainReadObj. A
// not-found error means false. Any other error fails the test immediately.
func (g *Garbager) Exists(ctx context.Context, c cid.Cid) bool {
	_, err := g.node.ChainReadObj(ctx, c)
	switch {
	case ipld.IsNotFound(err):
		return false
	case err != nil:
		g.t.Fatalf("ChainReadObj failure on existence check: %s", err)
		return false // not reached: Fatalf stops the test goroutine
	}
	return true
}
// newPeerID sends a ChangePeerID message from the node's default key to the
// Garbager's miner actor, using the peer ID "Garbager-Data-<latest>". It
// waits for the message with build.MessageConfidence confirmations, requires
// a successful exit code, and returns the height reported for its execution.
func (g *Garbager) newPeerID(ctx context.Context) abi.ChainEpoch {
	peerID := []byte(fmt.Sprintf("Garbager-Data-%d", g.latest))
	params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: peerID})
	require.NoError(g.t, err)

	signed, err := g.node.MpoolPushMessage(ctx, &types.Message{
		To:     g.maddr4Data,
		From:   g.node.DefaultKey.Address,
		Method: builtin.MethodsMiner.ChangePeerID,
		Params: params,
		Value:  types.NewInt(0),
	}, nil)
	require.NoError(g.t, err)

	lookup, err := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
	require.NoError(g.t, err)
	require.Equal(g.t, exitcode.Ok, lookup.Receipt.ExitCode)

	return lookup.Height
}
// mInfoCid loads the Garbager's miner actor state at the current chain head
// and returns the CID of its Info field, i.e. the miner info object that the
// next peer-ID change will replace.
//
// NOTE(review): the state is always decoded with the v8 miner State layout;
// confirm this matches the actors version the test network runs.
func (g *Garbager) mInfoCid(ctx context.Context) cid.Cid {
	head, err := g.node.ChainHead(ctx)
	require.NoError(g.t, err)

	actor, err := g.node.StateGetActor(ctx, g.maddr4Data, head.Key())
	require.NoError(g.t, err)

	stateBytes, err := g.node.ChainReadObj(ctx, actor.Head)
	require.NoError(g.t, err)

	var minerState miner8.State
	err = minerState.UnmarshalCBOR(bytes.NewReader(stateBytes))
	require.NoError(g.t, err)

	return minerState.Info
}
// createMiner creates the miner actor that the Garbager later writes garbage
// through, by sending CreateMiner to the storage power actor. The node's
// default wallet address serves as both owner and worker, and the Window PoSt
// proof type is StackedDrgWindow32GiBV1. The new actor's ID address is stored
// in g.maddr4Data. The test fails if a miner was already created or if the
// CreateMiner message does not succeed.
func (g *Garbager) createMiner(ctx context.Context) {
	// require.Equal (instead of a boolean comparison) shows the unexpected
	// values when the check fails.
	require.Equal(g.t, address.Undef, g.maddr4Data, "garbager miner actor already created")

	owner, err := g.node.WalletDefaultAddress(ctx)
	require.NoError(g.t, err)
	worker := owner

	params, err := actors.SerializeParams(&power6.CreateMinerParams{
		Owner:               owner,
		Worker:              worker,
		WindowPoStProofType: abi.RegisteredPoStProof_StackedDrgWindow32GiBV1,
	})
	require.NoError(g.t, err)

	createStorageMinerMsg := &types.Message{
		To:     power.Address,
		From:   worker,
		Value:  big.Zero(),
		Method: power.Methods.CreateMiner,
		Params: params,
	}

	signed, err := g.node.MpoolPushMessage(ctx, createStorageMinerMsg, nil)
	require.NoError(g.t, err)

	mw, err := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, lapi.LookbackNoLimit, true)
	require.NoError(g.t, err)
	// Compare against exitcode.Ok with require.Equal so that a failure reports
	// the actual exit code rather than just "false".
	require.Equal(g.t, exitcode.Ok, mw.Receipt.ExitCode, "garbager's internal create miner message failed")

	var retval power6.CreateMinerReturn
	require.NoError(g.t, retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return)))
	g.maddr4Data = retval.IDAddress
}