itests: Test snapdeals abort cleanup

Łukasz Magiera 2022-11-15 16:16:03 +01:00
parent 4ae2d400d1
commit 211712bf6d
6 changed files with 217 additions and 46 deletions

View File

@@ -417,6 +417,10 @@ func (st *SealSeed) Equals(ost *SealSeed) bool {
type SectorState string

func (s *SectorState) String() string {
    return string(*s)
}

type AddrUse int

const (

View File

@@ -224,3 +224,36 @@ func (tm *TestMiner) SectorsListNonGenesis(ctx context.Context) ([]abi.SectorNum
    return l[tm.PresealSectors:], nil
}

type SchedInfo struct {
    CallToWork   struct{}
    EarlyRet     interface{}
    ReturnedWork interface{}
    SchedInfo    struct {
        OpenWindows []string
        Requests    []struct {
            Priority int
            SchedId  uuid.UUID
            Sector   struct {
                Miner  int
                Number int
            }
            TaskType string
        }
    }
    Waiting interface{}
}

func (tm *TestMiner) SchedInfo(ctx context.Context) SchedInfo {
    schedb, err := tm.SealingSchedDiag(ctx, false)
    require.NoError(tm.t, err)

    j, err := json.MarshalIndent(&schedb, "", " ")
    require.NoError(tm.t, err)

    var b SchedInfo
    err = json.Unmarshal(j, &b)
    require.NoError(tm.t, err)

    return b
}
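For reference, a minimal usage sketch of the new helper (not part of this diff), assuming a testing.T `t`, a context `ctx`, and a kit.TestMiner `miner` set up the way the itests below do:

    si := miner.SchedInfo(ctx)
    // the decoded SealingSchedDiag output exposes pending scheduler requests,
    // so a test can assert on them and cancel a stuck request by its UUID
    require.Len(t, si.SchedInfo.Requests, 1)
    err := miner.SealingRemoveRequest(ctx, si.SchedInfo.Requests[0].SchedId)
    require.NoError(t, err)

This is exactly the pattern the refactored scheduler test and the new snapdeals abort test use.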

View File

@@ -2,13 +2,11 @@ package itests

import (
    "context"
    "encoding/json"
    "strings"
    "sync/atomic"
    "testing"
    "time"

    "github.com/google/uuid"
    logging "github.com/ipfs/go-log/v2"
    "github.com/stretchr/testify/require"
    "golang.org/x/xerrors"
@@ -417,26 +417,6 @@ func TestSchedulerRemoveRequest(t *testing.T) {
    require.NoError(t, err)
    require.True(t, e)

    type info struct {
        CallToWork struct {
        } `json:"CallToWork"`
        EarlyRet     interface{} `json:"EarlyRet"`
        ReturnedWork interface{} `json:"ReturnedWork"`
        SchedInfo    struct {
            OpenWindows []string `json:"OpenWindows"`
            Requests    []struct {
                Priority int    `json:"Priority"`
                SchedID  string `json:"SchedId"`
                Sector   struct {
                    Miner  int `json:"Miner"`
                    Number int `json:"Number"`
                } `json:"Sector"`
                TaskType string `json:"TaskType"`
            } `json:"Requests"`
        } `json:"SchedInfo"`
        Waiting interface{} `json:"Waiting"`
    }

    tocheck := miner.StartPledge(ctx, 1, 0, nil)
    var sn abi.SectorNumber
    for n := range tocheck {
@@ -453,39 +431,18 @@ func TestSchedulerRemoveRequest(t *testing.T) {
    }

    // Dump current scheduler info
    schedb, err := miner.SealingSchedDiag(ctx, false)
    require.NoError(t, err)
    j, err := json.MarshalIndent(&schedb, "", " ")
    require.NoError(t, err)
    var b info
    err = json.Unmarshal(j, &b)
    require.NoError(t, err)
    var schedidb uuid.UUID
    b := miner.SchedInfo(ctx)

    // cast scheduler info and get the request UUID. Call the SealingRemoveRequest()
    require.Len(t, b.SchedInfo.Requests, 1)
    require.Equal(t, "seal/v0/precommit/2", b.SchedInfo.Requests[0].TaskType)
    schedidb, err = uuid.Parse(b.SchedInfo.Requests[0].SchedID)
    require.NoError(t, err)
    err = miner.SealingRemoveRequest(ctx, schedidb)
    err = miner.SealingRemoveRequest(ctx, b.SchedInfo.Requests[0].SchedId)
    require.NoError(t, err)

    // Dump the scheduler again and compare the UUID if a request is present
    // If no request present then pass the test
    scheda, err := miner.SealingSchedDiag(ctx, false)
    require.NoError(t, err)
    k, err := json.MarshalIndent(&scheda, "", " ")
    require.NoError(t, err)
    var a info
    err = json.Unmarshal(k, &a)
    require.NoError(t, err)
    a := miner.SchedInfo(ctx)
    require.Len(t, a.SchedInfo.Requests, 0)
}

View File

@@ -0,0 +1,168 @@
package itests

import (
    "context"
    "fmt"
    "github.com/filecoin-project/lotus/api"
    "github.com/filecoin-project/lotus/itests/kit"
    "github.com/filecoin-project/lotus/node/config"
    sealing "github.com/filecoin-project/lotus/storage/pipeline"
    "github.com/filecoin-project/lotus/storage/sealer/sealtasks"
    "github.com/filecoin-project/lotus/storage/sealer/storiface"
    "github.com/stretchr/testify/require"
    "testing"
    "time"
)

func TestWorkerUpgradeAbortCleanup(t *testing.T) {
    ctx := context.Background()
    blockTime := 1 * time.Millisecond
    kit.QuietMiningLogs()

    client, miner, ens := kit.EnsembleMinimal(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
        kit.NoStorage(), // no storage to have better control over path settings
        kit.MutateSealingConfig(func(sc *config.SealingConfig) { sc.FinalizeEarly = true })) // no mock proofs

    var worker kit.TestWorker
    ens.Worker(miner, &worker, kit.ThroughRPC(), kit.NoStorage(), // no storage to have better control over path settings
        kit.WithTaskTypes([]sealtasks.TaskType{
            sealtasks.TTFetch, sealtasks.TTAddPiece,
            sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2,
            sealtasks.TTReplicaUpdate, // only first update step, later steps will not run and we'll abort
        }),
    )

    ens.Start().InterconnectAll().BeginMiningMustPost(blockTime)

    maddr, err := miner.ActorAddress(ctx)
    if err != nil {
        t.Fatal(err)
    }

    // get storage paths

    // store-only path on the miner
    miner.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
        cfg.CanSeal = false
        cfg.CanStore = true
    })

    mlocal, err := miner.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, mlocal, 2) // genesis and one local

    // we want a seal-only path on the worker disconnected from miner path
    worker.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
        cfg.CanSeal = true
        cfg.CanStore = false
    })

    wpaths, err := worker.Paths(ctx)
    require.NoError(t, err)
    require.Len(t, wpaths, 1)

    // check sectors in paths
    checkSectors := func(miners, workers storiface.SectorFileType) {
        paths, err := miner.StorageList(ctx)
        require.NoError(t, err)
        require.Len(t, paths, 3) // genesis, miner, worker

        // first loop for debugging
        for id, decls := range paths {
            pinfo, err := miner.StorageInfo(ctx, id)
            require.NoError(t, err)

            switch {
            case id == wpaths[0].ID: // worker path
                fmt.Println("Worker Decls ", len(decls), decls)
            case !pinfo.CanStore && !pinfo.CanSeal: // genesis path
                fmt.Println("Genesis Decls ", len(decls), decls)
            default: // miner path
                fmt.Println("Miner Decls ", len(decls), decls)
            }
        }

        for id, decls := range paths {
            pinfo, err := miner.StorageInfo(ctx, id)
            require.NoError(t, err)

            switch {
            case id == wpaths[0].ID: // worker path
                if workers != storiface.FTNone {
                    require.Len(t, decls, 1)
                    require.EqualValues(t, decls[0].SectorFileType.Strings(), workers.Strings())
                } else {
                    require.Len(t, decls, 0)
                }
            case !pinfo.CanStore && !pinfo.CanSeal: // genesis path
                require.Len(t, decls, kit.DefaultPresealsPerBootstrapMiner)
            default: // miner path
                if miners != storiface.FTNone {
                    require.Len(t, decls, 1)
                    require.EqualValues(t, decls[0].SectorFileType.Strings(), miners.Strings())
                } else {
                    require.Len(t, decls, 0)
                }
            }
        }
    }

    checkSectors(storiface.FTNone, storiface.FTNone)

    // get a sector for upgrading
    miner.PledgeSectors(ctx, 1, 0, nil)
    sl, err := miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")
    snum := sl[0]

    checkSectors(storiface.FTCache|storiface.FTSealed, storiface.FTNone)

    client.WaitForSectorActive(ctx, t, snum, maddr)

    // make available
    err = miner.SectorMarkForUpgrade(ctx, snum, true)
    require.NoError(t, err)

    // Start a deal
    dh := kit.NewDealHarness(t, client, miner, miner)
    res, _ := client.CreateImportFile(ctx, 123, 0)
    dp := dh.DefaultStartDealParams()
    dp.Data.Root = res.Root
    deal := dh.StartDeal(ctx, dp)

    // wait for the deal to be in a sector
    dh.WaitDealSealed(ctx, deal, true, false, nil)

    // wait for replica update to happen
    require.Eventually(t, func() bool {
        sstate, err := miner.SectorsStatus(ctx, snum, false)
        require.NoError(t, err)
        return sstate.State == api.SectorState(sealing.ProveReplicaUpdate)
    }, 10*time.Second, 50*time.Millisecond)

    // check that the sector was copied to the worker
    checkSectors(storiface.FTCache|storiface.FTSealed, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache)

    // abort upgrade
    err = miner.SectorAbortUpgrade(ctx, snum)
    require.NoError(t, err)

    // the task is stuck in scheduler, so manually abort the task to get the sector fsm moving
    si := miner.SchedInfo(ctx)
    err = miner.SealingRemoveRequest(ctx, si.SchedInfo.Requests[0].SchedId)
    require.NoError(t, err)

    var lastState api.SectorState
    require.Eventually(t, func() bool {
        sstate, err := miner.SectorsStatus(ctx, snum, false)
        require.NoError(t, err)
        lastState = sstate.State
        return sstate.State == api.SectorState(sealing.Proving)
    }, 10*time.Second, 50*time.Millisecond, "last state was %s", &lastState)

    // check that nothing was left on the worker
    checkSectors(storiface.FTCache|storiface.FTSealed, storiface.FTNone)
}

View File

@@ -425,6 +425,7 @@ func (m *Sealing) handleAbortUpgrade(ctx statemachine.Context, sector SectorInfo
    m.cleanupAssignedDeals(sector)

    // Remove snap deals replica if any
    // This removes update / update-cache from all storage
    if err := m.sealer.ReleaseReplicaUpgrade(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
        return xerrors.Errorf("removing CC update files from sector storage")
    }
@@ -434,10 +435,14 @@ func (m *Sealing) handleAbortUpgrade(ctx statemachine.Context, sector SectorInfo
        return xerrors.Errorf("getting sealing config: %w", err)
    }

    // This removes the unsealed file from all storage
    // TODO: Pass full sector range
    if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.CCPieces, true, cfg.AlwaysKeepUnsealedCopy)); err != nil {
        log.Error(err)
    }

    // TODO: Remove sealed/cache copies

    return ctx.Send(SectorRevertUpgradeToProving{})
}

View File

@@ -69,7 +69,11 @@ type Sealer interface {
    // (called by the fsm on restart, allows storage to keep no persistent
    // state about unsealed fast-retrieval copies)
    ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) error

    // ReleaseSectorKey removes `sealed` from all storage
    // called after successful sector upgrade
    ReleaseSectorKey(ctx context.Context, sector SectorRef) error

    // ReleaseReplicaUpgrade removes `update` / `update-cache` from all storage
    // called when aborting sector upgrade
    ReleaseReplicaUpgrade(ctx context.Context, sector SectorRef) error

    // Removes all data associated with the specified sector