From 553408d57348d290fe544d7464bd9b452117ccc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 27 Oct 2020 17:28:44 +0100 Subject: [PATCH] worker: Commands to pause/resume task processing --- api/api_worker.go | 14 ++++++++++ api/apistruct/struct.go | 24 ++++++++++++++++- cmd/lotus-seal-worker/cli.go | 51 +++++++++++++++++++++++++++++++++++ cmd/lotus-seal-worker/info.go | 12 +++++++++ cmd/lotus-seal-worker/main.go | 2 ++ cmd/lotus-seal-worker/rpc.go | 34 +++++++++++++++++++++++ 6 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 cmd/lotus-seal-worker/cli.go diff --git a/api/api_worker.go b/api/api_worker.go index 036748ec6..805b23bc1 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -28,5 +28,19 @@ type WorkerAPI interface { StorageAddLocal(ctx context.Context, path string) error + // SetEnabled marks the worker as enabled/disabled. Not that this setting + // may take a few seconds to propagate to task scheduler + SetEnabled(ctx context.Context, enabled bool) error + + Enabled(ctx context.Context) (bool, error) + + // WaitQuiet blocks until there are no tasks running + WaitQuiet(ctx context.Context) error + + // returns a random UUID of worker session, generated randomly when worker + // process starts + ProcessSession(context.Context) (uuid.UUID, error) + + // Like ProcessSession, but returns an error when worker is disabled Session(context.Context) (uuid.UUID, error) } diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 3a4ae75a8..5448e318e 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -385,7 +385,13 @@ type WorkerStruct struct { Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` - Session func(context.Context) (uuid.UUID, error) `perm:"admin"` + SetEnabled func(ctx context.Context, enabled bool) error `perm:"admin"` + Enabled func(ctx context.Context) (bool, error) `perm:"admin"` + + WaitQuiet func(ctx context.Context) error `perm:"admin"` + + ProcessSession func(context.Context) (uuid.UUID, error) `perm:"admin"` + Session func(context.Context) (uuid.UUID, error) `perm:"admin"` } } @@ -1544,6 +1550,22 @@ func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error { return w.Internal.StorageAddLocal(ctx, path) } +func (w *WorkerStruct) SetEnabled(ctx context.Context, enabled bool) error { + return w.Internal.SetEnabled(ctx, enabled) +} + +func (w *WorkerStruct) Enabled(ctx context.Context) (bool, error) { + return w.Internal.Enabled(ctx) +} + +func (w *WorkerStruct) WaitQuiet(ctx context.Context) error { + return w.Internal.WaitQuiet(ctx) +} + +func (w *WorkerStruct) ProcessSession(ctx context.Context) (uuid.UUID, error) { + return w.Internal.ProcessSession(ctx) +} + func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) { return w.Internal.Session(ctx) } diff --git a/cmd/lotus-seal-worker/cli.go b/cmd/lotus-seal-worker/cli.go new file mode 100644 index 000000000..b1501fca7 --- /dev/null +++ b/cmd/lotus-seal-worker/cli.go @@ -0,0 +1,51 @@ +package main + +import ( + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + lcli "github.com/filecoin-project/lotus/cli" +) + +var setCmd = &cli.Command{ + Name: "set", + Usage: "Manage worker settings", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "enabled", + Usage: "enable/disable new task processing", + Value: true, + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetWorkerAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + if err := api.SetEnabled(ctx, cctx.Bool("enabled")); err != nil { + return xerrors.Errorf("SetEnabled: %w", err) + } + + return nil + }, +} + +var waitQuietCmd = &cli.Command{ + Name: "wait-quiet", + Usage: "Block until all running tasks exit", + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetWorkerAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + return api.WaitQuiet(ctx) + }, +} diff --git a/cmd/lotus-seal-worker/info.go b/cmd/lotus-seal-worker/info.go index 9b08a0c80..3388d8a59 100644 --- a/cmd/lotus-seal-worker/info.go +++ b/cmd/lotus-seal-worker/info.go @@ -32,6 +32,18 @@ var infoCmd = &cli.Command{ cli.VersionPrinter(cctx) fmt.Println() + sess, err := api.ProcessSession(ctx) + if err != nil { + return xerrors.Errorf("getting session: %w", err) + } + fmt.Printf("Session: %s\n", sess) + + enabled, err := api.Enabled(ctx) + if err != nil { + return xerrors.Errorf("checking worker status: %w", err) + } + fmt.Printf("Enabled: %t", enabled) + info, err := api.Info(ctx) if err != nil { return xerrors.Errorf("getting info: %w", err) diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 36c9d5eff..6c2fce582 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -58,6 +58,8 @@ func main() { runCmd, infoCmd, storageCmd, + setCmd, + waitQuietCmd, } app := &cli.App{ diff --git a/cmd/lotus-seal-worker/rpc.go b/cmd/lotus-seal-worker/rpc.go index b543babbf..f4e8494d0 100644 --- a/cmd/lotus-seal-worker/rpc.go +++ b/cmd/lotus-seal-worker/rpc.go @@ -2,7 +2,9 @@ package main import ( "context" + "sync/atomic" + "github.com/google/uuid" "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" @@ -17,6 +19,8 @@ type worker struct { localStore *stores.Local ls stores.LocalStorage + + disabled int64 } func (w *worker) Version(context.Context) (build.Version, error) { @@ -42,4 +46,34 @@ func (w *worker) StorageAddLocal(ctx context.Context, path string) error { return nil } +func (w *worker) SetEnabled(ctx context.Context, enabled bool) error { + disabled := int64(1) + if enabled { + disabled = 0 + } + atomic.StoreInt64(&w.disabled, disabled) + return nil +} + +func (w *worker) Enabled(ctx context.Context) (bool, error) { + return atomic.LoadInt64(&w.disabled) == 0, nil +} + +func (w *worker) WaitQuiet(ctx context.Context) error { + w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/ + return nil +} + +func (w *worker) ProcessSession(ctx context.Context) (uuid.UUID, error) { + return w.LocalWorker.Session(ctx) +} + +func (w *worker) Session(ctx context.Context) (uuid.UUID, error) { + if atomic.LoadInt64(&w.disabled) == 1 { + return uuid.UUID{}, xerrors.Errorf("worker disabled") + } + + return w.LocalWorker.Session(ctx) +} + var _ storiface.WorkerCalls = &worker{}