Merge pull request #3104 from filecoin-project/feat/pubsub-metrics

collect pubsub metrics through the tracer
This commit is contained in:
Łukasz Magiera 2020-08-17 10:39:46 +02:00 committed by GitHub
commit ae40555558
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 14 deletions

View File

@ -38,6 +38,10 @@ var (
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless) BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
BlockValidationDurationMilliseconds = stats.Float64("block/validation_ms", "Duration for Block Validation in ms", stats.UnitMilliseconds) BlockValidationDurationMilliseconds = stats.Float64("block/validation_ms", "Duration for Block Validation in ms", stats.UnitMilliseconds)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
PubsubPublishMessage = stats.Int64("pubsub/published", "Counter for total published messages", stats.UnitDimensionless)
PubsubDeliverMessage = stats.Int64("pubsub/delivered", "Counter for total delivered messages", stats.UnitDimensionless)
PubsubRejectMessage = stats.Int64("pubsub/rejected", "Counter for total rejected messages", stats.UnitDimensionless)
PubsubDuplicateMessage = stats.Int64("pubsub/duplicate", "Counter for total duplicate messages", stats.UnitDimensionless)
) )
var ( var (

View File

@ -11,10 +11,12 @@ import (
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
blake2b "github.com/minio/blake2b-simd" blake2b "github.com/minio/blake2b-simd"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
@ -303,12 +305,12 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn)) trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn))
options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithEventTracer(trw))
} else {
// still instantiate a tracer for collecting metrics
trw := newTracerWrapper(nil)
options = append(options, pubsub.WithEventTracer(trw))
} }
// TODO: we want to hook the peer score inspector so that we can gain visibility
// into peer scores for debugging purposes -- this might be triggered by metrics collection
// options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second))
return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...) return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...)
} }
@ -318,10 +320,14 @@ func HashMsgId(m *pubsub_pb.Message) string {
} }
func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer { func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer {
topicsMap := make(map[string]struct{}) var topicsMap map[string]struct{}
for _, topic := range topics { if len(topics) > 0 {
topicsMap[topic] = struct{}{} topicsMap = make(map[string]struct{})
for _, topic := range topics {
topicsMap[topic] = struct{}{}
}
} }
return &tracerWrapper{tr: tr, topics: topicsMap} return &tracerWrapper{tr: tr, topics: topicsMap}
} }
@ -347,23 +353,36 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
// distributions. // distributions.
// Furthermore, we only trace message publication and deliveries for specified topics // Furthermore, we only trace message publication and deliveries for specified topics
// (here just the blocks topic). // (here just the blocks topic).
// TODO: hook all events into local metrics for inspection through the dashboard
switch evt.GetType() { switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE: case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
if trw.traceMessage(evt.GetPublishMessage().Topics) { stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().Topics) {
trw.tr.Trace(evt) trw.tr.Trace(evt)
} }
case pubsub_pb.TraceEvent_DELIVER_MESSAGE: case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
if trw.traceMessage(evt.GetDeliverMessage().Topics) { stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().Topics) {
trw.tr.Trace(evt) trw.tr.Trace(evt)
} }
case pubsub_pb.TraceEvent_REJECT_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1))
case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1))
case pubsub_pb.TraceEvent_JOIN: case pubsub_pb.TraceEvent_JOIN:
trw.tr.Trace(evt) if trw.tr != nil {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_LEAVE: case pubsub_pb.TraceEvent_LEAVE:
trw.tr.Trace(evt) if trw.tr != nil {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_GRAFT: case pubsub_pb.TraceEvent_GRAFT:
trw.tr.Trace(evt) if trw.tr != nil {
trw.tr.Trace(evt)
}
case pubsub_pb.TraceEvent_PRUNE: case pubsub_pb.TraceEvent_PRUNE:
trw.tr.Trace(evt) if trw.tr != nil {
trw.tr.Trace(evt)
}
} }
} }