add some bits about windowed post

This commit is contained in:
whyrusleeping 2020-09-09 17:18:55 -07:00
parent 09194aa613
commit 0361ca1c39

View File

@ -14,9 +14,11 @@ import (
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"github.com/filecoin-project/go-address"
lapi "github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/specs-actors/actors/builtin"
) )
var ( var (
@ -29,33 +31,38 @@ var (
var ( var (
LeTag, _ = tag.NewKey("quantile") LeTag, _ = tag.NewKey("quantile")
MTTag, _ = tag.NewKey("msg_type")
) )
var ( var (
AgeView = &view.View{ AgeView = &view.View{
Name: "mpool-age", Name: "mpool-age",
Measure: MpoolAge, Measure: MpoolAge,
TagKeys: []tag.Key{LeTag}, TagKeys: []tag.Key{LeTag, MTTag},
Aggregation: view.LastValue(), Aggregation: view.LastValue(),
} }
SizeView = &view.View{ SizeView = &view.View{
Name: "mpool-size", Name: "mpool-size",
Measure: MpoolSize, Measure: MpoolSize,
TagKeys: []tag.Key{MTTag},
Aggregation: view.LastValue(), Aggregation: view.LastValue(),
} }
InboundRate = &view.View{ InboundRate = &view.View{
Name: "msg-inbound", Name: "msg-inbound",
Measure: MpoolInboundRate, Measure: MpoolInboundRate,
TagKeys: []tag.Key{MTTag},
Aggregation: view.Count(), Aggregation: view.Count(),
} }
InclusionRate = &view.View{ InclusionRate = &view.View{
Name: "msg-inclusion", Name: "msg-inclusion",
Measure: BlockInclusionRate, Measure: BlockInclusionRate,
TagKeys: []tag.Key{MTTag},
Aggregation: view.Count(), Aggregation: view.Count(),
} }
MsgWait = &view.View{ MsgWait = &view.View{
Name: "msg-wait", Name: "msg-wait",
Measure: MsgWaitTime, Measure: MsgWaitTime,
TagKeys: []tag.Key{MTTag},
Aggregation: view.Distribution(10, 30, 60, 120, 240, 600, 1800, 3600), Aggregation: view.Distribution(10, 30, 60, 120, 240, 600, 1800, 3600),
} }
) )
@ -102,6 +109,24 @@ var mpoolStatsCmd = &cli.Command{
return err return err
} }
mcache := make(map[address.Address]bool)
isMiner := func(addr address.Address) (bool, error) {
cache, ok := mcache[addr]
if ok {
return cache, nil
}
act, err := api.StateGetActor(ctx, addr, types.EmptyTSK)
if err != nil {
return false, err
}
ism := act.Code == builtin.StorageMinerActorCodeID
mcache[addr] = ism
return ism, nil
}
wpostTracker := make(map[cid.Cid]*msgInfo)
tracker := make(map[cid.Cid]*msgInfo) tracker := make(map[cid.Cid]*msgInfo)
tick := time.Tick(time.Second) tick := time.Tick(time.Second)
for { for {
@ -112,44 +137,87 @@ var mpoolStatsCmd = &cli.Command{
} }
switch u.Type { switch u.Type {
case lapi.MpoolAdd: case lapi.MpoolAdd:
stats.Record(ctx, MpoolInboundRate.M(1))
tracker[u.Message.Cid()] = &msgInfo{ tracker[u.Message.Cid()] = &msgInfo{
msg: u.Message, msg: u.Message,
seen: time.Now(), seen: time.Now(),
} }
stats.Record(ctx, MpoolInboundRate.M(1))
if u.Message.Message.Method == builtin.MethodsMiner.SubmitWindowedPoSt {
miner, err := isMiner(u.Message.Message.To)
if err != nil {
log.Warnf("failed to determine if message target was to a miner: %s", err)
continue
}
if miner {
wpostTracker[u.Message.Cid()] = &msgInfo{
msg: u.Message,
seen: time.Now(),
}
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, MpoolInboundRate.M(1))
}
}
case lapi.MpoolRemove: case lapi.MpoolRemove:
mi, ok := tracker[u.Message.Cid()] mi, ok := tracker[u.Message.Cid()]
if !ok { if ok {
continue fmt.Printf("%s was in the mempool for %s (feecap=%s, prem=%s)\n", u.Message.Cid(), time.Since(mi.seen), u.Message.Message.GasFeeCap, u.Message.Message.GasPremium)
stats.Record(ctx, BlockInclusionRate.M(1))
stats.Record(ctx, MsgWaitTime.M(time.Since(mi.seen).Seconds()))
delete(tracker, u.Message.Cid())
}
wm, ok := wpostTracker[u.Message.Cid()]
if ok {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, BlockInclusionRate.M(1))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, MsgWaitTime.M(time.Since(wm.seen).Seconds()))
delete(wpostTracker, u.Message.Cid())
} }
fmt.Printf("%s was in the mempool for %s (feecap=%s, prem=%s)\n", u.Message.Cid(), time.Since(mi.seen), u.Message.Message.GasFeeCap, u.Message.Message.GasPremium)
stats.Record(ctx, BlockInclusionRate.M(1))
stats.Record(ctx, MsgWaitTime.M(time.Since(mi.seen).Seconds()))
delete(tracker, u.Message.Cid())
default: default:
return fmt.Errorf("unrecognized mpool update state: %d", u.Type) return fmt.Errorf("unrecognized mpool update state: %d", u.Type)
} }
case <-tick: case <-tick:
var ages []time.Duration var ages []time.Duration
if len(tracker) == 0 { if len(tracker) > 0 {
continue for _, v := range tracker {
} age := time.Since(v.seen)
for _, v := range tracker { ages = append(ages, age)
age := time.Since(v.seen) }
ages = append(ages, age)
st := ageStats(ages)
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "40")}, MpoolAge.M(st.Perc40.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "50")}, MpoolAge.M(st.Perc50.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "60")}, MpoolAge.M(st.Perc60.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "70")}, MpoolAge.M(st.Perc70.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "80")}, MpoolAge.M(st.Perc80.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "90")}, MpoolAge.M(st.Perc90.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "95")}, MpoolAge.M(st.Perc95.Seconds()))
stats.Record(ctx, MpoolSize.M(int64(len(tracker))))
fmt.Printf("%d messages in mempool for average of %s, (%s / %s / %s)\n", st.Count, st.Average, st.Perc50, st.Perc80, st.Perc95)
} }
st := ageStats(ages) var wpages []time.Duration
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "40")}, MpoolAge.M(st.Perc40.Seconds())) if len(wpostTracker) > 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "50")}, MpoolAge.M(st.Perc50.Seconds())) for _, v := range wpostTracker {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "60")}, MpoolAge.M(st.Perc60.Seconds())) age := time.Since(v.seen)
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "70")}, MpoolAge.M(st.Perc70.Seconds())) wpages = append(wpages, age)
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "80")}, MpoolAge.M(st.Perc80.Seconds())) }
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "90")}, MpoolAge.M(st.Perc90.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "95")}, MpoolAge.M(st.Perc95.Seconds()))
stats.Record(ctx, MpoolSize.M(int64(len(tracker)))) st := ageStats(wpages)
fmt.Printf("%d messages in mempool for average of %s, (%s / %s / %s)\n", st.Count, st.Average, st.Perc50, st.Perc80, st.Perc95) stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "40"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc40.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "50"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc50.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "60"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc60.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "70"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc70.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "80"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc80.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "90"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc90.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(LeTag, "95"), tag.Upsert(MTTag, "wpost")}, MpoolAge.M(st.Perc95.Seconds()))
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(MTTag, "wpost")}, MpoolSize.M(int64(len(wpostTracker))))
fmt.Printf("%d wpost messages in mempool for average of %s, (%s / %s / %s)\n", st.Count, st.Average, st.Perc50, st.Perc80, st.Perc95)
}
} }
} }
return nil return nil