diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go index e56e9056d..29c730b03 100644 --- a/extern/sector-storage/sched_worker.go +++ b/extern/sector-storage/sched_worker.go @@ -139,10 +139,17 @@ func (sw *schedWorker) handleWorker() { // wait for more tasks to be assigned by the main scheduler or for the worker // to finish precessing a task - update, ok := sw.waitForUpdates() + update, pokeSched, ok := sw.waitForUpdates() if !ok { return } + if pokeSched { + // a task has finished preparing, which can mean that we've freed some space on some worker + select { + case sched.workerChange <- struct{}{}: + default: // workerChange is buffered, and scheduling is global, so it's ok if we don't send here + } + } if update { break } @@ -257,23 +264,23 @@ func (sw *schedWorker) requestWindows() bool { return true } -func (sw *schedWorker) waitForUpdates() (update bool, ok bool) { +func (sw *schedWorker) waitForUpdates() (update bool, sched bool, ok bool) { select { case <-sw.heartbeatTimer.C: - return false, true + return false, false, true case w := <-sw.scheduledWindows: sw.worker.wndLk.Lock() sw.worker.activeWindows = append(sw.worker.activeWindows, w) sw.worker.wndLk.Unlock() - return true, true + return true, false, true case <-sw.taskDone: log.Debugw("task done", "workerid", sw.wid) - return true, true + return true, true, true case <-sw.sched.closing: case <-sw.worker.closingMgr: } - return false, false + return false, false, false } func (sw *schedWorker) workerCompactWindows() {