sched: function to dump scheduler state

This commit is contained in:
Łukasz Magiera 2020-07-27 12:17:09 +02:00
parent 57868bafa6
commit 9fd91bb70a
2 changed files with 63 additions and 3 deletions

View File

@ -494,6 +494,10 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro
return m.storage.FsStat(ctx, id)
}
func (m *Manager) SchedDiag(ctx context.Context) (interface{}, error) {
return m.sched.Info(ctx)
}
func (m *Manager) Close(ctx context.Context) error {
return m.sched.Close(ctx)
}

View File

@ -69,6 +69,8 @@ type scheduler struct {
schedQueue *requestQueue
openWindows []*schedWindowRequest
info chan func(interface{})
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
@ -148,6 +150,8 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
schedQueue: &requestQueue{},
info: make(chan func(interface{})),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
@ -193,6 +197,17 @@ func (r *workerRequest) respond(err error) {
}
}
type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
Priority int
}
type SchedDiagInfo struct {
Requests []SchedDiagRequestInfo
OpenWindows []WorkerID
}
func (sh *scheduler) runSched() {
defer close(sh.closed)
@ -217,6 +232,9 @@ func (sh *scheduler) runSched() {
sh.openWindows = append(sh.openWindows, req)
sh.trySched()
case ireq := <-sh.info:
ireq(sh.diag())
case <-sh.closing:
sh.schedClose()
return
@ -224,6 +242,26 @@ func (sh *scheduler) runSched() {
}
}
func (sh *scheduler) diag() SchedDiagInfo {
var out SchedDiagInfo
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
out.Requests = append(out.Requests, SchedDiagRequestInfo{
Sector: task.sector,
TaskType: task.taskType,
Priority: task.priority,
})
}
for _, window := range sh.openWindows {
out.OpenWindows = append(out.OpenWindows, window.worker)
}
return out
}
func (sh *scheduler) trySched() {
/*
This assigns tasks to workers based on:
@ -244,7 +282,7 @@ func (sh *scheduler) trySched() {
windows := make([]schedWindow, len(sh.openWindows))
acceptableWindows := make([][]int, sh.schedQueue.Len())
log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
// Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
@ -306,6 +344,9 @@ func (sh *scheduler) trySched() {
})
}
log.Debugf("SCHED windows: %+v", windows)
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)
// Step 2
scheduled := 0
@ -318,14 +359,14 @@ func (sh *scheduler) trySched() {
wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources
log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
continue
}
log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
windows[wnd].allocated.add(wr, needRes)
@ -623,6 +664,21 @@ func (sh *scheduler) schedClose() {
}
}
func (sh *scheduler) Info(ctx context.Context) (interface{}, error) {
ch := make(chan interface{}, 1)
sh.info <- func(res interface{}) {
ch <- res
}
select {
case res := <- ch:
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (sh *scheduler) Close(ctx context.Context) error {
close(sh.closing)
select {