lpseal: Implement KeepUnsealed
This commit is contained in:
parent
c68257236b
commit
bbf05bcccf
@ -279,18 +279,48 @@ func (sb *SealCalls) LocalStorage(ctx context.Context) ([]storiface.StoragePath,
|
|||||||
return sb.sectors.localStore.Local(ctx)
|
return sb.sectors.localStore.Local(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
|
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
|
||||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
|
alloc := storiface.FTNone
|
||||||
|
if keepUnsealed {
|
||||||
|
// note: In lotus-provider we don't write the unsealed file in any of the previous stages, it's only written here from tree-d
|
||||||
|
alloc = storiface.FTUnsealed
|
||||||
|
}
|
||||||
|
|
||||||
|
sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, alloc, storiface.PathSealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquiring sector paths: %w", err)
|
return xerrors.Errorf("acquiring sector paths: %w", err)
|
||||||
}
|
}
|
||||||
defer releaseSector()
|
defer releaseSector()
|
||||||
|
|
||||||
ssize, err := sector.ProofType.SectorSize()
|
ssize, err := sector.ProofType.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting sector size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// todo treed into unsealed
|
if keepUnsealed {
|
||||||
|
// tree-d contains exactly unsealed data in the prefix, so
|
||||||
|
// * we move it to a temp file
|
||||||
|
// * we truncate the temp file to the sector size
|
||||||
|
// * we move the temp file to the unsealed location
|
||||||
|
|
||||||
if err := ffi.ClearCache(uint64(ssize), paths.Cache); err != nil {
|
// move tree-d to temp file
|
||||||
|
tempUnsealed := filepath.Join(sectorPaths.Cache, storiface.SectorName(sector.ID))
|
||||||
|
if err := os.Rename(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName), tempUnsealed); err != nil {
|
||||||
|
return xerrors.Errorf("moving tree-d to temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// truncate sealed file to sector size
|
||||||
|
if err := os.Truncate(tempUnsealed, int64(ssize)); err != nil {
|
||||||
|
return xerrors.Errorf("truncating unsealed file to sector size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// move temp file to unsealed location
|
||||||
|
if err := paths.Move(tempUnsealed, sectorPaths.Unsealed); err != nil {
|
||||||
|
return xerrors.Errorf("move temp unsealed sector to final location (%s -> %s): %w", tempUnsealed, sectorPaths.Unsealed, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ffi.ClearCache(uint64(ssize), sectorPaths.Cache); err != nil {
|
||||||
return xerrors.Errorf("clearing cache: %w", err)
|
return xerrors.Errorf("clearing cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address
|
|||||||
f05_deal_end_epoch) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`,
|
f05_deal_end_epoch) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`,
|
||||||
mid, n, 0,
|
mid, n, 0,
|
||||||
piece.DealProposal.PieceCID, piece.DealProposal.PieceSize,
|
piece.DealProposal.PieceCID, piece.DealProposal.PieceSize,
|
||||||
source.String(), dataHdrJson, rawSize, true,
|
source.String(), dataHdrJson, rawSize, !piece.KeepUnsealed,
|
||||||
piece.PublishCid, piece.DealID, dealProposalJson, piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)
|
piece.PublishCid, piece.DealID, dealProposalJson, piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("inserting into sectors_sdr_initial_pieces: %w", err)
|
return false, xerrors.Errorf("inserting into sectors_sdr_initial_pieces: %w", err)
|
||||||
|
@ -50,6 +50,12 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
|
|||||||
}
|
}
|
||||||
task := tasks[0]
|
task := tasks[0]
|
||||||
|
|
||||||
|
var keepUnsealed bool
|
||||||
|
|
||||||
|
if err := f.db.QueryRow(ctx, `select bool_or(not data_delete_on_finalize) from sectors_sdr_initial_pieces where sp_id=$1 and sector_number=$2`, task.SpID, task.SectorNumber).Scan(&keepUnsealed); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
sector := storiface.SectorRef{
|
sector := storiface.SectorRef{
|
||||||
ID: abi.SectorID{
|
ID: abi.SectorID{
|
||||||
Miner: abi.ActorID(task.SpID),
|
Miner: abi.ActorID(task.SpID),
|
||||||
@ -58,7 +64,7 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
|
|||||||
ProofType: abi.RegisteredSealProof(task.RegSealProof),
|
ProofType: abi.RegisteredSealProof(task.RegSealProof),
|
||||||
}
|
}
|
||||||
|
|
||||||
err = f.sc.FinalizeSector(ctx, sector)
|
err = f.sc.FinalizeSector(ctx, sector, keepUnsealed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("finalizing sector: %w", err)
|
return false, xerrors.Errorf("finalizing sector: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user