fr32: io.Reader utils

This commit is contained in:
Łukasz Magiera 2020-05-29 00:17:23 +02:00
parent d38296a553
commit 55867ab48b
4 changed files with 190 additions and 15 deletions

View File

@ -8,19 +8,30 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
) )
var mtTresh = 32 << 20 var mtTresh = uint64(32 << 20)
func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) { func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := padLen / mtTresh threads := (uint64(usz)) / mtTresh
if threads > runtime.NumCPU() { if threads > uint64(runtime.NumCPU()) {
threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU()))) threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU())))
} }
threadBytes := abi.PaddedPieceSize(padLen / threads) if threads == 0 {
return 1
}
if threads > 64 {
return 64 // avoid too large buffers
}
return threads
}
func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
threads := mtChunkCount(abi.PaddedPieceSize(padLen))
threadBytes := abi.PaddedPieceSize(padLen / int(threads))
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(threads) wg.Add(int(threads))
for i := 0; i < threads; i++ { for i := 0; i < int(threads); i++ {
go func(thread int) { go func(thread int) {
defer wg.Done() defer wg.Done()
@ -35,7 +46,7 @@ func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
// Assumes len(in)%127==0 and len(out)%128==0 // Assumes len(in)%127==0 and len(out)%128==0
func Pad(in, out []byte) { func Pad(in, out []byte) {
if len(out) > mtTresh { if len(out) > int(mtTresh) {
mt(in, out, len(out), pad) mt(in, out, len(out), pad)
return return
} }
@ -44,7 +55,7 @@ func Pad(in, out []byte) {
} }
func pad(in, out []byte) { func pad(in, out []byte) {
if len(out) > mtTresh { if len(out) > int(mtTresh) {
mt(in, out, len(out), Pad) mt(in, out, len(out), Pad)
return return
} }
@ -88,11 +99,9 @@ func pad(in, out []byte) {
} }
} }
// Assumes len(in)%128==0 and len(out)%127==0 // Assumes len(in)%128==0 and len(out)%127==0
func Unpad(in []byte, out []byte) { func Unpad(in []byte, out []byte) {
if len(in) > mtTresh { if len(in) > int(mtTresh) {
mt(out, in, len(in), unpad) mt(out, in, len(in), unpad)
return return
} }

View File

@ -55,12 +55,11 @@ func TestWriteTwoPcs(t *testing.T) {
panic(err) panic(err)
} }
outBytes := make([]byte, int(paddedSize) * n) outBytes := make([]byte, int(paddedSize)*n)
Pad(rawBytes, outBytes) Pad(rawBytes, outBytes)
require.Equal(t, ffiBytes, outBytes) require.Equal(t, ffiBytes, outBytes)
unpadBytes := make([]byte, int(paddedSize.Unpadded()) * n) unpadBytes := make([]byte, int(paddedSize.Unpadded())*n)
Unpad(ffiBytes, unpadBytes) Unpad(ffiBytes, unpadBytes)
require.Equal(t, rawBytes, unpadBytes) require.Equal(t, rawBytes, unpadBytes)
} }

114
fr32/readers.go Normal file
View File

@ -0,0 +1,114 @@
package fr32
import (
"io"
"math/bits"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type padReader struct {
src io.Reader
left uint64
work []byte
}
func NewPadReader(src io.Reader, sz abi.UnpaddedPieceSize) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, mtTresh*mtChunkCount(sz.Padded()))
return &padReader{
src: src,
left: uint64(sz.Padded()),
work: buf,
}, nil
}
func (r *padReader) Read(out []byte) (int, error) {
if r.left == 0 {
return 0, io.EOF
}
outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out))))
if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil {
return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
}
todo := abi.PaddedPieceSize(outTwoPow).Unpadded()
if r.left < uint64(todo.Padded()) {
todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left))).Unpadded()
}
r.left -= uint64(todo.Padded())
n, err := r.src.Read(r.work[:todo])
if err != nil && err != io.EOF {
return n, err
}
Pad(r.work[:todo], out[:todo.Padded()])
return int(todo.Padded()), err
}
type unpadReader struct {
src io.Reader
left uint64
work []byte
}
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, mtTresh*mtChunkCount(sz))
return &unpadReader{
src: src,
left: uint64(sz),
work: buf,
}, nil
}
func (r *unpadReader) Read(out []byte) (int, error) {
if r.left == 0 {
return 0, io.EOF
}
outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out))))
if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil {
return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
}
todo := abi.PaddedPieceSize(outTwoPow)
if r.left < uint64(todo) {
todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left)))
}
r.left -= uint64(todo)
n, err := r.src.Read(r.work[:todo])
if err != nil && err != io.EOF {
return n, err
}
if n != int(todo) {
return 0, xerrors.Errorf("didn't read enough: %w", err)
}
Unpad(r.work[:todo], out[:todo.Unpadded()])
return int(todo.Unpadded()), err
}

53
fr32/readers_test.go Normal file
View File

@ -0,0 +1,53 @@
package fr32
import (
"bytes"
"io/ioutil"
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/specs-actors/actors/abi"
)
func TestPadReader(t *testing.T) {
ps := abi.PaddedPieceSize(64 << 20).Unpadded()
raw := bytes.Repeat([]byte{0x55}, int(ps))
r, err := NewPadReader(bytes.NewReader(raw), ps)
if err != nil {
t.Fatal(err)
}
readerPadded, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
padOut := make([]byte, ps.Padded())
Pad(raw, padOut)
require.Equal(t, padOut, readerPadded)
}
func TestUnpadReader(t *testing.T) {
ps := abi.PaddedPieceSize(64 << 20).Unpadded()
raw := bytes.Repeat([]byte{0x77}, int(ps))
padOut := make([]byte, ps.Padded())
Pad(raw, padOut)
r, err := NewUnpadReader(bytes.NewReader(padOut), ps.Padded())
if err != nil {
t.Fatal(err)
}
readered, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
require.Equal(t, raw, readered)
}