Add stop cmd for lotus worker

This commit is contained in:
Shrenuj Bansal 2022-07-29 18:33:30 -04:00
parent a843c52e38
commit dfea74cca8
6 changed files with 67 additions and 0 deletions

View File

@ -76,6 +76,10 @@ type Worker interface {
// Like ProcessSession, but returns an error when worker is disabled
Session(context.Context) (uuid.UUID, error) //perm:admin
// Trigger shutdown
Shutdown(context.Context) error //perm:admin
}
var _ storiface.WorkerCalls = *new(Worker)

View File

@ -965,6 +965,8 @@ type WorkerStruct struct {
SetEnabled func(p0 context.Context, p1 bool) error `perm:"admin"`
Shutdown func(p0 context.Context) error `perm:"admin"`
StorageAddLocal func(p0 context.Context, p1 string) error `perm:"admin"`
TaskDisable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"`
@ -5560,6 +5562,17 @@ func (s *WorkerStub) SetEnabled(p0 context.Context, p1 bool) error {
return ErrNotSupported
}
func (s *WorkerStruct) Shutdown(p0 context.Context) error {
if s.Internal.Shutdown == nil {
return ErrNotSupported
}
return s.Internal.Shutdown(p0)
}
func (s *WorkerStub) Shutdown(p0 context.Context) error {
return ErrNotSupported
}
func (s *WorkerStruct) StorageAddLocal(p0 context.Context, p1 string) error {
if s.Internal.StorageAddLocal == nil {
return ErrNotSupported

View File

@ -55,6 +55,7 @@ func main() {
local := []*cli.Command{
runCmd,
stopCmd,
infoCmd,
storageCmd,
setCmd,
@ -115,6 +116,27 @@ func main() {
}
}
var stopCmd = &cli.Command{
Name: "stop",
Usage: "Stop a running lotus worker",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
err = api.Shutdown(ctx)
if err != nil {
return err
}
return nil
},
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
@ -623,6 +645,15 @@ var runCmd = &cli.Command{
}
}()
go func() {
<-workerApi.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
return srv.Serve(nl)
},
}

View File

@ -118,4 +118,8 @@ func (w *Worker) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error)
return build.OpenRPCDiscoverJSON_Worker(), nil
}
func (w *Worker) Shutdown(ctx context.Context) error {
return w.LocalWorker.Close()
}
var _ storiface.WorkerCalls = &Worker{}

View File

@ -6,6 +6,7 @@
* [Paths](#Paths)
* [Remove](#Remove)
* [Session](#Session)
* [Shutdown](#Shutdown)
* [Version](#Version)
* [Add](#Add)
* [AddPiece](#AddPiece)
@ -1453,6 +1454,16 @@ Inputs: `null`
Response: `"07070707-0707-0707-0707-070707070707"`
### Shutdown
Trigger shutdown
Perms: admin
Inputs: `null`
Response: `{}`
### Version

View File

@ -829,6 +829,10 @@ func (l *LocalWorker) Close() error {
return nil
}
func (l *LocalWorker) Done() <-chan struct{} {
return l.closing
}
// WaitQuiet blocks as long as there are tasks running
func (l *LocalWorker) WaitQuiet() {
l.running.Wait()