Add chain stats tool

Simple chain stats tool for graphing the chain using influxdb and
grafana.

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Travis Person 2019-10-11 17:13:16 -07:00 committed by Jakub Sztandera
parent d891589cef
commit 0b2718a4af
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
12 changed files with 2738 additions and 0 deletions

View File

@ -104,6 +104,11 @@ fountain:
go run github.com/GeertJohan/go.rice/rice append --exec fountain -i ./cmd/lotus-fountain
.PHONY: fountain
stats:
rm -f stats
go build -o stats ./tools/stats
.PHONY: stats
clean:
rm -rf $(CLEAN)
-$(MAKE) -C $(BLS_PATH) clean

1
go.mod
View File

@ -16,6 +16,7 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gorilla/websocket v1.4.0
github.com/hashicorp/golang-lru v0.5.3
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e
github.com/ipfs/go-bitswap v0.1.8
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c

2
go.sum
View File

@ -138,6 +138,8 @@ github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e h1:txQltCyjXAqVVSZDArPEhUTg35hKwVIuXwtQo7eAMNQ=
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=

1
tools/stats/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
env.stats

39
tools/stats/README.md Normal file
View File

@ -0,0 +1,39 @@
# Stats
Stats is a small tool to push chain information into influxdb
## Setup
Influx configuration can be configured through env variables.
```
INFLUX_ADDR="http://localhost:8086"
INFLUX_USER=""
INFLUX_PASS=""
```
## Usage
Stats will be default look in `~/.lotus` to connect to a running daemon and resume collecting stats from last record block height.
For other usage see `./stats --help`
```
go build -o stats *.go
. env.stats && ./stats
```
## Development
Start grafana and influxdb containers and import the dashboard to grafana.
The url of the imported dashboard will be returned.
If the script doesn't work, you can manually setup the datasource and import the dashboard.
```
docker-compose up -d
./setup.bash
```
The default username and password for grafana are both `admin`.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,26 @@
version: '3'
services:
influxdb:
image: influxdb:latest
container_name: influxdb
environment:
- INFLUXDB_DB=lotus
ports:
- "8086:8086"
volumes:
- influxdb:/var/lib/influxdb
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
links:
- influxdb
volumes:
- grafana:/var/lib/grafana
volumes:
influxdb:
grafana:

3
tools/stats/env.stats Normal file
View File

@ -0,0 +1,3 @@
export INFLUX_ADDR="http://localhost:8086"
export INFLUX_USER=""
export INFLUX_PASS=""

118
tools/stats/main.go Normal file
View File

@ -0,0 +1,118 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"time"
)
const (
INFLUX_ADDR = "INFLUX_ADDR"
INFLUX_USER = "INFLUX_USER"
INFLUX_PASS = "INFLUX_PASS"
)
func main() {
var repo string = "~/.lotus"
var database string = "lotus"
var reset bool = false
var height int64 = 0
flag.StringVar(&repo, "repo", repo, "lotus repo path")
flag.StringVar(&database, "database", database, "influx database")
flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)")
flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering")
flag.Parse()
influxAddr := os.Getenv(INFLUX_ADDR)
influxUser := os.Getenv(INFLUX_USER)
influxPass := os.Getenv(INFLUX_PASS)
ctx := context.Background()
influx, err := InfluxClient(influxAddr, influxUser, influxPass)
if err != nil {
log.Fatal(err)
}
if reset {
if err := ResetDatabase(influx, database); err != nil {
log.Fatal(err)
}
}
if !reset && height == 0 {
h, err := GetLastRecordedHeight(influx, database)
if err != nil {
log.Print(err)
}
height = h
}
api, closer, err := GetFullNodeAPI(repo)
if err != nil {
log.Fatal(err)
}
defer closer()
if err := WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
tipsetsCh, err := GetTips(ctx, api, uint64(height))
if err != nil {
log.Fatal(err)
}
wq := NewInfluxWriteQueue(ctx, influx)
defer wq.Close()
for tipset := range tipsetsCh {
pl := NewPointList()
height := tipset.Height()
if err := RecordTipsetPoints(ctx, api, pl, tipset); err != nil {
log.Printf("Failed to record tipset at height %d: %w", height, err)
continue
}
if err := RecordTipsetMessagesPoints(ctx, api, pl, tipset); err != nil {
log.Printf("Failed to record messages at height %d: %w", height, err)
continue
}
if err := RecordTipsetStatePoints(ctx, api, pl, tipset); err != nil {
log.Printf("Failed to record state at height %d: %w", height, err)
continue
}
// Instead of having to pass around a bunch of generic stuff we want for each point
// we will just add them at the end.
tsHeight := fmt.Sprintf("%d", tipset.Height())
tsTimestamp := time.Unix(int64(tipset.MinTimestamp()), int64(0))
nb, err := InfluxNewBatch()
if err != nil {
log.Fatal(err)
}
for _, pt := range pl.Points() {
pt.AddTag("height", tsHeight)
pt.SetTime(tsTimestamp)
nb.AddPoint(NewPointFrom(pt))
}
nb.SetDatabase(database)
log.Printf("Writing %d points for height %d", len(nb.Points()), tipset.Height())
wq.AddBatch(nb)
}
}

