Merge pull request #9648 from filecoin-project/fix/snap-abort-cleanup
fix: sealing: More complete snapdeals abort cleanup
This commit is contained in:
commit
285cc66773
@ -958,6 +958,11 @@ workflows:
|
||||
suite: itest-worker
|
||||
target: "./itests/worker_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-worker_upgrade
|
||||
suite: itest-worker_upgrade
|
||||
target: "./itests/worker_upgrade_test.go"
|
||||
|
||||
- test:
|
||||
name: test-unit-cli
|
||||
suite: utest-unit-cli
|
||||
|
@ -417,6 +417,10 @@ func (st *SealSeed) Equals(ost *SealSeed) bool {
|
||||
|
||||
type SectorState string
|
||||
|
||||
func (s *SectorState) String() string {
|
||||
return string(*s)
|
||||
}
|
||||
|
||||
type AddrUse int
|
||||
|
||||
const (
|
||||
|
@ -39,13 +39,13 @@ type Worker interface {
|
||||
SealPreCommit2(ctx context.Context, sector storiface.SectorRef, pc1o storiface.PreCommit1Out) (storiface.CallID, error) //perm:admin
|
||||
SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (storiface.CallID, error) //perm:admin
|
||||
SealCommit2(ctx context.Context, sector storiface.SectorRef, c1o storiface.Commit1Out) (storiface.CallID, error) //perm:admin
|
||||
FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin
|
||||
FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin
|
||||
FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) //perm:admin
|
||||
FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) //perm:admin
|
||||
ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
|
||||
ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
|
||||
GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) (storiface.CallID, error) //perm:admin
|
||||
ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin
|
||||
MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
|
||||
UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
|
||||
|
@ -978,9 +978,9 @@ type WorkerStruct struct {
|
||||
|
||||
Fetch func(p0 context.Context, p1 storiface.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
FinalizeReplicaUpdate func(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) `perm:"admin"`
|
||||
FinalizeReplicaUpdate func(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
FinalizeSector func(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) `perm:"admin"`
|
||||
FinalizeSector func(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
GenerateSectorKeyFromData func(p0 context.Context, p1 storiface.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
@ -5689,25 +5689,25 @@ func (s *WorkerStub) Fetch(p0 context.Context, p1 storiface.SectorRef, p2 storif
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
||||
func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||
if s.Internal.FinalizeReplicaUpdate == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
return s.Internal.FinalizeReplicaUpdate(p0, p1, p2)
|
||||
return s.Internal.FinalizeReplicaUpdate(p0, p1)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
||||
func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
||||
func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||
if s.Internal.FinalizeSector == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
return s.Internal.FinalizeSector(p0, p1, p2)
|
||||
return s.Internal.FinalizeSector(p0, p1)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) FinalizeSector(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
||||
func (s *WorkerStub) FinalizeSector(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ var (
|
||||
FullAPIVersion1 = newVer(2, 3, 0)
|
||||
|
||||
MinerAPIVersion0 = newVer(1, 5, 0)
|
||||
WorkerAPIVersion0 = newVer(1, 6, 0)
|
||||
WorkerAPIVersion0 = newVer(1, 7, 0)
|
||||
)
|
||||
|
||||
//nolint:varcheck,deadcode
|
||||
|
Binary file not shown.
@ -164,7 +164,7 @@ func presealSector(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, sid storiface.
|
||||
return nil, xerrors.Errorf("commit: %w", err)
|
||||
}
|
||||
|
||||
if err := sb.FinalizeSector(context.TODO(), sid, nil); err != nil {
|
||||
if err := sb.FinalizeSector(context.TODO(), sid); err != nil {
|
||||
return nil, xerrors.Errorf("trim cache: %w", err)
|
||||
}
|
||||
|
||||
|
@ -369,7 +369,7 @@ var runCmd = &cli.Command{
|
||||
}
|
||||
|
||||
if workerType == "" {
|
||||
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeReplicaUpdate)
|
||||
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTFinalizeReplicaUpdate)
|
||||
|
||||
if !cctx.Bool("no-default") {
|
||||
workerType = sealtasks.WorkerSealing
|
||||
|
@ -1601,14 +1601,8 @@ Inputs:
|
||||
"Number": 9
|
||||
},
|
||||
"ProofType": 8
|
||||
},
|
||||
[
|
||||
{
|
||||
"Offset": 1024,
|
||||
"Size": 1024
|
||||
}
|
||||
]
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
@ -1636,14 +1630,8 @@ Inputs:
|
||||
"Number": 9
|
||||
},
|
||||
"ProofType": 8
|
||||
},
|
||||
[
|
||||
{
|
||||
"Offset": 1024,
|
||||
"Size": 1024
|
||||
}
|
||||
]
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
|
@ -224,3 +224,38 @@ func (tm *TestMiner) SectorsListNonGenesis(ctx context.Context) ([]abi.SectorNum
|
||||
|
||||
return l[tm.PresealSectors:], nil
|
||||
}
|
||||
|
||||
// comes from https://github.com/filecoin-project/lotus/blob/8ba4355cabd25e5f65261aaa561ff676321ffbd8/storage/sealer/manager.go#L1226
|
||||
// todo: have this defined in one place
|
||||
type SchedInfo struct {
|
||||
CallToWork struct{}
|
||||
EarlyRet interface{}
|
||||
ReturnedWork interface{}
|
||||
SchedInfo struct {
|
||||
OpenWindows []string
|
||||
Requests []struct {
|
||||
Priority int
|
||||
SchedId uuid.UUID
|
||||
Sector struct {
|
||||
Miner int
|
||||
Number int
|
||||
}
|
||||
TaskType string
|
||||
}
|
||||
}
|
||||
Waiting interface{}
|
||||
}
|
||||
|
||||
func (tm *TestMiner) SchedInfo(ctx context.Context) SchedInfo {
|
||||
schedb, err := tm.SealingSchedDiag(ctx, false)
|
||||
require.NoError(tm.t, err)
|
||||
|
||||
j, err := json.MarshalIndent(&schedb, "", " ")
|
||||
require.NoError(tm.t, err)
|
||||
|
||||
var b SchedInfo
|
||||
err = json.Unmarshal(j, &b)
|
||||
require.NoError(tm.t, err)
|
||||
|
||||
return b
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ var DefaultNodeOpts = nodeOpts{
|
||||
sectors: DefaultPresealsPerBootstrapMiner,
|
||||
sectorSize: abi.SectorSize(2 << 10), // 2KiB.
|
||||
|
||||
workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize},
|
||||
workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed},
|
||||
workerStorageOpt: func(store paths.Store) paths.Store { return store },
|
||||
}
|
||||
|
||||
@ -229,7 +229,7 @@ func WithWorkerName(n string) NodeOpt {
|
||||
}
|
||||
}
|
||||
|
||||
var WithSealWorkerTasks = WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})
|
||||
var WithSealWorkerTasks = WithTaskTypes(append([]sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTDataCid, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}, DefaultNodeOpts.workerTasks...))
|
||||
|
||||
func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt {
|
||||
return func(opts *nodeOpts) error {
|
||||
|
@ -2,13 +2,11 @@ package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
@ -79,7 +77,7 @@ func TestWorkerPledgeLocalFin(t *testing.T) {
|
||||
func TestWorkerDataCid(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
|
||||
kit.WithSealWorkerTasks) // no mock proofs
|
||||
|
||||
e, err := worker.Enabled(ctx)
|
||||
require.NoError(t, err)
|
||||
@ -409,7 +407,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
|
||||
func TestSchedulerRemoveRequest(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
|
||||
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTPreCommit1})) // no mock proofs
|
||||
|
||||
//ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
||||
|
||||
@ -417,26 +415,6 @@ func TestSchedulerRemoveRequest(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.True(t, e)
|
||||
|
||||
type info struct {
|
||||
CallToWork struct {
|
||||
} `json:"CallToWork"`
|
||||
EarlyRet interface{} `json:"EarlyRet"`
|
||||
ReturnedWork interface{} `json:"ReturnedWork"`
|
||||
SchedInfo struct {
|
||||
OpenWindows []string `json:"OpenWindows"`
|
||||
Requests []struct {
|
||||
Priority int `json:"Priority"`
|
||||
SchedID string `json:"SchedId"`
|
||||
Sector struct {
|
||||
Miner int `json:"Miner"`
|
||||
Number int `json:"Number"`
|
||||
} `json:"Sector"`
|
||||
TaskType string `json:"TaskType"`
|
||||
} `json:"Requests"`
|
||||
} `json:"SchedInfo"`
|
||||
Waiting interface{} `json:"Waiting"`
|
||||
}
|
||||
|
||||
tocheck := miner.StartPledge(ctx, 1, 0, nil)
|
||||
var sn abi.SectorNumber
|
||||
for n := range tocheck {
|
||||
@ -453,39 +431,18 @@ func TestSchedulerRemoveRequest(t *testing.T) {
|
||||
}
|
||||
|
||||
// Dump current scheduler info
|
||||
schedb, err := miner.SealingSchedDiag(ctx, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
j, err := json.MarshalIndent(&schedb, "", " ")
|
||||
require.NoError(t, err)
|
||||
|
||||
var b info
|
||||
err = json.Unmarshal(j, &b)
|
||||
require.NoError(t, err)
|
||||
|
||||
var schedidb uuid.UUID
|
||||
b := miner.SchedInfo(ctx)
|
||||
|
||||
// cast scheduler info and get the request UUID. Call the SealingRemoveRequest()
|
||||
require.Len(t, b.SchedInfo.Requests, 1)
|
||||
require.Equal(t, "seal/v0/precommit/2", b.SchedInfo.Requests[0].TaskType)
|
||||
|
||||
schedidb, err = uuid.Parse(b.SchedInfo.Requests[0].SchedID)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = miner.SealingRemoveRequest(ctx, schedidb)
|
||||
err = miner.SealingRemoveRequest(ctx, b.SchedInfo.Requests[0].SchedId)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Dump the schduler again and compare the UUID if a request is present
|
||||
// If no request present then pass the test
|
||||
scheda, err := miner.SealingSchedDiag(ctx, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
k, err := json.MarshalIndent(&scheda, "", " ")
|
||||
require.NoError(t, err)
|
||||
|
||||
var a info
|
||||
err = json.Unmarshal(k, &a)
|
||||
require.NoError(t, err)
|
||||
a := miner.SchedInfo(ctx)
|
||||
|
||||
require.Len(t, a.SchedInfo.Requests, 0)
|
||||
}
|
||||
|
170
itests/worker_upgrade_test.go
Normal file
170
itests/worker_upgrade_test.go
Normal file
@ -0,0 +1,170 @@
|
||||
package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
func TestWorkerUpgradeAbortCleanup(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
blockTime := 1 * time.Millisecond
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
client, miner, ens := kit.EnsembleMinimal(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||
kit.NoStorage(), // no storage to have better control over path settings
|
||||
kit.MutateSealingConfig(func(sc *config.SealingConfig) { sc.FinalizeEarly = true })) // no mock proofs
|
||||
|
||||
var worker kit.TestWorker
|
||||
ens.Worker(miner, &worker, kit.ThroughRPC(), kit.NoStorage(), // no storage to have better control over path settings
|
||||
kit.WithTaskTypes([]sealtasks.TaskType{
|
||||
sealtasks.TTFetch, sealtasks.TTAddPiece,
|
||||
sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2,
|
||||
sealtasks.TTReplicaUpdate, // only first update step, later steps will not run and we'll abort
|
||||
}),
|
||||
)
|
||||
|
||||
ens.Start().InterconnectAll().BeginMiningMustPost(blockTime)
|
||||
|
||||
maddr, err := miner.ActorAddress(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// get storage paths
|
||||
|
||||
// store-only path on the miner
|
||||
miner.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
|
||||
cfg.CanSeal = false
|
||||
cfg.CanStore = true
|
||||
})
|
||||
|
||||
mlocal, err := miner.StorageLocal(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, mlocal, 2) // genesis and one local
|
||||
|
||||
// we want a seal-only path on the worker disconnected from miner path
|
||||
worker.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
|
||||
cfg.CanSeal = true
|
||||
cfg.CanStore = false
|
||||
})
|
||||
|
||||
wpaths, err := worker.Paths(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, wpaths, 1)
|
||||
|
||||
// check sectors in paths
|
||||
checkSectors := func(miners, workers storiface.SectorFileType) {
|
||||
paths, err := miner.StorageList(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, paths, 3) // genesis, miner, worker
|
||||
|
||||
// first loop for debugging
|
||||
for id, decls := range paths {
|
||||
pinfo, err := miner.StorageInfo(ctx, id)
|
||||
require.NoError(t, err)
|
||||
|
||||
switch {
|
||||
case id == wpaths[0].ID: // worker path
|
||||
fmt.Println("Worker Decls ", len(decls), decls)
|
||||
case !pinfo.CanStore && !pinfo.CanSeal: // genesis path
|
||||
fmt.Println("Genesis Decls ", len(decls), decls)
|
||||
default: // miner path
|
||||
fmt.Println("Miner Decls ", len(decls), decls)
|
||||
}
|
||||
}
|
||||
|
||||
for id, decls := range paths {
|
||||
pinfo, err := miner.StorageInfo(ctx, id)
|
||||
require.NoError(t, err)
|
||||
|
||||
switch {
|
||||
case id == wpaths[0].ID: // worker path
|
||||
if workers != storiface.FTNone {
|
||||
require.Len(t, decls, 1)
|
||||
require.EqualValues(t, workers.Strings(), decls[0].SectorFileType.Strings())
|
||||
} else {
|
||||
require.Len(t, decls, 0)
|
||||
}
|
||||
case !pinfo.CanStore && !pinfo.CanSeal: // genesis path
|
||||
require.Len(t, decls, kit.DefaultPresealsPerBootstrapMiner)
|
||||
default: // miner path
|
||||
if miners != storiface.FTNone {
|
||||
require.Len(t, decls, 1)
|
||||
require.EqualValues(t, miners.Strings(), decls[0].SectorFileType.Strings())
|
||||
} else {
|
||||
require.Len(t, decls, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
checkSectors(storiface.FTNone, storiface.FTNone)
|
||||
|
||||
// get a sector for upgrading
|
||||
miner.PledgeSectors(ctx, 1, 0, nil)
|
||||
sl, err := miner.SectorsListNonGenesis(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, sl, 1, "expected 1 sector")
|
||||
|
||||
snum := sl[0]
|
||||
|
||||
checkSectors(storiface.FTCache|storiface.FTSealed, storiface.FTNone)
|
||||
|
||||
client.WaitForSectorActive(ctx, t, snum, maddr)
|
||||
|
||||
// make available
|
||||
err = miner.SectorMarkForUpgrade(ctx, snum, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start a deal
|
||||
|
||||
dh := kit.NewDealHarness(t, client, miner, miner)
|
||||
res, _ := client.CreateImportFile(ctx, 123, 0)
|
||||
dp := dh.DefaultStartDealParams()
|
||||
dp.Data.Root = res.Root
|
||||
deal := dh.StartDeal(ctx, dp)
|
||||
|
||||
// wait for the deal to be in a sector
|
||||
dh.WaitDealSealed(ctx, deal, true, false, nil)
|
||||
|
||||
// wait for replica update to happen
|
||||
require.Eventually(t, func() bool {
|
||||
sstate, err := miner.SectorsStatus(ctx, snum, false)
|
||||
require.NoError(t, err)
|
||||
return sstate.State == api.SectorState(sealing.ProveReplicaUpdate)
|
||||
}, 10*time.Second, 50*time.Millisecond)
|
||||
|
||||
// check that the sector was copied to the worker
|
||||
checkSectors(storiface.FTCache|storiface.FTSealed, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache)
|
||||
|
||||
// abort upgrade
|
||||
err = miner.SectorAbortUpgrade(ctx, snum)
|
||||
require.NoError(t, err)
|
||||
|
||||
// the task is stuck in scheduler, so manually abort the task to get the sector fsm moving
|
||||
si := miner.SchedInfo(ctx)
|
||||
err = miner.SealingRemoveRequest(ctx, si.SchedInfo.Requests[0].SchedId)
|
||||
require.NoError(t, err)
|
||||
|
||||
var lastState api.SectorState
|
||||
require.Eventually(t, func() bool {
|
||||
sstate, err := miner.SectorsStatus(ctx, snum, false)
|
||||
require.NoError(t, err)
|
||||
lastState = sstate.State
|
||||
|
||||
return sstate.State == api.SectorState(sealing.Proving)
|
||||
}, 10*time.Second, 50*time.Millisecond, "last state was %s", &lastState)
|
||||
|
||||
// check that nothing was left on the worker
|
||||
checkSectors(storiface.FTCache|storiface.FTSealed, storiface.FTNone)
|
||||
}
|
@ -97,8 +97,8 @@ func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftCon
|
||||
|
||||
}
|
||||
|
||||
// // Validate checks that this configuration has working values,
|
||||
// // at least in appearance.
|
||||
// Validate checks that this configuration has working values,
|
||||
// at least in appearance.
|
||||
func ValidateConfig(cfg *ClusterRaftConfig) error {
|
||||
if cfg.RaftConfig == nil {
|
||||
return xerrors.Errorf("no hashicorp/raft.Config")
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
var MinRetryTime = 1 * time.Minute
|
||||
@ -425,16 +426,19 @@ func (m *Sealing) handleAbortUpgrade(ctx statemachine.Context, sector SectorInfo
|
||||
m.cleanupAssignedDeals(sector)
|
||||
|
||||
// Remove snap deals replica if any
|
||||
// This removes update / update-cache from all storage
|
||||
if err := m.sealer.ReleaseReplicaUpgrade(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||
return xerrors.Errorf("removing CC update files from sector storage")
|
||||
}
|
||||
|
||||
cfg, err := m.getConfig()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting sealing config: %w", err)
|
||||
// This removes the unsealed file from all storage
|
||||
// note: we're not keeping anything unsealed because we're reverting to CC
|
||||
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), []storiface.Range{}); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.CCPieces, true, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
||||
// and makes sure sealed/cache files only exist in long-term-storage
|
||||
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
|
@ -305,7 +305,11 @@ func (m *Sealing) handleFinalizeReplicaUpdate(ctx statemachine.Context, sector S
|
||||
return xerrors.Errorf("getting sealing config: %w", err)
|
||||
}
|
||||
|
||||
if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
||||
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
||||
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("release unsealed: %w", err)})
|
||||
}
|
||||
|
||||
if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
||||
}
|
||||
|
||||
|
@ -862,7 +862,11 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn
|
||||
return xerrors.Errorf("getting sealing config: %w", err)
|
||||
}
|
||||
|
||||
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
||||
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
||||
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("release unsealed: %w", err)})
|
||||
}
|
||||
|
||||
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
||||
}
|
||||
|
||||
|
@ -999,7 +999,7 @@ func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storiface.SectorRef)
|
||||
return xerrors.Errorf("not supported at this layer")
|
||||
}
|
||||
|
||||
func (sb *Sealer) freeUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
ssize, err := sector.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1067,16 +1067,12 @@ func (sb *Sealer) freeUnsealed(ctx context.Context, sector storiface.SectorRef,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
|
||||
ssize, err := sector.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
||||
@ -1124,16 +1120,12 @@ func (sb *Sealer) FinalizeSectorInto(ctx context.Context, sector storiface.Secto
|
||||
return ffi.ClearCache(uint64(ssize), dest)
|
||||
}
|
||||
|
||||
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error {
|
||||
ssize, err := sector.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
{
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
|
||||
if err != nil {
|
||||
@ -1161,16 +1153,6 @@ func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error {
|
||||
// This call is meant to mark storage as 'freeable'. Given that unsealing is
|
||||
// very expensive, we don't remove data as soon as we can - instead we only
|
||||
// do that when we don't have free space for data that really needs it
|
||||
|
||||
// This function should not be called at this layer, everything should be
|
||||
// handled in localworker
|
||||
return xerrors.Errorf("not supported at this layer")
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReleaseReplicaUpgrade(ctx context.Context, sector storiface.SectorRef) error {
|
||||
return xerrors.Errorf("not supported at this layer")
|
||||
}
|
||||
|
@ -327,7 +327,7 @@ func TestSealAndVerify(t *testing.T) {
|
||||
|
||||
post(t, sb, nil, s)
|
||||
|
||||
if err := sb.FinalizeSector(context.TODO(), si, nil); err != nil {
|
||||
if err := sb.FinalizeSector(context.TODO(), si); err != nil {
|
||||
t.Fatalf("%+v", err)
|
||||
}
|
||||
|
||||
@ -390,7 +390,7 @@ func TestSealPoStNoCommit(t *testing.T) {
|
||||
|
||||
precommit := time.Now()
|
||||
|
||||
if err := sb.FinalizeSector(context.TODO(), si, nil); err != nil {
|
||||
if err := sb.FinalizeSector(context.TODO(), si); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -142,7 +142,7 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
|
||||
go m.sched.runSched()
|
||||
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate,
|
||||
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeUnsealed, sealtasks.TTFinalizeReplicaUpdate,
|
||||
}
|
||||
if sc.AllowSectorDownload {
|
||||
localTasks = append(localTasks, sealtasks.TTDownloadSector)
|
||||
@ -628,7 +628,27 @@ func (m *Manager) SealCommit2(ctx context.Context, sector storiface.SectorRef, p
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
// sectorStorageType tries to figure out storage type for a given sector; expects only a single copy of the file in the
|
||||
// storage system
|
||||
func (m *Manager) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (sectorFound bool, ptype storiface.PathType, err error) {
|
||||
stores, err := m.index.StorageFindSector(ctx, sector.ID, ft, 0, false)
|
||||
if err != nil {
|
||||
return false, "", xerrors.Errorf("finding sector: %w", err)
|
||||
}
|
||||
if len(stores) == 0 {
|
||||
return false, "", nil
|
||||
}
|
||||
|
||||
for _, store := range stores {
|
||||
if store.CanSeal {
|
||||
return true, storiface.PathSealing, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, storiface.PathStorage, nil
|
||||
}
|
||||
|
||||
func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -636,44 +656,38 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
||||
unsealed := storiface.FTUnsealed
|
||||
{
|
||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||
/*
|
||||
We want to:
|
||||
- Trim cache
|
||||
- Move stuff to long-term storage
|
||||
*/
|
||||
|
||||
// remove redundant copies if there are any
|
||||
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTUnsealed); err != nil {
|
||||
return xerrors.Errorf("remove copies (sealed): %w", err)
|
||||
}
|
||||
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil {
|
||||
return xerrors.Errorf("remove copies (sealed): %w", err)
|
||||
}
|
||||
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTCache); err != nil {
|
||||
return xerrors.Errorf("remove copies (cache): %w", err)
|
||||
}
|
||||
|
||||
// Make sure that the cache files are still in sealing storage; In case not,
|
||||
// we want to do finalize in long-term storage
|
||||
_, cachePathType, err := m.sectorStorageType(ctx, sector, storiface.FTCache)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding unsealed sector: %w", err)
|
||||
}
|
||||
|
||||
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
|
||||
unsealed = storiface.FTNone
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that the sealed file is still in sealing storage; In case it already
|
||||
// isn't, we want to do finalize in long-term storage
|
||||
pathType := storiface.PathStorage
|
||||
{
|
||||
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding sealed sector: %w", err)
|
||||
}
|
||||
|
||||
for _, store := range sealedStores {
|
||||
if store.CanSeal {
|
||||
pathType = storiface.PathSealing
|
||||
break
|
||||
}
|
||||
}
|
||||
return xerrors.Errorf("checking cache storage type: %w", err)
|
||||
}
|
||||
|
||||
// do the cache trimming wherever the likely still very large cache lives.
|
||||
// we really don't want to move it.
|
||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
||||
|
||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||
m.schedFetch(sector, storiface.FTCache|unsealed, pathType, storiface.AcquireMove),
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||
m.schedFetch(sector, storiface.FTCache, cachePathType, storiface.AcquireMove),
|
||||
func(ctx context.Context, w Worker) error {
|
||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -684,9 +698,14 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef
|
||||
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)
|
||||
|
||||
// only move the unsealed file if it still exists and needs moving
|
||||
moveUnsealed := unsealed
|
||||
moveUnsealed := storiface.FTUnsealed
|
||||
{
|
||||
if len(keepUnsealed) == 0 {
|
||||
found, unsealedPathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("checking cache storage type: %w", err)
|
||||
}
|
||||
|
||||
if !found || unsealedPathType == storiface.PathStorage {
|
||||
moveUnsealed = storiface.FTNone
|
||||
}
|
||||
}
|
||||
@ -705,7 +724,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -713,19 +732,6 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
||||
moveUnsealed := storiface.FTUnsealed
|
||||
{
|
||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding unsealed sector: %w", err)
|
||||
}
|
||||
|
||||
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
|
||||
moveUnsealed = storiface.FTNone
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that the update file is still in sealing storage; In case it already
|
||||
// isn't, we want to do finalize in long-term storage
|
||||
pathType := storiface.PathStorage
|
||||
@ -748,9 +754,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)
|
||||
|
||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
||||
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
|
||||
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache, pathType, storiface.AcquireMove),
|
||||
func(ctx context.Context, w Worker) error {
|
||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
|
||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -775,9 +781,15 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
||||
|
||||
err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache))
|
||||
err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called
|
||||
// if we found unsealed files, AND have been asked to keep at least one, move unsealed
|
||||
if moveUnsealed != storiface.FTNone && len(keepUnsealed) != 0 {
|
||||
err = multierr.Append(err, move(moveUnsealed))
|
||||
|
||||
{
|
||||
unsealedStores, ferr := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||
if err != nil {
|
||||
err = multierr.Append(err, xerrors.Errorf("find unsealed sector before move: %w", ferr))
|
||||
} else if len(unsealedStores) > 0 {
|
||||
// if we found unsealed files, AND have been asked to keep at least one piece, move unsealed
|
||||
err = multierr.Append(err, move(storiface.FTUnsealed))
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@ -787,16 +799,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error {
|
||||
ssize, err := sector.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(safeToFree) == 0 || safeToFree[0].Offset != 0 || safeToFree[0].Size.Padded() != abi.PaddedPieceSize(ssize) {
|
||||
// todo support partial free
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -804,7 +807,25 @@ func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRe
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
return m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil)
|
||||
found, pathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("checking cache storage type: %w", err)
|
||||
}
|
||||
if !found {
|
||||
// already removed
|
||||
return nil
|
||||
}
|
||||
|
||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
|
||||
|
||||
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||
_, err := m.waitSimpleCall(ctx)(w.ReleaseUnsealed(ctx, sector, keepUnsealed))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storiface.SectorRef) error {
|
||||
|
@ -148,7 +148,7 @@ func TestSimple(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTFetch,
|
||||
}
|
||||
|
||||
err := m.AddWorker(ctx, newTestWorker(WorkerConfig{
|
||||
@ -207,7 +207,7 @@ func TestSnapDeals(t *testing.T) {
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize,
|
||||
sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2, sealtasks.TTUnseal,
|
||||
sealtasks.TTRegenSectorKey,
|
||||
sealtasks.TTRegenSectorKey, sealtasks.TTFinalizeUnsealed,
|
||||
}
|
||||
wds := datastore.NewMapDatastore()
|
||||
|
||||
@ -304,13 +304,13 @@ func TestSnapDeals(t *testing.T) {
|
||||
|
||||
fmt.Printf("Decode\n")
|
||||
// Remove unsealed data and decode for retrieval
|
||||
require.NoError(t, m.FinalizeSector(ctx, sid, nil))
|
||||
require.NoError(t, m.ReleaseUnsealed(ctx, sid, nil))
|
||||
startDecode := time.Now()
|
||||
require.NoError(t, m.SectorsUnsealPiece(ctx, sid, 0, p1.Size.Unpadded(), ticket, &out.NewUnsealed))
|
||||
fmt.Printf("Decode duration (%s): %s\n", ss.ShortString(), time.Since(startDecode))
|
||||
|
||||
// Remove just the first piece and decode for retrieval
|
||||
require.NoError(t, m.FinalizeSector(ctx, sid, []storiface.Range{{Offset: p1.Size.Unpadded(), Size: p2.Size.Unpadded()}}))
|
||||
require.NoError(t, m.ReleaseUnsealed(ctx, sid, []storiface.Range{{Offset: p1.Size.Unpadded(), Size: p2.Size.Unpadded()}}))
|
||||
require.NoError(t, m.SectorsUnsealPiece(ctx, sid, 0, p1.Size.Unpadded(), ticket, &out.NewUnsealed))
|
||||
|
||||
fmt.Printf("GSK\n")
|
||||
@ -320,7 +320,7 @@ func TestSnapDeals(t *testing.T) {
|
||||
fmt.Printf("GSK duration (%s): %s\n", ss.ShortString(), time.Since(startGSK))
|
||||
|
||||
fmt.Printf("Remove data\n")
|
||||
require.NoError(t, m.FinalizeSector(ctx, sid, nil))
|
||||
require.NoError(t, m.ReleaseUnsealed(ctx, sid, nil))
|
||||
fmt.Printf("Release Sector Key\n")
|
||||
require.NoError(t, m.ReleaseSectorKey(ctx, sid))
|
||||
fmt.Printf("Unseal Replica\n")
|
||||
@ -336,7 +336,7 @@ func TestSnarkPackV2(t *testing.T) {
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize,
|
||||
sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2, sealtasks.TTUnseal,
|
||||
sealtasks.TTRegenSectorKey,
|
||||
sealtasks.TTRegenSectorKey, sealtasks.TTFinalizeUnsealed,
|
||||
}
|
||||
wds := datastore.NewMapDatastore()
|
||||
|
||||
@ -476,7 +476,7 @@ func TestRedoPC1(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTFetch,
|
||||
}
|
||||
|
||||
tw := newTestWorker(WorkerConfig{
|
||||
@ -531,7 +531,7 @@ func TestRestartManager(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTFetch,
|
||||
}
|
||||
|
||||
tw := newTestWorker(WorkerConfig{
|
||||
@ -702,7 +702,7 @@ func TestReenableWorker(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTFetch,
|
||||
}
|
||||
|
||||
wds := datastore.NewMapDatastore()
|
||||
|
@ -497,15 +497,15 @@ func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof
|
||||
return id, []abi.PieceInfo{pi}, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) FinalizeSector(context.Context, storiface.SectorRef, []storiface.Range) error {
|
||||
func (mgr *SectorMgr) FinalizeSector(context.Context, storiface.SectorRef) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storiface.SectorRef, []storiface.Range) error {
|
||||
func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storiface.SectorRef) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error {
|
||||
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -107,7 +107,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
|
||||
// the unsealed file from the miner.
|
||||
ppt.addRemoteWorker(t, []sealtasks.TaskType{
|
||||
sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1,
|
||||
sealtasks.TTFetch, sealtasks.TTFinalize,
|
||||
sealtasks.TTFetch, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed,
|
||||
})
|
||||
|
||||
// create a worker that can ONLY unseal and fetch
|
||||
@ -352,7 +352,8 @@ func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.Unpa
|
||||
}
|
||||
|
||||
func (p *pieceProviderTestHarness) finalizeSector(t *testing.T, keepUnseal []storiface.Range) {
|
||||
require.NoError(t, p.mgr.FinalizeSector(p.ctx, p.sector, keepUnseal))
|
||||
require.NoError(t, p.mgr.ReleaseUnsealed(p.ctx, p.sector, keepUnseal))
|
||||
require.NoError(t, p.mgr.FinalizeSector(p.ctx, p.sector))
|
||||
}
|
||||
|
||||
func (p *pieceProviderTestHarness) shutdown(t *testing.T) {
|
||||
|
@ -148,7 +148,6 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
|
||||
|
||||
wg.Wait()
|
||||
|
||||
log.Debugf("SCHED windows: %+v", windows)
|
||||
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)
|
||||
|
||||
// Step 2
|
||||
|
@ -91,11 +91,11 @@ func (s *schedTestWorker) SealCommit2(ctx context.Context, sector storiface.Sect
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) (storiface.CallID, error) {
|
||||
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@ -127,7 +127,7 @@ func (s *schedTestWorker) GenerateSectorKeyFromData(ctx context.Context, sector
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
func (s *schedTestWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ const (
|
||||
TTCommit2 TaskType = "seal/v0/commit/2"
|
||||
|
||||
TTFinalize TaskType = "seal/v0/finalize"
|
||||
TTFinalizeUnsealed TaskType = "seal/v0/finalizeunsealed"
|
||||
|
||||
TTFetch TaskType = "seal/v0/fetch"
|
||||
TTUnseal TaskType = "seal/v0/unseal"
|
||||
@ -53,9 +54,10 @@ var order = map[TaskType]int{
|
||||
TTFetch: -1,
|
||||
TTDownloadSector: -2,
|
||||
TTFinalize: -3,
|
||||
TTFinalizeUnsealed: -4,
|
||||
|
||||
TTGenerateWindowPoSt: -4,
|
||||
TTGenerateWinningPoSt: -5, // most priority
|
||||
TTGenerateWindowPoSt: -5,
|
||||
TTGenerateWinningPoSt: -6, // most priority
|
||||
}
|
||||
|
||||
var shortNames = map[TaskType]string{
|
||||
@ -68,6 +70,7 @@ var shortNames = map[TaskType]string{
|
||||
TTCommit2: "C2",
|
||||
|
||||
TTFinalize: "FIN",
|
||||
TTFinalizeUnsealed: "FUS",
|
||||
|
||||
TTFetch: "GET",
|
||||
TTUnseal: "UNS",
|
||||
|
@ -83,7 +83,7 @@ func (t SectorFileType) String() string {
|
||||
case FTUpdateCache:
|
||||
return "update-cache"
|
||||
default:
|
||||
return fmt.Sprintf("<unknown %d>", t)
|
||||
return fmt.Sprintf("<unknown %d %v>", t, (t & ((1 << FileTypes) - 1)).Strings())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,13 +63,17 @@ type Sealer interface {
|
||||
SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (Commit1Out, error)
|
||||
SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (Proof, error)
|
||||
|
||||
FinalizeSector(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
|
||||
FinalizeSector(ctx context.Context, sector SectorRef) error
|
||||
|
||||
// ReleaseUnsealed marks parts of the unsealed sector file as safe to drop
|
||||
// (called by the fsm on restart, allows storage to keep no persistent
|
||||
// state about unsealed fast-retrieval copies)
|
||||
ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) error
|
||||
ReleaseUnsealed(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
|
||||
// ReleaseSectorKey removes `sealed` from all storage
|
||||
// called after successful sector upgrade
|
||||
ReleaseSectorKey(ctx context.Context, sector SectorRef) error
|
||||
// ReleaseReplicaUpgrade removes `update` / `update-cache` from all storage
|
||||
// called when aborting sector upgrade
|
||||
ReleaseReplicaUpgrade(ctx context.Context, sector SectorRef) error
|
||||
|
||||
// Removes all data associated with the specified sector
|
||||
@ -85,7 +89,7 @@ type Sealer interface {
|
||||
// GenerateSectorKeyFromData computes sector key given unsealed data and updated replica
|
||||
GenerateSectorKeyFromData(ctx context.Context, sector SectorRef, unsealed cid.Cid) error
|
||||
|
||||
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
|
||||
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef) error
|
||||
|
||||
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) error
|
||||
}
|
||||
|
@ -124,8 +124,8 @@ type WorkerCalls interface {
|
||||
SealPreCommit2(ctx context.Context, sector SectorRef, pc1o PreCommit1Out) (CallID, error)
|
||||
SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (CallID, error)
|
||||
SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (CallID, error)
|
||||
FinalizeSector(ctx context.Context, sector SectorRef, keepUnsealed []Range) (CallID, error)
|
||||
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) (CallID, error)
|
||||
FinalizeSector(ctx context.Context, sector SectorRef) (CallID, error)
|
||||
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef) (CallID, error)
|
||||
ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) (CallID, error)
|
||||
ReplicaUpdate(ctx context.Context, sector SectorRef, pieces []abi.PieceInfo) (CallID, error)
|
||||
ProveReplicaUpdate1(ctx context.Context, sector SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
|
||||
|
@ -61,11 +61,11 @@ func (t *testExec) SealCommit2(ctx context.Context, sector storiface.SectorRef,
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
func (t *testExec) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error {
|
||||
func (t *testExec) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@ -101,7 +101,7 @@ func (t *testExec) GenerateSectorKeyFromData(ctx context.Context, sector storifa
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||
func (t *testExec) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
@ -481,35 +481,36 @@ func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector stor
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, FinalizeSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil {
|
||||
return nil, xerrors.Errorf("finalizing sector: %w", err)
|
||||
}
|
||||
|
||||
if len(keepUnsealed) == 0 {
|
||||
if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); err != nil {
|
||||
return nil, xerrors.Errorf("removing unsealed data: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, err
|
||||
return nil, sb.FinalizeSector(ctx, sector)
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, FinalizeReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
if err := sb.FinalizeReplicaUpdate(ctx, sector, keepUnsealed); err != nil {
|
||||
return nil, sb.FinalizeReplicaUpdate(ctx, sector)
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, ReleaseUnsealed, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
if err := sb.ReleaseUnsealed(ctx, sector, keepUnsealed); err != nil {
|
||||
return nil, xerrors.Errorf("finalizing sector: %w", err)
|
||||
}
|
||||
|
||||
@ -523,10 +524,6 @@ func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storifac
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) (storiface.CallID, error) {
|
||||
return storiface.UndefCall, xerrors.Errorf("implement me")
|
||||
}
|
||||
|
||||
func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
var err error
|
||||
|
||||
|
@ -183,8 +183,12 @@ func (t *trackedWorker) SealCommit2(ctx context.Context, sector storiface.Sector
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) })
|
||||
}
|
||||
|
||||
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) })
|
||||
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector) })
|
||||
}
|
||||
|
||||
func (t *trackedWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeUnsealed, func() (storiface.CallID, error) { return t.Worker.ReleaseUnsealed(ctx, sector, keepUnsealed) })
|
||||
}
|
||||
|
||||
func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) {
|
||||
@ -225,8 +229,8 @@ func (t *trackedWorker) ProveReplicaUpdate2(ctx context.Context, sector storifac
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector, keepUnsealed) })
|
||||
func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector) })
|
||||
}
|
||||
|
||||
var _ Worker = &trackedWorker{}
|
||||
|
Loading…
Reference in New Issue
Block a user