diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index aace82c44..d7c03ff58 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -3,11 +3,13 @@ package ffiwrapper import ( + "bufio" "context" "io" "io/ioutil" "math/bits" "os" + "runtime" "syscall" "github.com/ipfs/go-cid" @@ -105,12 +107,12 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err) } - w, err = fr32.NewPadWriter(w, pieceSize) + pw, err := fr32.NewPadWriter(w) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("creating padded reader: %w", err) } - pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), w) + pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw) prf, werr, err := ToReadableFile(pr, int64(pieceSize)) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("getting tee reader pipe: %w", err) @@ -121,6 +123,10 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err) } + if err := pw.Close(); err != nil { + return abi.PieceInfo{}, xerrors.Errorf("closing padded writer: %w", err) + } + if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil { return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err) } @@ -253,7 +259,14 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s return } - _, perr = io.CopyN(out, padreader, int64(size)) + bsize := uint64(size.Padded()) + if bsize > uint64(runtime.NumCPU())*fr32.MTTresh { + bsize = uint64(runtime.NumCPU()) * fr32.MTTresh + } + + padreader = bufio.NewReaderSize(padreader, int(bsize)) + + _, perr = io.CopyN(out, padreader, int64(size.Padded())) }() } // diff --git a/ffiwrapper/sealer_test.go b/ffiwrapper/sealer_test.go index 9af563dc3..fdc7db5c3 100644 --- a/ffiwrapper/sealer_test.go +++ b/ffiwrapper/sealer_test.go @@ -14,6 +14,7 @@ import ( "time" logging "github.com/ipfs/go-log" + "github.com/stretchr/testify/require" "golang.org/x/xerrors" ffi "github.com/filecoin-project/filecoin-ffi" @@ -29,8 +30,9 @@ func init() { logging.SetLogLevel("*", "DEBUG") //nolint: errcheck } -var sectorSize = abi.SectorSize(2048) var sealProofType = abi.RegisteredProof_StackedDRG2KiBSeal +var sectorSize, _ = sealProofType.SectorSize() + var sealRand = abi.SealRandomness{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2} type seal struct { @@ -139,9 +141,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec } expect, _ = ioutil.ReadAll(data(si.Number, 1016)) - if !bytes.Equal(b.Bytes(), expect) { - t.Fatal("read wrong bytes") - } + require.Equal(t, expect, b.Bytes()) b.Reset() err = sb.ReadPiece(context.TODO(), &b, si, 0, 2032) diff --git a/fr32/fr32.go b/fr32/fr32.go index ab47311d6..08ecb767c 100644 --- a/fr32/fr32.go +++ b/fr32/fr32.go @@ -8,10 +8,10 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" ) -var mtTresh = uint64(32 << 20) +var MTTresh = uint64(32 << 20) func mtChunkCount(usz abi.PaddedPieceSize) uint64 { - threads := (uint64(usz)) / mtTresh + threads := (uint64(usz)) / MTTresh if threads > uint64(runtime.NumCPU()) { threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU()))) } @@ -46,7 +46,7 @@ func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) { // Assumes len(in)%127==0 and len(out)%128==0 func Pad(in, out []byte) { - if len(out) > int(mtTresh) { + if len(out) > int(MTTresh) { mt(in, out, len(out), pad) return } @@ -96,7 +96,7 @@ func pad(in, out []byte) { // Assumes len(in)%128==0 and len(out)%127==0 func Unpad(in []byte, out []byte) { - if len(in) > int(mtTresh) { + if len(in) > int(MTTresh) { mt(out, in, len(in), unpad) return } diff --git a/fr32/readers.go b/fr32/readers.go index 4e056b0e7..f974f2cd1 100644 --- a/fr32/readers.go +++ b/fr32/readers.go @@ -21,7 +21,7 @@ func NewPadReader(src io.Reader, sz abi.UnpaddedPieceSize) (io.Reader, error) { return nil, xerrors.Errorf("bad piece size: %w", err) } - buf := make([]byte, mtTresh*mtChunkCount(sz.Padded())) + buf := make([]byte, MTTresh*mtChunkCount(sz.Padded())) return &padReader{ src: src, @@ -59,39 +59,6 @@ func (r *padReader) Read(out []byte) (int, error) { return int(todo.Padded()), err } -func NewPadWriter(dst io.Writer, sz abi.UnpaddedPieceSize) (io.Writer, error) { - if err := sz.Validate(); err != nil { - return nil, xerrors.Errorf("bad piece size: %w", err) - } - - buf := make([]byte, mtTresh*mtChunkCount(sz.Padded())) - - // TODO: Real writer - r, w := io.Pipe() - - pr, err := NewPadReader(r, sz) - if err != nil { - return nil, err - } - - go func() { - for { - n, err := pr.Read(buf) - if err != nil && err != io.EOF { - r.CloseWithError(err) - return - } - - if _, err := dst.Write(buf[:n]); err != nil { - r.CloseWithError(err) - return - } - } - }() - - return w, err -} - type unpadReader struct { src io.Reader @@ -104,7 +71,7 @@ func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) { return nil, xerrors.Errorf("bad piece size: %w", err) } - buf := make([]byte, mtTresh*mtChunkCount(sz)) + buf := make([]byte, MTTresh*mtChunkCount(sz)) return &unpadReader{ src: src, @@ -147,3 +114,70 @@ func (r *unpadReader) Read(out []byte) (int, error) { return int(todo.Unpadded()), err } + +type padWriter struct { + dst io.Writer + + stash []byte + work []byte +} + +func NewPadWriter(dst io.Writer) (io.WriteCloser, error) { + return &padWriter{ + dst: dst, + }, nil +} + +func (w *padWriter) Write(p []byte) (int, error) { + in := p + + if len(p)+len(w.stash) < 127 { + w.stash = append(w.stash, p...) + return len(p), nil + } + + if len(w.stash) != 0 { + in = append(w.stash, in...) + } + + for { + pieces := subPieces(abi.UnpaddedPieceSize(len(in))) + biggest := pieces[len(pieces)-1] + + if abi.PaddedPieceSize(cap(w.work)) < biggest.Padded() { + w.work = make([]byte, 0, biggest.Padded()) + } + + Pad(in[:int(biggest)], w.work[:int(biggest.Padded())]) + + n, err := w.dst.Write(w.work[:int(biggest.Padded())]) + if err != nil { + return int(abi.PaddedPieceSize(n).Unpadded()), err + } + + in = in[biggest:] + + if len(in) < 127 { + if cap(w.stash) < len(in) { + w.stash = make([]byte, 0, len(in)) + } + w.stash = w.stash[:len(in)] + copy(w.stash, in) + + return len(p), nil + } + } +} + +func (w *padWriter) Close() error { + if len(w.stash) > 0 { + return xerrors.Errorf("still have %d unprocessed bytes", len(w.stash)) + } + + // allow gc + w.stash = nil + w.work = nil + w.dst = nil + + return nil +} diff --git a/fr32/utils.go b/fr32/utils.go new file mode 100644 index 000000000..9f4093c40 --- /dev/null +++ b/fr32/utils.go @@ -0,0 +1,31 @@ +package fr32 + +import ( + "math/bits" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +func subPieces(in abi.UnpaddedPieceSize) []abi.UnpaddedPieceSize { + // Convert to in-sector bytes for easier math: + // + // (we convert to sector bytes as they are nice round binary numbers) + + w := uint64(in.Padded()) + + out := make([]abi.UnpaddedPieceSize, bits.OnesCount64(w)) + for i := range out { + // Extract the next lowest non-zero bit + next := bits.TrailingZeros64(w) + psize := uint64(1) << next + // e.g: if the number is 0b010100, psize will be 0b000100 + + // set that bit to 0 by XORing it, so the next iteration looks at the + // next bit + w ^= psize + + // Add the piece size to the list of pieces we need to create + out[i] = abi.PaddedPieceSize(psize).Unpadded() + } + return out +}