From 9d0c8ae3dde08240d199c0493cefe013bafdfc02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 21:38:21 +0200 Subject: [PATCH] sectorstorage: update sched tests for new logic --- extern/sector-storage/sched.go | 10 ++--- extern/sector-storage/sched_resources.go | 14 +++---- extern/sector-storage/sched_test.go | 53 +++++++++++++++++++++++- 3 files changed, 64 insertions(+), 13 deletions(-) diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 3ca6fa1bf..16e51f9a6 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -359,7 +359,7 @@ func (sh *scheduler) trySched() { } // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) { + if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) { continue } @@ -430,11 +430,11 @@ func (sh *scheduler) trySched() { log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { + if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) { continue } - log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd) windows[wnd].allocated.add(wr, needRes) @@ -577,7 +577,7 @@ func (sh *scheduler) runWorker(wid WorkerID) { worker.lk.Lock() for t, todo := range firstWindow.todo { needRes := ResourceTable[todo.taskType][sh.spt] - if worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) { + if worker.preparing.canHandleRequest(needRes, wid, "startPreparing", worker.info.Resources) { tidx = t break } @@ -628,7 +628,7 @@ func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) in for ti, todo := range window.todo { needRes := ResourceTable[todo.taskType][sh.spt] - if !lower.allocated.canHandleRequest(needRes, wid, worker.info.Resources) { + if !lower.allocated.canHandleRequest(needRes, wid, "compactWindows", worker.info.Resources) { continue } diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index f468d5fe9..92a3b32ad 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -7,7 +7,7 @@ import ( ) func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { - for !a.canHandleRequest(r, id, wr) { + for !a.canHandleRequest(r, id, "withResources", wr) { if a.cond == nil { a.cond = sync.NewCond(locker) } @@ -52,37 +52,37 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { a.memUsedMax -= r.MaxMemory } -func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool { +func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool { // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory if minNeedMem > res.MemPhysical { - log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) + log.Debugf("sched: not scheduling on worker %d for %s; not enough physical memory - need: %dM, have %dM", wid, caller, minNeedMem/mib, res.MemPhysical/mib) return false } maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory if maxNeedMem > res.MemSwap+res.MemPhysical { - log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) + log.Debugf("sched: not scheduling on worker %d for %s; not enough virtual memory - need: %dM, have %dM", wid, caller, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) return false } if needRes.MultiThread() { if a.cpuUse > 0 { - log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs) + log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs) return false } } else { if a.cpuUse+uint64(needRes.Threads) > res.CPUs { - log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs) + log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs) return false } } if len(res.GPUs) > 0 && needRes.CanGPU { if a.gpuUsed { - log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) + log.Debugf("sched: not scheduling on worker %d for %s; GPU in use", wid, caller) return false } } diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 0bc0404e4..4aa1dc49b 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "runtime" + "sort" "sync" "testing" "time" @@ -305,7 +306,8 @@ func TestSched(t *testing.T) { done: map[string]chan struct{}{}, } - for _, task := range tasks { + for i, task := range tasks { + log.Info("TASK", i) task(t, sched, index, &rm) } @@ -419,6 +421,45 @@ func TestSched(t *testing.T) { ) } + diag := func() task { + return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) { + time.Sleep(20 * time.Millisecond) + for _, request := range s.diag().Requests { + log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType) + } + + wj := (&Manager{sched: s}).WorkerJobs() + + type line struct { + storiface.WorkerJob + wid uint64 + } + + lines := make([]line, 0) + + for wid, jobs := range wj { + for _, job := range jobs { + lines = append(lines, line{ + WorkerJob: job, + wid: wid, + }) + } + } + + // oldest first + sort.Slice(lines, func(i, j int) bool { + if lines[i].RunWait != lines[j].RunWait { + return lines[i].RunWait < lines[j].RunWait + } + return lines[i].Start.Before(lines[j].Start) + }) + + for _, l := range lines { + log.Infof("!!! wDIAG: rw(%d) sid(%d) t(%s)", l.RunWait, l.Sector.Number, l.Task) + } + } + } + // run this one a bunch of times, it had a very annoying tendency to fail randomly for i := 0; i < 40; i++ { t.Run("pc1-pc2-prio", testFunc([]workerSpec{ @@ -427,6 +468,8 @@ func TestSched(t *testing.T) { // fill queues twoPC1("w0", 0, taskStarted), twoPC1("w1", 2, taskNotScheduled), + sched("w2", "fred", 4, sealtasks.TTPreCommit1), + taskNotScheduled("w2"), // windowed @@ -439,10 +482,18 @@ func TestSched(t *testing.T) { sched("t3", "fred", 10, sealtasks.TTPreCommit2), taskNotScheduled("t3"), + diag(), + twoPC1Act("w0", taskDone), twoPC1Act("w1", taskStarted), + taskNotScheduled("w2"), twoPC1Act("w1", taskDone), + taskStarted("w2"), + + taskDone("w2"), + + diag(), taskStarted("t3"), taskNotScheduled("t1"),