worker: Commands to pause/resume task processing
This commit is contained in:
parent
32ea060e99
commit
553408d573
@ -28,5 +28,19 @@ type WorkerAPI interface {
|
|||||||
|
|
||||||
StorageAddLocal(ctx context.Context, path string) error
|
StorageAddLocal(ctx context.Context, path string) error
|
||||||
|
|
||||||
|
// SetEnabled marks the worker as enabled/disabled. Not that this setting
|
||||||
|
// may take a few seconds to propagate to task scheduler
|
||||||
|
SetEnabled(ctx context.Context, enabled bool) error
|
||||||
|
|
||||||
|
Enabled(ctx context.Context) (bool, error)
|
||||||
|
|
||||||
|
// WaitQuiet blocks until there are no tasks running
|
||||||
|
WaitQuiet(ctx context.Context) error
|
||||||
|
|
||||||
|
// returns a random UUID of worker session, generated randomly when worker
|
||||||
|
// process starts
|
||||||
|
ProcessSession(context.Context) (uuid.UUID, error)
|
||||||
|
|
||||||
|
// Like ProcessSession, but returns an error when worker is disabled
|
||||||
Session(context.Context) (uuid.UUID, error)
|
Session(context.Context) (uuid.UUID, error)
|
||||||
}
|
}
|
||||||
|
@ -385,7 +385,13 @@ type WorkerStruct struct {
|
|||||||
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
|
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
|
||||||
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
|
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
|
||||||
|
|
||||||
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
|
SetEnabled func(ctx context.Context, enabled bool) error `perm:"admin"`
|
||||||
|
Enabled func(ctx context.Context) (bool, error) `perm:"admin"`
|
||||||
|
|
||||||
|
WaitQuiet func(ctx context.Context) error `perm:"admin"`
|
||||||
|
|
||||||
|
ProcessSession func(context.Context) (uuid.UUID, error) `perm:"admin"`
|
||||||
|
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1544,6 +1550,22 @@ func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
|
|||||||
return w.Internal.StorageAddLocal(ctx, path)
|
return w.Internal.StorageAddLocal(ctx, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WorkerStruct) SetEnabled(ctx context.Context, enabled bool) error {
|
||||||
|
return w.Internal.SetEnabled(ctx, enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WorkerStruct) Enabled(ctx context.Context) (bool, error) {
|
||||||
|
return w.Internal.Enabled(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WorkerStruct) WaitQuiet(ctx context.Context) error {
|
||||||
|
return w.Internal.WaitQuiet(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WorkerStruct) ProcessSession(ctx context.Context) (uuid.UUID, error) {
|
||||||
|
return w.Internal.ProcessSession(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) {
|
func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) {
|
||||||
return w.Internal.Session(ctx)
|
return w.Internal.Session(ctx)
|
||||||
}
|
}
|
||||||
|
51
cmd/lotus-seal-worker/cli.go
Normal file
51
cmd/lotus-seal-worker/cli.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
|
)
|
||||||
|
|
||||||
|
var setCmd = &cli.Command{
|
||||||
|
Name: "set",
|
||||||
|
Usage: "Manage worker settings",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "enabled",
|
||||||
|
Usage: "enable/disable new task processing",
|
||||||
|
Value: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
api, closer, err := lcli.GetWorkerAPI(cctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
|
if err := api.SetEnabled(ctx, cctx.Bool("enabled")); err != nil {
|
||||||
|
return xerrors.Errorf("SetEnabled: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var waitQuietCmd = &cli.Command{
|
||||||
|
Name: "wait-quiet",
|
||||||
|
Usage: "Block until all running tasks exit",
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
api, closer, err := lcli.GetWorkerAPI(cctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
|
return api.WaitQuiet(ctx)
|
||||||
|
},
|
||||||
|
}
|
@ -32,6 +32,18 @@ var infoCmd = &cli.Command{
|
|||||||
cli.VersionPrinter(cctx)
|
cli.VersionPrinter(cctx)
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
|
|
||||||
|
sess, err := api.ProcessSession(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting session: %w", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("Session: %s\n", sess)
|
||||||
|
|
||||||
|
enabled, err := api.Enabled(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("checking worker status: %w", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("Enabled: %t", enabled)
|
||||||
|
|
||||||
info, err := api.Info(ctx)
|
info, err := api.Info(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting info: %w", err)
|
return xerrors.Errorf("getting info: %w", err)
|
||||||
|
@ -58,6 +58,8 @@ func main() {
|
|||||||
runCmd,
|
runCmd,
|
||||||
infoCmd,
|
infoCmd,
|
||||||
storageCmd,
|
storageCmd,
|
||||||
|
setCmd,
|
||||||
|
waitQuietCmd,
|
||||||
}
|
}
|
||||||
|
|
||||||
app := &cli.App{
|
app := &cli.App{
|
||||||
|
@ -2,7 +2,9 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/mitchellh/go-homedir"
|
"github.com/mitchellh/go-homedir"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -17,6 +19,8 @@ type worker struct {
|
|||||||
|
|
||||||
localStore *stores.Local
|
localStore *stores.Local
|
||||||
ls stores.LocalStorage
|
ls stores.LocalStorage
|
||||||
|
|
||||||
|
disabled int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *worker) Version(context.Context) (build.Version, error) {
|
func (w *worker) Version(context.Context) (build.Version, error) {
|
||||||
@ -42,4 +46,34 @@ func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *worker) SetEnabled(ctx context.Context, enabled bool) error {
|
||||||
|
disabled := int64(1)
|
||||||
|
if enabled {
|
||||||
|
disabled = 0
|
||||||
|
}
|
||||||
|
atomic.StoreInt64(&w.disabled, disabled)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *worker) Enabled(ctx context.Context) (bool, error) {
|
||||||
|
return atomic.LoadInt64(&w.disabled) == 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *worker) WaitQuiet(ctx context.Context) error {
|
||||||
|
w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *worker) ProcessSession(ctx context.Context) (uuid.UUID, error) {
|
||||||
|
return w.LocalWorker.Session(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *worker) Session(ctx context.Context) (uuid.UUID, error) {
|
||||||
|
if atomic.LoadInt64(&w.disabled) == 1 {
|
||||||
|
return uuid.UUID{}, xerrors.Errorf("worker disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
return w.LocalWorker.Session(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
var _ storiface.WorkerCalls = &worker{}
|
var _ storiface.WorkerCalls = &worker{}
|
||||||
|
Loading…
Reference in New Issue
Block a user