Merge pull request #11011 from filecoin-project/fix/snap-unseal

Snapdeals unsealing fixes
This commit is contained in:
Andrew Jackson (Ajax) 2023-09-05 10:25:34 -05:00 committed by GitHub
commit 70895afa3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 323 additions and 121 deletions

View File

@ -120,6 +120,11 @@ p: pvC0JBrEyUqtIIUvB2UUx/2a24c3Cvnu6AZ0D3IMBYAu...
type benchSectorProvider map[storiface.SectorFileType]string type benchSectorProvider map[storiface.SectorFileType]string
func (b benchSectorProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
// there's no copying in this context
return b.AcquireSector(ctx, id, existing, allocate, ptype)
}
func (b benchSectorProvider) AcquireSector(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { func (b benchSectorProvider) AcquireSector(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
out := storiface.SectorPaths{ out := storiface.SectorPaths{
ID: id.ID, ID: id.ID,

View File

@ -91,7 +91,7 @@ func FetchWithTemp(ctx context.Context, urls []string, dest string, header http.
continue continue
} }
if err := move(tempDest, dest); err != nil { if err := Move(tempDest, dest); err != nil {
return "", xerrors.Errorf("fetch move error %s -> %s: %w", tempDest, dest, err) return "", xerrors.Errorf("fetch move error %s -> %s: %w", tempDest, dest, err)
} }

View File

@ -720,7 +720,7 @@ func (st *Local) MoveStorage(ctx context.Context, s storiface.SectorRef, types s
return xerrors.Errorf("dropping source sector from index: %w", err) return xerrors.Errorf("dropping source sector from index: %w", err)
} }
if err := move(storiface.PathByType(src, fileType), storiface.PathByType(dest, fileType)); err != nil { if err := Move(storiface.PathByType(src, fileType), storiface.PathByType(dest, fileType)); err != nil {
// TODO: attempt some recovery (check if src is still there, re-declare) // TODO: attempt some recovery (check if src is still there, re-declare)
return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err) return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err)
} }

View File

@ -249,7 +249,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
continue continue
} }
if err := move(tempDest, dest); err != nil { if err := Move(tempDest, dest); err != nil {
return "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err) return "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err)
} }

View File

