diff --git a/ffiwrapper/partialfile.go b/ffiwrapper/partialfile.go index e7f89302e..094448e89 100644 --- a/ffiwrapper/partialfile.go +++ b/ffiwrapper/partialfile.go @@ -24,7 +24,7 @@ const veryLargeRle = 1 << 20 // [unpadded (raw) data][rle+][4B LE length fo the rle+ field] type partialFile struct { - maxPiece abi.UnpaddedPieceSize + maxPiece abi.PaddedPieceSize path string allocated rlepluslazy.RLE @@ -55,7 +55,7 @@ func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) err return w.Truncate(maxPieceSize + int64(rb) + 4) } -func createPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialFile, error) { +func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) { f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, xerrors.Errorf("openning partial file '%s': %w", path, err) @@ -84,7 +84,7 @@ func createPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partia return openPartialFile(maxPieceSize, path) } -func openPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialFile, error) { +func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) { f, err := os.OpenFile(path, os.O_RDWR, 0644) if err != nil { return nil, xerrors.Errorf("openning partial file '%s': %w", path, err) @@ -164,7 +164,7 @@ func (pf *partialFile) Close() error { return pf.file.Close() } -func (pf *partialFile) Writer(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.Writer, error) { +func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -193,7 +193,7 @@ func (pf *partialFile) Writer(offset storiface.UnpaddedByteIndex, size abi.Unpad return pf.file, nil } -func (pf *partialFile) MarkAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error { have, err := pf.allocated.RunIterator() if err != nil { return err @@ -211,7 +211,7 @@ func (pf *partialFile) MarkAllocated(offset storiface.UnpaddedByteIndex, size ab return nil } -func (pf *partialFile) Reader(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (*os.File, error) { +func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -244,7 +244,7 @@ func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) { return pf.allocated.RunIterator() } -func pieceRun(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) rlepluslazy.RunIterator { +func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator { var runs []rlepluslazy.Run if offset > 0 { runs = append(runs, rlepluslazy.Run{ diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index b9b7975ab..aace82c44 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "math/bits" "os" - "path/filepath" "syscall" "github.com/ipfs/go-cid" @@ -18,6 +17,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/sector-storage/fr32" "github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/sector-storage/storiface" "github.com/filecoin-project/sector-storage/zerocomm" @@ -55,9 +55,9 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie offset += size } - maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() + maxPieceSize := abi.PaddedPieceSize(sb.ssize) - if offset+pieceSize > maxPieceSize { + if offset.Padded()+pieceSize.Padded() > maxPieceSize { return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset) } @@ -100,10 +100,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie } } - w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset), pieceSize) + w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err) } + + w, err = fr32.NewPadWriter(w, pieceSize) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("creating padded reader: %w", err) + } + pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), w) prf, werr, err := ToReadableFile(pr, int64(pieceSize)) if err != nil { @@ -115,7 +121,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err) } - if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset), pieceSize); err != nil { + if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil { return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err) } @@ -137,7 +143,7 @@ func (cf closerFunc) Close() error { } func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { - maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() + maxPieceSize := abi.PaddedPieceSize(sb.ssize) // try finding existing unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) @@ -188,7 +194,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s } defer srcDone() - var at, nextat uint64 + var at, nextat abi.PaddedPieceSize for { piece, err := toUnseal.NextRun() if err != nil { @@ -196,13 +202,13 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s } at = nextat - nextat += piece.Len + nextat += abi.PaddedPieceSize(piece.Len) if !piece.Val { continue } - out, err := pf.Writer(offset, size) + out, err := pf.Writer(offset.Padded(), size.Padded()) if err != nil { return xerrors.Errorf("getting partial file writer: %w", err) } @@ -241,7 +247,13 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s } defer outpipe.Close() - _, perr = io.CopyN(out, outpipe, int64(size)) + padreader, err := fr32.NewPadReader(outpipe, abi.PaddedPieceSize(piece.Len).Unpadded()) + if err != nil { + perr = xerrors.Errorf("creating new padded reader: %w", err) + return + } + + _, perr = io.CopyN(out, padreader, int64(size)) }() } // @@ -255,8 +267,8 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s sector.Miner, randomness, commd, - at, - piece.Len) + uint64(at.Unpadded()), + uint64(abi.PaddedPieceSize(piece.Len).Unpadded())) if err != nil { return xerrors.Errorf("unseal range: %w", err) } @@ -271,7 +283,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s return xerrors.Errorf("piping output to unsealed file: %w", perr) } - if err := pf.MarkAllocated(storiface.UnpaddedByteIndex(at), abi.UnpaddedPieceSize(piece.Len)); err != nil { + if err := pf.MarkAllocated(storiface.PaddedByteIndex(at), abi.PaddedPieceSize(piece.Len)); err != nil { return xerrors.Errorf("marking unsealed range as allocated: %w", err) } @@ -290,20 +302,25 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se } defer done() - maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() + maxPieceSize := abi.PaddedPieceSize(sb.ssize) pf, err := openPartialFile(maxPieceSize, path.Unsealed) if xerrors.Is(err, os.ErrNotExist) { return xerrors.Errorf("opening partial file: %w", err) } - f, err := pf.Reader(offset, size) + f, err := pf.Reader(offset.Padded(), size.Padded()) if err != nil { pf.Close() return xerrors.Errorf("getting partial file reader: %w", err) } - if _, err := io.CopyN(writer, f, int64(size)); err != nil { + upr, err := fr32.NewUnpadReader(f, size.Padded()) + if err != nil { + return xerrors.Errorf("creating unpadded reader: %w", err) + } + + if _, err := io.CopyN(writer, upr, int64(size)); err != nil { pf.Close() return xerrors.Errorf("reading unsealed file: %w", err) } @@ -355,22 +372,22 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum)) } - staged := filepath.Join(paths.Cache, "staged") + //staged := filepath.Join(paths.Cache, "staged") - if err := sb.rewriteAsPadded(paths.Unsealed, staged); err != nil { + /*if err := sb.rewriteAsPadded(paths.Unsealed, staged); err != nil { return nil, xerrors.Errorf("rewriting sector as padded: %w", err) - } - defer func() { + }*/ + /*defer func() { if err := os.Remove(staged); err != nil { log.Warnf("Removing staged sector file(%v): %s", sector, err) } }() - + */ // TODO: context cancellation respect p1o, err := ffi.SealPreCommitPhase1( sb.sealProofType, paths.Cache, - staged, + paths.Unsealed, paths.Sealed, sector.Number, sector.Miner, @@ -383,6 +400,7 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke return p1o, nil } +/* func (sb *Sealer) rewriteAsPadded(unsealed string, staged string) error { maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() @@ -427,7 +445,7 @@ func (sb *Sealer) rewriteAsPadded(unsealed string, staged string) error { } return werr() -} +}*/ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) { paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true) diff --git a/ffiwrapper/unseal_ranges.go b/ffiwrapper/unseal_ranges.go index 522b58138..0bc7b52df 100644 --- a/ffiwrapper/unseal_ranges.go +++ b/ffiwrapper/unseal_ranges.go @@ -16,7 +16,7 @@ const mergeGaps = 32 << 20 // TODO const expandRuns = 16 << 20 // unseal more than requested for future requests func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) { - todo := pieceRun(offset, size) + todo := pieceRun(offset.Padded(), size.Padded()) todo, err := rlepluslazy.Subtract(todo, unsealed) if err != nil { return nil, xerrors.Errorf("compute todo-unsealed: %w", err) diff --git a/fr32/fr32.go b/fr32/fr32.go index 20d158b3d..ab47311d6 100644 --- a/fr32/fr32.go +++ b/fr32/fr32.go @@ -55,11 +55,6 @@ func Pad(in, out []byte) { } func pad(in, out []byte) { - if len(out) > int(mtTresh) { - mt(in, out, len(out), Pad) - return - } - chunks := len(out) / 128 for chunk := 0; chunk < chunks; chunk++ { inOff := chunk * 127 diff --git a/fr32/readers.go b/fr32/readers.go index b7fdc3843..4e056b0e7 100644 --- a/fr32/readers.go +++ b/fr32/readers.go @@ -59,6 +59,39 @@ func (r *padReader) Read(out []byte) (int, error) { return int(todo.Padded()), err } +func NewPadWriter(dst io.Writer, sz abi.UnpaddedPieceSize) (io.Writer, error) { + if err := sz.Validate(); err != nil { + return nil, xerrors.Errorf("bad piece size: %w", err) + } + + buf := make([]byte, mtTresh*mtChunkCount(sz.Padded())) + + // TODO: Real writer + r, w := io.Pipe() + + pr, err := NewPadReader(r, sz) + if err != nil { + return nil, err + } + + go func() { + for { + n, err := pr.Read(buf) + if err != nil && err != io.EOF { + r.CloseWithError(err) + return + } + + if _, err := dst.Write(buf[:n]); err != nil { + r.CloseWithError(err) + return + } + } + }() + + return w, err +} + type unpadReader struct { src io.Reader @@ -86,7 +119,9 @@ func (r *unpadReader) Read(out []byte) (int, error) { return 0, io.EOF } - outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out)))) + chunks := len(out) / 127 + + outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(chunks*128))) if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil { return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err) diff --git a/storiface/ffi.go b/storiface/ffi.go index 6821f9b35..6e16018f0 100644 --- a/storiface/ffi.go +++ b/storiface/ffi.go @@ -1,7 +1,17 @@ package storiface -import "errors" +import ( + "errors" + + "github.com/filecoin-project/specs-actors/actors/abi" +) var ErrSectorNotFound = errors.New("sector not found") type UnpaddedByteIndex uint64 + +func (i UnpaddedByteIndex) Padded() PaddedByteIndex { + return PaddedByteIndex(abi.UnpaddedPieceSize(i).Padded()) +} + +type PaddedByteIndex uint64