diff --git a/api/api_storage.go b/api/api_storage.go
index 4379031b0..3d019bb9e 100644
--- a/api/api_storage.go
+++ b/api/api_storage.go
@@ -150,6 +152,8 @@ type StorageMiner interface {
 	// SealingSchedDiag dumps internal sealing scheduler state
 	SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
 	SealingAbort(ctx context.Context, call storiface.CallID) error           //perm:admin
+	// SealingRemoveRequest removes a request from the sealing pipeline
+	SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error //perm:admin
 
 	// paths.SectorIndex
 	StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
diff --git a/api/proxy_gen.go b/api/proxy_gen.go
index f1e520b7e..506e615f2 100644
--- a/api/proxy_gen.go
+++ b/api/proxy_gen.go
@@ -802,6 +802,8 @@ type StorageMinerStruct struct {
 
 		SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"`
 
+		SealingRemoveRequest func(p0 context.Context, p1 uuid.UUID) error `perm:"admin"`
+
 		SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
 
 		SectorAbortUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
@@ -4761,6 +4763,17 @@ func (s *StorageMinerStub) SealingAbort(p0 context.Context, p1 storiface.CallID) error {
 	return ErrNotSupported
 }
 
+func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
+	if s.Internal.SealingRemoveRequest == nil {
+		return ErrNotSupported
+	}
+	return s.Internal.SealingRemoveRequest(p0, p1)
+}
+
+func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
+	return ErrNotSupported
+}
+
 func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (interface{}, error) {
 	if s.Internal.SealingSchedDiag == nil {
 		return nil, ErrNotSupported
diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz
index 1e2bf6bd3..5f798d942 100644
Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ
diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz
index c8489039b..79d54ee39 100644
Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ
diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz
index f1f95854c..158286ef4 100644
Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ
diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz
index c2b5bdedd..e1cf8100d 100644
Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ
diff --git a/cmd/lotus-miner/sealing.go b/cmd/lotus-miner/sealing.go
index d334f657f..970f54a55 100644
--- a/cmd/lotus-miner/sealing.go
+++ b/cmd/lotus-miner/sealing.go
@@ -365,6 +365,12 @@ var sealingAbortCmd = &cli.Command{
 	Name:      "abort",
 	Usage:     "Abort a running job",
 	ArgsUsage: "[callid]",
+	Flags: []cli.Flag{
+		&cli.BoolFlag{
+			Name:  "sched",
+			Usage: "Specifies that the argument is a UUID of the request to be removed from the scheduler",
+		},
+	},
 	Action: func(cctx *cli.Context) error {
 		if cctx.Args().Len() != 1 {
 			return xerrors.Errorf("expected 1 argument")
@@ -378,6 +384,14 @@
 
 		ctx := lcli.ReqContext(cctx)
 
+		if cctx.Bool("sched") {
+			err = nodeApi.SealingRemoveRequest(ctx, uuid.Must(uuid.Parse(cctx.Args().First())))
+			if err != nil {
+				return xerrors.Errorf("failed to remove the request with UUID %s: %w", cctx.Args().First(), err)
+			}
+			return nil
+		}
+
 		jobs, err := nodeApi.WorkerJobs(ctx)
 		if err != nil {
 			return xerrors.Errorf("getting worker jobs: %w", err)
diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md
index e24520b20..4507e03b8 100644
--- a/documentation/en/api-v0-methods-miner.md
+++ b/documentation/en/api-v0-methods-miner.md
@@ -128,6 +128,7 @@
 * [RuntimeSubsystems](#RuntimeSubsystems)
 * [Sealing](#Sealing)
   * [SealingAbort](#SealingAbort)
+  * [SealingRemoveRequest](#SealingRemoveRequest)
   * [SealingSchedDiag](#SealingSchedDiag)
 * [Sector](#Sector)
   * [SectorAbortUpgrade](#SectorAbortUpgrade)
@@ -2749,6 +2750,21 @@ Inputs:
 
 Response: `{}`
 
+### SealingRemoveRequest
+SealingRemoveRequest removes a request from the sealing pipeline
+
+
+Perms: admin
+
+Inputs:
+```json
+[
+  "07070707-0707-0707-0707-070707070707"
+]
+```
+
+Response: `{}`
+
 ### SealingSchedDiag
 SealingSchedDiag dumps internal sealing scheduler state
 
diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md
index 883c66ae8..cd78e9a02 100644
--- a/documentation/en/cli-lotus-miner.md
+++ b/documentation/en/cli-lotus-miner.md
@@ -2342,7 +2342,7 @@ USAGE:
    lotus-miner sealing abort [command options] [callid]
 
 OPTIONS:
-   --help, -h  show help (default: false)
+   --sched     Specifies that the argument is a UUID of the request to be removed from the scheduler (default: false)
 
 ```
 
diff --git a/itests/worker_test.go b/itests/worker_test.go
index 19bb763fc..5b26f481c 100644
--- a/itests/worker_test.go
+++ b/itests/worker_test.go
@@ -2,11 +2,13 @@ package itests
 
 import (
 	"context"
+	"encoding/json"
 	"strings"
 	"sync/atomic"
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/xerrors"
@@ -14,6 +16,7 @@ import (
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/build"
 	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/itests/kit"
@@ -21,6 +24,7 @@ import (
 	"github.com/filecoin-project/lotus/node/impl"
 	"github.com/filecoin-project/lotus/node/repo"
 	"github.com/filecoin-project/lotus/storage/paths"
+	sealing "github.com/filecoin-project/lotus/storage/pipeline"
 	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
 	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 	"github.com/filecoin-project/lotus/storage/wdpost"
@@ -402,6 +406,90 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	require.Len(t, lastPending, 0)
 }
 
+func TestSchedulerRemoveRequest(t *testing.T) {
+	ctx := context.Background()
+	_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
+		kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
+
+	ens.InterconnectAll().BeginMining(50 * time.Millisecond)
+
+	e, err := worker.Enabled(ctx)
+	require.NoError(t, err)
+	require.True(t, e)
+
+	type info struct {
+		CallToWork struct {
+		} `json:"CallToWork"`
+		EarlyRet     interface{} `json:"EarlyRet"`
+		ReturnedWork interface{} `json:"ReturnedWork"`
+		SchedInfo    struct {
+			OpenWindows []string `json:"OpenWindows"`
+			Requests    []struct {
+				Priority int    `json:"Priority"`
+				SchedID  string `json:"SchedId"`
+				Sector   struct {
+					Miner  int `json:"Miner"`
+					Number int `json:"Number"`
+				} `json:"Sector"`
+				TaskType string `json:"TaskType"`
+			} `json:"Requests"`
+		} `json:"SchedInfo"`
+		Waiting interface{} `json:"Waiting"`
+	}
+
+	tocheck := miner.StartPledge(ctx, 1, 0, nil)
+	var sn abi.SectorNumber
+	for n := range tocheck {
+		sn = n
+	}
+	// Keep checking till the sector state is PC2; the request should get stuck, as the worker cannot process PC2
+	for {
+		st, err := miner.SectorsStatus(ctx, sn, false)
+		require.NoError(t, err)
+		if st.State == api.SectorState(sealing.PreCommit2) {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+
+	// Dump current scheduler info
+	schedb, err := miner.SealingSchedDiag(ctx, false)
+	require.NoError(t, err)
+
+	j, err := json.MarshalIndent(&schedb, "", "  ")
+	require.NoError(t, err)
+
+	var b info
+	err = json.Unmarshal(j, &b)
+	require.NoError(t, err)
+
+	var schedidb uuid.UUID
+
+	// Parse the scheduler info and get the request UUID, then call SealingRemoveRequest()
+	require.Len(t, b.SchedInfo.Requests, 1)
+	require.Equal(t, "seal/v0/precommit/2", b.SchedInfo.Requests[0].TaskType)
+
+	schedidb, err = uuid.Parse(b.SchedInfo.Requests[0].SchedID)
+	require.NoError(t, err)
+
+	err = miner.SealingRemoveRequest(ctx, schedidb)
+	require.NoError(t, err)
+
+	// Dump the scheduler again and check whether the request is still present
+	// If no request is present, the test passes
+	scheda, err := miner.SealingSchedDiag(ctx, false)
+	require.NoError(t, err)
+
+	k, err := json.MarshalIndent(&scheda, "", "  ")
+	require.NoError(t, err)
+
+	var a info
+	err = json.Unmarshal(k, &a)
+	require.NoError(t, err)
+
+	require.Len(t, a.SchedInfo.Requests, 0)
+}
+
 func TestWorkerName(t *testing.T) {
 	name := "thisstringisprobablynotahostnameihope"
 
diff --git a/node/impl/storminer.go b/node/impl/storminer.go
index 74b1dc776..7f595af08 100644
--- a/node/impl/storminer.go
+++ b/node/impl/storminer.go
@@ -462,6 +462,10 @@ func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.CallID) error {
 	return sm.StorageMgr.Abort(ctx, call)
 }
 
+func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error {
+	return sm.StorageMgr.RemoveSchedRequest(ctx, schedId)
+}
+
 func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
 	fi, err := os.Open(path)
 	if err != nil {
diff --git a/storage/sealer/manager.go b/storage/sealer/manager.go
index 4335fe6e4..68eae077c 100644
--- a/storage/sealer/manager.go
+++ b/storage/sealer/manager.go
@@ -1171,6 +1171,10 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error) {
 	return i, nil
 }
 
+func (m *Manager) RemoveSchedRequest(ctx context.Context, schedId uuid.UUID) error {
+	return m.sched.RemoveRequest(ctx, schedId)
+}
+
 func (m *Manager) Close(ctx context.Context) error {
 	m.windowPoStSched.schedClose()
 	m.winningPoStSched.schedClose()
diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go
index 323def986..335bb1249 100644
--- a/storage/sealer/sched.go
+++ b/storage/sealer/sched.go
@@ -68,7 +68,8 @@ type Scheduler struct {
 
 	workTracker *workTracker
 
-	info chan func(interface{})
+	info      chan func(interface{})
+	rmRequest chan *rmRequest
 
 	closing chan struct{}
 	closed  chan struct{}
@@ -122,6 +123,7 @@ type WorkerRequest struct {
 	TaskType sealtasks.TaskType
 	Priority int // larger values more important
 	Sel      WorkerSelector
+	SchedId  uuid.UUID
 
 	prepare WorkerAction
 	work    WorkerAction
@@ -139,6 +141,11 @@ type workerResponse struct {
 	err error
 }
 
+type rmRequest struct {
+	id  uuid.UUID
+	res chan error
+}
+
 func newScheduler(assigner string) (*Scheduler, error) {
 	var a Assigner
 	switch assigner {
@@ -168,7 +175,8 @@ func newScheduler(assigner string) (*Scheduler, error) {
 			prepared: map[uuid.UUID]trackedWork{},
 		},
 
-		info:    make(chan func(interface{})),
+		info:      make(chan func(interface{})),
+		rmRequest: make(chan *rmRequest),
 
 		closing: make(chan struct{}),
 		closed:  make(chan struct{}),
@@ -184,6 +192,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
 		TaskType: taskType,
 		Priority: getPriority(ctx),
 		Sel:      sel,
+		SchedId:  uuid.New(),
 
 		prepare: prepare,
 		work:    work,
@@ -228,6 +237,7 @@ type SchedDiagRequestInfo struct {
 	Sector   abi.SectorID
 	TaskType sealtasks.TaskType
 	Priority int
+	SchedId  uuid.UUID
 }
 
 type SchedDiagInfo struct {
@@ -246,6 +256,9 @@ func (sh *Scheduler) runSched() {
 		var toDisable []workerDisableReq
 
 		select {
+		case rmreq := <-sh.rmRequest:
+			sh.removeRequest(rmreq)
+			doSched = true
 		case <-sh.workerChange:
 			doSched = true
 		case dreq := <-sh.workerDisable:
@@ -263,7 +276,6 @@ func (sh *Scheduler) runSched() {
 			doSched = true
 		case ireq := <-sh.info:
 			ireq(sh.diag())
-
 		case <-iw:
 			initialised = true
 			iw = nil
@@ -332,6 +344,7 @@ func (sh *Scheduler) diag() SchedDiagInfo {
 			Sector:   task.Sector.ID,
 			TaskType: task.TaskType,
 			Priority: task.Priority,
+			SchedId:  task.SchedId,
 		})
 	}
 
@@ -381,6 +394,49 @@ func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
 	}
 }
 
+func (sh *Scheduler) removeRequest(rmrequest *rmRequest) {
+
+	if sh.SchedQueue.Len() == 0 {
+		rmrequest.res <- xerrors.New("no requests in the scheduler")
+		return
+	}
+
+	queue := sh.SchedQueue
+	for i, r := range *queue {
+		if r.SchedId == rmrequest.id {
+			queue.Remove(i)
+			rmrequest.res <- nil
+			go r.respond(xerrors.Errorf("scheduling request removed"))
+			return
+		}
+	}
+	rmrequest.res <- xerrors.New("no request with provided details found")
+}
+
+func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error {
+	ret := make(chan error, 1)
+
+	select {
+	case sh.rmRequest <- &rmRequest{
+		id:  schedId,
+		res: ret,
+	}:
+	case <-sh.closing:
+		return xerrors.New("closing")
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	select {
+	case resp := <-ret:
+		return resp
+	case <-sh.closing:
+		return xerrors.New("closing")
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
 func (sh *Scheduler) Close(ctx context.Context) error {
 	close(sh.closing)
 	select {
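
---

Reviewer note (not part of the diff): below is a minimal sketch of how the new API surface is meant to be driven end to end, mirroring the itest above — dump the scheduler diagnostics, pick out a stuck request's `SchedId`, and remove it. `minerApi` and the hypothetical `removeStuckRequests` helper are assumptions standing in for an already-connected `api.StorageMiner` RPC client; `info` is the struct defined in `TestSchedulerRemoveRequest`.

```go
// Sketch only — assumes minerApi is a connected api.StorageMiner client and
// the info struct from the itest is in scope.
func removeStuckRequests(ctx context.Context, minerApi api.StorageMiner) error {
	// SealingSchedDiag returns an interface{}; round-trip it through JSON
	// to read each queued request's SchedId, exactly as the test does.
	diag, err := minerApi.SealingSchedDiag(ctx, false)
	if err != nil {
		return err
	}
	raw, err := json.Marshal(diag)
	if err != nil {
		return err
	}
	var i info
	if err := json.Unmarshal(raw, &i); err != nil {
		return err
	}

	for _, req := range i.SchedInfo.Requests {
		id, err := uuid.Parse(req.SchedID)
		if err != nil {
			return err
		}
		// Drops the request from the scheduler queue; the blocked
		// Schedule() call fails with "scheduling request removed".
		if err := minerApi.SealingRemoveRequest(ctx, id); err != nil {
			return err
		}
	}
	return nil
}
```

The CLI equivalent added by this diff is `lotus-miner sealing abort --sched <uuid>`, which parses the argument as a scheduler request UUID instead of a worker call ID.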