@ -12,7 +12,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
func move(from, to string) error { func Move(from, to string) error {
from, err := homedir.Expand(from) from, err := homedir.Expand(from)
if err != nil { if err != nil {
return xerrors.Errorf("move: expanding from: %w", err) return xerrors.Errorf("move: expanding from: %w", err)

View File

@ -89,3 +89,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id storiface.SectorRef, ex
return out, done, nil return out, done, nil
} }
func (b *Provider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return b.AcquireSector(ctx, id, existing, allocate, ptype)
}

View File

@ -10,6 +10,7 @@ import (
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"errors"
"io" "io"
"math/bits" "math/bits"
"os" "os"
@ -31,9 +32,9 @@ import (
"github.com/filecoin-project/lotus/lib/nullreader" "github.com/filecoin-project/lotus/lib/nullreader"
spaths "github.com/filecoin-project/lotus/storage/paths" spaths "github.com/filecoin-project/lotus/storage/paths"
nr "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/sealer/fr32" "github.com/filecoin-project/lotus/storage/sealer/fr32"
"github.com/filecoin-project/lotus/storage/sealer/partialfile" "github.com/filecoin-project/lotus/storage/sealer/partialfile"
"github.com/filecoin-project/lotus/storage/sealer/proofpaths"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
) )
@ -403,91 +404,189 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
return pieceCID, werr() return pieceCID, werr()
} }
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storiface.SectorRef, commD cid.Cid, unsealedPath string, randomness abi.SealRandomness) (bool, error) { func (sb *Sealer) acquireUpdatePath(ctx context.Context, sector storiface.SectorRef) (string, func(), error) {
replicaPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathSealing) // copy so that the sector doesn't get removed from a long-term storage path
replicaPath, releaseSector, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTUpdate, storiface.FTNone, storiface.PathSealing)
if xerrors.Is(err, storiface.ErrSectorNotFound) { if xerrors.Is(err, storiface.ErrSectorNotFound) {
return false, nil return "", releaseSector, nil
} else if err != nil { } else if err != nil {
return false, xerrors.Errorf("reading updated replica: %w", err) return "", releaseSector, xerrors.Errorf("reading updated replica: %w", err)
} }
defer done()
sealedPaths, done2, err := sb.AcquireSectorKeyOrRegenerate(ctx, sector, randomness) return replicaPath.Update, releaseSector, nil
}
func (sb *Sealer) decodeUpdatedReplica(ctx context.Context, sector storiface.SectorRef, commD cid.Cid, updatePath, unsealedPath string, randomness abi.SealRandomness) error {
keyPaths, done2, err := sb.acquireSectorKeyOrRegenerate(ctx, sector, randomness)
if err != nil { if err != nil {
return false, xerrors.Errorf("acquiring sealed sector: %w", err) return xerrors.Errorf("acquiring sealed sector: %w", err)
} }
defer done2() defer done2()
// Sector data stored in replica update // Sector data stored in replica update
updateProof, err := sector.ProofType.RegisteredUpdateProof() updateProof, err := sector.ProofType.RegisteredUpdateProof()
if err != nil { if err != nil {
return false, err return err
} }
return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, replicaPath.Update, sealedPaths.Sealed, sealedPaths.Cache, commD) if err := ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, updatePath, keyPaths.Sealed, keyPaths.Cache, commD); err != nil {
} return xerrors.Errorf("decoding unsealed sector data: %w", err)
func (sb *Sealer) AcquireSectorKeyOrRegenerate(ctx context.Context, sector storiface.SectorRef, randomness abi.SealRandomness) (storiface.SectorPaths, func(), error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
if err == nil {
return paths, done, err
} else if !xerrors.Is(err, storiface.ErrSectorNotFound) {
return paths, done, xerrors.Errorf("reading sector key: %w", err)
} }
// Sector key can't be found, so let's regenerate it
sectorSize, err := sector.ProofType.SectorSize()
if err != nil {
return paths, done, xerrors.Errorf("retrieving sector size: %w", err)
}
paddedSize := abi.PaddedPieceSize(sectorSize)
_, err = sb.AddPiece(ctx, sector, nil, paddedSize.Unpadded(), nr.NewNullReader(paddedSize.Unpadded()))
if err != nil {
return paths, done, xerrors.Errorf("recomputing empty data: %w", err)
}
err = sb.RegenerateSectorKey(ctx, sector, randomness, []abi.PieceInfo{{PieceCID: zerocomm.ZeroPieceCommitment(paddedSize.Unpadded()), Size: paddedSize}})
if err != nil {
return paths, done, xerrors.Errorf("during pc1: %w", err)
}
// Sector key should exist now, let's grab the paths
return sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
}
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
ssize, err := sector.ProofType.SectorSize() ssize, err := sector.ProofType.SectorSize()
if err != nil { if err != nil {
return err return err
} }
maxPieceSize := abi.PaddedPieceSize(ssize) maxPieceSize := abi.PaddedPieceSize(ssize)
// try finding existing pf, err := partialfile.OpenPartialFile(maxPieceSize, unsealedPath)
if err != nil {
return xerrors.Errorf("opening partial file: %w", err)
}
if err := pf.MarkAllocated(0, maxPieceSize); err != nil {
return xerrors.Errorf("marking range allocated: %w", err)
}
if err := pf.Close(); err != nil {
return err
}
return nil
}
func (sb *Sealer) acquireSectorKeyOrRegenerate(ctx context.Context, sector storiface.SectorRef, randomness abi.SealRandomness) (storiface.SectorPaths, func(), error) {
// copy so that the files aren't removed from long-term storage
paths, done, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err == nil {
return paths, done, err
} else if !xerrors.Is(err, storiface.ErrSectorNotFound) {
return paths, done, xerrors.Errorf("reading sector key: %w", err)
}
sectorSize, err := sector.ProofType.SectorSize()
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("retrieving sector size: %w", err)
}
err = sb.regenerateSectorKey(ctx, sector, randomness, zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(sectorSize).Unpadded()))
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("regenerating sector key: %w", err)
}
// Sector key should exist now, let's grab the paths
return sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathSealing)
}
func (sb *Sealer) regenerateSectorKey(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, keyDataCid cid.Cid) error {
paths, releaseSector, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()
// stat paths.Sealed, make sure it doesn't exist
_, err = os.Stat(paths.Sealed)
if err == nil {
return xerrors.Errorf("sealed file exists before regenerating sector key")
}
if !os.IsNotExist(err) {
return xerrors.Errorf("stat sealed path: %w", err)
}
// prepare SDR params
commp, err := commcid.CIDToDataCommitmentV1(keyDataCid)
if err != nil {
return xerrors.Errorf("computing commP: %w", err)
}
replicaID, err := sector.ProofType.ReplicaId(sector.ID.Miner, sector.ID.Number, ticket, commp)
if err != nil {
return xerrors.Errorf("computing replica id: %w", err)
}
// generate new sector key
err = ffi.GenerateSDR(
sector.ProofType,
paths.Cache,
replicaID,
)
if err != nil {
return xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
// move the last layer (sector key) to the sealed location
layerCount, err := proofpaths.SDRLayers(sector.ProofType)
if err != nil {
return xerrors.Errorf("getting SDR layer count: %w", err)
}
lastLayer := filepath.Join(paths.Cache, proofpaths.LayerFileName(layerCount))
sealedInCache := filepath.Join(paths.Cache, filepath.Base(paths.Sealed))
// rename last layer to sealed sector file name in the cache dir, which is
// almost guaranteed to happen on one filesystem
err = os.Rename(lastLayer, sealedInCache)
if err != nil {
return xerrors.Errorf("renaming last layer: %w", err)
}
err = spaths.Move(sealedInCache, paths.Sealed)
if err != nil {
return xerrors.Errorf("moving sector key: %w", err)
}
// remove other layer files
for i := 1; i < layerCount; i++ {
err = os.Remove(filepath.Join(paths.Cache, proofpaths.LayerFileName(i)))
if err != nil {
return xerrors.Errorf("removing layer file %d: %w", i, err)
}
}
return nil
}
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
// NOTE: This function will copy sealed/unsealed (and possible update) files
// into sealing storage. Those copies get cleaned up in LocalWorker.UnsealPiece
// after this call exists. The resulting unsealed file is going to be moved to
// long-term storage as well.
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
// try finding existing (also move to a sealing path if it's not here)
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing) unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing)
var pf *partialfile.PartialFile var pf *partialfile.PartialFile
switch { switch {
case xerrors.Is(err, storiface.ErrSectorNotFound): case xerrors.Is(err, storiface.ErrSectorNotFound):
// allocate if doesn't exist
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathSealing) unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathSealing)
if err != nil { if err != nil {
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err) return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
} }
defer done() case err == nil:
// no-op
pf, err = partialfile.CreatePartialFile(maxPieceSize, unsealedPath.Unsealed) default:
if err != nil { return xerrors.Errorf("acquire unsealed sector path (existing): %w", err)
return xerrors.Errorf("create unsealed file: %w", err)
} }
case err == nil:
defer done() defer done()
pf, err = partialfile.OpenPartialFile(maxPieceSize, unsealedPath.Unsealed) pf, err = partialfile.OpenPartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil { if err != nil {
if errors.Is(err, os.ErrNotExist) {
pf, err = partialfile.CreatePartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("creating partial file: %w", err)
}
} else {
return xerrors.Errorf("opening partial file: %w", err) return xerrors.Errorf("opening partial file: %w", err)
} }
default:
return xerrors.Errorf("acquire unsealed sector path (existing): %w", err)
} }
defer pf.Close() // nolint defer pf.Close() // nolint
@ -496,6 +595,8 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, o
return xerrors.Errorf("getting bitruns of allocated data: %w", err) return xerrors.Errorf("getting bitruns of allocated data: %w", err)
} }
// figure out if there's anything that needs to be unsealed
toUnseal, err := computeUnsealRanges(allocated, offset, size) toUnseal, err := computeUnsealRanges(allocated, offset, size)
if err != nil { if err != nil {
return xerrors.Errorf("computing unseal ranges: %w", err) return xerrors.Errorf("computing unseal ranges: %w", err)
@ -505,21 +606,36 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, o
return nil return nil
} }
// need to unseal
// If piece data stored in updated replica decode whole sector // If piece data stored in updated replica decode whole sector
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed, randomness) upd, updDone, err := sb.acquireUpdatePath(ctx, sector)
if err != nil {
return xerrors.Errorf("acquiring update path: %w", err)
}
if upd != "" {
defer updDone()
// decodeUpdatedReplica mill modify the unsealed file
if err := pf.Close(); err != nil {
return err
}
err := sb.decodeUpdatedReplica(ctx, sector, commd, upd, unsealedPath.Unsealed, randomness)
if err != nil { if err != nil {
return xerrors.Errorf("decoding sector from replica: %w", err) return xerrors.Errorf("decoding sector from replica: %w", err)
} }
if decoded { return nil
return pf.MarkAllocated(0, maxPieceSize)
} }
// Piece data sealed in sector // Piece data non-upgrade sealed in sector
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing) // (copy so that files stay in long-term storage)
srcPaths, releaseSector, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
if err != nil { if err != nil {
return xerrors.Errorf("acquire sealed sector paths: %w", err) return xerrors.Errorf("acquire sealed sector paths: %w", err)
} }
defer srcDone() defer releaseSector()
sealed, err := os.OpenFile(srcPaths.Sealed, os.O_RDONLY, 0644) // nolint:gosec sealed, err := os.OpenFile(srcPaths.Sealed, os.O_RDONLY, 0644) // nolint:gosec
if err != nil { if err != nil {
@ -687,51 +803,6 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storif
return true, nil return true, nil
} }
func (sb *Sealer) RegenerateSectorKey(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) error {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed|storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
if err != nil {
return xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return err
}
var sum abi.UnpaddedPieceSize
for _, piece := range pieces {
sum += piece.Size.Unpadded()
}
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
ussize := abi.PaddedPieceSize(ssize).Unpadded()
if sum != ussize {
return xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
// TODO: context cancellation respect
_, err = ffi.SealPreCommitPhase1(
sector.ProofType,
paths.Cache,
paths.Unsealed,
paths.Sealed,
sector.ID.Number,
sector.ID.Miner,
ticket,
pieces,
)
if err != nil {
return xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
return nil
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storiface.PreCommit1Out, err error) { func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storiface.PreCommit1Out, err error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing) paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
if err != nil { if err != nil {

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/fs"
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
@ -22,6 +23,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/filecoin-ffi/cgo" "github.com/filecoin-project/filecoin-ffi/cgo"
commpffi "github.com/filecoin-project/go-commp-utils/ffiwrapper" commpffi "github.com/filecoin-project/go-commp-utils/ffiwrapper"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
prooftypes "github.com/filecoin-project/go-state-types/proof" prooftypes "github.com/filecoin-project/go-state-types/proof"
@ -1103,3 +1105,66 @@ func (c *closeAssertReader) Close() error {
} }
var _ io.Closer = &closeAssertReader{} var _ io.Closer = &closeAssertReader{}
func TestGenerateSDR(t *testing.T) {
d := t.TempDir()
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: d,
}
sb, err := New(sp)
require.NoError(t, err)
si := storiface.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 1},
ProofType: sealProofType,
}
s := seal{ref: si}
sz := abi.PaddedPieceSize(sectorSize).Unpadded()
s.pi, err = sb.AddPiece(context.TODO(), si, []abi.UnpaddedPieceSize{}, sz, nullreader.NewNullReader(sz))
require.NoError(t, err)
s.ticket = sealRand
_, err = sb.SealPreCommit1(context.TODO(), si, s.ticket, []abi.PieceInfo{s.pi})
require.NoError(t, err)
// sdr for comparison
sdrCache := filepath.Join(d, "sdrcache")
commd, err := commcid.CIDToDataCommitmentV1(s.pi.PieceCID)
require.NoError(t, err)
replicaID, err := sealProofType.ReplicaId(si.ID.Miner, si.ID.Number, s.ticket, commd)
require.NoError(t, err)
err = ffi.GenerateSDR(sealProofType, sdrCache, replicaID)
require.NoError(t, err)
// list files in d recursively, for debug
require.NoError(t, filepath.Walk(d, func(path string, info fs.FileInfo, err error) error {
fmt.Println(path)
return nil
}))
// compare
lastLayerFile := "sc-02-data-layer-2.dat"
sdrFile := filepath.Join(sdrCache, lastLayerFile)
pc1File := filepath.Join(d, "cache/s-t0123-1/", lastLayerFile)
sdrData, err := os.ReadFile(sdrFile)
require.NoError(t, err)
pc1Data, err := os.ReadFile(pc1File)
require.NoError(t, err)
require.Equal(t, sdrData, pc1Data)
}

