Replica unseal working
This commit is contained in:
parent
b2a6d308ba
commit
98637b357b
50
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
50
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
@ -15,6 +15,8 @@ import (
|
||||
"os"
|
||||
"runtime"
|
||||
|
||||
nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -369,8 +371,8 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
|
||||
return pieceCID, werr()
|
||||
}
|
||||
|
||||
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string) (bool, error) {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
|
||||
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string, randomness abi.SealRandomness) (bool, error) {
|
||||
replicaPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage)
|
||||
if xerrors.Is(err, storiface.ErrSectorNotFound) {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
@ -378,12 +380,52 @@ func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.Se
|
||||
}
|
||||
defer done()
|
||||
|
||||
sealedPaths, releaseSectorKey, err := sb.AcquireSectorKeyOrRegenerate(ctx, sector, randomness)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("acquiring sealed sector: %w", err)
|
||||
}
|
||||
defer releaseSectorKey()
|
||||
|
||||
// Sector data stored in replica update
|
||||
updateProof, err := sector.ProofType.RegisteredUpdateProof()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, paths.Update, paths.Sealed, paths.Cache, commD)
|
||||
return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, replicaPath.Update, sealedPaths.Sealed, sealedPaths.Cache, commD)
|
||||
}
|
||||
|
||||
func (sb *Sealer) AcquireSectorKeyOrRegenerate(ctx context.Context, sector storage.SectorRef, randomness abi.SealRandomness) (storiface.SectorPaths, func(), error) {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
|
||||
if err == nil {
|
||||
return paths, done, err
|
||||
} else if !xerrors.Is(err, storiface.ErrSectorNotFound) {
|
||||
return paths, done, xerrors.Errorf("reading sector key: %w", err)
|
||||
}
|
||||
|
||||
// Sector key can't be found, so let's regenerate it
|
||||
sectorSize, err := sector.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return paths, done, xerrors.Errorf("retrieving sector size: %w", err)
|
||||
}
|
||||
paddedSize := abi.PaddedPieceSize(sectorSize)
|
||||
|
||||
_, err = sb.AddPiece(ctx, sector, nil, paddedSize.Unpadded(), nr.NewNullReader(paddedSize.Unpadded()))
|
||||
if err != nil {
|
||||
return paths, done, xerrors.Errorf("recomputing empty data: %w", err)
|
||||
}
|
||||
|
||||
pc1, err := sb.SealPreCommit1(ctx, sector, randomness, []abi.PieceInfo{{PieceCID: zerocomm.ZeroPieceCommitment(paddedSize.Unpadded()), Size: paddedSize}})
|
||||
if err != nil {
|
||||
return paths, done, xerrors.Errorf("during pc1: %w", err)
|
||||
}
|
||||
|
||||
_, err = sb.SealPreCommit2(ctx, sector, pc1)
|
||||
if err != nil {
|
||||
return paths, done, xerrors.Errorf("during pc2: %w", err)
|
||||
}
|
||||
|
||||
// Sector key should exist now, let's grab the paths
|
||||
return sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
|
||||
}
|
||||
|
||||
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
|
||||
@ -437,7 +479,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
|
||||
}
|
||||
|
||||
// If piece data stored in updated replica decode whole sector
|
||||
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed)
|
||||
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed, randomness)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("decoding sector from replica: %w", err)
|
||||
}
|
||||
|
10
extern/sector-storage/manager.go
vendored
10
extern/sector-storage/manager.go
vendored
@ -273,7 +273,7 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR
|
||||
defer cancel()
|
||||
|
||||
log.Debugf("acquire unseal sector lock for sector %d", sector.ID)
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil {
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTUnsealed); err != nil {
|
||||
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
|
||||
}
|
||||
|
||||
@ -281,8 +281,12 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR
|
||||
// put it in the sealing scratch space.
|
||||
sealFetch := func(ctx context.Context, worker Worker) error {
|
||||
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
|
||||
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
|
||||
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
||||
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
|
||||
if err != nil {
|
||||
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
|
||||
if err2 != nil {
|
||||
return xerrors.Errorf("copy sealed/cache sector data: %w %w", err, err2)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
6
extern/sector-storage/manager_test.go
vendored
6
extern/sector-storage/manager_test.go
vendored
@ -315,6 +315,12 @@ func TestSnapDeals(t *testing.T) {
|
||||
require.NoError(t, m.GenerateSectorKeyFromData(ctx, sid, out.NewUnsealed))
|
||||
fmt.Printf("GSK duration (%s): %s\n", ss.ShortString(), time.Since(startGSK))
|
||||
|
||||
fmt.Printf("Remove data\n")
|
||||
require.NoError(t, m.FinalizeSector(ctx, sid, nil))
|
||||
fmt.Printf("Release Sector Key\n")
|
||||
require.NoError(t, m.ReleaseSectorKey(ctx, sid))
|
||||
fmt.Printf("Unseal Replica\n")
|
||||
require.NoError(t, m.SectorsUnsealPiece(ctx, sid, 0, p1.Size.Unpadded(), ticket, &out.NewUnsealed))
|
||||
}
|
||||
|
||||
func TestRedoPC1(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user