From cab0c74e08b35b3687c60f24a7a8e2724e5f4379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 17 Jul 2020 01:26:55 +0200 Subject: [PATCH] more sched test debugging --- sched.go | 11 ++++ sched_test.go | 152 +++++++++++++++++++++++++++++++------------------- 2 files changed, 105 insertions(+), 58 deletions(-) diff --git a/sched.go b/sched.go index 241440beb..d89dad3f5 100644 --- a/sched.go +++ b/sched.go @@ -69,6 +69,7 @@ type scheduler struct { openWindows []*schedWindowRequest closing chan struct{} + testSync chan struct{} // used for testing } type workerHandle struct { @@ -195,6 +196,9 @@ func (sh *scheduler) runSched() { heap.Push(sh.schedQueue, req) sh.trySched() + if sh.testSync != nil { + sh.testSync <- struct{}{} + } case req := <-sh.windowRequests: sh.openWindows = append(sh.openWindows, req) sh.trySched() @@ -226,6 +230,8 @@ func (sh *scheduler) trySched() { windows := make([]schedWindow, len(sh.openWindows)) acceptableWindows := make([][]int, sh.schedQueue.Len()) + log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) + // Step 1 for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { task := (*sh.schedQueue)[sqi] @@ -295,11 +301,15 @@ func (sh *scheduler) trySched() { wid := sh.openWindows[wnd].worker wr := sh.workers[wid].info.Resources + log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + // TODO: allow bigger windows if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { continue } + log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + windows[wnd].allocated.add(wr, needRes) selectedWindow = wnd @@ -419,6 +429,7 @@ func (sh *scheduler) runWorker(wid WorkerID) { break assignLoop } + log.Debugf("assign worker sector %d", todo.sector.Number) err := sh.assignWorker(taskDone, wid, worker, todo) sh.workersLk.Unlock() diff --git a/sched_test.go b/sched_test.go index 1c7b88905..26961a4f6 100644 --- a/sched_test.go +++ b/sched_test.go @@ -2,7 +2,9 @@ package sectorstorage import ( "context" + "fmt" "io" + "runtime" "sync" "testing" "time" @@ -171,13 +173,10 @@ func TestSchedStartStop(t *testing.T) { } func TestSched(t *testing.T) { - ctx := context.Background() - spt := abi.RegisteredSealProof_StackedDrg32GiBV1 + ctx, done := context.WithTimeout(context.Background(), 20 * time.Second) + defer done() - sectorAte := abi.SectorID{ - Miner: 8, - Number: 8, - } + spt := abi.RegisteredSealProof_StackedDrg32GiBV1 type workerSpec struct { name string @@ -196,7 +195,10 @@ func TestSched(t *testing.T) { type task func(*testing.T, *scheduler, *stores.Index, *runMeta) - sched := func(taskName, expectWorker string, taskType sealtasks.TaskType) task { + sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { done := make(chan struct{}) rm.done[taskName] = done @@ -207,7 +209,12 @@ func TestSched(t *testing.T) { go func() { defer rm.wg.Done() - err := sched.Schedule(ctx, sectorAte, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error { + sectorNum := abi.SectorID{ + Miner: 8, + Number: sid, + } + + err := sched.Schedule(ctx, sectorNum, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error { wi, err := w.Info(ctx) require.NoError(t, err) @@ -226,29 +233,45 @@ func TestSched(t *testing.T) { return nil }) - require.NoError(t, err) + require.NoError(t, err, fmt.Sprint(l, l2)) }() + + <-sched.testSync } } taskStarted := func(name string) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { - rm.done[name] <- struct{}{} + select { + case rm.done[name] <- struct{}{}: + case <-ctx.Done(): + t.Fatal("ctx error", ctx.Err(), l, l2) + } } } taskDone := func(name string) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { - rm.done[name] <- struct{}{} + select { + case rm.done[name] <- struct{}{}: + case <-ctx.Done(): + t.Fatal("ctx error", ctx.Err(), l, l2) + } close(rm.done[name]) } } taskNotScheduled := func(name string) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { select { case rm.done[name] <- struct{}{}: - t.Fatal("not expected") + t.Fatal("not expected", l, l2) case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy } } @@ -259,6 +282,8 @@ func TestSched(t *testing.T) { index := stores.NewIndex() sched := newScheduler(spt) + sched.testSync = make(chan struct{}) + go sched.runSched() for _, worker := range workers { @@ -291,7 +316,7 @@ func TestSched(t *testing.T) { t.Run("one-pc1", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ - sched("pc1-1", "fred", sealtasks.TTPreCommit1), + sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1), taskDone("pc1-1"), })) @@ -299,7 +324,7 @@ func TestSched(t *testing.T) { {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ - sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1), taskDone("pc1-1"), })) @@ -307,17 +332,17 @@ func TestSched(t *testing.T) { {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, }, []task{ - sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1), taskDone("pc1-1"), })) t.Run("pc1-block-pc2", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ - sched("pc1", "fred", sealtasks.TTPreCommit1), + sched("pc1", "fred", 8, sealtasks.TTPreCommit1), taskStarted("pc1"), - sched("pc2", "fred", sealtasks.TTPreCommit2), + sched("pc2", "fred", 8, sealtasks.TTPreCommit2), taskNotScheduled("pc2"), taskDone("pc1"), @@ -327,10 +352,10 @@ func TestSched(t *testing.T) { t.Run("pc2-block-pc1", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ - sched("pc2", "fred", sealtasks.TTPreCommit2), + sched("pc2", "fred", 8, sealtasks.TTPreCommit2), taskStarted("pc2"), - sched("pc1", "fred", sealtasks.TTPreCommit1), + sched("pc1", "fred", 8, sealtasks.TTPreCommit1), taskNotScheduled("pc1"), taskDone("pc2"), @@ -340,20 +365,20 @@ func TestSched(t *testing.T) { t.Run("pc1-batching", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ - sched("t1", "fred", sealtasks.TTPreCommit1), + sched("t1", "fred", 8, sealtasks.TTPreCommit1), taskStarted("t1"), - sched("t2", "fred", sealtasks.TTPreCommit1), + sched("t2", "fred", 8, sealtasks.TTPreCommit1), taskStarted("t2"), // with worker settings, we can only run 2 parallel PC1s // start 2 more to fill fetch buffer - sched("t3", "fred", sealtasks.TTPreCommit1), + sched("t3", "fred", 8, sealtasks.TTPreCommit1), taskNotScheduled("t3"), - sched("t4", "fred", sealtasks.TTPreCommit1), + sched("t4", "fred", 8, sealtasks.TTPreCommit1), taskNotScheduled("t4"), taskDone("t1"), @@ -366,60 +391,71 @@ func TestSched(t *testing.T) { taskDone("t4"), })) - twoPC1 := func(prefix string, schedAssert func(name string) task) task { + twoPC1 := func(prefix string, sid abi.SectorNumber, schedAssert func(name string) task) task { return multTask( - sched(prefix+"-a", "fred", sealtasks.TTPreCommit1), + sched(prefix+"-a", "fred", sid, sealtasks.TTPreCommit1), schedAssert(prefix+"-a"), - sched(prefix+"-b", "fred", sealtasks.TTPreCommit1), + sched(prefix+"-b", "fred", sid + 1, sealtasks.TTPreCommit1), schedAssert(prefix+"-b"), ) } - twoPC1Done := func(prefix string) task { + twoPC1Act := func(prefix string, schedAssert func(name string) task) task { return multTask( - taskDone(prefix+"-1"), - taskDone(prefix+"-b"), + schedAssert(prefix+"-a"), + schedAssert(prefix+"-b"), ) } - t.Run("pc1-pc2-prio", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, - }, []task{ - // fill exec/fetch buffers - twoPC1("w0", taskStarted), - twoPC1("w1", taskNotScheduled), + for i := 0; i < 100; i++ { + t.Run("pc1-pc2-prio", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, + }, []task{ + // fill exec/fetch buffers + twoPC1("w0", 0, taskStarted), + twoPC1("w1", 2, taskNotScheduled), - // fill worker windows - twoPC1("w2", taskNotScheduled), - twoPC1("w3", taskNotScheduled), + // fill worker windows + twoPC1("w2", 4, taskNotScheduled), + //twoPC1("w3", taskNotScheduled), - // windowed + // windowed - sched("t1", "fred", sealtasks.TTPreCommit1), - taskNotScheduled("t1"), + sched("t1", "fred", 6, sealtasks.TTPreCommit1), + taskNotScheduled("t1"), - sched("t2", "fred", sealtasks.TTPreCommit1), - taskNotScheduled("t2"), + sched("t2", "fred", 7, sealtasks.TTPreCommit1), + taskNotScheduled("t2"), - sched("t3", "fred", sealtasks.TTPreCommit2), - taskNotScheduled("t3"), + sched("t3", "fred", 8, sealtasks.TTPreCommit2), + taskNotScheduled("t3"), - twoPC1Done("w0"), - twoPC1Done("w1"), - twoPC1Done("w2"), - twoPC1Done("w3"), + twoPC1Act("w0", taskDone), + twoPC1Act("w1", taskStarted), + twoPC1Act("w2", taskNotScheduled), + //twoPC1Act("w3", taskNotScheduled), - taskStarted("t1"), - taskNotScheduled("t2"), - taskNotScheduled("t3"), + twoPC1Act("w1", taskDone), + twoPC1Act("w2", taskStarted), + //twoPC1Act("w3", taskNotScheduled), - taskDone("t1"), + twoPC1Act("w2", taskDone), + //twoPC1Act("w3", taskStarted), - taskStarted("t2"), - taskStarted("t3"), + //twoPC1Act("w3", taskDone), - taskDone("t2"), - taskDone("t3"), - })) + taskStarted("t3"), + taskNotScheduled("t1"), + taskNotScheduled("t2"), + + taskDone("t3"), + + taskStarted("t1"), + taskStarted("t2"), + + taskDone("t1"), + taskDone("t2"), + })) + } }