sectorstorage: update sched tests for new logic

Łukasz Magiera 2020-08-28 21:38:21 +02:00
parent 4a75e1e4b4
commit 9d0c8ae3dd
3 changed files with 64 additions and 13 deletions


@@ -359,7 +359,7 @@ func (sh *scheduler) trySched() {
			}
			// TODO: allow bigger windows
-			if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) {
+			if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
				continue
			}
@@ -430,11 +430,11 @@ func (sh *scheduler) trySched() {
			log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
			// TODO: allow bigger windows
-			if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
+			if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) {
				continue
			}
-			log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
+			log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd)
			windows[wnd].allocated.add(wr, needRes)
@@ -577,7 +577,7 @@ func (sh *scheduler) runWorker(wid WorkerID) {
			worker.lk.Lock()
			for t, todo := range firstWindow.todo {
				needRes := ResourceTable[todo.taskType][sh.spt]
-				if worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) {
+				if worker.preparing.canHandleRequest(needRes, wid, "startPreparing", worker.info.Resources) {
					tidx = t
					break
				}
@@ -628,7 +628,7 @@ func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) in
			for ti, todo := range window.todo {
				needRes := ResourceTable[todo.taskType][sh.spt]
-				if !lower.allocated.canHandleRequest(needRes, wid, worker.info.Resources) {
+				if !lower.allocated.canHandleRequest(needRes, wid, "compactWindows", worker.info.Resources) {
					continue
				}
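The scheduler-side hunks above all make the same change: canHandleRequest now takes a caller tag ("schedAcceptable", "schedAssign", "startPreparing", "compactWindows") so a rejected placement can be attributed to its call site in the debug log. A minimal, self-contained sketch of that pattern (the resourcePool and request types below are illustrative stand-ins, not the lotus types):

package main

import "log"

type resourcePool struct {
	cpuFree uint64
}

type request struct {
	threads uint64
}

// canHandle mirrors the shape of canHandleRequest(needRes, wid, caller, res):
// the caller string exists only so a rejection can be attributed in the log line.
func (p *resourcePool) canHandle(r request, workerID int, caller string) bool {
	if r.threads > p.cpuFree {
		log.Printf("sched: not scheduling on worker %d for %s; not enough threads", workerID, caller)
		return false
	}
	return true
}

func main() {
	p := &resourcePool{cpuFree: 4}
	p.canHandle(request{threads: 8}, 0, "schedAssign") // the log line names the rejecting call site
}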


@@ -7,7 +7,7 @@ import (
)
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
-	for !a.canHandleRequest(r, id, wr) {
+	for !a.canHandleRequest(r, id, "withResources", wr) {
		if a.cond == nil {
			a.cond = sync.NewCond(locker)
		}
@@ -52,37 +52,37 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
	a.memUsedMax -= r.MaxMemory
}
-func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
+func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool {
	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
	minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
	if minNeedMem > res.MemPhysical {
-		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
+		log.Debugf("sched: not scheduling on worker %d for %s; not enough physical memory - need: %dM, have %dM", wid, caller, minNeedMem/mib, res.MemPhysical/mib)
		return false
	}
	maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
	if maxNeedMem > res.MemSwap+res.MemPhysical {
-		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
+		log.Debugf("sched: not scheduling on worker %d for %s; not enough virtual memory - need: %dM, have %dM", wid, caller, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
		return false
	}
	if needRes.MultiThread() {
		if a.cpuUse > 0 {
-			log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
+			log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs)
			return false
		}
	} else {
		if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
-			log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
+			log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs)
			return false
		}
	}
	if len(res.GPUs) > 0 && needRes.CanGPU {
		if a.gpuUsed {
-			log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
+			log.Debugf("sched: not scheduling on worker %d for %s; GPU in use", wid, caller)
			return false
		}
	}
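To make the two memory checks in canHandleRequest concrete, here is a small worked example; all sizes below are assumed for illustration and do not come from any real worker profile:

package main

import "fmt"

const gib = uint64(1) << 30

func main() {
	// assumed worker resources
	memPhysical := 128 * gib
	memSwap := 32 * gib
	memReserved := 2 * gib // reserved for the OS and other processes

	// memory already claimed by admitted tasks
	memUsedMin := 56 * gib
	memUsedMax := 96 * gib

	// incoming request
	minMemory := 64 * gib
	maxMemory := 64 * gib
	baseMinMemory := 1 * gib

	// mirrors: minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
	minNeed := memReserved + memUsedMin + minMemory + baseMinMemory // 123 GiB
	// mirrors: maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
	maxNeed := memReserved + memUsedMax + maxMemory + baseMinMemory // 163 GiB

	fmt.Println("physical memory ok:", minNeed <= memPhysical)        // true: 123 GiB <= 128 GiB
	fmt.Println("virtual memory ok:", maxNeed <= memPhysical+memSwap) // false: 163 GiB > 160 GiB, request rejected
}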


@@ -5,6 +5,7 @@ import (
	"fmt"
	"io"
	"runtime"
+	"sort"
	"sync"
	"testing"
	"time"
@@ -305,7 +306,8 @@ func TestSched(t *testing.T) {
		done: map[string]chan struct{}{},
	}
-	for _, task := range tasks {
+	for i, task := range tasks {
+		log.Info("TASK", i)
		task(t, sched, index, &rm)
	}
@@ -419,6 +421,45 @@ func TestSched(t *testing.T) {
		)
	}
+	diag := func() task {
+		return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) {
+			time.Sleep(20 * time.Millisecond)
+			for _, request := range s.diag().Requests {
+				log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType)
+			}
+			wj := (&Manager{sched: s}).WorkerJobs()
+			type line struct {
+				storiface.WorkerJob
+				wid uint64
+			}
+			lines := make([]line, 0)
+			for wid, jobs := range wj {
+				for _, job := range jobs {
+					lines = append(lines, line{
+						WorkerJob: job,
+						wid:       wid,
+					})
+				}
+			}
+			// oldest first
+			sort.Slice(lines, func(i, j int) bool {
+				if lines[i].RunWait != lines[j].RunWait {
+					return lines[i].RunWait < lines[j].RunWait
+				}
+				return lines[i].Start.Before(lines[j].Start)
+			})
+			for _, l := range lines {
+				log.Infof("!!! wDIAG: rw(%d) sid(%d) t(%s)", l.RunWait, l.Sector.Number, l.Task)
+			}
+		}
+	}
	// run this one a bunch of times, it had a very annoying tendency to fail randomly
	for i := 0; i < 40; i++ {
		t.Run("pc1-pc2-prio", testFunc([]workerSpec{
@@ -427,6 +468,8 @@ func TestSched(t *testing.T) {
			// fill queues
			twoPC1("w0", 0, taskStarted),
			twoPC1("w1", 2, taskNotScheduled),
+			sched("w2", "fred", 4, sealtasks.TTPreCommit1),
+			taskNotScheduled("w2"),
			// windowed
@@ -439,10 +482,18 @@ func TestSched(t *testing.T) {
			sched("t3", "fred", 10, sealtasks.TTPreCommit2),
			taskNotScheduled("t3"),
+			diag(),
			twoPC1Act("w0", taskDone),
			twoPC1Act("w1", taskStarted),
+			taskNotScheduled("w2"),
			twoPC1Act("w1", taskDone),
+			taskStarted("w2"),
+			taskDone("w2"),
+			diag(),
			taskStarted("t3"),
			taskNotScheduled("t1"),
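Given the in-test comment about this case failing randomly (the test already loops "pc1-pc2-prio" 40 times), an invocation along these lines can hammer the subtest further; the package path is not shown in this diff, so run it from the scheduler package directory:

go test -race -count=10 -run 'TestSched/pc1-pc2-prio' -v .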