From f6b12a33a2b4c9d847ea2c5a4d522bac61202dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 25 Jan 2020 12:15:28 +0100 Subject: [PATCH 1/5] sealing: Parallel CommP calc in pledge sector --- garbage.go | 54 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/garbage.go b/garbage.go index 1c3925671..a6b439258 100644 --- a/garbage.go +++ b/garbage.go @@ -6,14 +6,61 @@ import ( "io" "math" "math/rand" + "runtime" + "sync" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" ) +func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { + piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + out := make([]sectorbuilder.PublicPieceInfo, parts) + var lk sync.Mutex + + var wg sync.WaitGroup + wg.Add(int(parts)) + for i := uint64(0); i < parts; i++ { + go func(i uint64) { + defer wg.Done() + + commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)), piece) + + lk.Lock() + if perr != nil { + err = multierror.Append(err, perr) + } + out[i] = sectorbuilder.PublicPieceInfo{ + Size: piece, + CommP: commP, + } + lk.Unlock() + }(i) + } + wg.Wait() + + if err != nil { + return [32]byte{}, err + } + + return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out) +} + +func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { + piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + + readers := make([]io.Reader, parts) + for i := range readers { + readers[i] = io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)) + } + + return io.MultiReader(readers...) +} + func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) { if len(sizes) == 0 { return nil, nil @@ -21,10 +68,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie deals := make([]actors.StorageDealProposal, len(sizes)) for i, size := range sizes { - release := m.sb.RateLimit() - commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) - release() - + commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU())) if err != nil { return nil, err } @@ -81,7 +125,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes) + ppi, err := m.sb.AddPiece(size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes) if err != nil { return nil, err } From e62515c04254584c4787170a63cf5adf5bf877cb Mon Sep 17 00:00:00 2001 From: laser Date: Tue, 28 Jan 2020 11:46:26 -0800 Subject: [PATCH 2/5] planCommitting must handle SectorCommitFailed The SectorCommitFailed struct can be created from within Sealing#handleCommitting, and is created if actors.SerializeParams(params) produces an error or if m.api.MpoolPushMessage(ctx.Context(), msg) produces an error. --- fsm.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fsm.go b/fsm.go index ad0803488..4fdd81d35 100644 --- a/fsm.go +++ b/fsm.go @@ -205,6 +205,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { state.State = api.SealCommitFailed case SectorSealFailed: state.State = api.CommitFailed + case SectorCommitFailed: + state.State = api.CommitFailed default: return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) } From debf107b5426d970115e49018fadaa5ba9106d0c Mon Sep 17 00:00:00 2001 From: laser Date: Tue, 28 Jan 2020 12:39:07 -0800 Subject: [PATCH 3/5] write basic test affirming state change --- fsm_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/fsm_test.go b/fsm_test.go index 2dada5470..7430bb634 100644 --- a/fsm_test.go +++ b/fsm_test.go @@ -83,3 +83,17 @@ func TestSeedRevert(t *testing.T) { m.planSingle(SectorProving{}) require.Equal(m.t, m.state.State, api.Proving) } + +func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { + m := test{ + s: &Sealing{}, + t: t, + state: &SectorInfo{State: api.Committing}, + } + + events := []statemachine.Event{{SectorCommitFailed{}}} + + require.NoError(t, planCommitting(events, m.state)) + + require.Equal(t, api.SectorStates[api.CommitFailed], api.SectorStates[m.state.State]) +} From 367062aee4651c83777771ad3f09de37a4be83bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 20:12:08 +0100 Subject: [PATCH 4/5] sealing: round parts in fastPledgeCommitment --- garbage.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/garbage.go b/garbage.go index a6b439258..d33b63276 100644 --- a/garbage.go +++ b/garbage.go @@ -5,6 +5,7 @@ import ( "context" "io" "math" + "math/bits" "math/rand" "runtime" "sync" @@ -18,7 +19,9 @@ import ( ) func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { - piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + + piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) out := make([]sectorbuilder.PublicPieceInfo, parts) var lk sync.Mutex @@ -28,7 +31,7 @@ func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sector go func(i uint64) { defer wg.Done() - commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)), piece) + commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece) lk.Lock() if perr != nil { @@ -51,11 +54,11 @@ func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sector } func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { - piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts) + piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) readers := make([]io.Reader, parts) for i := range readers { - readers[i] = io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)) + readers[i] = io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)) } return io.MultiReader(readers...) From 731660c76ffb406f80a0549f81e8cb4a4e2fb3e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 21:01:20 +0100 Subject: [PATCH 5/5] test fastPledgeCommitment --- garbage.go | 38 -------------------------------------- utils.go | 40 ++++++++++++++++++++++++++++++++++++++++ utils_test.go | 11 ++++++++++- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/garbage.go b/garbage.go index d33b63276..4a3b1331b 100644 --- a/garbage.go +++ b/garbage.go @@ -5,54 +5,16 @@ import ( "context" "io" "math" - "math/bits" "math/rand" "runtime" - "sync" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" - "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" ) -func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { - parts = 1 << bits.Len64(parts) // round down to nearest power of 2 - - piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) - out := make([]sectorbuilder.PublicPieceInfo, parts) - var lk sync.Mutex - - var wg sync.WaitGroup - wg.Add(int(parts)) - for i := uint64(0); i < parts; i++ { - go func(i uint64) { - defer wg.Done() - - commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece) - - lk.Lock() - if perr != nil { - err = multierror.Append(err, perr) - } - out[i] = sectorbuilder.PublicPieceInfo{ - Size: piece, - CommP: commP, - } - lk.Unlock() - }(i) - } - wg.Wait() - - if err != nil { - return [32]byte{}, err - } - - return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out) -} - func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) diff --git a/utils.go b/utils.go index 8fa887d3c..b235a8a83 100644 --- a/utils.go +++ b/utils.go @@ -1,7 +1,12 @@ package sealing import ( + "io" "math/bits" + "math/rand" + "sync" + + "github.com/hashicorp/go-multierror" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" ) @@ -42,6 +47,41 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { return out, nil } +func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { + parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + + piece := sectorbuilder.UserBytesForSectorSize(size / parts) + out := make([]sectorbuilder.PublicPieceInfo, parts) + var lk sync.Mutex + + var wg sync.WaitGroup + wg.Add(int(parts)) + for i := uint64(0); i < parts; i++ { + go func(i uint64) { + defer wg.Done() + + commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece) + + lk.Lock() + if perr != nil { + err = multierror.Append(err, perr) + } + out[i] = sectorbuilder.PublicPieceInfo{ + Size: piece, + CommP: commP, + } + lk.Unlock() + }(i) + } + wg.Wait() + + if err != nil { + return [32]byte{}, err + } + + return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out) +} + func (m *Sealing) ListSectors() ([]SectorInfo, error) { var sectors []SectorInfo if err := m.sectors.List(§ors); err != nil { diff --git a/utils_test.go b/utils_test.go index 02746a3d8..14d512a52 100644 --- a/utils_test.go +++ b/utils_test.go @@ -1,6 +1,7 @@ package sealing import ( + "github.com/filecoin-project/lotus/storage/sbmock" "testing" "github.com/stretchr/testify/assert" @@ -42,5 +43,13 @@ func TestFillersFromRem(t *testing.T) { ub = sectorbuilder.UserBytesForSectorSize(uint64(9) << i) testFill(t, ub, []uint64{ub1, ub4}) } - +} + +func TestFastPledge(t *testing.T) { + sz := uint64(16 << 20) + + s := Sealing{sb: sbmock.NewMockSectorBuilder(0, sz)} + if _, err := s.fastPledgeCommitment(sz, 5); err != nil { + t.Fatalf("%+v", err) + } }