diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 98f789294..112f827f5 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -124,7 +124,7 @@ func (h *Handler) Run(ctx context.Context) { // TODO: restore state go func() { - defer log.Error("quitting deal handler loop") + defer log.Warn("quitting deal handler loop") defer close(h.stopped) for { diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 8ff51804c..d41eac887 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -2,8 +2,6 @@ package sub import ( "context" - "fmt" - logging "github.com/ipfs/go-log" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -17,6 +15,10 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha for { msg, err := bsub.Next(ctx) if err != nil { + if ctx.Err() != nil { + log.Warn("quitting HandleIncomingBlocks loop") + return + } log.Error("error from block subscription: ", err) continue } @@ -55,7 +57,11 @@ func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub for { msg, err := msub.Next(ctx) if err != nil { - fmt.Println("error from message subscription: ", err) + log.Warn("error from message subscription: ", err) + if ctx.Err() != nil { + log.Warn("quitting HandleIncomingMessages loop") + return + } continue } diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index d91745783..131a4b42d 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -67,7 +67,7 @@ func main() { } if err := app.Run(os.Args); err != nil { - log.Error(err) + log.Warn(err) return } } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 8696c7721..63cd3a53f 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -1,8 +1,12 @@ package main import ( + "context" + "github.com/filecoin-project/go-lotus/lib/valctx" "net/http" "os" + "os/signal" + "syscall" "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" @@ -32,7 +36,7 @@ var runCmd = &cli.Command{ if err != nil { return err } - ctx := lcli.ReqContext(cctx) + ctx := &valctx.Context{Parent: lcli.ReqContext(cctx)} go func() { // a hack for now to handle sigint @@ -61,7 +65,7 @@ var runCmd = &cli.Command{ } var minerapi api.StorageMiner - err = node.New(ctx, + stop, err := node.New(ctx, node.StorageMiner(&minerapi), node.Online(), node.Repo(r), @@ -101,6 +105,23 @@ var runCmd = &cli.Command{ } http.Handle("/rpc/v0", ah) - return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) + + srv := &http.Server{Addr: "127.0.0.1:" + cctx.String("api"), Handler: http.DefaultServeMux} + + sigChan := make(chan os.Signal, 2) + go func() { + <-sigChan + log.Warn("Shutting down..") + if err := stop(context.TODO()); err != nil { + log.Errorf("graceful shutting down failed: %s", err) + } + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + log.Warn("Graceful shutdown successful") + }() + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) + + return srv.ListenAndServe() }, } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 2fc4e38b4..fa287ab77 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -65,7 +65,7 @@ var DaemonCmd = &cli.Command{ } var api api.FullNode - err = node.New(ctx, + stop, err := node.New(ctx, node.FullAPI(&api), node.Online(), @@ -86,6 +86,6 @@ var DaemonCmd = &cli.Command{ } // TODO: properly parse api endpoint (or make it a URL) - return serveRPC(api, "127.0.0.1:"+cctx.String("api")) + return serveRPC(api, stop, "127.0.0.1:"+cctx.String("api")) }, } diff --git a/cmd/lotus/main.go b/cmd/lotus/main.go index 3b6c194d8..894ca0b09 100644 --- a/cmd/lotus/main.go +++ b/cmd/lotus/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "log" "os" logging "github.com/ipfs/go-log" @@ -65,7 +64,7 @@ func main() { Code: trace.StatusCodeFailedPrecondition, Message: err.Error(), }) - log.Printf("%+v\n", err) + log.Warn(err) } return } diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 68ac13c87..358e4cd3e 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -1,14 +1,22 @@ package main import ( + "context" "net/http" + "os" + "os/signal" + "syscall" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/lib/auth" "github.com/filecoin-project/go-lotus/lib/jsonrpc" + "github.com/filecoin-project/go-lotus/node" + logging "github.com/ipfs/go-log" ) -func serveRPC(a api.FullNode, addr string) error { +var log = logging.Logger("main") + +func serveRPC(a api.FullNode, stop node.StopFunc, addr string) error { rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", api.PermissionedFullAPI(a)) @@ -18,5 +26,20 @@ func serveRPC(a api.FullNode, addr string) error { } http.Handle("/rpc/v0", ah) - return http.ListenAndServe(addr, http.DefaultServeMux) + + srv := &http.Server{Addr: addr, Handler: http.DefaultServeMux} + + sigChan := make(chan os.Signal, 2) + go func() { + <-sigChan + if err := stop(context.TODO()); err != nil { + log.Errorf("graceful shutting down failed: %s", err) + } + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + }() + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) + + return srv.ListenAndServe() } diff --git a/lib/valctx/context.go b/lib/valctx/context.go new file mode 100644 index 000000000..82983cc6e --- /dev/null +++ b/lib/valctx/context.go @@ -0,0 +1,28 @@ +package valctx + +import ( + "context" + "time" +) + +type Context struct { + Parent context.Context +} + +func (c *Context) Deadline() (deadline time.Time, ok bool) { + return +} + +func (c *Context) Done() <-chan struct{} { + return nil +} + +func (c *Context) Err() error { + return nil +} + +func (c *Context) Value(key interface{}) interface{} { + return c.Parent.Value(key) +} + +var _ context.Context = &Context{} diff --git a/node/builder.go b/node/builder.go index 70e489285..db0ee64f0 100644 --- a/node/builder.go +++ b/node/builder.go @@ -337,8 +337,10 @@ func FullAPI(out *api.FullNode) Option { } } +type StopFunc func(context.Context) error + // New builds and starts new Filecoin node -func New(ctx context.Context, opts ...Option) error { +func New(ctx context.Context, opts ...Option) (StopFunc, error) { settings := Settings{ modules: map[interface{}]fx.Option{}, invokes: make([]fx.Option, _nInvokes), @@ -346,7 +348,7 @@ func New(ctx context.Context, opts ...Option) error { // apply module options in the right order if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil { - return err + return nil, err } // gather constructors for fx.Options @@ -374,10 +376,10 @@ func New(ctx context.Context, opts ...Option) error { // correctly if err := app.Start(ctx); err != nil { // comment fx.NopLogger few lines above for easier debugging - return err + return nil, err } - return nil + return app.Stop, nil } // In-memory / testing diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 830dcdf97..83e507e55 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -147,9 +147,13 @@ func RegisterMiner(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode) erro lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - log.Infof("registering miner '%s' with full node", minerAddr) + log.Infof("Registering miner '%s' with full node", minerAddr) return api.MinerRegister(ctx, minerAddr) }, + OnStop: func(ctx context.Context) error { + log.Infof("Unregistering miner '%s' from full node", minerAddr) + return api.MinerUnregister(ctx, minerAddr) + }, }) return nil }