diff --git a/ffiwrapper/partialfile.go b/ffiwrapper/partialfile.go index 256a416f3..a5b8f2548 100644 --- a/ffiwrapper/partialfile.go +++ b/ffiwrapper/partialfile.go @@ -53,7 +53,7 @@ func writeTrailer(psz int64, w *os.File, r rlepluslazy.RunIterator) error { } func createPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialFile, error) { - f, err := os.OpenFile(path, os.O_RDWR | os.O_CREATE, 0644) + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, xerrors.Errorf("openning partial file '%s': %w", path, err) } @@ -98,7 +98,7 @@ func openPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialF } // read trailer var tlen [4]byte - _, err = f.ReadAt(tlen[:], st.Size() - int64(len(tlen))) + _, err = f.ReadAt(tlen[:], st.Size()-int64(len(tlen))) if err != nil { return xerrors.Errorf("reading trailer length: %w", err) } @@ -107,7 +107,7 @@ func openPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialF trailerLen := binary.LittleEndian.Uint32(tlen[:]) expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize) if expectLen != st.Size() { - return xerrors.Errorf("file '%d' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen) + int64(len(tlen)), maxPieceSize) + return xerrors.Errorf("file '%d' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen)+int64(len(tlen)), maxPieceSize) } if trailerLen > veryLargeRle { log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen) @@ -161,7 +161,7 @@ func (pf *partialFile) Close() error { return pf.file.Close() } -func (pf *partialFile) Writer(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) (io.Writer, error) { +func (pf *partialFile) Writer(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.Writer, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -190,7 +190,7 @@ func (pf *partialFile) Writer(offset abi.UnpaddedPieceSize, size abi.UnpaddedPie return pf.file, nil } -func (pf *partialFile) MarkAllocated(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) error { +func (pf *partialFile) MarkAllocated(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { have, err := pf.allocated.RunIterator() if err != nil { return err @@ -208,7 +208,7 @@ func (pf *partialFile) MarkAllocated(offset abi.UnpaddedPieceSize, size abi.Unpa return nil } -func (pf *partialFile) Reader(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) (*os.File, error) { +func (pf *partialFile) Reader(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) (*os.File, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -230,14 +230,18 @@ func (pf *partialFile) Reader(offset abi.UnpaddedPieceSize, size abi.UnpaddedPie } if c != uint64(size) { - log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size) - c) + log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size)-c) } } return pf.file, nil } -func pieceRun(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) rlepluslazy.RunIterator { +func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) { + return pf.allocated.RunIterator() +} + +func pieceRun(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) rlepluslazy.RunIterator { var runs []rlepluslazy.Run if offset > 0 { runs = append(runs, rlepluslazy.Run{ diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index 900d728c7..0ed665f2d 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -5,9 +5,11 @@ package ffiwrapper import ( "context" "io" + "io/ioutil" "math/bits" "os" "path/filepath" + "syscall" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -17,6 +19,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/sector-storage/stores" + "github.com/filecoin-project/sector-storage/storiface" "github.com/filecoin-project/sector-storage/zerocomm" ) @@ -54,7 +57,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() - if offset + pieceSize > maxPieceSize { + if offset+pieceSize > maxPieceSize { return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset) } @@ -97,7 +100,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie } } - w, err := stagedFile.Writer(offset, pieceSize) + w, err := stagedFile.Writer(UnpaddedByteIndex(offset), pieceSize) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err) } @@ -112,7 +115,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err) } - if err := stagedFile.MarkAllocated(offset, pieceSize); err != nil { + if err := stagedFile.MarkAllocated(UnpaddedByteIndex(offset), pieceSize); err != nil { return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err) } @@ -133,6 +136,184 @@ func (cf closerFunc) Close() error { return cf() } +func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { + maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() + + // try finding existing + unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) + var pf *partialFile + + switch { + case xerrors.Is(err, storiface.ErrSectorNotFound): + unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTNone, stores.FTUnsealed, false) + if err != nil { + return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err) + } + defer done() + + pf, err = createPartialFile(maxPieceSize, unsealedPath.Unsealed) + if err != nil { + return xerrors.Errorf("create unsealed file: %w", err) + } + + case err == nil: + defer done() + + pf, err = openPartialFile(maxPieceSize, unsealedPath.Unsealed) + if err != nil { + return xerrors.Errorf("opening partial file: %w", err) + } + default: + return xerrors.Errorf("acquire unsealed sector path (existing): %w", err) + } + defer pf.Close() + + allocated, err := pf.Allocated() + if err != nil { + return xerrors.Errorf("getting bitruns of allocated data: %w", err) + } + + toUnseal, err := computeUnsealRanges(allocated, offset, size) + if err != nil { + return xerrors.Errorf("computing unseal ranges: %w", err) + } + + if !toUnseal.HasNext() { + return nil + } + + srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache|stores.FTSealed, stores.FTNone, false) + if err != nil { + return xerrors.Errorf("acquire sealed sector paths: %w", err) + } + defer srcDone() + + var at, nextat uint64 + for { + piece, err := toUnseal.NextRun() + if err != nil { + return xerrors.Errorf("getting next range to unseal: %w", err) + } + + at = nextat + nextat += piece.Len + + if !piece.Val { + continue + } + + out, err := pf.Writer(offset, size) + if err != nil { + return xerrors.Errorf("getting partial file writer: %w", err) + } + + // + outpipe, err := ioutil.TempFile(os.TempDir(), "sector-storage-unseal-") + if err != nil { + return xerrors.Errorf("creating temp pipe file: %w", err) + } + var outpath string + var perr error + outWait := make(chan struct{}) + + { + outpath = outpipe.Name() + if err := outpipe.Close(); err != nil { + return xerrors.Errorf("close pipe temp: %w", err) + } + if err := os.Remove(outpath); err != nil { + return xerrors.Errorf("rm pipe temp: %w", err) + } + + // TODO: Make UnsealRange write to an FD + if err := syscall.Mkfifo(outpath, 0600); err != nil { + return xerrors.Errorf("mk temp fifo: %w", err) + } + + outpipe, err = os.OpenFile(outpath, os.O_RDONLY, 0600) + if err != nil { + return xerrors.Errorf("open temp pipe: %w", err) + } + + go func() { + defer close(outWait) + defer os.Remove(outpath) + defer outpipe.Close() + + _, perr = io.CopyN(out, outpipe, int64(size)) + }() + } + // + + // TODO: This may be possible to do in parallel + err = ffi.UnsealRange(sb.sealProofType, + srcPaths.Cache, + srcPaths.Sealed, + outpath, + sector.Number, + sector.Miner, + randomness, + cid, + at, + piece.Len) + if err != nil { + return xerrors.Errorf("unseal range: %w", err) + } + + select { + case <-outWait: + case <-ctx.Done(): + return ctx.Err() + } + + if perr != nil { + return xerrors.Errorf("piping output to unsealed file: %w", perr) + } + + if err := pf.MarkAllocated(UnpaddedByteIndex(at), abi.UnpaddedPieceSize(piece.Len)); err != nil { + return xerrors.Errorf("marking unsealed range as allocated: %w", err) + } + + if !toUnseal.HasNext() { + break + } + } + + return nil +} + +func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { + path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) + if err != nil { + return xerrors.Errorf("acquire unsealed sector path: %w", err) + } + defer done() + + maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() + + pf, err := openPartialFile(maxPieceSize, path.Unsealed) + if xerrors.Is(err, os.ErrNotExist) { + return xerrors.Errorf("opening partial file: %w", err) + } + + f, err := pf.Reader(offset, size) + if err != nil { + pf.Close() + return xerrors.Errorf("getting partial file reader: %w", err) + } + + if _, err := io.CopyN(writer, f, int64(size)); err != nil { + pf.Close() + return xerrors.Errorf("reading unsealed file: %w", err) + } + + if err := pf.Close(); err != nil { + return xerrors.Errorf("closing partial file: %w", err) + } + + return nil +} + func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) { { path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) diff --git a/ffiwrapper/types.go b/ffiwrapper/types.go index 98612175d..932dc3c42 100644 --- a/ffiwrapper/types.go +++ b/ffiwrapper/types.go @@ -2,7 +2,6 @@ package ffiwrapper import ( "context" - "errors" "io" "github.com/ipfs/go-cid" @@ -41,10 +40,8 @@ type Verifier interface { GenerateWinningPoStSectorChallenge(context.Context, abi.RegisteredProof, abi.ActorID, abi.PoStRandomness, uint64) ([]uint64, error) } -var ErrSectorNotFound = errors.New("sector not found") - type SectorProvider interface { - // * returns ErrSectorNotFound if a requested existing sector doesn't exist + // * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist // * returns an error when allocate is set, and existing isn't, and the sector exists AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) } diff --git a/go.mod b/go.mod index fe4a255ef..429911583 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e github.com/elastic/go-sysinfo v1.3.0 github.com/filecoin-project/filecoin-ffi v0.0.0-20200326153646-e899cc1dd072 - github.com/filecoin-project/go-bitfield v0.0.2-0.20200518131841-989ba5ae71af + github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 github.com/filecoin-project/go-paramfetch v0.0.1 github.com/filecoin-project/specs-actors v0.4.1-0.20200508202406-42be6629284d diff --git a/go.sum b/go.sum index 1538984aa..fcff267f9 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060 h1:/3 github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518131841-989ba5ae71af h1:g34Sk2coFzyNUv61ZLQ+yyS4Fm8aJCqEaZMKf8Dv6Hs= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518131841-989ba5ae71af/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo= diff --git a/stores/remote.go b/stores/remote.go index 325060747..151c0ed2f 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/sector-storage/storiface" "github.com/filecoin-project/sector-storage/tarutil" ) @@ -118,7 +119,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi. } if len(si) == 0 { - return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): not found", s, fileType) + return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound) } sort.Slice(si, func(i, j int) bool {