From 3672053ae984372ba408f741e557de88cb6b52cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 26 Nov 2020 17:33:22 +0100 Subject: [PATCH] worker: Support setting task types at runtime --- api/api_worker.go | 3 + api/apistruct/struct.go | 11 ++++ cmd/lotus-seal-worker/info.go | 22 ++++--- cmd/lotus-seal-worker/main.go | 1 + cmd/lotus-seal-worker/tasks.go | 83 +++++++++++++++++++++++++++ extern/sector-storage/worker_local.go | 20 +++++++ go.sum | 4 ++ 7 files changed, 135 insertions(+), 9 deletions(-) create mode 100644 cmd/lotus-seal-worker/tasks.go diff --git a/api/api_worker.go b/api/api_worker.go index 805b23bc1..812a2eb3a 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -23,6 +23,9 @@ type WorkerAPI interface { storiface.WorkerCalls + DisableTask(ctx context.Context, tt sealtasks.TaskType) error + EnableTask(ctx context.Context, tt sealtasks.TaskType) error + // Storage / Other Remove(ctx context.Context, sector abi.SectorID) error diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index e57052f87..bc7c5d9ad 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -388,6 +388,9 @@ type WorkerStruct struct { ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"` Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` + DisableTask func(ctx context.Context, tt sealtasks.TaskType) error + EnableTask func(ctx context.Context, tt sealtasks.TaskType) error + Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` @@ -1573,6 +1576,14 @@ func (w *WorkerStruct) Fetch(ctx context.Context, id storage.SectorRef, fileType return w.Internal.Fetch(ctx, id, fileType, ptype, am) } +func (w *WorkerStruct) DisableTask(ctx context.Context, tt sealtasks.TaskType) error { + return w.Internal.DisableTask(ctx, tt) +} + +func (w *WorkerStruct) EnableTask(ctx context.Context, tt sealtasks.TaskType) error { + return w.Internal.EnableTask(ctx, tt) +} + func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error { return w.Internal.Remove(ctx, sector) } diff --git a/cmd/lotus-seal-worker/info.go b/cmd/lotus-seal-worker/info.go index 3ec2a4309..6d5c2d64e 100644 --- a/cmd/lotus-seal-worker/info.go +++ b/cmd/lotus-seal-worker/info.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "sort" "github.com/urfave/cli/v2" @@ -10,6 +9,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" ) var infoCmd = &cli.Command{ @@ -62,14 +62,7 @@ var infoCmd = &cli.Command{ fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved))) fmt.Printf("Task types: ") - tasks := make([]sealtasks.TaskType, 0, len(tt)) - for taskType := range tt { - tasks = append(tasks, taskType) - } - sort.Slice(tasks, func(i, j int) bool { - return tasks[i].Less(tasks[j]) - }) - for _, t := range tasks { + for _, t := range ttList(tt) { fmt.Printf("%s ", t.Short()) } fmt.Println() @@ -101,3 +94,14 @@ var infoCmd = &cli.Command{ return nil }, } + +func ttList(tt map[sealtasks.TaskType]struct{}) []sealtasks.TaskType { + tasks := make([]sealtasks.TaskType, 0, len(tt)) + for taskType := range tt { + tasks = append(tasks, taskType) + } + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].Less(tasks[j]) + }) + return tasks +} diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index b1df138c7..8726a6e0d 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -59,6 +59,7 @@ func main() { storageCmd, setCmd, waitQuietCmd, + tasksCmd, } app := &cli.App{ diff --git a/cmd/lotus-seal-worker/tasks.go b/cmd/lotus-seal-worker/tasks.go new file mode 100644 index 000000000..08f956ebe --- /dev/null +++ b/cmd/lotus-seal-worker/tasks.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "strings" + + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" +) + +var tasksCmd = &cli.Command{ + Name: "tasks", + Usage: "Manage task processing", + Subcommands: []*cli.Command{ + tasksEnableCmd, + tasksDisableCmd, + }, +} + +var allowSetting = map[sealtasks.TaskType]struct{}{ + sealtasks.TTAddPiece: {}, + sealtasks.TTPreCommit1: {}, + sealtasks.TTPreCommit2: {}, + sealtasks.TTCommit2: {}, + sealtasks.TTCommit1: {}, + sealtasks.TTUnseal: {}, +} + +var settableStr = func() string { + var s []string + for _, tt := range ttList(allowSetting) { + s = append(s, tt.Short()) + } + return strings.Join(s, "|") +}() + +var tasksEnableCmd = &cli.Command{ + Name: "enable", + Usage: "Enable a task type", + ArgsUsage: "[" + settableStr + "]", + Action: taskAction(api.WorkerAPI.EnableTask), +} + +var tasksDisableCmd = &cli.Command{ + Name: "disable", + Usage: "Disable a task type", + ArgsUsage: "[" + settableStr + "]", + Action: taskAction(api.WorkerAPI.DisableTask), +} + +func taskAction(tf func(a api.WorkerAPI, ctx context.Context, tt sealtasks.TaskType) error) func(cctx *cli.Context) error { + return func(cctx *cli.Context) error { + if cctx.NArg() != 0 { + return xerrors.Errorf("expected 1 argument") + } + + var tt sealtasks.TaskType + for taskType := range allowSetting { + if taskType.Short() == cctx.Args().First() { + tt = taskType + break + } + } + + if tt == "" { + return xerrors.Errorf("unknown task type '%s'", cctx.Args().First()) + } + + api, closer, err := lcli.GetWorkerAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + return tf(api, ctx, tt) + } +} diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index c069d7bf7..40e077db9 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -49,6 +49,7 @@ type LocalWorker struct { ct *workerCallTracker acceptTasks map[sealtasks.TaskType]struct{} running sync.WaitGroup + taskLk sync.Mutex session uuid.UUID testDisable int64 @@ -457,9 +458,28 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector st } func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) { + l.taskLk.Lock() + defer l.taskLk.Unlock() + return l.acceptTasks, nil } +func (l *LocalWorker) DisableTask(ctx context.Context, tt sealtasks.TaskType) error { + l.taskLk.Lock() + defer l.taskLk.Unlock() + + delete(l.acceptTasks, tt) + return nil +} + +func (l *LocalWorker) EnableTask(ctx context.Context, tt sealtasks.TaskType) error { + l.taskLk.Lock() + defer l.taskLk.Unlock() + + l.acceptTasks[tt] = struct{}{} + return nil +} + func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { return l.localStore.Local(ctx) } diff --git a/go.sum b/go.sum index 736d3b378..2196eda90 100644 --- a/go.sum +++ b/go.sum @@ -698,6 +698,7 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0 github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= @@ -1210,6 +1211,7 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA= github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28= +github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -1423,7 +1425,9 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4= github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=