Merge branch 'LexLuthr-feat/SchedRemoveRequest'
commit 9b4cca9fd0
@@ -150,6 +150,8 @@ type StorageMiner interface {
	// SealingSchedDiag dumps internal sealing scheduler state
	SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
	SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
	// SealingRemoveRequest removes a request from the sealing pipeline
	SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error //perm:admin

	// paths.SectorIndex
	StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
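For orientation, a Go client could exercise the new endpoint roughly as sketched below. This assumes the commit's API surface; the RPC address, token, and UUID are placeholders, and `NewStorageMinerRPCV0` is the miner RPC client constructor from `github.com/filecoin-project/lotus/api/client`:

```go
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/google/uuid"

	"github.com/filecoin-project/lotus/api/client"
)

func main() {
	ctx := context.Background()

	// Placeholder endpoint and admin token for a local lotus-miner.
	header := http.Header{"Authorization": []string{"Bearer <admin-token>"}}
	miner, closer, err := client.NewStorageMinerRPCV0(ctx, "ws://127.0.0.1:2345/rpc/v0", header)
	if err != nil {
		log.Fatal(err)
	}
	defer closer()

	// SchedId of the stuck request, e.g. taken from SealingSchedDiag output.
	schedID := uuid.MustParse("07070707-0707-0707-0707-070707070707")
	if err := miner.SealingRemoveRequest(ctx, schedID); err != nil {
		log.Fatalf("removing request: %v", err)
	}
}
```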
@@ -802,6 +802,8 @@ type StorageMinerStruct struct {

		SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"`

		SealingRemoveRequest func(p0 context.Context, p1 uuid.UUID) error `perm:"admin"`

		SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`

		SectorAbortUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
@@ -4761,6 +4763,17 @@ func (s *StorageMinerStub) SealingAbort(p0 context.Context, p1 storiface.CallID)
	return ErrNotSupported
}

func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
	if s.Internal.SealingRemoveRequest == nil {
		return ErrNotSupported
	}
	return s.Internal.SealingRemoveRequest(p0, p1)
}

func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
	return ErrNotSupported
}

func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (interface{}, error) {
	if s.Internal.SealingSchedDiag == nil {
		return nil, ErrNotSupported
	}
(4 binary files not shown)
@@ -365,6 +365,12 @@ var sealingAbortCmd = &cli.Command{
	Name:      "abort",
	Usage:     "Abort a running job",
	ArgsUsage: "[callid]",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name:  "sched",
			Usage: "Specifies that the argument is the UUID of a request to be removed from the scheduler",
		},
	},
	Action: func(cctx *cli.Context) error {
		if cctx.Args().Len() != 1 {
			return xerrors.Errorf("expected 1 argument")
@@ -378,6 +384,14 @@ var sealingAbortCmd = &cli.Command{

		ctx := lcli.ReqContext(cctx)

		if cctx.Bool("sched") {
			err = nodeApi.SealingRemoveRequest(ctx, uuid.Must(uuid.Parse(cctx.Args().First())))
			if err != nil {
				return xerrors.Errorf("failed to remove the request with UUID %s: %w", cctx.Args().First(), err)
			}
			return nil
		}

		jobs, err := nodeApi.WorkerJobs(ctx)
		if err != nil {
			return xerrors.Errorf("getting worker jobs: %w", err)
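In practice the flag gives `sealing abort` two modes; a hypothetical session might look like this (the call id is left as a placeholder, and the SchedId would be read from the scheduler diag output):

```shell
# Abort a running job on a worker by call id (existing behaviour).
lotus-miner sealing abort <callid>

# Remove a pending request from the scheduler queue by its SchedId (new).
lotus-miner sealing abort --sched 07070707-0707-0707-0707-070707070707
```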
@@ -128,6 +128,7 @@
  * [RuntimeSubsystems](#RuntimeSubsystems)
* [Sealing](#Sealing)
  * [SealingAbort](#SealingAbort)
  * [SealingRemoveRequest](#SealingRemoveRequest)
  * [SealingSchedDiag](#SealingSchedDiag)
* [Sector](#Sector)
  * [SectorAbortUpgrade](#SectorAbortUpgrade)
@@ -2749,6 +2750,21 @@ Inputs:

Response: `{}`

### SealingRemoveRequest
SealingRemoveRequest removes a request from the sealing pipeline


Perms: admin

Inputs:
```json
[
  "07070707-0707-0707-0707-070707070707"
]
```

Response: `{}`
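For reference, the same call expressed as a raw JSON-RPC request body — a sketch assuming the `Filecoin.` method namespace used by the rest of this API, with the placeholder UUID from above:

```json
{
  "jsonrpc": "2.0",
  "method": "Filecoin.SealingRemoveRequest",
  "params": ["07070707-0707-0707-0707-070707070707"],
  "id": 1
}
```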

### SealingSchedDiag
SealingSchedDiag dumps internal sealing scheduler state

@@ -2342,7 +2342,7 @@ USAGE:
   lotus-miner sealing abort [command options] [callid]

OPTIONS:
   --help, -h  show help (default: false)
   --sched     Specifies that the argument is the UUID of a request to be removed from the scheduler (default: false)

```

@@ -2,11 +2,13 @@ package itests

import (
	"context"
	"encoding/json"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/google/uuid"
	logging "github.com/ipfs/go-log/v2"
	"github.com/stretchr/testify/require"
	"golang.org/x/xerrors"
@@ -14,6 +16,7 @@ import (
	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/itests/kit"
@@ -21,6 +24,7 @@ import (
	"github.com/filecoin-project/lotus/node/impl"
	"github.com/filecoin-project/lotus/node/repo"
	"github.com/filecoin-project/lotus/storage/paths"
	sealing "github.com/filecoin-project/lotus/storage/pipeline"
	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
	"github.com/filecoin-project/lotus/storage/wdpost"
@@ -402,6 +406,90 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
	require.Len(t, lastPending, 0)
}

func TestSchedulerRemoveRequest(t *testing.T) {
	ctx := context.Background()
	_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
		kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs

	ens.InterconnectAll().BeginMining(50 * time.Millisecond)

	e, err := worker.Enabled(ctx)
	require.NoError(t, err)
	require.True(t, e)

	type info struct {
		CallToWork struct {
		} `json:"CallToWork"`
		EarlyRet     interface{} `json:"EarlyRet"`
		ReturnedWork interface{} `json:"ReturnedWork"`
		SchedInfo    struct {
			OpenWindows []string `json:"OpenWindows"`
			Requests    []struct {
				Priority int    `json:"Priority"`
				SchedID  string `json:"SchedId"`
				Sector   struct {
					Miner  int `json:"Miner"`
					Number int `json:"Number"`
				} `json:"Sector"`
				TaskType string `json:"TaskType"`
			} `json:"Requests"`
		} `json:"SchedInfo"`
		Waiting interface{} `json:"Waiting"`
	}

	tocheck := miner.StartPledge(ctx, 1, 0, nil)
	var sn abi.SectorNumber
	for n := range tocheck {
		sn = n
	}
	// Keep checking until the sector state is PreCommit2; the request should get
	// stuck there, since the worker cannot process PC2.
	for {
		st, err := miner.SectorsStatus(ctx, sn, false)
		require.NoError(t, err)
		if st.State == api.SectorState(sealing.PreCommit2) {
			break
		}
		time.Sleep(time.Second)
	}

	// Dump current scheduler info
	schedb, err := miner.SealingSchedDiag(ctx, false)
	require.NoError(t, err)

	j, err := json.MarshalIndent(&schedb, "", "  ")
	require.NoError(t, err)

	var b info
	err = json.Unmarshal(j, &b)
	require.NoError(t, err)

	var schedidb uuid.UUID

	// Verify the stuck request in the scheduler info, extract its UUID, and call SealingRemoveRequest()
	require.Len(t, b.SchedInfo.Requests, 1)
	require.Equal(t, "seal/v0/precommit/2", b.SchedInfo.Requests[0].TaskType)

	schedidb, err = uuid.Parse(b.SchedInfo.Requests[0].SchedID)
	require.NoError(t, err)

	err = miner.SealingRemoveRequest(ctx, schedidb)
	require.NoError(t, err)

	// Dump the scheduler state again and check whether the request is still present;
	// the test passes only if no request remains.
	scheda, err := miner.SealingSchedDiag(ctx, false)
	require.NoError(t, err)

	k, err := json.MarshalIndent(&scheda, "", "  ")
	require.NoError(t, err)

	var a info
	err = json.Unmarshal(k, &a)
	require.NoError(t, err)

	require.Len(t, a.SchedInfo.Requests, 0)
}

func TestWorkerName(t *testing.T) {
	name := "thisstringisprobablynotahostnameihope"

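The new integration test can be run on its own with the standard Go tooling (a sketch; building lotus test dependencies may require the usual repo setup first):

```shell
go test -v -run TestSchedulerRemoveRequest ./itests/
```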
@@ -462,6 +462,10 @@ func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.Call
	return sm.StorageMgr.Abort(ctx, call)
}

func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error {
	return sm.StorageMgr.RemoveSchedRequest(ctx, schedId)
}

func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
	fi, err := os.Open(path)
	if err != nil {
@@ -1171,6 +1171,10 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err
	return i, nil
}

func (m *Manager) RemoveSchedRequest(ctx context.Context, schedId uuid.UUID) error {
	return m.sched.RemoveRequest(ctx, schedId)
}

func (m *Manager) Close(ctx context.Context) error {
	m.windowPoStSched.schedClose()
	m.winningPoStSched.schedClose()
@@ -68,7 +68,8 @@ type Scheduler struct {

	workTracker *workTracker

	info      chan func(interface{})
	rmRequest chan *rmRequest

	closing chan struct{}
	closed  chan struct{}
@@ -122,6 +123,7 @@ type WorkerRequest struct {
	TaskType sealtasks.TaskType
	Priority int // larger values more important
	Sel      WorkerSelector
	SchedId  uuid.UUID

	prepare WorkerAction
	work    WorkerAction
@@ -139,6 +141,11 @@ type workerResponse struct {
	err error
}

type rmRequest struct {
	id  uuid.UUID
	res chan error
}

func newScheduler(assigner string) (*Scheduler, error) {
	var a Assigner
	switch assigner {
@@ -168,7 +175,8 @@ func newScheduler(assigner string) (*Scheduler, error) {
			prepared: map[uuid.UUID]trackedWork{},
		},

		info:      make(chan func(interface{})),
		rmRequest: make(chan *rmRequest),

		closing: make(chan struct{}),
		closed:  make(chan struct{}),
@@ -184,6 +192,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t
		TaskType: taskType,
		Priority: getPriority(ctx),
		Sel:      sel,
		SchedId:  uuid.New(),

		prepare: prepare,
		work:    work,
@@ -228,6 +237,7 @@ type SchedDiagRequestInfo struct {
	Sector   abi.SectorID
	TaskType sealtasks.TaskType
	Priority int
	SchedId  uuid.UUID
}

type SchedDiagInfo struct {
@@ -246,6 +256,9 @@ func (sh *Scheduler) runSched() {
		var toDisable []workerDisableReq

		select {
		case rmreq := <-sh.rmRequest:
			sh.removeRequest(rmreq)
			doSched = true
		case <-sh.workerChange:
			doSched = true
		case dreq := <-sh.workerDisable:
@@ -263,7 +276,6 @@
			doSched = true
		case ireq := <-sh.info:
			ireq(sh.diag())

		case <-iw:
			initialised = true
			iw = nil
@@ -332,6 +344,7 @@ func (sh *Scheduler) diag() SchedDiagInfo {
			Sector:   task.Sector.ID,
			TaskType: task.TaskType,
			Priority: task.Priority,
			SchedId:  task.SchedId,
		})
	}

@@ -381,6 +394,49 @@ func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
	}
}

func (sh *Scheduler) removeRequest(rmrequest *rmRequest) {

	if sh.SchedQueue.Len() == 0 {
		rmrequest.res <- xerrors.New("no requests in the scheduler")
		return
	}

	queue := sh.SchedQueue
	for i, r := range *queue {
		if r.SchedId == rmrequest.id {
			queue.Remove(i)
			rmrequest.res <- nil
			go r.respond(xerrors.Errorf("scheduling request removed"))
			return
		}
	}
	rmrequest.res <- xerrors.New("no request with provided details found")
}

func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error {
	ret := make(chan error, 1)

	select {
	case sh.rmRequest <- &rmRequest{
		id:  schedId,
		res: ret,
	}:
	case <-sh.closing:
		return xerrors.New("closing")
	case <-ctx.Done():
		return ctx.Err()
	}

	select {
	case resp := <-ret:
		return resp
	case <-sh.closing:
		return xerrors.New("closing")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (sh *Scheduler) Close(ctx context.Context) error {
	close(sh.closing)
	select {
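`RemoveRequest`/`removeRequest` reuse the scheduler's channel-based request/response pattern: the public method hands an id plus a buffered reply channel to the scheduler goroutine and waits, so the queue is only ever mutated on the goroutine that owns it and no extra locking is needed. A minimal self-contained sketch of that pattern (names are illustrative, not from the lotus codebase):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type rmReq struct {
	id  int
	res chan error // buffered reply channel, so the run loop never blocks on it
}

type miniSched struct {
	queue     []int // stands in for the scheduler's request queue
	rmRequest chan *rmReq
	closing   chan struct{}
}

// run owns all scheduler state; callers reach it only through channels.
func (s *miniSched) run() {
	for {
		select {
		case req := <-s.rmRequest:
			req.res <- s.remove(req.id) // mutate the queue on this goroutine only
		case <-s.closing:
			return
		}
	}
}

func (s *miniSched) remove(id int) error {
	for i, v := range s.queue {
		if v == id {
			s.queue = append(s.queue[:i], s.queue[i+1:]...)
			return nil
		}
	}
	return errors.New("no request with provided id found")
}

// Remove mirrors Scheduler.RemoveRequest: send the request, then wait for the
// reply, bailing out if the scheduler is closing or the caller's context ends.
func (s *miniSched) Remove(ctx context.Context, id int) error {
	ret := make(chan error, 1)
	select {
	case s.rmRequest <- &rmReq{id: id, res: ret}:
	case <-s.closing:
		return errors.New("closing")
	case <-ctx.Done():
		return ctx.Err()
	}
	select {
	case resp := <-ret:
		return resp
	case <-s.closing:
		return errors.New("closing")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	s := &miniSched{queue: []int{1, 2, 3}, rmRequest: make(chan *rmReq), closing: make(chan struct{})}
	go s.run()
	fmt.Println(s.Remove(context.Background(), 2)) // <nil>
	fmt.Println(s.Remove(context.Background(), 9)) // error: not found
	close(s.closing)
}
```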