diff --git a/sched.go b/sched.go index ffaad211d..ad3c948dd 100644 --- a/sched.go +++ b/sched.go @@ -194,6 +194,8 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { tried := 0 var acceptable []WorkerID + needRes := ResourceTable[req.taskType][sh.spt] + for wid, worker := range sh.workers { ok, err := req.sel.Ok(req.ctx, req.taskType, worker) if err != nil { @@ -205,7 +207,6 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { } tried++ - needRes := ResourceTable[req.taskType][sh.spt] if !canHandleRequest(needRes, sh.spt, wid, worker.info.Resources, worker.preparing) { continue } @@ -384,6 +385,25 @@ func canHandleRequest(needRes Resources, spt abi.RegisteredProof, wid WorkerID, return true } +func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { + var max float64 + + cpu := float64(a.cpuUse) / float64(wr.CPUs) + max = cpu + + memMin := float64(a.memUsedMin + wr.MemReserved) / float64(wr.MemPhysical) + if memMin > max { + max = memMin + } + + memMax := float64(a.memUsedMax + wr.MemReserved) / float64(wr.MemPhysical + wr.MemSwap) + if memMax > max { + max = memMax + } + + return max +} + func (sh *scheduler) schedNewWorker(w *workerHandle) { sh.workersLk.Lock() defer sh.workersLk.Unlock() diff --git a/selector_alloc.go b/selector_alloc.go index 1ceab0ecb..c7d06a7bc 100644 --- a/selector_alloc.go +++ b/selector_alloc.go @@ -53,7 +53,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *w } func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { - return a.info.Hostname > b.info.Hostname, nil // TODO: Better strategy + return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil } var _ WorkerSelector = &allocSelector{} diff --git a/selector_existing.go b/selector_existing.go index eccdefbf2..46dd3278e 100644 --- a/selector_existing.go +++ b/selector_existing.go @@ -54,7 +54,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd } func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { - return a.info.Hostname > b.info.Hostname, nil // TODO: Better strategy + return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil } var _ WorkerSelector = &existingSelector{} diff --git a/selector_task.go b/selector_task.go index 5e67ad665..2f20f9a28 100644 --- a/selector_task.go +++ b/selector_task.go @@ -40,7 +40,7 @@ func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *work return len(atasks) < len(btasks), nil // prefer workers which can do less } - return a.info.Hostname > b.info.Hostname, nil // TODO: Better fallback strategy + return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil } var _ WorkerSelector = &allocSelector{}