Merge pull request #2253 from filecoin-project/extract-stats

extract stats package
This commit is contained in:
Łukasz Magiera 2020-07-03 21:18:11 +02:00 committed by GitHub
commit e5fdae45b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 149 additions and 136 deletions

View File

@ -1,6 +1,6 @@
# Stats # lotus-stats
Stats is a small tool to push chain information into influxdb `lotus-stats` is a small tool to push chain information into influxdb
## Setup ## Setup
@ -14,13 +14,13 @@ INFLUX_PASS=""
## Usage ## Usage
Stats will be default look in `~/.lotus` to connect to a running daemon and resume collecting stats from last record block height. lotus-stats will be default look in `~/.lotus` to connect to a running daemon and resume collecting stats from last record block height.
For other usage see `./stats --help` For other usage see `./lotus-stats --help`
``` ```
go build -o stats *.go go build -o lotus-stats *.go
. env.stats && ./stats . env.stats && ./lotus-stats
``` ```

72
cmd/lotus-stats/main.go Normal file
View File

@ -0,0 +1,72 @@
package main
import (
"context"
"flag"
"os"
"github.com/filecoin-project/lotus/tools/stats"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("stats")
const (
influxAddrEnvVar = "INFLUX_ADDR"
influxUserEnvVar = "INFLUX_USER"
influxPassEnvVar = "INFLUX_PASS"
)
func main() {
var repo string = "~/.lotus"
var database string = "lotus"
var reset bool = false
var nosync bool = false
var height int64 = 0
var headlag int = 3
flag.StringVar(&repo, "repo", repo, "lotus repo path")
flag.StringVar(&database, "database", database, "influx database")
flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)")
flag.IntVar(&headlag, "head-lag", headlag, "number of head events to hold to protect against small reorgs")
flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering")
flag.BoolVar(&nosync, "nosync", nosync, "skip waiting for sync")
flag.Parse()
ctx := context.Background()
influx, err := stats.InfluxClient(os.Getenv(influxAddrEnvVar), os.Getenv(influxUserEnvVar), os.Getenv(influxPassEnvVar))
if err != nil {
log.Fatal(err)
}
if reset {
if err := stats.ResetDatabase(influx, database); err != nil {
log.Fatal(err)
}
}
if !reset && height == 0 {
h, err := stats.GetLastRecordedHeight(influx, database)
if err != nil {
log.Info(err)
}
height = h
}
api, closer, err := stats.GetFullNodeAPI(repo)
if err != nil {
log.Fatal(err)
}
defer closer()
if !nosync {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
stats.Collect(ctx, api, influx, database, height, headlag)
}

63
tools/stats/collect.go Normal file
View File

@ -0,0 +1,63 @@
package stats
import (
"context"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi"
client "github.com/influxdata/influxdb1-client/v2"
)
func Collect(ctx context.Context, api api.FullNode, influx client.Client, database string, height int64, headlag int) {
tipsetsCh, err := GetTips(ctx, api, abi.ChainEpoch(height), headlag)
if err != nil {
log.Fatal(err)
}
wq := NewInfluxWriteQueue(ctx, influx)
defer wq.Close()
for tipset := range tipsetsCh {
log.Infow("Collect stats", "height", tipset.Height())
pl := NewPointList()
height := tipset.Height()
if err := RecordTipsetPoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record tipset", "height", height, "error", err)
continue
}
if err := RecordTipsetMessagesPoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record messages", "height", height, "error", err)
continue
}
if err := RecordTipsetStatePoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record state", "height", height, "error", err)
continue
}
// Instead of having to pass around a bunch of generic stuff we want for each point
// we will just add them at the end.
tsTimestamp := time.Unix(int64(tipset.MinTimestamp()), int64(0))
nb, err := InfluxNewBatch()
if err != nil {
log.Fatal(err)
}
for _, pt := range pl.Points() {
pt.SetTime(tsTimestamp)
nb.AddPoint(NewPointFrom(pt))
}
nb.SetDatabase(database)
log.Infow("Adding points", "count", len(nb.Points()), "height", tipset.Height())
wq.AddBatch(nb)
}
}

