diff --git a/api/api_storage.go b/api/api_storage.go index 9c3262df9..d86ed4724 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -151,7 +151,7 @@ type StorageMiner interface { SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin //SealingSchedRemove removes a request from sealing pipeline - SealingRemoveRequest(ctx context.Context, sectorID abi.SectorID, task string, priority int) error //perm:admin + SealingRemoveRequest(ctx context.Context, SchedId uuid.UUID) error //perm:admin // paths.SectorIndex StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 7c377f9b4..506e615f2 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -802,7 +802,7 @@ type StorageMinerStruct struct { SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"` - SealingRemoveRequest func(p0 context.Context, p1 abi.SectorID, p2 string, p3 int) error `perm:"admin"` + SealingRemoveRequest func(p0 context.Context, p1 uuid.UUID) error `perm:"admin"` SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"` @@ -4763,14 +4763,14 @@ func (s *StorageMinerStub) SealingAbort(p0 context.Context, p1 storiface.CallID) return ErrNotSupported } -func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 abi.SectorID, p2 string, p3 int) error { +func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error { if s.Internal.SealingRemoveRequest == nil { return ErrNotSupported } - return s.Internal.SealingRemoveRequest(p0, p1, p2, p3) + return s.Internal.SealingRemoveRequest(p0, p1) } -func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 abi.SectorID, p2 string, p3 int) error { +func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error { return ErrNotSupported } diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index ca987eeef..310ac2727 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/cmd/lotus-miner/sealing.go b/cmd/lotus-miner/sealing.go index d334f657f..867504e2a 100644 --- a/cmd/lotus-miner/sealing.go +++ b/cmd/lotus-miner/sealing.go @@ -365,6 +365,12 @@ var sealingAbortCmd = &cli.Command{ Name: "abort", Usage: "Abort a running job", ArgsUsage: "[callid]", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "requestId", + Usage: "Specifies that the argument is SchedId of the request to be removed from scheduler", + }, + }, Action: func(cctx *cli.Context) error { if cctx.Args().Len() != 1 { return xerrors.Errorf("expected 1 argument") @@ -378,6 +384,14 @@ var sealingAbortCmd = &cli.Command{ ctx := lcli.ReqContext(cctx) + if cctx.Bool("requestId") { + err = nodeApi.SealingRemoveRequest(ctx, uuid.Must(uuid.Parse(cctx.Args().First()))) + if err != nil { + return xerrors.Errorf("Failed to removed the request with UUID %s: %w", cctx.Args().First(), err) + } + return nil + } + jobs, err := nodeApi.WorkerJobs(ctx) if err != nil { return xerrors.Errorf("getting worker jobs: %w", err) diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 885b19a08..4507e03b8 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -2759,12 +2759,7 @@ Perms: admin Inputs: ```json [ - { - "Miner": 1000, - "Number": 9 - }, - "string value", - 123 + "07070707-0707-0707-0707-070707070707" ] ``` diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 883c66ae8..45763e8e4 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -2342,7 +2342,7 @@ USAGE: lotus-miner sealing abort [command options] [callid] OPTIONS: - --help, -h show help (default: false) + --requestId Specifies that the argument is SchedId of the request to be removed from scheduler (default: false) ``` diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 264349116..5d81f30c4 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -56,7 +56,6 @@ import ( "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/fsutil" - "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/wdpost" @@ -463,9 +462,8 @@ func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.Call return sm.StorageMgr.Abort(ctx, call) } -func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, sectorID abi.SectorID, task string, priority int) error { - rtask := sealtasks.TaskType(task) - return sm.StorageMgr.RemoveSchedRequest(ctx, sectorID, rtask, priority) +func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, SchedId uuid.UUID) error { + return sm.StorageMgr.RemoveSchedRequest(ctx, SchedId) } func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { diff --git a/storage/sealer/manager.go b/storage/sealer/manager.go index 11061a626..5abfe0869 100644 --- a/storage/sealer/manager.go +++ b/storage/sealer/manager.go @@ -1168,10 +1168,10 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err return i, nil } -func (m *Manager) RemoveSchedRequest(ctx context.Context, sectorID abi.SectorID, tasktype sealtasks.TaskType, priority int) error { +func (m *Manager) RemoveSchedRequest(ctx context.Context, SchedId uuid.UUID) error { m.workLk.Lock() defer m.workLk.Unlock() - return m.sched.RemoveRequest(ctx, sectorID, tasktype, priority) + return m.sched.RemoveRequest(ctx, SchedId) } func (m *Manager) Close(ctx context.Context) error { diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go index 8c6ece4d0..8066bb7bb 100644 --- a/storage/sealer/sched.go +++ b/storage/sealer/sched.go @@ -68,7 +68,8 @@ type Scheduler struct { workTracker *workTracker - info chan func(interface{}) + info chan func(interface{}) + rmRequest chan *rmRequest closing chan struct{} closed chan struct{} @@ -122,6 +123,7 @@ type WorkerRequest struct { TaskType sealtasks.TaskType Priority int // larger values more important Sel WorkerSelector + SchedId uuid.UUID prepare WorkerAction work WorkerAction @@ -139,6 +141,13 @@ type workerResponse struct { err error } +type rmRequest struct { + id uuid.UUID + rmresE chan error + rmresC chan struct{} + Ctx context.Context +} + func newScheduler(assigner string) (*Scheduler, error) { var a Assigner switch assigner { @@ -168,7 +177,8 @@ func newScheduler(assigner string) (*Scheduler, error) { prepared: map[uuid.UUID]trackedWork{}, }, - info: make(chan func(interface{})), + info: make(chan func(interface{})), + rmRequest: make(chan *rmRequest, 1), closing: make(chan struct{}), closed: make(chan struct{}), @@ -184,6 +194,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t TaskType: taskType, Priority: getPriority(ctx), Sel: sel, + SchedId: uuid.New(), prepare: prepare, work: work, @@ -228,6 +239,7 @@ type SchedDiagRequestInfo struct { Sector abi.SectorID TaskType sealtasks.TaskType Priority int + SchedId uuid.UUID } type SchedDiagInfo struct { @@ -246,6 +258,9 @@ func (sh *Scheduler) runSched() { var toDisable []workerDisableReq select { + case rmreq := <-sh.rmRequest: + sh.removeRequest(rmreq) + doSched = true case <-sh.workerChange: doSched = true case dreq := <-sh.workerDisable: @@ -263,7 +278,6 @@ func (sh *Scheduler) runSched() { doSched = true case ireq := <-sh.info: ireq(sh.diag()) - case <-iw: initialised = true iw = nil @@ -332,6 +346,7 @@ func (sh *Scheduler) diag() SchedDiagInfo { Sector: task.Sector.ID, TaskType: task.TaskType, Priority: task.Priority, + SchedId: task.SchedId, }) } @@ -381,20 +396,49 @@ func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) { } } -func (sh *Scheduler) RemoveRequest(ctx context.Context, sectorID abi.SectorID, tasktype sealtasks.TaskType, priority int) error { +func (sh *Scheduler) removeRequest(rmrequest *rmRequest) { + if sh.SchedQueue.Len() < 0 { - return xerrors.Errorf("No requests in the scheduler") + rmrequest.rmresE <- xerrors.New("No requests in the scheduler") } - sh.workersLk.Lock() - defer sh.workersLk.Unlock() + queue := sh.SchedQueue for i, r := range *queue { - if r.Sector.ID == sectorID && r.Priority == priority && r.TaskType == tasktype { // TODO: Add check to ensure request in not scheduled + if r.SchedId == rmrequest.id { queue.Remove(i) - return nil + rmrequest.rmresC <- struct{}{} } } - return xerrors.Errorf("No request with provided details found") + rmrequest.rmresE <- xerrors.New("No request with provided details found") +} + +func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error { + retE := make(chan error) + retC := make(chan struct{}) + + select { + case sh.rmRequest <- &rmRequest{ + id: schedId, + rmresE: retE, + rmresC: retC, + Ctx: ctx, + }: + case <-sh.closing: + return xerrors.New("closing") + case <-ctx.Done(): + return ctx.Err() + } + + select { + case resp := <-retE: + return resp + case <-sh.closing: + return xerrors.New("closing") + case <-ctx.Done(): + return ctx.Err() + case <-retC: + return nil + } } func (sh *Scheduler) Close(ctx context.Context) error {