Merge pull request #5320 from filecoin-project/feat/multicore-addpiece

Multicore AddPiece CommP
This commit is contained in:
Łukasz Magiera 2021-01-12 10:25:55 +01:00 committed by GitHub
commit 56277756a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 142 additions and 11 deletions

View File

@ -45,6 +45,10 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
}
func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
@ -108,10 +112,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
chunk := abi.PaddedPieceSize(4 << 20)
throttle := make(chan []byte, parallel)
piecePromises := make([]func() (abi.PieceInfo, error), 0)
buf := make([]byte, chunk.Unpadded())
var pieceCids []abi.PieceInfo
for i := 0; i < parallel; i++ {
if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize {
break // won't use this many buffers
}
throttle <- make([]byte, chunk.Unpadded())
}
for {
var read int
@ -132,13 +142,39 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
break
}
c, err := sb.pieceCid(sector.ProofType, buf[:read])
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", err)
done := make(chan struct {
cid.Cid
error
}, 1)
pbuf := <-throttle
copy(pbuf, buf[:read])
go func(read int) {
defer func() {
throttle <- pbuf
}()
c, err := sb.pieceCid(sector.ProofType, pbuf[:read])
done <- struct {
cid.Cid
error
}{c, err}
}(read)
piecePromises = append(piecePromises, func() (abi.PieceInfo, error) {
select {
case e := <-done:
if e.error != nil {
return abi.PieceInfo{}, e.error
}
pieceCids = append(pieceCids, abi.PieceInfo{
return abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(),
PieceCID: c,
PieceCID: e.Cid,
}, nil
case <-ctx.Done():
return abi.PieceInfo{}, ctx.Err()
}
})
}
@ -155,8 +191,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
}
stagedFile = nil
if len(pieceCids) == 1 {
return pieceCids[0], nil
if len(piecePromises) == 1 {
return piecePromises[0]()
}
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pieceCids[i], err = promise()
if err != nil {
return abi.PieceInfo{}, err
}
}
pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids)

View File

@ -33,6 +33,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
)
func init() {
@ -622,3 +623,89 @@ func TestGenerateUnsealedCID(t *testing.T) {
[][]byte{barr(1, 16), barr(0, 16), barr(2, 8), barr(3, 16), barr(0, 16), barr(0, 8), barr(4, 4), barr(5, 16), barr(0, 16), barr(0, 8)},
)
}
func TestAddPiece512M(t *testing.T) {
sz := abi.PaddedPieceSize(512 << 20).Unpadded()
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
t.Error(err)
}
}
t.Cleanup(cleanup)
r := rand.New(rand.NewSource(0x7e5))
c, err := sb.AddPiece(context.TODO(), storage.SectorRef{
ID: abi.SectorID{
Miner: miner,
Number: 0,
},
ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1,
}, nil, sz, io.LimitReader(r, int64(sz)))
if err != nil {
t.Fatal(err)
}
require.Equal(t, "baga6ea4seaqhyticusemlcrjhvulpfng4nint6bu3wpe5s3x4bnuj2rs47hfacy", c.PieceCID.String())
}
func BenchmarkAddPiece512M(b *testing.B) {
sz := abi.PaddedPieceSize(512 << 20).Unpadded()
b.SetBytes(int64(sz))
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
b.Fatal(err)
}
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp)
if err != nil {
b.Fatalf("%+v", err)
}
cleanup := func() {
if b.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
b.Error(err)
}
}
b.Cleanup(cleanup)
for i := 0; i < b.N; i++ {
c, err := sb.AddPiece(context.TODO(), storage.SectorRef{
ID: abi.SectorID{
Miner: miner,
Number: abi.SectorNumber(i),
},
ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1,
}, nil, sz, io.LimitReader(&nullreader.Reader{}, int64(sz)))
if err != nil {
b.Fatal(err)
}
fmt.Println(c)
}
}