View File

@ -1,4 +1,4 @@
package main package stats
import ( import (
"container/list" "container/list"

View File

@ -1,4 +1,4 @@
package main package stats
import ( import (
"testing" "testing"

View File

@ -1,126 +0,0 @@
package main
import (
"context"
"flag"
"os"
"time"
"github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("stats")
const (
INFLUX_ADDR = "INFLUX_ADDR"
INFLUX_USER = "INFLUX_USER"
INFLUX_PASS = "INFLUX_PASS"
)
func main() {
var repo string = "~/.lotus"
var database string = "lotus"
var reset bool = false
var nosync bool = false
var height int64 = 0
var headlag int = 3
flag.StringVar(&repo, "repo", repo, "lotus repo path")
flag.StringVar(&database, "database", database, "influx database")
flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)")
flag.IntVar(&headlag, "head-lag", headlag, "number of head events to hold to protect against small reorgs")
flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering")
flag.BoolVar(&nosync, "nosync", nosync, "skip waiting for sync")
flag.Parse()
influxAddr := os.Getenv(INFLUX_ADDR)
influxUser := os.Getenv(INFLUX_USER)
influxPass := os.Getenv(INFLUX_PASS)
ctx := context.Background()
influx, err := InfluxClient(influxAddr, influxUser, influxPass)
if err != nil {
log.Fatal(err)
}
if reset {
if err := ResetDatabase(influx, database); err != nil {
log.Fatal(err)
}
}
if !reset && height == 0 {
h, err := GetLastRecordedHeight(influx, database)
if err != nil {
log.Info(err)
}
height = h
}
api, closer, err := GetFullNodeAPI(repo)
if err != nil {
log.Fatal(err)
}
defer closer()
if !nosync {
if err := WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
tipsetsCh, err := GetTips(ctx, api, abi.ChainEpoch(height), headlag)
if err != nil {
log.Fatal(err)
}
wq := NewInfluxWriteQueue(ctx, influx)
defer wq.Close()
for tipset := range tipsetsCh {
log.Infow("Collect stats", "height", tipset.Height())
pl := NewPointList()
height := tipset.Height()
if err := RecordTipsetPoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record tipset", "height", height, "error", err)
continue
}
if err := RecordTipsetMessagesPoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record messages", "height", height, "error", err)
continue
}
if err := RecordTipsetStatePoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record state", "height", height, "error", err)
continue
}
// Instead of having to pass around a bunch of generic stuff we want for each point
// we will just add them at the end.
tsTimestamp := time.Unix(int64(tipset.MinTimestamp()), int64(0))
nb, err := InfluxNewBatch()
if err != nil {
log.Fatal(err)
}
for _, pt := range pl.Points() {
pt.SetTime(tsTimestamp)
nb.AddPoint(NewPointFrom(pt))
}
nb.SetDatabase(database)
log.Infow("Adding points", "count", len(nb.Points()), "height", tipset.Height())
wq.AddBatch(nb)
}
}

View File

@ -1,4 +1,4 @@
package main package stats
import ( import (
"bytes" "bytes"
@ -25,8 +25,12 @@ import (
_ "github.com/influxdata/influxdb1-client" _ "github.com/influxdata/influxdb1-client"
models "github.com/influxdata/influxdb1-client/models" models "github.com/influxdata/influxdb1-client/models"
client "github.com/influxdata/influxdb1-client/v2" client "github.com/influxdata/influxdb1-client/v2"
logging "github.com/ipfs/go-log/v2"
) )
var log = logging.Logger("stats")
type PointList struct { type PointList struct {
points []models.Point points []models.Point
} }

View File

@ -1,4 +1,4 @@
package main package stats
import ( import (
"context" "context"