use subscription filter in pubsub

This commit is contained in:
vyzo 2020-10-08 21:40:36 +03:00
parent 8a175a7465
commit 4cd73f1560

View File

@ -187,13 +187,14 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
build.MessagesTopic(in.Nn): 1, build.MessagesTopic(in.Nn): 1,
} }
var drandTopic string
for _, d := range in.Dr { for _, d := range in.Dr {
topic, err := getDrandTopic(d.Config.ChainInfoJSON) drandTopic, err = getDrandTopic(d.Config.ChainInfoJSON)
if err != nil { if err != nil {
return nil, err return nil, err
} }
topicParams[topic] = drandTopicParams topicParams[drandTopic] = drandTopicParams
pgTopicWeights[topic] = 5 pgTopicWeights[drandTopic] = 5
} }
options := []pubsub.Option{ options := []pubsub.Option{
@ -308,6 +309,14 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
} }
options = append(options, pubsub.WithPeerGater(pgParams)) options = append(options, pubsub.WithPeerGater(pgParams))
options = append(options,
pubsub.WithSubscriptionFilter(
pubsub.WrapLimitSubscriptionFilter(
pubsub.NewAllowlistSubscriptionFilter(
build.BlocksTopic(in.Nn),
build.MessagesTopic(in.Nn),
drandTopic),
100)))
// tracer // tracer
if in.Cfg.RemoteTracer != "" { if in.Cfg.RemoteTracer != "" {
@ -359,14 +368,9 @@ type tracerWrapper struct {
topics map[string]struct{} topics map[string]struct{}
} }
func (trw *tracerWrapper) traceMessage(topics []string) bool { func (trw *tracerWrapper) traceMessage(topic string) bool {
for _, topic := range topics {
_, ok := trw.topics[topic] _, ok := trw.topics[topic]
if ok { return ok
return true
}
}
return false
} }
func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
@ -379,12 +383,12 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
switch evt.GetType() { switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE: case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1)) stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().Topics) { if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().GetTopic()) {
trw.tr.Trace(evt) trw.tr.Trace(evt)
} }
case pubsub_pb.TraceEvent_DELIVER_MESSAGE: case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1)) stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().Topics) { if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().GetTopic()) {
trw.tr.Trace(evt) trw.tr.Trace(evt)
} }
case pubsub_pb.TraceEvent_REJECT_MESSAGE: case pubsub_pb.TraceEvent_REJECT_MESSAGE: