This commit is contained in:
Anton Evangelatov 2020-07-03 15:38:19 +02:00
parent 7258ad00f4
commit 84bc071179
8 changed files with 1395 additions and 68 deletions

View File

@ -15,69 +15,60 @@
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": 16,
"iteration": 1593448862489,
"id": 1,
"iteration": 1593533384941,
"links": [],
"panels": [
{
"collapsed": false,
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 0
},
"id": 10,
"panels": [],
"title": "Params",
"type": "row"
},
{
"datasource": "influxdb",
"fieldConfig": {
"defaults": {
"custom": {},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 1
"y": 0
},
"id": 11,
"hiddenSeries": false,
"id": 21,
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"colorMode": "value",
"graphMode": "none",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"last"
],
"fields": "",
"values": false
}
"dataLinks": []
},
"pluginVersion": "7.0.3",
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "$tag_instance",
"groupBy": [
{
"params": [
@ -87,12 +78,18 @@
},
{
"params": [
"0"
"instance"
],
"type": "tag"
},
{
"params": [
"previous"
],
"type": "fill"
}
],
"measurement": "diagnostics.miner.block-delay.gauge",
"measurement": "message/received",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
@ -101,29 +98,607 @@
[
{
"params": [
"value"
"count"
],
"type": "field"
},
{
"params": [],
"type": "distinct"
"type": "last"
}
]
],
"tags": [
{
"key": "run",
"operator": "=~",
"value": "/^$runid$/"
}
]
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "block delay",
"type": "stat"
"title": "message/received",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"hiddenSeries": false,
"id": 22,
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "$tag_instance",
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
},
{
"params": [
"instance"
],
"type": "tag"
},
{
"params": [
"previous"
],
"type": "fill"
}
],
"measurement": "message/success",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"count"
],
"type": "field"
},
{
"params": [],
"type": "last"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "message/success",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"hiddenSeries": false,
"id": 18,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "$tag_instance",
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
},
{
"params": [
"instance"
],
"type": "tag"
},
{
"params": [
"none"
],
"type": "fill"
}
],
"measurement": "chain/node_height",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"last"
],
"type": "field"
},
{
"params": [],
"type": "last"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "chain/node_height",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"hiddenSeries": false,
"id": 19,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "$tag_instance",
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
},
{
"params": [
"instance"
],
"type": "tag"
},
{
"params": [
"none"
],
"type": "fill"
}
],
"measurement": "peer/count",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"last"
],
"type": "field"
},
{
"params": [],
"type": "last"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "peer/count",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"collapsed": false,
"datasource": null,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 16
},
"id": 10,
"panels": [],
"title": "Blocks",
"type": "row"
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "influxdb",
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 17
},
"hiddenSeries": false,
"id": 15,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"sort": "total",
"sortDesc": false,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "$tag_instance",
"groupBy": [
{
"params": [
"$myinterval"
],
"type": "time"
},
{
"params": [
"instance"
],
"type": "tag"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "block/received",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"count"
],
"type": "field"
},
{
"params": [],
"type": "last"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "block/received",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
@ -143,7 +718,142 @@
"h": 8,
"w": 12,
"x": 12,
"y": 1
"y": 17
},
"hiddenSeries": false,
"id": 16,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"sort": "total",
"sortDesc": false,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "$tag_instance",
"groupBy": [
{
"params": [
"$myinterval"
],
"type": "time"
},
{
"params": [
"instance"
],
"type": "tag"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "block/success",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"count"
],
"type": "field"
},
{
"params": [],
"type": "last"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "block/success",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "influxdb",
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 25
},
"hiddenSeries": false,
"id": 13,
@ -284,7 +994,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "mined blocks",
"title": "mined blocks from testplan",
"tooltip": {
"shared": true,
"sort": 0,
@ -322,13 +1032,13 @@
}
},
{
"collapsed": true,
"collapsed": false,
"datasource": null,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 9
"y": 33
},
"id": 8,
"panels": [],
@ -353,7 +1063,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 10
"y": 34
},
"hiddenSeries": false,
"id": 2,
@ -492,7 +1202,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 10
"y": 34
},
"hiddenSeries": false,
"id": 14,
@ -683,7 +1393,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 19
"y": 43
},
"id": 6,
"panels": [],
@ -708,7 +1418,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 20
"y": 44
},
"hiddenSeries": false,
"id": 3,
@ -847,7 +1557,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 20
"y": 44
},
"hiddenSeries": false,
"id": 4,
@ -1008,7 +1718,7 @@
{
"allValue": null,
"current": {
"selected": true,
"selected": false,
"text": "All",
"value": "$__all"
},

View File

@ -13,6 +13,7 @@ require (
github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121
github.com/gorilla/mux v1.7.4
github.com/influxdata/influxdb v1.8.0 // indirect
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs-files v0.0.8
@ -27,7 +28,9 @@ require (
github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6
github.com/multiformats/go-multiaddr v0.2.2
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/multiformats/go-multihash v0.0.13
github.com/testground/sdk-go v0.2.3-0.20200630140907-cda3c5ac055b
github.com/whyrusleeping/cbor-gen v0.0.0-20200504204219-64967432584d
go.opencensus.io v0.22.4
)

View File

@ -0,0 +1,47 @@
package stats
import (
"container/list"
"github.com/filecoin-project/lotus/api"
)
type headBuffer struct {
buffer *list.List
size int
}
func NewHeadBuffer(size int) *headBuffer {
buffer := list.New()
buffer.Init()
return &headBuffer{
buffer: buffer,
size: size,
}
}
func (h *headBuffer) Push(hc *api.HeadChange) (rethc *api.HeadChange) {
if h.buffer.Len() == h.size {
var ok bool
el := h.buffer.Front()
rethc, ok = el.Value.(*api.HeadChange)
if !ok {
panic("Value from list is not the correct type")
}
h.buffer.Remove(el)
}
h.buffer.PushBack(hc)
return
}
func (h *headBuffer) Pop() {
el := h.buffer.Back()
if el != nil {
h.buffer.Remove(el)
}
}

360
lotus-soup/stats/metrics.go Normal file
View File

@ -0,0 +1,360 @@
package stats
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/big"
"strings"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
cbg "github.com/whyrusleeping/cbor-gen"
_ "github.com/influxdata/influxdb1-client"
models "github.com/influxdata/influxdb1-client/models"
client "github.com/influxdata/influxdb1-client/v2"
)
type PointList struct {
points []models.Point
}
func NewPointList() *PointList {
return &PointList{}
}
func (pl *PointList) AddPoint(p models.Point) {
pl.points = append(pl.points, p)
}
func (pl *PointList) Points() []models.Point {
return pl.points
}
type InfluxWriteQueue struct {
ch chan client.BatchPoints
}
func NewInfluxWriteQueue(ctx context.Context, influx client.Client) *InfluxWriteQueue {
ch := make(chan client.BatchPoints, 128)
maxRetries := 10
go func() {
main:
for {
select {
case <-ctx.Done():
return
case batch := <-ch:
for i := 0; i < maxRetries; i++ {
if err := influx.Write(batch); err != nil {
log.Warnw("Failed to write batch", "error", err)
time.Sleep(time.Second * 15)
continue
}
continue main
}
log.Error("Dropping batch due to failure to write")
}
}
}()
return &InfluxWriteQueue{
ch: ch,
}
}
func (i *InfluxWriteQueue) AddBatch(bp client.BatchPoints) {
i.ch <- bp
}
func (i *InfluxWriteQueue) Close() {
close(i.ch)
}
func InfluxClient(addr, user, pass string) (client.Client, error) {
return client.NewHTTPClient(client.HTTPConfig{
Addr: addr,
Username: user,
Password: pass,
})
}
func InfluxNewBatch() (client.BatchPoints, error) {
return client.NewBatchPoints(client.BatchPointsConfig{})
}
func NewPoint(name string, value interface{}) models.Point {
pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, time.Now())
return pt
}
func NewPointFrom(p models.Point) *client.Point {
return client.NewPointFrom(p)
}
func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
cids := []string{}
for _, cid := range tipset.Cids() {
cids = append(cids, cid.String())
}
p := NewPoint("chain.height", int64(tipset.Height()))
p.AddTag("tipset", strings.Join(cids, " "))
pl.AddPoint(p)
p = NewPoint("chain.block_count", len(cids))
pl.AddPoint(p)
tsTime := time.Unix(int64(tipset.MinTimestamp()), int64(0))
p = NewPoint("chain.blocktime", tsTime.Unix())
pl.AddPoint(p)
for _, blockheader := range tipset.Blocks() {
bs, err := blockheader.Serialize()
if err != nil {
return err
}
p := NewPoint("chain.election", 1)
p.AddTag("miner", blockheader.Miner.String())
pl.AddPoint(p)
p = NewPoint("chain.blockheader_size", len(bs))
pl.AddPoint(p)
}
return nil
}
type apiIpldStore struct {
ctx context.Context
api api.FullNode
}
func (ht *apiIpldStore) Context() context.Context {
return ht.ctx
}
func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
raw, err := ht.api.ChainReadObj(ctx, c)
if err != nil {
return err
}
cu, ok := out.(cbg.CBORUnmarshaler)
if ok {
if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
return err
}
return nil
}
return fmt.Errorf("Object does not implement CBORUnmarshaler")
}
func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore")
}
func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
//pc, err := api.StatePledgeCollateral(ctx, tipset.Key())
//if err != nil {
//return err
//}
attoFil := types.NewInt(build.FilecoinPrecision).Int
//pcFil := new(big.Rat).SetFrac(pc.Int, attoFil)
//pcFilFloat, _ := pcFil.Float64()
//p := NewPoint("chain.pledge_collateral", pcFilFloat)
//pl.AddPoint(p)
netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr)
if err != nil {
return err
}
netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil)
netBalFilFloat, _ := netBalFil.Float64()
p := NewPoint("network.balance", netBalFilFloat)
pl.AddPoint(p)
totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key())
if err != nil {
return err
}
p = NewPoint("chain.power", totalPower.TotalPower.QualityAdjPower.Int64())
pl.AddPoint(p)
powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, tipset.Key())
if err != nil {
return err
}
powerRaw, err := api.ChainReadObj(ctx, powerActor.Head)
if err != nil {
return err
}
var powerActorState power.State
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil {
return fmt.Errorf("failed to unmarshal power actor state: %w", err)
}
s := &apiIpldStore{ctx, api}
mp, err := adt.AsMap(s, powerActorState.Claims)
if err != nil {
return err
}
err = mp.ForEach(nil, func(key string) error {
addr, err := address.NewFromBytes([]byte(key))
if err != nil {
return err
}
var claim power.Claim
keyerAddr := adt.AddrKey(addr)
mp.Get(keyerAddr, &claim)
if claim.QualityAdjPower.Int64() == 0 {
return nil
}
p = NewPoint("chain.miner_power", claim.QualityAdjPower.Int64())
p.AddTag("miner", addr.String())
pl.AddPoint(p)
return nil
})
if err != nil {
return err
}
return nil
}
type msgTag struct {
actor string
method uint64
exitcode uint8
}
func RecordTipsetMessagesPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
cids := tipset.Cids()
if len(cids) == 0 {
return fmt.Errorf("no cids in tipset")
}
msgs, err := api.ChainGetParentMessages(ctx, cids[0])
if err != nil {
return err
}
recp, err := api.ChainGetParentReceipts(ctx, cids[0])
if err != nil {
return err
}
msgn := make(map[msgTag][]cid.Cid)
for i, msg := range msgs {
p := NewPoint("chain.message_gasprice", msg.Message.GasPrice.Int64())
pl.AddPoint(p)
bs, err := msg.Message.Serialize()
if err != nil {
return err
}
p = NewPoint("chain.message_size", len(bs))
pl.AddPoint(p)
actor, err := api.StateGetActor(ctx, msg.Message.To, tipset.Key())
if err != nil {
return err
}
dm, err := multihash.Decode(actor.Code.Hash())
if err != nil {
continue
}
tag := msgTag{
actor: string(dm.Digest),
method: uint64(msg.Message.Method),
exitcode: uint8(recp[i].ExitCode),
}
found := false
for _, c := range msgn[tag] {
if c.Equals(msg.Cid) {
found = true
break
}
}
if !found {
msgn[tag] = append(msgn[tag], msg.Cid)
}
}
for t, m := range msgn {
p := NewPoint("chain.message_count", len(m))
p.AddTag("actor", t.actor)
p.AddTag("method", fmt.Sprintf("%d", t.method))
p.AddTag("exitcode", fmt.Sprintf("%d", t.exitcode))
pl.AddPoint(p)
}
return nil
}
func ResetDatabase(influx client.Client, database string) error {
log.Info("Resetting database")
q := client.NewQuery(fmt.Sprintf(`DROP DATABASE "%s"; CREATE DATABASE "%s";`, database, database), "", "")
_, err := influx.Query(q)
return err
}
func GetLastRecordedHeight(influx client.Client, database string) (int64, error) {
log.Info("Retrieving last record height")
q := client.NewQuery(`SELECT "value" FROM "chain.height" ORDER BY time DESC LIMIT 1`, database, "")
res, err := influx.Query(q)
if err != nil {
return 0, err
}
if len(res.Results) == 0 {
return 0, fmt.Errorf("No results found for last recorded height")
}
if len(res.Results[0].Series) == 0 {
return 0, fmt.Errorf("No results found for last recorded height")
}
height, err := (res.Results[0].Series[0].Values[0][1].(json.Number)).Int64()
if err != nil {
return 0, err
}
log.Infow("Last record height", "height", height)
return height, nil
}

