collect pubsub metrics through the tracer

This commit is contained in:
vyzo 2020-08-17 10:46:20 +03:00
parent 1898b82dc4
commit 8cf9595d35
2 changed files with 37 additions and 14 deletions

View File

@ -38,6 +38,10 @@ var (
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
BlockValidationDurationMilliseconds = stats.Float64("block/validation_ms", "Duration for Block Validation in ms", stats.UnitMilliseconds)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
PubsubPublishMessage = stats.Int64("pubsub/published", "Counter for total published messages", stats.UnitDimensionless)
PubsubDeliverMessage = stats.Int64("pubsub/delivered", "Counter for total delivered messages", stats.UnitDimensionless)
PubsubRejectMessage = stats.Int64("pubsub/rejected", "Counter for total rejected messages", stats.UnitDimensionless)
PubsubDuplicateMessage = stats.Int64("pubsub/duplicate", "Counter for total duplicate messages", stats.UnitDimensionless)
)
var (

View File

@ -11,10 +11,12 @@ import (
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
blake2b "github.com/minio/blake2b-simd"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
@ -303,12 +305,12 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn))
options = append(options, pubsub.WithEventTracer(trw))
} else {
// still instantiate a tracer for collecting metrics
trw := newTracerWrapper(nil)
options = append(options, pubsub.WithEventTracer(trw))
}
// TODO: we want to hook the peer score inspector so that we can gain visibility
// in peer scores for debugging purposes -- this might be trigged by metrics collection
// options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second))
return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...)
}
@ -318,10 +320,14 @@ func HashMsgId(m *pubsub_pb.Message) string {
}
func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer {
topicsMap := make(map[string]struct{})
for _, topic := range topics {
topicsMap[topic] = struct{}{}
var topicsMap map[string]struct{}
if len(topics) > 0 {
topicsMap = make(map[string]struct{})
for _, topic := range topics {
topicsMap[topic] = struct{}{}
}
}
return &tracerWrapper{tr: tr, topics: topicsMap}
}
@ -347,23 +353,36 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
// distributions.
// Furthermore, we only trace message publication and deliveries for specified topics
// (here just the blocks topic).
// TODO: hook all events into local metrics for inspection through the dashboard
switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
if trw.traceMessage(evt.GetPublishMessage().Topics) {
stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().Topics) {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
if trw.traceMessage(evt.GetDeliverMessage().Topics) {
stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().Topics) {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_REJECT_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1))
case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1))
case pubsub_pb.TraceEvent_JOIN:
trw.tr.Trace(evt)
if trw.tr != nil {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_LEAVE:
trw.tr.Trace(evt)
if trw.tr != nil {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_GRAFT:
trw.tr.Trace(evt)
if trw.tr != nil {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_PRUNE:
trw.tr.Trace(evt)
if trw.tr != nil {
trw.tr.Trace(evt)
}
}
}