diff --git a/cmd/lotus-shed/mempool-stats.go b/cmd/lotus-shed/mempool-stats.go index 320ba31a5..d7408b9cb 100644 --- a/cmd/lotus-shed/mempool-stats.go +++ b/cmd/lotus-shed/mempool-stats.go @@ -2,18 +2,52 @@ package main import ( "fmt" + "net/http" "sort" "time" + "contrib.go.opencensus.io/exporter/prometheus" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/urfave/cli/v2" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" ) +var ( + MpoolAge = stats.Float64("mpoolage", "Age of messages in the mempool", stats.UnitSeconds) + MpoolSize = stats.Int64("mpoolsize", "Number of messages in mempool", stats.UnitDimensionless) + MpoolInboundRate = stats.Int64("inbound", "Counter for inbound messages", stats.UnitDimensionless) +) + +var ( + LeTag, _ = tag.NewKey("le") +) + +var ( + AgeView = &view.View{ + Name: "mpool-age", + Measure: MpoolAge, + TagKeys: []tag.Key{LeTag}, + Aggregation: view.LastValue(), + } + SizeView = &view.View{ + Name: "mpool-size", + Measure: MpoolSize, + Aggregation: view.LastValue(), + } + InboundRate = &view.View{ + Name: "msg-inbound", + Measure: MpoolInboundRate, + Aggregation: view.Count(), + } +) + type msgInfo struct { msg *types.SignedMessage seen time.Time @@ -24,6 +58,25 @@ var mpoolStatsCmd = &cli.Command{ Action: func(cctx *cli.Context) error { logging.SetLogLevel("rpc", "ERROR") + if err := view.Register(AgeView); err != nil { + return err + } + + expo, err := prometheus.NewExporter(prometheus.Options{ + Namespace: "lotusmpool", + }) + if err != nil { + return err + } + + http.Handle("/debug/metrics", expo) + + go func() { + if err := http.ListenAndServe(":10555", nil); err != nil { + panic(err) + } + }() + api, closer, err := lcli.GetFullNodeAPI(cctx) if err != nil { return err @@ -48,6 +101,7 @@ var mpoolStatsCmd = &cli.Command{ msg: u.Message, seen: time.Now(), } + stats.Record(ctx, MpoolInboundRate.M(1)) case lapi.MpoolRemove: mi, ok := tracker[u.Message.Cid()] if !ok { @@ -69,6 +123,15 @@ var mpoolStatsCmd = &cli.Command{ } st := ageStats(ages) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "40")}, MpoolAge.M(st.Perc40.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "50")}, MpoolAge.M(st.Perc50.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "60")}, MpoolAge.M(st.Perc60.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "70")}, MpoolAge.M(st.Perc70.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "80")}, MpoolAge.M(st.Perc80.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "90")}, MpoolAge.M(st.Perc90.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "95")}, MpoolAge.M(st.Perc95.Seconds())) + + stats.Record(ctx, MpoolSize.M(int64(len(tracker)))) fmt.Printf("%d messages in mempool for average of %s, (%s / %s / %s)\n", st.Count, st.Average, st.Perc50, st.Perc80, st.Perc95) } }