Merge pull request #2047 from filecoin-project/feat/parallel-sealing-bench
allow sealing bench to run parallel seals
This commit is contained in:
commit
cc179c5270
@ -139,6 +139,10 @@ var sealBenchCmd = &cli.Command{
|
||||
Name: "num-sectors",
|
||||
Value: 1,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "parallel",
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
Action: func(c *cli.Context) error {
|
||||
if c.Bool("no-gpu") {
|
||||
@ -235,7 +239,12 @@ var sealBenchCmd = &cli.Command{
|
||||
|
||||
if robench == "" {
|
||||
var err error
|
||||
sealTimings, sealedSectors, err = runSeals(sb, sbfs, c.Int("num-sectors"), mid, sectorSize, []byte(c.String("ticket-preimage")), c.String("save-commit2-input"), c.Bool("skip-commit2"), c.Bool("skip-unseal"))
|
||||
parCfg := ParCfg{
|
||||
PreCommit1: c.Int("parallel"),
|
||||
PreCommit2: 1,
|
||||
Commit: 1,
|
||||
}
|
||||
sealTimings, sealedSectors, err = runSeals(sb, sbfs, c.Int("num-sectors"), parCfg, mid, sectorSize, []byte(c.String("ticket-preimage")), c.String("save-commit2-input"), c.Bool("skip-commit2"), c.Bool("skip-unseal"))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to run seals: %w", err)
|
||||
}
|
||||
@ -447,9 +456,23 @@ var sealBenchCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
func runSeals(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, numSectors int, mid abi.ActorID, sectorSize abi.SectorSize, ticketPreimage []byte, saveC2inp string, skipc2, skipunseal bool) ([]SealingResult, []abi.SectorInfo, error) {
|
||||
var sealTimings []SealingResult
|
||||
var sealedSectors []abi.SectorInfo
|
||||
type ParCfg struct {
|
||||
PreCommit1 int
|
||||
PreCommit2 int
|
||||
Commit int
|
||||
}
|
||||
|
||||
func runSeals(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, numSectors int, par ParCfg, mid abi.ActorID, sectorSize abi.SectorSize, ticketPreimage []byte, saveC2inp string, skipc2, skipunseal bool) ([]SealingResult, []abi.SectorInfo, error) {
|
||||
var pieces []abi.PieceInfo
|
||||
sealTimings := make([]SealingResult, numSectors)
|
||||
sealedSectors := make([]abi.SectorInfo, numSectors)
|
||||
|
||||
preCommit2Sema := make(chan struct{}, par.PreCommit2)
|
||||
commitSema := make(chan struct{}, par.Commit)
|
||||
|
||||
if numSectors%par.PreCommit1 != 0 {
|
||||
return nil, nil, fmt.Errorf("parallelism factor must cleanly divide numSectors")
|
||||
}
|
||||
|
||||
for i := abi.SectorNumber(1); i <= abi.SectorNumber(numSectors); i++ {
|
||||
sid := abi.SectorID{
|
||||
@ -458,7 +481,7 @@ func runSeals(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, numSectors int, mid
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
log.Info("Writing piece into sector...")
|
||||
log.Infof("[%d] Writing piece into sector...", i)
|
||||
|
||||
r := rand.New(rand.NewSource(100 + int64(i)))
|
||||
|
||||
@ -467,129 +490,168 @@ func runSeals(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, numSectors int, mid
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
addpiece := time.Now()
|
||||
pieces = append(pieces, pi)
|
||||
|
||||
trand := blake2b.Sum256(ticketPreimage)
|
||||
ticket := abi.SealRandomness(trand[:])
|
||||
sealTimings[i-1].AddPiece = time.Since(start)
|
||||
}
|
||||
|
||||
log.Info("Running replication(1)...")
|
||||
pieces := []abi.PieceInfo{pi}
|
||||
pc1o, err := sb.SealPreCommit1(context.TODO(), sid, ticket, pieces)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("commit: %w", err)
|
||||
}
|
||||
sectorsPerWorker := numSectors / par.PreCommit1
|
||||
|
||||
precommit1 := time.Now()
|
||||
errs := make(chan error, par.PreCommit1)
|
||||
for wid := 0; wid < par.PreCommit1; wid++ {
|
||||
go func(worker int) {
|
||||
sealerr := func() error {
|
||||
start := 1 + (worker * sectorsPerWorker)
|
||||
end := start + sectorsPerWorker
|
||||
for i := abi.SectorNumber(start); i < abi.SectorNumber(end); i++ {
|
||||
ix := int(i - 1)
|
||||
sid := abi.SectorID{
|
||||
Miner: mid,
|
||||
Number: i,
|
||||
}
|
||||
|
||||
log.Info("Running replication(2)...")
|
||||
cids, err := sb.SealPreCommit2(context.TODO(), sid, pc1o)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("commit: %w", err)
|
||||
}
|
||||
start := time.Now()
|
||||
|
||||
precommit2 := time.Now()
|
||||
trand := blake2b.Sum256(ticketPreimage)
|
||||
ticket := abi.SealRandomness(trand[:])
|
||||
|
||||
sealedSectors = append(sealedSectors, abi.SectorInfo{
|
||||
SealProof: sb.SealProofType(),
|
||||
SectorNumber: i,
|
||||
SealedCID: cids.Sealed,
|
||||
})
|
||||
log.Infof("[%d] Running replication(1)...", i)
|
||||
pieces := []abi.PieceInfo{pieces[ix]}
|
||||
pc1o, err := sb.SealPreCommit1(context.TODO(), sid, ticket, pieces)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("commit: %w", err)
|
||||
}
|
||||
|
||||
seed := lapi.SealSeed{
|
||||
Epoch: 101,
|
||||
Value: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 255},
|
||||
}
|
||||
precommit1 := time.Now()
|
||||
|
||||
log.Info("Generating PoRep for sector (1)")
|
||||
c1o, err := sb.SealCommit1(context.TODO(), sid, ticket, seed.Value, pieces, cids)
|
||||
preCommit2Sema <- struct{}{}
|
||||
pc2Start := time.Now()
|
||||
log.Infof("[%d] Running replication(2)...", i)
|
||||
cids, err := sb.SealPreCommit2(context.TODO(), sid, pc1o)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("commit: %w", err)
|
||||
}
|
||||
|
||||
precommit2 := time.Now()
|
||||
<-preCommit2Sema
|
||||
|
||||
sealedSectors[ix] = abi.SectorInfo{
|
||||
SealProof: sb.SealProofType(),
|
||||
SectorNumber: i,
|
||||
SealedCID: cids.Sealed,
|
||||
}
|
||||
|
||||
seed := lapi.SealSeed{
|
||||
Epoch: 101,
|
||||
Value: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 255},
|
||||
}
|
||||
|
||||
commitSema <- struct{}{}
|
||||
commitStart := time.Now()
|
||||
log.Infof("[%d] Generating PoRep for sector (1)", i)
|
||||
c1o, err := sb.SealCommit1(context.TODO(), sid, ticket, seed.Value, pieces, cids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sealcommit1 := time.Now()
|
||||
|
||||
log.Infof("[%d] Generating PoRep for sector (2)", i)
|
||||
|
||||
if saveC2inp != "" {
|
||||
c2in := Commit2In{
|
||||
SectorNum: int64(i),
|
||||
Phase1Out: c1o,
|
||||
SectorSize: uint64(sectorSize),
|
||||
}
|
||||
|
||||
b, err := json.Marshal(&c2in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(saveC2inp, b, 0664); err != nil {
|
||||
log.Warnf("%+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var proof storage.Proof
|
||||
if !skipc2 {
|
||||
proof, err = sb.SealCommit2(context.TODO(), sid, c1o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sealcommit2 := time.Now()
|
||||
<-commitSema
|
||||
|
||||
if !skipc2 {
|
||||
svi := abi.SealVerifyInfo{
|
||||
SectorID: abi.SectorID{Miner: mid, Number: i},
|
||||
SealedCID: cids.Sealed,
|
||||
SealProof: sb.SealProofType(),
|
||||
Proof: proof,
|
||||
DealIDs: nil,
|
||||
Randomness: ticket,
|
||||
InteractiveRandomness: seed.Value,
|
||||
UnsealedCID: cids.Unsealed,
|
||||
}
|
||||
|
||||
ok, err := ffiwrapper.ProofVerifier.VerifySeal(svi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return xerrors.Errorf("porep proof for sector %d was invalid", i)
|
||||
}
|
||||
}
|
||||
|
||||
verifySeal := time.Now()
|
||||
|
||||
if !skipunseal {
|
||||
log.Infof("[%d] Unsealing sector", i)
|
||||
{
|
||||
p, done, err := sbfs.AcquireSector(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, stores.FTUnsealed, stores.FTNone, true)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire unsealed sector for removing: %w", err)
|
||||
}
|
||||
done()
|
||||
|
||||
if err := os.Remove(p.Unsealed); err != nil {
|
||||
return xerrors.Errorf("removing unsealed sector: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err := sb.UnsealPiece(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, 0, abi.PaddedPieceSize(sectorSize).Unpadded(), ticket, cids.Unsealed)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
unseal := time.Now()
|
||||
|
||||
sealTimings[ix].PreCommit1 = precommit1.Sub(start)
|
||||
sealTimings[ix].PreCommit2 = precommit2.Sub(pc2Start)
|
||||
sealTimings[ix].Commit1 = sealcommit1.Sub(commitStart)
|
||||
sealTimings[ix].Commit2 = sealcommit2.Sub(sealcommit1)
|
||||
sealTimings[ix].Verify = verifySeal.Sub(sealcommit2)
|
||||
sealTimings[ix].Unseal = unseal.Sub(verifySeal)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if sealerr != nil {
|
||||
errs <- sealerr
|
||||
return
|
||||
}
|
||||
errs <- nil
|
||||
}(wid)
|
||||
}
|
||||
|
||||
for i := 0; i < par.PreCommit1; i++ {
|
||||
err := <-errs
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
sealcommit1 := time.Now()
|
||||
|
||||
log.Info("Generating PoRep for sector (2)")
|
||||
|
||||
if saveC2inp != "" {
|
||||
c2in := Commit2In{
|
||||
SectorNum: int64(i),
|
||||
Phase1Out: c1o,
|
||||
SectorSize: uint64(sectorSize),
|
||||
}
|
||||
|
||||
b, err := json.Marshal(&c2in)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(saveC2inp, b, 0664); err != nil {
|
||||
log.Warnf("%+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var proof storage.Proof
|
||||
if !skipc2 {
|
||||
proof, err = sb.SealCommit2(context.TODO(), sid, c1o)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
sealcommit2 := time.Now()
|
||||
|
||||
if !skipc2 {
|
||||
svi := abi.SealVerifyInfo{
|
||||
SectorID: abi.SectorID{Miner: mid, Number: i},
|
||||
SealedCID: cids.Sealed,
|
||||
SealProof: sb.SealProofType(),
|
||||
Proof: proof,
|
||||
DealIDs: nil,
|
||||
Randomness: ticket,
|
||||
InteractiveRandomness: seed.Value,
|
||||
UnsealedCID: cids.Unsealed,
|
||||
}
|
||||
|
||||
ok, err := ffiwrapper.ProofVerifier.VerifySeal(svi)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, nil, xerrors.Errorf("porep proof for sector %d was invalid", i)
|
||||
}
|
||||
}
|
||||
|
||||
verifySeal := time.Now()
|
||||
|
||||
if !skipunseal {
|
||||
log.Info("Unsealing sector")
|
||||
{
|
||||
p, done, err := sbfs.AcquireSector(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, stores.FTUnsealed, stores.FTNone, true)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("acquire unsealed sector for removing: %w", err)
|
||||
}
|
||||
done()
|
||||
|
||||
if err := os.Remove(p.Unsealed); err != nil {
|
||||
return nil, nil, xerrors.Errorf("removing unsealed sector: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err := sb.UnsealPiece(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, 0, abi.PaddedPieceSize(sectorSize).Unpadded(), ticket, cids.Unsealed)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
unseal := time.Now()
|
||||
|
||||
sealTimings = append(sealTimings, SealingResult{
|
||||
AddPiece: addpiece.Sub(start),
|
||||
PreCommit1: precommit1.Sub(addpiece),
|
||||
PreCommit2: precommit2.Sub(precommit1),
|
||||
Commit1: sealcommit1.Sub(precommit2),
|
||||
Commit2: sealcommit2.Sub(sealcommit1),
|
||||
Verify: verifySeal.Sub(sealcommit2),
|
||||
Unseal: unseal.Sub(verifySeal),
|
||||
})
|
||||
}
|
||||
|
||||
return sealTimings, sealedSectors, nil
|
||||
|
1
go.sum
1
go.sum
@ -257,7 +257,6 @@ github.com/filecoin-project/sector-storage v0.0.0-20200615192001-42c9e08595b7 h1
|
||||
github.com/filecoin-project/sector-storage v0.0.0-20200615192001-42c9e08595b7/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM=
|
||||
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
|
||||
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
|
||||
github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0=
|
||||
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
|
||||
github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121 h1:oRA+b4iN4H86xXDXbU3TOyvmBZp7//c5VqTc0oJ6nLg=
|
||||
github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
|
||||
|
Loading…
Reference in New Issue
Block a user