2020-08-31 23:37:18 +00:00
package main
import (
"fmt"
2020-09-01 21:09:14 +00:00
"net/http"
2020-09-01 19:40:42 +00:00
"sort"
2020-08-31 23:37:18 +00:00
"time"
2020-09-01 21:09:14 +00:00
"contrib.go.opencensus.io/exporter/prometheus"
2020-08-31 23:37:18 +00:00
"github.com/ipfs/go-cid"
2021-04-06 13:04:32 +00:00
logging "github.com/ipfs/go-log/v2"
2020-08-31 23:37:18 +00:00
"github.com/urfave/cli/v2"
2020-09-01 21:09:14 +00:00
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
2022-11-28 18:05:44 +00:00
"golang.org/x/xerrors"
2020-08-31 23:37:18 +00:00
2020-09-10 00:18:55 +00:00
"github.com/filecoin-project/go-address"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/go-state-types/builtin"
2020-08-31 23:37:18 +00:00
lapi "github.com/filecoin-project/lotus/api"
2022-06-14 15:00:51 +00:00
lbuiltin "github.com/filecoin-project/lotus/chain/actors/builtin"
2020-08-31 23:37:18 +00:00
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
)
2020-09-01 21:09:14 +00:00
var (
2020-09-01 22:11:02 +00:00
MpoolAge = stats . Float64 ( "mpoolage" , "Age of messages in the mempool" , stats . UnitSeconds )
MpoolSize = stats . Int64 ( "mpoolsize" , "Number of messages in mempool" , stats . UnitDimensionless )
MpoolInboundRate = stats . Int64 ( "inbound" , "Counter for inbound messages" , stats . UnitDimensionless )
BlockInclusionRate = stats . Int64 ( "inclusion" , "Counter for message included in blocks" , stats . UnitDimensionless )
MsgWaitTime = stats . Float64 ( "msg-wait-time" , "Wait time of messages to make it into a block" , stats . UnitSeconds )
2020-09-01 21:09:14 +00:00
)
var (
2020-09-01 22:46:55 +00:00
LeTag , _ = tag . NewKey ( "quantile" )
2020-09-10 00:18:55 +00:00
MTTag , _ = tag . NewKey ( "msg_type" )
2020-09-01 21:09:14 +00:00
)
var (
AgeView = & view . View {
Name : "mpool-age" ,
Measure : MpoolAge ,
2020-09-10 00:18:55 +00:00
TagKeys : [ ] tag . Key { LeTag , MTTag } ,
2020-09-01 21:09:14 +00:00
Aggregation : view . LastValue ( ) ,
}
SizeView = & view . View {
Name : "mpool-size" ,
Measure : MpoolSize ,
2020-09-10 00:18:55 +00:00
TagKeys : [ ] tag . Key { MTTag } ,
2020-09-01 21:09:14 +00:00
Aggregation : view . LastValue ( ) ,
}
InboundRate = & view . View {
Name : "msg-inbound" ,
Measure : MpoolInboundRate ,
2020-09-10 00:18:55 +00:00
TagKeys : [ ] tag . Key { MTTag } ,
2020-09-01 21:09:14 +00:00
Aggregation : view . Count ( ) ,
}
2020-09-01 22:11:02 +00:00
InclusionRate = & view . View {
Name : "msg-inclusion" ,
Measure : BlockInclusionRate ,
2020-09-10 00:18:55 +00:00
TagKeys : [ ] tag . Key { MTTag } ,
2020-09-01 22:11:02 +00:00
Aggregation : view . Count ( ) ,
}
MsgWait = & view . View {
Name : "msg-wait" ,
Measure : MsgWaitTime ,
2020-09-10 00:18:55 +00:00
TagKeys : [ ] tag . Key { MTTag } ,
2020-09-01 22:11:02 +00:00
Aggregation : view . Distribution ( 10 , 30 , 60 , 120 , 240 , 600 , 1800 , 3600 ) ,
}
2020-09-01 21:09:14 +00:00
)
2020-08-31 23:37:18 +00:00
type msgInfo struct {
msg * types . SignedMessage
seen time . Time
}
var mpoolStatsCmd = & cli . Command {
Name : "mpool-stats" ,
2022-11-25 22:32:42 +00:00
Flags : [ ] cli . Flag {
& cli . StringFlag {
Name : "http-server-timeout" ,
2022-11-29 14:38:44 +00:00
Value : "30s" ,
2022-11-25 22:32:42 +00:00
} ,
} ,
2020-08-31 23:37:18 +00:00
Action : func ( cctx * cli . Context ) error {
2022-11-25 22:32:42 +00:00
_ = logging . SetLogLevel ( "rpc" , "ERROR" )
2020-08-31 23:37:18 +00:00
2020-09-01 22:11:02 +00:00
if err := view . Register ( AgeView , SizeView , InboundRate , InclusionRate , MsgWait ) ; err != nil {
2020-09-01 21:09:14 +00:00
return err
}
expo , err := prometheus . NewExporter ( prometheus . Options {
Namespace : "lotusmpool" ,
} )
if err != nil {
return err
}
http . Handle ( "/debug/metrics" , expo )
2022-11-25 22:32:42 +00:00
timeout , err := time . ParseDuration ( cctx . String ( "http-server-timeout" ) )
if err != nil {
return xerrors . Errorf ( "invalid time string %s: %x" , cctx . String ( "http-server-timeout" ) , err )
}
2020-09-01 21:09:14 +00:00
go func ( ) {
2022-11-25 21:19:20 +00:00
server := & http . Server {
Addr : ":10555" ,
2022-11-25 22:32:42 +00:00
ReadHeaderTimeout : timeout ,
2022-11-25 21:19:20 +00:00
}
if err := server . ListenAndServe ( ) ; err != nil {
2020-09-01 21:09:14 +00:00
panic ( err )
}
} ( )
2020-08-31 23:37:18 +00:00
api , closer , err := lcli . GetFullNodeAPI ( cctx )
if err != nil {
return err
}
defer closer ( )
ctx := lcli . ReqContext ( cctx )
updates , err := api . MpoolSub ( ctx )
if err != nil {
return err
}
2020-09-10 00:18:55 +00:00
mcache := make ( map [ address . Address ] bool )
isMiner := func ( addr address . Address ) ( bool , error ) {
cache , ok := mcache [ addr ]
if ok {
return cache , nil
}
act , err := api . StateGetActor ( ctx , addr , types . EmptyTSK )
if err != nil {
return false , err
}
2022-04-20 21:34:28 +00:00
ism := lbuiltin . IsStorageMinerActor ( act . Code )
2020-09-10 00:18:55 +00:00
mcache [ addr ] = ism
return ism , nil
}
wpostTracker := make ( map [ cid . Cid ] * msgInfo )
2020-08-31 23:37:18 +00:00
tracker := make ( map [ cid . Cid ] * msgInfo )
tick := time . Tick ( time . Second )
for {
select {
2020-09-02 22:57:42 +00:00
case u , ok := <- updates :
if ! ok {
return fmt . Errorf ( "connection with lotus node broke" )
}
2020-08-31 23:37:18 +00:00
switch u . Type {
case lapi . MpoolAdd :
2020-09-10 00:18:55 +00:00
stats . Record ( ctx , MpoolInboundRate . M ( 1 ) )
2020-08-31 23:37:18 +00:00
tracker [ u . Message . Cid ( ) ] = & msgInfo {
msg : u . Message ,
seen : time . Now ( ) ,
}
2020-09-10 00:18:55 +00:00
2022-04-20 21:34:28 +00:00
if u . Message . Message . Method == builtin . MethodsMiner . SubmitWindowedPoSt {
2020-09-10 00:18:55 +00:00
miner , err := isMiner ( u . Message . Message . To )
if err != nil {
log . Warnf ( "failed to determine if message target was to a miner: %s" , err )
continue
}
if miner {
wpostTracker [ u . Message . Cid ( ) ] = & msgInfo {
msg : u . Message ,
seen : time . Now ( ) ,
}
2020-09-10 00:37:49 +00:00
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( MTTag , "wpost" ) } , MpoolInboundRate . M ( 1 ) )
2020-09-10 00:18:55 +00:00
}
}
2020-08-31 23:37:18 +00:00
case lapi . MpoolRemove :
mi , ok := tracker [ u . Message . Cid ( ) ]
2020-09-10 00:18:55 +00:00
if ok {
fmt . Printf ( "%s was in the mempool for %s (feecap=%s, prem=%s)\n" , u . Message . Cid ( ) , time . Since ( mi . seen ) , u . Message . Message . GasFeeCap , u . Message . Message . GasPremium )
stats . Record ( ctx , BlockInclusionRate . M ( 1 ) )
stats . Record ( ctx , MsgWaitTime . M ( time . Since ( mi . seen ) . Seconds ( ) ) )
delete ( tracker , u . Message . Cid ( ) )
}
wm , ok := wpostTracker [ u . Message . Cid ( ) ]
if ok {
2020-09-10 00:37:49 +00:00
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( MTTag , "wpost" ) } , BlockInclusionRate . M ( 1 ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( MTTag , "wpost" ) } , MsgWaitTime . M ( time . Since ( wm . seen ) . Seconds ( ) ) )
2020-09-10 00:18:55 +00:00
delete ( wpostTracker , u . Message . Cid ( ) )
2020-08-31 23:37:18 +00:00
}
default :
return fmt . Errorf ( "unrecognized mpool update state: %d" , u . Type )
}
case <- tick :
2020-09-01 19:40:42 +00:00
var ages [ ] time . Duration
2020-09-10 00:18:55 +00:00
if len ( tracker ) > 0 {
for _ , v := range tracker {
age := time . Since ( v . seen )
ages = append ( ages , age )
}
st := ageStats ( ages )
2020-09-10 00:37:49 +00:00
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "40" ) } , MpoolAge . M ( st . Perc40 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "50" ) } , MpoolAge . M ( st . Perc50 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "60" ) } , MpoolAge . M ( st . Perc60 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "70" ) } , MpoolAge . M ( st . Perc70 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "80" ) } , MpoolAge . M ( st . Perc80 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "90" ) } , MpoolAge . M ( st . Perc90 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "95" ) } , MpoolAge . M ( st . Perc95 . Seconds ( ) ) )
2020-09-10 00:18:55 +00:00
stats . Record ( ctx , MpoolSize . M ( int64 ( len ( tracker ) ) ) )
fmt . Printf ( "%d messages in mempool for average of %s, (%s / %s / %s)\n" , st . Count , st . Average , st . Perc50 , st . Perc80 , st . Perc95 )
2020-08-31 23:37:18 +00:00
}
2020-09-01 19:40:42 +00:00
2020-09-10 00:18:55 +00:00
var wpages [ ] time . Duration
if len ( wpostTracker ) > 0 {
for _ , v := range wpostTracker {
age := time . Since ( v . seen )
wpages = append ( wpages , age )
}
st := ageStats ( wpages )
2020-09-10 00:37:49 +00:00
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "40" ) , tag . Upsert ( MTTag , "wpost" ) } , MpoolAge . M ( st . Perc40 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "50" ) , tag . Upsert ( MTTag , "wpost" ) } , MpoolAge . M ( st . Perc50 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "60" ) , tag . Upsert ( MTTag , "wpost" ) } , MpoolAge . M ( st . Perc60 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "70" ) , tag . Upsert ( MTTag , "wpost" ) } , MpoolAge . M ( st . Perc70 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "80" ) , tag . Upsert ( MTTag , "wpost" ) } , MpoolAge . M ( st . Perc80 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "90" ) , tag . Upsert ( MTTag , "wpost" ) } , MpoolAge . M ( st . Perc90 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( LeTag , "95" ) , tag . Upsert ( MTTag , "wpost" ) } , MpoolAge . M ( st . Perc95 . Seconds ( ) ) )
_ = stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( MTTag , "wpost" ) } , MpoolSize . M ( int64 ( len ( wpostTracker ) ) ) )
2020-09-10 00:18:55 +00:00
fmt . Printf ( "%d wpost messages in mempool for average of %s, (%s / %s / %s)\n" , st . Count , st . Average , st . Perc50 , st . Perc80 , st . Perc95 )
}
2020-08-31 23:37:18 +00:00
}
}
} ,
}
2020-09-01 19:40:42 +00:00
type ageStat struct {
Average time . Duration
Max time . Duration
Perc40 time . Duration
Perc50 time . Duration
Perc60 time . Duration
Perc70 time . Duration
Perc80 time . Duration
Perc90 time . Duration
Perc95 time . Duration
Count int
}
func ageStats ( ages [ ] time . Duration ) * ageStat {
sort . Slice ( ages , func ( i , j int ) bool {
return ages [ i ] < ages [ j ]
} )
st := ageStat {
Count : len ( ages ) ,
}
var sum time . Duration
for _ , a := range ages {
sum += a
if a > st . Max {
st . Max = a
}
}
st . Average = sum / time . Duration ( len ( ages ) )
p40 := ( 4 * len ( ages ) ) / 10
p50 := len ( ages ) / 2
p60 := ( 6 * len ( ages ) ) / 10
p70 := ( 7 * len ( ages ) ) / 10
p80 := ( 4 * len ( ages ) ) / 5
p90 := ( 9 * len ( ages ) ) / 10
p95 := ( 19 * len ( ages ) ) / 20
st . Perc40 = ages [ p40 ]
st . Perc50 = ages [ p50 ]
st . Perc60 = ages [ p60 ]
st . Perc70 = ages [ p70 ]
st . Perc80 = ages [ p80 ]
st . Perc90 = ages [ p90 ]
st . Perc95 = ages [ p95 ]
return & st
}