Update pubsub tracking to trace from lotus and lp2p tracers

This commit is contained in:
Matija Petrunic 2021-08-30 14:38:15 +02:00
parent 522858c871
commit 99fbd7039e

View File

@ -358,11 +358,12 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
return nil, err return nil, err
} }
trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn)) lt := newLotusTracer(tr)
trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn))
options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithEventTracer(trw))
} else { } else {
// still instantiate a tracer for collecting metrics // still instantiate a tracer for collecting metrics
trw := newTracerWrapper(nil) trw := newTracerWrapper(nil, nil)
options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithEventTracer(trw))
} }
@ -374,7 +375,11 @@ func HashMsgId(m *pubsub_pb.Message) string {
return string(hash[:]) return string(hash[:])
} }
func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer { func newTracerWrapper(
lp2pTracer pubsub.EventTracer,
lotusTracer pubsub.EventTracer,
topics ...string,
) pubsub.EventTracer {
var topicsMap map[string]struct{} var topicsMap map[string]struct{}
if len(topics) > 0 { if len(topics) > 0 {
topicsMap = make(map[string]struct{}) topicsMap = make(map[string]struct{})
@ -383,12 +388,13 @@ func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTrace
} }
} }
return &tracerWrapper{tr: tr, topics: topicsMap} return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap}
} }
type tracerWrapper struct { type tracerWrapper struct {
tr pubsub.EventTracer lp2pTracer pubsub.EventTracer
topics map[string]struct{} lotusTracer pubsub.EventTracer
topics map[string]struct{}
} }
func (trw *tracerWrapper) traceMessage(topic string) bool { func (trw *tracerWrapper) traceMessage(topic string) bool {
@ -406,33 +412,61 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
switch evt.GetType() { switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE: case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1)) stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().GetTopic()) { if trw.traceMessage(evt.GetPublishMessage().GetTopic()) {
trw.tr.Trace(evt) if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
} }
case pubsub_pb.TraceEvent_DELIVER_MESSAGE: case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1)) stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().GetTopic()) { if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) {
trw.tr.Trace(evt) if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
} }
case pubsub_pb.TraceEvent_REJECT_MESSAGE: case pubsub_pb.TraceEvent_REJECT_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1)) stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1))
case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1)) stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1))
case pubsub_pb.TraceEvent_JOIN: case pubsub_pb.TraceEvent_JOIN:
if trw.tr != nil { if trw.lp2pTracer != nil {
trw.tr.Trace(evt) trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
} }
case pubsub_pb.TraceEvent_LEAVE: case pubsub_pb.TraceEvent_LEAVE:
if trw.tr != nil { if trw.lp2pTracer != nil {
trw.tr.Trace(evt) trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
} }
case pubsub_pb.TraceEvent_GRAFT: case pubsub_pb.TraceEvent_GRAFT:
if trw.tr != nil { if trw.lp2pTracer != nil {
trw.tr.Trace(evt) trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
} }
case pubsub_pb.TraceEvent_PRUNE: case pubsub_pb.TraceEvent_PRUNE:
if trw.tr != nil { if trw.lp2pTracer != nil {
trw.tr.Trace(evt) trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
} }
case pubsub_pb.TraceEvent_RECV_RPC: case pubsub_pb.TraceEvent_RECV_RPC:
stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1)) stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1))