View File

@ -11,6 +11,8 @@ type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist // * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists // * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) AcquireSector(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
// AcquireSector, but a copy to preseve its long-term storage location.
AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
} }
var _ SectorProvider = &basicfs.Provider{} var _ SectorProvider = &basicfs.Provider{}

View File

@ -330,7 +330,7 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and // if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
// put it in the sealing scratch space. // put it in the sealing scratch space.
sealFetch := PrepareAction{ unsealFetch := PrepareAction{
Action: func(ctx context.Context, worker Worker) error { Action: func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)) _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
@ -359,7 +359,7 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto
selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, true) selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, true)
log.Debugf("will schedule unseal for sector %d", sector.ID) log.Debugf("will schedule unseal for sector %d", sector.ID)
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, sealFetch, func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
// TODO: make restartable // TODO: make restartable
// NOTE: we're unsealing the whole sector here as with SDR we can't really // NOTE: we're unsealing the whole sector here as with SDR we can't really

View File

@ -261,7 +261,7 @@ func TestSnapDeals(t *testing.T) {
// Precommit and Seal a CC sector // Precommit and Seal a CC sector
fmt.Printf("PC1\n") fmt.Printf("PC1\n")
ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9} ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}
pc1Out, err := m.SealPreCommit1(ctx, sid, ticket, ccPieces) pc1Out, err := m.SealPreCommit1(ctx, sid, ticket, ccPieces)
require.NoError(t, err) require.NoError(t, err)
fmt.Printf("PC2\n") fmt.Printf("PC2\n")

