diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 5d67cf33d..20bf5defd 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -1,37 +1,27 @@ package main import ( - "context" - "net" - "net/http" + "fmt" _ "net/http/pprof" "os" - "os/signal" - "syscall" "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v0api" - mux "github.com/gorilla/mux" "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" "golang.org/x/xerrors" - "github.com/filecoin-project/go-jsonrpc" - "github.com/filecoin-project/go-jsonrpc/auth" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" ) @@ -164,54 +154,25 @@ var runCmd = &cli.Command{ log.Infof("Remote version %s", v) - lst, err := manet.Listen(endpoint) + // Instantiate the miner node handler. + handler, err := node.MinerHandler(minerapi) if err != nil { - return xerrors.Errorf("could not listen: %w", err) + return xerrors.Errorf("failed to instantiate rpc handler: %w", err) } - mux := mux.NewRouter() - - rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(metrics.MetricedStorMinerAPI(minerapi))) - - mux.Handle("/rpc/v0", rpcServer) - mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote) - mux.Handle("/debug/metrics", metrics.Exporter()) - mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof - - ah := &auth.Handler{ - Verify: minerapi.AuthVerify, - Next: mux.ServeHTTP, + // Serve the RPC. + rpcStopper, err := node.ServeRPC(handler, "lotus-miner", endpoint) + if err != nil { + return fmt.Errorf("failed to start json-rpc endpoint: %s", err) } - srv := &http.Server{ - Handler: ah, - BaseContext: func(listener net.Listener) context.Context { - ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-miner")) - return ctx - }, - } + // Monitor for shutdown. + finishCh := node.MonitorShutdown(shutdownChan, + node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, + node.ShutdownHandler{Component: "miner", StopFunc: stop}, + ) - sigChan := make(chan os.Signal, 2) - go func() { - select { - case sig := <-sigChan: - log.Warnw("received shutdown", "signal", sig) - case <-shutdownChan: - log.Warn("received shutdown") - } - - log.Warn("Shutting down...") - if err := stop(context.TODO()); err != nil { - log.Errorf("graceful shutting down failed: %s", err) - } - if err := srv.Shutdown(context.TODO()); err != nil { - log.Errorf("shutting down RPC server failed: %s", err) - } - log.Warn("Graceful shutdown successful") - }() - signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) - - return srv.Serve(manet.NetListener(lst)) + <-finishCh + return nil }, } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index ec4a638b4..715ab3dc2 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -15,6 +15,7 @@ import ( "runtime/pprof" "strings" + "github.com/filecoin-project/go-jsonrpc" paramfetch "github.com/filecoin-project/go-paramfetch" metricsprom "github.com/ipfs/go-metrics-prometheus" "github.com/mitchellh/go-homedir" @@ -351,10 +352,26 @@ var DaemonCmd = &cli.Command{ return xerrors.Errorf("getting api endpoint: %w", err) } - // Start the RPC server. - rpcStopper, err := node.ServeRPC(api, endpoint, int64(cctx.Int("api-max-req-size"))) + // + // Instantiate JSON-RPC endpoint. + // ---- + + // Populate JSON-RPC options. + serverOptions := make([]jsonrpc.ServerOption, 0) + if maxRequestSize := cctx.Int("api-max-req-size"); maxRequestSize != 0 { + serverOptions = append(serverOptions, jsonrpc.WithMaxRequestSize(int64(maxRequestSize))) + } + + // Instantiate the full node handler. + h, err := node.FullNodeHandler(api, serverOptions...) if err != nil { - return fmt.Errorf("failed to start JSON-RPC API: %s", err) + return fmt.Errorf("failed to instantiate rpc handler: %s", err) + } + + // Serve the RPC. + rpcStopper, err := node.ServeRPC(h, "lotus-daemon", endpoint) + if err != nil { + return fmt.Errorf("failed to start json-rpc endpoint: %s", err) } // Monitor for shutdown. diff --git a/node/rpc.go b/node/rpc.go index 3b31b3e6c..d5df7da1e 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -9,6 +9,7 @@ import ( "runtime" "strconv" + "github.com/gorilla/mux" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/multiformats/go-multiaddr" @@ -28,18 +29,44 @@ import ( var rpclog = logging.Logger("rpc") -// ServeRPC serves the full node API over the supplied listen multiaddr. -// -// It returns the stop function to be called to terminate the endpoint. +// ServeRPC serves an HTTP handler over the supplied listen multiaddr. // // This function spawns a goroutine to run the server, and returns immediately. -func ServeRPC(a v1api.FullNode, addr multiaddr.Multiaddr, maxRequestSize int64) (StopFunc, error) { - serverOptions := make([]jsonrpc.ServerOption, 0) - if maxRequestSize != 0 { // config set - serverOptions = append(serverOptions, jsonrpc.WithMaxRequestSize(maxRequestSize)) +// It returns the stop function to be called to terminate the endpoint. +// +// The supplied ID is used in tracing, by inserting a tag in the context. +func ServeRPC(h http.Handler, id string, addr multiaddr.Multiaddr) (StopFunc, error) { + // Start listening to the addr; if invalid or occupied, we will fail early. + lst, err := manet.Listen(addr) + if err != nil { + return nil, xerrors.Errorf("could not listen: %w", err) } + + // Instantiate the server and start listening. + srv := &http.Server{ + Handler: h, + BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, id)) + return ctx + }, + } + + go func() { + err = srv.Serve(manet.NetListener(lst)) + if err != http.ErrServerClosed { + rpclog.Warnf("rpc server failed: %s", err) + } + }() + + return srv.Shutdown, err +} + +// FullNodeHandler returns a full node handler, to be mounted as-is on the server. +func FullNodeHandler(a v1api.FullNode, opts ...jsonrpc.ServerOption) (http.Handler, error) { + m := mux.NewRouter() + serveRpc := func(path string, hnd interface{}) { - rpcServer := jsonrpc.NewServer(serverOptions...) + rpcServer := jsonrpc.NewServer(opts...) rpcServer.Register("Filecoin", hnd) ah := &auth.Handler{ @@ -47,7 +74,7 @@ func ServeRPC(a v1api.FullNode, addr multiaddr.Multiaddr, maxRequestSize int64) Next: rpcServer.ServeHTTP, } - http.Handle(path, ah) + m.Handle(path, ah) } pma := api.PermissionedFullAPI(metrics.MetricedFullAPI(a)) @@ -60,37 +87,39 @@ func ServeRPC(a v1api.FullNode, addr multiaddr.Multiaddr, maxRequestSize int64) Next: handleImport(a.(*impl.FullNodeAPI)), } - http.Handle("/rest/v0/import", importAH) + m.Handle("/rest/v0/import", importAH) - http.Handle("/debug/metrics", metrics.Exporter()) - http.Handle("/debug/pprof-set/block", handleFractionOpt("BlockProfileRate", runtime.SetBlockProfileRate)) - http.Handle("/debug/pprof-set/mutex", handleFractionOpt("MutexProfileFraction", func(x int) { + // debugging + m.Handle("/debug/metrics", metrics.Exporter()) + m.Handle("/debug/pprof-set/block", handleFractionOpt("BlockProfileRate", runtime.SetBlockProfileRate)) + m.Handle("/debug/pprof-set/mutex", handleFractionOpt("MutexProfileFraction", func(x int) { runtime.SetMutexProfileFraction(x) })) + m.PathPrefix("/").Handler(http.DefaultServeMux) // pprof - // Start listening to the addr; if invalid or occupied, we will fail early. - lst, err := manet.Listen(addr) - if err != nil { - return nil, xerrors.Errorf("could not listen: %w", err) + return m, nil +} + +// MinerHandler returns a miner handler, to be mounted as-is on the server. +func MinerHandler(a api.StorageMiner) (http.Handler, error) { + m := mux.NewRouter() + + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(metrics.MetricedStorMinerAPI(a))) + + m.Handle("/rpc/v0", rpcServer) + m.PathPrefix("/remote").HandlerFunc(a.(*impl.StorageMinerAPI).ServeRemote) + + // debugging + m.Handle("/debug/metrics", metrics.Exporter()) + m.PathPrefix("/").Handler(http.DefaultServeMux) // pprof + + ah := &auth.Handler{ + Verify: a.AuthVerify, + Next: m.ServeHTTP, } - // Instantiate the server and start listening. - srv := &http.Server{ - Handler: http.DefaultServeMux, - BaseContext: func(listener net.Listener) context.Context { - ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-daemon")) - return ctx - }, - } - - go func() { - err = srv.Serve(manet.NetListener(lst)) - if err != http.ErrServerClosed { - log.Warnf("rpc server failed: %s", err) - } - }() - - return srv.Shutdown, err + return ah, nil } func handleImport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) { @@ -114,7 +143,7 @@ func handleImport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Reque w.WriteHeader(200) err = json.NewEncoder(w).Encode(struct{ Cid cid.Cid }{c}) if err != nil { - log.Errorf("/rest/v0/import: Writing response failed: %+v", err) + rpclog.Errorf("/rest/v0/import: Writing response failed: %+v", err) return } } @@ -142,7 +171,7 @@ func handleFractionOpt(name string, setter func(int)) http.HandlerFunc { http.Error(rw, err.Error(), http.StatusBadRequest) return } - log.Infof("setting %s to %d", name, fr) + rpclog.Infof("setting %s to %d", name, fr) setter(fr) } }