Merge pull request #8478 from filecoin-project/gstuart/replica-update-unseal

Snap Deals full unseal
This commit is contained in:
Łukasz Magiera 2022-05-24 14:48:10 +02:00 committed by GitHub
commit 4b1bfa9964
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 128 additions and 36 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/fr32"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
)
var _ Storage = &Sealer{}
@ -369,8 +370,8 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
return pieceCID, werr()
}
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string) (bool, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string, randomness abi.SealRandomness) (bool, error) {
replicaPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
return false, nil
} else if err != nil {
@ -378,12 +379,47 @@ func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.Se
}
defer done()
sealedPaths, done2, err := sb.AcquireSectorKeyOrRegenerate(ctx, sector, randomness)
if err != nil {
return false, xerrors.Errorf("acquiring sealed sector: %w", err)
}
defer done2()
// Sector data stored in replica update
updateProof, err := sector.ProofType.RegisteredUpdateProof()
if err != nil {
return false, err
}
return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, paths.Update, paths.Sealed, paths.Cache, commD)
return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, replicaPath.Update, sealedPaths.Sealed, sealedPaths.Cache, commD)
}
func (sb *Sealer) AcquireSectorKeyOrRegenerate(ctx context.Context, sector storage.SectorRef, randomness abi.SealRandomness) (storiface.SectorPaths, func(), error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
if err == nil {
return paths, done, err
} else if !xerrors.Is(err, storiface.ErrSectorNotFound) {
return paths, done, xerrors.Errorf("reading sector key: %w", err)
}
// Sector key can't be found, so let's regenerate it
sectorSize, err := sector.ProofType.SectorSize()
if err != nil {
return paths, done, xerrors.Errorf("retrieving sector size: %w", err)
}
paddedSize := abi.PaddedPieceSize(sectorSize)
_, err = sb.AddPiece(ctx, sector, nil, paddedSize.Unpadded(), nr.NewNullReader(paddedSize.Unpadded()))
if err != nil {
return paths, done, xerrors.Errorf("recomputing empty data: %w", err)
}
err = sb.RegenerateSectorKey(ctx, sector, randomness, []abi.PieceInfo{{PieceCID: zerocomm.ZeroPieceCommitment(paddedSize.Unpadded()), Size: paddedSize}})
if err != nil {
return paths, done, xerrors.Errorf("during pc1: %w", err)
}
// Sector key should exist now, let's grab the paths
return sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
}
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
@ -437,7 +473,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
}
// If piece data stored in updated replica decode whole sector
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed)
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed, randomness)
if err != nil {
return xerrors.Errorf("decoding sector from replica: %w", err)
}
@ -618,6 +654,51 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storag
return true, nil
}
func (sb *Sealer) RegenerateSectorKey(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) error {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed|storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
if err != nil {
return xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return err
}
var sum abi.UnpaddedPieceSize
for _, piece := range pieces {
sum += piece.Size.Unpadded()
}
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
ussize := abi.PaddedPieceSize(ssize).Unpadded()
if sum != ussize {
return xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
// TODO: context cancellation respect
_, err = ffi.SealPreCommitPhase1(
sector.ProofType,
paths.Cache,
paths.Unsealed,
paths.Sealed,
sector.ID.Number,
sector.ID.Miner,
ticket,
pieces,
)
if err != nil {
return xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
return nil
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
if err != nil {

View File

@ -273,7 +273,7 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR
defer cancel()
log.Debugf("acquire unseal sector lock for sector %d", sector.ID)
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
}
@ -281,8 +281,11 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR
// put it in the sealing scratch space.
sealFetch := func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
if err != nil && err2 != nil {
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
}
return nil

View File

@ -315,6 +315,12 @@ func TestSnapDeals(t *testing.T) {
require.NoError(t, m.GenerateSectorKeyFromData(ctx, sid, out.NewUnsealed))
fmt.Printf("GSK duration (%s): %s\n", ss.ShortString(), time.Since(startGSK))
fmt.Printf("Remove data\n")
require.NoError(t, m.FinalizeSector(ctx, sid, nil))
fmt.Printf("Release Sector Key\n")
require.NoError(t, m.ReleaseSectorKey(ctx, sid))
fmt.Printf("Unseal Replica\n")
require.NoError(t, m.SectorsUnsealPiece(ctx, sid, 0, p1.Size.Unpadded(), ticket, &out.NewUnsealed))
}
func TestRedoPC1(t *testing.T) {

View File

@ -289,7 +289,7 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r
log.Errorf("get hostname err: %+v", err)
}
err = xerrors.Errorf("%w [Hostname: %s]", err.Error(), hostname)
err = xerrors.Errorf("%w [Hostname: %s]", err, hostname)
}
if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) {

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)
@ -220,7 +221,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
m.minerSector(sector.SectorType, sector.SectorNumber),
pieceSizes,
p.Unpadded(),
NewNullReader(p.Unpadded()))
nullreader.NewNullReader(p.Unpadded()))
if err != nil {
err = xerrors.Errorf("writing padding piece: %w", err)
deal.accepted(sector.SectorNumber, offset, err)

View File

@ -1,5 +1,23 @@
package nullreader
import (
"io"
"github.com/filecoin-project/go-state-types/abi"
)
type NullReader struct {
*io.LimitedReader
}
func NewNullReader(size abi.UnpaddedPieceSize) io.Reader {
return &NullReader{(io.LimitReader(&Reader{}, int64(size))).(*io.LimitedReader)}
}
func (m NullReader) NullBytes() int64 {
return m.N
}
// TODO: extract this to someplace where it can be shared with lotus
type Reader struct{}

View File

@ -1,20 +0,0 @@
package sealing
import (
"io"
"github.com/filecoin-project/go-state-types/abi"
nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
)
type NullReader struct {
*io.LimitedReader
}
func NewNullReader(size abi.UnpaddedPieceSize) io.Reader {
return &NullReader{(io.LimitReader(&nr.Reader{}, int64(size))).(*io.LimitedReader)}
}
func (m NullReader) NullBytes() int64 {
return m.N
}

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
)
var DealSectorPriority = 1024
@ -91,7 +92,7 @@ func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, exi
for i, size := range sizes {
expectCid := zerocomm.ZeroPieceCommitment(size)
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, nullreader.NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}

View File

@ -21,7 +21,8 @@ import (
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
)
var log = logging.Logger("rpcenc")
@ -101,7 +102,7 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) {
r := value.Interface().(io.Reader)
if r, ok := r.(*sealing.NullReader); ok {
if r, ok := r.(*nullreader.NullReader); ok {
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
}
@ -418,7 +419,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
}
return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil
return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil
}
u, err := uuid.Parse(rs.Info)

View File

@ -14,7 +14,8 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
)
type ReaderHandler struct {
@ -57,7 +58,7 @@ func (h *ReaderHandler) ReadAll(ctx context.Context, r io.Reader) ([]byte, error
}
func (h *ReaderHandler) ReadNullLen(ctx context.Context, r io.Reader) (int64, error) {
return r.(*sealing.NullReader).N, nil
return r.(*nullreader.NullReader).N, nil
}
func (h *ReaderHandler) ReadUrl(ctx context.Context, u string) (string, error) {
@ -118,7 +119,7 @@ func TestNullReaderProxy(t *testing.T) {
defer closer()
n, err := client.ReadNullLen(context.TODO(), sealing.NewNullReader(1016))
n, err := client.ReadNullLen(context.TODO(), nullreader.NewNullReader(1016))
require.NoError(t, err)
require.Equal(t, int64(1016), n)
}