diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index 3b20f33bc..74ebeff40 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -4,16 +4,19 @@ package ffiwrapper import ( "bufio" + "bytes" "context" "io" "math/bits" "os" "runtime" + "sync" "github.com/ipfs/go-cid" "golang.org/x/xerrors" ffi "github.com/filecoin-project/filecoin-ffi" + commcid "github.com/filecoin-project/go-fil-commcid" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" @@ -105,20 +108,58 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err) } - pw, err := fr32.NewPadWriter(w) - if err != nil { - return abi.PieceInfo{}, xerrors.Errorf("creating padded reader: %w", err) - } + pw := fr32.NewPadWriter(w) pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw) - prf, werr, err := ToReadableFile(pr, int64(pieceSize)) - if err != nil { - return abi.PieceInfo{}, xerrors.Errorf("getting tee reader pipe: %w", err) - } - pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, pieceSize) - if err != nil { - return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err) + thr := 1 << bits.Len32(uint32(runtime.NumCPU())) + chunk := abi.PaddedPieceSize(4 << 20) + var wg sync.WaitGroup + + buf := make([]byte, (chunk * abi.PaddedPieceSize(thr)).Unpadded()) + var pieceCids []abi.PieceInfo + + for { + n, err := pr.Read(buf[:]) + if err != nil && err != io.EOF { + return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err) + } + if err == io.EOF { + break + } + + wg.Add(n/int(chunk)) + res := make([]interface{}, n/int(chunk)) + + for i := 0; i < n/int(chunk); i++ { + go func(i int) { + defer wg.Done() + + b := buf[i*int(chunk.Unpadded()):((i+1)*int(chunk.Unpadded()))] + + c, err := sb.pieceCid(b) + if err != nil { + res[i] = err + return + } + res[i] = abi.PieceInfo{ + Size: abi.UnpaddedPieceSize(len(b)).Padded(), + PieceCID: c, + } + }(i) + } + wg.Wait() + + for _, r := range res { + switch r := r.(type) { + case abi.PieceInfo: + pieceCids = append(pieceCids, r) + case error: + return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", r) + default: + return abi.PieceInfo{}, xerrors.Errorf("pieceCid mystery result: %v", r) + } + } } if err := pw.Close(); err != nil { @@ -134,16 +175,40 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie } stagedFile = nil + if len(pieceCids) == 1 { + return pieceCids[0], nil + } + + pieceCID, err := ffi.GenerateUnsealedCID(sb.sealProofType, pieceCids) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err) + } + + commp, err := commcid.CIDToDataCommitmentV1(pieceCID) + if err != nil { + return abi.PieceInfo{}, err + } + return abi.PieceInfo{ Size: pieceSize.Padded(), - PieceCID: pieceCID, - }, werr() + PieceCID: commcid.PieceCommitmentV1ToCID(commp), + }, nil } -type closerFunc func() error +func (sb *Sealer) pieceCid(in []byte) (cid.Cid, error) { + prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in))) + if err != nil { + return cid.Undef, xerrors.Errorf("getting tee reader pipe: %w", err) + } -func (cf closerFunc) Close() error { - return cf() + pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, abi.UnpaddedPieceSize(len(in))) + if err != nil { + return cid.Undef, xerrors.Errorf("generating piece commitment: %w", err) + } + + prf.Close() + + return pieceCID, werr() } func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { @@ -237,9 +302,9 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s defer close(outWait) defer opr.Close() - padreader, err := fr32.NewPadReader(opr, abi.PaddedPieceSize(piece.Len).Unpadded()) + padwriter := fr32.NewPadWriter(out) if err != nil { - perr = xerrors.Errorf("creating new padded reader: %w", err) + perr = xerrors.Errorf("creating new padded writer: %w", err) return } @@ -248,9 +313,23 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s bsize = uint64(runtime.NumCPU()) * fr32.MTTresh } - padreader = bufio.NewReaderSize(padreader, int(bsize)) + bw := bufio.NewWriterSize(padwriter, int(bsize)) - _, perr = io.CopyN(out, padreader, int64(size.Padded())) + _, err = io.CopyN(bw, opr, int64(size.Padded())) + if err != nil { + perr = xerrors.Errorf("copying data: %w", err) + return + } + + if err := bw.Flush(); err != nil { + perr = xerrors.Errorf("flushing unpadded data: %w", err) + return + } + + if err := padwriter.Close(); err != nil { + perr = xerrors.Errorf("closing padwriter: %w", err) + return + } }() } // diff --git a/fr32/fr32.go b/fr32/fr32.go index 08ecb767c..fdf9d9223 100644 --- a/fr32/fr32.go +++ b/fr32/fr32.go @@ -13,13 +13,13 @@ var MTTresh = uint64(32 << 20) func mtChunkCount(usz abi.PaddedPieceSize) uint64 { threads := (uint64(usz)) / MTTresh if threads > uint64(runtime.NumCPU()) { - threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU()))) + threads = 1 << (bits.Len32(uint32(runtime.NumCPU()))) } if threads == 0 { return 1 } - if threads > 64 { - return 64 // avoid too large buffers + if threads > 32 { + return 32 // avoid too large buffers } return threads } diff --git a/fr32/readers.go b/fr32/readers.go index f974f2cd1..8a1bbe087 100644 --- a/fr32/readers.go +++ b/fr32/readers.go @@ -9,56 +9,6 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" ) -type padReader struct { - src io.Reader - - left uint64 - work []byte -} - -func NewPadReader(src io.Reader, sz abi.UnpaddedPieceSize) (io.Reader, error) { - if err := sz.Validate(); err != nil { - return nil, xerrors.Errorf("bad piece size: %w", err) - } - - buf := make([]byte, MTTresh*mtChunkCount(sz.Padded())) - - return &padReader{ - src: src, - - left: uint64(sz.Padded()), - work: buf, - }, nil -} - -func (r *padReader) Read(out []byte) (int, error) { - if r.left == 0 { - return 0, io.EOF - } - - outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out)))) - - if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil { - return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err) - } - - todo := abi.PaddedPieceSize(outTwoPow).Unpadded() - if r.left < uint64(todo.Padded()) { - todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left))).Unpadded() - } - - r.left -= uint64(todo.Padded()) - - n, err := r.src.Read(r.work[:todo]) - if err != nil && err != io.EOF { - return n, err - } - - Pad(r.work[:todo], out[:todo.Padded()]) - - return int(todo.Padded()), err -} - type unpadReader struct { src io.Reader @@ -122,10 +72,10 @@ type padWriter struct { work []byte } -func NewPadWriter(dst io.Writer) (io.WriteCloser, error) { +func NewPadWriter(dst io.Writer) io.WriteCloser { return &padWriter{ dst: dst, - }, nil + } } func (w *padWriter) Write(p []byte) (int, error) {