sealing: Parallel CommP calc in pledge sector
This commit is contained in:
parent
7b258eddc4
commit
c9c437cb3b
@ -6,14 +6,61 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
|
||||||
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) {
|
||||||
|
piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts)
|
||||||
|
out := make([]sectorbuilder.PublicPieceInfo, parts)
|
||||||
|
var lk sync.Mutex
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(int(parts))
|
||||||
|
for i := uint64(0); i < parts; i++ {
|
||||||
|
go func(i uint64) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece)), piece)
|
||||||
|
|
||||||
|
lk.Lock()
|
||||||
|
if perr != nil {
|
||||||
|
err = multierror.Append(err, perr)
|
||||||
|
}
|
||||||
|
out[i] = sectorbuilder.PublicPieceInfo{
|
||||||
|
Size: piece,
|
||||||
|
CommP: commP,
|
||||||
|
}
|
||||||
|
lk.Unlock()
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return [32]byte{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader {
|
||||||
|
piece := sectorbuilder.UserBytesForSectorSize((size / 127 + size) / parts)
|
||||||
|
|
||||||
|
readers := make([]io.Reader, parts)
|
||||||
|
for i := range readers {
|
||||||
|
readers[i] = io.LimitReader(rand.New(rand.NewSource(42 + int64(i))), int64(piece))
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.MultiReader(readers...)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
|
func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
|
||||||
if len(sizes) == 0 {
|
if len(sizes) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@ -21,10 +68,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
|
|||||||
|
|
||||||
deals := make([]actors.StorageDealProposal, len(sizes))
|
deals := make([]actors.StorageDealProposal, len(sizes))
|
||||||
for i, size := range sizes {
|
for i, size := range sizes {
|
||||||
release := m.sb.RateLimit()
|
commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU()))
|
||||||
commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size)
|
|
||||||
release()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -81,7 +125,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
|
|||||||
out := make([]Piece, len(sizes))
|
out := make([]Piece, len(sizes))
|
||||||
|
|
||||||
for i, size := range sizes {
|
for i, size := range sizes {
|
||||||
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
|
ppi, err := m.sb.AddPiece(size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user