View File

@ -0,0 +1,30 @@
package proofpaths
import (
"fmt"
"github.com/filecoin-project/go-state-types/abi"
)
var dataFilePrefix = "sc-02-data-"
func LayerFileName(layer int) string {
return fmt.Sprintf("%slayer-%d.dat", dataFilePrefix, layer)
}
func SDRLayers(spt abi.RegisteredSealProof) (int, error) {
switch spt {
case abi.RegisteredSealProof_StackedDrg2KiBV1, abi.RegisteredSealProof_StackedDrg2KiBV1_1:
return 2, nil
case abi.RegisteredSealProof_StackedDrg8MiBV1, abi.RegisteredSealProof_StackedDrg8MiBV1_1:
return 2, nil
case abi.RegisteredSealProof_StackedDrg512MiBV1, abi.RegisteredSealProof_StackedDrg512MiBV1_1:
return 2, nil
case abi.RegisteredSealProof_StackedDrg32GiBV1, abi.RegisteredSealProof_StackedDrg32GiBV1_1:
return 11, nil
case abi.RegisteredSealProof_StackedDrg64GiBV1, abi.RegisteredSealProof_StackedDrg64GiBV1_1:
return 11, nil
default:
return 0, fmt.Errorf("unsupported proof type: %v", spt)
}
}

