Merge pull request #3489 from filecoin-project/fix/sched-deadlocks
sealing sched: Fix deadlock between worker.wndLk / workersLk
This commit is contained in:
commit
5f79ff340d
13
extern/sector-storage/sched.go
vendored
13
extern/sector-storage/sched.go
vendored
@ -563,6 +563,7 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sh.workersLk.RLock()
|
||||||
worker.wndLk.Lock()
|
worker.wndLk.Lock()
|
||||||
|
|
||||||
windowsRequested -= sh.workerCompactWindows(worker, wid)
|
windowsRequested -= sh.workerCompactWindows(worker, wid)
|
||||||
@ -574,8 +575,6 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
|
|
||||||
// process tasks within a window, preferring tasks at lower indexes
|
// process tasks within a window, preferring tasks at lower indexes
|
||||||
for len(firstWindow.todo) > 0 {
|
for len(firstWindow.todo) > 0 {
|
||||||
sh.workersLk.RLock()
|
|
||||||
|
|
||||||
tidx := -1
|
tidx := -1
|
||||||
|
|
||||||
worker.lk.Lock()
|
worker.lk.Lock()
|
||||||
@ -589,7 +588,6 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
worker.lk.Unlock()
|
worker.lk.Unlock()
|
||||||
|
|
||||||
if tidx == -1 {
|
if tidx == -1 {
|
||||||
sh.workersLk.RUnlock()
|
|
||||||
break assignLoop
|
break assignLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -597,7 +595,6 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
|
|
||||||
log.Debugf("assign worker sector %d", todo.sector.Number)
|
log.Debugf("assign worker sector %d", todo.sector.Number)
|
||||||
err := sh.assignWorker(taskDone, wid, worker, todo)
|
err := sh.assignWorker(taskDone, wid, worker, todo)
|
||||||
sh.workersLk.RUnlock()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("assignWorker error: %+v", err)
|
log.Error("assignWorker error: %+v", err)
|
||||||
@ -618,6 +615,7 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
worker.wndLk.Unlock()
|
worker.wndLk.Unlock()
|
||||||
|
sh.workersLk.RUnlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -776,14 +774,19 @@ func (sh *scheduler) dropWorker(wid WorkerID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
|
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
|
||||||
if !w.cleanupStarted {
|
select {
|
||||||
|
case <-w.closingMgr:
|
||||||
|
default:
|
||||||
close(w.closingMgr)
|
close(w.closingMgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sh.workersLk.Unlock()
|
||||||
select {
|
select {
|
||||||
case <-w.closedMgr:
|
case <-w.closedMgr:
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
log.Errorf("timeout closing worker manager goroutine %d", wid)
|
log.Errorf("timeout closing worker manager goroutine %d", wid)
|
||||||
}
|
}
|
||||||
|
sh.workersLk.Lock()
|
||||||
|
|
||||||
if !w.cleanupStarted {
|
if !w.cleanupStarted {
|
||||||
w.cleanupStarted = true
|
w.cleanupStarted = true
|
||||||
|
Loading…
Reference in New Issue
Block a user