Merge pull request #8447 from filecoin-project/feat/sched-better

feat: sched: Improve worker assigning logic
commit 7900479e9c
Łukasz Magiera, 2022-04-09 20:50:31 +02:00 (committed by GitHub)
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
5 changed files with 90 additions and 19 deletions


@@ -2,6 +2,7 @@ package sectorstorage
 
 import (
     "context"
+    "math"
     "math/rand"
     "sort"
     "sync"
@@ -76,6 +77,10 @@ type scheduler struct {
 type workerHandle struct {
     workerRpc Worker
 
+    tasksCache  map[sealtasks.TaskType]struct{}
+    tasksUpdate time.Time
+    tasksLk     sync.Mutex
+
     info storiface.WorkerInfo
 
     preparing *activeResources // use with workerHandle.lk
@@ -361,7 +366,7 @@ func (sh *scheduler) trySched() {
     }
 
     windows := make([]schedWindow, windowsLen)
-    acceptableWindows := make([][]int, queueLen)
+    acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex
 
     // Step 1
     throttle := make(chan struct{}, windowsLen)
@@ -455,41 +460,73 @@ func (sh *scheduler) trySched() {
     // Step 2
     scheduled := 0
     rmQueue := make([]int, 0, queueLen)
+    workerUtil := map[storiface.WorkerID]float64{}
 
     for sqi := 0; sqi < queueLen; sqi++ {
         task := (*sh.schedQueue)[sqi]
 
         selectedWindow := -1
-        for _, wnd := range acceptableWindows[task.indexHeap] {
+        var needRes storiface.Resources
+        var info storiface.WorkerInfo
+        var bestWid storiface.WorkerID
+        bestUtilization := math.MaxFloat64 // smaller = better
+
+        for i, wnd := range acceptableWindows[task.indexHeap] {
             wid := sh.openWindows[wnd].worker
-            info := sh.workers[wid].info
+            w := sh.workers[wid]
 
-            log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)
+            res := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
 
-            needRes := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
+            log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.sector.ID.Number, wnd, i)
 
             // TODO: allow bigger windows
             if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
                 continue
             }
 
-            log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd)
-
-            windows[wnd].allocated.add(info.Resources, needRes)
-            // TODO: We probably want to re-sort acceptableWindows here based on new
-            // workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all
-            // task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2 way), and
-            // without additional network roundtrips (O(n^2) could be avoided by turning acceptableWindows.[] into heaps))
-
-            selectedWindow = wnd
-            break
-        }
+            wu, found := workerUtil[wid]
+            if !found {
+                wu = w.utilization()
+                workerUtil[wid] = wu
+            }
+            if wu >= bestUtilization {
+                // acceptable worker list is initially sorted by utilization, and the initially-best workers
+                // will be assigned tasks first. This means that if we find a worker which isn't better, it
+                // probably means that the other workers aren't better either.
+                //
+                // utilization
+                // ^
+                // |       /
+                // | \    /
+                // |  \  /
+                // |   *
+                // #--------> acceptableWindow index
+                //
+                // * -> we're here
+                break
+            }
+
+            info = w.info
+            needRes = res
+            bestWid = wid
+            selectedWindow = wnd
+            bestUtilization = wu
+        }
 
         if selectedWindow < 0 {
             // all windows full
             continue
         }
 
+        log.Debugw("SCHED ASSIGNED",
+            "sqi", sqi,
+            "sector", task.sector.ID.Number,
+            "task", task.taskType,
+            "window", selectedWindow,
+            "worker", bestWid,
+            "utilization", bestUtilization)
+
+        workerUtil[bestWid] += windows[selectedWindow].allocated.add(info.Resources, needRes)
         windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
         rmQueue = append(rmQueue, sqi)
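
The heart of the change is in Step 2 above: instead of taking the first window that can fit the task, the scheduler now scans every acceptable window, looks up the owning worker's utilization (cached in the new workerUtil map and bumped by the value returned from activeResources.add), and assigns the task to the least-utilized worker, breaking out early once candidates stop improving. A minimal, self-contained sketch of that selection rule, where window, pickWindow and their fields are illustrative stand-ins rather than the Lotus scheduler types:

package main

import (
    "fmt"
    "math"
)

// window is a simplified stand-in for an open scheduling window plus the
// state the real code reads from sh.workers and windows[wnd].allocated.
type window struct {
    worker      int     // owning worker ID
    utilization float64 // 0..1, smaller = more idle
    canFit      bool    // outcome of the canHandleRequest check
}

// pickWindow mirrors the new Step 2 logic: keep the least-utilized worker
// that can fit the task, and stop early once utilization stops improving,
// since the candidate list starts out sorted by utilization.
func pickWindow(acceptable []window) int {
    selected := -1
    best := math.MaxFloat64 // smaller = better

    for i, wnd := range acceptable {
        if !wnd.canFit {
            continue
        }
        if wnd.utilization >= best {
            // No improvement over the current best; later candidates are
            // unlikely to be better either, so bail out of the scan.
            break
        }
        selected = i
        best = wnd.utilization
    }
    return selected
}

func main() {
    wnds := []window{
        {worker: 1, utilization: 0.20, canFit: false},
        {worker: 2, utilization: 0.35, canFit: true},
        {worker: 3, utilization: 0.90, canFit: true},
    }
    fmt.Println("selected window:", pickWindow(wnds)) // -> 1 (worker 2)
}

The early break is only a heuristic: acceptableWindows is sorted by utilization when the pass starts, but assignments made during the pass (the workerUtil[bestWid] += ... update) can perturb that order, which is what the ASCII chart in the in-code comment illustrates.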


@@ -1,8 +1,11 @@
 package sectorstorage
 
 import (
+    "context"
     "sync"
+    "time"
 
+    "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
     "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
 )
@@ -30,13 +33,18 @@ func (a *activeResources) hasWorkWaiting() bool {
     return a.waiting > 0
 }
 
-func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) {
+// add task resources to activeResources and return utilization difference
+func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) float64 {
+    startUtil := a.utilization(wr)
+
     if r.GPUUtilization > 0 {
         a.gpuUsed += r.GPUUtilization
     }
     a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
     a.memUsedMin += r.MinMemory
     a.memUsedMax += r.MaxMemory
+
+    return a.utilization(wr) - startUtil
 }
 
 func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) {
@@ -101,6 +109,7 @@ func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid stor
     return true
 }
 
+// utilization returns a number in 0..1 range indicating fraction of used resources
 func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
     var max float64
@@ -126,6 +135,13 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
         max = memMax
     }
 
+    if len(wr.GPUs) > 0 {
+        gpuMax := a.gpuUsed / float64(len(wr.GPUs))
+        if gpuMax > max {
+            max = gpuMax
+        }
+    }
+
     return max
 }
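
With the hunk above, utilization reports the busiest resource dimension as a 0..1 fraction, and GPUs are now counted alongside CPU and memory; the len(wr.GPUs) > 0 guard keeps GPU-less workers from dividing by zero. A rough standalone illustration of the same max-of-fractions idea, with parameter names that are simplified stand-ins for the activeResources/WorkerResources fields:

// busiestFraction mirrors the shape of activeResources.utilization: overall
// utilization is the most-loaded resource, expressed as a 0..1 fraction.
func busiestFraction(cpuUse, cpus, memUsed, memTotal, gpuUsed float64, gpus int) float64 {
    max := cpuUse / cpus

    if memTotal > 0 {
        if mem := memUsed / memTotal; mem > max {
            max = mem
        }
    }
    if gpus > 0 {
        // Only meaningful when the worker actually has GPUs.
        if gpu := gpuUsed / float64(gpus); gpu > max {
            max = gpu
        }
    }
    return max
}
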
@@ -142,3 +158,20 @@ func (wh *workerHandle) utilization() float64 {
 
     return u
 }
+
+var tasksCacheTimeout = 30 * time.Second
+
+func (wh *workerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
+    wh.tasksLk.Lock()
+    defer wh.tasksLk.Unlock()
+
+    if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout {
+        wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx)
+        if err != nil {
+            return nil, err
+        }
+        wh.tasksUpdate = time.Now()
+    }
+
+    return wh.tasksCache, nil
+}
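
The new workerHandle.TaskTypes wrapper above memoizes the worker's supported task set for tasksCacheTimeout (30 seconds) under a mutex, so the selectors changed below no longer pay a worker RPC round-trip on every scheduling decision. The same pattern in isolation, as a small TTL cache; this helper is illustrative only and not part of the Lotus codebase:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ttlCache memoizes the result of fetch for ttl; concurrent callers are
// serialized by the mutex, mirroring workerHandle.TaskTypes above.
type ttlCache[T any] struct {
    mu      sync.Mutex
    ttl     time.Duration
    fetch   func(context.Context) (T, error)
    value   T
    valid   bool
    updated time.Time
}

func (c *ttlCache[T]) get(ctx context.Context) (T, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if !c.valid || time.Since(c.updated) > c.ttl {
        v, err := c.fetch(ctx)
        if err != nil {
            var zero T
            return zero, err // on error, nothing is cached
        }
        c.value, c.valid, c.updated = v, true, time.Now()
    }
    return c.value, nil
}

func main() {
    calls := 0
    c := &ttlCache[string]{
        ttl: 30 * time.Second,
        fetch: func(ctx context.Context) (string, error) {
            calls++
            return "expensive RPC result", nil
        },
    }

    v, _ := c.get(context.Background())
    v, _ = c.get(context.Background()) // served from cache; fetch not called again
    fmt.Println(v, "fetch calls:", calls) // -> expensive RPC result fetch calls: 1
}

The trade-off is staleness: a worker whose supported task set changes can be misjudged by selectors for up to 30 seconds, in exchange for far fewer TaskTypes RPCs.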


@@ -27,7 +27,7 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
 }
 
 func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
-    tasks, err := whnd.workerRpc.TaskTypes(ctx)
+    tasks, err := whnd.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }


@@ -29,7 +29,7 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
 }
 
 func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
-    tasks, err := whnd.workerRpc.TaskTypes(ctx)
+    tasks, err := whnd.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }


@@ -20,7 +20,7 @@ func newTaskSelector() *taskSelector {
 }
 
 func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
-    tasks, err := whnd.workerRpc.TaskTypes(ctx)
+    tasks, err := whnd.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }
@@ -30,11 +30,12 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
 }
 
 func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *workerHandle) (bool, error) {
-    atasks, err := a.workerRpc.TaskTypes(ctx)
+    atasks, err := a.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }
-    btasks, err := b.workerRpc.TaskTypes(ctx)
+
+    btasks, err := b.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }