Merge pull request #4804 from filecoin-project/feat/storage-retwait-cleanup

Expand sched-diag; Command to abort sealing calls
This commit is contained in:
Aayush Rajasekaran 2020-11-12 02:11:15 -05:00 committed by GitHub
commit 3a3986320c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 11 deletions

View File

@ -71,6 +71,7 @@ type StorageMiner interface {
// SealingSchedDiag dumps internal sealing scheduler state // SealingSchedDiag dumps internal sealing scheduler state
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error)
SealingAbort(ctx context.Context, call storiface.CallID) error
stores.SectorIndex stores.SectorIndex

View File

@ -324,6 +324,7 @@ type StorageMinerStruct struct {
ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"` SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"`
SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"` StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"` StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
@ -1318,6 +1319,10 @@ func (c *StorageMinerStruct) SealingSchedDiag(ctx context.Context, doSched bool)
return c.Internal.SealingSchedDiag(ctx, doSched) return c.Internal.SealingSchedDiag(ctx, doSched)
} }
func (c *StorageMinerStruct) SealingAbort(ctx context.Context, call storiface.CallID) error {
return c.Internal.SealingAbort(ctx, call)
}
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error { func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error {
return c.Internal.StorageAttach(ctx, si, st) return c.Internal.StorageAttach(ctx, si, st)
} }

View File

@ -28,6 +28,7 @@ var sealingCmd = &cli.Command{
sealingJobsCmd, sealingJobsCmd,
sealingWorkersCmd, sealingWorkersCmd,
sealingSchedDiagCmd, sealingSchedDiagCmd,
sealingAbortCmd,
}, },
} }
@ -124,9 +125,13 @@ var sealingWorkersCmd = &cli.Command{
var sealingJobsCmd = &cli.Command{ var sealingJobsCmd = &cli.Command{
Name: "jobs", Name: "jobs",
Usage: "list workers", Usage: "list running jobs",
Flags: []cli.Flag{ Flags: []cli.Flag{
&cli.BoolFlag{Name: "color"}, &cli.BoolFlag{Name: "color"},
&cli.BoolFlag{
Name: "show-ret-done",
Usage: "show returned but not consumed calls",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
color.NoColor = !cctx.Bool("color") color.NoColor = !cctx.Bool("color")
@ -191,6 +196,9 @@ var sealingJobsCmd = &cli.Command{
case l.RunWait > 0: case l.RunWait > 0:
state = fmt.Sprintf("assigned(%d)", l.RunWait-1) state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
case l.RunWait == storiface.RWRetDone: case l.RunWait == storiface.RWRetDone:
if !cctx.Bool("show-ret-done") {
continue
}
state = "ret-done" state = "ret-done"
case l.RunWait == storiface.RWReturned: case l.RunWait == storiface.RWReturned:
state = "returned" state = "returned"
@ -208,9 +216,9 @@ var sealingJobsCmd = &cli.Command{
} }
_, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n", _, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n",
hex.EncodeToString(l.ID.ID[10:]), hex.EncodeToString(l.ID.ID[:4]),
l.Sector.Number, l.Sector.Number,
hex.EncodeToString(l.wid[5:]), hex.EncodeToString(l.wid[:4]),
hostname, hostname,
l.Task.Short(), l.Task.Short(),
state, state,
@ -253,3 +261,47 @@ var sealingSchedDiagCmd = &cli.Command{
return nil return nil
}, },
} }
var sealingAbortCmd = &cli.Command{
Name: "abort",
Usage: "Abort a running job",
ArgsUsage: "[callid]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)
}
var job *storiface.WorkerJob
outer:
for _, workerJobs := range jobs {
for _, j := range workerJobs {
if strings.HasPrefix(j.ID.ID.String(), cctx.Args().First()) {
j := j
job = &j
break outer
}
}
}
if job == nil {
return xerrors.Errorf("job with specified id prefix not found")
}
fmt.Printf("aborting job %s, task %s, sector %d, running on host %s\n", job.ID.String(), job.Task.Short(), job.Sector.Number, job.Hostname)
return nodeApi.SealingAbort(ctx, job.ID)
},
}

View File

@ -220,7 +220,9 @@ func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storifac
if err != nil { if err != nil {
return err return err
} }
if r != nil {
*rok = r.(bool) *rok = r.(bool)
}
return nil return nil
} }
} }
@ -342,7 +344,9 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
if err != nil { if err != nil {
return err return err
} }
if p != nil {
out = p.(abi.PieceInfo) out = p.(abi.PieceInfo)
}
return nil return nil
}) })
@ -366,8 +370,10 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
waitErr = werr waitErr = werr
return return
} }
if p != nil {
out = p.(storage.PreCommit1Out) out = p.(storage.PreCommit1Out)
} }
}
if wait { // already in progress if wait { // already in progress
waitRes() waitRes()
@ -415,8 +421,10 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
waitErr = werr waitErr = werr
return return
} }
if p != nil {
out = p.(storage.SectorCids) out = p.(storage.SectorCids)
} }
}
if wait { // already in progress if wait { // already in progress
waitRes() waitRes()
@ -462,8 +470,10 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
waitErr = werr waitErr = werr
return return
} }
if p != nil {
out = p.(storage.Commit1Out) out = p.(storage.Commit1Out)
} }
}
if wait { // already in progress if wait { // already in progress
waitRes() waitRes()
@ -509,8 +519,10 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
waitErr = werr waitErr = werr
return return
} }
if p != nil {
out = p.(storage.Proof) out = p.(storage.Proof)
} }
}
if wait { // already in progress if wait { // already in progress
waitRes() waitRes()
@ -688,7 +700,48 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err
} }
} }
return m.sched.Info(ctx) si, err := m.sched.Info(ctx)
if err != nil {
return nil, err
}
type SchedInfo interface{}
i := struct {
SchedInfo
ReturnedWork []string
Waiting []string
CallToWork map[string]string
EarlyRet []string
}{
SchedInfo: si,
CallToWork: map[string]string{},
}
m.workLk.Lock()
for w := range m.results {
i.ReturnedWork = append(i.ReturnedWork, w.String())
}
for id := range m.callRes {
i.EarlyRet = append(i.EarlyRet, id.String())
}
for w := range m.waitRes {
i.Waiting = append(i.Waiting, w.String())
}
for c, w := range m.callToWork {
i.CallToWork[c.String()] = w.String()
}
m.workLk.Unlock()
return i, nil
} }
func (m *Manager) Close(ctx context.Context) error { func (m *Manager) Close(ctx context.Context) error {

View File

@ -414,3 +414,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
return nil return nil
} }
func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
return m.returnResult(call, nil, "task aborted")
}

View File

@ -300,6 +300,10 @@ func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context, doSched bool) (
return sm.StorageMgr.SchedDiag(ctx, doSched) return sm.StorageMgr.SchedDiag(ctx, doSched)
} }
func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.CallID) error {
return sm.StorageMgr.Abort(ctx, call)
}
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path) fi, err := os.Open(path)
if err != nil { if err != nil {