diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 7d612a03a..62276a0e3 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -158,6 +158,9 @@ var sealingJobsCmd = &cli.Command{ if lines[i].RunWait != lines[j].RunWait { return lines[i].RunWait < lines[j].RunWait } + if lines[i].Start.Equal(lines[j].Start) { + return lines[i].ID.ID.String() < lines[j].ID.ID.String() + } return lines[i].Start.Before(lines[j].Start) }) diff --git a/extern/sector-storage/cbor_gen.go b/extern/sector-storage/cbor_gen.go index 51b82ef13..0db97f2c9 100644 --- a/extern/sector-storage/cbor_gen.go +++ b/extern/sector-storage/cbor_gen.go @@ -6,6 +6,7 @@ import ( "fmt" "io" + sealtasks "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) @@ -378,7 +379,7 @@ func (t *WorkID) MarshalCBOR(w io.Writer) error { scratch := make([]byte, 9) - // t.Method (string) (string) + // t.Method (sealtasks.TaskType) (string) if len("Method") > cbg.MaxLength { return xerrors.Errorf("Value in field \"Method\" was too long") } @@ -459,7 +460,7 @@ func (t *WorkID) UnmarshalCBOR(r io.Reader) error { } switch name { - // t.Method (string) (string) + // t.Method (sealtasks.TaskType) (string) case "Method": { @@ -468,7 +469,7 @@ func (t *WorkID) UnmarshalCBOR(r io.Reader) error { return err } - t.Method = string(sval) + t.Method = sealtasks.TaskType(sval) } // t.Params (string) (string) case "Params": diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index c8553a4e9..7d49cc958 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -364,7 +364,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke ctx, cancel := context.WithCancel(ctx) defer cancel() - wk, wait, err := m.getWork(ctx, "PreCommit1", sector, ticket, pieces) + wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces) if err != nil { return nil, xerrors.Errorf("getWork: %w", err) } @@ -408,7 +408,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase ctx, cancel := context.WithCancel(ctx) defer cancel() - wk, wait, err := m.getWork(ctx, "PreCommit2", sector, phase1Out) + wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out) if err != nil { return storage.SectorCids{}, xerrors.Errorf("getWork: %w", err) } @@ -449,7 +449,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a ctx, cancel := context.WithCancel(ctx) defer cancel() - wk, wait, err := m.getWork(ctx, "Commit1", sector, ticket, seed, pieces, cids) + wk, wait, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids) if err != nil { return storage.Commit1Out{}, xerrors.Errorf("getWork: %w", err) } @@ -490,7 +490,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a } func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) { - wk, wait, err := m.getWork(ctx, "Commit2", sector, phase1Out) + wk, wait, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out) if err != nil { return storage.Proof{}, xerrors.Errorf("getWork: %w", err) } diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go index 01bc7c38d..865090467 100644 --- a/extern/sector-storage/manager_calltracker.go +++ b/extern/sector-storage/manager_calltracker.go @@ -8,12 +8,14 @@ import ( "errors" "fmt" "golang.org/x/xerrors" + "os" + "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) type WorkID struct { - Method string + Method sealtasks.TaskType Params string // json [...params] } @@ -40,7 +42,7 @@ type WorkState struct { WorkError string // Status = wsDone, set when failed to start work } -func newWorkID(method string, params ...interface{}) (WorkID, error) { +func newWorkID(method sealtasks.TaskType, params ...interface{}) (WorkID, error) { pb, err := json.Marshal(params) if err != nil { return WorkID{}, xerrors.Errorf("marshaling work params: %w", err) @@ -74,6 +76,10 @@ func (m *Manager) setupWorkTracker() { continue } + if os.Getenv("LOTUS_MINER_ABORT_UNFINISHED_WORK") == "1" { + st.Status = wsDone + } + switch st.Status { case wsStarted: log.Warnf("dropping non-running work %s", wid) @@ -96,7 +102,7 @@ func (m *Manager) setupWorkTracker() { } // returns wait=true when the task is already tracked/running -func (m *Manager) getWork(ctx context.Context, method string, params ...interface{}) (wid WorkID, wait bool, err error) { +func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params ...interface{}) (wid WorkID, wait bool, err error) { wid, err = newWorkID(method, params) if err != nil { return WorkID{}, false, xerrors.Errorf("creating WorkID: %w", err) diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index 9b8cbc24e..849322be0 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -56,7 +56,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob { m.workLk.Lock() defer m.workLk.Unlock() - for id := range m.callToWork { + for id, work := range m.callToWork { _, found := calls[id] if found { continue @@ -65,7 +65,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob { out[-1] = append(out[-1], storiface.WorkerJob{ ID: id, Sector: id.Sector, - Task: "???", + Task: work.Method, RunWait: -1, Start: time.Time{}, })