sectorstorage: Show task type of ret-wait jobs

This commit is contained in:
Łukasz Magiera 2020-09-24 11:55:11 +02:00
parent f57652524c
commit 04ee53e061
5 changed files with 22 additions and 12 deletions

View File

@ -158,6 +158,9 @@ var sealingJobsCmd = &cli.Command{
if lines[i].RunWait != lines[j].RunWait {
return lines[i].RunWait < lines[j].RunWait
}
if lines[i].Start.Equal(lines[j].Start) {
return lines[i].ID.ID.String() < lines[j].ID.ID.String()
}
return lines[i].Start.Before(lines[j].Start)
})

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
sealtasks "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
@ -378,7 +379,7 @@ func (t *WorkID) MarshalCBOR(w io.Writer) error {
scratch := make([]byte, 9)
// t.Method (string) (string)
// t.Method (sealtasks.TaskType) (string)
if len("Method") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Method\" was too long")
}
@ -459,7 +460,7 @@ func (t *WorkID) UnmarshalCBOR(r io.Reader) error {
}
switch name {
// t.Method (string) (string)
// t.Method (sealtasks.TaskType) (string)
case "Method":
{
@ -468,7 +469,7 @@ func (t *WorkID) UnmarshalCBOR(r io.Reader) error {
return err
}
t.Method = string(sval)
t.Method = sealtasks.TaskType(sval)
}
// t.Params (string) (string)
case "Params":

View File

@ -364,7 +364,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wk, wait, err := m.getWork(ctx, "PreCommit1", sector, ticket, pieces)
wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces)
if err != nil {
return nil, xerrors.Errorf("getWork: %w", err)
}
@ -408,7 +408,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wk, wait, err := m.getWork(ctx, "PreCommit2", sector, phase1Out)
wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("getWork: %w", err)
}
@ -449,7 +449,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wk, wait, err := m.getWork(ctx, "Commit1", sector, ticket, seed, pieces, cids)
wk, wait, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids)
if err != nil {
return storage.Commit1Out{}, xerrors.Errorf("getWork: %w", err)
}
@ -490,7 +490,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
}
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
wk, wait, err := m.getWork(ctx, "Commit2", sector, phase1Out)
wk, wait, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out)
if err != nil {
return storage.Proof{}, xerrors.Errorf("getWork: %w", err)
}

View File

@ -8,12 +8,14 @@ import (
"errors"
"fmt"
"golang.org/x/xerrors"
"os"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type WorkID struct {
Method string
Method sealtasks.TaskType
Params string // json [...params]
}
@ -40,7 +42,7 @@ type WorkState struct {
WorkError string // Status = wsDone, set when failed to start work
}
func newWorkID(method string, params ...interface{}) (WorkID, error) {
func newWorkID(method sealtasks.TaskType, params ...interface{}) (WorkID, error) {
pb, err := json.Marshal(params)
if err != nil {
return WorkID{}, xerrors.Errorf("marshaling work params: %w", err)
@ -74,6 +76,10 @@ func (m *Manager) setupWorkTracker() {
continue
}
if os.Getenv("LOTUS_MINER_ABORT_UNFINISHED_WORK") == "1" {
st.Status = wsDone
}
switch st.Status {
case wsStarted:
log.Warnf("dropping non-running work %s", wid)
@ -96,7 +102,7 @@ func (m *Manager) setupWorkTracker() {
}
// returns wait=true when the task is already tracked/running
func (m *Manager) getWork(ctx context.Context, method string, params ...interface{}) (wid WorkID, wait bool, err error) {
func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params ...interface{}) (wid WorkID, wait bool, err error) {
wid, err = newWorkID(method, params)
if err != nil {
return WorkID{}, false, xerrors.Errorf("creating WorkID: %w", err)

View File

@ -56,7 +56,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob {
m.workLk.Lock()
defer m.workLk.Unlock()
for id := range m.callToWork {
for id, work := range m.callToWork {
_, found := calls[id]
if found {
continue
@ -65,7 +65,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob {
out[-1] = append(out[-1], storiface.WorkerJob{
ID: id,
Sector: id.Sector,
Task: "???",
Task: work.Method,
RunWait: -1,
Start: time.Time{},
})