Merge branch 'feat/SchedRemoveRequest' of github.com:LexLuthr/lotus into LexLuthr-feat/SchedRemoveRequest
This commit is contained in:
commit
11e4914531
@ -150,6 +150,8 @@ type StorageMiner interface {
|
|||||||
// SealingSchedDiag dumps internal sealing scheduler state
|
// SealingSchedDiag dumps internal sealing scheduler state
|
||||||
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
|
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
|
||||||
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
|
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
|
||||||
|
//SealingSchedRemove removes a request from sealing pipeline
|
||||||
|
SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error //perm:admin
|
||||||
|
|
||||||
// paths.SectorIndex
|
// paths.SectorIndex
|
||||||
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
|
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
|
||||||
|
@ -802,6 +802,8 @@ type StorageMinerStruct struct {
|
|||||||
|
|
||||||
SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"`
|
SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"`
|
||||||
|
|
||||||
|
SealingRemoveRequest func(p0 context.Context, p1 uuid.UUID) error `perm:"admin"`
|
||||||
|
|
||||||
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
|
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
|
||||||
|
|
||||||
SectorAbortUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
|
SectorAbortUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
|
||||||
@ -4761,6 +4763,17 @@ func (s *StorageMinerStub) SealingAbort(p0 context.Context, p1 storiface.CallID)
|
|||||||
return ErrNotSupported
|
return ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
|
||||||
|
if s.Internal.SealingRemoveRequest == nil {
|
||||||
|
return ErrNotSupported
|
||||||
|
}
|
||||||
|
return s.Internal.SealingRemoveRequest(p0, p1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
|
||||||
|
return ErrNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (interface{}, error) {
|
func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (interface{}, error) {
|
||||||
if s.Internal.SealingSchedDiag == nil {
|
if s.Internal.SealingSchedDiag == nil {
|
||||||
return nil, ErrNotSupported
|
return nil, ErrNotSupported
|
||||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -365,6 +365,12 @@ var sealingAbortCmd = &cli.Command{
|
|||||||
Name: "abort",
|
Name: "abort",
|
||||||
Usage: "Abort a running job",
|
Usage: "Abort a running job",
|
||||||
ArgsUsage: "[callid]",
|
ArgsUsage: "[callid]",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "sched",
|
||||||
|
Usage: "Specifies that the argument is UUID of the request to be removed from scheduler",
|
||||||
|
},
|
||||||
|
},
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
if cctx.Args().Len() != 1 {
|
if cctx.Args().Len() != 1 {
|
||||||
return xerrors.Errorf("expected 1 argument")
|
return xerrors.Errorf("expected 1 argument")
|
||||||
@ -378,6 +384,14 @@ var sealingAbortCmd = &cli.Command{
|
|||||||
|
|
||||||
ctx := lcli.ReqContext(cctx)
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
|
if cctx.Bool("sched") {
|
||||||
|
err = nodeApi.SealingRemoveRequest(ctx, uuid.Must(uuid.Parse(cctx.Args().First())))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("Failed to removed the request with UUID %s: %w", cctx.Args().First(), err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
jobs, err := nodeApi.WorkerJobs(ctx)
|
jobs, err := nodeApi.WorkerJobs(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting worker jobs: %w", err)
|
return xerrors.Errorf("getting worker jobs: %w", err)
|
||||||
|
@ -128,6 +128,7 @@
|
|||||||
* [RuntimeSubsystems](#RuntimeSubsystems)
|
* [RuntimeSubsystems](#RuntimeSubsystems)
|
||||||
* [Sealing](#Sealing)
|
* [Sealing](#Sealing)
|
||||||
* [SealingAbort](#SealingAbort)
|
* [SealingAbort](#SealingAbort)
|
||||||
|
* [SealingRemoveRequest](#SealingRemoveRequest)
|
||||||
* [SealingSchedDiag](#SealingSchedDiag)
|
* [SealingSchedDiag](#SealingSchedDiag)
|
||||||
* [Sector](#Sector)
|
* [Sector](#Sector)
|
||||||
* [SectorAbortUpgrade](#SectorAbortUpgrade)
|
* [SectorAbortUpgrade](#SectorAbortUpgrade)
|
||||||
@ -2749,6 +2750,21 @@ Inputs:
|
|||||||
|
|
||||||
Response: `{}`
|
Response: `{}`
|
||||||
|
|
||||||
|
### SealingRemoveRequest
|
||||||
|
SealingSchedRemove removes a request from sealing pipeline
|
||||||
|
|
||||||
|
|
||||||
|
Perms: admin
|
||||||
|
|
||||||
|
Inputs:
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
"07070707-0707-0707-0707-070707070707"
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
Response: `{}`
|
||||||
|
|
||||||
### SealingSchedDiag
|
### SealingSchedDiag
|
||||||
SealingSchedDiag dumps internal sealing scheduler state
|
SealingSchedDiag dumps internal sealing scheduler state
|
||||||
|
|
||||||
|
@ -2342,7 +2342,7 @@ USAGE:
|
|||||||
lotus-miner sealing abort [command options] [callid]
|
lotus-miner sealing abort [command options] [callid]
|
||||||
|
|
||||||
OPTIONS:
|
OPTIONS:
|
||||||
--help, -h show help (default: false)
|
--sched Specifies that the argument is UUID of the request to be removed from scheduler (default: false)
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -2,11 +2,13 @@ package itests
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -14,6 +16,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/itests/kit"
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
@ -21,6 +24,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/impl"
|
"github.com/filecoin-project/lotus/node/impl"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/storage/paths"
|
"github.com/filecoin-project/lotus/storage/paths"
|
||||||
|
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
"github.com/filecoin-project/lotus/storage/wdpost"
|
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||||
@ -402,6 +406,90 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
|
|||||||
require.Len(t, lastPending, 0)
|
require.Len(t, lastPending, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSchedulerRemoveRequest(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||||
|
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
|
||||||
|
|
||||||
|
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
||||||
|
|
||||||
|
e, err := worker.Enabled(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, e)
|
||||||
|
|
||||||
|
type info struct {
|
||||||
|
CallToWork struct {
|
||||||
|
} `json:"CallToWork"`
|
||||||
|
EarlyRet interface{} `json:"EarlyRet"`
|
||||||
|
ReturnedWork interface{} `json:"ReturnedWork"`
|
||||||
|
SchedInfo struct {
|
||||||
|
OpenWindows []string `json:"OpenWindows"`
|
||||||
|
Requests []struct {
|
||||||
|
Priority int `json:"Priority"`
|
||||||
|
SchedID string `json:"SchedId"`
|
||||||
|
Sector struct {
|
||||||
|
Miner int `json:"Miner"`
|
||||||
|
Number int `json:"Number"`
|
||||||
|
} `json:"Sector"`
|
||||||
|
TaskType string `json:"TaskType"`
|
||||||
|
} `json:"Requests"`
|
||||||
|
} `json:"SchedInfo"`
|
||||||
|
Waiting interface{} `json:"Waiting"`
|
||||||
|
}
|
||||||
|
|
||||||
|
tocheck := miner.StartPledge(ctx, 1, 0, nil)
|
||||||
|
var sn abi.SectorNumber
|
||||||
|
for n := range tocheck {
|
||||||
|
sn = n
|
||||||
|
}
|
||||||
|
// Keep checking till sector state is PC2, the request should get stuck as worker cannot process PC2
|
||||||
|
for {
|
||||||
|
st, err := miner.SectorsStatus(ctx, sn, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
if st.State == api.SectorState(sealing.PreCommit2) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dump current scheduler info
|
||||||
|
schedb, err := miner.SealingSchedDiag(ctx, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
j, err := json.MarshalIndent(&schedb, "", " ")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var b info
|
||||||
|
err = json.Unmarshal(j, &b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var schedidb uuid.UUID
|
||||||
|
|
||||||
|
// cast scheduler info and get the request UUID. Call the SealingRemoveRequest()
|
||||||
|
require.Len(t, b.SchedInfo.Requests, 1)
|
||||||
|
require.Equal(t, "seal/v0/precommit/2", b.SchedInfo.Requests[0].TaskType)
|
||||||
|
|
||||||
|
schedidb, err = uuid.Parse(b.SchedInfo.Requests[0].SchedID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = miner.SealingRemoveRequest(ctx, schedidb)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Dump the schduler again and compare the UUID if a request is present
|
||||||
|
// If no request present then pass the test
|
||||||
|
scheda, err := miner.SealingSchedDiag(ctx, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
k, err := json.MarshalIndent(&scheda, "", " ")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var a info
|
||||||
|
err = json.Unmarshal(k, &a)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Len(t, a.SchedInfo.Requests, 0)
|
||||||
|
}
|
||||||
|
|
||||||
func TestWorkerName(t *testing.T) {
|
func TestWorkerName(t *testing.T) {
|
||||||
name := "thisstringisprobablynotahostnameihope"
|
name := "thisstringisprobablynotahostnameihope"
|
||||||
|
|
||||||
|
@ -462,6 +462,10 @@ func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.Call
|
|||||||
return sm.StorageMgr.Abort(ctx, call)
|
return sm.StorageMgr.Abort(ctx, call)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error {
|
||||||
|
return sm.StorageMgr.RemoveSchedRequest(ctx, schedId)
|
||||||
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
|
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
|
||||||
fi, err := os.Open(path)
|
fi, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1171,6 +1171,10 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err
|
|||||||
return i, nil
|
return i, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) RemoveSchedRequest(ctx context.Context, schedId uuid.UUID) error {
|
||||||
|
return m.sched.RemoveRequest(ctx, schedId)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) Close(ctx context.Context) error {
|
func (m *Manager) Close(ctx context.Context) error {
|
||||||
m.windowPoStSched.schedClose()
|
m.windowPoStSched.schedClose()
|
||||||
m.winningPoStSched.schedClose()
|
m.winningPoStSched.schedClose()
|
||||||
|
@ -68,7 +68,8 @@ type Scheduler struct {
|
|||||||
|
|
||||||
workTracker *workTracker
|
workTracker *workTracker
|
||||||
|
|
||||||
info chan func(interface{})
|
info chan func(interface{})
|
||||||
|
rmRequest chan *rmRequest
|
||||||
|
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
@ -122,6 +123,7 @@ type WorkerRequest struct {
|
|||||||
TaskType sealtasks.TaskType
|
TaskType sealtasks.TaskType
|
||||||
Priority int // larger values more important
|
Priority int // larger values more important
|
||||||
Sel WorkerSelector
|
Sel WorkerSelector
|
||||||
|
SchedId uuid.UUID
|
||||||
|
|
||||||
prepare WorkerAction
|
prepare WorkerAction
|
||||||
work WorkerAction
|
work WorkerAction
|
||||||
@ -139,6 +141,11 @@ type workerResponse struct {
|
|||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rmRequest struct {
|
||||||
|
id uuid.UUID
|
||||||
|
res chan error
|
||||||
|
}
|
||||||
|
|
||||||
func newScheduler(assigner string) (*Scheduler, error) {
|
func newScheduler(assigner string) (*Scheduler, error) {
|
||||||
var a Assigner
|
var a Assigner
|
||||||
switch assigner {
|
switch assigner {
|
||||||
@ -168,7 +175,8 @@ func newScheduler(assigner string) (*Scheduler, error) {
|
|||||||
prepared: map[uuid.UUID]trackedWork{},
|
prepared: map[uuid.UUID]trackedWork{},
|
||||||
},
|
},
|
||||||
|
|
||||||
info: make(chan func(interface{})),
|
info: make(chan func(interface{})),
|
||||||
|
rmRequest: make(chan *rmRequest),
|
||||||
|
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
@ -184,6 +192,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t
|
|||||||
TaskType: taskType,
|
TaskType: taskType,
|
||||||
Priority: getPriority(ctx),
|
Priority: getPriority(ctx),
|
||||||
Sel: sel,
|
Sel: sel,
|
||||||
|
SchedId: uuid.New(),
|
||||||
|
|
||||||
prepare: prepare,
|
prepare: prepare,
|
||||||
work: work,
|
work: work,
|
||||||
@ -228,6 +237,7 @@ type SchedDiagRequestInfo struct {
|
|||||||
Sector abi.SectorID
|
Sector abi.SectorID
|
||||||
TaskType sealtasks.TaskType
|
TaskType sealtasks.TaskType
|
||||||
Priority int
|
Priority int
|
||||||
|
SchedId uuid.UUID
|
||||||
}
|
}
|
||||||
|
|
||||||
type SchedDiagInfo struct {
|
type SchedDiagInfo struct {
|
||||||
@ -246,6 +256,9 @@ func (sh *Scheduler) runSched() {
|
|||||||
var toDisable []workerDisableReq
|
var toDisable []workerDisableReq
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
case rmreq := <-sh.rmRequest:
|
||||||
|
sh.removeRequest(rmreq)
|
||||||
|
doSched = true
|
||||||
case <-sh.workerChange:
|
case <-sh.workerChange:
|
||||||
doSched = true
|
doSched = true
|
||||||
case dreq := <-sh.workerDisable:
|
case dreq := <-sh.workerDisable:
|
||||||
@ -263,7 +276,6 @@ func (sh *Scheduler) runSched() {
|
|||||||
doSched = true
|
doSched = true
|
||||||
case ireq := <-sh.info:
|
case ireq := <-sh.info:
|
||||||
ireq(sh.diag())
|
ireq(sh.diag())
|
||||||
|
|
||||||
case <-iw:
|
case <-iw:
|
||||||
initialised = true
|
initialised = true
|
||||||
iw = nil
|
iw = nil
|
||||||
@ -332,6 +344,7 @@ func (sh *Scheduler) diag() SchedDiagInfo {
|
|||||||
Sector: task.Sector.ID,
|
Sector: task.Sector.ID,
|
||||||
TaskType: task.TaskType,
|
TaskType: task.TaskType,
|
||||||
Priority: task.Priority,
|
Priority: task.Priority,
|
||||||
|
SchedId: task.SchedId,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -381,6 +394,49 @@ func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sh *Scheduler) removeRequest(rmrequest *rmRequest) {
|
||||||
|
|
||||||
|
if sh.SchedQueue.Len() < 0 {
|
||||||
|
rmrequest.res <- xerrors.New("No requests in the scheduler")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
queue := sh.SchedQueue
|
||||||
|
for i, r := range *queue {
|
||||||
|
if r.SchedId == rmrequest.id {
|
||||||
|
queue.Remove(i)
|
||||||
|
rmrequest.res <- nil
|
||||||
|
go r.respond(xerrors.Errorf("scheduling request removed"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rmrequest.res <- xerrors.New("No request with provided details found")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error {
|
||||||
|
ret := make(chan error, 1)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case sh.rmRequest <- &rmRequest{
|
||||||
|
id: schedId,
|
||||||
|
res: ret,
|
||||||
|
}:
|
||||||
|
case <-sh.closing:
|
||||||
|
return xerrors.New("closing")
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resp := <-ret:
|
||||||
|
return resp
|
||||||
|
case <-sh.closing:
|
||||||
|
return xerrors.New("closing")
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (sh *Scheduler) Close(ctx context.Context) error {
|
func (sh *Scheduler) Close(ctx context.Context) error {
|
||||||
close(sh.closing)
|
close(sh.closing)
|
||||||
select {
|
select {
|
||||||
|
Loading…
Reference in New Issue
Block a user