Merge pull request #7335 from filecoin-project/feat/sched-ready-work

sealing: Improve scheduling of ready work
Łukasz Magiera 2021-10-07 18:49:02 +01:00 committed by GitHub
commit aed6e8d0be
3 changed files with 126 additions and 8 deletions
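
In short: window assignment now happens in two passes. processAssignedWindows first calls assignReadyWork, which starts tasks that need no prepare step (Commit1/Commit2) right away, and only then runs assignPreparingWork for everything else; ready work also steps aside whenever already-prepared tasks are waiting on resources. A minimal, self-contained sketch of the two-pass ordering (simplified types and names, not the lotus scheduler API):

```go
package main

import "fmt"

// task is a simplified stand-in for the scheduler's workerRequest
// (hypothetical fields, not the real lotus types).
type task struct {
	sector    int
	taskType  string
	needsPrep bool // Commit1/Commit2-style work runs without a separate prepare step
}

// assign mirrors the two-pass ordering introduced here: start ready work
// first, then hand out the tasks that still need their prepare step.
func assign(todo []task) {
	var rest []task

	// Pass 1: ready work (no prepare step needed).
	for _, t := range todo {
		if !t.needsPrep {
			fmt.Printf("start ready task %s for sector %d\n", t.taskType, t.sector)
			continue
		}
		rest = append(rest, t)
	}

	// Pass 2: work that still has to run its prepare step.
	for _, t := range rest {
		fmt.Printf("start preparing task %s for sector %d\n", t.taskType, t.sector)
	}
}

func main() {
	assign([]task{
		{sector: 1, taskType: "PC1", needsPrep: true},
		{sector: 2, taskType: "C2", needsPrep: false},
		{sector: 3, taskType: "C1", needsPrep: false},
	})
}
```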

View File

@@ -118,6 +118,7 @@ type activeResources struct {
 	cpuUse     uint64
 
 	cond    *sync.Cond
+	waiting int
 }
 
 type workerRequest struct {

View File

@@ -11,7 +11,9 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r
 		if a.cond == nil {
 			a.cond = sync.NewCond(locker)
 		}
+		a.waiting++
 		a.cond.Wait()
+		a.waiting--
 	}
 
 	a.add(wr.Resources, r)
@@ -19,13 +21,15 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r
 	err := cb()
 
 	a.free(wr.Resources, r)
-	if a.cond != nil {
-		a.cond.Broadcast()
-	}
 
 	return err
 }
 
+// must be called with the same lock as the one passed to withResources
+func (a *activeResources) hasWorkWaiting() bool {
+	return a.waiting > 0
+}
+
 func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
 	if r.CanGPU {
 		a.gpuUsed = true
@@ -42,6 +46,10 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
 	a.cpuUse -= r.Threads(wr.CPUs)
 	a.memUsedMin -= r.MinMemory
 	a.memUsedMax -= r.MaxMemory
+
+	if a.cond != nil {
+		a.cond.Broadcast()
+	}
 }
 
 // canHandleRequest evaluates if the worker has enough available resources to

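The resource changes above boil down to a small pattern: count the goroutines blocked on the sync.Cond (waiting++ before Wait, waiting-- after) so hasWorkWaiting can report pending prepared work, and Broadcast inside free so waiters wake whenever capacity is released. A stripped-down sketch of the same pattern outside lotus (pool, limit and the single scalar resource are illustrative, not the real activeResources fields):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool tracks one scalar resource, a condition variable for blocked callers,
// and a count of how many callers are currently blocked, the same shape as
// activeResources.cond / activeResources.waiting in this PR.
type pool struct {
	mu      sync.Mutex
	cond    *sync.Cond
	used    int
	limit   int
	waiting int
}

// withResources blocks until n units are available, runs cb, then frees them.
func (p *pool) withResources(n int, cb func()) {
	p.mu.Lock()
	if p.cond == nil {
		p.cond = sync.NewCond(&p.mu)
	}
	for p.used+n > p.limit {
		p.waiting++
		p.cond.Wait()
		p.waiting--
	}
	p.used += n
	p.mu.Unlock()

	cb()

	p.mu.Lock()
	p.free(n)
	p.mu.Unlock()
}

// free releases capacity and wakes blocked withResources callers, mirroring
// the Broadcast that this PR moves into activeResources.free.
func (p *pool) free(n int) {
	p.used -= n
	if p.cond != nil {
		p.cond.Broadcast()
	}
}

// hasWorkWaiting reports whether any caller is blocked; callers must hold mu.
func (p *pool) hasWorkWaiting() bool {
	return p.waiting > 0
}

func main() {
	p := &pool{limit: 2}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p.withResources(1, func() {
				fmt.Println("task", i, "running")
				time.Sleep(50 * time.Millisecond)
			})
		}(i)
	}
	wg.Wait()

	p.mu.Lock()
	fmt.Println("work waiting:", p.hasWorkWaiting())
	p.mu.Unlock()
}
```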
View File

@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@@ -338,6 +339,11 @@ func (sw *schedWorker) workerCompactWindows() {
 }
 
 func (sw *schedWorker) processAssignedWindows() {
+	sw.assignReadyWork()
+	sw.assignPreparingWork()
+}
+
+func (sw *schedWorker) assignPreparingWork() {
 	worker := sw.worker
 
 assignLoop:
@@ -366,7 +372,7 @@ assignLoop:
 			todo := firstWindow.todo[tidx]
 
 			log.Debugf("assign worker sector %d", todo.sector.ID.Number)
-			err := sw.startProcessingTask(sw.taskDone, todo)
+			err := sw.startProcessingTask(todo)
 
 			if err != nil {
 				log.Errorf("startProcessingTask error: %+v", err)
@@ -387,7 +393,67 @@ assignLoop:
 	}
 }
 
-func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
+func (sw *schedWorker) assignReadyWork() {
+	worker := sw.worker
+
+	worker.lk.Lock()
+	defer worker.lk.Unlock()
+
+	if worker.active.hasWorkWaiting() {
+		// prepared tasks have priority
+		return
+	}
+
+assignLoop:
+	// process windows in order
+	for len(worker.activeWindows) > 0 {
+		firstWindow := worker.activeWindows[0]
+
+		// process tasks within a window, preferring tasks at lower indexes
+		for len(firstWindow.todo) > 0 {
+			tidx := -1
+
+			for t, todo := range firstWindow.todo {
+				if todo.taskType != sealtasks.TTCommit1 && todo.taskType != sealtasks.TTCommit2 { // todo put in task
+					continue
+				}
+
+				needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
+				if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
+					tidx = t
+					break
+				}
+			}
+
+			if tidx == -1 {
+				break assignLoop
+			}
+
+			todo := firstWindow.todo[tidx]
+
+			log.Debugf("assign worker sector %d (ready)", todo.sector.ID.Number)
+			err := sw.startProcessingReadyTask(todo)
+
+			if err != nil {
+				log.Errorf("startProcessingTask error: %+v", err)
+				go todo.respond(xerrors.Errorf("startProcessingTask error: %w", err))
+			}
+
+			// Note: we're not freeing window.allocated resources here very much on purpose
+			copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
+			firstWindow.todo[len(firstWindow.todo)-1] = nil
+			firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
+		}
+
+		copy(worker.activeWindows, worker.activeWindows[1:])
+		worker.activeWindows[len(worker.activeWindows)-1] = nil
+		worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]
+
+		sw.windowsRequested--
+	}
+}
+
+func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 	w, sh := sw.worker, sw.sched
 
 	needRes := ResourceTable[req.taskType][req.sector.ProofType]
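
A note on the slice handling in assignReadyWork above: finished entries are removed from firstWindow.todo, and drained windows from worker.activeWindows, with the copy / nil-out / reslice idiom, which preserves order and clears the trailing pointer so the garbage collector can reclaim it. A standalone illustration of that idiom (generic helper, hypothetical element type):

```go
package main

import "fmt"

// removeAt deletes the element at index i from a slice of pointers while
// preserving order, using the same copy / nil-out / truncate sequence applied
// to firstWindow.todo and worker.activeWindows in this PR.
func removeAt(s []*string, i int) []*string {
	copy(s[i:], s[i+1:]) // shift the tail left by one
	s[len(s)-1] = nil    // drop the duplicated last pointer so it can be collected
	return s[:len(s)-1]  // shrink the length
}

func main() {
	a, b, c := "a", "b", "c"
	todo := []*string{&a, &b, &c}

	todo = removeAt(todo, 1)
	for _, t := range todo {
		fmt.Println(*t) // prints "a" then "c"
	}
}
```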
@@ -406,9 +472,10 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
 		w.lk.Unlock()
 
 		select {
-		case taskDone <- struct{}{}:
+		case sw.taskDone <- struct{}{}:
 		case <-sh.closing:
 			log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
+		default: // there is a notification pending already
 		}
 
 		select {
select { select {
@@ -428,8 +495,9 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
 		defer w.lk.Lock() // we MUST return locked from this function
 
 		select {
-		case taskDone <- struct{}{}:
+		case sw.taskDone <- struct{}{}:
 		case <-sh.closing:
+		default: // there is a notification pending already
 		}
 
 		// Do the work!
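
Both taskDone notifications above gain a default branch, which makes the send non-blocking: if a wake-up signal is already pending, another one is not needed. Assuming sw.taskDone is a buffered channel of capacity 1 (its construction is not part of this diff), this coalesces bursts of completions into a single pending notification, as in this sketch:

```go
package main

import "fmt"

// notify coalesces wake-up signals: with a capacity-1 channel, at most one
// notification is ever pending and extra ones are dropped by the default
// branch. Illustrative only; assumes the channel is buffered with capacity 1.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default: // there is a notification pending already
	}
}

func main() {
	taskDone := make(chan struct{}, 1)

	// Three tasks finish back to back; the notifications coalesce into one.
	notify(taskDone)
	notify(taskDone)
	notify(taskDone)

	fmt.Println("pending notifications:", len(taskDone)) // 1

	// The consumer drains one signal and re-checks its queues.
	<-taskDone
	fmt.Println("pending notifications:", len(taskDone)) // 0
}
```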
@@ -457,6 +525,47 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
 	return nil
 }
 
+func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
+	w, sh := sw.worker, sw.sched
+
+	needRes := ResourceTable[req.taskType][req.sector.ProofType]
+
+	w.active.add(w.info.Resources, needRes)
+
+	go func() {
+		// Do the work!
+		err := req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
+
+		select {
+		case req.ret <- workerResponse{err: err}:
+		case <-req.ctx.Done():
+			log.Warnf("request got cancelled before we could respond")
+		case <-sh.closing:
+			log.Warnf("scheduler closed while sending response")
+		}
+
+		w.lk.Lock()
+
+		w.active.free(w.info.Resources, needRes)
+
+		select {
+		case sw.taskDone <- struct{}{}:
+		case <-sh.closing:
+			log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
+		default: // there is a notification pending already
+		}
+
+		w.lk.Unlock()
+
+		// This error should always be nil, since nothing is setting it, but just to be safe:
+		if err != nil {
+			log.Errorf("error executing worker (ready): %+v", err)
+		}
+	}()
+
+	return nil
+}
+
 func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
 	select {
 	case <-w.closingMgr:
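
One last note on startProcessingReadyTask: the task's resources are reserved with active.add before the goroutine starts and released with active.free only after the work has run and its result has been delivered, followed by a taskDone poke so the scheduler loop re-runs assignment. A compact sketch of that reserve / run / release / notify lifecycle (simplified stand-in types, not the lotus scheduler):

```go
package main

import (
	"fmt"
	"sync"
)

// res is a stand-in for activeResources: reserve up front, release when done.
type res struct {
	mu   sync.Mutex
	used int
}

func (r *res) add(n int)  { r.mu.Lock(); r.used += n; r.mu.Unlock() }
func (r *res) free(n int) { r.mu.Lock(); r.used -= n; r.mu.Unlock() }

// startReady mirrors the shape of startProcessingReadyTask: account for the
// task's resources immediately, run the work asynchronously, then free the
// resources and poke taskDone so the scheduler re-runs assignment.
func startReady(active *res, need int, work func() error, taskDone chan struct{}, wg *sync.WaitGroup) {
	active.add(need)
	wg.Add(1)
	go func() {
		defer wg.Done()

		err := work() // do the work

		active.free(need)

		select {
		case taskDone <- struct{}{}: // wake the scheduler loop
		default: // a notification is already pending
		}

		if err != nil {
			fmt.Println("work failed:", err)
		}
	}()
}

func main() {
	active := &res{}
	taskDone := make(chan struct{}, 1)
	var wg sync.WaitGroup

	startReady(active, 1, func() error {
		fmt.Println("running commit work")
		return nil
	}, taskDone, &wg)

	wg.Wait()
	<-taskDone
	fmt.Println("resources in use:", active.used)
}
```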