diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 2f966dcca..1e34859d7 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -155,6 +155,9 @@ var sealingJobsCmd = &cli.Command{ // oldest first sort.Slice(lines, func(i, j int) bool { + if lines[i].RunWait != lines[j].RunWait { + return !lines[i].RunWait // already running tasks first + } return lines[i].Start.Before(lines[j].Start) }) @@ -170,10 +173,14 @@ var sealingJobsCmd = &cli.Command{ } tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) - _, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tTime\n") + _, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n") for _, l := range lines { - _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) + state := "assigned" + if !l.RunWait { + state = "running" + } + _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) } return tw.Flush() diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 7a4b6f9ef..43d5dc158 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -85,6 +85,9 @@ type workerHandle struct { lk sync.Mutex + wndLk sync.Mutex + activeWindows []*schedWindow + // stats / tracking wt *workTracker @@ -123,6 +126,8 @@ type workerRequest struct { prepare WorkerAction work WorkerAction + start time.Time + index int // The index of the item in the heap. indexHeap int @@ -171,6 +176,8 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType prepare: prepare, work: work, + start: time.Now(), + ret: ret, ctx: ctx, }: @@ -475,8 +482,6 @@ func (sh *scheduler) runWorker(wid WorkerID) { taskDone := make(chan struct{}, 1) windowsRequested := 0 - var activeWindows []*schedWindow - ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -510,7 +515,9 @@ func (sh *scheduler) runWorker(wid WorkerID) { select { case w := <-scheduledWindows: - activeWindows = append(activeWindows, w) + worker.wndLk.Lock() + worker.activeWindows = append(worker.activeWindows, w) + worker.wndLk.Unlock() case <-taskDone: log.Debugw("task done", "workerid", wid) case <-sh.closing: @@ -521,12 +528,14 @@ func (sh *scheduler) runWorker(wid WorkerID) { return } + worker.wndLk.Lock() + assignLoop: // process windows in order - for len(activeWindows) > 0 { + for len(worker.activeWindows) > 0 { // process tasks within a window in order - for len(activeWindows[0].todo) > 0 { - todo := activeWindows[0].todo[0] + for len(worker.activeWindows[0].todo) > 0 { + todo := worker.activeWindows[0].todo[0] needRes := ResourceTable[todo.taskType][sh.spt] sh.workersLk.RLock() @@ -548,15 +557,17 @@ func (sh *scheduler) runWorker(wid WorkerID) { go todo.respond(xerrors.Errorf("assignWorker error: %w", err)) } - activeWindows[0].todo = activeWindows[0].todo[1:] + worker.activeWindows[0].todo = worker.activeWindows[0].todo[1:] } - copy(activeWindows, activeWindows[1:]) - activeWindows[len(activeWindows)-1] = nil - activeWindows = activeWindows[:len(activeWindows)-1] + copy(worker.activeWindows, worker.activeWindows[1:]) + worker.activeWindows[len(worker.activeWindows)-1] = nil + worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1] windowsRequested-- } + + worker.wndLk.Unlock() } }() } diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index 9abbdb83a..a915c4320 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -1,6 +1,8 @@ package sectorstorage -import "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +import ( + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { m.sched.workersLk.RLock() @@ -29,6 +31,20 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { for id, handle := range m.sched.workers { out[uint64(id)] = handle.wt.Running() + + handle.wndLk.Lock() + for _, window := range handle.activeWindows { + for _, request := range window.todo { + out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{ + ID: 0, + Sector: request.sector, + Task: request.taskType, + RunWait: true, + Start: request.start, + }) + } + } + handle.wndLk.Unlock() } return out diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index 37c447031..1140ed4df 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -37,5 +37,6 @@ type WorkerJob struct { Sector abi.SectorID Task sealtasks.TaskType - Start time.Time + RunWait bool + Start time.Time }