From c9c437cb3bb0098a2cac12544f915f35645c8e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 25 Jan 2020 12:15:28 +0100 Subject: [PATCH 1/3] sealing: Parallel CommP calc in pledge sector --- storage/sealing/garbage.go | 54 ++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index 1c3925671..a6b439258 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -6,14 +6,61 @@ import ( "io" "math" "math/rand" + "runtime" + "sync" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" ) +func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { + piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + out := make([]sectorbuilder.PublicPieceInfo, parts) + var lk sync.Mutex + + var wg sync.WaitGroup + wg.Add(int(parts)) + for i := uint64(0); i < parts; i++ { + go func(i uint64) { + defer wg.Done() + + commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)), piece) + + lk.Lock() + if perr != nil { + err = multierror.Append(err, perr) + } + out[i] = sectorbuilder.PublicPieceInfo{ + Size: piece, + CommP: commP, + } + lk.Unlock() + }(i) + } + wg.Wait() + + if err != nil { + return [32]byte{}, err + } + + return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out) +} + +func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { + piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + + readers := make([]io.Reader, parts) + for i := range readers { + readers[i] = io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)) + } + + return io.MultiReader(readers...) +} + func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) { if len(sizes) == 0 { return nil, nil @@ -21,10 +68,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie deals := make([]actors.StorageDealProposal, len(sizes)) for i, size := range sizes { - release := m.sb.RateLimit() - commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) - release() - + commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU())) if err != nil { return nil, err } @@ -81,7 +125,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes) + ppi, err := m.sb.AddPiece(size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes) if err != nil { return nil, err } From 25f544f3db5f3683efcf5241bd07d3232b942af6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 20:12:08 +0100 Subject: [PATCH 2/3] sealing: round parts in fastPledgeCommitment --- storage/sealing/garbage.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index a6b439258..d33b63276 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -5,6 +5,7 @@ import ( "context" "io" "math" + "math/bits" "math/rand" "runtime" "sync" @@ -18,7 +19,9 @@ import ( ) func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { - piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + + piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) out := make([]sectorbuilder.PublicPieceInfo, parts) var lk sync.Mutex @@ -28,7 +31,7 @@ func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sector go func(i uint64) { defer wg.Done() - commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)), piece) + commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece) lk.Lock() if perr != nil { @@ -51,11 +54,11 @@ func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sector } func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { - piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) readers := make([]io.Reader, parts) for i := range readers { - readers[i] = io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)) + readers[i] = io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)) } return io.MultiReader(readers...) From ba542cf67a97d105bdc0875da8da29891f3df72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 21:01:20 +0100 Subject: [PATCH 3/3] test fastPledgeCommitment --- storage/sealing/garbage.go | 38 --------------------------------- storage/sealing/utils.go | 40 +++++++++++++++++++++++++++++++++++ storage/sealing/utils_test.go | 11 +++++++++- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index d33b63276..4a3b1331b 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -5,54 +5,16 @@ import ( "context" "io" "math" - "math/bits" "math/rand" "runtime" - "sync" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" - "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" ) -func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { - parts = 1 << bits.Len64(parts) // round down to nearest power of 2 - - piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) - out := make([]sectorbuilder.PublicPieceInfo, parts) - var lk sync.Mutex - - var wg sync.WaitGroup - wg.Add(int(parts)) - for i := uint64(0); i < parts; i++ { - go func(i uint64) { - defer wg.Done() - - commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece) - - lk.Lock() - if perr != nil { - err = multierror.Append(err, perr) - } - out[i] = sectorbuilder.PublicPieceInfo{ - Size: piece, - CommP: commP, - } - lk.Unlock() - }(i) - } - wg.Wait() - - if err != nil { - return [32]byte{}, err - } - - return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out) -} - func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) diff --git a/storage/sealing/utils.go b/storage/sealing/utils.go index 8fa887d3c..b235a8a83 100644 --- a/storage/sealing/utils.go +++ b/storage/sealing/utils.go @@ -1,7 +1,12 @@ package sealing import ( + "io" "math/bits" + "math/rand" + "sync" + + "github.com/hashicorp/go-multierror" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" ) @@ -42,6 +47,41 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { return out, nil } +func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { + parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + + piece := sectorbuilder.UserBytesForSectorSize(size / parts) + out := make([]sectorbuilder.PublicPieceInfo, parts) + var lk sync.Mutex + + var wg sync.WaitGroup + wg.Add(int(parts)) + for i := uint64(0); i < parts; i++ { + go func(i uint64) { + defer wg.Done() + + commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece) + + lk.Lock() + if perr != nil { + err = multierror.Append(err, perr) + } + out[i] = sectorbuilder.PublicPieceInfo{ + Size: piece, + CommP: commP, + } + lk.Unlock() + }(i) + } + wg.Wait() + + if err != nil { + return [32]byte{}, err + } + + return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out) +} + func (m *Sealing) ListSectors() ([]SectorInfo, error) { var sectors []SectorInfo if err := m.sectors.List(§ors); err != nil { diff --git a/storage/sealing/utils_test.go b/storage/sealing/utils_test.go index 02746a3d8..14d512a52 100644 --- a/storage/sealing/utils_test.go +++ b/storage/sealing/utils_test.go @@ -1,6 +1,7 @@ package sealing import ( + "github.com/filecoin-project/lotus/storage/sbmock" "testing" "github.com/stretchr/testify/assert" @@ -42,5 +43,13 @@ func TestFillersFromRem(t *testing.T) { ub = sectorbuilder.UserBytesForSectorSize(uint64(9) << i) testFill(t, ub, []uint64{ub1, ub4}) } - +} + +func TestFastPledge(t *testing.T) { + sz := uint64(16 << 20) + + s := Sealing{sb: sbmock.NewMockSectorBuilder(0, sz)} + if _, err := s.fastPledgeCommitment(sz, 5); err != nil { + t.Fatalf("%+v", err) + } }