Merge pull request #4615 from filecoin-project/feat/worker-pause

worker: Commands to pause/resume task processing
This commit is contained in:
Łukasz Magiera 2020-10-31 00:26:37 +01:00 committed by GitHub
commit 6fcdf3c7ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 1 deletions

View File

@ -28,5 +28,19 @@ type WorkerAPI interface {
StorageAddLocal(ctx context.Context, path string) error
// SetEnabled marks the worker as enabled/disabled. Not that this setting
// may take a few seconds to propagate to task scheduler
SetEnabled(ctx context.Context, enabled bool) error
Enabled(ctx context.Context) (bool, error)
// WaitQuiet blocks until there are no tasks running
WaitQuiet(ctx context.Context) error
// returns a random UUID of worker session, generated randomly when worker
// process starts
ProcessSession(context.Context) (uuid.UUID, error)
// Like ProcessSession, but returns an error when worker is disabled
Session(context.Context) (uuid.UUID, error)
}

View File

@ -385,7 +385,13 @@ type WorkerStruct struct {
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
SetEnabled func(ctx context.Context, enabled bool) error `perm:"admin"`
Enabled func(ctx context.Context) (bool, error) `perm:"admin"`
WaitQuiet func(ctx context.Context) error `perm:"admin"`
ProcessSession func(context.Context) (uuid.UUID, error) `perm:"admin"`
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
}
}
@ -1544,6 +1550,22 @@ func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
return w.Internal.StorageAddLocal(ctx, path)
}
func (w *WorkerStruct) SetEnabled(ctx context.Context, enabled bool) error {
return w.Internal.SetEnabled(ctx, enabled)
}
func (w *WorkerStruct) Enabled(ctx context.Context) (bool, error) {
return w.Internal.Enabled(ctx)
}
func (w *WorkerStruct) WaitQuiet(ctx context.Context) error {
return w.Internal.WaitQuiet(ctx)
}
func (w *WorkerStruct) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.Internal.ProcessSession(ctx)
}
func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) {
return w.Internal.Session(ctx)
}

View File

@ -0,0 +1,51 @@
package main
import (
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
lcli "github.com/filecoin-project/lotus/cli"
)
var setCmd = &cli.Command{
Name: "set",
Usage: "Manage worker settings",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "enabled",
Usage: "enable/disable new task processing",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if err := api.SetEnabled(ctx, cctx.Bool("enabled")); err != nil {
return xerrors.Errorf("SetEnabled: %w", err)
}
return nil
},
}
var waitQuietCmd = &cli.Command{
Name: "wait-quiet",
Usage: "Block until all running tasks exit",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return api.WaitQuiet(ctx)
},
}

View File

@ -32,6 +32,18 @@ var infoCmd = &cli.Command{
cli.VersionPrinter(cctx)
fmt.Println()
sess, err := api.ProcessSession(ctx)
if err != nil {
return xerrors.Errorf("getting session: %w", err)
}
fmt.Printf("Session: %s\n", sess)
enabled, err := api.Enabled(ctx)
if err != nil {
return xerrors.Errorf("checking worker status: %w", err)
}
fmt.Printf("Enabled: %t", enabled)
info, err := api.Info(ctx)
if err != nil {
return xerrors.Errorf("getting info: %w", err)

View File

@ -58,6 +58,8 @@ func main() {
runCmd,
infoCmd,
storageCmd,
setCmd,
waitQuietCmd,
}
app := &cli.App{

View File

@ -2,7 +2,9 @@ package main
import (
"context"
"sync/atomic"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
@ -17,6 +19,8 @@ type worker struct {
localStore *stores.Local
ls stores.LocalStorage
disabled int64
}
func (w *worker) Version(context.Context) (build.Version, error) {
@ -42,4 +46,34 @@ func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
return nil
}
func (w *worker) SetEnabled(ctx context.Context, enabled bool) error {
disabled := int64(1)
if enabled {
disabled = 0
}
atomic.StoreInt64(&w.disabled, disabled)
return nil
}
func (w *worker) Enabled(ctx context.Context) (bool, error) {
return atomic.LoadInt64(&w.disabled) == 0, nil
}
func (w *worker) WaitQuiet(ctx context.Context) error {
w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/
return nil
}
func (w *worker) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.LocalWorker.Session(ctx)
}
func (w *worker) Session(ctx context.Context) (uuid.UUID, error) {
if atomic.LoadInt64(&w.disabled) == 1 {
return uuid.UUID{}, xerrors.Errorf("worker disabled")
}
return w.LocalWorker.Session(ctx)
}
var _ storiface.WorkerCalls = &worker{}