node: Basic graceful shutdown

This commit is contained in:
Łukasz Magiera 2019-09-17 16:23:08 +02:00
parent 215f95aa5b
commit 83f1a336a6
10 changed files with 102 additions and 19 deletions

View File

@ -124,7 +124,7 @@ func (h *Handler) Run(ctx context.Context) {
// TODO: restore state // TODO: restore state
go func() { go func() {
defer log.Error("quitting deal handler loop") defer log.Warn("quitting deal handler loop")
defer close(h.stopped) defer close(h.stopped)
for { for {

View File

@ -2,8 +2,6 @@ package sub
import ( import (
"context" "context"
"fmt"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -17,6 +15,10 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
for { for {
msg, err := bsub.Next(ctx) msg, err := bsub.Next(ctx)
if err != nil { if err != nil {
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingBlocks loop")
return
}
log.Error("error from block subscription: ", err) log.Error("error from block subscription: ", err)
continue continue
} }
@ -55,7 +57,11 @@ func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub
for { for {
msg, err := msub.Next(ctx) msg, err := msub.Next(ctx)
if err != nil { if err != nil {
fmt.Println("error from message subscription: ", err) log.Warn("error from message subscription: ", err)
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingMessages loop")
return
}
continue continue
} }

View File

@ -67,7 +67,7 @@ func main() {
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Error(err) log.Warn(err)
return return
} }
} }

View File

@ -1,8 +1,12 @@
package main package main
import ( import (
"context"
"github.com/filecoin-project/go-lotus/lib/valctx"
"net/http" "net/http"
"os" "os"
"os/signal"
"syscall"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -32,7 +36,7 @@ var runCmd = &cli.Command{
if err != nil { if err != nil {
return err return err
} }
ctx := lcli.ReqContext(cctx) ctx := &valctx.Context{Parent: lcli.ReqContext(cctx)}
go func() { go func() {
// a hack for now to handle sigint // a hack for now to handle sigint
@ -61,7 +65,7 @@ var runCmd = &cli.Command{
} }
var minerapi api.StorageMiner var minerapi api.StorageMiner
err = node.New(ctx, stop, err := node.New(ctx,
node.StorageMiner(&minerapi), node.StorageMiner(&minerapi),
node.Online(), node.Online(),
node.Repo(r), node.Repo(r),
@ -101,6 +105,23 @@ var runCmd = &cli.Command{
} }
http.Handle("/rpc/v0", ah) http.Handle("/rpc/v0", ah)
return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux)
srv := &http.Server{Addr: "127.0.0.1:" + cctx.String("api"), Handler: http.DefaultServeMux}
sigChan := make(chan os.Signal, 2)
go func() {
<-sigChan
log.Warn("Shutting down..")
if err := stop(context.TODO()); err != nil {
log.Errorf("graceful shutting down failed: %s", err)
}
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
return srv.ListenAndServe()
}, },
} }

View File

@ -65,7 +65,7 @@ var DaemonCmd = &cli.Command{
} }
var api api.FullNode var api api.FullNode
err = node.New(ctx, stop, err := node.New(ctx,
node.FullAPI(&api), node.FullAPI(&api),
node.Online(), node.Online(),
@ -86,6 +86,6 @@ var DaemonCmd = &cli.Command{
} }
// TODO: properly parse api endpoint (or make it a URL) // TODO: properly parse api endpoint (or make it a URL)
return serveRPC(api, "127.0.0.1:"+cctx.String("api")) return serveRPC(api, stop, "127.0.0.1:"+cctx.String("api"))
}, },
} }

View File

@ -2,7 +2,6 @@ package main
import ( import (
"context" "context"
"log"
"os" "os"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
@ -65,7 +64,7 @@ func main() {
Code: trace.StatusCodeFailedPrecondition, Code: trace.StatusCodeFailedPrecondition,
Message: err.Error(), Message: err.Error(),
}) })
log.Printf("%+v\n", err) log.Warn(err)
} }
return return
} }

View File

@ -1,14 +1,22 @@
package main package main
import ( import (
"context"
"net/http" "net/http"
"os"
"os/signal"
"syscall"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/lib/auth" "github.com/filecoin-project/go-lotus/lib/auth"
"github.com/filecoin-project/go-lotus/lib/jsonrpc" "github.com/filecoin-project/go-lotus/lib/jsonrpc"
"github.com/filecoin-project/go-lotus/node"
logging "github.com/ipfs/go-log"
) )
func serveRPC(a api.FullNode, addr string) error { var log = logging.Logger("main")
func serveRPC(a api.FullNode, stop node.StopFunc, addr string) error {
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", api.PermissionedFullAPI(a)) rpcServer.Register("Filecoin", api.PermissionedFullAPI(a))
@ -18,5 +26,20 @@ func serveRPC(a api.FullNode, addr string) error {
} }
http.Handle("/rpc/v0", ah) http.Handle("/rpc/v0", ah)
return http.ListenAndServe(addr, http.DefaultServeMux)
srv := &http.Server{Addr: addr, Handler: http.DefaultServeMux}
sigChan := make(chan os.Signal, 2)
go func() {
<-sigChan
if err := stop(context.TODO()); err != nil {
log.Errorf("graceful shutting down failed: %s", err)
}
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
}()
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
return srv.ListenAndServe()
} }

28
lib/valctx/context.go Normal file
View File

@ -0,0 +1,28 @@
package valctx
import (
"context"
"time"
)
type Context struct {
Parent context.Context
}
func (c *Context) Deadline() (deadline time.Time, ok bool) {
return
}
func (c *Context) Done() <-chan struct{} {
return nil
}
func (c *Context) Err() error {
return nil
}
func (c *Context) Value(key interface{}) interface{} {
return c.Parent.Value(key)
}
var _ context.Context = &Context{}

View File

@ -337,8 +337,10 @@ func FullAPI(out *api.FullNode) Option {
} }
} }
type StopFunc func(context.Context) error
// New builds and starts new Filecoin node // New builds and starts new Filecoin node
func New(ctx context.Context, opts ...Option) error { func New(ctx context.Context, opts ...Option) (StopFunc, error) {
settings := Settings{ settings := Settings{
modules: map[interface{}]fx.Option{}, modules: map[interface{}]fx.Option{},
invokes: make([]fx.Option, _nInvokes), invokes: make([]fx.Option, _nInvokes),
@ -346,7 +348,7 @@ func New(ctx context.Context, opts ...Option) error {
// apply module options in the right order // apply module options in the right order
if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil { if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil {
return err return nil, err
} }
// gather constructors for fx.Options // gather constructors for fx.Options
@ -374,10 +376,10 @@ func New(ctx context.Context, opts ...Option) error {
// correctly // correctly
if err := app.Start(ctx); err != nil { if err := app.Start(ctx); err != nil {
// comment fx.NopLogger few lines above for easier debugging // comment fx.NopLogger few lines above for easier debugging
return err return nil, err
} }
return nil return app.Stop, nil
} }
// In-memory / testing // In-memory / testing

View File

@ -147,9 +147,13 @@ func RegisterMiner(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode) erro
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
log.Infof("registering miner '%s' with full node", minerAddr) log.Infof("Registering miner '%s' with full node", minerAddr)
return api.MinerRegister(ctx, minerAddr) return api.MinerRegister(ctx, minerAddr)
}, },
OnStop: func(ctx context.Context) error {
log.Infof("Unregistering miner '%s' from full node", minerAddr)
return api.MinerUnregister(ctx, minerAddr)
},
}) })
return nil return nil
} }