diff --git a/tools/stats/README.md b/cmd/lotus-stats/README.md similarity index 61% rename from tools/stats/README.md rename to cmd/lotus-stats/README.md index a75671220..04220aa3b 100644 --- a/tools/stats/README.md +++ b/cmd/lotus-stats/README.md @@ -1,6 +1,6 @@ -# Stats +# lotus-stats -Stats is a small tool to push chain information into influxdb +`lotus-stats` is a small tool to push chain information into influxdb ## Setup @@ -14,13 +14,13 @@ INFLUX_PASS="" ## Usage -Stats will be default look in `~/.lotus` to connect to a running daemon and resume collecting stats from last record block height. +lotus-stats will be default look in `~/.lotus` to connect to a running daemon and resume collecting stats from last record block height. -For other usage see `./stats --help` +For other usage see `./lotus-stats --help` ``` -go build -o stats *.go -. env.stats && ./stats +go build -o lotus-stats *.go +. env.stats && ./lotus-stats ``` diff --git a/tools/stats/chain.dashboard.json b/cmd/lotus-stats/chain.dashboard.json similarity index 100% rename from tools/stats/chain.dashboard.json rename to cmd/lotus-stats/chain.dashboard.json diff --git a/tools/stats/docker-compose.yml b/cmd/lotus-stats/docker-compose.yml similarity index 100% rename from tools/stats/docker-compose.yml rename to cmd/lotus-stats/docker-compose.yml diff --git a/tools/stats/env.stats b/cmd/lotus-stats/env.stats similarity index 100% rename from tools/stats/env.stats rename to cmd/lotus-stats/env.stats diff --git a/cmd/lotus-stats/main.go b/cmd/lotus-stats/main.go new file mode 100644 index 000000000..a83493762 --- /dev/null +++ b/cmd/lotus-stats/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "flag" + "os" + + "github.com/filecoin-project/lotus/tools/stats" + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("stats") + +const ( + INFLUX_ADDR = "INFLUX_ADDR" + INFLUX_USER = "INFLUX_USER" + INFLUX_PASS = "INFLUX_PASS" +) + +func main() { + var repo string = "~/.lotus" + var database string = "lotus" + var reset bool = false + var nosync bool = false + var height int64 = 0 + var headlag int = 3 + + flag.StringVar(&repo, "repo", repo, "lotus repo path") + flag.StringVar(&database, "database", database, "influx database") + flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)") + flag.IntVar(&headlag, "head-lag", headlag, "number of head events to hold to protect against small reorgs") + flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering") + flag.BoolVar(&nosync, "nosync", nosync, "skip waiting for sync") + + flag.Parse() + + influxAddr := os.Getenv(INFLUX_ADDR) + influxUser := os.Getenv(INFLUX_USER) + influxPass := os.Getenv(INFLUX_PASS) + + ctx := context.Background() + + influx, err := stats.InfluxClient(influxAddr, influxUser, influxPass) + if err != nil { + log.Fatal(err) + } + + if reset { + if err := stats.ResetDatabase(influx, database); err != nil { + log.Fatal(err) + } + } + + if !reset && height == 0 { + h, err := stats.GetLastRecordedHeight(influx, database) + if err != nil { + log.Info(err) + } + + height = h + } + + api, closer, err := stats.GetFullNodeAPI(repo) + if err != nil { + log.Fatal(err) + } + defer closer() + + if !nosync { + if err := stats.WaitForSyncComplete(ctx, api); err != nil { + log.Fatal(err) + } + } + + stats.Collect(ctx, api, influx, database, height) +} diff --git a/tools/stats/setup.bash b/cmd/lotus-stats/setup.bash similarity index 100% rename from tools/stats/setup.bash rename to cmd/lotus-stats/setup.bash diff --git a/tools/stats/collect.go b/tools/stats/collect.go new file mode 100644 index 000000000..98a8c79d0 --- /dev/null +++ b/tools/stats/collect.go @@ -0,0 +1,65 @@ +package stats + +import ( + "context" + "time" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/specs-actors/actors/abi" + client "github.com/influxdata/influxdb1-client/v2" +) + +var headlag int = 3 + +func Collect(ctx context.Context, api api.FullNode, influx client.Client, database string, height int64) { + tipsetsCh, err := GetTips(ctx, api, abi.ChainEpoch(height), headlag) + if err != nil { + log.Fatal(err) + } + + wq := NewInfluxWriteQueue(ctx, influx) + defer wq.Close() + + for tipset := range tipsetsCh { + log.Infow("Collect stats", "height", tipset.Height()) + pl := NewPointList() + height := tipset.Height() + + if err := RecordTipsetPoints(ctx, api, pl, tipset); err != nil { + log.Warnw("Failed to record tipset", "height", height, "error", err) + continue + } + + if err := RecordTipsetMessagesPoints(ctx, api, pl, tipset); err != nil { + log.Warnw("Failed to record messages", "height", height, "error", err) + continue + } + + if err := RecordTipsetStatePoints(ctx, api, pl, tipset); err != nil { + log.Warnw("Failed to record state", "height", height, "error", err) + continue + } + + // Instead of having to pass around a bunch of generic stuff we want for each point + // we will just add them at the end. + + tsTimestamp := time.Unix(int64(tipset.MinTimestamp()), int64(0)) + + nb, err := InfluxNewBatch() + if err != nil { + log.Fatal(err) + } + + for _, pt := range pl.Points() { + pt.SetTime(tsTimestamp) + + nb.AddPoint(NewPointFrom(pt)) + } + + nb.SetDatabase(database) + + log.Infow("Adding points", "count", len(nb.Points()), "height", tipset.Height()) + + wq.AddBatch(nb) + } +} diff --git a/tools/stats/head_buffer.go b/tools/stats/head_buffer.go index 08a817233..1c3bf9777 100644 --- a/tools/stats/head_buffer.go +++ b/tools/stats/head_buffer.go @@ -1,4 +1,4 @@ -package main +package stats import ( "container/list" diff --git a/tools/stats/head_buffer_test.go b/tools/stats/head_buffer_test.go index 6266f35fd..098c90a96 100644 --- a/tools/stats/head_buffer_test.go +++ b/tools/stats/head_buffer_test.go @@ -1,4 +1,4 @@ -package main +package stats import ( "testing" diff --git a/tools/stats/main.go b/tools/stats/main.go deleted file mode 100644 index b3f6eff33..000000000 --- a/tools/stats/main.go +++ /dev/null @@ -1,126 +0,0 @@ -package main - -import ( - "context" - "flag" - "os" - "time" - - "github.com/filecoin-project/specs-actors/actors/abi" - logging "github.com/ipfs/go-log/v2" -) - -var log = logging.Logger("stats") - -const ( - INFLUX_ADDR = "INFLUX_ADDR" - INFLUX_USER = "INFLUX_USER" - INFLUX_PASS = "INFLUX_PASS" -) - -func main() { - var repo string = "~/.lotus" - var database string = "lotus" - var reset bool = false - var nosync bool = false - var height int64 = 0 - var headlag int = 3 - - flag.StringVar(&repo, "repo", repo, "lotus repo path") - flag.StringVar(&database, "database", database, "influx database") - flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)") - flag.IntVar(&headlag, "head-lag", headlag, "number of head events to hold to protect against small reorgs") - flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering") - flag.BoolVar(&nosync, "nosync", nosync, "skip waiting for sync") - - flag.Parse() - - influxAddr := os.Getenv(INFLUX_ADDR) - influxUser := os.Getenv(INFLUX_USER) - influxPass := os.Getenv(INFLUX_PASS) - - ctx := context.Background() - - influx, err := InfluxClient(influxAddr, influxUser, influxPass) - if err != nil { - log.Fatal(err) - } - - if reset { - if err := ResetDatabase(influx, database); err != nil { - log.Fatal(err) - } - } - - if !reset && height == 0 { - h, err := GetLastRecordedHeight(influx, database) - if err != nil { - log.Info(err) - } - - height = h - } - - api, closer, err := GetFullNodeAPI(repo) - if err != nil { - log.Fatal(err) - } - defer closer() - - if !nosync { - if err := WaitForSyncComplete(ctx, api); err != nil { - log.Fatal(err) - } - } - - tipsetsCh, err := GetTips(ctx, api, abi.ChainEpoch(height), headlag) - if err != nil { - log.Fatal(err) - } - - wq := NewInfluxWriteQueue(ctx, influx) - defer wq.Close() - - for tipset := range tipsetsCh { - log.Infow("Collect stats", "height", tipset.Height()) - pl := NewPointList() - height := tipset.Height() - - if err := RecordTipsetPoints(ctx, api, pl, tipset); err != nil { - log.Warnw("Failed to record tipset", "height", height, "error", err) - continue - } - - if err := RecordTipsetMessagesPoints(ctx, api, pl, tipset); err != nil { - log.Warnw("Failed to record messages", "height", height, "error", err) - continue - } - - if err := RecordTipsetStatePoints(ctx, api, pl, tipset); err != nil { - log.Warnw("Failed to record state", "height", height, "error", err) - continue - } - - // Instead of having to pass around a bunch of generic stuff we want for each point - // we will just add them at the end. - - tsTimestamp := time.Unix(int64(tipset.MinTimestamp()), int64(0)) - - nb, err := InfluxNewBatch() - if err != nil { - log.Fatal(err) - } - - for _, pt := range pl.Points() { - pt.SetTime(tsTimestamp) - - nb.AddPoint(NewPointFrom(pt)) - } - - nb.SetDatabase(database) - - log.Infow("Adding points", "count", len(nb.Points()), "height", tipset.Height()) - - wq.AddBatch(nb) - } -} diff --git a/tools/stats/metrics.go b/tools/stats/metrics.go index 53fa6ed63..626363731 100644 --- a/tools/stats/metrics.go +++ b/tools/stats/metrics.go @@ -1,4 +1,4 @@ -package main +package stats import ( "bytes" @@ -25,8 +25,12 @@ import ( _ "github.com/influxdata/influxdb1-client" models "github.com/influxdata/influxdb1-client/models" client "github.com/influxdata/influxdb1-client/v2" + + logging "github.com/ipfs/go-log/v2" ) +var log = logging.Logger("stats") + type PointList struct { points []models.Point } diff --git a/tools/stats/rpc.go b/tools/stats/rpc.go index a94ab955b..d053ff561 100644 --- a/tools/stats/rpc.go +++ b/tools/stats/rpc.go @@ -1,4 +1,4 @@ -package main +package stats import ( "context"