sched: Update after tasks finish preparing

This commit is contained in:
Łukasz Magiera 2020-11-09 21:44:28 +01:00
parent fbc05fa5a8
commit 5b8f555221

View File

@ -139,10 +139,17 @@ func (sw *schedWorker) handleWorker() {
// wait for more tasks to be assigned by the main scheduler or for the worker
// to finish precessing a task
update, ok := sw.waitForUpdates()
update, pokeSched, ok := sw.waitForUpdates()
if !ok {
return
}
if pokeSched {
// a task has finished preparing, which can mean that we've freed some space on some worker
select {
case sched.workerChange <- struct{}{}:
default: // workerChange is buffered, and scheduling is global, so it's ok if we don't send here
}
}
if update {
break
}
@ -257,23 +264,23 @@ func (sw *schedWorker) requestWindows() bool {
return true
}
func (sw *schedWorker) waitForUpdates() (update bool, ok bool) {
func (sw *schedWorker) waitForUpdates() (update bool, sched bool, ok bool) {
select {
case <-sw.heartbeatTimer.C:
return false, true
return false, false, true
case w := <-sw.scheduledWindows:
sw.worker.wndLk.Lock()
sw.worker.activeWindows = append(sw.worker.activeWindows, w)
sw.worker.wndLk.Unlock()
return true, true
return true, false, true
case <-sw.taskDone:
log.Debugw("task done", "workerid", sw.wid)
return true, true
return true, true, true
case <-sw.sched.closing:
case <-sw.worker.closingMgr:
}
return false, false
return false, false, false
}
func (sw *schedWorker) workerCompactWindows() {