Multicore AddPiece CommP

This commit is contained in:
Łukasz Magiera 2021-01-10 22:54:05 +01:00
parent 7d0296c409
commit 9e74c3c4e8
2 changed files with 136 additions and 11 deletions

View File

@ -45,6 +45,10 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
}
func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
@ -108,10 +112,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
chunk := abi.PaddedPieceSize(4 << 20)
throttle := make(chan []byte, parallel)
piecePromises := make([]func() (abi.PieceInfo, error), 0)
buf := make([]byte, chunk.Unpadded())
var pieceCids []abi.PieceInfo
for i := 0; i < parallel; i++ {
if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize {
break // won't use this many buffers
}
throttle <- make([]byte, chunk.Unpadded())
}
for {
var read int
@ -132,13 +142,33 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
break
}
c, err := sb.pieceCid(sector.ProofType, buf[:read])
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", err)
}
pieceCids = append(pieceCids, abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(),
PieceCID: c,
done := make(chan struct{cid.Cid; error}, 1)
pbuf := <-throttle
copy(pbuf, buf[:read])
go func(read int) {
defer func() {
throttle <- pbuf
}()
c, err := sb.pieceCid(sector.ProofType, pbuf[:read])
done <- struct {cid.Cid; error }{c, err}
}(read)
piecePromises = append(piecePromises, func() (abi.PieceInfo, error) {
select {
case e := <-done:
if e.error != nil {
return abi.PieceInfo{}, e.error
}
return abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(),
PieceCID: e.Cid,
}, nil
case <-ctx.Done():
return abi.PieceInfo{}, ctx.Err()
}
})
}
@ -155,8 +185,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
}
stagedFile = nil
if len(pieceCids) == 1 {
return pieceCids[0], nil
if len(piecePromises) == 1 {
return piecePromises[0]()
}
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pieceCids[i], err = promise()
if err != nil {
return abi.PieceInfo{}, err
}
}
pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids)

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
"io"
"io/ioutil"
"math/rand"
@ -622,3 +623,89 @@ func TestGenerateUnsealedCID(t *testing.T) {
[][]byte{barr(1, 16), barr(0, 16), barr(2, 8), barr(3, 16), barr(0, 16), barr(0, 8), barr(4, 4), barr(5, 16), barr(0, 16), barr(0, 8)},
)
}
func TestAddPiece512M(t *testing.T) {
sz := abi.PaddedPieceSize(512 << 20).Unpadded()
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
t.Error(err)
}
}
t.Cleanup(cleanup)
r := rand.New(rand.NewSource(0x7e5))
c, err := sb.AddPiece(context.TODO(), storage.SectorRef{
ID: abi.SectorID{
Miner: miner,
Number: 0,
},
ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1,
}, nil, sz, io.LimitReader(r, int64(sz)))
if err != nil {
t.Fatal(err)
}
require.Equal(t, "baga6ea4seaqhyticusemlcrjhvulpfng4nint6bu3wpe5s3x4bnuj2rs47hfacy", c.PieceCID.String())
}
func BenchmarkAddPiece512M(b *testing.B) {
sz := abi.PaddedPieceSize(512 << 20).Unpadded()
b.SetBytes(int64(sz))
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
b.Fatal(err)
}
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp)
if err != nil {
b.Fatalf("%+v", err)
}
cleanup := func() {
if b.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
b.Error(err)
}
}
b.Cleanup(cleanup)
for i := 0; i < b.N; i++ {
c, err := sb.AddPiece(context.TODO(), storage.SectorRef{
ID: abi.SectorID{
Miner: miner,
Number: abi.SectorNumber(i),
},
ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1,
}, nil, sz, io.LimitReader(&nullreader.Reader{}, int64(sz)))
if err != nil {
b.Fatal(err)
}
fmt.Println(c)
}
}