diff --git a/build/params_shared_funcs.go b/build/params_shared_funcs.go index f59fee653..9d5dfa347 100644 --- a/build/params_shared_funcs.go +++ b/build/params_shared_funcs.go @@ -13,6 +13,7 @@ import ( func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) } func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) } +func IngestTopic(netName dtypes.NetworkName) string { return "/indexer/ingest/" + string(netName) } func DhtProtocolName(netName dtypes.NetworkName) protocol.ID { return protocol.ID("/fil/kad/" + string(netName)) } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 2e962a249..dd18f25d0 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -444,3 +444,24 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType ) stats.Record(ctx, metric.M(1)) } + +type IndexerMessageValidator struct { + self peer.ID +} + +func NewIndexerMessageValidator(self peer.ID) *IndexerMessageValidator { + return &IndexerMessageValidator{self: self} +} + +func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + // This chain-node should not be publishing its own messages. These are + // relayed from miner-nodes or index publishers. If a node appears to be + // local, reject it. + if pid == v.self { + log.Warnf("refusing to relay indexer message from self") + stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) + return pubsub.ValidationReject + } + stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1)) + return pubsub.ValidationAccept +} diff --git a/metrics/metrics.go b/metrics/metrics.go index ddd149d8d..745499069 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -76,6 +76,8 @@ var ( ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) + IndexerMessageValidationFailure = stats.Int64("indexer/failure", "Counter for indexer message validation failures", stats.UnitDimensionless) + IndexerMessageValidationSuccess = stats.Int64("indexer/success", "Counter for indexer message validation successes", stats.UnitDimensionless) MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless) MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) @@ -191,6 +193,15 @@ var ( return view.Distribution(bounds...) }(), } + IndexerMessageValidationFailureView = &view.View{ + Measure: IndexerMessageValidationFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{FailureType, Local}, + } + IndexerMessageValidationSuccessView = &view.View{ + Measure: IndexerMessageValidationSuccess, + Aggregation: view.Count(), + } MessagePublishedView = &view.View{ Measure: MessagePublished, Aggregation: view.Count(), @@ -490,6 +501,8 @@ var ChainNodeViews = append([]*view.View{ BlockValidationSuccessView, BlockValidationDurationView, BlockDelayView, + IndexerMessageValidationFailureView, + IndexerMessageValidationSuccessView, MessagePublishedView, MessageReceivedView, MessageValidationFailureView, diff --git a/node/builder.go b/node/builder.go index 3f2e59503..2f1ca1820 100644 --- a/node/builder.go +++ b/node/builder.go @@ -104,6 +104,8 @@ const ( HandleMigrateClientFundsKey HandlePaymentChannelManagerKey + RelayIndexerMessagesKey + // miner GetParamsKey HandleMigrateProviderFundsKey diff --git a/node/builder_chain.go b/node/builder_chain.go index 11283ec3a..1e568397e 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -134,6 +134,8 @@ var ChainNode = Options( Override(new(*full.GasPriceCache), full.NewGasPriceCache), + Override(RelayIndexerMessagesKey, modules.RelayIndexerMessages), + // Lite node API ApplyIf(isLiteNode, Override(new(messagepool.Provider), messagepool.NewProviderLite), diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 32b85daf3..8fa005fcd 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -333,6 +333,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { allowTopics := []string{ build.BlocksTopic(in.Nn), build.MessagesTopic(in.Nn), + build.IngestTopic(in.Nn), } allowTopics = append(allowTopics, drandTopics...) options = append(options, diff --git a/node/modules/services.go b/node/modules/services.go index 17d4a7476..db470aee2 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -198,6 +198,36 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } +func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error { + topicName := build.IngestTopic(nn) + + v := sub.NewIndexerMessageValidator(h.ID()) + + if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { + panic(err) + } + + topicHandle, err := ps.Join(topicName) + if err != nil { + return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err) + } + cancelFunc, err := topicHandle.Relay() + if err != nil { + return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err) + } + + // Cancel message relay on shutdown. + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + cancelFunc() + return nil + }, + }) + + log.Infof("relaying messages for pubsub topic %s", topicName) + return nil +} + func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) { local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local"))) if err != nil {