diff --git a/ffiwrapper/files.go b/ffiwrapper/files.go index 30e4a6803..a13776d2d 100644 --- a/ffiwrapper/files.go +++ b/ffiwrapper/files.go @@ -8,7 +8,7 @@ import ( "golang.org/x/xerrors" ) -func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { +func ToReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { f, ok := r.(*os.File) if ok { return f, func() error { return nil }, nil diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index 382d3853f..b9b7975ab 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -105,7 +105,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err) } pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), w) - prf, werr, err := toReadableFile(pr, int64(pieceSize)) + prf, werr, err := ToReadableFile(pr, int64(pieceSize)) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("getting tee reader pipe: %w", err) } @@ -405,7 +405,7 @@ func (sb *Sealer) rewriteAsPadded(unsealed string, staged string) error { // OPTIMIZATION: upr is a file, so it could be passed straight to // WriteWithAlignment IF it wouldn't care about the trailer - lupr, werr, err := toReadableFile(io.LimitReader(upr, int64(maxPieceSize)), int64(maxPieceSize)) + lupr, werr, err := ToReadableFile(io.LimitReader(upr, int64(maxPieceSize)), int64(maxPieceSize)) if err != nil { return err } @@ -489,7 +489,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID) error } func GeneratePieceCIDFromFile(proofType abi.RegisteredProof, piece io.Reader, pieceSize abi.UnpaddedPieceSize) (cid.Cid, error) { - f, werr, err := toReadableFile(piece, int64(pieceSize)) + f, werr, err := ToReadableFile(piece, int64(pieceSize)) if err != nil { return cid.Undef, err } diff --git a/ffiwrapper/sealer_test.go b/ffiwrapper/sealer_test.go index 9c73b2496..9af563dc3 100644 --- a/ffiwrapper/sealer_test.go +++ b/ffiwrapper/sealer_test.go @@ -16,12 +16,13 @@ import ( logging "github.com/ipfs/go-log" "golang.org/x/xerrors" + ffi "github.com/filecoin-project/filecoin-ffi" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/sector-storage/ffiwrapper/basicfs" "github.com/filecoin-project/sector-storage/stores" - "github.com/filecoin-project/specs-storage/storage" ) func init() { @@ -414,3 +415,18 @@ func TestSealAndVerify2(t *testing.T) { post(t, sb, s1, s2) } + +func BenchmarkWriteWithAlignment(b *testing.B) { + bt := abi.UnpaddedPieceSize(2 * 127 * 1024 * 1024) + b.SetBytes(int64(bt)) + + for i := 0; i < b.N; i++ { + b.StopTimer() + rf, w, _ := ToReadableFile(bytes.NewReader(bytes.Repeat([]byte{0xff, 0}, int(bt/2))), int64(bt)) + tf, _ := ioutil.TempFile("/tmp/", "scrb-") + b.StartTimer() + + ffi.WriteWithAlignment(abi.RegisteredProof_StackedDRG2KiBSeal, rf, bt, tf, nil) + w() + } +} diff --git a/fr32/fr32.go b/fr32/fr32.go new file mode 100644 index 000000000..cb2dcaed6 --- /dev/null +++ b/fr32/fr32.go @@ -0,0 +1,138 @@ +package fr32 + +import ( + "math/bits" + "runtime" + "sync" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +var mtTresh = 32 << 20 + +func mt(in, out []byte, padLen int, op func(in, out []byte)) { + threads := padLen / mtTresh + if threads > runtime.NumCPU() { + threads = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU()))) + } + threadBytes := abi.PaddedPieceSize(padLen / threads) + + var wg sync.WaitGroup + wg.Add(threads) + + for i := 0; i < threads; i++ { + go func(thread int) { + defer wg.Done() + + start := threadBytes * abi.PaddedPieceSize(thread) + end := start + threadBytes + + op(in[start.Unpadded():end.Unpadded()], out[start:end]) + }(i) + } + wg.Wait() +} + +// Assumes len(in)%127==0 and len(out)%128==0 +func Pad(in, out []byte) { + if len(out) > mtTresh { + mt(in, out, len(out), Pad) + return + } + + chunks := len(out) / 128 + for chunk := 0; chunk < chunks; chunk++ { + inOff := chunk * 127 + outOff := chunk * 128 + + copy(out[outOff:outOff+31], in[inOff:inOff+31]) + + t := in[inOff+31] >> 6 + out[outOff+31] = in[inOff+31] & 0x3f + var v byte + + for i := 32; i < 64; i++ { + v = in[inOff+i] + out[outOff+i] = (v << 2) | t + t = v >> 6 + } + + t = v >> 4 + out[outOff+63] &= 0x3f + + for i := 64; i < 96; i++ { + v = in[inOff+i] + out[outOff+i] = (v << 4) | t + t = v >> 4 + } + + t = v >> 2 + out[outOff+95] &= 0x3f + + for i := 96; i < 127; i++ { + v = in[inOff+i] + out[outOff+i] = (v << 6) | t + t = v >> 2 + } + + out[outOff+127] = t & 0x3f + } +} + +// Assumes len(in)%128==0 and len(out)%127==0 +func Unpad(in []byte, out []byte) { + if len(out) > mtTresh { + mt(in, out, len(in), Unpad) + return + } + + chunks := len(in) / 128 + for chunk := 0; chunk < chunks; chunk++ { + inOffNext := chunk*128 + 1 + outOff := chunk * 127 + + at := in[chunk*128] + + for i := 0; i < 32; i++ { + next := in[i+inOffNext] + + out[outOff+i] = at + //out[i] |= next << 8 + + at = next + } + + out[outOff+31] |= at << 6 + + for i := 32; i < 64; i++ { + next := in[i+inOffNext] + + out[outOff+i] = at >> 2 + out[outOff+i] |= next << 6 + + at = next + } + + out[outOff+63] ^= (at << 6) ^ (at << 4) + + for i := 64; i < 96; i++ { + next := in[i+inOffNext] + + out[outOff+i] = at >> 4 + out[outOff+i] |= next << 4 + + at = next + } + + out[outOff+95] ^= (at << 4) ^ (at << 2) + + for i := 96; i < 127; i++ { + next := in[i+inOffNext] + + out[outOff+i] = at >> 6 + out[outOff+i] |= next << 2 + + at = next + } + } +} diff --git a/fr32/fr32_test.go b/fr32/fr32_test.go new file mode 100644 index 000000000..df500035d --- /dev/null +++ b/fr32/fr32_test.go @@ -0,0 +1,248 @@ +package fr32 + +import ( + "bytes" + "io" + "io/ioutil" + "math/rand" + "os" + "testing" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/stretchr/testify/require" +) + +func padFFI(buf []byte) []byte { + rf, w, _ := ffiwrapper.ToReadableFile(bytes.NewReader(buf), int64(len(buf))) + tf, _ := ioutil.TempFile("/tmp/", "scrb-") + + _, _, _, err := ffi.WriteWithAlignment(abi.RegisteredProof_StackedDRG32GiBSeal, rf, abi.UnpaddedPieceSize(len(buf)), tf, nil) + if err != nil { + panic(err) + } + if err := w(); err != nil { + panic(err) + } + + if _, err := tf.Seek(io.SeekStart, 0); err != nil { + panic(err) + } + + padded, err := ioutil.ReadAll(tf) + if err != nil { + panic(err) + } + + if err := tf.Close(); err != nil { + panic(err) + } + + if err := os.Remove(tf.Name()); err != nil { + panic(err) + } + + return padded +} + +func TestPadChunkFFI(t *testing.T) { + testByteChunk := func(b byte) func(*testing.T) { + return func(t *testing.T) { + var buf [128]byte + copy(buf[:], bytes.Repeat([]byte{b}, 127)) + + Pad(buf[:], buf[:]) + + expect := padFFI(bytes.Repeat([]byte{b}, 127)) + + require.Equal(t, expect, buf[:]) + } + } + + t.Run("ones", testByteChunk(0xff)) + t.Run("lsb1", testByteChunk(0x01)) + t.Run("msb1", testByteChunk(0x80)) + t.Run("zero", testByteChunk(0x0)) + t.Run("mid", testByteChunk(0x3c)) +} + +func TestPadChunkRandEqFFI(t *testing.T) { + for i := 0; i < 200; i++ { + var input [127]byte + rand.Read(input[:]) + + var buf [128]byte + + Pad(input[:], buf[:]) + + expect := padFFI(input[:]) + + require.Equal(t, expect, buf[:]) + } +} + +func TestRoundtrip(t *testing.T) { + testByteChunk := func(b byte) func(*testing.T) { + return func(t *testing.T) { + var buf [128]byte + input := bytes.Repeat([]byte{0x01}, 127) + + Pad(input, buf[:]) + + var out [127]byte + Unpad(buf[:], out[:]) + + require.Equal(t, input, out[:]) + } + } + + t.Run("ones", testByteChunk(0xff)) + t.Run("lsb1", testByteChunk(0x01)) + t.Run("msb1", testByteChunk(0x80)) + t.Run("zero", testByteChunk(0x0)) + t.Run("mid", testByteChunk(0x3c)) +} + +func TestRoundtripChunkRand(t *testing.T) { + for i := 0; i < 200; i++ { + var input [127]byte + rand.Read(input[:]) + + var buf [128]byte + copy(buf[:], input[:]) + + Pad(buf[:], buf[:]) + + var out [127]byte + Unpad(buf[:], out[:]) + + require.Equal(t, input[:], out[:]) + } +} + +func TestRoundtrip16MRand(t *testing.T) { + up := abi.PaddedPieceSize(16 << 20).Unpadded() + + input := make([]byte, up) + rand.Read(input[:]) + + buf := make([]byte, 16<<20) + + Pad(input, buf) + + out := make([]byte, up) + Unpad(buf, out) + + require.Equal(t, input, out) + + ffi := padFFI(input) + require.Equal(t, ffi, buf) +} + +func BenchmarkPadChunk(b *testing.B) { + var buf [128]byte + in := bytes.Repeat([]byte{0xff}, 127) + + b.SetBytes(127) + + for i := 0; i < b.N; i++ { + Pad(in, buf[:]) + } +} + +func BenchmarkChunkRoundtrip(b *testing.B) { + var buf [128]byte + copy(buf[:], bytes.Repeat([]byte{0xff}, 127)) + var out [127]byte + + b.SetBytes(127) + + for i := 0; i < b.N; i++ { + Pad(buf[:], buf[:]) + Unpad(buf[:], out[:]) + } +} + +func BenchmarkUnpadChunk(b *testing.B) { + var buf [128]byte + copy(buf[:], bytes.Repeat([]byte{0xff}, 127)) + + Pad(buf[:], buf[:]) + var out [127]byte + + b.SetBytes(127) + b.ReportAllocs() + + bs := buf[:] + + for i := 0; i < b.N; i++ { + Unpad(bs, out[:]) + } +} + +func BenchmarkUnpad16MChunk(b *testing.B) { + up := abi.PaddedPieceSize(16 << 20).Unpadded() + + var buf [16 << 20]byte + + Pad(bytes.Repeat([]byte{0xff}, int(up)), buf[:]) + var out [16 << 20]byte + + b.SetBytes(16 << 20) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + Unpad(buf[:], out[:]) + } +} + +func BenchmarkPad16MChunk(b *testing.B) { + up := abi.PaddedPieceSize(16 << 20).Unpadded() + + var buf [16 << 20]byte + + in := bytes.Repeat([]byte{0xff}, int(up)) + + b.SetBytes(16 << 20) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + Pad(in, buf[:]) + } +} + +func BenchmarkPad1GChunk(b *testing.B) { + up := abi.PaddedPieceSize(1 << 30).Unpadded() + + var buf [1 << 30]byte + + in := bytes.Repeat([]byte{0xff}, int(up)) + + b.SetBytes(1 << 30) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + Pad(in, buf[:]) + } +} + +func BenchmarkUnpad1GChunk(b *testing.B) { + up := abi.PaddedPieceSize(1 << 30).Unpadded() + + var buf [1 << 30]byte + + Pad(bytes.Repeat([]byte{0xff}, int(up)), buf[:]) + var out [1 << 30]byte + + b.SetBytes(1 << 30) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + Unpad(buf[:], out[:]) + } +}