Fix some locking issues
This commit is contained in:
parent
898a72d078
commit
ed251d9f82
26
sched.go
26
sched.go
@ -52,7 +52,7 @@ type WorkerSelector interface {
|
|||||||
type scheduler struct {
|
type scheduler struct {
|
||||||
spt abi.RegisteredSealProof
|
spt abi.RegisteredSealProof
|
||||||
|
|
||||||
workersLk sync.Mutex
|
workersLk sync.RWMutex
|
||||||
nextWorker WorkerID
|
nextWorker WorkerID
|
||||||
workers map[WorkerID]*workerHandle
|
workers map[WorkerID]*workerHandle
|
||||||
|
|
||||||
@ -83,6 +83,8 @@ type workerHandle struct {
|
|||||||
preparing *activeResources
|
preparing *activeResources
|
||||||
active *activeResources
|
active *activeResources
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
// stats / tracking
|
// stats / tracking
|
||||||
wt *workTracker
|
wt *workTracker
|
||||||
|
|
||||||
@ -283,6 +285,9 @@ func (sh *scheduler) trySched() {
|
|||||||
|
|
||||||
log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
|
log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
|
||||||
|
|
||||||
|
sh.workersLk.RLock()
|
||||||
|
defer sh.workersLk.RUnlock()
|
||||||
|
|
||||||
// Step 1
|
// Step 1
|
||||||
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
|
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
|
||||||
task := (*sh.schedQueue)[sqi]
|
task := (*sh.schedQueue)[sqi]
|
||||||
@ -428,9 +433,9 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
defer ready.Wait()
|
defer ready.Wait()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
sh.workersLk.Lock()
|
sh.workersLk.RLock()
|
||||||
worker, found := sh.workers[wid]
|
worker, found := sh.workers[wid]
|
||||||
sh.workersLk.Unlock()
|
sh.workersLk.RUnlock()
|
||||||
|
|
||||||
ready.Done()
|
ready.Done()
|
||||||
|
|
||||||
@ -498,16 +503,19 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
todo := activeWindows[0].todo[0]
|
todo := activeWindows[0].todo[0]
|
||||||
needRes := ResourceTable[todo.taskType][sh.spt]
|
needRes := ResourceTable[todo.taskType][sh.spt]
|
||||||
|
|
||||||
sh.workersLk.Lock()
|
sh.workersLk.RLock()
|
||||||
|
worker.lk.Lock()
|
||||||
ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
|
ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
|
||||||
|
worker.lk.Unlock()
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
sh.workersLk.Unlock()
|
sh.workersLk.RUnlock()
|
||||||
break assignLoop
|
break assignLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("assign worker sector %d", todo.sector.Number)
|
log.Debugf("assign worker sector %d", todo.sector.Number)
|
||||||
err := sh.assignWorker(taskDone, wid, worker, todo)
|
err := sh.assignWorker(taskDone, wid, worker, todo)
|
||||||
sh.workersLk.Unlock()
|
sh.workersLk.RUnlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("assignWorker error: %+v", err)
|
log.Error("assignWorker error: %+v", err)
|
||||||
@ -530,14 +538,18 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
|
func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
|
||||||
needRes := ResourceTable[req.taskType][sh.spt]
|
needRes := ResourceTable[req.taskType][sh.spt]
|
||||||
|
|
||||||
|
w.lk.Lock()
|
||||||
w.preparing.add(w.info.Resources, needRes)
|
w.preparing.add(w.info.Resources, needRes)
|
||||||
|
w.lk.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := req.prepare(req.ctx, w.wt.worker(w.w))
|
err := req.prepare(req.ctx, w.wt.worker(w.w))
|
||||||
sh.workersLk.Lock()
|
sh.workersLk.Lock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
w.lk.Lock()
|
||||||
w.preparing.free(w.info.Resources, needRes)
|
w.preparing.free(w.info.Resources, needRes)
|
||||||
|
w.lk.Unlock()
|
||||||
sh.workersLk.Unlock()
|
sh.workersLk.Unlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -557,7 +569,9 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
|
err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
|
||||||
|
w.lk.Lock()
|
||||||
w.preparing.free(w.info.Resources, needRes)
|
w.preparing.free(w.info.Resources, needRes)
|
||||||
|
w.lk.Unlock()
|
||||||
sh.workersLk.Unlock()
|
sh.workersLk.Unlock()
|
||||||
defer sh.workersLk.Lock() // we MUST return locked from this function
|
defer sh.workersLk.Lock() // we MUST return locked from this function
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user