241
tools/stats/metrics.go Normal file
View File

@ -0,0 +1,241 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
_ "github.com/influxdata/influxdb1-client"
models "github.com/influxdata/influxdb1-client/models"
client "github.com/influxdata/influxdb1-client/v2"
)
type PointList struct {
points []models.Point
}
func NewPointList() *PointList {
return &PointList{}
}
func (pl *PointList) AddPoint(p models.Point) {
pl.points = append(pl.points, p)
}
func (pl *PointList) Points() []models.Point {
return pl.points
}
type InfluxWriteQueue struct {
influx client.Client
ch chan client.BatchPoints
}
func NewInfluxWriteQueue(ctx context.Context, influx client.Client) *InfluxWriteQueue {
ch := make(chan client.BatchPoints, 128)
maxRetries := 10
go func() {
main:
for {
select {
case <-ctx.Done():
return
case batch := <-ch:
for i := 0; i < maxRetries; i++ {
if err := influx.Write(batch); err != nil {
log.Printf("Failed to write batch: %w", err)
time.Sleep(time.Second * 15)
continue
}
continue main
}
log.Printf("Dropping batch due to failure to write")
}
}
}()
return &InfluxWriteQueue{
ch: ch,
}
}
func (i *InfluxWriteQueue) AddBatch(bp client.BatchPoints) {
i.ch <- bp
}
func (i *InfluxWriteQueue) Close() {
close(i.ch)
}
func InfluxClient(addr, user, pass string) (client.Client, error) {
return client.NewHTTPClient(client.HTTPConfig{
Addr: addr,
Username: user,
Password: pass,
})
}
func InfluxNewBatch() (client.BatchPoints, error) {
return client.NewBatchPoints(client.BatchPointsConfig{})
}
func NewPoint(name string, value interface{}) models.Point {
pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, time.Now())
return pt
}
func NewPointFrom(p models.Point) *client.Point {
return client.NewPointFrom(p)
}
func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
cids := []string{}
for _, cid := range tipset.Cids() {
cids = append(cids, cid.String())
}
p := NewPoint("chain.height", int64(tipset.Height()))
p.AddTag("tipset", strings.Join(cids, " "))
pl.AddPoint(p)
p = NewPoint("chain.block_count", len(cids))
pl.AddPoint(p)
tsTime := time.Unix(int64(tipset.MinTimestamp()), int64(0))
p = NewPoint("chain.blocktime", tsTime.Unix())
pl.AddPoint(p)
for _, blockheader := range tipset.Blocks() {
bs, err := blockheader.Serialize()
if err != nil {
return err
}
p := NewPoint("chain.blockheader_size", len(bs))
pl.AddPoint(p)
}
return nil
}
func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
pc, err := api.StatePledgeCollateral(ctx, tipset)
if err != nil {
return err
}
pcfil := types.BigDiv(pc, types.NewInt(uint64(build.FilecoinPrecision)))
p := NewPoint("chain.pledge_collateral", pcfil.Int64())
pl.AddPoint(p)
power, err := api.StateMinerPower(ctx, address.Address{}, tipset)
if err != nil {
return err
}
p = NewPoint("chain.power", power.TotalPower.Int64())
pl.AddPoint(p)
miners, err := api.StateListMiners(ctx, tipset)
for _, miner := range miners {
power, err := api.StateMinerPower(ctx, miner, tipset)
if err != nil {
return err
}
p = NewPoint("chain.miner_power", power.MinerPower.Int64())
p.AddTag("miner", miner.String())
pl.AddPoint(p)
}
return nil
}
func RecordTipsetMessagesPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
cids := tipset.Cids()
if len(cids) == 0 {
return fmt.Errorf("no cids in tipset")
}
msgs, err := api.ChainGetParentMessages(ctx, cids[0])
if err != nil {
return err
}
recp, err := api.ChainGetParentReceipts(ctx, cids[0])
if err != nil {
return err
}
for i, msg := range msgs {
p := NewPoint("chain.message_gasprice", msg.Message.GasPrice.Int64())
pl.AddPoint(p)
bs, err := msg.Message.Serialize()
if err != nil {
return err
}
p = NewPoint("chain.message_size", len(bs))
pl.AddPoint(p)
actor, err := api.StateGetActor(ctx, msg.Message.To, tipset)
if err != nil {
return err
}
p = NewPoint("chain.message_count", 1)
p.AddTag("actor", actor.Code.String())
p.AddTag("method", fmt.Sprintf("%d", msg.Message.Method))
p.AddTag("exitcode", fmt.Sprintf("%d", recp[i].ExitCode))
pl.AddPoint(p)
}
return nil
}
func ResetDatabase(influx client.Client, database string) error {
log.Print("Resetting database")
q := client.NewQuery(fmt.Sprintf(`DROP DATABASE "%s"; CREATE DATABASE "%s";`, database, database), "", "")
_, err := influx.Query(q)
return err
}
func GetLastRecordedHeight(influx client.Client, database string) (int64, error) {
log.Print("Retrieving last record height")
q := client.NewQuery(`SELECT "value" FROM "chain.height" ORDER BY time DESC LIMIT 1`, database, "")
res, err := influx.Query(q)
if err != nil {
return 0, err
}
if len(res.Results) == 0 {
return 0, fmt.Errorf("No results found for last recorded height")
}
if len(res.Results[0].Series) == 0 {
return 0, fmt.Errorf("No results found for last recorded height")
}
height, err := (res.Results[0].Series[0].Values[0][1].(json.Number)).Int64()
if err != nil {
return 0, err
}
log.Printf("Last record height %d", height)
return height, nil
}

