worker: Support setting task types at runtime
This commit is contained in:
parent
4dfec7b232
commit
3672053ae9
@ -23,6 +23,9 @@ type WorkerAPI interface {
|
|||||||
|
|
||||||
storiface.WorkerCalls
|
storiface.WorkerCalls
|
||||||
|
|
||||||
|
DisableTask(ctx context.Context, tt sealtasks.TaskType) error
|
||||||
|
EnableTask(ctx context.Context, tt sealtasks.TaskType) error
|
||||||
|
|
||||||
// Storage / Other
|
// Storage / Other
|
||||||
Remove(ctx context.Context, sector abi.SectorID) error
|
Remove(ctx context.Context, sector abi.SectorID) error
|
||||||
|
|
||||||
|
@ -388,6 +388,9 @@ type WorkerStruct struct {
|
|||||||
ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
|
ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
|
||||||
Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
|
Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
|
||||||
|
|
||||||
|
DisableTask func(ctx context.Context, tt sealtasks.TaskType) error
|
||||||
|
EnableTask func(ctx context.Context, tt sealtasks.TaskType) error
|
||||||
|
|
||||||
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
|
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
|
||||||
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
|
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
|
||||||
|
|
||||||
@ -1573,6 +1576,14 @@ func (w *WorkerStruct) Fetch(ctx context.Context, id storage.SectorRef, fileType
|
|||||||
return w.Internal.Fetch(ctx, id, fileType, ptype, am)
|
return w.Internal.Fetch(ctx, id, fileType, ptype, am)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WorkerStruct) DisableTask(ctx context.Context, tt sealtasks.TaskType) error {
|
||||||
|
return w.Internal.DisableTask(ctx, tt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WorkerStruct) EnableTask(ctx context.Context, tt sealtasks.TaskType) error {
|
||||||
|
return w.Internal.EnableTask(ctx, tt)
|
||||||
|
}
|
||||||
|
|
||||||
func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
|
func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||||
return w.Internal.Remove(ctx, sector)
|
return w.Internal.Remove(ctx, sector)
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
@ -10,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||||
)
|
)
|
||||||
|
|
||||||
var infoCmd = &cli.Command{
|
var infoCmd = &cli.Command{
|
||||||
@ -62,14 +62,7 @@ var infoCmd = &cli.Command{
|
|||||||
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))
|
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))
|
||||||
|
|
||||||
fmt.Printf("Task types: ")
|
fmt.Printf("Task types: ")
|
||||||
tasks := make([]sealtasks.TaskType, 0, len(tt))
|
for _, t := range ttList(tt) {
|
||||||
for taskType := range tt {
|
|
||||||
tasks = append(tasks, taskType)
|
|
||||||
}
|
|
||||||
sort.Slice(tasks, func(i, j int) bool {
|
|
||||||
return tasks[i].Less(tasks[j])
|
|
||||||
})
|
|
||||||
for _, t := range tasks {
|
|
||||||
fmt.Printf("%s ", t.Short())
|
fmt.Printf("%s ", t.Short())
|
||||||
}
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
@ -101,3 +94,14 @@ var infoCmd = &cli.Command{
|
|||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ttList(tt map[sealtasks.TaskType]struct{}) []sealtasks.TaskType {
|
||||||
|
tasks := make([]sealtasks.TaskType, 0, len(tt))
|
||||||
|
for taskType := range tt {
|
||||||
|
tasks = append(tasks, taskType)
|
||||||
|
}
|
||||||
|
sort.Slice(tasks, func(i, j int) bool {
|
||||||
|
return tasks[i].Less(tasks[j])
|
||||||
|
})
|
||||||
|
return tasks
|
||||||
|
}
|
||||||
|
@ -59,6 +59,7 @@ func main() {
|
|||||||
storageCmd,
|
storageCmd,
|
||||||
setCmd,
|
setCmd,
|
||||||
waitQuietCmd,
|
waitQuietCmd,
|
||||||
|
tasksCmd,
|
||||||
}
|
}
|
||||||
|
|
||||||
app := &cli.App{
|
app := &cli.App{
|
||||||
|
83
cmd/lotus-seal-worker/tasks.go
Normal file
83
cmd/lotus-seal-worker/tasks.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||||
|
)
|
||||||
|
|
||||||
|
var tasksCmd = &cli.Command{
|
||||||
|
Name: "tasks",
|
||||||
|
Usage: "Manage task processing",
|
||||||
|
Subcommands: []*cli.Command{
|
||||||
|
tasksEnableCmd,
|
||||||
|
tasksDisableCmd,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var allowSetting = map[sealtasks.TaskType]struct{}{
|
||||||
|
sealtasks.TTAddPiece: {},
|
||||||
|
sealtasks.TTPreCommit1: {},
|
||||||
|
sealtasks.TTPreCommit2: {},
|
||||||
|
sealtasks.TTCommit2: {},
|
||||||
|
sealtasks.TTCommit1: {},
|
||||||
|
sealtasks.TTUnseal: {},
|
||||||
|
}
|
||||||
|
|
||||||
|
var settableStr = func() string {
|
||||||
|
var s []string
|
||||||
|
for _, tt := range ttList(allowSetting) {
|
||||||
|
s = append(s, tt.Short())
|
||||||
|
}
|
||||||
|
return strings.Join(s, "|")
|
||||||
|
}()
|
||||||
|
|
||||||
|
var tasksEnableCmd = &cli.Command{
|
||||||
|
Name: "enable",
|
||||||
|
Usage: "Enable a task type",
|
||||||
|
ArgsUsage: "[" + settableStr + "]",
|
||||||
|
Action: taskAction(api.WorkerAPI.EnableTask),
|
||||||
|
}
|
||||||
|
|
||||||
|
var tasksDisableCmd = &cli.Command{
|
||||||
|
Name: "disable",
|
||||||
|
Usage: "Disable a task type",
|
||||||
|
ArgsUsage: "[" + settableStr + "]",
|
||||||
|
Action: taskAction(api.WorkerAPI.DisableTask),
|
||||||
|
}
|
||||||
|
|
||||||
|
func taskAction(tf func(a api.WorkerAPI, ctx context.Context, tt sealtasks.TaskType) error) func(cctx *cli.Context) error {
|
||||||
|
return func(cctx *cli.Context) error {
|
||||||
|
if cctx.NArg() != 0 {
|
||||||
|
return xerrors.Errorf("expected 1 argument")
|
||||||
|
}
|
||||||
|
|
||||||
|
var tt sealtasks.TaskType
|
||||||
|
for taskType := range allowSetting {
|
||||||
|
if taskType.Short() == cctx.Args().First() {
|
||||||
|
tt = taskType
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tt == "" {
|
||||||
|
return xerrors.Errorf("unknown task type '%s'", cctx.Args().First())
|
||||||
|
}
|
||||||
|
|
||||||
|
api, closer, err := lcli.GetWorkerAPI(cctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
|
return tf(api, ctx, tt)
|
||||||
|
}
|
||||||
|
}
|
20
extern/sector-storage/worker_local.go
vendored
20
extern/sector-storage/worker_local.go
vendored
@ -49,6 +49,7 @@ type LocalWorker struct {
|
|||||||
ct *workerCallTracker
|
ct *workerCallTracker
|
||||||
acceptTasks map[sealtasks.TaskType]struct{}
|
acceptTasks map[sealtasks.TaskType]struct{}
|
||||||
running sync.WaitGroup
|
running sync.WaitGroup
|
||||||
|
taskLk sync.Mutex
|
||||||
|
|
||||||
session uuid.UUID
|
session uuid.UUID
|
||||||
testDisable int64
|
testDisable int64
|
||||||
@ -457,9 +458,28 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
|
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
|
||||||
|
l.taskLk.Lock()
|
||||||
|
defer l.taskLk.Unlock()
|
||||||
|
|
||||||
return l.acceptTasks, nil
|
return l.acceptTasks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *LocalWorker) DisableTask(ctx context.Context, tt sealtasks.TaskType) error {
|
||||||
|
l.taskLk.Lock()
|
||||||
|
defer l.taskLk.Unlock()
|
||||||
|
|
||||||
|
delete(l.acceptTasks, tt)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LocalWorker) EnableTask(ctx context.Context, tt sealtasks.TaskType) error {
|
||||||
|
l.taskLk.Lock()
|
||||||
|
defer l.taskLk.Unlock()
|
||||||
|
|
||||||
|
l.acceptTasks[tt] = struct{}{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
|
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
|
||||||
return l.localStore.Local(ctx)
|
return l.localStore.Local(ctx)
|
||||||
}
|
}
|
||||||
|
4
go.sum
4
go.sum
@ -698,6 +698,7 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
|
|||||||
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
|
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
|
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
|
||||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||||
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
|
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
|
||||||
@ -1210,6 +1211,7 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
|
|||||||
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
|
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
|
||||||
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA=
|
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA=
|
||||||
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28=
|
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28=
|
||||||
|
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg=
|
||||||
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
|
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
|
||||||
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
|
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
|
||||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||||
@ -1423,7 +1425,9 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
|
|||||||
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
|
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
|
||||||
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
|
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
|
||||||
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
|
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
|
||||||
|
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||||
|
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
|
||||||
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
||||||
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
|
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
|
||||||
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
|
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
|
||||||
|
Loading…
Reference in New Issue
Block a user