lotus/tools/stats/points/collect.go
2021-11-01 09:05:14 +00:00

364 lines
10 KiB
Go

package points
import (
"context"
"fmt"
"math"
"math/big"
"strings"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/actors/builtin/reward"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/tools/stats/influx"
"github.com/filecoin-project/lotus/tools/stats/metrics"
lru "github.com/hashicorp/golang-lru"
client "github.com/influxdata/influxdb1-client/v2"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"go.opencensus.io/stats"
"golang.org/x/xerrors"
)
// LotusApi is the subset of the Lotus node API that ChainPointCollector needs
// in order to turn tipsets into metric points.
type LotusApi interface {
	// WalletBalance returns the balance held by addr.
	WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error)
	// StateMinerPower returns power information for miner as of tsk. The
	// collector passes the zero address and reads only the network totals.
	StateMinerPower(ctx context.Context, miner address.Address, tsk types.TipSetKey) (*api.MinerPower, error)
	// StateGetActor returns the on-chain actor at actor as of tsk.
	StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
	// ChainGetParentMessages returns the parent messages of blockCid.
	ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
	// ChainGetParentReceipts returns the receipts for the parent messages of
	// blockCid; the collector indexes them in step with
	// ChainGetParentMessages.
	ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
	// ChainGetBlockMessages returns the BLS and secp messages included in
	// blockCid.
	ChainGetBlockMessages(ctx context.Context, blockCid cid.Cid) (*api.BlockMessages, error)
}
// ChainPointCollector converts tipsets into InfluxDB points describing block
// headers, messages and state. Create one with NewChainPointCollector.
type ChainPointCollector struct {
	// ctx is the context supplied at construction. NOTE(review): the
	// collection methods shown alongside this type take their own ctx
	// parameter instead of reading this field — confirm before removing it.
	ctx context.Context
	// api answers chain and state queries.
	api LotusApi
	// store is used to load actor state (the power actor's claims).
	store adt.Store
	// actorDigestCache memoizes actorDigest results, keyed by actor address.
	actorDigestCache *lru.TwoQueueCache
}
// NewChainPointCollector builds a ChainPointCollector that queries chain data
// through api and loads actor state through store. It returns an error only
// if the actor digest cache cannot be created.
func NewChainPointCollector(ctx context.Context, store adt.Store, api LotusApi) (*ChainPointCollector, error) {
	// Number of actor addresses whose code digest is kept in memory (65536).
	const digestCacheSize = 2 << 15

	cache, err := lru.New2Q(digestCacheSize)
	if err != nil {
		return nil, err
	}

	return &ChainPointCollector{
		ctx:              ctx,
		api:              api,
		store:            store,
		actorDigestCache: cache,
	}, nil
}
// actorDigest returns the digest bytes, as a string, of the multihash in the
// code CID of the actor at addr, looked up as of tipset.
//
// Results are cached by address only, so a later call for the same address
// returns the cached digest whatever tipset it passes. NOTE(review): this
// assumes an address's actor code does not change over the collector's
// lifetime — confirm across network upgrades.
func (c *ChainPointCollector) actorDigest(ctx context.Context, addr address.Address, tipset *types.TipSet) (string, error) {
	if hit, ok := c.actorDigestCache.Get(addr); ok {
		return hit.(string), nil
	}

	act, err := c.api.StateGetActor(ctx, addr, tipset.Key())
	if err != nil {
		return "", err
	}

	decoded, err := multihash.Decode(act.Code.Hash())
	if err != nil {
		return "", err
	}

	result := string(decoded.Digest)
	c.actorDigestCache.Add(addr, result)
	return result, nil
}
// Collect gathers all metric points for tipset and returns them as an InfluxDB
// batch. Every point is stamped with the tipset's minimum block timestamp, not
// the wall-clock time of collection.
//
// Each sub-collector (block headers, messages, state root) runs on a best-effort
// basis. Its error is logged and collection continues with the next one, so a
// partial set of points may be returned. The only error returned is a failure
// to create the batch.
func (c *ChainPointCollector) Collect(ctx context.Context, tipset *types.TipSet) (client.BatchPoints, error) {
	began := time.Now()
	stopTimer := metrics.Timer(ctx, metrics.TipsetCollectionDuration)
	defer func() {
		log.Infow("record tipset", "elapsed", time.Since(began).Seconds())
		stopTimer()
	}()

	points := influx.NewPointList()
	height := tipset.Height()
	log.Debugw("collecting tipset points", "height", height)
	stats.Record(ctx, metrics.TipsetCollectionHeight.M(int64(height)))

	// Run the sub-collectors in a fixed order, logging (not propagating)
	// their failures.
	collectors := []struct {
		failMsg string
		collect func(context.Context, *influx.PointList, *types.TipSet) error
	}{
		{"failed to record tipset", c.collectBlockheaderPoints},
		{"failed to record messages", c.collectMessagePoints},
		{"failed to record state", c.collectStaterootPoints},
	}
	for _, step := range collectors {
		if err := step.collect(ctx, points, tipset); err != nil {
			log.Errorw(step.failMsg, "height", height, "error", err, "tipset", tipset.Key())
		}
	}

	batch, err := influx.NewBatch()
	if err != nil {
		return nil, err
	}

	stamp := time.Unix(int64(tipset.MinTimestamp()), 0)
	for _, pt := range points.Points() {
		pt.SetTime(stamp)
		batch.AddPoint(influx.NewPointFrom(pt))
	}

	count := len(batch.Points())
	log.Infow("collected tipset points", "count", count, "height", height)
	stats.Record(ctx, metrics.TipsetCollectionPoints.M(int64(count)))
	return batch, nil
}
// collectBlockheaderPoints records block-header metrics for tipset into pl:
//
//   - chain.height (tagged with the space-separated block CIDs),
//     chain.block_count and chain.blocktime;
//   - for each block: chain.election (election win count, tagged with the
//     miner) and chain.blockheader_size (serialized header length in bytes);
//   - chain.gas_limit_total (sum of GasLimit over every message of every
//     block) and chain.gas_limit_uniq_total (the same sum, counting each
//     message CID only once across the tipset);
//   - chain.basefee: the next base fee from store.ComputeNextBaseFee, divided
//     by build.FilecoinPrecision;
//   - chain.basefee_change_log: log base 1.125 of newBaseFee/parentBaseFee,
//     emitted only when the parent base fee is non-zero;
//   - chain.gas_fill_ratio, chain.gas_capacity_ratio and
//     chain.gas_waste_ratio, each relative to build.BlockGasTarget times the
//     number of blocks.
//
// It returns an error if the tipset has no blocks, if a header cannot be
// serialized, or if a block's messages cannot be fetched. Points added before
// the error stay in pl.
func (c *ChainPointCollector) collectBlockheaderPoints(ctx context.Context, pl *influx.PointList, tipset *types.TipSet) error {
	start := time.Now()
	done := metrics.Timer(ctx, metrics.TipsetCollectionBlockHeaderDuration)
	defer func() {
		log.Infow("collect blockheader points", "elapsed", time.Since(start).Seconds())
		done()
	}()

	blocks := tipset.Blocks()
	if len(blocks) == 0 {
		// blocks[0] and the per-block gas ratios below need at least one block.
		return xerrors.Errorf("no blocks in tipset")
	}

	tsCids := tipset.Cids()
	cids := make([]string, 0, len(tsCids))
	for _, blkCid := range tsCids {
		cids = append(cids, blkCid.String())
	}

	p := influx.NewPoint("chain.height", int64(tipset.Height()))
	p.AddTag("tipset", strings.Join(cids, " "))
	pl.AddPoint(p)

	p = influx.NewPoint("chain.block_count", len(cids))
	pl.AddPoint(p)

	tsTime := time.Unix(int64(tipset.MinTimestamp()), int64(0))
	p = influx.NewPoint("chain.blocktime", tsTime.Unix())
	pl.AddPoint(p)

	totalGasLimit := int64(0)
	totalUniqGasLimit := int64(0)
	seen := make(map[cid.Cid]struct{})
	// addGas adds one message's gas limit to the running total. It also adds
	// the limit to the unique total, but only the first time that message CID
	// appears in the tipset, because the same message may be included by
	// several blocks.
	addGas := func(msgCid cid.Cid, gasLimit int64) {
		totalGasLimit += gasLimit
		if _, ok := seen[msgCid]; !ok {
			totalUniqGasLimit += gasLimit
			seen[msgCid] = struct{}{}
		}
	}

	for _, blockheader := range blocks {
		bs, err := blockheader.Serialize()
		if err != nil {
			return err
		}

		p := influx.NewPoint("chain.election", blockheader.ElectionProof.WinCount)
		p.AddTag("miner", blockheader.Miner.String())
		pl.AddPoint(p)

		p = influx.NewPoint("chain.blockheader_size", len(bs))
		pl.AddPoint(p)

		msgs, err := c.api.ChainGetBlockMessages(ctx, blockheader.Cid())
		if err != nil {
			// Wrap err itself: msgs is nil on failure and carries no cause.
			return xerrors.Errorf("ChainGetBlockMessages failed: %w", err)
		}
		for _, m := range msgs.BlsMessages {
			addGas(m.Cid(), m.GasLimit)
		}
		for _, m := range msgs.SecpkMessages {
			addGas(m.Cid(), m.Message.GasLimit)
		}
	}

	p = influx.NewPoint("chain.gas_limit_total", totalGasLimit)
	pl.AddPoint(p)
	p = influx.NewPoint("chain.gas_limit_uniq_total", totalUniqGasLimit)
	pl.AddPoint(p)

	// The base fee for the next tipset, derived from this tipset's
	// deduplicated gas limit.
	baseFeeIn := blocks[0].ParentBaseFee
	newBaseFee := store.ComputeNextBaseFee(baseFeeIn, totalUniqGasLimit, len(blocks), tipset.Height())
	baseFeeRat := new(big.Rat).SetFrac(newBaseFee.Int, new(big.Int).SetUint64(build.FilecoinPrecision))
	baseFeeFloat, _ := baseFeeRat.Float64()
	p = influx.NewPoint("chain.basefee", baseFeeFloat)
	pl.AddPoint(p)

	// big.Rat.SetFrac panics on a zero denominator. If the parent base fee is
	// zero or unset, skip the change metric instead of crashing the collector.
	if baseFeeIn.Int != nil && baseFeeIn.Int.Sign() != 0 {
		baseFeeChange := new(big.Rat).SetFrac(newBaseFee.Int, baseFeeIn.Int)
		baseFeeChangeF, _ := baseFeeChange.Float64()
		// NOTE(review): 1.125 presumably matches the maximum per-epoch base fee
		// change factor, so this counts "max-size steps" — confirm against the
		// build parameters.
		p = influx.NewPoint("chain.basefee_change_log", math.Log(baseFeeChangeF)/math.Log(1.125))
		pl.AddPoint(p)
	}

	blks := int64(len(cids))
	gasTarget := float64(blks * build.BlockGasTarget)
	p = influx.NewPoint("chain.gas_fill_ratio", float64(totalGasLimit)/gasTarget)
	pl.AddPoint(p)
	p = influx.NewPoint("chain.gas_capacity_ratio", float64(totalUniqGasLimit)/gasTarget)
	pl.AddPoint(p)
	p = influx.NewPoint("chain.gas_waste_ratio", float64(totalGasLimit-totalUniqGasLimit)/gasTarget)
	pl.AddPoint(p)

	return nil
}
// collectStaterootPoints records state-derived metrics for tipset into pl:
//
//   - network.balance: the balance of reward.Address, divided by
//     build.FilecoinPrecision;
//   - chain.power: the network's total quality-adjusted power, in GiB;
//   - chain.miner_power: each claim's non-zero quality-adjusted power, in
//     GiB, tagged with the claimant's address.
//
// Any query or state-loading error is returned as-is. Points already added
// stay in pl.
func (c *ChainPointCollector) collectStaterootPoints(ctx context.Context, pl *influx.PointList, tipset *types.TipSet) error {
	began := time.Now()
	stopTimer := metrics.Timer(ctx, metrics.TipsetCollectionStaterootDuration)
	defer func() {
		log.Infow("collect stateroot points", "elapsed", time.Since(began).Seconds())
		stopTimer()
	}()

	rewardBal, err := c.api.WalletBalance(ctx, reward.Address)
	if err != nil {
		return err
	}
	attoPerFil := types.NewInt(build.FilecoinPrecision).Int
	balanceFil, _ := new(big.Rat).SetFrac(rewardBal.Int, attoPerFil).Float64()
	pl.AddPoint(influx.NewPoint("network.balance", balanceFil))

	network, err := c.api.StateMinerPower(ctx, address.Address{}, tipset.Key())
	if err != nil {
		return err
	}

	// Power is reported in gibibytes: 2^63 bytes is only 8 EiB, which is smaller
	// than Filecoin Mainnet. At GiB resolution an int64 can represent up to 8192
	// YiB, for both network and per-miner sizes.
	gibi := types.NewInt(1024 * 1024 * 1024)
	pl.AddPoint(influx.NewPoint("chain.power", types.BigDiv(network.TotalPower.QualityAdjPower, gibi).Int64()))

	powerActor, err := c.api.StateGetActor(ctx, power.Address, tipset.Key())
	if err != nil {
		return err
	}
	powerState, err := power.Load(c.store, powerActor)
	if err != nil {
		return err
	}

	zero := types.NewInt(0)
	return powerState.ForEachClaim(func(claimant address.Address, claim power.Claim) error {
		if types.BigCmp(claim.QualityAdjPower, zero) == 0 {
			return nil // claims with no quality-adjusted power are not reported
		}
		pt := influx.NewPoint("chain.miner_power", types.BigDiv(claim.QualityAdjPower, gibi).Int64())
		pt.AddTag("miner", claimant.String())
		pl.AddPoint(pt)
		return nil
	})
}
// msgTag is the grouping key for the chain.message_count metric. Messages that
// share a target actor code digest, method number and receipt exit code are
// counted together.
type msgTag struct {
	actor    string // code digest of the target actor, as returned by actorDigest
	method   uint64 // method number invoked on the target actor
	exitcode int64  // receipt exit code, kept at full width so large codes are not truncated
}

