Merge pull request #4778 from filecoin-project/fix/miner-fixes

Fix scheduler lockups after storage is freed
Łukasz Magiera, 2020-11-10 19:17:10 +01:00 (committed by GitHub)
commit 61fa94617d
2 changed files with 34 additions and 19 deletions

@@ -139,10 +139,17 @@ func (sw *schedWorker) handleWorker() {
 		// wait for more tasks to be assigned by the main scheduler or for the worker
 		// to finish processing a task
-		update, ok := sw.waitForUpdates()
+		update, pokeSched, ok := sw.waitForUpdates()
 		if !ok {
 			return
 		}
 
+		if pokeSched {
+			// a task has finished preparing, which can mean that we've freed some space on some worker
+			select {
+			case sched.workerChange <- struct{}{}:
+			default: // workerChange is buffered, and scheduling is global, so it's ok if we don't send here
+			}
+		}
 		if update {
 			break
 		}
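
The `default:` branch above works because `workerChange` is a buffered channel: a poke carries no data, so if one is already queued the scheduler will rerun anyway and the send can be skipped without blocking the worker loop. A minimal, self-contained sketch of that poke-and-coalesce pattern (channel and function names here are illustrative, not the lotus scheduler's API):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-in for the scheduler's buffered notification channel; capacity 1 is
	// enough because a poke carries no data and repeated pokes can be coalesced.
	workerChange := make(chan struct{}, 1)

	poke := func() {
		select {
		case workerChange <- struct{}{}:
		default: // a poke is already pending, the consumer will rerun anyway
		}
	}

	// Consumer loop: rerun scheduling whenever at least one poke has arrived.
	go func() {
		for range workerChange {
			fmt.Println("rescheduling: some worker may have freed resources")
		}
	}()

	// Many pokes collapse into at most one pending notification, and the
	// senders never block even if the consumer is busy.
	for i := 0; i < 5; i++ {
		poke()
	}
	time.Sleep(100 * time.Millisecond)
}
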
@@ -257,23 +264,23 @@ func (sw *schedWorker) requestWindows() bool {
 	return true
 }
 
-func (sw *schedWorker) waitForUpdates() (update bool, ok bool) {
+func (sw *schedWorker) waitForUpdates() (update bool, sched bool, ok bool) {
 	select {
 	case <-sw.heartbeatTimer.C:
-		return false, true
+		return false, false, true
 	case w := <-sw.scheduledWindows:
 		sw.worker.wndLk.Lock()
 		sw.worker.activeWindows = append(sw.worker.activeWindows, w)
 		sw.worker.wndLk.Unlock()
-		return true, true
+		return true, false, true
 	case <-sw.taskDone:
 		log.Debugw("task done", "workerid", sw.wid)
-		return true, true
+		return true, true, true
 	case <-sw.sched.closing:
 	case <-sw.worker.closingMgr:
 	}
 
-	return false, false
+	return false, false, false
 }
 
 func (sw *schedWorker) workerCompactWindows() {
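
For readers following the signature change: the new middle return value (named `sched` in the signature, received as `pokeSched` by the caller) is only set by the `taskDone` case, i.e. the scheduler is poked exactly when a task completes and may have freed worker resources. A rough standalone analogue (types and channel names are stand-ins, not the actual lotus code) showing how each wake-up source maps to a flag combination:

package main

import "fmt"

// waiter is a trimmed-down analogue of the worker loop's wake-up sources.
type waiter struct {
	heartbeat chan struct{}
	windows   chan int
	taskDone  chan struct{}
	closing   chan struct{}
}

// wait returns (update, poke, ok): update means there may be new work to run,
// poke means the global scheduler should be re-triggered, ok means keep looping.
func (w *waiter) wait() (update bool, poke bool, ok bool) {
	select {
	case <-w.heartbeat:
		return false, false, true
	case win := <-w.windows:
		_ = win // a real worker would queue the scheduled window here
		return true, false, true
	case <-w.taskDone:
		return true, true, true
	case <-w.closing:
	}
	return false, false, false
}

func main() {
	w := &waiter{
		heartbeat: make(chan struct{}, 1),
		windows:   make(chan int, 1),
		taskDone:  make(chan struct{}, 1),
		closing:   make(chan struct{}),
	}

	w.taskDone <- struct{}{}
	fmt.Println(w.wait()) // true true true: run more work and poke the scheduler

	w.heartbeat <- struct{}{}
	fmt.Println(w.wait()) // false false true: just a keep-alive tick

	close(w.closing)
	fmt.Println(w.wait()) // false false false: shut the worker loop down
}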

@@ -298,24 +298,28 @@ func (st *Local) reportHealth(ctx context.Context) {
 			return
 		}
 
-		st.localLk.RLock()
-
-		toReport := map[ID]HealthReport{}
-		for id, p := range st.paths {
-			stat, err := p.stat(st.localStorage)
-
-			toReport[id] = HealthReport{
-				Stat: stat,
-				Err:  err,
-			}
-		}
-
-		st.localLk.RUnlock()
-
-		for id, report := range toReport {
-			if err := st.index.StorageReportHealth(ctx, id, report); err != nil {
-				log.Warnf("error reporting storage health for %s (%+v): %+v", id, report, err)
-			}
-		}
+		st.reportStorage(ctx)
 	}
 }
+
+func (st *Local) reportStorage(ctx context.Context) {
+	st.localLk.RLock()
+
+	toReport := map[ID]HealthReport{}
+	for id, p := range st.paths {
+		stat, err := p.stat(st.localStorage)
+
+		toReport[id] = HealthReport{
+			Stat: stat,
+			Err:  err,
+		}
+	}
+
+	st.localLk.RUnlock()
+
+	for id, report := range toReport {
+		if err := st.index.StorageReportHealth(ctx, id, report); err != nil {
+			log.Warnf("error reporting storage health for %s (%+v): %+v", id, report, err)
+		}
+	}
+}
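
The hunks in this second file are in the local store code: the body of the periodic health loop is extracted into `reportStorage` so it can also be called on demand. Note the locking shape it keeps: stats are copied into `toReport` while holding the read lock, and the index calls happen with the lock released. A small sketch of that snapshot-then-report pattern under simplified, assumed types (the `store`/`stat` names and the `reportOne` callback are illustrative, not lotus types):

package main

import (
	"fmt"
	"sync"
)

// stat and store are simplified stand-ins for the local store's path stats.
type stat struct{ freeBytes int64 }

type store struct {
	mu    sync.RWMutex
	paths map[string]stat
}

// reportStorage copies the stats under the read lock, then performs the
// (potentially slow) reporting calls with the lock released, so writers such
// as a concurrent remove or move are never stalled by the reporting side.
func (s *store) reportStorage(reportOne func(id string, st stat) error) {
	s.mu.RLock()
	snapshot := make(map[string]stat, len(s.paths))
	for id, st := range s.paths {
		snapshot[id] = st
	}
	s.mu.RUnlock()

	for id, st := range snapshot {
		if err := reportOne(id, st); err != nil {
			fmt.Printf("error reporting storage health for %s: %v\n", id, err)
		}
	}
}

func main() {
	s := &store{paths: map[string]stat{
		"path-a": {freeBytes: 100 << 20},
		"path-b": {freeBytes: 5 << 30},
	}}
	s.reportStorage(func(id string, st stat) error {
		fmt.Printf("%s: %d bytes free\n", id, st.freeBytes)
		return nil
	})
}
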
@@ -568,6 +572,8 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storifa
 		log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err)
 	}
 
+	st.reportStorage(ctx) // report freed space
+
 	return nil
 }
@@ -623,6 +629,8 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.Sect
 		}
 	}
 
+	st.reportStorage(ctx) // report space use changes
+
 	return nil
 }
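
Taken together, `reportHealth` keeps refreshing the index on its timer, while `removeSector` and `MoveStorage` now push an immediate `reportStorage` after changing disk usage, so the scheduler can see freed space without waiting for the next tick. A hedged sketch of that overall shape (the interval, types, and method bodies below are invented for illustration, not the lotus implementation):

package main

import (
	"context"
	"fmt"
	"time"
)

type local struct{}

// reportStorage stands in for the shared body: push current space stats to the index.
func (l *local) reportStorage(ctx context.Context) {
	fmt.Println("pushing fresh space stats to the index")
}

// reportHealth only owns the timer; the actual reporting lives in reportStorage.
func (l *local) reportHealth(ctx context.Context) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
		l.reportStorage(ctx) // periodic refresh
	}
}

// removeSector frees disk space, then reports immediately instead of waiting
// for the next reportHealth tick.
func (l *local) removeSector(ctx context.Context) {
	fmt.Println("sector data deleted")
	l.reportStorage(ctx) // report freed space
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()

	l := &local{}
	go l.reportHealth(ctx)
	l.removeSector(ctx)
	<-ctx.Done()
}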