fr32: real PadWriter

This commit is contained in:
Łukasz Magiera 2020-05-29 17:21:10 +02:00
parent 2a70ff3cf3
commit 3b698db127
5 changed files with 124 additions and 46 deletions

View File

@ -3,11 +3,13 @@
package ffiwrapper
import (
"bufio"
"context"
"io"
"io/ioutil"
"math/bits"
"os"
"runtime"
"syscall"
"github.com/ipfs/go-cid"
@ -105,12 +107,12 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
}
w, err = fr32.NewPadWriter(w, pieceSize)
pw, err := fr32.NewPadWriter(w)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating padded reader: %w", err)
}
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), w)
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
prf, werr, err := ToReadableFile(pr, int64(pieceSize))
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting tee reader pipe: %w", err)
@ -121,6 +123,10 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err)
}
if err := pw.Close(); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("closing padded writer: %w", err)
}
if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
}
@ -253,7 +259,14 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
return
}
_, perr = io.CopyN(out, padreader, int64(size))
bsize := uint64(size.Padded())
if bsize > uint64(runtime.NumCPU())*fr32.MTTresh {
bsize = uint64(runtime.NumCPU()) * fr32.MTTresh
}
padreader = bufio.NewReaderSize(padreader, int(bsize))
_, perr = io.CopyN(out, padreader, int64(size.Padded()))
}()
}
// </eww>

View File

@ -14,6 +14,7 @@ import (
"time"
logging "github.com/ipfs/go-log"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
@ -29,8 +30,9 @@ func init() {
logging.SetLogLevel("*", "DEBUG") //nolint: errcheck
}
var sectorSize = abi.SectorSize(2048)
var sealProofType = abi.RegisteredProof_StackedDRG2KiBSeal
var sectorSize, _ = sealProofType.SectorSize()
var sealRand = abi.SealRandomness{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2}
type seal struct {
@ -139,9 +141,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
}
expect, _ = ioutil.ReadAll(data(si.Number, 1016))
if !bytes.Equal(b.Bytes(), expect) {
t.Fatal("read wrong bytes")
}
require.Equal(t, expect, b.Bytes())
b.Reset()
err = sb.ReadPiece(context.TODO(), &b, si, 0, 2032)

View File

@ -8,10 +8,10 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi"
)
var mtTresh = uint64(32 << 20)
var MTTresh = uint64(32 << 20)
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := (uint64(usz)) / mtTresh
threads := (uint64(usz)) / MTTresh
if threads > uint64(runtime.NumCPU()) {
threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU())))
}
@ -46,7 +46,7 @@ func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
// Assumes len(in)%127==0 and len(out)%128==0
func Pad(in, out []byte) {
if len(out) > int(mtTresh) {
if len(out) > int(MTTresh) {
mt(in, out, len(out), pad)
return
}
@ -96,7 +96,7 @@ func pad(in, out []byte) {
// Assumes len(in)%128==0 and len(out)%127==0
func Unpad(in []byte, out []byte) {
if len(in) > int(mtTresh) {
if len(in) > int(MTTresh) {
mt(out, in, len(in), unpad)
return
}

View File

@ -21,7 +21,7 @@ func NewPadReader(src io.Reader, sz abi.UnpaddedPieceSize) (io.Reader, error) {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, mtTresh*mtChunkCount(sz.Padded()))
buf := make([]byte, MTTresh*mtChunkCount(sz.Padded()))
return &padReader{
src: src,
@ -59,39 +59,6 @@ func (r *padReader) Read(out []byte) (int, error) {
return int(todo.Padded()), err
}
func NewPadWriter(dst io.Writer, sz abi.UnpaddedPieceSize) (io.Writer, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, mtTresh*mtChunkCount(sz.Padded()))
// TODO: Real writer
r, w := io.Pipe()
pr, err := NewPadReader(r, sz)
if err != nil {
return nil, err
}
go func() {
for {
n, err := pr.Read(buf)
if err != nil && err != io.EOF {
r.CloseWithError(err)
return
}
if _, err := dst.Write(buf[:n]); err != nil {
r.CloseWithError(err)
return
}
}
}()
return w, err
}
type unpadReader struct {
src io.Reader
@ -104,7 +71,7 @@ func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, mtTresh*mtChunkCount(sz))
buf := make([]byte, MTTresh*mtChunkCount(sz))
return &unpadReader{
src: src,
@ -147,3 +114,70 @@ func (r *unpadReader) Read(out []byte) (int, error) {
return int(todo.Unpadded()), err
}
type padWriter struct {
dst io.Writer
stash []byte
work []byte
}
func NewPadWriter(dst io.Writer) (io.WriteCloser, error) {
return &padWriter{
dst: dst,
}, nil
}
func (w *padWriter) Write(p []byte) (int, error) {
in := p
if len(p)+len(w.stash) < 127 {
w.stash = append(w.stash, p...)
return len(p), nil
}
if len(w.stash) != 0 {
in = append(w.stash, in...)
}
for {
pieces := subPieces(abi.UnpaddedPieceSize(len(in)))
biggest := pieces[len(pieces)-1]
if abi.PaddedPieceSize(cap(w.work)) < biggest.Padded() {
w.work = make([]byte, 0, biggest.Padded())
}
Pad(in[:int(biggest)], w.work[:int(biggest.Padded())])
n, err := w.dst.Write(w.work[:int(biggest.Padded())])
if err != nil {
return int(abi.PaddedPieceSize(n).Unpadded()), err
}
in = in[biggest:]
if len(in) < 127 {
if cap(w.stash) < len(in) {
w.stash = make([]byte, 0, len(in))
}
w.stash = w.stash[:len(in)]
copy(w.stash, in)
return len(p), nil
}
}
}
func (w *padWriter) Close() error {
if len(w.stash) > 0 {
return xerrors.Errorf("still have %d unprocessed bytes", len(w.stash))
}
// allow gc
w.stash = nil
w.work = nil
w.dst = nil
return nil
}

31
fr32/utils.go Normal file
View File

@ -0,0 +1,31 @@
package fr32
import (
"math/bits"
"github.com/filecoin-project/specs-actors/actors/abi"
)
func subPieces(in abi.UnpaddedPieceSize) []abi.UnpaddedPieceSize {
// Convert to in-sector bytes for easier math:
//
// (we convert to sector bytes as they are nice round binary numbers)
w := uint64(in.Padded())
out := make([]abi.UnpaddedPieceSize, bits.OnesCount64(w))
for i := range out {
// Extract the next lowest non-zero bit
next := bits.TrailingZeros64(w)
psize := uint64(1) << next
// e.g: if the number is 0b010100, psize will be 0b000100
// set that bit to 0 by XORing it, so the next iteration looks at the
// next bit
w ^= psize
// Add the piece size to the list of pieces we need to create
out[i] = abi.PaddedPieceSize(psize).Unpadded()
}
return out
}