lotus/cmd/lotus/daemon.go
Raúl Kripalani 656ece06e5 fix metrics wiring.
Some components like go-ds-measure, go-ipfs-blockstore and go-bitswap
expose metrics via ipfs/go-metrics-interface, but Lotus never injects
the Prometheus exporter (ipfs/go-metrics-prometheus). Therefore, those
metrics never surface in instrumentation.

Instead, Lotus uses OpenCensus directly.

This commit injects the Prometheus exporter for go-metrics-interface, and
instructs the OpenCensus Prometheus exporter to use the DefaultRegistry.

This has the effect of blending the metrics of both metrics libraries
and exposing them together.

With this patch, the datastore, cache utilisation, and bitswap metrics
are now exported via the /debug/metrics endpoint.

This commit also fixes an issue where the metrics scope was empty, making
go-metrics-interface default to "<no-scope>". Angle brackets are inadmissible
characters for Prometheus, so it was refusing to export the affected metrics.
(These were the ARC cache metrics.)
2020-11-02 17:56:49 +00:00

476 lines
11 KiB
Go

// +build !nodaemon
package main
import (
"bufio"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"runtime/pprof"
"strings"
paramfetch "github.com/filecoin-project/go-paramfetch"
metricsprom "github.com/ipfs/go-metrics-prometheus"
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"go.opencensus.io/plugin/runmetrics"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"gopkg.in/cheggaaa/pb.v1"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/lib/ulimit"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
)
// Names of the hidden daemon flags used for generating a genesis block.
const (
// makeGenFlag is the name of the hidden flag whose value, when non-empty,
// makes DaemonCmd build the genesis via testing.MakeGenesis instead of
// loading it from bytes.
makeGenFlag = "lotus-make-genesis"
// preTemplateFlag is the name of the hidden flag carrying the genesis
// template; DaemonCmd rejects makeGenFlag if this flag is empty.
preTemplateFlag = "genesis-template"
)
// daemonStopCmd is the `stop` subcommand of the daemon command. It obtains an
// API handle through lcli.GetAPI and asks the daemon to shut down, returning
// whatever error either step produces.
var daemonStopCmd = &cli.Command{
	Name:  "stop",
	Usage: "Stop a running lotus daemon",
	Flags: []cli.Flag{},
	Action: func(cctx *cli.Context) error {
		daemon, closer, err := lcli.GetAPI(cctx)
		if err != nil {
			return err
		}
		// Release the API handle once the shutdown request has returned.
		defer closer()

		return daemon.Shutdown(lcli.ReqContext(cctx))
	},
}
// DaemonCmd is the `go-lotus daemon` command
//
// Its action performs, in order: runtime metrics setup, optional file
// descriptor limit management, optional CPU profiling, repo opening and
// initialisation, proof parameter fetching (skipped in lite mode), optional
// chain/snapshot import, genesis selection, node construction, optional key
// import, metric view registration, and finally hands over to serveRPC.
var DaemonCmd = &cli.Command{
	Name:  "daemon",
	Usage: "Start a lotus daemon process",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "api",
			Value: "1234",
		},
		&cli.StringFlag{
			Name:   makeGenFlag,
			Value:  "",
			Hidden: true,
		},
		&cli.StringFlag{
			Name:   preTemplateFlag,
			Hidden: true,
		},
		&cli.StringFlag{
			Name:   "import-key",
			Usage:  "on first run, import a default key from a given file",
			Hidden: true,
		},
		&cli.StringFlag{
			Name:  "genesis",
			Usage: "genesis file to use for first node run",
		},
		&cli.BoolFlag{
			Name:  "bootstrap",
			Value: true,
		},
		&cli.StringFlag{
			Name:  "import-chain",
			Usage: "on first run, load chain from given file or url and validate",
		},
		&cli.StringFlag{
			Name:  "import-snapshot",
			Usage: "import chain state from a given chain export file or url",
		},
		&cli.BoolFlag{
			Name:  "halt-after-import",
			Usage: "halt the process after importing chain from file",
		},
		&cli.BoolFlag{
			Name:   "lite",
			Usage:  "start lotus in lite mode",
			Hidden: true,
		},
		&cli.StringFlag{
			Name:  "pprof",
			Usage: "specify name of file for writing cpu profile to",
		},
		&cli.StringFlag{
			Name:  "profile",
			Usage: "specify type of node",
		},
		&cli.BoolFlag{
			Name:  "manage-fdlimit",
			Usage: "manage open file limit",
			Value: true,
		},
		&cli.StringFlag{
			Name:  "config",
			Usage: "specify path of config file to use",
		},
	},
	Action: func(cctx *cli.Context) error {
		isLite := cctx.Bool("lite")

		err := runmetrics.Enable(runmetrics.RunMetricOptions{
			EnableCPU:    true,
			EnableMemory: true,
		})
		if err != nil {
			return xerrors.Errorf("enabling runtime metrics: %w", err)
		}

		// Failing to adjust the fd limit is logged but does not abort startup.
		if cctx.Bool("manage-fdlimit") {
			if _, _, err := ulimit.ManageFdLimit(); err != nil {
				log.Errorf("setting file descriptor limit: %s", err)
			}
		}

		if prof := cctx.String("pprof"); prof != "" {
			profile, err := os.Create(prof)
			if err != nil {
				return err
			}
			// Registered before StopCPUProfile so that, with LIFO defer
			// ordering, profiling is stopped (and flushed) before the file
			// is closed. Also closes the file if StartCPUProfile fails.
			defer profile.Close() //nolint:errcheck

			if err := pprof.StartCPUProfile(profile); err != nil {
				return err
			}
			defer pprof.StopCPUProfile()
		}

		var isBootstrapper dtypes.Bootstrapper
		switch profile := cctx.String("profile"); profile {
		case "bootstrapper":
			isBootstrapper = true
		case "":
			// do nothing
		default:
			return fmt.Errorf("unrecognized profile type: %q", profile)
		}

		// Tag the context with build version and commit. A tagging failure
		// is reported but startup continues with the context tag.New returned.
		ctx, err := tag.New(context.Background(), tag.Insert(metrics.Version, build.BuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit))
		if err != nil {
			log.Warnw("could not tag metrics context", "error", err)
		}

		// Purely informational: log the expanded repo path.
		{
			dir, err := homedir.Expand(cctx.String("repo"))
			if err != nil {
				log.Warnw("could not expand repo location", "error", err)
			} else {
				log.Infof("lotus repo: %s", dir)
			}
		}

		r, err := repo.NewFS(cctx.String("repo"))
		if err != nil {
			return xerrors.Errorf("opening fs repo: %w", err)
		}

		if cctx.String("config") != "" {
			r.SetConfigPath(cctx.String("config"))
		}

		// An already-initialised repo is not an error.
		if err := r.Init(repo.FullNode); err != nil && err != repo.ErrRepoExists {
			return xerrors.Errorf("repo init error: %w", err)
		}

		if !isLite {
			if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil {
				return xerrors.Errorf("fetching proof parameters: %w", err)
			}
		}

		// Genesis bytes come from the --genesis file if given, otherwise
		// from whatever build.MaybeGenesis provides.
		var genBytes []byte
		if cctx.String("genesis") != "" {
			genBytes, err = ioutil.ReadFile(cctx.String("genesis"))
			if err != nil {
				return xerrors.Errorf("reading genesis: %w", err)
			}
		} else {
			genBytes = build.MaybeGenesis()
		}

		chainfile := cctx.String("import-chain")
		snapshot := cctx.String("import-snapshot")
		if chainfile != "" || snapshot != "" {
			if chainfile != "" && snapshot != "" {
				return fmt.Errorf("cannot specify both 'import-snapshot' and 'import-chain'")
			}
			var issnapshot bool
			if chainfile == "" {
				chainfile = snapshot
				issnapshot = true
			}

			if err := ImportChain(r, chainfile, issnapshot); err != nil {
				return err
			}
			if cctx.Bool("halt-after-import") {
				fmt.Println("Chain import complete, halting as requested...")
				return nil
			}
		}

		// The genesis-making flag, when set, takes precedence over loaded
		// genesis bytes because it is applied last.
		genesis := node.Options()
		if len(genBytes) > 0 {
			genesis = node.Override(new(modules.Genesis), modules.LoadGenesis(genBytes))
		}
		if cctx.String(makeGenFlag) != "" {
			if cctx.String(preTemplateFlag) == "" {
				return xerrors.Errorf("must also pass file with genesis template to `--%s`", preTemplateFlag)
			}
			genesis = node.Override(new(modules.Genesis), testing.MakeGenesis(cctx.String(makeGenFlag), cctx.String(preTemplateFlag)))
		}

		shutdownChan := make(chan struct{})

		// If the daemon is started in "lite mode", provide a GatewayAPI
		// for RPC calls
		liteModeDeps := node.Options()
		if isLite {
			gapi, closer, err := lcli.GetGatewayAPI(cctx)
			if err != nil {
				return err
			}

			defer closer()
			liteModeDeps = node.Override(new(api.GatewayAPI), gapi)
		}

		// some libraries like ipfs/go-ds-measure and ipfs/go-ipfs-blockstore
		// use ipfs/go-metrics-interface. This injects a Prometheus exporter
		// for those. Metrics are exported to the default registry.
		if err := metricsprom.Inject(); err != nil {
			log.Warnf("unable to inject prometheus ipfs/go-metrics exporter; some metrics will be unavailable; err: %s", err)
		}

		// NOTE: from here on the local `api` shadows the imported api package.
		var api api.FullNode

		stop, err := node.New(ctx,
			node.FullAPI(&api, node.Lite(isLite)),

			node.Override(new(dtypes.Bootstrapper), isBootstrapper),
			node.Override(new(dtypes.ShutdownChan), shutdownChan),
			node.Online(),
			node.Repo(r),

			genesis,
			liteModeDeps,

			// Only override the API endpoint when --api was set explicitly.
			node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") },
				node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error {
					apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" +
						cctx.String("api"))
					if err != nil {
						return err
					}
					return lr.SetAPIEndpoint(apima)
				})),
			// With --bootstrap=false the peer manager is removed.
			node.ApplyIf(func(s *node.Settings) bool { return !cctx.Bool("bootstrap") },
				node.Unset(node.RunPeerMgrKey),
				node.Unset(new(*peermgr.PeerMgr)),
			),
		)
		if err != nil {
			return xerrors.Errorf("initializing node: %w", err)
		}

		// A failed key import is logged but does not stop the daemon.
		if cctx.String("import-key") != "" {
			if err := importKey(ctx, api, cctx.String("import-key")); err != nil {
				log.Errorf("importing key failed: %+v", err)
			}
		}

		// Register all metric views
		if err = view.Register(
			metrics.DefaultViews...,
		); err != nil {
			log.Fatalf("Cannot register the view: %v", err)
		}

		// Set the metric to one so it is published to the exporter
		stats.Record(ctx, metrics.LotusInfo.M(1))

		endpoint, err := r.APIEndpoint()
		if err != nil {
			return xerrors.Errorf("getting api endpoint: %w", err)
		}

		// TODO: properly parse api endpoint (or make it a URL)
		return serveRPC(api, stop, endpoint, shutdownChan)
	},
	Subcommands: []*cli.Command{
		daemonStopCmd,
	},
}
// importKey reads a key from the file f, imports it into the wallet exposed
// by api, and makes the imported address the wallet default.
//
// The path f is passed through homedir.Expand before reading. The file
// content is trimmed of surrounding whitespace, hex-decoded, and the decoded
// bytes are unmarshalled as a JSON types.KeyInfo. The first error from any
// step is returned unchanged.
func importKey(ctx context.Context, api api.FullNode, f string) error {
	f, err := homedir.Expand(f)
	if err != nil {
		return err
	}

	hexdata, err := ioutil.ReadFile(f)
	if err != nil {
		return err
	}

	data, err := hex.DecodeString(strings.TrimSpace(string(hexdata)))
	if err != nil {
		return err
	}

	var ki types.KeyInfo
	if err := json.Unmarshal(data, &ki); err != nil {
		return err
	}

	addr, err := api.WalletImport(ctx, &ki)
	if err != nil {
		return err
	}

	if err := api.WalletSetDefault(ctx, addr); err != nil {
		return err
	}

	// Infof, not Info: the message is a format string and needs substitution.
	log.Infof("successfully imported key for %s", addr)
	return nil
}
// ImportChain imports a chain export into the repo r and makes the imported
// tipset the new head.
//
// fname is either an http(s) URL, which is fetched with http.Get and must
// answer 200, or a local path, which is passed through homedir.Expand. The
// data is streamed through a 1 MiB buffer and a byte-based progress bar into
// the chain store. After import the validation cache is flushed and the
// tipset at height 0 is recorded as genesis. Unless snapshot is true, the
// whole chain is validated before the head is set.
func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
	var (
		src  io.Reader
		size int64
	)

	isRemote := strings.HasPrefix(fname, "http://") || strings.HasPrefix(fname, "https://")
	if !isRemote {
		fname, err = homedir.Expand(fname)
		if err != nil {
			return err
		}

		file, err := os.Open(fname)
		if err != nil {
			return err
		}
		defer file.Close() //nolint:errcheck

		info, err := os.Stat(fname)
		if err != nil {
			return err
		}

		src, size = file, info.Size()
	} else {
		resp, err := http.Get(fname) //nolint:gosec
		if err != nil {
			return err
		}
		defer resp.Body.Close() //nolint:errcheck

		if resp.StatusCode != http.StatusOK {
			return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
		}

		src, size = resp.Body, resp.ContentLength
	}

	// The repo lock is held until this function returns.
	locked, err := r.Lock(repo.FullNode)
	if err != nil {
		return err
	}
	defer locked.Close() //nolint:errcheck

	chainDS, err := locked.Datastore("/chain")
	if err != nil {
		return err
	}

	metaDS, err := locked.Datastore("/metadata")
	if err != nil {
		return err
	}

	chainBS := blockstore.NewBlockstore(chainDS)

	jrnl, err := journal.OpenFSJournal(locked, journal.EnvDisabledEvents())
	if err != nil {
		return xerrors.Errorf("failed to open journal: %w", err)
	}

	chainStore := store.NewChainStore(chainBS, metaDS, vm.Syscalls(ffiwrapper.ProofVerifier), jrnl)

	log.Infof("importing chain from %s...", fname)

	// Progress bar sized by the input length, reporting in bytes.
	progress := pb.New64(size)
	progress.ShowTimeLeft = true
	progress.ShowPercent = true
	progress.ShowSpeed = true
	progress.Units = pb.U_BYTES

	tracked := progress.NewProxyReader(bufio.NewReaderSize(src, 1<<20))

	progress.Start()
	head, err := chainStore.Import(tracked)
	// Finish the bar before looking at the import result.
	progress.Finish()
	if err != nil {
		return xerrors.Errorf("importing chain failed: %w", err)
	}

	if err := chainStore.FlushValidationCache(); err != nil {
		return xerrors.Errorf("flushing validation cache failed: %w", err)
	}

	// Walk back from the imported head to height 0 to locate genesis.
	genesisTs, err := chainStore.GetTipsetByHeight(context.TODO(), 0, head, true)
	if err != nil {
		return err
	}

	if err := chainStore.SetGenesis(genesisTs.Blocks()[0]); err != nil {
		return err
	}

	stateMgr := stmgr.NewStateManager(chainStore)

	if !snapshot {
		log.Infof("validating imported chain...")
		if err := stateMgr.ValidateChain(context.TODO(), head); err != nil {
			return xerrors.Errorf("chain validation failed: %w", err)
		}
	}

	log.Infof("accepting %s as new head", head.Cids())
	return chainStore.SetHead(head)
}