diff --git a/extern/sector-storage/ffiwrapper/sealer_cgo.go b/extern/sector-storage/ffiwrapper/sealer_cgo.go index 1c8c7ee84..657fac08c 100644 --- a/extern/sector-storage/ffiwrapper/sealer_cgo.go +++ b/extern/sector-storage/ffiwrapper/sealer_cgo.go @@ -45,6 +45,10 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error } func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) { + // TODO: allow tuning those: + chunk := abi.PaddedPieceSize(4 << 20) + parallel := runtime.NumCPU() + var offset abi.UnpaddedPieceSize for _, size := range existingPieceSizes { offset += size @@ -108,10 +112,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw) - chunk := abi.PaddedPieceSize(4 << 20) + throttle := make(chan []byte, parallel) + piecePromises := make([]func() (abi.PieceInfo, error), 0) buf := make([]byte, chunk.Unpadded()) - var pieceCids []abi.PieceInfo + for i := 0; i < parallel; i++ { + if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize { + break // won't use this many buffers + } + throttle <- make([]byte, chunk.Unpadded()) + } for { var read int @@ -132,13 +142,33 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi break } - c, err := sb.pieceCid(sector.ProofType, buf[:read]) - if err != nil { - return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", err) - } - pieceCids = append(pieceCids, abi.PieceInfo{ - Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(), - PieceCID: c, + done := make(chan struct{cid.Cid; error}, 1) + pbuf := <-throttle + copy(pbuf, buf[:read]) + + go func(read int) { + defer func() { + throttle <- pbuf + }() + + c, err := sb.pieceCid(sector.ProofType, pbuf[:read]) + done <- struct {cid.Cid; error }{c, err} + }(read) + + piecePromises = append(piecePromises, func() (abi.PieceInfo, error) { + select { + case e := <-done: + if e.error != nil { + return abi.PieceInfo{}, e.error + } + + return abi.PieceInfo{ + Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(), + PieceCID: e.Cid, + }, nil + case <-ctx.Done(): + return abi.PieceInfo{}, ctx.Err() + } }) } @@ -155,8 +185,16 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi } stagedFile = nil - if len(pieceCids) == 1 { - return pieceCids[0], nil + if len(piecePromises) == 1 { + return piecePromises[0]() + } + + pieceCids := make([]abi.PieceInfo, len(piecePromises)) + for i, promise := range piecePromises { + pieceCids[i], err = promise() + if err != nil { + return abi.PieceInfo{}, err + } } pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids) diff --git a/extern/sector-storage/ffiwrapper/sealer_test.go b/extern/sector-storage/ffiwrapper/sealer_test.go index 1292a9513..9e4aa0042 100644 --- a/extern/sector-storage/ffiwrapper/sealer_test.go +++ b/extern/sector-storage/ffiwrapper/sealer_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader" "io" "io/ioutil" "math/rand" @@ -622,3 +623,89 @@ func TestGenerateUnsealedCID(t *testing.T) { [][]byte{barr(1, 16), barr(0, 16), barr(2, 8), barr(3, 16), barr(0, 16), barr(0, 8), barr(4, 4), barr(5, 16), barr(0, 16), barr(0, 8)}, ) } + +func TestAddPiece512M(t *testing.T) { + sz := abi.PaddedPieceSize(512 << 20).Unpadded() + + cdir, err := ioutil.TempDir("", "sbtest-c-") + if err != nil { + t.Fatal(err) + } + miner := abi.ActorID(123) + + sp := &basicfs.Provider{ + Root: cdir, + } + sb, err := New(sp) + if err != nil { + t.Fatalf("%+v", err) + } + cleanup := func() { + if t.Failed() { + fmt.Printf("not removing %s\n", cdir) + return + } + if err := os.RemoveAll(cdir); err != nil { + t.Error(err) + } + } + t.Cleanup(cleanup) + + r := rand.New(rand.NewSource(0x7e5)) + + c, err := sb.AddPiece(context.TODO(), storage.SectorRef{ + ID: abi.SectorID{ + Miner: miner, + Number: 0, + }, + ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1, + }, nil, sz, io.LimitReader(r, int64(sz))) + if err != nil { + t.Fatal(err) + } + + require.Equal(t, "baga6ea4seaqhyticusemlcrjhvulpfng4nint6bu3wpe5s3x4bnuj2rs47hfacy", c.PieceCID.String()) +} + +func BenchmarkAddPiece512M(b *testing.B) { + sz := abi.PaddedPieceSize(512 << 20).Unpadded() + b.SetBytes(int64(sz)) + + cdir, err := ioutil.TempDir("", "sbtest-c-") + if err != nil { + b.Fatal(err) + } + miner := abi.ActorID(123) + + sp := &basicfs.Provider{ + Root: cdir, + } + sb, err := New(sp) + if err != nil { + b.Fatalf("%+v", err) + } + cleanup := func() { + if b.Failed() { + fmt.Printf("not removing %s\n", cdir) + return + } + if err := os.RemoveAll(cdir); err != nil { + b.Error(err) + } + } + b.Cleanup(cleanup) + + for i := 0; i < b.N; i++ { + c, err := sb.AddPiece(context.TODO(), storage.SectorRef{ + ID: abi.SectorID{ + Miner: miner, + Number: abi.SectorNumber(i), + }, + ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1, + }, nil, sz, io.LimitReader(&nullreader.Reader{}, int64(sz))) + if err != nil { + b.Fatal(err) + } + fmt.Println(c) + } +} \ No newline at end of file