diff --git a/api/api_worker.go b/api/api_worker.go index cca929d39..197ca898d 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -39,13 +39,13 @@ type Worker interface { SealPreCommit2(ctx context.Context, sector storiface.SectorRef, pc1o storiface.PreCommit1Out) (storiface.CallID, error) //perm:admin SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (storiface.CallID, error) //perm:admin SealCommit2(ctx context.Context, sector storiface.SectorRef, c1o storiface.Commit1Out) (storiface.CallID, error) //perm:admin - FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin - FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin + FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) //perm:admin + FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) //perm:admin ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin - ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) (storiface.CallID, error) //perm:admin + ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 0631a22bd..14d5c999d 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -978,9 +978,9 @@ type WorkerStruct struct { Fetch func(p0 context.Context, p1 storiface.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` - FinalizeReplicaUpdate func(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) `perm:"admin"` + FinalizeReplicaUpdate func(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) `perm:"admin"` - FinalizeSector func(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) `perm:"admin"` + FinalizeSector func(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) `perm:"admin"` GenerateSectorKeyFromData func(p0 context.Context, p1 storiface.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"` @@ -5689,25 +5689,25 @@ func (s *WorkerStub) Fetch(p0 context.Context, p1 storiface.SectorRef, p2 storif return *new(storiface.CallID), ErrNotSupported } -func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) { +func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) { if s.Internal.FinalizeReplicaUpdate == nil { return *new(storiface.CallID), ErrNotSupported } - return s.Internal.FinalizeReplicaUpdate(p0, p1, p2) + return s.Internal.FinalizeReplicaUpdate(p0, p1) } -func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) { +func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) { return *new(storiface.CallID), ErrNotSupported } -func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) { +func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) { if s.Internal.FinalizeSector == nil { return *new(storiface.CallID), ErrNotSupported } - return s.Internal.FinalizeSector(p0, p1, p2) + return s.Internal.FinalizeSector(p0, p1) } -func (s *WorkerStub) FinalizeSector(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) { +func (s *WorkerStub) FinalizeSector(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) { return *new(storiface.CallID), ErrNotSupported } diff --git a/api/version.go b/api/version.go index 3b50a9502..9c2113578 100644 --- a/api/version.go +++ b/api/version.go @@ -58,7 +58,7 @@ var ( FullAPIVersion1 = newVer(2, 3, 0) MinerAPIVersion0 = newVer(1, 5, 0) - WorkerAPIVersion0 = newVer(1, 6, 0) + WorkerAPIVersion0 = newVer(1, 7, 0) ) //nolint:varcheck,deadcode diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index bef5adfce..21c8bf5ee 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index 3b6359e0f..b98673712 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -164,7 +164,7 @@ func presealSector(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, sid storiface. return nil, xerrors.Errorf("commit: %w", err) } - if err := sb.FinalizeSector(context.TODO(), sid, nil); err != nil { + if err := sb.FinalizeSector(context.TODO(), sid); err != nil { return nil, xerrors.Errorf("trim cache: %w", err) } diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index afee6f1e1..9422c88a9 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -369,7 +369,7 @@ var runCmd = &cli.Command{ } if workerType == "" { - taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeReplicaUpdate) + taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed, sealtasks.TTFinalizeReplicaUpdate) if !cctx.Bool("no-default") { workerType = sealtasks.WorkerSealing diff --git a/documentation/en/api-v0-methods-worker.md b/documentation/en/api-v0-methods-worker.md index e53211089..dab251a7c 100644 --- a/documentation/en/api-v0-methods-worker.md +++ b/documentation/en/api-v0-methods-worker.md @@ -1601,13 +1601,7 @@ Inputs: "Number": 9 }, "ProofType": 8 - }, - [ - { - "Offset": 1024, - "Size": 1024 - } - ] + } ] ``` @@ -1636,13 +1630,7 @@ Inputs: "Number": 9 }, "ProofType": 8 - }, - [ - { - "Offset": 1024, - "Size": 1024 - } - ] + } ] ``` diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 9c482700c..add5b5c8a 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -58,7 +58,7 @@ var DefaultNodeOpts = nodeOpts{ sectors: DefaultPresealsPerBootstrapMiner, sectorSize: abi.SectorSize(2 << 10), // 2KiB. - workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize}, + workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed}, workerStorageOpt: func(store paths.Store) paths.Store { return store }, } @@ -229,7 +229,7 @@ func WithWorkerName(n string) NodeOpt { } } -var WithSealWorkerTasks = WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}) +var WithSealWorkerTasks = WithTaskTypes(append([]sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTDataCid, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}, DefaultNodeOpts.workerTasks...)) func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt { return func(opts *nodeOpts) error { diff --git a/itests/worker_test.go b/itests/worker_test.go index db2a8d24c..b002660f1 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -77,7 +77,7 @@ func TestWorkerPledgeLocalFin(t *testing.T) { func TestWorkerDataCid(t *testing.T) { ctx := context.Background() _, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true), - kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs + kit.WithSealWorkerTasks) // no mock proofs e, err := worker.Enabled(ctx) require.NoError(t, err) @@ -407,7 +407,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) { func TestSchedulerRemoveRequest(t *testing.T) { ctx := context.Background() _, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true), - kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs + kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTPreCommit1})) // no mock proofs //ens.InterconnectAll().BeginMining(50 * time.Millisecond) diff --git a/itests/worker_upgrade_test.go b/itests/worker_upgrade_test.go index bdbdfb6df..dd4623cea 100644 --- a/itests/worker_upgrade_test.go +++ b/itests/worker_upgrade_test.go @@ -29,7 +29,7 @@ func TestWorkerUpgradeAbortCleanup(t *testing.T) { ens.Worker(miner, &worker, kit.ThroughRPC(), kit.NoStorage(), // no storage to have better control over path settings kit.WithTaskTypes([]sealtasks.TaskType{ sealtasks.TTFetch, sealtasks.TTAddPiece, - sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, + sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTReplicaUpdate, // only first update step, later steps will not run and we'll abort }), ) diff --git a/storage/pipeline/states_failed.go b/storage/pipeline/states_failed.go index 26b8b2d53..d952d8eda 100644 --- a/storage/pipeline/states_failed.go +++ b/storage/pipeline/states_failed.go @@ -431,9 +431,14 @@ func (m *Sealing) handleAbortUpgrade(ctx statemachine.Context, sector SectorInfo return xerrors.Errorf("removing CC update files from sector storage") } - // This removes the unsealed file from all storage, and makes sure sealed/unsealed files only exist in long-term-storage + // This removes the unsealed file from all storage // note: we're not keeping anything unsealed because we're reverting to CC - if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), []storiface.Range{}); err != nil { + if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), []storiface.Range{}); err != nil { + log.Error(err) + } + + // and makes sure sealed/cache files only exist in long-term-storage + if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil { log.Error(err) } diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 6a4708379..0a9db804c 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -305,7 +305,11 @@ func (m *Sealing) handleFinalizeReplicaUpdate(ctx statemachine.Context, sector S return xerrors.Errorf("getting sealing config: %w", err) } - if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil { + if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("release unsealed: %w", err)}) + } + + if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil { return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) } diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index 5b5d2e372..0608ead07 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -862,7 +862,11 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn return xerrors.Errorf("getting sealing config: %w", err) } - if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil { + if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("release unsealed: %w", err)}) + } + + if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil { return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) } diff --git a/storage/sealer/ffiwrapper/sealer_cgo.go b/storage/sealer/ffiwrapper/sealer_cgo.go index 67d519259..bfc736d3c 100644 --- a/storage/sealer/ffiwrapper/sealer_cgo.go +++ b/storage/sealer/ffiwrapper/sealer_cgo.go @@ -999,7 +999,7 @@ func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storiface.SectorRef) return xerrors.Errorf("not supported at this layer") } -func (sb *Sealer) freeUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { +func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { ssize, err := sector.ProofType.SectorSize() if err != nil { return err @@ -1067,16 +1067,12 @@ func (sb *Sealer) freeUnsealed(ctx context.Context, sector storiface.SectorRef, return nil } -func (sb *Sealer) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { +func (sb *Sealer) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error { ssize, err := sector.ProofType.SectorSize() if err != nil { return err } - if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil { - return err - } - paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage) if err != nil { return xerrors.Errorf("acquiring sector cache path: %w", err) @@ -1124,16 +1120,12 @@ func (sb *Sealer) FinalizeSectorInto(ctx context.Context, sector storiface.Secto return ffi.ClearCache(uint64(ssize), dest) } -func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { +func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error { ssize, err := sector.ProofType.SectorSize() if err != nil { return err } - if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil { - return err - } - { paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage) if err != nil { @@ -1161,16 +1153,6 @@ func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se return nil } -func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error { - // This call is meant to mark storage as 'freeable'. Given that unsealing is - // very expensive, we don't remove data as soon as we can - instead we only - // do that when we don't have free space for data that really needs it - - // This function should not be called at this layer, everything should be - // handled in localworker - return xerrors.Errorf("not supported at this layer") -} - func (sb *Sealer) ReleaseReplicaUpgrade(ctx context.Context, sector storiface.SectorRef) error { return xerrors.Errorf("not supported at this layer") } diff --git a/storage/sealer/manager.go b/storage/sealer/manager.go index 30ed9ca65..c919a4f5c 100644 --- a/storage/sealer/manager.go +++ b/storage/sealer/manager.go @@ -142,7 +142,7 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc go m.sched.runSched() localTasks := []sealtasks.TaskType{ - sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate, + sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReleaseUnsealed, sealtasks.TTFinalizeReplicaUpdate, } if sc.AllowSectorDownload { localTasks = append(localTasks, sealtasks.TTDownloadSector) @@ -613,7 +613,27 @@ func (m *Manager) SealCommit2(ctx context.Context, sector storiface.SectorRef, p return out, waitErr } -func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { +// sectorStorageType tries to figure out storage type for a given sector; expects only a single copy of the file in the +// storage system +func (m *Manager) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (bool, storiface.PathType, error) { + stores, err := m.index.StorageFindSector(ctx, sector.ID, ft, 0, false) + if err != nil { + return false, "", xerrors.Errorf("finding sector: %w", err) + } + if len(stores) == 0 { + return false, "", nil + } + + for _, store := range stores { + if store.CanSeal { + return true, storiface.PathSealing, nil + } + } + + return true, storiface.PathStorage, nil +} + +func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -630,25 +650,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef */ - // first check if the unsealed file exists anywhere; If it doesn't ignore it - unsealed := storiface.FTUnsealed - { - unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) - if err != nil { - return xerrors.Errorf("finding unsealed sector: %w", err) - } - - if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine - unsealed = storiface.FTNone - } else { - // remove redundant copies if there are any - if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTUnsealed); err != nil { - return xerrors.Errorf("remove copies (unsealed): %w", err) - } - } - } - // remove redundant copies if there are any + if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTUnsealed); err != nil { + return xerrors.Errorf("remove copies (sealed): %w", err) + } if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil { return xerrors.Errorf("remove copies (sealed): %w", err) } @@ -658,29 +663,19 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef // Make sure that the cache files are still in sealing storage; In case not, // we want to do finalize in long-term storage - cachePathType := storiface.PathStorage - { - sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTCache, 0, false) - if err != nil { - return xerrors.Errorf("finding sealed sector: %w", err) - } - - for _, store := range sealedStores { - if store.CanSeal { - cachePathType = storiface.PathSealing - break - } - } + _, cachePathType, err := m.sectorStorageType(ctx, sector, storiface.FTCache) + if err != nil { + return xerrors.Errorf("checking cache storage type: %w", err) } // do the cache trimming wherever the likely still very large cache lives. // we really don't want to move it. selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false) - err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - m.schedFetch(sector, storiface.FTCache|unsealed, cachePathType, storiface.AcquireMove), + err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, + m.schedFetch(sector, storiface.FTCache, cachePathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) + _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector)) return err }) if err != nil { @@ -691,9 +686,14 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize) // only move the unsealed file if it still exists and needs moving - moveUnsealed := unsealed + moveUnsealed := storiface.FTUnsealed { - if len(keepUnsealed) == 0 { + found, unsealedPathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed) + if err != nil { + return xerrors.Errorf("checking cache storage type: %w", err) + } + + if !found || unsealedPathType == storiface.PathStorage { moveUnsealed = storiface.FTNone } } @@ -712,7 +712,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef return nil } -func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { +func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -720,19 +720,6 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se return xerrors.Errorf("acquiring sector lock: %w", err) } - // first check if the unsealed file exists anywhere; If it doesn't ignore it - moveUnsealed := storiface.FTUnsealed - { - unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) - if err != nil { - return xerrors.Errorf("finding unsealed sector: %w", err) - } - - if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine - moveUnsealed = storiface.FTNone - } - } - // Make sure that the update file is still in sealing storage; In case it already // isn't, we want to do finalize in long-term storage pathType := storiface.PathStorage @@ -755,9 +742,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector, - m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - _, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed)) + _, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector)) return err }) if err != nil { @@ -782,9 +769,15 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache)) err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called - // if we found unsealed files, AND have been asked to keep at least one, move unsealed - if moveUnsealed != storiface.FTNone && len(keepUnsealed) != 0 { - err = multierr.Append(err, move(moveUnsealed)) + + { + unsealedStores, ferr := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) + if err != nil { + err = multierr.Append(err, xerrors.Errorf("find unsealed sector before move: %w", ferr)) + } else if len(unsealedStores) > 0 { + // if we found unsealed files, AND have been asked to keep at least one piece, move unsealed + err = multierr.Append(err, move(storiface.FTUnsealed)) + } } if err != nil { @@ -794,16 +787,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se return nil } -func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error { - ssize, err := sector.ProofType.SectorSize() - if err != nil { - return err - } - if len(safeToFree) == 0 || safeToFree[0].Offset != 0 || safeToFree[0].Size.Padded() != abi.PaddedPieceSize(ssize) { - // todo support partial free - return nil - } - +func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -811,7 +795,25 @@ func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRe return xerrors.Errorf("acquiring sector lock: %w", err) } - return m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil) + found, pathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed) + if err != nil { + return xerrors.Errorf("checking cache storage type: %w", err) + } + if !found { + // already removed + return nil + } + + selector := newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false) + + return m.sched.Schedule(ctx, sector, sealtasks.TTReleaseUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { + _, err := m.waitSimpleCall(ctx)(w.ReleaseUnsealed(ctx, sector, keepUnsealed)) + if err != nil { + return err + } + + return nil + }) } func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storiface.SectorRef) error { diff --git a/storage/sealer/manager_test.go b/storage/sealer/manager_test.go index 5759d0bc7..a633aaef4 100644 --- a/storage/sealer/manager_test.go +++ b/storage/sealer/manager_test.go @@ -336,7 +336,7 @@ func TestSnarkPackV2(t *testing.T) { localTasks := []sealtasks.TaskType{ sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2, sealtasks.TTUnseal, - sealtasks.TTRegenSectorKey, + sealtasks.TTRegenSectorKey, sealtasks.TTReleaseUnsealed, } wds := datastore.NewMapDatastore() diff --git a/storage/sealer/mock/mock.go b/storage/sealer/mock/mock.go index 0797bf549..6e88b86a5 100644 --- a/storage/sealer/mock/mock.go +++ b/storage/sealer/mock/mock.go @@ -497,15 +497,15 @@ func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof return id, []abi.PieceInfo{pi}, nil } -func (mgr *SectorMgr) FinalizeSector(context.Context, storiface.SectorRef, []storiface.Range) error { +func (mgr *SectorMgr) FinalizeSector(context.Context, storiface.SectorRef) error { return nil } -func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storiface.SectorRef, []storiface.Range) error { +func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storiface.SectorRef) error { return nil } -func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error { +func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error { return nil } diff --git a/storage/sealer/piece_provider_test.go b/storage/sealer/piece_provider_test.go index 3605b2597..37102560e 100644 --- a/storage/sealer/piece_provider_test.go +++ b/storage/sealer/piece_provider_test.go @@ -107,7 +107,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) { // the unsealed file from the miner. ppt.addRemoteWorker(t, []sealtasks.TaskType{ sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, - sealtasks.TTFetch, sealtasks.TTFinalize, + sealtasks.TTFetch, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed, }) // create a worker that can ONLY unseal and fetch diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go index 335bb1249..3314473be 100644 --- a/storage/sealer/sched.go +++ b/storage/sealer/sched.go @@ -202,6 +202,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t ret: ret, Ctx: ctx, }: + log.Debugw("sched request sent", "sector", sector, "task", taskType) case <-sh.closing: return xerrors.New("closing") case <-ctx.Done(): diff --git a/storage/sealer/sched_assigner_common.go b/storage/sealer/sched_assigner_common.go index 09ff82a89..d3380fbdc 100644 --- a/storage/sealer/sched_assigner_common.go +++ b/storage/sealer/sched_assigner_common.go @@ -148,7 +148,6 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { wg.Wait() - log.Debugf("SCHED windows: %+v", windows) log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) // Step 2 diff --git a/storage/sealer/sealtasks/task.go b/storage/sealer/sealtasks/task.go index 1a06f7d34..a12599299 100644 --- a/storage/sealer/sealtasks/task.go +++ b/storage/sealer/sealtasks/task.go @@ -20,7 +20,8 @@ const ( TTCommit1 TaskType = "seal/v0/commit/1" TTCommit2 TaskType = "seal/v0/commit/2" - TTFinalize TaskType = "seal/v0/finalize" + TTFinalize TaskType = "seal/v0/finalize" + TTReleaseUnsealed TaskType = "seal/v0/releaseunsealed" TTFetch TaskType = "seal/v0/fetch" TTUnseal TaskType = "seal/v0/unseal" @@ -50,12 +51,13 @@ var order = map[TaskType]int{ TTCommit1: 2, TTUnseal: 1, - TTFetch: -1, - TTDownloadSector: -2, - TTFinalize: -3, + TTFetch: -1, + TTDownloadSector: -2, + TTFinalize: -3, + TTReleaseUnsealed: -4, - TTGenerateWindowPoSt: -4, - TTGenerateWinningPoSt: -5, // most priority + TTGenerateWindowPoSt: -5, + TTGenerateWinningPoSt: -6, // most priority } var shortNames = map[TaskType]string{ @@ -67,7 +69,8 @@ var shortNames = map[TaskType]string{ TTCommit1: "C1", TTCommit2: "C2", - TTFinalize: "FIN", + TTFinalize: "FIN", + TTReleaseUnsealed: "FUS", TTFetch: "GET", TTUnseal: "UNS", diff --git a/storage/sealer/storiface/storage.go b/storage/sealer/storiface/storage.go index 66f369105..b63c0480d 100644 --- a/storage/sealer/storiface/storage.go +++ b/storage/sealer/storiface/storage.go @@ -63,12 +63,12 @@ type Sealer interface { SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (Commit1Out, error) SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (Proof, error) - FinalizeSector(ctx context.Context, sector SectorRef, keepUnsealed []Range) error + FinalizeSector(ctx context.Context, sector SectorRef) error // ReleaseUnsealed marks parts of the unsealed sector file as safe to drop // (called by the fsm on restart, allows storage to keep no persistent // state about unsealed fast-retrieval copies) - ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) error + ReleaseUnsealed(ctx context.Context, sector SectorRef, keepUnsealed []Range) error // ReleaseSectorKey removes `sealed` from all storage // called after successful sector upgrade ReleaseSectorKey(ctx context.Context, sector SectorRef) error @@ -89,7 +89,7 @@ type Sealer interface { // GenerateSectorKeyFromData computes sector key given unsealed data and updated replica GenerateSectorKeyFromData(ctx context.Context, sector SectorRef, unsealed cid.Cid) error - FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) error + FinalizeReplicaUpdate(ctx context.Context, sector SectorRef) error DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) error } diff --git a/storage/sealer/storiface/worker.go b/storage/sealer/storiface/worker.go index 51a7901b0..3cbf9f737 100644 --- a/storage/sealer/storiface/worker.go +++ b/storage/sealer/storiface/worker.go @@ -124,8 +124,8 @@ type WorkerCalls interface { SealPreCommit2(ctx context.Context, sector SectorRef, pc1o PreCommit1Out) (CallID, error) SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (CallID, error) SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (CallID, error) - FinalizeSector(ctx context.Context, sector SectorRef, keepUnsealed []Range) (CallID, error) - FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) (CallID, error) + FinalizeSector(ctx context.Context, sector SectorRef) (CallID, error) + FinalizeReplicaUpdate(ctx context.Context, sector SectorRef) (CallID, error) ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) (CallID, error) ReplicaUpdate(ctx context.Context, sector SectorRef, pieces []abi.PieceInfo) (CallID, error) ProveReplicaUpdate1(ctx context.Context, sector SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error) diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index 326f38366..7f141780c 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -481,35 +481,36 @@ func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector stor }) } -func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { +func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err } return l.asyncCall(ctx, sector, FinalizeSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { - if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil { - return nil, xerrors.Errorf("finalizing sector: %w", err) - } - - if len(keepUnsealed) == 0 { - if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); err != nil { - return nil, xerrors.Errorf("removing unsealed data: %w", err) - } - } - - return nil, err + return nil, sb.FinalizeSector(ctx, sector) }) } -func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { +func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err } return l.asyncCall(ctx, sector, FinalizeReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { - if err := sb.FinalizeReplicaUpdate(ctx, sector, keepUnsealed); err != nil { + return nil, sb.FinalizeReplicaUpdate(ctx, sector) + }) +} + +func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { + sb, err := l.executor() + if err != nil { + return storiface.UndefCall, err + } + + return l.asyncCall(ctx, sector, ReleaseUnsealed, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { + if err := sb.ReleaseUnsealed(ctx, sector, keepUnsealed); err != nil { return nil, xerrors.Errorf("finalizing sector: %w", err) } @@ -523,10 +524,6 @@ func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storifac }) } -func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) (storiface.CallID, error) { - return storiface.UndefCall, xerrors.Errorf("implement me") -} - func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error { var err error diff --git a/storage/sealer/worker_tracked.go b/storage/sealer/worker_tracked.go index 970ba9a69..2df976f49 100644 --- a/storage/sealer/worker_tracked.go +++ b/storage/sealer/worker_tracked.go @@ -183,8 +183,12 @@ func (t *trackedWorker) SealCommit2(ctx context.Context, sector storiface.Sector return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) }) } -func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) }) +func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector) }) +} + +func (t *trackedWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTReleaseUnsealed, func() (storiface.CallID, error) { return t.Worker.ReleaseUnsealed(ctx, sector, keepUnsealed) }) } func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) { @@ -225,8 +229,8 @@ func (t *trackedWorker) ProveReplicaUpdate2(ctx context.Context, sector storifac }) } -func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector, keepUnsealed) }) +func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector) }) } var _ Worker = &trackedWorker{}