105
lotus-soup/stats/rpc.go Normal file
View File

@ -0,0 +1,105 @@
package stats
import (
"context"
"time"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
func GetTips(ctx context.Context, api api.FullNode, lastHeight abi.ChainEpoch, headlag int) (<-chan *types.TipSet, error) {
chmain := make(chan *types.TipSet)
hb := NewHeadBuffer(headlag)
notif, err := api.ChainNotify(ctx)
if err != nil {
return nil, err
}
go func() {
defer close(chmain)
ping := time.Tick(30 * time.Second)
for {
select {
case changes := <-notif:
for _, change := range changes {
log.Infow("Head event", "height", change.Val.Height(), "type", change.Type)
switch change.Type {
case store.HCCurrent:
tipsets, err := loadTipsets(ctx, api, change.Val, lastHeight)
if err != nil {
log.Info(err)
return
}
for _, tipset := range tipsets {
chmain <- tipset
}
case store.HCApply:
if out := hb.Push(change); out != nil {
chmain <- out.Val
}
case store.HCRevert:
hb.Pop()
}
}
case <-ping:
log.Info("Running health check")
cctx, cancel := context.WithTimeout(ctx, 5*time.Second)
if _, err := api.ID(cctx); err != nil {
log.Error("Health check failed")
cancel()
return
}
cancel()
log.Info("Node online")
case <-ctx.Done():
return
}
}
}()
return chmain, nil
}
func loadTipsets(ctx context.Context, api api.FullNode, curr *types.TipSet, lowestHeight abi.ChainEpoch) ([]*types.TipSet, error) {
tipsets := []*types.TipSet{}
for {
if curr.Height() == 0 {
break
}
if curr.Height() <= lowestHeight {
break
}
log.Infow("Walking back", "height", curr.Height())
tipsets = append(tipsets, curr)
tsk := curr.Parents()
prev, err := api.ChainGetTipSet(ctx, tsk)
if err != nil {
return tipsets, err
}
curr = prev
}
for i, j := 0, len(tipsets)-1; i < j; i, j = i+1, j-1 {
tipsets[i], tipsets[j] = tipsets[j], tipsets[i]
}
return tipsets, nil
}

92
lotus-soup/stats/stats.go Normal file
View File

@ -0,0 +1,92 @@
package stats
import (
"context"
"fmt"
"os"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("stats")
func Collect(api api.FullNode) {
time.Sleep(15 * time.Second)
fmt.Println("collect stats")
var database string = "testground"
var height int64 = 0
var headlag int = 3
influxAddr := os.Getenv("INFLUXDB_URL")
influxUser := ""
influxPass := ""
ctx := context.Background()
influx, err := InfluxClient(influxAddr, influxUser, influxPass)
if err != nil {
log.Fatal(err)
}
h, err := GetLastRecordedHeight(influx, database)
if err != nil {
log.Info(err)
}
height = h
tipsetsCh, err := GetTips(ctx, api, abi.ChainEpoch(height), headlag)
if err != nil {
log.Fatal(err)
}
wq := NewInfluxWriteQueue(ctx, influx)
defer wq.Close()
for tipset := range tipsetsCh {
log.Infow("Collect stats", "height", tipset.Height())
pl := NewPointList()
height := tipset.Height()
if err := RecordTipsetPoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record tipset", "height", height, "error", err)
continue
}
if err := RecordTipsetMessagesPoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record messages", "height", height, "error", err)
continue
}
if err := RecordTipsetStatePoints(ctx, api, pl, tipset); err != nil {
log.Warnw("Failed to record state", "height", height, "error", err)
continue
}
// Instead of having to pass around a bunch of generic stuff we want for each point
// we will just add them at the end.
tsTimestamp := time.Unix(int64(tipset.MinTimestamp()), int64(0))
nb, err := InfluxNewBatch()
if err != nil {
log.Fatal(err)
}
for _, pt := range pl.Points() {
pt.SetTime(tsTimestamp)
nb.AddPoint(NewPointFrom(pt))
}
nb.SetDatabase(database)
log.Infow("Adding points", "count", len(nb.Points()), "height", tipset.Height())
wq.AddBatch(nb)
}
}

View File

@ -14,6 +14,8 @@ import (
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/specs-actors/actors/crypto"
tstats "github.com/filecoin-project/oni/lotus-soup/stats"
)
type LotusClient struct {
@ -90,6 +92,9 @@ func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq))
// collect stats based on Travis' scripts
go tstats.Collect(n.FullApi)
t.RecordMessage("publish our address to the clients addr topic")
addrinfo, err := n.FullApi.NetAddrsListen(ctx)
if err != nil {

View File

@ -33,6 +33,8 @@ import (
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/testground/sdk-go/sync"
tstats "github.com/filecoin-project/oni/lotus-soup/stats"
)
type LotusMiner struct {
@ -227,6 +229,9 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
registerAndExportMetrics(minerAddr.String())
// collect stats based on Travis' scripts
go tstats.Collect(n.FullApi)
// Bootstrap with full node
remoteAddrs, err := n.FullApi.NetAddrsListen(ctx)
if err != nil {