sealing sched: Show waiting tasks assigned to workers in sealing jobs cli

Łukasz Magiera 2020-08-27 23:14:33 +02:00
parent d8e58e67c6
commit 59f554b658
4 changed files with 49 additions and 14 deletions

View File

@@ -155,6 +155,9 @@ var sealingJobsCmd = &cli.Command{
 		// oldest first
 		sort.Slice(lines, func(i, j int) bool {
+			if lines[i].RunWait != lines[j].RunWait {
+				return !lines[i].RunWait // already running tasks first
+			}
 			return lines[i].Start.Before(lines[j].Start)
 		})
@@ -170,10 +173,14 @@ var sealingJobsCmd = &cli.Command{
 		}
 		tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
-		_, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tTime\n")
+		_, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n")
 		for _, l := range lines {
-			_, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), time.Now().Sub(l.Start).Truncate(time.Millisecond*100))
+			state := "assigned"
+			if !l.RunWait {
+				state = "running"
+			}
+			_, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100))
 		}
 		return tw.Flush()
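
As an illustration only (the IDs, hostnames, and durations below are made up), the table printed by the updated sealing jobs command now carries a State column and lists running tasks ahead of assigned ones, roughly like:

ID  Sector  Worker  Hostname   Task  State     Time
1   8       2       miner-pc1  PC1   running   25m30.5s
2   9       2       miner-pc1  PC2   running   1m10.2s
0   12      3       miner-c2   C2    assigned  45.5s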

View File

@@ -85,6 +85,9 @@ type workerHandle struct {
 	lk sync.Mutex
+	wndLk         sync.Mutex
+	activeWindows []*schedWindow
 	// stats / tracking
 	wt *workTracker
@@ -123,6 +126,8 @@ type workerRequest struct {
 	prepare WorkerAction
 	work    WorkerAction
+	start time.Time
 	index     int // The index of the item in the heap.
 	indexHeap int
@@ -171,6 +176,8 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType
 		prepare: prepare,
 		work:    work,
+		start: time.Now(),
 		ret: ret,
 		ctx: ctx,
 	}:
@@ -475,8 +482,6 @@ func (sh *scheduler) runWorker(wid WorkerID) {
 		taskDone := make(chan struct{}, 1)
 		windowsRequested := 0
-		var activeWindows []*schedWindow
 		ctx, cancel := context.WithCancel(context.TODO())
 		defer cancel()
@@ -510,7 +515,9 @@ func (sh *scheduler) runWorker(wid WorkerID) {
 			select {
 			case w := <-scheduledWindows:
-				activeWindows = append(activeWindows, w)
+				worker.wndLk.Lock()
+				worker.activeWindows = append(worker.activeWindows, w)
+				worker.wndLk.Unlock()
 			case <-taskDone:
 				log.Debugw("task done", "workerid", wid)
 			case <-sh.closing:
@@ -521,12 +528,14 @@ func (sh *scheduler) runWorker(wid WorkerID) {
 				return
 			}
+			worker.wndLk.Lock()
 		assignLoop:
 			// process windows in order
-			for len(activeWindows) > 0 {
+			for len(worker.activeWindows) > 0 {
 				// process tasks within a window in order
-				for len(activeWindows[0].todo) > 0 {
-					todo := activeWindows[0].todo[0]
+				for len(worker.activeWindows[0].todo) > 0 {
+					todo := worker.activeWindows[0].todo[0]
 					needRes := ResourceTable[todo.taskType][sh.spt]
 					sh.workersLk.RLock()
@@ -548,15 +557,17 @@ func (sh *scheduler) runWorker(wid WorkerID) {
 						go todo.respond(xerrors.Errorf("assignWorker error: %w", err))
 					}
-					activeWindows[0].todo = activeWindows[0].todo[1:]
+					worker.activeWindows[0].todo = worker.activeWindows[0].todo[1:]
 				}
-				copy(activeWindows, activeWindows[1:])
-				activeWindows[len(activeWindows)-1] = nil
-				activeWindows = activeWindows[:len(activeWindows)-1]
+				copy(worker.activeWindows, worker.activeWindows[1:])
+				worker.activeWindows[len(worker.activeWindows)-1] = nil
+				worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]
 				windowsRequested--
 			}
+			worker.wndLk.Unlock()
 		}
 	}()
 }
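
The scheduler change above moves the per-worker window queue from a goroutine-local variable into the shared workerHandle, guarded by the new wndLk mutex, so that another goroutine (WorkerJobs, in the next file) can inspect requests that are assigned but not yet running. A minimal, self-contained sketch of that pattern, with hypothetical names (pendingItem, handle) standing in for the real scheduler types:

package main

import (
	"fmt"
	"sync"
)

// pendingItem stands in for a queued request; it is not the real scheduler type.
type pendingItem struct{ name string }

// handle mirrors the idea behind workerHandle: a queue that the scheduler
// goroutine appends to and drains, while other goroutines may read a
// snapshot of it under the mutex.
type handle struct {
	lk      sync.Mutex
	pending []pendingItem
}

// push appends under the lock, like the scheduler does when a new window arrives.
func (h *handle) push(it pendingItem) {
	h.lk.Lock()
	defer h.lk.Unlock()
	h.pending = append(h.pending, it)
}

// snapshot copies the queue under the lock, like a stats reader would.
func (h *handle) snapshot() []pendingItem {
	h.lk.Lock()
	defer h.lk.Unlock()
	out := make([]pendingItem, len(h.pending))
	copy(out, h.pending)
	return out
}

func main() {
	h := &handle{}
	h.push(pendingItem{name: "PC1 for sector 8"})
	h.push(pendingItem{name: "C2 for sector 12"})
	fmt.Println(h.snapshot()) // safe from any goroutine
}

The copy in snapshot keeps the critical section short; the WorkerJobs change below achieves the same effect by building its storiface.WorkerJob values while holding wndLk.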

View File

@@ -1,6 +1,8 @@
 package sectorstorage
-import "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
+import (
+	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
+)
 func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
 	m.sched.workersLk.RLock()
@@ -29,6 +31,20 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
 	for id, handle := range m.sched.workers {
 		out[uint64(id)] = handle.wt.Running()
+		handle.wndLk.Lock()
+		for _, window := range handle.activeWindows {
+			for _, request := range window.todo {
+				out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{
+					ID:      0,
+					Sector:  request.sector,
+					Task:    request.taskType,
+					RunWait: true,
+					Start:   request.start,
+				})
+			}
+		}
+		handle.wndLk.Unlock()
 	}
 	return out

View File

@@ -37,5 +37,6 @@ type WorkerJob struct {
 	Sector abi.SectorID
 	Task   sealtasks.TaskType
-	Start time.Time
+	RunWait bool
+	Start   time.Time
 }
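
With the new RunWait flag on WorkerJob, a caller of the WorkerJobs API can tell executing tasks apart from tasks that are only queued in a worker's scheduling window, which is the same distinction the CLI change in the first file surfaces as a State column. A rough sketch of such a consumer, using a simplified stand-in type (job) rather than the real storiface.WorkerJob:

package main

import (
	"fmt"
	"time"
)

// job is a simplified stand-in for storiface.WorkerJob.
type job struct {
	Task    string
	RunWait bool // true: assigned to a scheduling window, not started yet
	Start   time.Time
}

// splitByState separates executing jobs from ones that are only queued.
func splitByState(jobs []job) (running, waiting []job) {
	for _, j := range jobs {
		if j.RunWait {
			waiting = append(waiting, j)
		} else {
			running = append(running, j)
		}
	}
	return running, waiting
}

func main() {
	jobs := []job{
		{Task: "PC1", RunWait: false, Start: time.Now().Add(-25 * time.Minute)},
		{Task: "C2", RunWait: true, Start: time.Now().Add(-45 * time.Second)},
	}
	running, waiting := splitByState(jobs)
	fmt.Println("running:", len(running), "waiting:", len(waiting))
}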