diff --git a/cmd/lotus-shed/mempool-stats.go b/cmd/lotus-shed/mempool-stats.go index 4c87b5b5a..f38dc6a38 100644 --- a/cmd/lotus-shed/mempool-stats.go +++ b/cmd/lotus-shed/mempool-stats.go @@ -14,9 +14,11 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "github.com/filecoin-project/go-address" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/specs-actors/actors/builtin" ) var ( @@ -29,33 +31,38 @@ var ( var ( LeTag, _ = tag.NewKey("quantile") + MTTag, _ = tag.NewKey("msg_type") ) var ( AgeView = &view.View{ Name: "mpool-age", Measure: MpoolAge, - TagKeys: []tag.Key{LeTag}, + TagKeys: []tag.Key{LeTag, MTTag}, Aggregation: view.LastValue(), } SizeView = &view.View{ Name: "mpool-size", Measure: MpoolSize, + TagKeys: []tag.Key{MTTag}, Aggregation: view.LastValue(), } InboundRate = &view.View{ Name: "msg-inbound", Measure: MpoolInboundRate, + TagKeys: []tag.Key{MTTag}, Aggregation: view.Count(), } InclusionRate = &view.View{ Name: "msg-inclusion", Measure: BlockInclusionRate, + TagKeys: []tag.Key{MTTag}, Aggregation: view.Count(), } MsgWait = &view.View{ Name: "msg-wait", Measure: MsgWaitTime, + TagKeys: []tag.Key{MTTag}, Aggregation: view.Distribution(10, 30, 60, 120, 240, 600, 1800, 3600), } ) @@ -102,6 +109,24 @@ var mpoolStatsCmd = &cli.Command{ return err } + mcache := make(map[address.Address]bool) + isMiner := func(addr address.Address) (bool, error) { + cache, ok := mcache[addr] + if ok { + return cache, nil + } + + act, err := api.StateGetActor(ctx, addr, types.EmptyTSK) + if err != nil { + return false, err + } + + ism := act.Code == builtin.StorageMinerActorCodeID + mcache[addr] = ism + return ism, nil + } + + wpostTracker := make(map[cid.Cid]*msgInfo) tracker := make(map[cid.Cid]*msgInfo) tick := time.Tick(time.Second) for { @@ -112,44 +137,87 @@ var mpoolStatsCmd = &cli.Command{ } switch u.Type { case lapi.MpoolAdd: + stats.Record(ctx, MpoolInboundRate.M(1)) tracker[u.Message.Cid()] = &msgInfo{ msg: u.Message, seen: time.Now(), } - stats.Record(ctx, MpoolInboundRate.M(1)) + + if u.Message.Message.Method == builtin.MethodsMiner.SubmitWindowedPoSt { + + miner, err := isMiner(u.Message.Message.To) + if err != nil { + log.Warnf("failed to determine if message target was to a miner: %s", err) + continue + } + + if miner { + wpostTracker[u.Message.Cid()] = &msgInfo{ + msg: u.Message, + seen: time.Now(), + } + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, MpoolInboundRate.M(1)) + } + } + case lapi.MpoolRemove: mi, ok := tracker[u.Message.Cid()] - if !ok { - continue + if ok { + fmt.Printf("%s was in the mempool for %s (feecap=%s, prem=%s)\n", u.Message.Cid(), time.Since(mi.seen), u.Message.Message.GasFeeCap, u.Message.Message.GasPremium) + stats.Record(ctx, BlockInclusionRate.M(1)) + stats.Record(ctx, MsgWaitTime.M(time.Since(mi.seen).Seconds())) + delete(tracker, u.Message.Cid()) + } + + wm, ok := wpostTracker[u.Message.Cid()] + if ok { + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, BlockInclusionRate.M(1)) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, MsgWaitTime.M(time.Since(wm.seen).Seconds())) + delete(wpostTracker, u.Message.Cid()) } - fmt.Printf("%s was in the mempool for %s (feecap=%s, prem=%s)\n", u.Message.Cid(), time.Since(mi.seen), u.Message.Message.GasFeeCap, u.Message.Message.GasPremium) - stats.Record(ctx, BlockInclusionRate.M(1)) - stats.Record(ctx, MsgWaitTime.M(time.Since(mi.seen).Seconds())) - delete(tracker, u.Message.Cid()) default: return fmt.Errorf("unrecognized mpool update state: %d", u.Type) } case <-tick: var ages []time.Duration - if len(tracker) == 0 { - continue - } - for _, v := range tracker { - age := time.Since(v.seen) - ages = append(ages, age) + if len(tracker) > 0 { + for _, v := range tracker { + age := time.Since(v.seen) + ages = append(ages, age) + } + + st := ageStats(ages) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "40")}, MpoolAge.M(st.Perc40.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "50")}, MpoolAge.M(st.Perc50.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "60")}, MpoolAge.M(st.Perc60.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "70")}, MpoolAge.M(st.Perc70.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "80")}, MpoolAge.M(st.Perc80.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "90")}, MpoolAge.M(st.Perc90.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "95")}, MpoolAge.M(st.Perc95.Seconds())) + + stats.Record(ctx, MpoolSize.M(int64(len(tracker)))) + fmt.Printf("%d messages in mempool for average of %s, (%s / %s / %s)\n", st.Count, st.Average, st.Perc50, st.Perc80, st.Perc95) } - st := ageStats(ages) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "40")}, MpoolAge.M(st.Perc40.Seconds())) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "50")}, MpoolAge.M(st.Perc50.Seconds())) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "60")}, MpoolAge.M(st.Perc60.Seconds())) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "70")}, MpoolAge.M(st.Perc70.Seconds())) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "80")}, MpoolAge.M(st.Perc80.Seconds())) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "90")}, MpoolAge.M(st.Perc90.Seconds())) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "95")}, MpoolAge.M(st.Perc95.Seconds())) + var wpages []time.Duration + if len(wpostTracker) > 0 { + for _, v := range wpostTracker { + age := time.Since(v.seen) + wpages = append(wpages, age) + } - stats.Record(ctx, MpoolSize.M(int64(len(tracker)))) - fmt.Printf("%d messages in mempool for average of %s, (%s / %s / %s)\n", st.Count, st.Average, st.Perc50, st.Perc80, st.Perc95) + st := ageStats(wpages) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "40"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc40.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "50"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc50.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "60"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc60.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "70"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc70.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "80"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc80.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "90"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc90.Seconds())) + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "95"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc95.Seconds())) + + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, MpoolSize.M(int64(len(wpostTracker)))) + fmt.Printf("%d wpost messages in mempool for average of %s, (%s / %s / %s)\n", st.Count, st.Average, st.Perc50, st.Perc80, st.Perc95) + } } } return nil