diff --git a/api/api_worker.go b/api/api_worker.go index 68d8e7baf..ba50a9459 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -39,6 +39,7 @@ type Worker interface { SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin + FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 32f59e0ec..a3b498e09 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -864,6 +864,8 @@ type WorkerStruct struct { Fetch func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` + FinalizeReplicaUpdate func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"` + FinalizeSector func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"` GenerateSectorKeyFromData func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"` @@ -4954,6 +4956,17 @@ func (s *WorkerStub) Fetch(p0 context.Context, p1 storage.SectorRef, p2 storifac return *new(storiface.CallID), ErrNotSupported } +func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) { + if s.Internal.FinalizeReplicaUpdate == nil { + return *new(storiface.CallID), ErrNotSupported + } + return s.Internal.FinalizeReplicaUpdate(p0, p1, p2) +} + +func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) { + return *new(storiface.CallID), ErrNotSupported +} + func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) { if s.Internal.FinalizeSector == nil { return *new(storiface.CallID), ErrNotSupported diff --git a/extern/sector-storage/ffiwrapper/sealer_cgo.go b/extern/sector-storage/ffiwrapper/sealer_cgo.go index c35cefd56..cdda79a75 100644 --- a/extern/sector-storage/ffiwrapper/sealer_cgo.go +++ b/extern/sector-storage/ffiwrapper/sealer_cgo.go @@ -843,6 +843,82 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, return ffi.ClearCache(uint64(ssize), paths.Cache) } +func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error { + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return err + } + maxPieceSize := abi.PaddedPieceSize(ssize) + + if len(keepUnsealed) > 0 { // TODO dedupe with the above + + sr := partialfile.PieceRun(0, maxPieceSize) + + for _, s := range keepUnsealed { + si := &rlepluslazy.RunSliceIterator{} + if s.Offset != 0 { + si.Runs = append(si.Runs, rlepluslazy.Run{Val: false, Len: uint64(s.Offset)}) + } + si.Runs = append(si.Runs, rlepluslazy.Run{Val: true, Len: uint64(s.Size)}) + + var err error + sr, err = rlepluslazy.Subtract(sr, si) + if err != nil { + return err + } + } + + paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathStorage) + if err != nil { + return xerrors.Errorf("acquiring sector cache path: %w", err) + } + defer done() + + pf, err := partialfile.OpenPartialFile(maxPieceSize, paths.Unsealed) + if err == nil { + var at uint64 + for sr.HasNext() { + r, err := sr.NextRun() + if err != nil { + _ = pf.Close() + return err + } + + offset := at + at += r.Len + if !r.Val { + continue + } + + err = pf.Free(storiface.PaddedByteIndex(abi.UnpaddedPieceSize(offset).Padded()), abi.UnpaddedPieceSize(r.Len).Padded()) + if err != nil { + _ = pf.Close() + return xerrors.Errorf("free partial file range: %w", err) + } + } + + if err := pf.Close(); err != nil { + return err + } + } else { + if !xerrors.Is(err, os.ErrNotExist) { + return xerrors.Errorf("opening partial file: %w", err) + } + } + + } + + paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage) + if err != nil { + return xerrors.Errorf("acquiring sector cache path: %w", err) + } + defer done() + + return ffi.ClearCache(uint64(ssize), paths.Cache) + + // TODO: ^ above but for snapdeals +} + func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error { // This call is meant to mark storage as 'freeable'. Given that unsealing is // very expensive, we don't remove data as soon as we can - instead we only diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 475c399e9..f1f84a057 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -577,6 +577,74 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, return nil } +func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache); err != nil { + return xerrors.Errorf("acquiring sector lock: %w", err) + } + + fts := storiface.FTUnsealed + { + unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) + if err != nil { + return xerrors.Errorf("finding unsealed sector: %w", err) + } + + if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine + fts = storiface.FTNone + } + } + + pathType := storiface.PathStorage + { + sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false) + if err != nil { + return xerrors.Errorf("finding sealed sector: %w", err) + } + + for _, store := range sealedStores { + if store.CanSeal { + pathType = storiface.PathSealing + break + } + } + } + + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false) + + err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector, + m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove), + func(ctx context.Context, w Worker) error { + _, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed)) + return err + }) + if err != nil { + return err + } + + fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage) + moveUnsealed := fts + { + if len(keepUnsealed) == 0 { + moveUnsealed = storiface.FTNone + } + } + + err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, + m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), + func(ctx context.Context, w Worker) error { + _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed)) + return err + }) + if err != nil { + return xerrors.Errorf("moving sector to storage: %w", err) + } + + return nil +} + func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error { return nil } diff --git a/extern/sector-storage/sealtasks/task.go b/extern/sector-storage/sealtasks/task.go index f6104878b..89bfd3a88 100644 --- a/extern/sector-storage/sealtasks/task.go +++ b/extern/sector-storage/sealtasks/task.go @@ -14,10 +14,11 @@ const ( TTFetch TaskType = "seal/v0/fetch" TTUnseal TaskType = "seal/v0/unseal" - TTReplicaUpdate TaskType = "seal/v0/replicaupdate" - TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1" - TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2" - TTRegenSectorKey TaskType = "seal/v0/regensectorkey" + TTReplicaUpdate TaskType = "seal/v0/replicaupdate" + TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1" + TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2" + TTRegenSectorKey TaskType = "seal/v0/regensectorkey" + TTFinalizeReplicaUpdate TaskType = "seal/v0/finalize/replicaupdate" ) var order = map[TaskType]int{ @@ -48,10 +49,11 @@ var shortNames = map[TaskType]string{ TTFetch: "GET", TTUnseal: "UNS", - TTReplicaUpdate: "RU", - TTProveReplicaUpdate1: "PR1", - TTProveReplicaUpdate2: "PR2", - TTRegenSectorKey: "GSK", + TTReplicaUpdate: "RU", + TTProveReplicaUpdate1: "PR1", + TTProveReplicaUpdate2: "PR2", + TTRegenSectorKey: "GSK", + TTFinalizeReplicaUpdate: "FRU", } func (a TaskType) MuchLess(b TaskType) (bool, bool) { diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index 8bb6a256a..476854cd7 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -120,6 +120,7 @@ type WorkerCalls interface { SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error) + FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (CallID, error) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error) diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index a5f5a0b9d..232877f2f 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -162,20 +162,21 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) { type ReturnType string const ( - AddPiece ReturnType = "AddPiece" - SealPreCommit1 ReturnType = "SealPreCommit1" - SealPreCommit2 ReturnType = "SealPreCommit2" - SealCommit1 ReturnType = "SealCommit1" - SealCommit2 ReturnType = "SealCommit2" - FinalizeSector ReturnType = "FinalizeSector" - ReplicaUpdate ReturnType = "ReplicaUpdate" - ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1" - ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2" - GenerateSectorKey ReturnType = "GenerateSectorKey" - ReleaseUnsealed ReturnType = "ReleaseUnsealed" - MoveStorage ReturnType = "MoveStorage" - UnsealPiece ReturnType = "UnsealPiece" - Fetch ReturnType = "Fetch" + AddPiece ReturnType = "AddPiece" + SealPreCommit1 ReturnType = "SealPreCommit1" + SealPreCommit2 ReturnType = "SealPreCommit2" + SealCommit1 ReturnType = "SealCommit1" + SealCommit2 ReturnType = "SealCommit2" + FinalizeSector ReturnType = "FinalizeSector" + FinalizeReplicaUpdate ReturnType = "FinalizeReplicaUpdate" + ReplicaUpdate ReturnType = "ReplicaUpdate" + ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1" + ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2" + GenerateSectorKey ReturnType = "GenerateSectorKey" + ReleaseUnsealed ReturnType = "ReleaseUnsealed" + MoveStorage ReturnType = "MoveStorage" + UnsealPiece ReturnType = "UnsealPiece" + Fetch ReturnType = "Fetch" ) // in: func(WorkerReturn, context.Context, CallID, err string) @@ -456,6 +457,27 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorR }) } +func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { + sb, err := l.executor() + if err != nil { + return storiface.UndefCall, err + } + + return l.asyncCall(ctx, sector, FinalizeReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { + if err := sb.FinalizeReplicaUpdate(ctx, sector, keepUnsealed); err != nil { + return nil, xerrors.Errorf("finalizing sector: %w", err) + } + + if len(keepUnsealed) == 0 { + if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); err != nil { + return nil, xerrors.Errorf("removing unsealed data: %w", err) + } + } + + return nil, err + }) +} + func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) { return storiface.UndefCall, xerrors.Errorf("implement me") } diff --git a/extern/storage-sealing/states_replica_update.go b/extern/storage-sealing/states_replica_update.go index aecd3512a..6222d49db 100644 --- a/extern/storage-sealing/states_replica_update.go +++ b/extern/storage-sealing/states_replica_update.go @@ -213,6 +213,15 @@ func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector Secto } func (m *Sealing) handleFinalizeReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting sealing config: %w", err) + } + + if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(false, cfg.AlwaysKeepUnsealedCopy)); err != nil { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) + } + return ctx.Send(SectorFinalized{}) } diff --git a/go.mod b/go.mod index 67bd4ca77..262763d9d 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/filecoin-project/specs-actors/v5 v5.0.4 github.com/filecoin-project/specs-actors/v6 v6.0.1 github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1 - github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9 + github.com/filecoin-project/specs-storage v0.1.1-0.20220202201749-ae62d2332aa8 github.com/filecoin-project/test-vectors/schema v0.0.5 github.com/gbrlsnchs/jwt/v3 v3.0.1 github.com/gdamore/tcell/v2 v2.2.0 diff --git a/go.sum b/go.sum index cfc8e50eb..079604134 100644 --- a/go.sum +++ b/go.sum @@ -401,6 +401,8 @@ github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1 h1:FuDaXIbcw2hRsFI8SDTmsG github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M= github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9 h1:oUYOvF7EvdXS0Zmk9mNkaB6Bu0l+WXBYPzVodKMiLug= github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU= +github.com/filecoin-project/specs-storage v0.1.1-0.20220202201749-ae62d2332aa8 h1:lHg1G44FX6LNuIcGJWE6ty4dH+WoFIA2UIfGGV06Hpg= +github.com/filecoin-project/specs-storage v0.1.1-0.20220202201749-ae62d2332aa8/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU= github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=