diff --git a/sched.go b/sched.go index ed648bf19..e8eda4834 100644 --- a/sched.go +++ b/sched.go @@ -52,7 +52,7 @@ type WorkerSelector interface { type scheduler struct { spt abi.RegisteredSealProof - workersLk sync.Mutex + workersLk sync.RWMutex nextWorker WorkerID workers map[WorkerID]*workerHandle @@ -83,6 +83,8 @@ type workerHandle struct { preparing *activeResources active *activeResources + lk sync.Mutex + // stats / tracking wt *workTracker @@ -283,6 +285,9 @@ func (sh *scheduler) trySched() { log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) + sh.workersLk.RLock() + defer sh.workersLk.RUnlock() + // Step 1 for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { task := (*sh.schedQueue)[sqi] @@ -428,9 +433,9 @@ func (sh *scheduler) runWorker(wid WorkerID) { defer ready.Wait() go func() { - sh.workersLk.Lock() + sh.workersLk.RLock() worker, found := sh.workers[wid] - sh.workersLk.Unlock() + sh.workersLk.RUnlock() ready.Done() @@ -498,16 +503,19 @@ func (sh *scheduler) runWorker(wid WorkerID) { todo := activeWindows[0].todo[0] needRes := ResourceTable[todo.taskType][sh.spt] - sh.workersLk.Lock() + sh.workersLk.RLock() + worker.lk.Lock() ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) + worker.lk.Unlock() + if !ok { - sh.workersLk.Unlock() + sh.workersLk.RUnlock() break assignLoop } log.Debugf("assign worker sector %d", todo.sector.Number) err := sh.assignWorker(taskDone, wid, worker, todo) - sh.workersLk.Unlock() + sh.workersLk.RUnlock() if err != nil { log.Error("assignWorker error: %+v", err) @@ -530,14 +538,18 @@ func (sh *scheduler) runWorker(wid WorkerID) { func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error { needRes := ResourceTable[req.taskType][sh.spt] + w.lk.Lock() w.preparing.add(w.info.Resources, needRes) + w.lk.Unlock() go func() { err := req.prepare(req.ctx, w.wt.worker(w.w)) sh.workersLk.Lock() if err != nil { + w.lk.Lock() w.preparing.free(w.info.Resources, needRes) + w.lk.Unlock() sh.workersLk.Unlock() select { @@ -557,7 +569,9 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke } err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error { + w.lk.Lock() w.preparing.free(w.info.Resources, needRes) + w.lk.Unlock() sh.workersLk.Unlock() defer sh.workersLk.Lock() // we MUST return locked from this function