View File

@ -0,0 +1,16 @@
package proofpaths
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
)
func TestSDRLayersDefined(t *testing.T) {
for proof := range abi.SealProofInfos {
_, err := SDRLayers(proof)
require.NoError(t, err)
}
}

View File

@ -36,3 +36,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id storiface.Secto
return p, cancel, err return p, cancel, err
} }
func (l *readonlyProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return storiface.SectorPaths{}, nil, xerrors.New("read-only storage")
}

View File

@ -180,6 +180,10 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
}, nil }, nil
} }
func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype)
}
func (l *LocalWorker) ffiExec() (storiface.Storage, error) { func (l *LocalWorker) ffiExec() (storiface.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l}) return ffiwrapper.New(&localWorkerPathProvider{w: l})
} }
@ -571,15 +575,16 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe
return nil, xerrors.Errorf("unsealing sector: %w", err) return nil, xerrors.Errorf("unsealing sector: %w", err)
} }
if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil { // note: the unsealed file is moved to long-term storage in Manager.SectorsUnsealPiece
storageTypes := []storiface.SectorFileType{storiface.FTSealed, storiface.FTCache, storiface.FTUpdate, storiface.FTUpdateCache}
for _, fileType := range storageTypes {
if err = l.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
return nil, xerrors.Errorf("removing source data: %w", err) return nil, xerrors.Errorf("removing source data: %w", err)
} }
if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTCache); err != nil {
return nil, xerrors.Errorf("removing source data: %w", err)
} }
log.Debugf("worker has unsealed piece, sector=%+v", sector.ID) log.Debugf("unsealed piece, sector=%+v", sector.ID)
return nil, nil return nil, nil
}) })