// collectMessagePoints records metrics about the parent messages of tipset:
//
//   - chain.gas_used_total: the sum of GasUsed over all parent receipts;
//   - for each message whose target actor can be resolved:
//     chain.message_gaspremium, chain.message_gasfeecap and
//     chain.message_size (serialized length in bytes);
//   - chain.message_count: the number of distinct message CIDs per msgTag,
//     tagged with actor, method and exitcode.
//
// It returns an error if the tipset has no CIDs, if messages or receipts
// cannot be fetched, if their counts differ, or if a message cannot be
// serialized.
func (c *ChainPointCollector) collectMessagePoints(ctx context.Context, pl *influx.PointList, tipset *types.TipSet) error {
	start := time.Now()
	done := metrics.Timer(ctx, metrics.TipsetCollectionMessageDuration)
	defer func() {
		log.Infow("collect message points", "elapsed", time.Since(start).Seconds())
		done()
	}()

	cids := tipset.Cids()
	if len(cids) == 0 {
		return fmt.Errorf("no cids in tipset")
	}

	// Blocks in a tipset share the same parents, so the first block is enough
	// to fetch the parent messages and receipts.
	msgs, err := c.api.ChainGetParentMessages(ctx, cids[0])
	if err != nil {
		return err
	}
	recp, err := c.api.ChainGetParentReceipts(ctx, cids[0])
	if err != nil {
		return err
	}
	// Receipts are indexed in step with messages below. A mismatch means the
	// data is inconsistent and would otherwise cause an out-of-range panic.
	if len(recp) != len(msgs) {
		return xerrors.Errorf("got %d parent receipts for %d parent messages", len(recp), len(msgs))
	}

	totalGasUsed := int64(0)
	for _, r := range recp {
		totalGasUsed += r.GasUsed
	}
	pl.AddPoint(influx.NewPoint("chain.gas_used_total", totalGasUsed))

	// msgKey identifies a message CID within one tag group. It lets each
	// distinct message be counted once per group with an O(1) lookup.
	type msgKey struct {
		tag msgTag
		msg cid.Cid
	}
	seen := make(map[msgKey]struct{}, len(msgs))
	counts := make(map[msgTag]int)

	for i, msg := range msgs {
		digest, err := c.actorDigest(ctx, msg.Message.To, tipset)
		if err != nil {
			// Best effort: skip messages whose target actor cannot be
			// resolved, but leave a trace instead of dropping them silently.
			log.Debugw("skipping message with unresolved target actor", "cid", msg.Cid, "to", msg.Message.To, "error", err)
			continue
		}

		// FIXME: use float so this doesn't overflow
		// FIXME: this doesn't work as time points get overridden
		p := influx.NewPoint("chain.message_gaspremium", msg.Message.GasPremium.Int64())
		pl.AddPoint(p)
		p = influx.NewPoint("chain.message_gasfeecap", msg.Message.GasFeeCap.Int64())
		pl.AddPoint(p)

		bs, err := msg.Message.Serialize()
		if err != nil {
			return err
		}
		p = influx.NewPoint("chain.message_size", len(bs))
		pl.AddPoint(p)

		tag := msgTag{
			actor:    digest,
			method:   uint64(msg.Message.Method),
			exitcode: int64(recp[i].ExitCode),
		}
		key := msgKey{tag: tag, msg: msg.Cid}
		if _, dup := seen[key]; !dup {
			seen[key] = struct{}{}
			counts[tag]++
		}
	}

	for t, n := range counts {
		p := influx.NewPoint("chain.message_count", n)
		p.AddTag("actor", t.actor)
		p.AddTag("method", fmt.Sprintf("%d", t.method))
		p.AddTag("exitcode", fmt.Sprintf("%d", t.exitcode))
		pl.AddPoint(p)
	}
	return nil
}