Show prepared tasks in sealing jobs

This commit is contained in:
Łukasz Magiera 2021-10-15 21:26:35 +02:00
parent 11d738eee0
commit 261238e157
4 changed files with 30 additions and 14 deletions

View File

@ -224,8 +224,10 @@ var sealingJobsCmd = &cli.Command{
for _, l := range lines { for _, l := range lines {
state := "running" state := "running"
switch { switch {
case l.RunWait > 0: case l.RunWait > 1:
state = fmt.Sprintf("assigned(%d)", l.RunWait-1) state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
case l.RunWait == storiface.RWPrepared:
state = "prepared"
case l.RunWait == storiface.RWRetDone: case l.RunWait == storiface.RWRetDone:
if !cctx.Bool("show-ret-done") { if !cctx.Bool("show-ret-done") {
continue continue

View File

@ -35,7 +35,13 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
out := map[uuid.UUID][]storiface.WorkerJob{} out := map[uuid.UUID][]storiface.WorkerJob{}
calls := map[storiface.CallID]struct{}{} calls := map[storiface.CallID]struct{}{}
for _, t := range m.sched.workTracker.Running() { running, preparing := m.sched.workTracker.Running()
for _, t := range running {
out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job)
calls[t.job.ID] = struct{}{}
}
for _, t := range preparing {
out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job) out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job)
calls[t.job.ID] = struct{}{} calls[t.job.ID] = struct{}{}
} }

View File

@ -47,6 +47,8 @@ type WorkerStats struct {
} }
const ( const (
RWPrepared = 1
RWRunning = 0
RWRetWait = -1 RWRetWait = -1
RWReturned = -2 RWReturned = -2
RWRetDone = -3 RWRetDone = -3
@ -57,7 +59,8 @@ type WorkerJob struct {
Sector abi.SectorID Sector abi.SectorID
Task sealtasks.TaskType Task sealtasks.TaskType
// 1+ - assigned // 2+ - assigned
// 1 - prepared
// 0 - running // 0 - running
// -1 - ret-wait // -1 - ret-wait
// -2 - returned // -2 - returned

View File

@ -72,13 +72,14 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
return callID, err return callID, err
} }
tracked := func() trackedWork { tracked := func(rw int) trackedWork {
return trackedWork{ return trackedWork{
job: storiface.WorkerJob{ job: storiface.WorkerJob{
ID: callID, ID: callID,
Sector: sid.ID, Sector: sid.ID,
Task: task, Task: task,
Start: time.Now(), Start: time.Now(),
RunWait: rw,
}, },
worker: wid, worker: wid,
workerHostname: wi.Hostname, workerHostname: wi.Hostname,
@ -90,7 +91,7 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
case <-ctx.Done(): case <-ctx.Done():
return callID, ctx.Err() return callID, ctx.Err()
default: default:
wt.prepared[callID] = tracked() wt.prepared[callID] = tracked(storiface.RWPrepared)
wt.lk.Unlock() wt.lk.Unlock()
select { select {
@ -111,7 +112,7 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
delete(wt.prepared, callID) delete(wt.prepared, callID)
} }
wt.running[callID] = tracked() wt.running[callID] = tracked(storiface.RWRunning)
ctx, _ = tag.New( ctx, _ = tag.New(
ctx, ctx,
@ -136,16 +137,20 @@ func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *
} }
} }
func (wt *workTracker) Running() []trackedWork { func (wt *workTracker) Running() ([]trackedWork, []trackedWork) {
wt.lk.Lock() wt.lk.Lock()
defer wt.lk.Unlock() defer wt.lk.Unlock()
out := make([]trackedWork, 0, len(wt.running)) running := make([]trackedWork, 0, len(wt.running))
for _, job := range wt.running { for _, job := range wt.running {
out = append(out, job) running = append(running, job)
}
prepared := make([]trackedWork, 0, len(wt.prepared))
for _, job := range wt.prepared {
prepared = append(prepared, job)
} }
return out return running, prepared
} }
type trackedWorker struct { type trackedWorker struct {