155
tools/stats/rpc.go Normal file
View File

@ -0,0 +1,155 @@
package main
import (
"context"
"log"
"net/http"
"time"
"github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/api/client"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/jsonrpc"
"github.com/filecoin-project/go-lotus/node/repo"
)
func getAPI(path string) (string, http.Header, error) {
r, err := repo.NewFS(path)
if err != nil {
return "", nil, err
}
ma, err := r.APIEndpoint()
if err != nil {
return "", nil, xerrors.Errorf("failed to get api endpoint: %w", err)
}
_, addr, err := manet.DialArgs(ma)
if err != nil {
return "", nil, err
}
var headers http.Header
token, err := r.APIToken()
if err != nil {
log.Printf("Couldn't load CLI token, capabilities may be limited: %w", err)
} else {
headers = http.Header{}
headers.Add("Authorization", "Bearer "+string(token))
}
return "ws://" + addr + "/rpc/v0", headers, nil
}
func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(30 * time.Second):
state, err := napi.SyncState(ctx)
if err != nil {
return err
}
log.Printf("Stage %s, Height %d", chain.SyncStageString(state.Stage), state.Height)
if state.Stage == api.StageSyncComplete {
return nil
}
}
}
}
func GetTips(ctx context.Context, api api.FullNode, lastHeight uint64) (<-chan *types.TipSet, error) {
chmain := make(chan *types.TipSet)
notif, err := api.ChainNotify(ctx)
if err != nil {
return nil, err
}
go func() {
defer close(chmain)
for {
select {
case changes := <-notif:
for _, change := range changes {
log.Printf("Head event { height:%d; type: %s }", change.Val.Height(), change.Type)
switch change.Type {
case store.HCCurrent:
tipsets, err := loadTipsets(ctx, api, change.Val, lastHeight)
if err != nil {
log.Print(err)
return
}
for _, tipset := range tipsets {
chmain <- tipset
}
case store.HCApply:
chmain <- change.Val
}
}
case <-ctx.Done():
return
}
}
}()
return chmain, nil
}
func loadTipsets(ctx context.Context, api api.FullNode, curr *types.TipSet, lowestHeight uint64) ([]*types.TipSet, error) {
tipsets := []*types.TipSet{}
for {
if curr.Height() == 0 {
break
}
if curr.Height() <= lowestHeight {
break
}
log.Printf("Walking back { height:%d }", curr.Height())
tipsets = append(tipsets, curr)
ph := ParentTipsetHeight(curr)
if ph == 0 {
break
}
prev, err := api.ChainGetTipSetByHeight(ctx, ph, curr)
if err != nil {
return tipsets, err
}
curr = prev
}
for i, j := 0, len(tipsets)-1; i < j; i, j = i+1, j-1 {
tipsets[i], tipsets[j] = tipsets[j], tipsets[i]
}
return tipsets, nil
}
func ParentTipsetHeight(tipset *types.TipSet) uint64 {
mtb := tipset.MinTicketBlock()
return tipset.Height() - uint64(len(mtb.Tickets)) - 1
}
func GetFullNodeAPI(repo string) (api.FullNode, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(repo)
if err != nil {
return nil, nil, err
}
return client.NewFullNodeRPC(addr, headers)
}

29
tools/stats/setup.bash Executable file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
GRAFANA_HOST="localhost:3000"
curl -s -XPOST http://admin:admin@$GRAFANA_HOST/api/datasources -H 'Content-Type: text/json' --data-binary @- > /dev/null << EOF
{
"name":"InfluxDB",
"type":"influxdb",
"database":"lotus",
"url": "http://influxdb:8086",
"basicAuth":false,
"access": "proxy"
}
EOF
curl -s -XPOST http://admin:admin@$GRAFANA_HOST/api/dashboards/import -H 'Content-Type: text/json' --data-binary @- << EOF | jq -r "\"http://$GRAFANA_HOST\" + .importedUrl"
{
"dashboard": $(cat ./chain.dashboard.json),
"overwrite": true,
"inputs": [
{
"name": "DS_INFLUXDB",
"pluginId": "influxdb",
"type": "datasource",
"value": "InfluxDB"
}
]
}
EOF