wireup fr32 padding to the unsealed file

Łukasz Magiera 2020-05-29 01:33:00 +02:00
parent 55867ab48b
commit 2a70ff3cf3
6 changed files with 96 additions and 38 deletions
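In outline: the unsealed (staged) sector file now stores fr32-padded data. The partialFile helpers (Writer, Reader, MarkAllocated, pieceRun) switch from unpadded to padded offsets and sizes, AddPiece pads raw piece data on write through fr32.NewPadWriter, UnsealPiece re-pads the unsealer's output through fr32.NewPadReader, ReadPiece unpads on read through fr32.NewUnpadReader, and SealPreCommit1 hands paths.Unsealed straight to ffi.SealPreCommitPhase1 instead of rewriting it into a separate staged file. The following is a condensed sketch of the new AddPiece write path, not part of the diff; it reuses the names and signatures introduced in the hunks below, with error messages shortened and the piece-commitment hashing left to the caller.

func writePiecePadded(stagedFile *partialFile, file io.Reader, offset, pieceSize abi.UnpaddedPieceSize) (io.Reader, error) {
	// The partial file is now addressed in padded (on-disk) bytes.
	w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded())
	if err != nil {
		return nil, xerrors.Errorf("getting partial file writer: %w", err)
	}

	// Wrap the writer so raw bytes are fr32-padded before reaching the unsealed file.
	w, err = fr32.NewPadWriter(w, pieceSize)
	if err != nil {
		return nil, xerrors.Errorf("creating pad writer: %w", err)
	}

	// Tee the raw piece through the pad writer; the caller reads the returned
	// stream to compute the piece commitment.
	return io.TeeReader(io.LimitReader(file, int64(pieceSize)), w), nil
}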

View File

@@ -24,7 +24,7 @@ const veryLargeRle = 1 << 20
 // [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
 type partialFile struct {
-maxPiece abi.UnpaddedPieceSize
+maxPiece abi.PaddedPieceSize
 path string
 allocated rlepluslazy.RLE
@@ -55,7 +55,7 @@ func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) err
 return w.Truncate(maxPieceSize + int64(rb) + 4)
 }
-func createPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialFile, error) {
+func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
 if err != nil {
 return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
@@ -84,7 +84,7 @@ func createPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partia
 return openPartialFile(maxPieceSize, path)
 }
-func openPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialFile, error) {
+func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
 f, err := os.OpenFile(path, os.O_RDWR, 0644)
 if err != nil {
 return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
@@ -164,7 +164,7 @@ func (pf *partialFile) Close() error {
 return pf.file.Close()
 }
-func (pf *partialFile) Writer(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.Writer, error) {
+func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
 if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
 return nil, xerrors.Errorf("seek piece start: %w", err)
 }
@@ -193,7 +193,7 @@ func (pf *partialFile) Writer(offset storiface.UnpaddedByteIndex, size abi.Unpad
 return pf.file, nil
 }
-func (pf *partialFile) MarkAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
+func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
 have, err := pf.allocated.RunIterator()
 if err != nil {
 return err
@@ -211,7 +211,7 @@ func (pf *partialFile) MarkAllocated(offset storiface.UnpaddedByteIndex, size ab
 return nil
 }
-func (pf *partialFile) Reader(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (*os.File, error) {
+func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
 if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
 return nil, xerrors.Errorf("seek piece start: %w", err)
 }
@@ -244,7 +244,7 @@ func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) {
 return pf.allocated.RunIterator()
 }
-func pieceRun(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) rlepluslazy.RunIterator {
+func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
 var runs []rlepluslazy.Run
 if offset > 0 {
 runs = append(runs, rlepluslazy.Run{

View File

@@ -8,7 +8,6 @@ import (
 "io/ioutil"
 "math/bits"
 "os"
-"path/filepath"
 "syscall"
 "github.com/ipfs/go-cid"
@@ -18,6 +17,7 @@ import (
 "github.com/filecoin-project/specs-actors/actors/abi"
 "github.com/filecoin-project/specs-storage/storage"
+"github.com/filecoin-project/sector-storage/fr32"
 "github.com/filecoin-project/sector-storage/stores"
 "github.com/filecoin-project/sector-storage/storiface"
 "github.com/filecoin-project/sector-storage/zerocomm"
@@ -55,9 +55,9 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
 offset += size
 }
-maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded()
+maxPieceSize := abi.PaddedPieceSize(sb.ssize)
-if offset+pieceSize > maxPieceSize {
+if offset.Padded()+pieceSize.Padded() > maxPieceSize {
 return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
 }
@@ -100,10 +100,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
 }
 }
-w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset), pieceSize)
+w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded())
 if err != nil {
 return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
 }
+w, err = fr32.NewPadWriter(w, pieceSize)
+if err != nil {
+return abi.PieceInfo{}, xerrors.Errorf("creating padded reader: %w", err)
+}
 pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), w)
 prf, werr, err := ToReadableFile(pr, int64(pieceSize))
 if err != nil {
@@ -115,7 +121,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
 return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err)
 }
-if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset), pieceSize); err != nil {
+if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
 return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
 }
@@ -137,7 +143,7 @@ func (cf closerFunc) Close() error {
 }
 func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
-maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded()
+maxPieceSize := abi.PaddedPieceSize(sb.ssize)
 // try finding existing
 unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
@@ -188,7 +194,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
 }
 defer srcDone()
-var at, nextat uint64
+var at, nextat abi.PaddedPieceSize
 for {
 piece, err := toUnseal.NextRun()
 if err != nil {
@@ -196,13 +202,13 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
 }
 at = nextat
-nextat += piece.Len
+nextat += abi.PaddedPieceSize(piece.Len)
 if !piece.Val {
 continue
 }
-out, err := pf.Writer(offset, size)
+out, err := pf.Writer(offset.Padded(), size.Padded())
 if err != nil {
 return xerrors.Errorf("getting partial file writer: %w", err)
 }
@@ -241,7 +247,13 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
 }
 defer outpipe.Close()
-_, perr = io.CopyN(out, outpipe, int64(size))
+padreader, err := fr32.NewPadReader(outpipe, abi.PaddedPieceSize(piece.Len).Unpadded())
+if err != nil {
+perr = xerrors.Errorf("creating new padded reader: %w", err)
+return
+}
+_, perr = io.CopyN(out, padreader, int64(size))
 }()
 }
 // </eww>
@@ -255,8 +267,8 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
 sector.Miner,
 randomness,
 commd,
-at,
-piece.Len)
+uint64(at.Unpadded()),
+uint64(abi.PaddedPieceSize(piece.Len).Unpadded()))
 if err != nil {
 return xerrors.Errorf("unseal range: %w", err)
 }
@@ -271,7 +283,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
 return xerrors.Errorf("piping output to unsealed file: %w", perr)
 }
-if err := pf.MarkAllocated(storiface.UnpaddedByteIndex(at), abi.UnpaddedPieceSize(piece.Len)); err != nil {
+if err := pf.MarkAllocated(storiface.PaddedByteIndex(at), abi.PaddedPieceSize(piece.Len)); err != nil {
 return xerrors.Errorf("marking unsealed range as allocated: %w", err)
 }
@@ -290,20 +302,25 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
 }
 defer done()
-maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded()
+maxPieceSize := abi.PaddedPieceSize(sb.ssize)
 pf, err := openPartialFile(maxPieceSize, path.Unsealed)
 if xerrors.Is(err, os.ErrNotExist) {
 return xerrors.Errorf("opening partial file: %w", err)
 }
-f, err := pf.Reader(offset, size)
+f, err := pf.Reader(offset.Padded(), size.Padded())
 if err != nil {
 pf.Close()
 return xerrors.Errorf("getting partial file reader: %w", err)
 }
-if _, err := io.CopyN(writer, f, int64(size)); err != nil {
+upr, err := fr32.NewUnpadReader(f, size.Padded())
+if err != nil {
+return xerrors.Errorf("creating unpadded reader: %w", err)
+}
+if _, err := io.CopyN(writer, upr, int64(size)); err != nil {
 pf.Close()
 return xerrors.Errorf("reading unsealed file: %w", err)
 }
@@ -355,22 +372,22 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
 return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
 }
-staged := filepath.Join(paths.Cache, "staged")
+//staged := filepath.Join(paths.Cache, "staged")
-if err := sb.rewriteAsPadded(paths.Unsealed, staged); err != nil {
+/*if err := sb.rewriteAsPadded(paths.Unsealed, staged); err != nil {
 return nil, xerrors.Errorf("rewriting sector as padded: %w", err)
-}
-defer func() {
+}*/
+/*defer func() {
 if err := os.Remove(staged); err != nil {
 log.Warnf("Removing staged sector file(%v): %s", sector, err)
 }
 }()
+*/
 // TODO: context cancellation respect
 p1o, err := ffi.SealPreCommitPhase1(
 sb.sealProofType,
 paths.Cache,
-staged,
+paths.Unsealed,
 paths.Sealed,
 sector.Number,
 sector.Miner,
@@ -383,6 +400,7 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
 return p1o, nil
 }
+/*
 func (sb *Sealer) rewriteAsPadded(unsealed string, staged string) error {
 maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded()
@@ -427,7 +445,7 @@ func (sb *Sealer) rewriteAsPadded(unsealed string, staged string) error {
 }
 return werr()
-}
+}*/
 func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
 paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)

View File

@@ -16,7 +16,7 @@ const mergeGaps = 32 << 20
 // TODO const expandRuns = 16 << 20 // unseal more than requested for future requests
 func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) {
-todo := pieceRun(offset, size)
+todo := pieceRun(offset.Padded(), size.Padded())
 todo, err := rlepluslazy.Subtract(todo, unsealed)
 if err != nil {
 return nil, xerrors.Errorf("compute todo-unsealed: %w", err)

View File

@@ -55,11 +55,6 @@ func Pad(in, out []byte) {
 }
 func pad(in, out []byte) {
-if len(out) > int(mtTresh) {
-mt(in, out, len(out), Pad)
-return
-}
 chunks := len(out) / 128
 for chunk := 0; chunk < chunks; chunk++ {
 inOff := chunk * 127

View File

@@ -59,6 +59,39 @@ func (r *padReader) Read(out []byte) (int, error) {
 return int(todo.Padded()), err
 }
+func NewPadWriter(dst io.Writer, sz abi.UnpaddedPieceSize) (io.Writer, error) {
+if err := sz.Validate(); err != nil {
+return nil, xerrors.Errorf("bad piece size: %w", err)
+}
+buf := make([]byte, mtTresh*mtChunkCount(sz.Padded()))
+// TODO: Real writer
+r, w := io.Pipe()
+pr, err := NewPadReader(r, sz)
+if err != nil {
+return nil, err
+}
+go func() {
+for {
+n, err := pr.Read(buf)
+if err != nil && err != io.EOF {
+r.CloseWithError(err)
+return
+}
+if _, err := dst.Write(buf[:n]); err != nil {
+r.CloseWithError(err)
+return
+}
+}
+}()
+return w, err
+}
 type unpadReader struct {
 src io.Reader
@@ -86,7 +119,9 @@ func (r *unpadReader) Read(out []byte) (int, error) {
 return 0, io.EOF
 }
-outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out))))
+chunks := len(out) / 127
+outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(chunks*128)))
 if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil {
 return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
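
A hedged note on the unpadReader change above (my reading of the hunk, not wording from the commit): the destination buffer holds unpadded bytes, so the padded read size is now derived from whole 127-byte output chunks rather than from len(out) directly. Worked numbers, assuming an 8 KiB padded piece:

// The unpadded payload of an 8 KiB padded piece is 8128 bytes.
// Old: 1 << (63 - bits.LeadingZeros64(8128)) = 4096, so a full-piece output
//      buffer only maps to half of the padded piece per Read call.
// New: chunks = 8128 / 127 = 64; 64 * 128 = 8192, already a power of two,
//      so one Read can cover the whole padded piece.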

View File

@@ -1,7 +1,17 @@
 package storiface
-import "errors"
+import (
+"errors"
+"github.com/filecoin-project/specs-actors/actors/abi"
+)
 var ErrSectorNotFound = errors.New("sector not found")
 type UnpaddedByteIndex uint64
+func (i UnpaddedByteIndex) Padded() PaddedByteIndex {
+return PaddedByteIndex(abi.UnpaddedPieceSize(i).Padded())
+}
 type PaddedByteIndex uint64
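
For reference on the new conversion helper: abi.UnpaddedPieceSize.Padded() expands every 127 raw bytes into a 128-byte fr32 chunk (s + s/127), so UnpaddedByteIndex.Padded() maps raw byte offsets onto the padded on-disk layout. A small illustrative check, assuming that specs-actors definition:

idx := storiface.UnpaddedByteIndex(254) // two full 127-byte chunks of raw data
padded := idx.Padded()                  // PaddedByteIndex(256) == 254 + 254/127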