pubsub: only trace message for the blocks topic
This commit is contained in:
parent
741818ee67
commit
1649c32325
@ -301,7 +301,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
trw := newTracerWrapper(tr)
|
trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn))
|
||||||
options = append(options, pubsub.WithEventTracer(trw))
|
options = append(options, pubsub.WithEventTracer(trw))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -317,12 +317,27 @@ func HashMsgId(m *pubsub_pb.Message) string {
|
|||||||
return string(hash[:])
|
return string(hash[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTracerWrapper(tr pubsub.EventTracer) pubsub.EventTracer {
|
func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer {
|
||||||
return &tracerWrapper{tr: tr}
|
topicsMap := make(map[string]struct{})
|
||||||
|
for _, topic := range topics {
|
||||||
|
topicsMap[topic] = struct{}{}
|
||||||
|
}
|
||||||
|
return &tracerWrapper{tr: tr, topics: topicsMap}
|
||||||
}
|
}
|
||||||
|
|
||||||
type tracerWrapper struct {
|
type tracerWrapper struct {
|
||||||
tr pubsub.EventTracer
|
tr pubsub.EventTracer
|
||||||
|
topics map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (trw *tracerWrapper) traceMessage(topics []string) bool {
|
||||||
|
for _, topic := range topics {
|
||||||
|
_, ok := trw.topics[topic]
|
||||||
|
if ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
|
func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
|
||||||
@ -330,12 +345,18 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
|
|||||||
// JOIN/LEAVE/GRAFT/PRUNE/PUBLISH/DELIVER. This significantly reduces bandwidth usage and still
|
// JOIN/LEAVE/GRAFT/PRUNE/PUBLISH/DELIVER. This significantly reduces bandwidth usage and still
|
||||||
// collects enough data to recover the state of the mesh and compute message delivery latency
|
// collects enough data to recover the state of the mesh and compute message delivery latency
|
||||||
// distributions.
|
// distributions.
|
||||||
|
// Furthermore, we only trace message publication and deliveries for specified topics
|
||||||
|
// (here just the blocks topic).
|
||||||
// TODO: hook all events into local metrics for inspection through the dashboard
|
// TODO: hook all events into local metrics for inspection through the dashboard
|
||||||
switch evt.GetType() {
|
switch evt.GetType() {
|
||||||
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
|
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
|
||||||
trw.tr.Trace(evt)
|
if trw.traceMessage(evt.GetPublishMessage().Topics) {
|
||||||
|
trw.tr.Trace(evt)
|
||||||
|
}
|
||||||
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
|
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
|
||||||
trw.tr.Trace(evt)
|
if trw.traceMessage(evt.GetDeliverMessage().Topics) {
|
||||||
|
trw.tr.Trace(evt)
|
||||||
|
}
|
||||||
case pubsub_pb.TraceEvent_JOIN:
|
case pubsub_pb.TraceEvent_JOIN:
|
||||||
trw.tr.Trace(evt)
|
trw.tr.Trace(evt)
|
||||||
case pubsub_pb.TraceEvent_LEAVE:
|
case pubsub_pb.TraceEvent_LEAVE:
|
||||||
|
Loading…
Reference in New Issue
Block a user