Create a command to abort sealing calls

This commit is contained in:
Łukasz Magiera 2020-11-11 17:39:12 +01:00
parent 2a3d930933
commit 09f9f871a3
6 changed files with 79 additions and 9 deletions

View File

@ -71,6 +71,7 @@ type StorageMiner interface {
// SealingSchedDiag dumps internal sealing scheduler state
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error)
SealingAbort(ctx context.Context, call storiface.CallID) error
stores.SectorIndex

View File

@ -324,6 +324,7 @@ type StorageMinerStruct struct {
ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"`
SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
@ -1318,6 +1319,10 @@ func (c *StorageMinerStruct) SealingSchedDiag(ctx context.Context, doSched bool)
return c.Internal.SealingSchedDiag(ctx, doSched)
}
func (c *StorageMinerStruct) SealingAbort(ctx context.Context, call storiface.CallID) error {
return c.Internal.SealingAbort(ctx, call)
}
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error {
return c.Internal.StorageAttach(ctx, si, st)
}

View File

@ -28,6 +28,7 @@ var sealingCmd = &cli.Command{
sealingJobsCmd,
sealingWorkersCmd,
sealingSchedDiagCmd,
sealingAbortCmd,
},
}
@ -124,7 +125,7 @@ var sealingWorkersCmd = &cli.Command{
var sealingJobsCmd = &cli.Command{
Name: "jobs",
Usage: "list workers",
Usage: "list running jobs",
Flags: []cli.Flag{
&cli.BoolFlag{Name: "color"},
&cli.BoolFlag{
@ -215,9 +216,9 @@ var sealingJobsCmd = &cli.Command{
}
_, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n",
hex.EncodeToString(l.ID.ID[10:]),
hex.EncodeToString(l.ID.ID[:4]),
l.Sector.Number,
hex.EncodeToString(l.wid[5:]),
hex.EncodeToString(l.wid[:4]),
hostname,
l.Task.Short(),
state,
@ -260,3 +261,46 @@ var sealingSchedDiagCmd = &cli.Command{
return nil
},
}
var sealingAbortCmd = &cli.Command{
Name: "abort",
Usage: "Abort a running job",
ArgsUsage: "[call id]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)
}
var job *storiface.WorkerJob
outer:
for _, workerJobs := range jobs {
for _, j := range workerJobs {
if strings.HasPrefix(j.ID.ID.String(), cctx.Args().First()) {
job = &j
break outer
}
}
}
if job == nil {
return xerrors.Errorf("job with specified id prefix not found")
}
fmt.Printf("aborting job %s, task %s, sector %d, running on host %s\n", job.ID.String(), job.Task.Short(), job.Sector.Number, job.Hostname)
return nodeApi.SealingAbort(ctx, job.ID)
},
}

View File

@ -220,7 +220,9 @@ func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storifac
if err != nil {
return err
}
*rok = r.(bool)
if r != nil {
*rok = r.(bool)
}
return nil
}
}
@ -342,7 +344,9 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
if err != nil {
return err
}
out = p.(abi.PieceInfo)
if p != nil {
out = p.(abi.PieceInfo)
}
return nil
})
@ -366,7 +370,9 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
waitErr = werr
return
}
out = p.(storage.PreCommit1Out)
if p != nil {
out = p.(storage.PreCommit1Out)
}
}
if wait { // already in progress
@ -415,7 +421,9 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
waitErr = werr
return
}
out = p.(storage.SectorCids)
if p != nil {
out = p.(storage.SectorCids)
}
}
if wait { // already in progress
@ -462,7 +470,9 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
waitErr = werr
return
}
out = p.(storage.Commit1Out)
if p != nil {
out = p.(storage.Commit1Out)
}
}
if wait { // already in progress
@ -509,7 +519,9 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
waitErr = werr
return
}
out = p.(storage.Proof)
if p != nil {
out = p.(storage.Proof)
}
}
if wait { // already in progress

View File

@ -414,3 +414,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
return nil
}
func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
return m.returnResult(call, nil, "task aborted")
}

View File

@ -300,6 +300,10 @@ func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context, doSched bool) (
return sm.StorageMgr.SchedDiag(ctx, doSched)
}
func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.CallID) error {
return sm.StorageMgr.Abort(ctx, call)
}
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path)
if err != nil {