diff --git a/extern/sector-storage/ffiwrapper/sealer_cgo.go b/extern/sector-storage/ffiwrapper/sealer_cgo.go index 400b67211..18a4e64cb 100644 --- a/extern/sector-storage/ffiwrapper/sealer_cgo.go +++ b/extern/sector-storage/ffiwrapper/sealer_cgo.go @@ -15,6 +15,8 @@ import ( "os" "runtime" + nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -369,8 +371,8 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err return pieceCID, werr() } -func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string) (bool, error) { - paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage) +func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string, randomness abi.SealRandomness) (bool, error) { + replicaPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage) if xerrors.Is(err, storiface.ErrSectorNotFound) { return false, nil } else if err != nil { @@ -378,12 +380,52 @@ func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.Se } defer done() + sealedPaths, releaseSectorKey, err := sb.AcquireSectorKeyOrRegenerate(ctx, sector, randomness) + if err != nil { + return false, xerrors.Errorf("acquiring sealed sector: %w", err) + } + defer releaseSectorKey() + // Sector data stored in replica update updateProof, err := sector.ProofType.RegisteredUpdateProof() if err != nil { return false, err } - return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, paths.Update, paths.Sealed, paths.Cache, commD) + return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, replicaPath.Update, sealedPaths.Sealed, sealedPaths.Cache, commD) +} + +func (sb *Sealer) AcquireSectorKeyOrRegenerate(ctx context.Context, sector storage.SectorRef, randomness abi.SealRandomness) (storiface.SectorPaths, func(), error) { + paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage) + if err == nil { + return paths, done, err + } else if !xerrors.Is(err, storiface.ErrSectorNotFound) { + return paths, done, xerrors.Errorf("reading sector key: %w", err) + } + + // Sector key can't be found, so let's regenerate it + sectorSize, err := sector.ProofType.SectorSize() + if err != nil { + return paths, done, xerrors.Errorf("retrieving sector size: %w", err) + } + paddedSize := abi.PaddedPieceSize(sectorSize) + + _, err = sb.AddPiece(ctx, sector, nil, paddedSize.Unpadded(), nr.NewNullReader(paddedSize.Unpadded())) + if err != nil { + return paths, done, xerrors.Errorf("recomputing empty data: %w", err) + } + + pc1, err := sb.SealPreCommit1(ctx, sector, randomness, []abi.PieceInfo{{PieceCID: zerocomm.ZeroPieceCommitment(paddedSize.Unpadded()), Size: paddedSize}}) + if err != nil { + return paths, done, xerrors.Errorf("during pc1: %w", err) + } + + _, err = sb.SealPreCommit2(ctx, sector, pc1) + if err != nil { + return paths, done, xerrors.Errorf("during pc2: %w", err) + } + + // Sector key should exist now, let's grab the paths + return sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage) } func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { @@ -437,7 +479,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off } // If piece data stored in updated replica decode whole sector - decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed) + decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed, randomness) if err != nil { return xerrors.Errorf("decoding sector from replica: %w", err) } diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 4b52f9a1d..10869a65f 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -273,7 +273,7 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR defer cancel() log.Debugf("acquire unseal sector lock for sector %d", sector.ID) - if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTUnsealed); err != nil { return xerrors.Errorf("acquiring unseal sector lock: %w", err) } @@ -281,8 +281,12 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR // put it in the sealing scratch space. sealFetch := func(ctx context.Context, worker Worker) error { log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) - if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { - return xerrors.Errorf("copy sealed/cache sector data: %w", err) + _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)) + if err != nil { + _, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy)) + if err2 != nil { + return xerrors.Errorf("copy sealed/cache sector data: %w %w", err, err2) + } } return nil diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go index 9c844292e..d9f8af461 100644 --- a/extern/sector-storage/manager_test.go +++ b/extern/sector-storage/manager_test.go @@ -315,6 +315,12 @@ func TestSnapDeals(t *testing.T) { require.NoError(t, m.GenerateSectorKeyFromData(ctx, sid, out.NewUnsealed)) fmt.Printf("GSK duration (%s): %s\n", ss.ShortString(), time.Since(startGSK)) + fmt.Printf("Remove data\n") + require.NoError(t, m.FinalizeSector(ctx, sid, nil)) + fmt.Printf("Release Sector Key\n") + require.NoError(t, m.ReleaseSectorKey(ctx, sid)) + fmt.Printf("Unseal Replica\n") + require.NoError(t, m.SectorsUnsealPiece(ctx, sid, 0, p1.Size.Unpadded(), ticket, &out.NewUnsealed)) } func TestRedoPC1(t *testing.T) {