diff --git a/api/api_storage.go b/api/api_storage.go index aab4e364a..aee5b5b5b 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -63,7 +63,7 @@ type StorageMiner interface { // WorkerConnect tells the node to connect to workers RPC WorkerConnect(context.Context, string) error WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error) - WorkerJobs(context.Context) (map[uint64][]storiface.WorkerJob, error) + WorkerJobs(context.Context) (map[int64][]storiface.WorkerJob, error) storiface.WorkerReturn // SealingSchedDiag dumps internal sealing scheduler state diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 60e03f565..d4b48d66d 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -280,7 +280,7 @@ type StorageMinerStruct struct { WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"` - WorkerJobs func(context.Context) (map[uint64][]storiface.WorkerJob, error) `perm:"admin"` + WorkerJobs func(context.Context) (map[int64][]storiface.WorkerJob, error) `perm:"admin"` ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error `perm:"admin"` ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error `perm:"admin"` @@ -1093,7 +1093,7 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uint64]storif return c.Internal.WorkerStats(ctx) } -func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uint64][]storiface.WorkerJob, error) { +func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) { return c.Internal.WorkerJobs(ctx) } diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 5cc5c419a..7d612a03a 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -1,6 +1,7 @@ package main import ( + "encoding/hex" "encoding/json" "fmt" "os" @@ -9,10 +10,9 @@ import ( "text/tabwriter" "time" - "golang.org/x/xerrors" - "github.com/fatih/color" "github.com/urfave/cli/v2" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" @@ -139,7 +139,7 @@ var sealingJobsCmd = &cli.Command{ type line struct { storiface.WorkerJob - wid uint64 + wid int64 } lines := make([]line, 0) @@ -161,7 +161,7 @@ var sealingJobsCmd = &cli.Command{ return lines[i].Start.Before(lines[j].Start) }) - workerHostnames := map[uint64]string{} + workerHostnames := map[int64]string{} wst, err := nodeApi.WorkerStats(ctx) if err != nil { @@ -169,7 +169,7 @@ var sealingJobsCmd = &cli.Command{ } for wid, st := range wst { - workerHostnames[wid] = st.Info.Hostname + workerHostnames[int64(wid)] = st.Info.Hostname } tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) @@ -177,10 +177,18 @@ var sealingJobsCmd = &cli.Command{ for _, l := range lines { state := "running" - if l.RunWait != 0 { + if l.RunWait > 0 { state = fmt.Sprintf("assigned(%d)", l.RunWait-1) } - _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) + if l.RunWait == -1 { + state = "ret-wait" + } + dur := "n/a" + if !l.Start.IsZero() { + dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String() + } + + _, _ = fmt.Fprintf(tw, "%s\t%d\t%d\t%s\t%s\t%s\t%s\n", hex.EncodeToString(l.ID.ID[10:]), l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, dur) } return tw.Flush() diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index d4db60806..c8553a4e9 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -57,7 +57,7 @@ type SectorManager interface { FaultTracker } -type WorkerID uint64 +type WorkerID int64 type Manager struct { scfg *ffiwrapper.Config diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index 1ce415fd2..9b8cbc24e 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -2,6 +2,7 @@ package sectorstorage import ( "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "time" ) func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { @@ -23,21 +24,22 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { return out } -func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { - m.sched.workersLk.RLock() - defer m.sched.workersLk.RUnlock() - - out := map[uint64][]storiface.WorkerJob{} +func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob { + out := map[int64][]storiface.WorkerJob{} + calls := map[storiface.CallID]struct{}{} for _, t := range m.sched.wt.Running() { - out[uint64(t.worker)] = append(out[uint64(t.worker)], t.job) + out[int64(t.worker)] = append(out[int64(t.worker)], t.job) + calls[t.job.ID] = struct{}{} } + m.sched.workersLk.RLock() + for id, handle := range m.sched.workers { handle.wndLk.Lock() for wi, window := range handle.activeWindows { for _, request := range window.todo { - out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{ + out[int64(id)] = append(out[int64(id)], storiface.WorkerJob{ ID: storiface.UndefCall, Sector: request.sector, Task: request.taskType, @@ -49,5 +51,25 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { handle.wndLk.Unlock() } + m.sched.workersLk.RUnlock() + + m.workLk.Lock() + defer m.workLk.Unlock() + + for id := range m.callToWork { + _, found := calls[id] + if found { + continue + } + + out[-1] = append(out[-1], storiface.WorkerJob{ + ID: id, + Sector: id.Sector, + Task: "???", + RunWait: -1, + Start: time.Time{}, + }) + } + return out } diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index ead770524..e6ab2246f 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -45,7 +45,7 @@ type WorkerJob struct { Sector abi.SectorID Task sealtasks.TaskType - RunWait int // 0 - running, 1+ - assigned + RunWait int // -1 - ret-wait, 0 - running, 1+ - assigned Start time.Time } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 337640027..ba0719078 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -87,7 +87,7 @@ func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]storiface.Wo return sm.StorageMgr.WorkerStats(), nil } -func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[uint64][]storiface.WorkerJob, error) { +func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) { return sm.StorageMgr.WorkerJobs(), nil }