wip improve scheduling of ready work
This commit is contained in:
parent
fc10281d96
commit
b87142ec8e
3
extern/sector-storage/sched.go
vendored
3
extern/sector-storage/sched.go
vendored
@ -117,7 +117,8 @@ type activeResources struct {
|
||||
gpuUsed bool
|
||||
cpuUse uint64
|
||||
|
||||
cond *sync.Cond
|
||||
cond *sync.Cond
|
||||
waiting int
|
||||
}
|
||||
|
||||
type workerRequest struct {
|
||||
|
14
extern/sector-storage/sched_resources.go
vendored
14
extern/sector-storage/sched_resources.go
vendored
@ -11,7 +11,9 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r
|
||||
if a.cond == nil {
|
||||
a.cond = sync.NewCond(locker)
|
||||
}
|
||||
a.waiting++
|
||||
a.cond.Wait()
|
||||
a.waiting--
|
||||
}
|
||||
|
||||
a.add(wr.Resources, r)
|
||||
@ -19,13 +21,15 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r
|
||||
err := cb()
|
||||
|
||||
a.free(wr.Resources, r)
|
||||
if a.cond != nil {
|
||||
a.cond.Broadcast()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// must be called with the same lock as the one passed to withResources
|
||||
func (a *activeResources) hasWorkWaiting() bool {
|
||||
return a.waiting > 0
|
||||
}
|
||||
|
||||
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
|
||||
if r.CanGPU {
|
||||
a.gpuUsed = true
|
||||
@ -42,6 +46,10 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
|
||||
a.cpuUse -= r.Threads(wr.CPUs)
|
||||
a.memUsedMin -= r.MinMemory
|
||||
a.memUsedMax -= r.MaxMemory
|
||||
|
||||
if a.cond != nil {
|
||||
a.cond.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
// canHandleRequest evaluates if the worker has enough available resources to
|
||||
|
114
extern/sector-storage/sched_worker.go
vendored
114
extern/sector-storage/sched_worker.go
vendored
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
@ -338,6 +339,11 @@ func (sw *schedWorker) workerCompactWindows() {
|
||||
}
|
||||
|
||||
func (sw *schedWorker) processAssignedWindows() {
|
||||
sw.assignReadyWork()
|
||||
sw.assignPreparingWork()
|
||||
}
|
||||
|
||||
func (sw *schedWorker) assignPreparingWork() {
|
||||
worker := sw.worker
|
||||
|
||||
assignLoop:
|
||||
@ -366,7 +372,7 @@ assignLoop:
|
||||
todo := firstWindow.todo[tidx]
|
||||
|
||||
log.Debugf("assign worker sector %d", todo.sector.ID.Number)
|
||||
err := sw.startProcessingTask(sw.taskDone, todo)
|
||||
err := sw.startProcessingTask(todo)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("startProcessingTask error: %+v", err)
|
||||
@ -387,7 +393,67 @@ assignLoop:
|
||||
}
|
||||
}
|
||||
|
||||
func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
|
||||
func (sw *schedWorker) assignReadyWork() {
|
||||
worker := sw.worker
|
||||
|
||||
worker.lk.Lock()
|
||||
defer worker.lk.Unlock()
|
||||
|
||||
if worker.active.hasWorkWaiting() {
|
||||
// prepared tasks have priority
|
||||
return
|
||||
}
|
||||
|
||||
assignLoop:
|
||||
// process windows in order
|
||||
for len(worker.activeWindows) > 0 {
|
||||
firstWindow := worker.activeWindows[0]
|
||||
|
||||
// process tasks within a window, preferring tasks at lower indexes
|
||||
for len(firstWindow.todo) > 0 {
|
||||
tidx := -1
|
||||
|
||||
for t, todo := range firstWindow.todo {
|
||||
if todo.taskType != sealtasks.TTCommit1 && todo.taskType != sealtasks.TTCommit2 { // todo put in task
|
||||
continue
|
||||
}
|
||||
|
||||
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
|
||||
if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
|
||||
tidx = t
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if tidx == -1 {
|
||||
break assignLoop
|
||||
}
|
||||
|
||||
todo := firstWindow.todo[tidx]
|
||||
|
||||
log.Debugf("assign worker sector %d (ready)", todo.sector.ID.Number)
|
||||
err := sw.startProcessingReadyTask(todo)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("startProcessingTask error: %+v", err)
|
||||
go todo.respond(xerrors.Errorf("startProcessingTask error: %w", err))
|
||||
}
|
||||
|
||||
// Note: we're not freeing window.allocated resources here very much on purpose
|
||||
copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
|
||||
firstWindow.todo[len(firstWindow.todo)-1] = nil
|
||||
firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
|
||||
}
|
||||
|
||||
copy(worker.activeWindows, worker.activeWindows[1:])
|
||||
worker.activeWindows[len(worker.activeWindows)-1] = nil
|
||||
worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]
|
||||
|
||||
sw.windowsRequested--
|
||||
}
|
||||
}
|
||||
|
||||
func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
|
||||
w, sh := sw.worker, sw.sched
|
||||
|
||||
needRes := ResourceTable[req.taskType][req.sector.ProofType]
|
||||
@ -406,7 +472,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
|
||||
w.lk.Unlock()
|
||||
|
||||
select {
|
||||
case taskDone <- struct{}{}:
|
||||
case sw.taskDone <- struct{}{}:
|
||||
case <-sh.closing:
|
||||
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
|
||||
}
|
||||
@ -428,7 +494,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
|
||||
defer w.lk.Lock() // we MUST return locked from this function
|
||||
|
||||
select {
|
||||
case taskDone <- struct{}{}:
|
||||
case sw.taskDone <- struct{}{}:
|
||||
case <-sh.closing:
|
||||
}
|
||||
|
||||
@ -457,6 +523,46 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
|
||||
w, sh := sw.worker, sw.sched
|
||||
|
||||
needRes := ResourceTable[req.taskType][req.sector.ProofType]
|
||||
|
||||
w.active.add(w.info.Resources, needRes)
|
||||
|
||||
go func() {
|
||||
// Do the work!
|
||||
err := req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
|
||||
|
||||
select {
|
||||
case req.ret <- workerResponse{err: err}:
|
||||
case <-req.ctx.Done():
|
||||
log.Warnf("request got cancelled before we could respond")
|
||||
case <-sh.closing:
|
||||
log.Warnf("scheduler closed while sending response")
|
||||
}
|
||||
|
||||
w.lk.Lock()
|
||||
|
||||
w.active.free(w.info.Resources, needRes)
|
||||
|
||||
select {
|
||||
case sw.taskDone <- struct{}{}:
|
||||
case <-sh.closing:
|
||||
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
|
||||
}
|
||||
|
||||
w.lk.Unlock()
|
||||
|
||||
// This error should always be nil, since nothing is setting it, but just to be safe:
|
||||
if err != nil {
|
||||
log.Errorf("error executing worker (ready): %+v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
|
||||
select {
|
||||
case <-w.closingMgr:
|
||||
|
Loading…
Reference in New Issue
Block a user