sched: Strong preferrence in WorkerSelector

This commit is contained in:
Łukasz Magiera 2022-05-23 22:56:11 +02:00
parent 3de34ea3c0
commit b576008e87
4 changed files with 34 additions and 19 deletions

View File

@ -44,7 +44,9 @@ const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error type WorkerAction func(ctx context.Context, w Worker) error
type WorkerSelector interface { type WorkerSelector interface {
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task // Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
} }

View File

@ -61,8 +61,10 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
}() }()
task := (*sh.SchedQueue)[sqi] task := (*sh.SchedQueue)[sqi]
task.IndexHeap = sqi task.IndexHeap = sqi
var havePreferred bool
for wnd, windowRequest := range sh.OpenWindows { for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker] worker, ok := sh.Workers[windowRequest.Worker]
if !ok { if !ok {
@ -84,7 +86,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
} }
rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker) ok, preferred, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
cancel() cancel()
if err != nil { if err != nil {
log.Errorf("trySched(1) req.Sel.Ok error: %+v", err) log.Errorf("trySched(1) req.Sel.Ok error: %+v", err)
@ -95,6 +97,17 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
continue continue
} }
if havePreferred && !preferred {
// we have a way better worker for this task
continue
}
if preferred && !havePreferred {
// all workers we considered previously are much worse choice
acceptableWindows[sqi] = acceptableWindows[sqi][:0]
havePreferred = true
}
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
} }

View File

@ -26,18 +26,18 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
} }
} }
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx) tasks, err := whnd.TaskTypes(ctx)
if err != nil { if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err) return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
} }
if _, supported := tasks[task]; !supported { if _, supported := tasks[task]; !supported {
return false, nil return false, false, nil
} }
paths, err := whnd.workerRpc.Paths(ctx) paths, err := whnd.workerRpc.Paths(ctx)
if err != nil { if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err) return false, false, xerrors.Errorf("getting worker paths: %w", err)
} }
have := map[storiface.ID]struct{}{} have := map[storiface.ID]struct{}{}
@ -47,21 +47,21 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
ssize, err := spt.SectorSize() ssize, err := spt.SectorSize()
if err != nil { if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err) return false, false, xerrors.Errorf("getting sector size: %w", err)
} }
best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype) best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
if err != nil { if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err) return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
} }
for _, info := range best { for _, info := range best {
if _, ok := have[info.ID]; ok { if _, ok := have[info.ID]; ok {
return true, nil return true, false, nil
} }
} }
return false, nil return false, false, nil
} }
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {

View File

@ -28,18 +28,18 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
} }
} }
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx) tasks, err := whnd.TaskTypes(ctx)
if err != nil { if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err) return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
} }
if _, supported := tasks[task]; !supported { if _, supported := tasks[task]; !supported {
return false, nil return false, false, nil
} }
paths, err := whnd.workerRpc.Paths(ctx) paths, err := whnd.workerRpc.Paths(ctx)
if err != nil { if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err) return false, false, xerrors.Errorf("getting worker paths: %w", err)
} }
have := map[storiface.ID]struct{}{} have := map[storiface.ID]struct{}{}
@ -49,21 +49,21 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
ssize, err := spt.SectorSize() ssize, err := spt.SectorSize()
if err != nil { if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err) return false, false, xerrors.Errorf("getting sector size: %w", err)
} }
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch) best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
if err != nil { if err != nil {
return false, xerrors.Errorf("finding best storage: %w", err) return false, false, xerrors.Errorf("finding best storage: %w", err)
} }
for _, info := range best { for _, info := range best {
if _, ok := have[info.ID]; ok { if _, ok := have[info.ID]; ok {
return true, nil return true, false, nil
} }
} }
return false, nil return false, false, nil
} }
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {