From 55867ab48b7037b800c5838f7cce1c462dba0c68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 29 May 2020 00:17:23 +0200 Subject: [PATCH] fr32: io.Reader utils --- fr32/fr32.go | 33 +++++++---- fr32/fr32_ffi_cmp_test.go | 5 +- fr32/readers.go | 114 ++++++++++++++++++++++++++++++++++++++ fr32/readers_test.go | 53 ++++++++++++++++++ 4 files changed, 190 insertions(+), 15 deletions(-) create mode 100644 fr32/readers.go create mode 100644 fr32/readers_test.go diff --git a/fr32/fr32.go b/fr32/fr32.go index 06579dd0d..20d158b3d 100644 --- a/fr32/fr32.go +++ b/fr32/fr32.go @@ -8,19 +8,30 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" ) -var mtTresh = 32 << 20 +var mtTresh = uint64(32 << 20) -func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) { - threads := padLen / mtTresh - if threads > runtime.NumCPU() { +func mtChunkCount(usz abi.PaddedPieceSize) uint64 { + threads := (uint64(usz)) / mtTresh + if threads > uint64(runtime.NumCPU()) { threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU()))) } - threadBytes := abi.PaddedPieceSize(padLen / threads) + if threads == 0 { + return 1 + } + if threads > 64 { + return 64 // avoid too large buffers + } + return threads +} + +func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) { + threads := mtChunkCount(abi.PaddedPieceSize(padLen)) + threadBytes := abi.PaddedPieceSize(padLen / int(threads)) var wg sync.WaitGroup - wg.Add(threads) + wg.Add(int(threads)) - for i := 0; i < threads; i++ { + for i := 0; i < int(threads); i++ { go func(thread int) { defer wg.Done() @@ -35,7 +46,7 @@ func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) { // Assumes len(in)%127==0 and len(out)%128==0 func Pad(in, out []byte) { - if len(out) > mtTresh { + if len(out) > int(mtTresh) { mt(in, out, len(out), pad) return } @@ -44,7 +55,7 @@ func Pad(in, out []byte) { } func pad(in, out []byte) { - if len(out) > mtTresh { + if len(out) > int(mtTresh) { mt(in, out, len(out), Pad) return } @@ -88,11 +99,9 @@ func pad(in, out []byte) { } } - - // Assumes len(in)%128==0 and len(out)%127==0 func Unpad(in []byte, out []byte) { - if len(in) > mtTresh { + if len(in) > int(mtTresh) { mt(out, in, len(in), unpad) return } diff --git a/fr32/fr32_ffi_cmp_test.go b/fr32/fr32_ffi_cmp_test.go index 24bb24d49..ece13051d 100644 --- a/fr32/fr32_ffi_cmp_test.go +++ b/fr32/fr32_ffi_cmp_test.go @@ -55,12 +55,11 @@ func TestWriteTwoPcs(t *testing.T) { panic(err) } - outBytes := make([]byte, int(paddedSize) * n) + outBytes := make([]byte, int(paddedSize)*n) Pad(rawBytes, outBytes) require.Equal(t, ffiBytes, outBytes) - unpadBytes := make([]byte, int(paddedSize.Unpadded()) * n) + unpadBytes := make([]byte, int(paddedSize.Unpadded())*n) Unpad(ffiBytes, unpadBytes) require.Equal(t, rawBytes, unpadBytes) } - diff --git a/fr32/readers.go b/fr32/readers.go new file mode 100644 index 000000000..b7fdc3843 --- /dev/null +++ b/fr32/readers.go @@ -0,0 +1,114 @@ +package fr32 + +import ( + "io" + "math/bits" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +type padReader struct { + src io.Reader + + left uint64 + work []byte +} + +func NewPadReader(src io.Reader, sz abi.UnpaddedPieceSize) (io.Reader, error) { + if err := sz.Validate(); err != nil { + return nil, xerrors.Errorf("bad piece size: %w", err) + } + + buf := make([]byte, mtTresh*mtChunkCount(sz.Padded())) + + return &padReader{ + src: src, + + left: uint64(sz.Padded()), + work: buf, + }, nil +} + +func (r *padReader) Read(out []byte) (int, error) { + if r.left == 0 { + return 0, io.EOF + } + + outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out)))) + + if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil { + return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err) + } + + todo := abi.PaddedPieceSize(outTwoPow).Unpadded() + if r.left < uint64(todo.Padded()) { + todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left))).Unpadded() + } + + r.left -= uint64(todo.Padded()) + + n, err := r.src.Read(r.work[:todo]) + if err != nil && err != io.EOF { + return n, err + } + + Pad(r.work[:todo], out[:todo.Padded()]) + + return int(todo.Padded()), err +} + +type unpadReader struct { + src io.Reader + + left uint64 + work []byte +} + +func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) { + if err := sz.Validate(); err != nil { + return nil, xerrors.Errorf("bad piece size: %w", err) + } + + buf := make([]byte, mtTresh*mtChunkCount(sz)) + + return &unpadReader{ + src: src, + + left: uint64(sz), + work: buf, + }, nil +} + +func (r *unpadReader) Read(out []byte) (int, error) { + if r.left == 0 { + return 0, io.EOF + } + + outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out)))) + + if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil { + return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err) + } + + todo := abi.PaddedPieceSize(outTwoPow) + if r.left < uint64(todo) { + todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left))) + } + + r.left -= uint64(todo) + + n, err := r.src.Read(r.work[:todo]) + if err != nil && err != io.EOF { + return n, err + } + + if n != int(todo) { + return 0, xerrors.Errorf("didn't read enough: %w", err) + } + + Unpad(r.work[:todo], out[:todo.Unpadded()]) + + return int(todo.Unpadded()), err +} diff --git a/fr32/readers_test.go b/fr32/readers_test.go new file mode 100644 index 000000000..b987e8287 --- /dev/null +++ b/fr32/readers_test.go @@ -0,0 +1,53 @@ +package fr32 + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +func TestPadReader(t *testing.T) { + ps := abi.PaddedPieceSize(64 << 20).Unpadded() + + raw := bytes.Repeat([]byte{0x55}, int(ps)) + + r, err := NewPadReader(bytes.NewReader(raw), ps) + if err != nil { + t.Fatal(err) + } + + readerPadded, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + padOut := make([]byte, ps.Padded()) + Pad(raw, padOut) + + require.Equal(t, padOut, readerPadded) +} + +func TestUnpadReader(t *testing.T) { + ps := abi.PaddedPieceSize(64 << 20).Unpadded() + + raw := bytes.Repeat([]byte{0x77}, int(ps)) + + padOut := make([]byte, ps.Padded()) + Pad(raw, padOut) + + r, err := NewUnpadReader(bytes.NewReader(padOut), ps.Padded()) + if err != nil { + t.Fatal(err) + } + + readered, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + require.Equal(t, raw, readered) +}