Show lost calls in sealing jobs cli

This commit is contained in:
Łukasz Magiera 2020-09-23 19:26:26 +02:00
parent c17f0d7e61
commit d817dceb05
7 changed files with 50 additions and 20 deletions

View File

@ -63,7 +63,7 @@ type StorageMiner interface {
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error
WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error)
WorkerJobs(context.Context) (map[uint64][]storiface.WorkerJob, error)
WorkerJobs(context.Context) (map[int64][]storiface.WorkerJob, error)
storiface.WorkerReturn
// SealingSchedDiag dumps internal sealing scheduler state

View File

@ -280,7 +280,7 @@ type StorageMinerStruct struct {
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"`
WorkerJobs func(context.Context) (map[uint64][]storiface.WorkerJob, error) `perm:"admin"`
WorkerJobs func(context.Context) (map[int64][]storiface.WorkerJob, error) `perm:"admin"`
ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error `perm:"admin"`
ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error `perm:"admin"`
@ -1093,7 +1093,7 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uint64]storif
return c.Internal.WorkerStats(ctx)
}
func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uint64][]storiface.WorkerJob, error) {
func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) {
return c.Internal.WorkerJobs(ctx)
}

View File

@ -1,6 +1,7 @@
package main
import (
"encoding/hex"
"encoding/json"
"fmt"
"os"
@ -9,10 +10,9 @@ import (
"text/tabwriter"
"time"
"golang.org/x/xerrors"
"github.com/fatih/color"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -139,7 +139,7 @@ var sealingJobsCmd = &cli.Command{
type line struct {
storiface.WorkerJob
wid uint64
wid int64
}
lines := make([]line, 0)
@ -161,7 +161,7 @@ var sealingJobsCmd = &cli.Command{
return lines[i].Start.Before(lines[j].Start)
})
workerHostnames := map[uint64]string{}
workerHostnames := map[int64]string{}
wst, err := nodeApi.WorkerStats(ctx)
if err != nil {
@ -169,7 +169,7 @@ var sealingJobsCmd = &cli.Command{
}
for wid, st := range wst {
workerHostnames[wid] = st.Info.Hostname
workerHostnames[int64(wid)] = st.Info.Hostname
}
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
@ -177,10 +177,18 @@ var sealingJobsCmd = &cli.Command{
for _, l := range lines {
state := "running"
if l.RunWait != 0 {
if l.RunWait > 0 {
state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
}
_, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100))
if l.RunWait == -1 {
state = "ret-wait"
}
dur := "n/a"
if !l.Start.IsZero() {
dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String()
}
_, _ = fmt.Fprintf(tw, "%s\t%d\t%d\t%s\t%s\t%s\t%s\n", hex.EncodeToString(l.ID.ID[10:]), l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, dur)
}
return tw.Flush()

View File

@ -57,7 +57,7 @@ type SectorManager interface {
FaultTracker
}
type WorkerID uint64
type WorkerID int64
type Manager struct {
scfg *ffiwrapper.Config

View File

@ -2,6 +2,7 @@ package sectorstorage
import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"time"
)
func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
@ -23,21 +24,22 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
return out
}
func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
m.sched.workersLk.RLock()
defer m.sched.workersLk.RUnlock()
out := map[uint64][]storiface.WorkerJob{}
func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob {
out := map[int64][]storiface.WorkerJob{}
calls := map[storiface.CallID]struct{}{}
for _, t := range m.sched.wt.Running() {
out[uint64(t.worker)] = append(out[uint64(t.worker)], t.job)
out[int64(t.worker)] = append(out[int64(t.worker)], t.job)
calls[t.job.ID] = struct{}{}
}
m.sched.workersLk.RLock()
for id, handle := range m.sched.workers {
handle.wndLk.Lock()
for wi, window := range handle.activeWindows {
for _, request := range window.todo {
out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{
out[int64(id)] = append(out[int64(id)], storiface.WorkerJob{
ID: storiface.UndefCall,
Sector: request.sector,
Task: request.taskType,
@ -49,5 +51,25 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
handle.wndLk.Unlock()
}
m.sched.workersLk.RUnlock()
m.workLk.Lock()
defer m.workLk.Unlock()
for id := range m.callToWork {
_, found := calls[id]
if found {
continue
}
out[-1] = append(out[-1], storiface.WorkerJob{
ID: id,
Sector: id.Sector,
Task: "???",
RunWait: -1,
Start: time.Time{},
})
}
return out
}

View File

@ -45,7 +45,7 @@ type WorkerJob struct {
Sector abi.SectorID
Task sealtasks.TaskType
RunWait int // 0 - running, 1+ - assigned
RunWait int // -1 - ret-wait, 0 - running, 1+ - assigned
Start time.Time
}

View File

@ -87,7 +87,7 @@ func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]storiface.Wo
return sm.StorageMgr.WorkerStats(), nil
}
func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[uint64][]storiface.WorkerJob, error) {
func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) {
return sm.StorageMgr.WorkerJobs(), nil
}