sched: implement runWorker
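
Each worker now gets a dedicated goroutine that keeps up to SchedWindows window
requests open with the scheduler, collects the windows it is granted, and runs
their tasks in order. assignWorker executes a task in two phases, prepare and
work, moving its resource reservation from the preparing set to the active set
in between; the activeResources accounting helpers move out of sched.go into
the new sched_resources.go.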

Łukasz Magiera 2020-07-09 13:49:01 +02:00
parent ac7dc28cfb
commit da96f06202
2 changed files with 238 additions and 94 deletions

sched.go

@@ -349,116 +349,150 @@ func (sh *scheduler) trySched() {
}

func (sh *scheduler) runWorker(wid WorkerID) {
	go func() {
		worker := sh.workers[wid]
		scheduledWindows := make(chan *schedWindow, SchedWindows)
		taskDone := make(chan struct{}, 1)
		windowsRequested := 0

		var activeWindows []*schedWindow

		ctx, cancel := context.WithCancel(context.TODO())
		defer cancel()

		workerClosing, err := worker.w.Closing(ctx)
		if err != nil {
			return
		}

		defer func() {
			log.Warnw("Worker closing", "workerid", wid)

			// TODO: close / return all queued tasks
		}()

		for {
			// ask for more windows if we need them
			for ; windowsRequested < SchedWindows; windowsRequested++ {
				select {
				case sh.windowRequests <- &schedWindowRequest{
					worker: wid,
					done:   scheduledWindows,
				}:
				case <-sh.closing:
					return
				case <-workerClosing:
					return
				}
			}

			select {
			case w := <-scheduledWindows:
				activeWindows = append(activeWindows, w)
			case <-taskDone:
				// a task finished, freeing resources; retry the queued windows
			case <-sh.closing:
				return
			case <-workerClosing:
				return
			}

		assignLoop:
			// process windows in order
			for len(activeWindows) > 0 {
				// process tasks within a window in order
				for len(activeWindows[0].todo) > 0 {
					todo := activeWindows[0].todo[0]
					needRes := ResourceTable[todo.taskType][sh.spt]

					sh.workersLk.Lock()
					ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
					if !ok {
						sh.workersLk.Unlock()
						break assignLoop
					}

					err := sh.assignWorker(taskDone, wid, worker, todo)
					sh.workersLk.Unlock()

					if err != nil {
						log.Errorf("assignWorker error: %+v", err)
						go todo.respond(xerrors.Errorf("assignWorker error: %w", err))
					}

					activeWindows[0].todo = activeWindows[0].todo[1:]
				}

				// the first window is drained; drop it and request a new one
				copy(activeWindows, activeWindows[1:])
				activeWindows[len(activeWindows)-1] = nil
				activeWindows = activeWindows[:len(activeWindows)-1]

				windowsRequested--
			}
		}
	}()
}

func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
	needRes := ResourceTable[req.taskType][sh.spt]

	w.preparing.add(w.info.Resources, needRes)

	go func() {
		err := req.prepare(req.ctx, w.w)
		sh.workersLk.Lock()

		if err != nil {
			w.preparing.free(w.info.Resources, needRes)
			sh.workersLk.Unlock()

			select {
			case taskDone <- struct{}{}:
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
			}

			select {
			case req.ret <- workerResponse{err: err}:
			case <-req.ctx.Done():
				log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err)
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
			}
			return
		}

		err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
			// the task now counts against active resources; release the
			// preparing reservation so the worker loop can schedule more
			w.preparing.free(w.info.Resources, needRes)
			sh.workersLk.Unlock()
			defer sh.workersLk.Lock() // we MUST return locked from this function

			select {
			case taskDone <- struct{}{}:
			case <-sh.closing:
			}

			err = req.work(req.ctx, w.w)

			select {
			case req.ret <- workerResponse{err: err}:
			case <-req.ctx.Done():
				log.Warnf("request got cancelled before we could respond")
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response")
			}

			return nil
		})

		sh.workersLk.Unlock()

		// This error should always be nil, since nothing is setting it, but just to be safe:
		if err != nil {
			log.Errorf("error executing worker (withResources): %+v", err)
		}
	}()

	return nil
}
func (sh *scheduler) newWorker(w *workerHandle) {
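
For illustration, here is a minimal, self-contained sketch of the window
handshake that runWorker performs with the scheduler. The types mirror
schedWindow/schedWindowRequest from the diff, but todo is simplified to task
names and the fulfilling goroutine is a trivial stand-in for the scheduler's
real trySched logic, not part of this commit:

	package main

	import "fmt"

	type schedWindow struct {
		todo []string // simplified: task names instead of *workerRequest
	}

	type schedWindowRequest struct {
		worker int
		done   chan *schedWindow // the scheduler sends a filled window here
	}

	func main() {
		windowRequests := make(chan *schedWindowRequest)
		const schedWindows = 2

		// Stand-in for the scheduler: fulfil each request with one task.
		go func() {
			for i := 0; ; i++ {
				req := <-windowRequests
				req.done <- &schedWindow{todo: []string{fmt.Sprintf("task-%d", i)}}
			}
		}()

		// Worker loop, as in runWorker: keep schedWindows requests in
		// flight, collect granted windows, and drain them in order.
		scheduled := make(chan *schedWindow, schedWindows)
		windowsRequested := 0
		var active []*schedWindow

		for processed := 0; processed < 4; {
			for ; windowsRequested < schedWindows; windowsRequested++ {
				windowRequests <- &schedWindowRequest{worker: 0, done: scheduled}
			}

			active = append(active, <-scheduled)

			for len(active) > 0 {
				for _, task := range active[0].todo {
					fmt.Println("running", task)
					processed++
				}
				active = active[1:]
				windowsRequested--
			}
		}
	}

Keeping several window requests in flight means the worker always has queued
work to fall back on, while draining windows strictly in order preserves the
priority the scheduler assigned.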

sched_resources.go (new file)

@@ -0,0 +1,110 @@
package sectorstorage

import (
	"sync"

	"github.com/filecoin-project/sector-storage/storiface"
)

// withResources blocks until the given request fits on the worker, accounts
// for its resources, runs cb, then frees the resources and wakes any waiters.
// The caller must hold locker, which backs the condition variable.
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
	for !a.canHandleRequest(r, id, wr) {
		if a.cond == nil {
			a.cond = sync.NewCond(locker)
		}
		a.cond.Wait()
	}

	a.add(wr, r)

	err := cb()

	a.free(wr, r)
	if a.cond != nil {
		a.cond.Broadcast()
	}

	return err
}

func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
	if r.CanGPU {
		// only set the flag; a plain assignment would let a later
		// non-GPU task clear it while the GPU task is still running
		a.gpuUsed = true
	}
	if r.MultiThread() {
		a.cpuUse += wr.CPUs
	} else {
		a.cpuUse += uint64(r.Threads)
	}

	a.memUsedMin += r.MinMemory
	a.memUsedMax += r.MaxMemory
}

func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
	if r.CanGPU {
		a.gpuUsed = false
	}
	if r.MultiThread() {
		a.cpuUse -= wr.CPUs
	} else {
		a.cpuUse -= uint64(r.Threads)
	}

	a.memUsedMin -= r.MinMemory
	a.memUsedMax -= r.MaxMemory
}

func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
	minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
	if minNeedMem > res.MemPhysical {
		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
		return false
	}

	maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
	if maxNeedMem > res.MemSwap+res.MemPhysical {
		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
		return false
	}

	if needRes.MultiThread() {
		// a multicore task wants the whole CPU to itself
		if a.cpuUse > 0 {
			log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
			return false
		}
	} else {
		if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
			log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
			return false
		}
	}

	if len(res.GPUs) > 0 && needRes.CanGPU {
		if a.gpuUsed {
			log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
			return false
		}
	}

	return true
}

// utilization reports the fraction of the most-used resource (CPU, min
// memory, or max memory) on the worker.
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
	var max float64

	cpu := float64(a.cpuUse) / float64(wr.CPUs)
	max = cpu

	memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
	if memMin > max {
		max = memMin
	}

	memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
	if memMax > max {
		max = memMax
	}

	return max
}
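
As a usage note, withResources is a classic condition-variable monitor: wait
until the request fits, account for it, run the callback, then free the
resources and wake all waiters. A stripped-down sketch of the same pattern on
a single counted resource (gate and withUnits are illustrative names; unlike
the version above, the callback here runs outside the lock for simplicity):

	package main

	import (
		"fmt"
		"sync"
	)

	// gate admits work while the total claimed units stay within capacity,
	// mirroring activeResources.withResources on a single dimension.
	type gate struct {
		mu       sync.Mutex
		cond     *sync.Cond
		capacity int
		used     int
	}

	func (g *gate) withUnits(n int, cb func()) {
		g.mu.Lock()
		if g.cond == nil {
			g.cond = sync.NewCond(&g.mu) // lazy init, as in the diff
		}
		for g.used+n > g.capacity { // canHandleRequest analogue
			g.cond.Wait()
		}
		g.used += n // add
		g.mu.Unlock()

		cb()

		g.mu.Lock()
		g.used -= n        // free
		g.cond.Broadcast() // wake waiters now that resources were returned
		g.mu.Unlock()
	}

	func main() {
		g := &gate{capacity: 2}
		var wg sync.WaitGroup
		for i := 0; i < 5; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				g.withUnits(1, func() { fmt.Println("task", i, "running") })
			}(i)
		}
		wg.Wait()
	}

Broadcast rather than Signal is the right choice here: waiters need different
amounts of resources, so any one of them, not just the longest waiter, may be
the one that now fits.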