ffiwrapper: Fix sector acquire logic in unsealing

This commit is contained in:
Łukasz Magiera 2023-06-28 16:02:43 +02:00 committed by Jennifer Wang
parent fa0f117edd
commit 6167bcc8ce
4 changed files with 95 additions and 77 deletions

View File

@ -89,3 +89,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id storiface.SectorRef, ex
return out, done, nil
}
func (b *Provider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return b.AcquireSector(ctx, id, existing, allocate, ptype)
}

View File

@ -404,7 +404,8 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
}
func (sb *Sealer) maybeAcquireUpdatePath(ctx context.Context, sector storiface.SectorRef) (string, func(), error) {
replicaPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate, storiface.FTNone, storiface.PathSealing)
// copy so that the sector doesn't get removed from a long-term storage path
replicaPath, done, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTUpdate, storiface.FTNone, storiface.PathSealing)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
return "", done, nil
} else if err != nil {
@ -414,8 +415,8 @@ func (sb *Sealer) maybeAcquireUpdatePath(ctx context.Context, sector storiface.S
return replicaPath.Update, done, nil
}
func (sb *Sealer) decodeUpdatedReplica(ctx context.Context, sector storiface.SectorRef, commD cid.Cid, updatePath, unsealedPath string, randomness abi.SealRandomness) (error) {
keyPaths, done2, err := sb.AcquireSectorKeyOrRegenerate(ctx, sector, randomness)
func (sb *Sealer) decodeUpdatedReplica(ctx context.Context, sector storiface.SectorRef, commD cid.Cid, updatePath, unsealedPath string, randomness abi.SealRandomness) error {
keyPaths, done2, err := sb.acquireSectorKeyOrRegenerate(ctx, sector, randomness)
if err != nil {
return xerrors.Errorf("acquiring sealed sector: %w", err)
}
@ -452,8 +453,9 @@ func (sb *Sealer) decodeUpdatedReplica(ctx context.Context, sector storiface.Sec
return nil
}
func (sb *Sealer) AcquireSectorKeyOrRegenerate(ctx context.Context, sector storiface.SectorRef, randomness abi.SealRandomness) (storiface.SectorPaths, func(), error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
func (sb *Sealer) acquireSectorKeyOrRegenerate(ctx context.Context, sector storiface.SectorRef, randomness abi.SealRandomness) (storiface.SectorPaths, func(), error) {
// copy so that the files aren't removed from long-term storage
paths, done, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err == nil {
return paths, done, err
} else if !xerrors.Is(err, storiface.ErrSectorNotFound) {
@ -462,26 +464,90 @@ func (sb *Sealer) AcquireSectorKeyOrRegenerate(ctx context.Context, sector stori
sectorSize, err := sector.ProofType.SectorSize()
if err != nil {
return paths, done, xerrors.Errorf("retrieving sector size: %w", err)
return storiface.SectorPaths{}, nil, xerrors.Errorf("retrieving sector size: %w", err)
}
err = sb.RegenerateSectorKey(ctx, sector, randomness, zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(sectorSize).Unpadded()))
err = sb.regenerateSectorKey(ctx, sector, randomness, zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(sectorSize).Unpadded()))
if err != nil {
return paths, done, xerrors.Errorf("regenerating sector key: %w", err)
return storiface.SectorPaths{}, nil, xerrors.Errorf("regenerating sector key: %w", err)
}
// Sector key should exist now, let's grab the paths
return sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
return sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathSealing)
}
func (sb *Sealer) regenerateSectorKey(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, keyDataCid cid.Cid) error {
paths, done, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
// stat paths.Sealed, make sure it doesn't exist
_, err = os.Stat(paths.Sealed)
if err == nil {
return xerrors.Errorf("sealed file exists before regenerating sector key")
}
if !os.IsNotExist(err) {
return xerrors.Errorf("stat sealed path: %w", err)
}
// prepare SDR params
commp, err := commcid.CIDToDataCommitmentV1(keyDataCid)
if err != nil {
return xerrors.Errorf("computing commP: %w", err)
}
replicaID, err := sector.ProofType.ReplicaId(sector.ID.Miner, sector.ID.Number, ticket, commp)
if err != nil {
return xerrors.Errorf("computing replica id: %w", err)
}
// generate new sector key
err = ffi.GenerateSDR(
sector.ProofType,
paths.Cache,
replicaID,
)
if err != nil {
return xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
// move the last layer (sector key) to the sealed location
layerCount, err := proofpaths.SDRLayers(sector.ProofType)
if err != nil {
return xerrors.Errorf("getting SDR layer count: %w", err)
}
err = spaths.Move(proofpaths.LayerFileName(layerCount), paths.Sealed)
if err != nil {
return xerrors.Errorf("moving sector key: %w", err)
}
// remove other layer files
for i := 1; i < layerCount; i++ {
err = os.Remove(proofpaths.LayerFileName(i))
if err != nil {
return xerrors.Errorf("removing layer file %d: %w", i, err)
}
}
return nil
}
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
// NOTE: This function will copy sealed/unsealed (and possible update) files
// into sealing storage. Those copies get cleaned up in LocalWorker.UnsealPiece
// after this call exists. The resulting unsealed file is going to be moved to
// long-term storage as well.
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
// try finding existing
// try finding existing (also move to a sealing path if it's not here)
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing)
var pf *partialfile.PartialFile
@ -545,7 +611,8 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, o
}
// Piece data non-upgrade sealed in sector
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
// (copy so that files stay in long-term storage)
srcPaths, srcDone, err := sb.sectors.AcquireSectorCopy(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquire sealed sector paths: %w", err)
}
@ -717,65 +784,6 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storif
return true, nil
}
func (sb *Sealer) RegenerateSectorKey(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, keyDataCid cid.Cid) error {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
// stat paths.Sealed, make sure it doesn't exist
_, err = os.Stat(paths.Sealed)
if err == nil {
return xerrors.Errorf("sealed file exists before regenerating sector key")
}
if !os.IsNotExist(err) {
return xerrors.Errorf("stat sealed path: %w", err)
}
// prepare SDR params
commp, err := commcid.CIDToDataCommitmentV1(keyDataCid)
if err != nil {
return xerrors.Errorf("computing commP: %w", err)
}
replicaID, err := sector.ProofType.ReplicaId(sector.ID.Miner, sector.ID.Number, ticket, commp)
if err != nil {
return xerrors.Errorf("computing replica id: %w", err)
}
// generate new sector key
err = ffi.GenerateSDR(
sector.ProofType,
paths.Cache,
replicaID,
)
if err != nil {
return xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
// move the last layer (sector key) to the sealed location
layerCount, err := proofpaths.SDRLayers(sector.ProofType)
if err != nil {
return xerrors.Errorf("getting SDR layer count: %w", err)
}
err = spaths.Move(proofpaths.LayerFileName(layerCount), paths.Sealed)
if err != nil {
return xerrors.Errorf("moving sector key: %w", err)
}
// remove other layer files
for i := 1; i < layerCount; i++ {
err = os.Remove(proofpaths.LayerFileName(i))
if err != nil {
return xerrors.Errorf("removing layer file %d: %w", i, err)
}
}
return nil
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storiface.PreCommit1Out, err error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
if err != nil {

View File

@ -11,6 +11,7 @@ type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}

View File

@ -180,6 +180,10 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
}, nil
}
func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype)
}
func (l *LocalWorker) ffiExec() (storiface.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l})
}
@ -571,15 +575,16 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe
return nil, xerrors.Errorf("unsealing sector: %w", err)
}
if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil {
return nil, xerrors.Errorf("removing source data: %w", err)
// todo move unsealed to long term storage (IN MANAGER< NOT HERE!!)
storageTypes := []storiface.SectorFileType{storiface.FTSealed, storiface.FTCache, storiface.FTUpdate, storiface.FTUpdateCache}
for _, fileType := range storageTypes {
if err = l.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
return nil, xerrors.Errorf("removing source data: %w", err)
}
}
if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTCache); err != nil {
return nil, xerrors.Errorf("removing source data: %w", err)
}
log.Debugf("worker has unsealed piece, sector=%+v", sector.ID)
log.Debugf("unsealed piece, sector=%+v", sector.ID)
return nil, nil
})