From 9fd91bb70aeb0fc7048f6fa1e0e10380573a8948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 27 Jul 2020 12:17:09 +0200 Subject: [PATCH] sched: function to dump scheduler state --- manager.go | 4 ++++ sched.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/manager.go b/manager.go index 64dd2dcbc..4791eb5e6 100644 --- a/manager.go +++ b/manager.go @@ -494,6 +494,10 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro return m.storage.FsStat(ctx, id) } +func (m *Manager) SchedDiag(ctx context.Context) (interface{}, error) { + return m.sched.Info(ctx) +} + func (m *Manager) Close(ctx context.Context) error { return m.sched.Close(ctx) } diff --git a/sched.go b/sched.go index a7a6d3e86..239b52063 100644 --- a/sched.go +++ b/sched.go @@ -69,6 +69,8 @@ type scheduler struct { schedQueue *requestQueue openWindows []*schedWindowRequest + info chan func(interface{}) + closing chan struct{} closed chan struct{} testSync chan struct{} // used for testing @@ -148,6 +150,8 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { schedQueue: &requestQueue{}, + info: make(chan func(interface{})), + closing: make(chan struct{}), closed: make(chan struct{}), } @@ -193,6 +197,17 @@ func (r *workerRequest) respond(err error) { } } +type SchedDiagRequestInfo struct { + Sector abi.SectorID + TaskType sealtasks.TaskType + Priority int +} + +type SchedDiagInfo struct { + Requests []SchedDiagRequestInfo + OpenWindows []WorkerID +} + func (sh *scheduler) runSched() { defer close(sh.closed) @@ -217,6 +232,9 @@ func (sh *scheduler) runSched() { sh.openWindows = append(sh.openWindows, req) sh.trySched() + case ireq := <-sh.info: + ireq(sh.diag()) + case <-sh.closing: sh.schedClose() return @@ -224,6 +242,26 @@ func (sh *scheduler) runSched() { } } +func (sh *scheduler) diag() SchedDiagInfo { + var out SchedDiagInfo + + for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { + task := (*sh.schedQueue)[sqi] + + out.Requests = append(out.Requests, SchedDiagRequestInfo{ + Sector: task.sector, + TaskType: task.taskType, + Priority: task.priority, + }) + } + + for _, window := range sh.openWindows { + out.OpenWindows = append(out.OpenWindows, window.worker) + } + + return out +} + func (sh *scheduler) trySched() { /* This assigns tasks to workers based on: @@ -244,7 +282,7 @@ func (sh *scheduler) trySched() { windows := make([]schedWindow, len(sh.openWindows)) acceptableWindows := make([][]int, sh.schedQueue.Len()) - log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) + log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) // Step 1 for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { @@ -306,6 +344,9 @@ func (sh *scheduler) trySched() { }) } + log.Debugf("SCHED windows: %+v", windows) + log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) + // Step 2 scheduled := 0 @@ -318,14 +359,14 @@ func (sh *scheduler) trySched() { wid := sh.openWindows[wnd].worker wr := sh.workers[wid].info.Resources - log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) // TODO: allow bigger windows if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { continue } - log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) windows[wnd].allocated.add(wr, needRes) @@ -623,6 +664,21 @@ func (sh *scheduler) schedClose() { } } +func (sh *scheduler) Info(ctx context.Context) (interface{}, error) { + ch := make(chan interface{}, 1) + + sh.info <- func(res interface{}) { + ch <- res + } + + select { + case res := <- ch: + return res, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + func (sh *scheduler) Close(ctx context.Context) error { close(sh.closing) select {