changes to the indexer message relay PR

This commit is contained in:
Aarsh Shah 2022-02-04 12:15:01 +04:00
parent 3ecf478ff0
commit 4691b2b809
3 changed files with 6 additions and 4 deletions

View File

@ -13,7 +13,9 @@ import (
func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) } func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) }
func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) } func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) }
func IngestTopic(netName dtypes.NetworkName) string { return "/indexer/ingest/" + string(netName) } func IndexerIngestTopic(netName dtypes.NetworkName) string {
return "/indexer/ingest/" + string(netName)
}
func DhtProtocolName(netName dtypes.NetworkName) protocol.ID { func DhtProtocolName(netName dtypes.NetworkName) protocol.ID {
return protocol.ID("/fil/kad/" + string(netName)) return protocol.ID("/fil/kad/" + string(netName))
} }

View File

@ -333,7 +333,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
allowTopics := []string{ allowTopics := []string{
build.BlocksTopic(in.Nn), build.BlocksTopic(in.Nn),
build.MessagesTopic(in.Nn), build.MessagesTopic(in.Nn),
build.IngestTopic(in.Nn), build.IndexerIngestTopic(in.Nn),
} }
allowTopics = append(allowTopics, drandTopics...) allowTopics = append(allowTopics, drandTopics...)
options = append(options, options = append(options,

View File

@ -199,12 +199,12 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
} }
func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error { func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error {
topicName := build.IngestTopic(nn) topicName := build.IndexerIngestTopic(nn)
v := sub.NewIndexerMessageValidator(h.ID()) v := sub.NewIndexerMessageValidator(h.ID())
if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil {
panic(err) return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err)
} }
topicHandle, err := ps.Join(topicName) topicHandle, err := ps.Join(topicName)