wip fixing addpiece

This commit is contained in:
Łukasz Magiera 2020-06-09 11:13:23 +02:00
parent 796af1351c
commit 3cd79b6fec
3 changed files with 104 additions and 75 deletions

View File

@ -4,16 +4,19 @@ package ffiwrapper
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"io" "io"
"math/bits" "math/bits"
"os" "os"
"runtime" "runtime"
"sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -105,20 +108,58 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err) return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
} }
pw, err := fr32.NewPadWriter(w) pw := fr32.NewPadWriter(w)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating padded reader: %w", err)
}
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw) pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
prf, werr, err := ToReadableFile(pr, int64(pieceSize))
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting tee reader pipe: %w", err)
}
pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, pieceSize) thr := 1 << bits.Len32(uint32(runtime.NumCPU()))
if err != nil { chunk := abi.PaddedPieceSize(4 << 20)
return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err) var wg sync.WaitGroup
buf := make([]byte, (chunk * abi.PaddedPieceSize(thr)).Unpadded())
var pieceCids []abi.PieceInfo
for {
n, err := pr.Read(buf[:])
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
}
if err == io.EOF {
break
}
wg.Add(n/int(chunk))
res := make([]interface{}, n/int(chunk))
for i := 0; i < n/int(chunk); i++ {
go func(i int) {
defer wg.Done()
b := buf[i*int(chunk.Unpadded()):((i+1)*int(chunk.Unpadded()))]
c, err := sb.pieceCid(b)
if err != nil {
res[i] = err
return
}
res[i] = abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(b)).Padded(),
PieceCID: c,
}
}(i)
}
wg.Wait()
for _, r := range res {
switch r := r.(type) {
case abi.PieceInfo:
pieceCids = append(pieceCids, r)
case error:
return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", r)
default:
return abi.PieceInfo{}, xerrors.Errorf("pieceCid mystery result: %v", r)
}
}
} }
if err := pw.Close(); err != nil { if err := pw.Close(); err != nil {
@ -134,16 +175,40 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
} }
stagedFile = nil stagedFile = nil
if len(pieceCids) == 1 {
return pieceCids[0], nil
}
pieceCID, err := ffi.GenerateUnsealedCID(sb.sealProofType, pieceCids)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
}
commp, err := commcid.CIDToDataCommitmentV1(pieceCID)
if err != nil {
return abi.PieceInfo{}, err
}
return abi.PieceInfo{ return abi.PieceInfo{
Size: pieceSize.Padded(), Size: pieceSize.Padded(),
PieceCID: pieceCID, PieceCID: commcid.PieceCommitmentV1ToCID(commp),
}, werr() }, nil
} }
type closerFunc func() error func (sb *Sealer) pieceCid(in []byte) (cid.Cid, error) {
prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in)))
if err != nil {
return cid.Undef, xerrors.Errorf("getting tee reader pipe: %w", err)
}
func (cf closerFunc) Close() error { pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, abi.UnpaddedPieceSize(len(in)))
return cf() if err != nil {
return cid.Undef, xerrors.Errorf("generating piece commitment: %w", err)
}
prf.Close()
return pieceCID, werr()
} }
func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
@ -237,9 +302,9 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
defer close(outWait) defer close(outWait)
defer opr.Close() defer opr.Close()
padreader, err := fr32.NewPadReader(opr, abi.PaddedPieceSize(piece.Len).Unpadded()) padwriter := fr32.NewPadWriter(out)
if err != nil { if err != nil {
perr = xerrors.Errorf("creating new padded reader: %w", err) perr = xerrors.Errorf("creating new padded writer: %w", err)
return return
} }
@ -248,9 +313,23 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
bsize = uint64(runtime.NumCPU()) * fr32.MTTresh bsize = uint64(runtime.NumCPU()) * fr32.MTTresh
} }
padreader = bufio.NewReaderSize(padreader, int(bsize)) bw := bufio.NewWriterSize(padwriter, int(bsize))
_, perr = io.CopyN(out, padreader, int64(size.Padded())) _, err = io.CopyN(bw, opr, int64(size.Padded()))
if err != nil {
perr = xerrors.Errorf("copying data: %w", err)
return
}
if err := bw.Flush(); err != nil {
perr = xerrors.Errorf("flushing unpadded data: %w", err)
return
}
if err := padwriter.Close(); err != nil {
perr = xerrors.Errorf("closing padwriter: %w", err)
return
}
}() }()
} }
// </eww> // </eww>

View File

@ -13,13 +13,13 @@ var MTTresh = uint64(32 << 20)
func mtChunkCount(usz abi.PaddedPieceSize) uint64 { func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := (uint64(usz)) / MTTresh threads := (uint64(usz)) / MTTresh
if threads > uint64(runtime.NumCPU()) { if threads > uint64(runtime.NumCPU()) {
threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU()))) threads = 1 << (bits.Len32(uint32(runtime.NumCPU())))
} }
if threads == 0 { if threads == 0 {
return 1 return 1
} }
if threads > 64 { if threads > 32 {
return 64 // avoid too large buffers return 32 // avoid too large buffers
} }
return threads return threads
} }

View File

@ -9,56 +9,6 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
) )
type padReader struct {
src io.Reader
left uint64
work []byte
}
func NewPadReader(src io.Reader, sz abi.UnpaddedPieceSize) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, MTTresh*mtChunkCount(sz.Padded()))
return &padReader{
src: src,
left: uint64(sz.Padded()),
work: buf,
}, nil
}
func (r *padReader) Read(out []byte) (int, error) {
if r.left == 0 {
return 0, io.EOF
}
outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out))))
if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil {
return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
}
todo := abi.PaddedPieceSize(outTwoPow).Unpadded()
if r.left < uint64(todo.Padded()) {
todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left))).Unpadded()
}
r.left -= uint64(todo.Padded())
n, err := r.src.Read(r.work[:todo])
if err != nil && err != io.EOF {
return n, err
}
Pad(r.work[:todo], out[:todo.Padded()])
return int(todo.Padded()), err
}
type unpadReader struct { type unpadReader struct {
src io.Reader src io.Reader
@ -122,10 +72,10 @@ type padWriter struct {
work []byte work []byte
} }
func NewPadWriter(dst io.Writer) (io.WriteCloser, error) { func NewPadWriter(dst io.Writer) io.WriteCloser {
return &padWriter{ return &padWriter{
dst: dst, dst: dst,
}, nil }
} }
func (w *padWriter) Write(p []byte) (int, error) { func (w *padWriter) Write(p []byte) (int, error) {