lotus/chain/metrics/consensus.go

128 lines
2.4 KiB
Go
Raw Normal View History

2019-10-10 11:07:00 +00:00
package metrics
import (
"context"
"encoding/json"
"time"
2019-10-11 02:45:45 +00:00
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
2019-10-10 11:07:00 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
2019-10-10 11:07:00 +00:00
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
2019-10-10 11:07:00 +00:00
)
var log = logging.Logger("metrics")
const baseTopic = "/fil/headnotifs/"
type Update struct {
Type string
}
2019-10-11 02:45:45 +00:00
func SendHeadNotifs(nickname string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
ctx := helpers.LifecycleCtx(mctx, lc)
2019-10-11 02:45:45 +00:00
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
gen, err := chain.Chain.GetGenesis()
if err != nil {
2019-10-11 02:45:45 +00:00
return err
}
2019-10-11 02:45:45 +00:00
topic := baseTopic + gen.Cid().String()
go func() {
if err := sendHeadNotifs(ctx, ps, topic, chain, nickname); err != nil {
log.Error("consensus metrics error", err)
return
}
2019-10-11 02:45:45 +00:00
}()
go func() {
sub, err := ps.Subscribe(topic)
if err != nil {
return
}
defer sub.Cancel()
2019-10-11 02:45:45 +00:00
for {
if _, err := sub.Next(ctx); err != nil {
return
}
}
}()
return nil
},
})
return nil
}
}
type message struct {
// TipSet
Cids []cid.Cid
Blocks []*types.BlockHeader
Height uint64
Weight types.BigInt
Time uint64
2019-11-14 16:37:02 +00:00
Nonce uint64
2019-10-11 02:45:45 +00:00
// Meta
2019-10-10 11:07:00 +00:00
2019-10-11 02:45:45 +00:00
NodeName string
2019-10-10 11:07:00 +00:00
}
2019-10-11 02:45:45 +00:00
func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI, nickname string) error {
2019-10-10 11:07:00 +00:00
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := chain.ChainNotify(ctx)
if err != nil {
return err
}
2019-11-14 16:37:02 +00:00
// using unix nano time makes very sure we pick a nonce higher than previous restart
nonce := uint64(time.Now().UnixNano())
2019-10-10 11:07:00 +00:00
for {
select {
case notif := <-notifs:
n := notif[len(notif)-1]
w, err := chain.ChainTipSetWeight(ctx, n.Val.Key())
2019-10-15 05:00:30 +00:00
if err != nil {
return err
}
2019-10-11 02:45:45 +00:00
m := message{
Cids: n.Val.Cids(),
Blocks: n.Val.Blocks(),
Height: n.Val.Height(),
2019-10-15 05:00:30 +00:00
Weight: w,
2019-10-11 02:45:45 +00:00
NodeName: nickname,
2019-11-10 16:26:39 +00:00
Time: uint64(time.Now().UnixNano() / 1000_000),
2019-11-14 16:37:02 +00:00
Nonce: nonce,
2019-10-11 02:45:45 +00:00
}
b, err := json.Marshal(m)
2019-10-10 11:07:00 +00:00
if err != nil {
return err
}
if err := ps.Publish(topic, b); err != nil {
return err
}
case <-ctx.Done():
return nil
}
2019-11-14 16:37:02 +00:00
nonce++
2019-10-10 11:07:00 +00:00
}
}