129 lines
2.5 KiB
Go
129 lines
2.5 KiB
Go
package metrics
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
|
"github.com/ipfs/go-cid"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"go.uber.org/fx"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/node/impl/full"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
)
|
|
|
|
// log is the package-level logger, created under the name "metrics".
var log = logging.Logger("metrics")
|
|
|
|
// baseTopic is the prefix of the pubsub topic used for head notifications.
// SendHeadNotifs appends the string form of the genesis CID to it to obtain
// the full topic name.
const baseTopic = "/fil/headnotifs/"
|
|
|
|
// Update is a small exported record holding a single string discriminator.
//
// NOTE(review): nothing in the visible part of this file constructs or reads
// Update, so the set of valid Type values is not documented here — confirm
// against its consumers.
type Update struct {
	// Type names the kind of update.
	Type string
}
|
|
|
|
func SendHeadNotifs(nickname string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
|
|
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
gen, err := chain.Chain.GetGenesis()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
topic := baseTopic + gen.Cid().String()
|
|
|
|
go func() {
|
|
if err := sendHeadNotifs(ctx, ps, topic, chain, nickname); err != nil {
|
|
log.Error("consensus metrics error", err)
|
|
return
|
|
}
|
|
}()
|
|
go func() {
|
|
sub, err := ps.Subscribe(topic)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer sub.Cancel()
|
|
|
|
for {
|
|
if _, err := sub.Next(ctx); err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
}()
|
|
return nil
|
|
},
|
|
})
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
type message struct {
|
|
// TipSet
|
|
Cids []cid.Cid
|
|
Blocks []*types.BlockHeader
|
|
Height abi.ChainEpoch
|
|
Weight types.BigInt
|
|
Time uint64
|
|
Nonce uint64
|
|
|
|
// Meta
|
|
|
|
NodeName string
|
|
}
|
|
|
|
func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI, nickname string) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
notifs, err := chain.ChainNotify(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// using unix nano time makes very sure we pick a nonce higher than previous restart
|
|
nonce := uint64(time.Now().UnixNano())
|
|
|
|
for {
|
|
select {
|
|
case notif := <-notifs:
|
|
n := notif[len(notif)-1]
|
|
|
|
w, err := chain.ChainTipSetWeight(ctx, n.Val)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m := message{
|
|
Cids: n.Val.Cids(),
|
|
Blocks: n.Val.Blocks(),
|
|
Height: n.Val.Height(),
|
|
Weight: w,
|
|
NodeName: nickname,
|
|
Time: uint64(time.Now().UnixNano() / 1000_000),
|
|
Nonce: nonce,
|
|
}
|
|
|
|
b, err := json.Marshal(m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := ps.Publish(topic, b); err != nil {
|
|
return err
|
|
}
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
|
|
nonce++
|
|
}
|
|
}
|