move requestremove inside runSched
This commit is contained in:
parent
cf78fa99ee
commit
c736dedfa6
@ -151,7 +151,7 @@ type StorageMiner interface {
|
|||||||
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
|
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
|
||||||
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
|
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
|
||||||
//SealingSchedRemove removes a request from sealing pipeline
|
//SealingSchedRemove removes a request from sealing pipeline
|
||||||
SealingRemoveRequest(ctx context.Context, sectorID abi.SectorID, task string, priority int) error //perm:admin
|
SealingRemoveRequest(ctx context.Context, SchedId uuid.UUID) error //perm:admin
|
||||||
|
|
||||||
// paths.SectorIndex
|
// paths.SectorIndex
|
||||||
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
|
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
|
||||||
|
@ -802,7 +802,7 @@ type StorageMinerStruct struct {
|
|||||||
|
|
||||||
SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"`
|
SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"`
|
||||||
|
|
||||||
SealingRemoveRequest func(p0 context.Context, p1 abi.SectorID, p2 string, p3 int) error `perm:"admin"`
|
SealingRemoveRequest func(p0 context.Context, p1 uuid.UUID) error `perm:"admin"`
|
||||||
|
|
||||||
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
|
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
|
||||||
|
|
||||||
@ -4763,14 +4763,14 @@ func (s *StorageMinerStub) SealingAbort(p0 context.Context, p1 storiface.CallID)
|
|||||||
return ErrNotSupported
|
return ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 abi.SectorID, p2 string, p3 int) error {
|
func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
|
||||||
if s.Internal.SealingRemoveRequest == nil {
|
if s.Internal.SealingRemoveRequest == nil {
|
||||||
return ErrNotSupported
|
return ErrNotSupported
|
||||||
}
|
}
|
||||||
return s.Internal.SealingRemoveRequest(p0, p1, p2, p3)
|
return s.Internal.SealingRemoveRequest(p0, p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 abi.SectorID, p2 string, p3 int) error {
|
func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
|
||||||
return ErrNotSupported
|
return ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Binary file not shown.
@ -365,6 +365,12 @@ var sealingAbortCmd = &cli.Command{
|
|||||||
Name: "abort",
|
Name: "abort",
|
||||||
Usage: "Abort a running job",
|
Usage: "Abort a running job",
|
||||||
ArgsUsage: "[callid]",
|
ArgsUsage: "[callid]",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "requestId",
|
||||||
|
Usage: "Specifies that the argument is SchedId of the request to be removed from scheduler",
|
||||||
|
},
|
||||||
|
},
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
if cctx.Args().Len() != 1 {
|
if cctx.Args().Len() != 1 {
|
||||||
return xerrors.Errorf("expected 1 argument")
|
return xerrors.Errorf("expected 1 argument")
|
||||||
@ -378,6 +384,14 @@ var sealingAbortCmd = &cli.Command{
|
|||||||
|
|
||||||
ctx := lcli.ReqContext(cctx)
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
|
if cctx.Bool("requestId") {
|
||||||
|
err = nodeApi.SealingRemoveRequest(ctx, uuid.Must(uuid.Parse(cctx.Args().First())))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("Failed to removed the request with UUID %s: %w", cctx.Args().First(), err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
jobs, err := nodeApi.WorkerJobs(ctx)
|
jobs, err := nodeApi.WorkerJobs(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting worker jobs: %w", err)
|
return xerrors.Errorf("getting worker jobs: %w", err)
|
||||||
|
@ -2759,12 +2759,7 @@ Perms: admin
|
|||||||
Inputs:
|
Inputs:
|
||||||
```json
|
```json
|
||||||
[
|
[
|
||||||
{
|
"07070707-0707-0707-0707-070707070707"
|
||||||
"Miner": 1000,
|
|
||||||
"Number": 9
|
|
||||||
},
|
|
||||||
"string value",
|
|
||||||
123
|
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -2342,7 +2342,7 @@ USAGE:
|
|||||||
lotus-miner sealing abort [command options] [callid]
|
lotus-miner sealing abort [command options] [callid]
|
||||||
|
|
||||||
OPTIONS:
|
OPTIONS:
|
||||||
--help, -h show help (default: false)
|
--requestId Specifies that the argument is SchedId of the request to be removed from scheduler (default: false)
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -56,7 +56,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
|
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer"
|
"github.com/filecoin-project/lotus/storage/sealer"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
"github.com/filecoin-project/lotus/storage/wdpost"
|
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||||
@ -463,9 +462,8 @@ func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.Call
|
|||||||
return sm.StorageMgr.Abort(ctx, call)
|
return sm.StorageMgr.Abort(ctx, call)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, sectorID abi.SectorID, task string, priority int) error {
|
func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, SchedId uuid.UUID) error {
|
||||||
rtask := sealtasks.TaskType(task)
|
return sm.StorageMgr.RemoveSchedRequest(ctx, SchedId)
|
||||||
return sm.StorageMgr.RemoveSchedRequest(ctx, sectorID, rtask, priority)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
|
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
|
||||||
|
@ -1168,10 +1168,10 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err
|
|||||||
return i, nil
|
return i, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) RemoveSchedRequest(ctx context.Context, sectorID abi.SectorID, tasktype sealtasks.TaskType, priority int) error {
|
func (m *Manager) RemoveSchedRequest(ctx context.Context, SchedId uuid.UUID) error {
|
||||||
m.workLk.Lock()
|
m.workLk.Lock()
|
||||||
defer m.workLk.Unlock()
|
defer m.workLk.Unlock()
|
||||||
return m.sched.RemoveRequest(ctx, sectorID, tasktype, priority)
|
return m.sched.RemoveRequest(ctx, SchedId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) Close(ctx context.Context) error {
|
func (m *Manager) Close(ctx context.Context) error {
|
||||||
|
@ -69,6 +69,7 @@ type Scheduler struct {
|
|||||||
workTracker *workTracker
|
workTracker *workTracker
|
||||||
|
|
||||||
info chan func(interface{})
|
info chan func(interface{})
|
||||||
|
rmRequest chan *rmRequest
|
||||||
|
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
@ -122,6 +123,7 @@ type WorkerRequest struct {
|
|||||||
TaskType sealtasks.TaskType
|
TaskType sealtasks.TaskType
|
||||||
Priority int // larger values more important
|
Priority int // larger values more important
|
||||||
Sel WorkerSelector
|
Sel WorkerSelector
|
||||||
|
SchedId uuid.UUID
|
||||||
|
|
||||||
prepare WorkerAction
|
prepare WorkerAction
|
||||||
work WorkerAction
|
work WorkerAction
|
||||||
@ -139,6 +141,13 @@ type workerResponse struct {
|
|||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rmRequest struct {
|
||||||
|
id uuid.UUID
|
||||||
|
rmresE chan error
|
||||||
|
rmresC chan struct{}
|
||||||
|
Ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
func newScheduler(assigner string) (*Scheduler, error) {
|
func newScheduler(assigner string) (*Scheduler, error) {
|
||||||
var a Assigner
|
var a Assigner
|
||||||
switch assigner {
|
switch assigner {
|
||||||
@ -169,6 +178,7 @@ func newScheduler(assigner string) (*Scheduler, error) {
|
|||||||
},
|
},
|
||||||
|
|
||||||
info: make(chan func(interface{})),
|
info: make(chan func(interface{})),
|
||||||
|
rmRequest: make(chan *rmRequest, 1),
|
||||||
|
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
@ -184,6 +194,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t
|
|||||||
TaskType: taskType,
|
TaskType: taskType,
|
||||||
Priority: getPriority(ctx),
|
Priority: getPriority(ctx),
|
||||||
Sel: sel,
|
Sel: sel,
|
||||||
|
SchedId: uuid.New(),
|
||||||
|
|
||||||
prepare: prepare,
|
prepare: prepare,
|
||||||
work: work,
|
work: work,
|
||||||
@ -228,6 +239,7 @@ type SchedDiagRequestInfo struct {
|
|||||||
Sector abi.SectorID
|
Sector abi.SectorID
|
||||||
TaskType sealtasks.TaskType
|
TaskType sealtasks.TaskType
|
||||||
Priority int
|
Priority int
|
||||||
|
SchedId uuid.UUID
|
||||||
}
|
}
|
||||||
|
|
||||||
type SchedDiagInfo struct {
|
type SchedDiagInfo struct {
|
||||||
@ -246,6 +258,9 @@ func (sh *Scheduler) runSched() {
|
|||||||
var toDisable []workerDisableReq
|
var toDisable []workerDisableReq
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
case rmreq := <-sh.rmRequest:
|
||||||
|
sh.removeRequest(rmreq)
|
||||||
|
doSched = true
|
||||||
case <-sh.workerChange:
|
case <-sh.workerChange:
|
||||||
doSched = true
|
doSched = true
|
||||||
case dreq := <-sh.workerDisable:
|
case dreq := <-sh.workerDisable:
|
||||||
@ -263,7 +278,6 @@ func (sh *Scheduler) runSched() {
|
|||||||
doSched = true
|
doSched = true
|
||||||
case ireq := <-sh.info:
|
case ireq := <-sh.info:
|
||||||
ireq(sh.diag())
|
ireq(sh.diag())
|
||||||
|
|
||||||
case <-iw:
|
case <-iw:
|
||||||
initialised = true
|
initialised = true
|
||||||
iw = nil
|
iw = nil
|
||||||
@ -332,6 +346,7 @@ func (sh *Scheduler) diag() SchedDiagInfo {
|
|||||||
Sector: task.Sector.ID,
|
Sector: task.Sector.ID,
|
||||||
TaskType: task.TaskType,
|
TaskType: task.TaskType,
|
||||||
Priority: task.Priority,
|
Priority: task.Priority,
|
||||||
|
SchedId: task.SchedId,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -381,20 +396,49 @@ func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *Scheduler) RemoveRequest(ctx context.Context, sectorID abi.SectorID, tasktype sealtasks.TaskType, priority int) error {
|
func (sh *Scheduler) removeRequest(rmrequest *rmRequest) {
|
||||||
|
|
||||||
if sh.SchedQueue.Len() < 0 {
|
if sh.SchedQueue.Len() < 0 {
|
||||||
return xerrors.Errorf("No requests in the scheduler")
|
rmrequest.rmresE <- xerrors.New("No requests in the scheduler")
|
||||||
}
|
}
|
||||||
sh.workersLk.Lock()
|
|
||||||
defer sh.workersLk.Unlock()
|
|
||||||
queue := sh.SchedQueue
|
queue := sh.SchedQueue
|
||||||
for i, r := range *queue {
|
for i, r := range *queue {
|
||||||
if r.Sector.ID == sectorID && r.Priority == priority && r.TaskType == tasktype { // TODO: Add check to ensure request in not scheduled
|
if r.SchedId == rmrequest.id {
|
||||||
queue.Remove(i)
|
queue.Remove(i)
|
||||||
|
rmrequest.rmresC <- struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rmrequest.rmresE <- xerrors.New("No request with provided details found")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error {
|
||||||
|
retE := make(chan error)
|
||||||
|
retC := make(chan struct{})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case sh.rmRequest <- &rmRequest{
|
||||||
|
id: schedId,
|
||||||
|
rmresE: retE,
|
||||||
|
rmresC: retC,
|
||||||
|
Ctx: ctx,
|
||||||
|
}:
|
||||||
|
case <-sh.closing:
|
||||||
|
return xerrors.New("closing")
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resp := <-retE:
|
||||||
|
return resp
|
||||||
|
case <-sh.closing:
|
||||||
|
return xerrors.New("closing")
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-retC:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return xerrors.Errorf("No request with provided details found")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *Scheduler) Close(ctx context.Context) error {
|
func (sh *Scheduler) Close(ctx context.Context) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user