Merge pull request #2731 from filecoin-project/pubsub-block-tracing
pubsub: only trace messages for the blocks topic
This commit is contained in:
commit
bd6a80505b
2
go.mod
2
go.mod
@ -87,7 +87,7 @@ require (
|
||||
github.com/libp2p/go-libp2p-mplex v0.2.4
|
||||
github.com/libp2p/go-libp2p-peer v0.2.0
|
||||
github.com/libp2p/go-libp2p-peerstore v0.2.6
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.3
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.4-0.20200731161531-2b5243c72f0d
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.7.1
|
||||
github.com/libp2p/go-libp2p-record v0.1.3
|
||||
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
|
||||
|
4
go.sum
4
go.sum
@ -881,8 +881,8 @@ github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1
|
||||
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.2-0.20200527132641-c0712c6e92cf/go.mod h1:TxPOBuo1FPdsTjFnv+FGZbNbWYsp74Culx+4ViQpato=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.3 h1:/AzOAmjDc+IJWybEzhYj1UaV1HErqmo4v3pQVepbgi8=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.3/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.4-0.20200731161531-2b5243c72f0d h1:1kfMc74C1DZGh97VJpA5efPXWU3tmdRF/wKYbFYya/4=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.4-0.20200731161531-2b5243c72f0d/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI=
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU=
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.5.0 h1:BUN1lgYNUrtv4WLLQ5rQmC9MCJ6uEXusezGvYRNoJXE=
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.5.0/go.mod h1:IEcuC5MLxvZ5KuHKjRu+dr3LjCT1Be3rcD/4d8JrX8M=
|
||||
|
@ -301,7 +301,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
trw := newTracerWrapper(tr)
|
||||
trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn))
|
||||
options = append(options, pubsub.WithEventTracer(trw))
|
||||
}
|
||||
|
||||
@ -317,12 +317,27 @@ func HashMsgId(m *pubsub_pb.Message) string {
|
||||
return string(hash[:])
|
||||
}
|
||||
|
||||
func newTracerWrapper(tr pubsub.EventTracer) pubsub.EventTracer {
|
||||
return &tracerWrapper{tr: tr}
|
||||
func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer {
|
||||
topicsMap := make(map[string]struct{})
|
||||
for _, topic := range topics {
|
||||
topicsMap[topic] = struct{}{}
|
||||
}
|
||||
return &tracerWrapper{tr: tr, topics: topicsMap}
|
||||
}
|
||||
|
||||
type tracerWrapper struct {
|
||||
tr pubsub.EventTracer
|
||||
tr pubsub.EventTracer
|
||||
topics map[string]struct{}
|
||||
}
|
||||
|
||||
func (trw *tracerWrapper) traceMessage(topics []string) bool {
|
||||
for _, topic := range topics {
|
||||
_, ok := trw.topics[topic]
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
|
||||
@ -330,12 +345,18 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
|
||||
// JOIN/LEAVE/GRAFT/PRUNE/PUBLISH/DELIVER. This significantly reduces bandwidth usage and still
|
||||
// collects enough data to recover the state of the mesh and compute message delivery latency
|
||||
// distributions.
|
||||
// Furthermore, we only trace message publication and deliveries for specified topics
|
||||
// (here just the blocks topic).
|
||||
// TODO: hook all events into local metrics for inspection through the dashboard
|
||||
switch evt.GetType() {
|
||||
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
|
||||
trw.tr.Trace(evt)
|
||||
if trw.traceMessage(evt.GetPublishMessage().Topics) {
|
||||
trw.tr.Trace(evt)
|
||||
}
|
||||
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
|
||||
trw.tr.Trace(evt)
|
||||
if trw.traceMessage(evt.GetDeliverMessage().Topics) {
|
||||
trw.tr.Trace(evt)
|
||||
}
|
||||
case pubsub_pb.TraceEvent_JOIN:
|
||||
trw.tr.Trace(evt)
|
||||
case pubsub_pb.TraceEvent_LEAVE:
|
||||
|
Loading…
Reference in New Issue
Block a user