From b576008e87236812ff257140fd9b5e511ce84c94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 22:56:11 +0200 Subject: [PATCH] sched: Strong preferrence in WorkerSelector --- extern/sector-storage/sched.go | 4 +++- extern/sector-storage/sched_assigner_common.go | 17 +++++++++++++++-- extern/sector-storage/selector_alloc.go | 16 ++++++++-------- extern/sector-storage/selector_existing.go | 16 ++++++++-------- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 53b6415ff..c47ae5fd9 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -44,7 +44,9 @@ const mib = 1 << 20 type WorkerAction func(ctx context.Context, w Worker) error type WorkerSelector interface { - Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task + // Ok is true if worker is acceptable for performing a task. + // If any worker is preferred for a task, other workers won't be considered for that task. + Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b } diff --git a/extern/sector-storage/sched_assigner_common.go b/extern/sector-storage/sched_assigner_common.go index a4a2dfc23..5be551693 100644 --- a/extern/sector-storage/sched_assigner_common.go +++ b/extern/sector-storage/sched_assigner_common.go @@ -61,8 +61,10 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { }() task := (*sh.SchedQueue)[sqi] - task.IndexHeap = sqi + + var havePreferred bool + for wnd, windowRequest := range sh.OpenWindows { worker, ok := sh.Workers[windowRequest.Worker] if !ok { @@ -84,7 +86,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { } rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) - ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker) + ok, preferred, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker) cancel() if err != nil { log.Errorf("trySched(1) req.Sel.Ok error: %+v", err) @@ -95,6 +97,17 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { continue } + if havePreferred && !preferred { + // we have a way better worker for this task + continue + } + + if preferred && !havePreferred { + // all workers we considered previously are much worse choice + acceptableWindows[sqi] = acceptableWindows[sqi][:0] + havePreferred = true + } + acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) } diff --git a/extern/sector-storage/selector_alloc.go b/extern/sector-storage/selector_alloc.go index d2a49f3ce..ffdf35b5e 100644 --- a/extern/sector-storage/selector_alloc.go +++ b/extern/sector-storage/selector_alloc.go @@ -26,18 +26,18 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType, } } -func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { +func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { - return false, xerrors.Errorf("getting supported worker task types: %w", err) + return false, false, xerrors.Errorf("getting supported worker task types: %w", err) } if _, supported := tasks[task]; !supported { - return false, nil + return false, false, nil } paths, err := whnd.workerRpc.Paths(ctx) if err != nil { - return false, xerrors.Errorf("getting worker paths: %w", err) + return false, false, xerrors.Errorf("getting worker paths: %w", err) } have := map[storiface.ID]struct{}{} @@ -47,21 +47,21 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi ssize, err := spt.SectorSize() if err != nil { - return false, xerrors.Errorf("getting sector size: %w", err) + return false, false, xerrors.Errorf("getting sector size: %w", err) } best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype) if err != nil { - return false, xerrors.Errorf("finding best alloc storage: %w", err) + return false, false, xerrors.Errorf("finding best alloc storage: %w", err) } for _, info := range best { if _, ok := have[info.ID]; ok { - return true, nil + return true, false, nil } } - return false, nil + return false, false, nil } func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { diff --git a/extern/sector-storage/selector_existing.go b/extern/sector-storage/selector_existing.go index 0703f22ed..78a3b1988 100644 --- a/extern/sector-storage/selector_existing.go +++ b/extern/sector-storage/selector_existing.go @@ -28,18 +28,18 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st } } -func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { +func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { - return false, xerrors.Errorf("getting supported worker task types: %w", err) + return false, false, xerrors.Errorf("getting supported worker task types: %w", err) } if _, supported := tasks[task]; !supported { - return false, nil + return false, false, nil } paths, err := whnd.workerRpc.Paths(ctx) if err != nil { - return false, xerrors.Errorf("getting worker paths: %w", err) + return false, false, xerrors.Errorf("getting worker paths: %w", err) } have := map[storiface.ID]struct{}{} @@ -49,21 +49,21 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt ssize, err := spt.SectorSize() if err != nil { - return false, xerrors.Errorf("getting sector size: %w", err) + return false, false, xerrors.Errorf("getting sector size: %w", err) } best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch) if err != nil { - return false, xerrors.Errorf("finding best storage: %w", err) + return false, false, xerrors.Errorf("finding best storage: %w", err) } for _, info := range best { if _, ok := have[info.ID]; ok { - return true, nil + return true, false, nil } } - return false, nil + return false, false, nil } func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {