move scheduling filtering logic down.

This commit is contained in:
Raúl Kripalani 2021-06-21 20:49:16 +01:00
parent 83839362c5
commit 59eab2df25
3 changed files with 18 additions and 12 deletions

View File

@ -393,8 +393,7 @@ func (sh *scheduler) trySched() {
} }
// TODO: allow bigger windows // TODO: allow bigger windows
ignoringResources := worker.info.IgnoreResources if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) {
if !ignoringResources && !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
continue continue
} }
@ -461,18 +460,18 @@ func (sh *scheduler) trySched() {
selectedWindow := -1 selectedWindow := -1
for _, wnd := range acceptableWindows[task.indexHeap] { for _, wnd := range acceptableWindows[task.indexHeap] {
wid := sh.openWindows[wnd].worker wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources info := sh.workers[wid].info
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd) log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)
// TODO: allow bigger windows // TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) { if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
continue continue
} }
log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd) log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd)
windows[wnd].allocated.add(wr, needRes) windows[wnd].allocated.add(info.Resources, needRes)
// TODO: We probably want to re-sort acceptableWindows here based on new // TODO: We probably want to re-sort acceptableWindows here based on new
// workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all // workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all
// task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2 way), and // task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2 way), and

View File

@ -6,7 +6,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(r, id, "withResources", wr) { for !a.canHandleRequest(r, id, "withResources", wr) {
if a.cond == nil { if a.cond == nil {
a.cond = sync.NewCond(locker) a.cond = sync.NewCond(locker)
@ -14,11 +14,11 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResource
a.cond.Wait() a.cond.Wait()
} }
a.add(wr, r) a.add(wr.Resources, r)
err := cb() err := cb()
a.free(wr, r) a.free(wr.Resources, r)
if a.cond != nil { if a.cond != nil {
a.cond.Broadcast() a.cond.Broadcast()
} }
@ -44,8 +44,15 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
a.memUsedMax -= r.MaxMemory a.memUsedMax -= r.MaxMemory
} }
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool { // canHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, info storiface.WorkerInfo) bool {
if info.IgnoreResources {
// shortcircuit; if this worker is ignoring resources, it can always handle the request.
return true
}
res := info.Resources
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical { if minNeedMem > res.MemPhysical {

View File

@ -296,7 +296,7 @@ func (sw *schedWorker) workerCompactWindows() {
for ti, todo := range window.todo { for ti, todo := range window.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType] needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info.Resources) { if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
continue continue
} }
@ -352,7 +352,7 @@ assignLoop:
worker.lk.Lock() worker.lk.Lock()
for t, todo := range firstWindow.todo { for t, todo := range firstWindow.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType] needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) { if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t tidx = t
break break
} }
@ -424,7 +424,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
} }
// wait (if needed) for resources in the 'active' window // wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.info.Resources, needRes, &sh.workersLk, func() error { err = w.active.withResources(sw.wid, w.info, needRes, &sh.workersLk, func() error {
w.lk.Lock() w.lk.Lock()
w.preparing.free(w.info.Resources, needRes) w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock() w.lk.Unlock()