From da96f06202c7dd6d4482396fcf9f2e0e22287a16 Mon Sep 17 00:00:00 2001
From: Łukasz Magiera
Date: Thu, 9 Jul 2020 13:49:01 +0200
Subject: [PATCH] sched: implement runWorker

---
 sched.go           | 222 ++++++++++++++++++++++++++-------------------
 sched_resources.go | 110 ++++++++++++++++++++++
 2 files changed, 238 insertions(+), 94 deletions(-)
 create mode 100644 sched_resources.go

diff --git a/sched.go b/sched.go
index af6981b08..966bf2c46 100644
--- a/sched.go
+++ b/sched.go
@@ -349,116 +349,150 @@ func (sh *scheduler) trySched() {
 }
 
 func (sh *scheduler) runWorker(wid WorkerID) {
-	w := sh.workers[wid]
-
 	go func() {
-		for {
+		worker := sh.workers[wid]
+		scheduledWindows := make(chan *schedWindow, SchedWindows)
+		taskDone := make(chan struct{}, 1)
+		windowsRequested := 0
+
+		var activeWindows []*schedWindow
+
+		ctx, cancel := context.WithCancel(context.TODO())
+		defer cancel()
+
+		workerClosing, err := worker.w.Closing(ctx)
+		if err != nil {
+			return
+		}
+
+		defer func() {
+			log.Warnw("Worker closing", "workerid", wid)
+
+			// TODO: close / return all queued tasks
+		}()
+
+		for {
+			// ask for more windows if we need them
+			for ; windowsRequested < SchedWindows; windowsRequested++ {
+				select {
+				case sh.windowRequests <- &schedWindowRequest{
+					worker: wid,
+					done:   scheduledWindows,
+				}:
+				case <-sh.closing:
+					return
+				case <-workerClosing:
+					return
+				}
+			}
+
+			select {
+			case w := <-scheduledWindows:
+				activeWindows = append(activeWindows, w)
+			case <-taskDone:
+			case <-sh.closing:
+				return
+			case <-workerClosing:
+				return
+			}
+
+		assignLoop:
+			// process windows in order
+			for len(activeWindows) > 0 {
+				// process tasks within a window in order
+				for len(activeWindows[0].todo) > 0 {
+					todo := activeWindows[0].todo[0]
+					needRes := ResourceTable[todo.taskType][sh.spt]
+
+					sh.workersLk.Lock()
+					ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
+					if !ok {
+						sh.workersLk.Unlock()
+						break assignLoop
+					}
+
+					err := sh.assignWorker(taskDone, wid, worker, todo)
+					sh.workersLk.Unlock()
+
+					if err != nil {
+						log.Errorf("assignWorker error: %+v", err)
+						go todo.respond(xerrors.Errorf("assignWorker error: %w", err))
+					}
+
+					activeWindows[0].todo = activeWindows[0].todo[1:]
+				}
+
+				copy(activeWindows, activeWindows[1:])
+				activeWindows[len(activeWindows)-1] = nil
+				activeWindows = activeWindows[:len(activeWindows)-1]
+
+				windowsRequested--
+			}
+		}
 	}()
 }
 
-func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
-	for !a.canHandleRequest(r, id, wr) {
-		if a.cond == nil {
-			a.cond = sync.NewCond(locker)
-		}
-		a.cond.Wait()
-	}
-
-	a.add(wr, r)
-
-	err := cb()
-
-	a.free(wr, r)
-	if a.cond != nil {
-		a.cond.Broadcast()
-	}
-
-	return err
-}
-
+func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
+	needRes := ResourceTable[req.taskType][sh.spt]
+
+	w.preparing.add(w.info.Resources, needRes)
+
+	go func() {
+		err := req.prepare(req.ctx, w.w)
+		sh.workersLk.Lock()
+
+		if err != nil {
+			w.preparing.free(w.info.Resources, needRes)
+			sh.workersLk.Unlock()
+
+			select {
+			case taskDone <- struct{}{}:
+			case <-sh.closing:
+				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
+			}
+
+			select {
+			case req.ret <- workerResponse{err: err}:
+			case <-req.ctx.Done():
+				log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err)
+			case <-sh.closing:
+				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
+			}
+			return
+		}
+
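+		// Prepare succeeded: hand the reservation over to the execution phase.
+		// withResources moves the accounting from w.preparing to w.active for
+		// the duration of req.work, blocking while the worker is over capacity.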
+		err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
+			w.preparing.free(w.info.Resources, needRes)
+			sh.workersLk.Unlock()
+			defer sh.workersLk.Lock() // we MUST return locked from this function
+
+			select {
+			case taskDone <- struct{}{}:
+			case <-sh.closing:
+			}
+
+			err = req.work(req.ctx, w.w)
+
+			select {
+			case req.ret <- workerResponse{err: err}:
+			case <-req.ctx.Done():
+				log.Warnf("request got cancelled before we could respond")
+			case <-sh.closing:
+				log.Warnf("scheduler closed while sending response")
+			}
+
+			return nil
+		})
+
+		sh.workersLk.Unlock()
+
+		// This error should always be nil, since nothing is setting it, but just to be safe:
+		if err != nil {
+			log.Errorf("error executing worker (withResources): %+v", err)
+		}
+	}()
+
+	return nil
+}
+
-func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
-	a.gpuUsed = r.CanGPU
-	if r.MultiThread() {
-		a.cpuUse += wr.CPUs
-	} else {
-		a.cpuUse += uint64(r.Threads)
-	}
-
-	a.memUsedMin += r.MinMemory
-	a.memUsedMax += r.MaxMemory
-}
-
-func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
-	if r.CanGPU {
-		a.gpuUsed = false
-	}
-	if r.MultiThread() {
-		a.cpuUse -= wr.CPUs
-	} else {
-		a.cpuUse -= uint64(r.Threads)
-	}
-
-	a.memUsedMin -= r.MinMemory
-	a.memUsedMax -= r.MaxMemory
-}
-
-func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
-
-	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
-	minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
-	if minNeedMem > res.MemPhysical {
-		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
-		return false
-	}
-
-	maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
-
-	if maxNeedMem > res.MemSwap+res.MemPhysical {
-		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
-		return false
-	}
-
-	if needRes.MultiThread() {
-		if a.cpuUse > 0 {
-			log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
-			return false
-		}
-	} else {
-		if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
-			log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
-			return false
-		}
-	}
-
-	if len(res.GPUs) > 0 && needRes.CanGPU {
-		if a.gpuUsed {
-			log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
-			return false
-		}
-	}
-
-	return true
-}
-
-func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
-	var max float64
-
-	cpu := float64(a.cpuUse) / float64(wr.CPUs)
-	max = cpu
-
-	memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
-	if memMin > max {
-		max = memMin
-	}
-
-	memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
-	if memMax > max {
-		max = memMax
-	}
-
-	return max
-}
-
 func (sh *scheduler) newWorker(w *workerHandle) {
diff --git a/sched_resources.go b/sched_resources.go
new file mode 100644
index 000000000..0ba9d1f66
--- /dev/null
+++ b/sched_resources.go
@@ -0,0 +1,110 @@
+package sectorstorage
+
+import (
+	"sync"
+
+	"github.com/filecoin-project/sector-storage/storiface"
+)
+
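+// withResources blocks until the worker has enough free resources to admit r,
+// reserves them, runs cb, and then releases the reservation, waking any other
+// goroutines waiting for capacity on this worker.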
+func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
+	for !a.canHandleRequest(r, id, wr) {
+		if a.cond == nil {
+			a.cond = sync.NewCond(locker)
+		}
+		a.cond.Wait()
+	}
+
+	a.add(wr, r)
+
+	err := cb()
+
+	a.free(wr, r)
+	if a.cond != nil {
+		a.cond.Broadcast()
+	}
+
+	return err
+}
+
+func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
+	a.gpuUsed = r.CanGPU
+	if r.MultiThread() {
+		a.cpuUse += wr.CPUs
+	} else {
+		a.cpuUse += uint64(r.Threads)
+	}
+
+	a.memUsedMin += r.MinMemory
+	a.memUsedMax += r.MaxMemory
+}
+
+func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
+	if r.CanGPU {
+		a.gpuUsed = false
+	}
+	if r.MultiThread() {
+		a.cpuUse -= wr.CPUs
+	} else {
+		a.cpuUse -= uint64(r.Threads)
+	}
+
+	a.memUsedMin -= r.MinMemory
+	a.memUsedMax -= r.MaxMemory
+}
+
+func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
+
+	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
+	minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
+	if minNeedMem > res.MemPhysical {
+		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
+		return false
+	}
+
+	maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
+
+	if maxNeedMem > res.MemSwap+res.MemPhysical {
+		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
+		return false
+	}
+
+	if needRes.MultiThread() {
+		if a.cpuUse > 0 {
+			log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
+			return false
+		}
+	} else {
+		if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
+			log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
+			return false
+		}
+	}
+
+	if len(res.GPUs) > 0 && needRes.CanGPU {
+		if a.gpuUsed {
+			log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
+			return false
+		}
+	}
+
+	return true
+}
+
+func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
+	var max float64
+
+	cpu := float64(a.cpuUse) / float64(wr.CPUs)
+	max = cpu
+
+	memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
+	if memMin > max {
+		max = memMin
+	}
+
+	memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
+	if memMax > max {
+		max = memMax
+	}
+
+	return max
+}