sealing: Parallel CommP calc in pledge sector

This commit is contained in:
Łukasz Magiera 2020-01-25 12:15:28 +01:00
parent 248a362c3e
commit f6b12a33a2

View File

@ -6,14 +6,61 @@ import (
"io" "io"
"math" "math"
"math/rand" "math/rand"
"runtime"
"sync"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder" sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) {
piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts)
out := make([]sectorbuilder.PublicPieceInfo, parts)
var lk sync.Mutex
var wg sync.WaitGroup
wg.Add(int(parts))
for i := uint64(0); i < parts; i++ {
go func(i uint64) {
defer wg.Done()
commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)), piece)
lk.Lock()
if perr != nil {
err = multierror.Append(err, perr)
}
out[i] = sectorbuilder.PublicPieceInfo{
Size: piece,
CommP: commP,
}
lk.Unlock()
}(i)
}
wg.Wait()
if err != nil {
return [32]byte{}, err
}
return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out)
}
func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader {
piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts)
readers := make([]io.Reader, parts)
for i := range readers {
readers[i] = io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece))
}
return io.MultiReader(readers...)
}
func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) { func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
if len(sizes) == 0 { if len(sizes) == 0 {
return nil, nil return nil, nil
@ -21,10 +68,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
deals := make([]actors.StorageDealProposal, len(sizes)) deals := make([]actors.StorageDealProposal, len(sizes))
for i, size := range sizes { for i, size := range sizes {
release := m.sb.RateLimit() commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU()))
commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size)
release()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -81,7 +125,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
out := make([]Piece, len(sizes)) out := make([]Piece, len(sizes))
for i, size := range sizes { for i, size := range sizes {
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes) ppi, err := m.sb.AddPiece(size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes)
if err != nil { if err != nil {
return nil, err return nil, err
} }