lotus/tools/stats/metrics.go
Steven Allen 5733c71c50 Lint everything
We were ignoring quite a few error cases, and had one case where we weren't
actually updating state where we wanted to. Unfortunately, if the linter doesn't
pass, nobody has any reason to actually check lint failures in CI.

There are three remaining XXXs marked in the code for lint.
2020-08-20 20:46:36 -07:00

396 lines
9.1 KiB
Go

package stats
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/big"
"strings"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
cbg "github.com/whyrusleeping/cbor-gen"
_ "github.com/influxdata/influxdb1-client"
models "github.com/influxdata/influxdb1-client/models"
client "github.com/influxdata/influxdb1-client/v2"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("stats")
type PointList struct {
points []models.Point
}
func NewPointList() *PointList {
return &PointList{}
}
func (pl *PointList) AddPoint(p models.Point) {
pl.points = append(pl.points, p)
}
func (pl *PointList) Points() []models.Point {
return pl.points
}
type InfluxWriteQueue struct {
ch chan client.BatchPoints
}
func NewInfluxWriteQueue(ctx context.Context, influx client.Client) *InfluxWriteQueue {
ch := make(chan client.BatchPoints, 128)
maxRetries := 10
go func() {
main:
for {
select {
case <-ctx.Done():
return
case batch := <-ch:
for i := 0; i < maxRetries; i++ {
if err := influx.Write(batch); err != nil {
log.Warnw("Failed to write batch", "error", err)
build.Clock.Sleep(15 * time.Second)
continue
}
continue main
}
log.Error("Dropping batch due to failure to write")
}
}
}()
return &InfluxWriteQueue{
ch: ch,
}
}
func (i *InfluxWriteQueue) AddBatch(bp client.BatchPoints) {
i.ch <- bp
}
func (i *InfluxWriteQueue) Close() {
close(i.ch)
}
func InfluxClient(addr, user, pass string) (client.Client, error) {
return client.NewHTTPClient(client.HTTPConfig{
Addr: addr,
Username: user,
Password: pass,
})
}
func InfluxNewBatch() (client.BatchPoints, error) {
return client.NewBatchPoints(client.BatchPointsConfig{})
}
func NewPoint(name string, value interface{}) models.Point {
pt, _ := models.NewPoint(name, models.Tags{},
map[string]interface{}{"value": value}, build.Clock.Now().UTC())
return pt
}
func NewPointFrom(p models.Point) *client.Point {
return client.NewPointFrom(p)
}
func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
cids := []string{}
for _, cid := range tipset.Cids() {
cids = append(cids, cid.String())
}
p := NewPoint("chain.height", int64(tipset.Height()))
p.AddTag("tipset", strings.Join(cids, " "))
pl.AddPoint(p)
p = NewPoint("chain.block_count", len(cids))
pl.AddPoint(p)
tsTime := time.Unix(int64(tipset.MinTimestamp()), int64(0))
p = NewPoint("chain.blocktime", tsTime.Unix())
pl.AddPoint(p)
baseFeeBig := tipset.Blocks()[0].ParentBaseFee.Copy()
baseFeeRat := new(big.Rat).SetFrac(baseFeeBig.Int, new(big.Int).SetUint64(build.FilecoinPrecision))
baseFeeFloat, _ := baseFeeRat.Float64()
p = NewPoint("chain.basefee", baseFeeFloat)
pl.AddPoint(p)
totalGasLimit := int64(0)
for _, blockheader := range tipset.Blocks() {
bs, err := blockheader.Serialize()
if err != nil {
return err
}
p := NewPoint("chain.election", blockheader.ElectionProof.WinCount)
p.AddTag("miner", blockheader.Miner.String())
pl.AddPoint(p)
p = NewPoint("chain.blockheader_size", len(bs))
pl.AddPoint(p)
msgs, err := api.ChainGetBlockMessages(ctx, blockheader.Cid())
if err != nil {
return xerrors.Errorf("ChainGetBlockMessages failed: %w", msgs)
}
for _, m := range msgs.BlsMessages {
totalGasLimit += m.GasLimit
}
for _, m := range msgs.SecpkMessages {
totalGasLimit += m.Message.GasLimit
}
}
p = NewPoint("chain.gas_limit_total", totalGasLimit)
pl.AddPoint(p)
return nil
}
type apiIpldStore struct {
ctx context.Context
api api.FullNode
}
func (ht *apiIpldStore) Context() context.Context {
return ht.ctx
}
func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
raw, err := ht.api.ChainReadObj(ctx, c)
if err != nil {
return err
}
cu, ok := out.(cbg.CBORUnmarshaler)
if ok {
if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
return err
}
return nil
}
return fmt.Errorf("Object does not implement CBORUnmarshaler")
}
func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore")
}
func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
attoFil := types.NewInt(build.FilecoinPrecision).Int
//TODO: StatePledgeCollateral API is not implemented and is commented out - re-enable this block once the API is implemented again.
//pc, err := api.StatePledgeCollateral(ctx, tipset.Key())
//if err != nil {
//return err
//}
//pcFil := new(big.Rat).SetFrac(pc.Int, attoFil)
//pcFilFloat, _ := pcFil.Float64()
//p := NewPoint("chain.pledge_collateral", pcFilFloat)
//pl.AddPoint(p)
netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr)
if err != nil {
return err
}
netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil)
netBalFilFloat, _ := netBalFil.Float64()
p := NewPoint("network.balance", netBalFilFloat)
pl.AddPoint(p)
totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key())
if err != nil {
return err
}
p = NewPoint("chain.power", totalPower.TotalPower.QualityAdjPower.Int64())
pl.AddPoint(p)
powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, tipset.Key())
if err != nil {
return err
}
powerRaw, err := api.ChainReadObj(ctx, powerActor.Head)
if err != nil {
return err
}
var powerActorState power.State
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil {
return fmt.Errorf("failed to unmarshal power actor state: %w", err)
}
s := &apiIpldStore{ctx, api}
mp, err := adt.AsMap(s, powerActorState.Claims)
if err != nil {
return err
}
var claim power.Claim
err = mp.ForEach(&claim, func(key string) error {
addr, err := address.NewFromBytes([]byte(key))
if err != nil {
return err
}
if claim.QualityAdjPower.Int64() == 0 {
return nil
}
p = NewPoint("chain.miner_power", claim.QualityAdjPower.Int64())
p.AddTag("miner", addr.String())
pl.AddPoint(p)
return nil
})
if err != nil {
return err
}
return nil
}
type msgTag struct {
actor string
method uint64
exitcode uint8
}
func RecordTipsetMessagesPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
cids := tipset.Cids()
if len(cids) == 0 {
return fmt.Errorf("no cids in tipset")
}
msgs, err := api.ChainGetParentMessages(ctx, cids[0])
if err != nil {
return err
}
recp, err := api.ChainGetParentReceipts(ctx, cids[0])
if err != nil {
return err
}
msgn := make(map[msgTag][]cid.Cid)
totalGasUsed := int64(0)
for _, r := range recp {
totalGasUsed += r.GasUsed
}
p := NewPoint("chain.gas_used_total", totalGasUsed)
pl.AddPoint(p)
for i, msg := range msgs {
// FIXME: use float so this doesn't overflow
// FIXME: this doesn't work as time points get overridden
p := NewPoint("chain.message_gaspremium", msg.Message.GasPremium.Int64())
pl.AddPoint(p)
p = NewPoint("chain.message_gasfeecap", msg.Message.GasFeeCap.Int64())
pl.AddPoint(p)
bs, err := msg.Message.Serialize()
if err != nil {
return err
}
p = NewPoint("chain.message_size", len(bs))
pl.AddPoint(p)
actor, err := api.StateGetActor(ctx, msg.Message.To, tipset.Key())
if err != nil {
return err
}
dm, err := multihash.Decode(actor.Code.Hash())
if err != nil {
continue
}
tag := msgTag{
actor: string(dm.Digest),
method: uint64(msg.Message.Method),
exitcode: uint8(recp[i].ExitCode),
}
found := false
for _, c := range msgn[tag] {
if c.Equals(msg.Cid) {
found = true
break
}
}
if !found {
msgn[tag] = append(msgn[tag], msg.Cid)
}
}
for t, m := range msgn {
p := NewPoint("chain.message_count", len(m))
p.AddTag("actor", t.actor)
p.AddTag("method", fmt.Sprintf("%d", t.method))
p.AddTag("exitcode", fmt.Sprintf("%d", t.exitcode))
pl.AddPoint(p)
}
return nil
}
func ResetDatabase(influx client.Client, database string) error {
log.Info("Resetting database")
q := client.NewQuery(fmt.Sprintf(`DROP DATABASE "%s"; CREATE DATABASE "%s";`, database, database), "", "")
_, err := influx.Query(q)
return err
}
func GetLastRecordedHeight(influx client.Client, database string) (int64, error) {
log.Info("Retrieving last record height")
q := client.NewQuery(`SELECT "value" FROM "chain.height" ORDER BY time DESC LIMIT 1`, database, "")
res, err := influx.Query(q)
if err != nil {
return 0, err
}
if len(res.Results) == 0 {
return 0, fmt.Errorf("No results found for last recorded height")
}
if len(res.Results[0].Series) == 0 {
return 0, fmt.Errorf("No results found for last recorded height")
}
height, err := (res.Results[0].Series[0].Values[0][1].(json.Number)).Int64()
if err != nil {
return 0, err
}
log.Infow("Last record height", "height", height)
return height, nil
}