more sched test debugging

Łukasz Magiera 2020-07-17 01:26:55 +02:00
parent 2e557573f4
commit cab0c74e08
2 changed files with 105 additions and 58 deletions

View File

@@ -69,6 +69,7 @@ type scheduler struct {
openWindows []*schedWindowRequest
closing chan struct{}
testSync chan struct{} // used for testing
}
type workerHandle struct {
@@ -195,6 +196,9 @@ func (sh *scheduler) runSched() {
heap.Push(sh.schedQueue, req)
sh.trySched()
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
sh.trySched()
@@ -226,6 +230,8 @@ func (sh *scheduler) trySched() {
windows := make([]schedWindow, len(sh.openWindows))
acceptableWindows := make([][]int, sh.schedQueue.Len())
log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
// Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
@@ -295,11 +301,15 @@ func (sh *scheduler) trySched() {
wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources
log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
continue
}
log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
windows[wnd].allocated.add(wr, needRes)
selectedWindow = wnd
@@ -419,6 +429,7 @@ func (sh *scheduler) runWorker(wid WorkerID) {
break assignLoop
}
log.Debugf("assign worker sector %d", todo.sector.Number)
err := sh.assignWorker(taskDone, wid, worker, todo)
sh.workersLk.Unlock()
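The scheduler-side changes above amount to one test hook plus extra debug logging: the scheduler grows an optional testSync channel, and runSched signals on it after every schedule request it pushes onto the queue. Because the channel stays nil outside tests, the send branch is skipped and production behaviour is unchanged. Below is a minimal, self-contained sketch of that hook pattern; loop, requests and the handler body are illustrative stand-ins, not the actual lotus types.

package main

// Sketch of a test-only synchronization hook: the event loop signals on
// an optional channel after handling each request. Outside tests the
// channel is nil, so the branch is skipped and nothing changes.
type loop struct {
	requests chan int
	testSync chan struct{} // set only by tests
}

func (l *loop) run() {
	for req := range l.requests {
		_ = req // handle the request (push onto a queue, try to schedule, ...)
		if l.testSync != nil {
			l.testSync <- struct{}{} // lets a test observe that the request was handled
		}
	}
}

func main() {
	l := &loop{
		requests: make(chan int),
		testSync: make(chan struct{}), // wired up the way the test below does
	}
	go l.run()
	l.requests <- 1
	<-l.testSync // blocks until run() has processed the request
	close(l.requests)
}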

View File

@@ -2,7 +2,9 @@ package sectorstorage
import (
"context"
"fmt"
"io"
"runtime"
"sync"
"testing"
"time"
@@ -171,13 +173,10 @@ func TestSchedStartStop(t *testing.T) {
}
func TestSched(t *testing.T) {
ctx := context.Background()
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
ctx, done := context.WithTimeout(context.Background(), 20 * time.Second)
defer done()
sectorAte := abi.SectorID{
Miner: 8,
Number: 8,
}
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
type workerSpec struct {
name string
@@ -196,7 +195,10 @@ func TestSched(t *testing.T) {
type task func(*testing.T, *scheduler, *stores.Index, *runMeta)
sched := func(taskName, expectWorker string, taskType sealtasks.TaskType) task {
sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
done := make(chan struct{})
rm.done[taskName] = done
@@ -207,7 +209,12 @@ func TestSched(t *testing.T) {
go func() {
defer rm.wg.Done()
err := sched.Schedule(ctx, sectorAte, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error {
sectorNum := abi.SectorID{
Miner: 8,
Number: sid,
}
err := sched.Schedule(ctx, sectorNum, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)
@@ -226,29 +233,45 @@ func TestSched(t *testing.T) {
return nil
})
require.NoError(t, err)
require.NoError(t, err, fmt.Sprint(l, l2))
}()
<-sched.testSync
}
}
taskStarted := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
rm.done[name] <- struct{}{}
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
}
}
taskDone := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
rm.done[name] <- struct{}{}
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
close(rm.done[name])
}
}
taskNotScheduled := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
t.Fatal("not expected")
t.Fatal("not expected", l, l2)
case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy
}
}
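Two smaller patterns run through the rewritten test helpers above. Each helper captures its construction site with runtime.Caller(1) and runtime.Caller(2), so a failure inside a closure that only fires much later can still point back at the script line that declared it; and the channel sends in taskStarted and taskDone are now wrapped in a select on ctx.Done(), so a stuck scheduler fails the test with context information instead of deadlocking it. A small sketch of the caller-capture idea (makeStep and its output are illustrative, not part of the commit):

package main

import (
	"fmt"
	"runtime"
)

// makeStep records where it was called from; the returned closure can
// report that declaration site even when it runs much later.
func makeStep(name string) func() {
	_, file, line, _ := runtime.Caller(1) // frame of makeStep's caller
	return func() {
		fmt.Printf("step %q declared at %s:%d\n", name, file, line)
	}
}

func main() {
	step := makeStep("pc1-1") // this line number is what gets captured
	// ... the test script runs other steps in between ...
	step() // prints the declaration site, not this call site
}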
@@ -259,6 +282,8 @@ func TestSched(t *testing.T) {
index := stores.NewIndex()
sched := newScheduler(spt)
sched.testSync = make(chan struct{})
go sched.runSched()
for _, worker := range workers {
@@ -291,7 +316,7 @@ func TestSched(t *testing.T) {
t.Run("one-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred", sealtasks.TTPreCommit1),
sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))
@@ -299,7 +324,7 @@ func TestSched(t *testing.T) {
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred1", sealtasks.TTPreCommit1),
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))
@@ -307,17 +332,17 @@ func TestSched(t *testing.T) {
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1-1", "fred1", sealtasks.TTPreCommit1),
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))
t.Run("pc1-block-pc2", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1", "fred", sealtasks.TTPreCommit1),
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("pc1"),
sched("pc2", "fred", sealtasks.TTPreCommit2),
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
taskNotScheduled("pc2"),
taskDone("pc1"),
@@ -327,10 +352,10 @@ func TestSched(t *testing.T) {
t.Run("pc2-block-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc2", "fred", sealtasks.TTPreCommit2),
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
taskStarted("pc2"),
sched("pc1", "fred", sealtasks.TTPreCommit1),
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("pc1"),
taskDone("pc2"),
@@ -340,20 +365,20 @@ func TestSched(t *testing.T) {
t.Run("pc1-batching", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("t1", "fred", sealtasks.TTPreCommit1),
sched("t1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("t1"),
sched("t2", "fred", sealtasks.TTPreCommit1),
sched("t2", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("t2"),
// with worker settings, we can only run 2 parallel PC1s
// start 2 more to fill fetch buffer
sched("t3", "fred", sealtasks.TTPreCommit1),
sched("t3", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("t3"),
sched("t4", "fred", sealtasks.TTPreCommit1),
sched("t4", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("t4"),
taskDone("t1"),
@@ -366,60 +391,71 @@ func TestSched(t *testing.T) {
taskDone("t4"),
}))
twoPC1 := func(prefix string, schedAssert func(name string) task) task {
twoPC1 := func(prefix string, sid abi.SectorNumber, schedAssert func(name string) task) task {
return multTask(
sched(prefix+"-a", "fred", sealtasks.TTPreCommit1),
sched(prefix+"-a", "fred", sid, sealtasks.TTPreCommit1),
schedAssert(prefix+"-a"),
sched(prefix+"-b", "fred", sealtasks.TTPreCommit1),
sched(prefix+"-b", "fred", sid + 1, sealtasks.TTPreCommit1),
schedAssert(prefix+"-b"),
)
}
twoPC1Done := func(prefix string) task {
twoPC1Act := func(prefix string, schedAssert func(name string) task) task {
return multTask(
taskDone(prefix+"-1"),
taskDone(prefix+"-b"),
schedAssert(prefix+"-a"),
schedAssert(prefix+"-b"),
)
}
t.Run("pc1-pc2-prio", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
// fill exec/fetch buffers
twoPC1("w0", taskStarted),
twoPC1("w1", taskNotScheduled),
for i := 0; i < 100; i++ {
t.Run("pc1-pc2-prio", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
// fill exec/fetch buffers
twoPC1("w0", 0, taskStarted),
twoPC1("w1", 2, taskNotScheduled),
// fill worker windows
twoPC1("w2", taskNotScheduled),
twoPC1("w3", taskNotScheduled),
// fill worker windows
twoPC1("w2", 4, taskNotScheduled),
//twoPC1("w3", taskNotScheduled),
// windowed
// windowed
sched("t1", "fred", sealtasks.TTPreCommit1),
taskNotScheduled("t1"),
sched("t1", "fred", 6, sealtasks.TTPreCommit1),
taskNotScheduled("t1"),
sched("t2", "fred", sealtasks.TTPreCommit1),
taskNotScheduled("t2"),
sched("t2", "fred", 7, sealtasks.TTPreCommit1),
taskNotScheduled("t2"),
sched("t3", "fred", sealtasks.TTPreCommit2),
taskNotScheduled("t3"),
sched("t3", "fred", 8, sealtasks.TTPreCommit2),
taskNotScheduled("t3"),
twoPC1Done("w0"),
twoPC1Done("w1"),
twoPC1Done("w2"),
twoPC1Done("w3"),
twoPC1Act("w0", taskDone),
twoPC1Act("w1", taskStarted),
twoPC1Act("w2", taskNotScheduled),
//twoPC1Act("w3", taskNotScheduled),
taskStarted("t1"),
taskNotScheduled("t2"),
taskNotScheduled("t3"),
twoPC1Act("w1", taskDone),
twoPC1Act("w2", taskStarted),
//twoPC1Act("w3", taskNotScheduled),
taskDone("t1"),
twoPC1Act("w2", taskDone),
//twoPC1Act("w3", taskStarted),
taskStarted("t2"),
taskStarted("t3"),
//twoPC1Act("w3", taskDone),
taskDone("t2"),
taskDone("t3"),
}))
taskStarted("t3"),
taskNotScheduled("t1"),
taskNotScheduled("t2"),
taskDone("t3"),
taskStarted("t1"),
taskStarted("t2"),
taskDone("t1"),
taskDone("t2"),
}))
}
}
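The pc1-pc2-prio scenario is now wrapped in for i := 0; i < 100; i++, so a single go test run replays the whole script a hundred times; in line with the commit message, this looks like a debugging aid for shaking out ordering races rather than a permanent fixture. A hedged sketch of the same stress-loop idea with explicit iteration names follows (the naming is illustrative; the commit reuses one subtest name and lets go test append #01, #02, ... to the duplicates):

package sched_test // illustrative package name

import (
	"fmt"
	"testing"
)

// Repeating a subtest many times in one run is a cheap way to surface
// flaky, order-dependent failures while debugging a scheduler.
func TestStressScenario(t *testing.T) {
	for i := 0; i < 100; i++ {
		t.Run(fmt.Sprintf("iter-%03d", i), func(t *testing.T) {
			// run the scenario under test here
		})
	}
}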