Merge pull request #33 from filecoin-project/feat/sched-sequential-ids

sched: Take sector numbers into accout when scheduling work
This commit is contained in:
Łukasz Magiera 2020-05-18 15:31:36 +02:00 committed by GitHub
commit a9631587f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 14 additions and 8 deletions

View File

@ -205,7 +205,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
} }
var out abi.PieceInfo var out abi.PieceInfo
err = m.sched.Schedule(ctx, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
p, err := w.AddPiece(ctx, sector, existingPieces, sz, r) p, err := w.AddPiece(ctx, sector, existingPieces, sz, r)
if err != nil { if err != nil {
return err return err
@ -225,7 +225,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return nil, xerrors.Errorf("creating path selector: %w", err) return nil, xerrors.Errorf("creating path selector: %w", err)
} }
err = m.sched.Schedule(ctx, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
if err != nil { if err != nil {
return err return err
@ -243,7 +243,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err) return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
} }
err = m.sched.Schedule(ctx, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit2(ctx, sector, phase1Out) p, err := w.SealPreCommit2(ctx, sector, phase1Out)
if err != nil { if err != nil {
return err return err
@ -264,7 +264,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
// (except, don't.. for now at least - we are using this step to bring data // (except, don't.. for now at least - we are using this step to bring data
// into 'provable' storage. Optimally we'd do that in commit2, in parallel // into 'provable' storage. Optimally we'd do that in commit2, in parallel
// with snark compute) // with snark compute)
err = m.sched.Schedule(ctx, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error {
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids) p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
if err != nil { if err != nil {
return err return err
@ -278,7 +278,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) { func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
selector := newTaskSelector() selector := newTaskSelector()
err = m.sched.Schedule(ctx, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
p, err := w.SealCommit2(ctx, sector, phase1Out) p, err := w.SealCommit2(ctx, sector, phase1Out)
if err != nil { if err != nil {
return err return err
@ -296,7 +296,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
return xerrors.Errorf("creating path selector: %w", err) return xerrors.Errorf("creating path selector: %w", err)
} }
return m.sched.Schedule(ctx, sealtasks.TTFinalize, selector, return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false), schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false),
func(ctx context.Context, w Worker) error { func(ctx context.Context, w Worker) error {
return w.FinalizeSector(ctx, sector) return w.FinalizeSector(ctx, sector)

View File

@ -7,7 +7,11 @@ type requestQueue []*workerRequest
func (q requestQueue) Len() int { return len(q) } func (q requestQueue) Len() int { return len(q) }
func (q requestQueue) Less(i, j int) bool { func (q requestQueue) Less(i, j int) bool {
return q[i].taskType.Less(q[j].taskType) if q[i].taskType != q[j].taskType {
return q[i].taskType.Less(q[j].taskType)
}
return q[i].sector.Number < q[j].sector.Number // optimize minerActor.NewSectors bitfield
} }
func (q requestQueue) Swap(i, j int) { func (q requestQueue) Swap(i, j int) {

View File

@ -64,11 +64,12 @@ func newScheduler(spt abi.RegisteredProof) *scheduler {
} }
} }
func (sh *scheduler) Schedule(ctx context.Context, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
ret := make(chan workerResponse) ret := make(chan workerResponse)
select { select {
case sh.schedule <- &workerRequest{ case sh.schedule <- &workerRequest{
sector: sector,
taskType: taskType, taskType: taskType,
sel: sel, sel: sel,
@ -95,6 +96,7 @@ func (sh *scheduler) Schedule(ctx context.Context, taskType sealtasks.TaskType,
} }
type workerRequest struct { type workerRequest struct {
sector abi.SectorID
taskType sealtasks.TaskType taskType sealtasks.TaskType
sel WorkerSelector sel WorkerSelector