sealing: Split unsealed cleanup from Finalize
This commit is contained in:
parent
0bba2bd1ba
commit
87e5549af5
@ -39,13 +39,13 @@ type Worker interface {
|
|||||||
SealPreCommit2(ctx context.Context, sector storiface.SectorRef, pc1o storiface.PreCommit1Out) (storiface.CallID, error) //perm:admin
|
SealPreCommit2(ctx context.Context, sector storiface.SectorRef, pc1o storiface.PreCommit1Out) (storiface.CallID, error) //perm:admin
|
||||||
SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (storiface.CallID, error) //perm:admin
|
SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (storiface.CallID, error) //perm:admin
|
||||||
SealCommit2(ctx context.Context, sector storiface.SectorRef, c1o storiface.Commit1Out) (storiface.CallID, error) //perm:admin
|
SealCommit2(ctx context.Context, sector storiface.SectorRef, c1o storiface.Commit1Out) (storiface.CallID, error) //perm:admin
|
||||||
FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin
|
FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) //perm:admin
|
||||||
FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin
|
FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) //perm:admin
|
||||||
ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
|
ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
|
||||||
ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
|
ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
|
||||||
ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
|
ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
|
||||||
GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
|
GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
|
||||||
ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) (storiface.CallID, error) //perm:admin
|
ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) //perm:admin
|
||||||
MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
|
MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
|
||||||
UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
|
UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
|
||||||
Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
|
Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
|
||||||
|
@ -978,9 +978,9 @@ type WorkerStruct struct {
|
|||||||
|
|
||||||
Fetch func(p0 context.Context, p1 storiface.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
|
Fetch func(p0 context.Context, p1 storiface.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
|
||||||
|
|
||||||
FinalizeReplicaUpdate func(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) `perm:"admin"`
|
FinalizeReplicaUpdate func(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) `perm:"admin"`
|
||||||
|
|
||||||
FinalizeSector func(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) `perm:"admin"`
|
FinalizeSector func(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) `perm:"admin"`
|
||||||
|
|
||||||
GenerateSectorKeyFromData func(p0 context.Context, p1 storiface.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
|
GenerateSectorKeyFromData func(p0 context.Context, p1 storiface.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
|
||||||
|
|
||||||
@ -5689,25 +5689,25 @@ func (s *WorkerStub) Fetch(p0 context.Context, p1 storiface.SectorRef, p2 storif
|
|||||||
return *new(storiface.CallID), ErrNotSupported
|
return *new(storiface.CallID), ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||||
if s.Internal.FinalizeReplicaUpdate == nil {
|
if s.Internal.FinalizeReplicaUpdate == nil {
|
||||||
return *new(storiface.CallID), ErrNotSupported
|
return *new(storiface.CallID), ErrNotSupported
|
||||||
}
|
}
|
||||||
return s.Internal.FinalizeReplicaUpdate(p0, p1, p2)
|
return s.Internal.FinalizeReplicaUpdate(p0, p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||||
return *new(storiface.CallID), ErrNotSupported
|
return *new(storiface.CallID), ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||||
if s.Internal.FinalizeSector == nil {
|
if s.Internal.FinalizeSector == nil {
|
||||||
return *new(storiface.CallID), ErrNotSupported
|
return *new(storiface.CallID), ErrNotSupported
|
||||||
}
|
}
|
||||||
return s.Internal.FinalizeSector(p0, p1, p2)
|
return s.Internal.FinalizeSector(p0, p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WorkerStub) FinalizeSector(p0 context.Context, p1 storiface.SectorRef, p2 []storiface.Range) (storiface.CallID, error) {
|
func (s *WorkerStub) FinalizeSector(p0 context.Context, p1 storiface.SectorRef) (storiface.CallID, error) {
|
||||||
return *new(storiface.CallID), ErrNotSupported
|
return *new(storiface.CallID), ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ var (
|
|||||||
FullAPIVersion1 = newVer(2, 3, 0)
|
FullAPIVersion1 = newVer(2, 3, 0)
|
||||||
|
|
||||||
MinerAPIVersion0 = newVer(1, 5, 0)
|
MinerAPIVersion0 = newVer(1, 5, 0)
|
||||||
WorkerAPIVersion0 = newVer(1, 6, 0)
|
WorkerAPIVersion0 = newVer(1, 7, 0)
|
||||||
)
|
)
|
||||||
|
|
||||||
//nolint:varcheck,deadcode
|
//nolint:varcheck,deadcode
|
||||||
|
Binary file not shown.
@ -164,7 +164,7 @@ func presealSector(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, sid storiface.
|
|||||||
return nil, xerrors.Errorf("commit: %w", err)
|
return nil, xerrors.Errorf("commit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sb.FinalizeSector(context.TODO(), sid, nil); err != nil {
|
if err := sb.FinalizeSector(context.TODO(), sid); err != nil {
|
||||||
return nil, xerrors.Errorf("trim cache: %w", err)
|
return nil, xerrors.Errorf("trim cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,7 +369,7 @@ var runCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
if workerType == "" {
|
if workerType == "" {
|
||||||
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeReplicaUpdate)
|
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed, sealtasks.TTFinalizeReplicaUpdate)
|
||||||
|
|
||||||
if !cctx.Bool("no-default") {
|
if !cctx.Bool("no-default") {
|
||||||
workerType = sealtasks.WorkerSealing
|
workerType = sealtasks.WorkerSealing
|
||||||
|
@ -1601,13 +1601,7 @@ Inputs:
|
|||||||
"Number": 9
|
"Number": 9
|
||||||
},
|
},
|
||||||
"ProofType": 8
|
"ProofType": 8
|
||||||
},
|
}
|
||||||
[
|
|
||||||
{
|
|
||||||
"Offset": 1024,
|
|
||||||
"Size": 1024
|
|
||||||
}
|
|
||||||
]
|
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -1636,13 +1630,7 @@ Inputs:
|
|||||||
"Number": 9
|
"Number": 9
|
||||||
},
|
},
|
||||||
"ProofType": 8
|
"ProofType": 8
|
||||||
},
|
}
|
||||||
[
|
|
||||||
{
|
|
||||||
"Offset": 1024,
|
|
||||||
"Size": 1024
|
|
||||||
}
|
|
||||||
]
|
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ var DefaultNodeOpts = nodeOpts{
|
|||||||
sectors: DefaultPresealsPerBootstrapMiner,
|
sectors: DefaultPresealsPerBootstrapMiner,
|
||||||
sectorSize: abi.SectorSize(2 << 10), // 2KiB.
|
sectorSize: abi.SectorSize(2 << 10), // 2KiB.
|
||||||
|
|
||||||
workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize},
|
workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed},
|
||||||
workerStorageOpt: func(store paths.Store) paths.Store { return store },
|
workerStorageOpt: func(store paths.Store) paths.Store { return store },
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,7 +229,7 @@ func WithWorkerName(n string) NodeOpt {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var WithSealWorkerTasks = WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})
|
var WithSealWorkerTasks = WithTaskTypes(append([]sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTDataCid, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}, DefaultNodeOpts.workerTasks...))
|
||||||
|
|
||||||
func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt {
|
func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt {
|
||||||
return func(opts *nodeOpts) error {
|
return func(opts *nodeOpts) error {
|
||||||
|
@ -77,7 +77,7 @@ func TestWorkerPledgeLocalFin(t *testing.T) {
|
|||||||
func TestWorkerDataCid(t *testing.T) {
|
func TestWorkerDataCid(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||||
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
|
kit.WithSealWorkerTasks) // no mock proofs
|
||||||
|
|
||||||
e, err := worker.Enabled(ctx)
|
e, err := worker.Enabled(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -407,7 +407,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
|
|||||||
func TestSchedulerRemoveRequest(t *testing.T) {
|
func TestSchedulerRemoveRequest(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||||
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
|
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTPreCommit1})) // no mock proofs
|
||||||
|
|
||||||
//ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
//ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ func TestWorkerUpgradeAbortCleanup(t *testing.T) {
|
|||||||
ens.Worker(miner, &worker, kit.ThroughRPC(), kit.NoStorage(), // no storage to have better control over path settings
|
ens.Worker(miner, &worker, kit.ThroughRPC(), kit.NoStorage(), // no storage to have better control over path settings
|
||||||
kit.WithTaskTypes([]sealtasks.TaskType{
|
kit.WithTaskTypes([]sealtasks.TaskType{
|
||||||
sealtasks.TTFetch, sealtasks.TTAddPiece,
|
sealtasks.TTFetch, sealtasks.TTAddPiece,
|
||||||
sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2,
|
sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2,
|
||||||
sealtasks.TTReplicaUpdate, // only first update step, later steps will not run and we'll abort
|
sealtasks.TTReplicaUpdate, // only first update step, later steps will not run and we'll abort
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
@ -431,9 +431,14 @@ func (m *Sealing) handleAbortUpgrade(ctx statemachine.Context, sector SectorInfo
|
|||||||
return xerrors.Errorf("removing CC update files from sector storage")
|
return xerrors.Errorf("removing CC update files from sector storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
// This removes the unsealed file from all storage, and makes sure sealed/unsealed files only exist in long-term-storage
|
// This removes the unsealed file from all storage
|
||||||
// note: we're not keeping anything unsealed because we're reverting to CC
|
// note: we're not keeping anything unsealed because we're reverting to CC
|
||||||
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), []storiface.Range{}); err != nil {
|
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), []storiface.Range{}); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// and makes sure sealed/cache files only exist in long-term-storage
|
||||||
|
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -305,7 +305,11 @@ func (m *Sealing) handleFinalizeReplicaUpdate(ctx statemachine.Context, sector S
|
|||||||
return xerrors.Errorf("getting sealing config: %w", err)
|
return xerrors.Errorf("getting sealing config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
||||||
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("release unsealed: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||||
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -862,7 +862,11 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn
|
|||||||
return xerrors.Errorf("getting sealing config: %w", err)
|
return xerrors.Errorf("getting sealing config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
||||||
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("release unsealed: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||||
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -999,7 +999,7 @@ func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storiface.SectorRef)
|
|||||||
return xerrors.Errorf("not supported at this layer")
|
return xerrors.Errorf("not supported at this layer")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) freeUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||||
ssize, err := sector.ProofType.SectorSize()
|
ssize, err := sector.ProofType.SectorSize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -1067,16 +1067,12 @@ func (sb *Sealer) freeUnsealed(ctx context.Context, sector storiface.SectorRef,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
|
||||||
ssize, err := sector.ProofType.SectorSize()
|
ssize, err := sector.ProofType.SectorSize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
|
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
||||||
@ -1124,16 +1120,12 @@ func (sb *Sealer) FinalizeSectorInto(ctx context.Context, sector storiface.Secto
|
|||||||
return ffi.ClearCache(uint64(ssize), dest)
|
return ffi.ClearCache(uint64(ssize), dest)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error {
|
||||||
ssize, err := sector.ProofType.SectorSize()
|
ssize, err := sector.ProofType.SectorSize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
|
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1161,16 +1153,6 @@ func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error {
|
|
||||||
// This call is meant to mark storage as 'freeable'. Given that unsealing is
|
|
||||||
// very expensive, we don't remove data as soon as we can - instead we only
|
|
||||||
// do that when we don't have free space for data that really needs it
|
|
||||||
|
|
||||||
// This function should not be called at this layer, everything should be
|
|
||||||
// handled in localworker
|
|
||||||
return xerrors.Errorf("not supported at this layer")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sb *Sealer) ReleaseReplicaUpgrade(ctx context.Context, sector storiface.SectorRef) error {
|
func (sb *Sealer) ReleaseReplicaUpgrade(ctx context.Context, sector storiface.SectorRef) error {
|
||||||
return xerrors.Errorf("not supported at this layer")
|
return xerrors.Errorf("not supported at this layer")
|
||||||
}
|
}
|
||||||
|
@ -142,7 +142,7 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
|
|||||||
go m.sched.runSched()
|
go m.sched.runSched()
|
||||||
|
|
||||||
localTasks := []sealtasks.TaskType{
|
localTasks := []sealtasks.TaskType{
|
||||||
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate,
|
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReleaseUnsealed, sealtasks.TTFinalizeReplicaUpdate,
|
||||||
}
|
}
|
||||||
if sc.AllowSectorDownload {
|
if sc.AllowSectorDownload {
|
||||||
localTasks = append(localTasks, sealtasks.TTDownloadSector)
|
localTasks = append(localTasks, sealtasks.TTDownloadSector)
|
||||||
@ -613,7 +613,27 @@ func (m *Manager) SealCommit2(ctx context.Context, sector storiface.SectorRef, p
|
|||||||
return out, waitErr
|
return out, waitErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
// sectorStorageType tries to figure out storage type for a given sector; expects only a single copy of the file in the
|
||||||
|
// storage system
|
||||||
|
func (m *Manager) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (bool, storiface.PathType, error) {
|
||||||
|
stores, err := m.index.StorageFindSector(ctx, sector.ID, ft, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
return false, "", xerrors.Errorf("finding sector: %w", err)
|
||||||
|
}
|
||||||
|
if len(stores) == 0 {
|
||||||
|
return false, "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, store := range stores {
|
||||||
|
if store.CanSeal {
|
||||||
|
return true, storiface.PathSealing, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, storiface.PathStorage, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -630,25 +650,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
|
||||||
unsealed := storiface.FTUnsealed
|
|
||||||
{
|
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("finding unsealed sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
|
|
||||||
unsealed = storiface.FTNone
|
|
||||||
} else {
|
|
||||||
// remove redundant copies if there are any
|
|
||||||
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTUnsealed); err != nil {
|
|
||||||
return xerrors.Errorf("remove copies (unsealed): %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove redundant copies if there are any
|
// remove redundant copies if there are any
|
||||||
|
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTUnsealed); err != nil {
|
||||||
|
return xerrors.Errorf("remove copies (sealed): %w", err)
|
||||||
|
}
|
||||||
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil {
|
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil {
|
||||||
return xerrors.Errorf("remove copies (sealed): %w", err)
|
return xerrors.Errorf("remove copies (sealed): %w", err)
|
||||||
}
|
}
|
||||||
@ -658,29 +663,19 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef
|
|||||||
|
|
||||||
// Make sure that the cache files are still in sealing storage; In case not,
|
// Make sure that the cache files are still in sealing storage; In case not,
|
||||||
// we want to do finalize in long-term storage
|
// we want to do finalize in long-term storage
|
||||||
cachePathType := storiface.PathStorage
|
_, cachePathType, err := m.sectorStorageType(ctx, sector, storiface.FTCache)
|
||||||
{
|
if err != nil {
|
||||||
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTCache, 0, false)
|
return xerrors.Errorf("checking cache storage type: %w", err)
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("finding sealed sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, store := range sealedStores {
|
|
||||||
if store.CanSeal {
|
|
||||||
cachePathType = storiface.PathSealing
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do the cache trimming wherever the likely still very large cache lives.
|
// do the cache trimming wherever the likely still very large cache lives.
|
||||||
// we really don't want to move it.
|
// we really don't want to move it.
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||||
m.schedFetch(sector, storiface.FTCache|unsealed, cachePathType, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache, cachePathType, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector))
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -691,9 +686,14 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef
|
|||||||
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)
|
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)
|
||||||
|
|
||||||
// only move the unsealed file if it still exists and needs moving
|
// only move the unsealed file if it still exists and needs moving
|
||||||
moveUnsealed := unsealed
|
moveUnsealed := storiface.FTUnsealed
|
||||||
{
|
{
|
||||||
if len(keepUnsealed) == 0 {
|
found, unsealedPathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("checking cache storage type: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found || unsealedPathType == storiface.PathStorage {
|
||||||
moveUnsealed = storiface.FTNone
|
moveUnsealed = storiface.FTNone
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -712,7 +712,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -720,19 +720,6 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
|
||||||
moveUnsealed := storiface.FTUnsealed
|
|
||||||
{
|
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("finding unsealed sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
|
|
||||||
moveUnsealed = storiface.FTNone
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure that the update file is still in sealing storage; In case it already
|
// Make sure that the update file is still in sealing storage; In case it already
|
||||||
// isn't, we want to do finalize in long-term storage
|
// isn't, we want to do finalize in long-term storage
|
||||||
pathType := storiface.PathStorage
|
pathType := storiface.PathStorage
|
||||||
@ -755,9 +742,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
|||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache, pathType, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector))
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -782,9 +769,15 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
|||||||
|
|
||||||
err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache))
|
err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache))
|
||||||
err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called
|
err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called
|
||||||
// if we found unsealed files, AND have been asked to keep at least one, move unsealed
|
|
||||||
if moveUnsealed != storiface.FTNone && len(keepUnsealed) != 0 {
|
{
|
||||||
err = multierr.Append(err, move(moveUnsealed))
|
unsealedStores, ferr := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
err = multierr.Append(err, xerrors.Errorf("find unsealed sector before move: %w", ferr))
|
||||||
|
} else if len(unsealedStores) > 0 {
|
||||||
|
// if we found unsealed files, AND have been asked to keep at least one piece, move unsealed
|
||||||
|
err = multierr.Append(err, move(storiface.FTUnsealed))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -794,16 +787,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.Se
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error {
|
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||||
ssize, err := sector.ProofType.SectorSize()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if len(safeToFree) == 0 || safeToFree[0].Offset != 0 || safeToFree[0].Size.Padded() != abi.PaddedPieceSize(ssize) {
|
|
||||||
// todo support partial free
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -811,7 +795,25 @@ func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRe
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil)
|
found, pathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("checking cache storage type: %w", err)
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
// already removed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
|
||||||
|
|
||||||
|
return m.sched.Schedule(ctx, sector, sealtasks.TTReleaseUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
|
_, err := m.waitSimpleCall(ctx)(w.ReleaseUnsealed(ctx, sector, keepUnsealed))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storiface.SectorRef) error {
|
func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storiface.SectorRef) error {
|
||||||
|
@ -336,7 +336,7 @@ func TestSnarkPackV2(t *testing.T) {
|
|||||||
localTasks := []sealtasks.TaskType{
|
localTasks := []sealtasks.TaskType{
|
||||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize,
|
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize,
|
||||||
sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2, sealtasks.TTUnseal,
|
sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2, sealtasks.TTUnseal,
|
||||||
sealtasks.TTRegenSectorKey,
|
sealtasks.TTRegenSectorKey, sealtasks.TTReleaseUnsealed,
|
||||||
}
|
}
|
||||||
wds := datastore.NewMapDatastore()
|
wds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
|
@ -497,15 +497,15 @@ func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof
|
|||||||
return id, []abi.PieceInfo{pi}, nil
|
return id, []abi.PieceInfo{pi}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *SectorMgr) FinalizeSector(context.Context, storiface.SectorRef, []storiface.Range) error {
|
func (mgr *SectorMgr) FinalizeSector(context.Context, storiface.SectorRef) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storiface.SectorRef, []storiface.Range) error {
|
func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storiface.SectorRef) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) error {
|
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
|
|||||||
// the unsealed file from the miner.
|
// the unsealed file from the miner.
|
||||||
ppt.addRemoteWorker(t, []sealtasks.TaskType{
|
ppt.addRemoteWorker(t, []sealtasks.TaskType{
|
||||||
sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1,
|
sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1,
|
||||||
sealtasks.TTFetch, sealtasks.TTFinalize,
|
sealtasks.TTFetch, sealtasks.TTFinalize, sealtasks.TTReleaseUnsealed,
|
||||||
})
|
})
|
||||||
|
|
||||||
// create a worker that can ONLY unseal and fetch
|
// create a worker that can ONLY unseal and fetch
|
||||||
|
@ -202,6 +202,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t
|
|||||||
ret: ret,
|
ret: ret,
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
}:
|
}:
|
||||||
|
log.Debugw("sched request sent", "sector", sector, "task", taskType)
|
||||||
case <-sh.closing:
|
case <-sh.closing:
|
||||||
return xerrors.New("closing")
|
return xerrors.New("closing")
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -148,7 +148,6 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
|
|||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
log.Debugf("SCHED windows: %+v", windows)
|
|
||||||
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)
|
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)
|
||||||
|
|
||||||
// Step 2
|
// Step 2
|
||||||
|
@ -20,7 +20,8 @@ const (
|
|||||||
TTCommit1 TaskType = "seal/v0/commit/1"
|
TTCommit1 TaskType = "seal/v0/commit/1"
|
||||||
TTCommit2 TaskType = "seal/v0/commit/2"
|
TTCommit2 TaskType = "seal/v0/commit/2"
|
||||||
|
|
||||||
TTFinalize TaskType = "seal/v0/finalize"
|
TTFinalize TaskType = "seal/v0/finalize"
|
||||||
|
TTReleaseUnsealed TaskType = "seal/v0/releaseunsealed"
|
||||||
|
|
||||||
TTFetch TaskType = "seal/v0/fetch"
|
TTFetch TaskType = "seal/v0/fetch"
|
||||||
TTUnseal TaskType = "seal/v0/unseal"
|
TTUnseal TaskType = "seal/v0/unseal"
|
||||||
@ -50,12 +51,13 @@ var order = map[TaskType]int{
|
|||||||
TTCommit1: 2,
|
TTCommit1: 2,
|
||||||
TTUnseal: 1,
|
TTUnseal: 1,
|
||||||
|
|
||||||
TTFetch: -1,
|
TTFetch: -1,
|
||||||
TTDownloadSector: -2,
|
TTDownloadSector: -2,
|
||||||
TTFinalize: -3,
|
TTFinalize: -3,
|
||||||
|
TTReleaseUnsealed: -4,
|
||||||
|
|
||||||
TTGenerateWindowPoSt: -4,
|
TTGenerateWindowPoSt: -5,
|
||||||
TTGenerateWinningPoSt: -5, // most priority
|
TTGenerateWinningPoSt: -6, // most priority
|
||||||
}
|
}
|
||||||
|
|
||||||
var shortNames = map[TaskType]string{
|
var shortNames = map[TaskType]string{
|
||||||
@ -67,7 +69,8 @@ var shortNames = map[TaskType]string{
|
|||||||
TTCommit1: "C1",
|
TTCommit1: "C1",
|
||||||
TTCommit2: "C2",
|
TTCommit2: "C2",
|
||||||
|
|
||||||
TTFinalize: "FIN",
|
TTFinalize: "FIN",
|
||||||
|
TTReleaseUnsealed: "FUS",
|
||||||
|
|
||||||
TTFetch: "GET",
|
TTFetch: "GET",
|
||||||
TTUnseal: "UNS",
|
TTUnseal: "UNS",
|
||||||
|
@ -63,12 +63,12 @@ type Sealer interface {
|
|||||||
SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (Commit1Out, error)
|
SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (Commit1Out, error)
|
||||||
SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (Proof, error)
|
SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (Proof, error)
|
||||||
|
|
||||||
FinalizeSector(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
|
FinalizeSector(ctx context.Context, sector SectorRef) error
|
||||||
|
|
||||||
// ReleaseUnsealed marks parts of the unsealed sector file as safe to drop
|
// ReleaseUnsealed marks parts of the unsealed sector file as safe to drop
|
||||||
// (called by the fsm on restart, allows storage to keep no persistent
|
// (called by the fsm on restart, allows storage to keep no persistent
|
||||||
// state about unsealed fast-retrieval copies)
|
// state about unsealed fast-retrieval copies)
|
||||||
ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) error
|
ReleaseUnsealed(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
|
||||||
// ReleaseSectorKey removes `sealed` from all storage
|
// ReleaseSectorKey removes `sealed` from all storage
|
||||||
// called after successful sector upgrade
|
// called after successful sector upgrade
|
||||||
ReleaseSectorKey(ctx context.Context, sector SectorRef) error
|
ReleaseSectorKey(ctx context.Context, sector SectorRef) error
|
||||||
@ -89,7 +89,7 @@ type Sealer interface {
|
|||||||
// GenerateSectorKeyFromData computes sector key given unsealed data and updated replica
|
// GenerateSectorKeyFromData computes sector key given unsealed data and updated replica
|
||||||
GenerateSectorKeyFromData(ctx context.Context, sector SectorRef, unsealed cid.Cid) error
|
GenerateSectorKeyFromData(ctx context.Context, sector SectorRef, unsealed cid.Cid) error
|
||||||
|
|
||||||
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
|
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef) error
|
||||||
|
|
||||||
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) error
|
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) error
|
||||||
}
|
}
|
||||||
|
@ -124,8 +124,8 @@ type WorkerCalls interface {
|
|||||||
SealPreCommit2(ctx context.Context, sector SectorRef, pc1o PreCommit1Out) (CallID, error)
|
SealPreCommit2(ctx context.Context, sector SectorRef, pc1o PreCommit1Out) (CallID, error)
|
||||||
SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (CallID, error)
|
SealCommit1(ctx context.Context, sector SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids SectorCids) (CallID, error)
|
||||||
SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (CallID, error)
|
SealCommit2(ctx context.Context, sector SectorRef, c1o Commit1Out) (CallID, error)
|
||||||
FinalizeSector(ctx context.Context, sector SectorRef, keepUnsealed []Range) (CallID, error)
|
FinalizeSector(ctx context.Context, sector SectorRef) (CallID, error)
|
||||||
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) (CallID, error)
|
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef) (CallID, error)
|
||||||
ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) (CallID, error)
|
ReleaseUnsealed(ctx context.Context, sector SectorRef, safeToFree []Range) (CallID, error)
|
||||||
ReplicaUpdate(ctx context.Context, sector SectorRef, pieces []abi.PieceInfo) (CallID, error)
|
ReplicaUpdate(ctx context.Context, sector SectorRef, pieces []abi.PieceInfo) (CallID, error)
|
||||||
ProveReplicaUpdate1(ctx context.Context, sector SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
|
ProveReplicaUpdate1(ctx context.Context, sector SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
|
||||||
|
@ -481,35 +481,36 @@ func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector stor
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||||
sb, err := l.executor()
|
sb, err := l.executor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storiface.UndefCall, err
|
return storiface.UndefCall, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return l.asyncCall(ctx, sector, FinalizeSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
return l.asyncCall(ctx, sector, FinalizeSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||||
if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil {
|
return nil, sb.FinalizeSector(ctx, sector)
|
||||||
return nil, xerrors.Errorf("finalizing sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(keepUnsealed) == 0 {
|
|
||||||
if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); err != nil {
|
|
||||||
return nil, xerrors.Errorf("removing unsealed data: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, err
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||||
sb, err := l.executor()
|
sb, err := l.executor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storiface.UndefCall, err
|
return storiface.UndefCall, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return l.asyncCall(ctx, sector, FinalizeReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
return l.asyncCall(ctx, sector, FinalizeReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||||
if err := sb.FinalizeReplicaUpdate(ctx, sector, keepUnsealed); err != nil {
|
return nil, sb.FinalizeReplicaUpdate(ctx, sector)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||||
|
sb, err := l.executor()
|
||||||
|
if err != nil {
|
||||||
|
return storiface.UndefCall, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return l.asyncCall(ctx, sector, ReleaseUnsealed, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||||
|
if err := sb.ReleaseUnsealed(ctx, sector, keepUnsealed); err != nil {
|
||||||
return nil, xerrors.Errorf("finalizing sector: %w", err)
|
return nil, xerrors.Errorf("finalizing sector: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -523,10 +524,6 @@ func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storifac
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, safeToFree []storiface.Range) (storiface.CallID, error) {
|
|
||||||
return storiface.UndefCall, xerrors.Errorf("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
|
func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -183,8 +183,12 @@ func (t *trackedWorker) SealCommit2(ctx context.Context, sector storiface.Sector
|
|||||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) })
|
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) })
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) })
|
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector) })
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *trackedWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
||||||
|
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTReleaseUnsealed, func() (storiface.CallID, error) { return t.Worker.ReleaseUnsealed(ctx, sector, keepUnsealed) })
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) {
|
func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) {
|
||||||
@ -225,8 +229,8 @@ func (t *trackedWorker) ProveReplicaUpdate2(ctx context.Context, sector storifac
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
|
func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
|
||||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector, keepUnsealed) })
|
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector) })
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Worker = &trackedWorker{}
|
var _ Worker = &trackedWorker{}
|
||||||
|
Loading…
Reference in New Issue
Block a user