From c084130d3eb84a2ecc5acf3ceb63aafec90941f4 Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 3 Feb 2022 14:56:21 -0800 Subject: [PATCH] Lotus chain nodes relay indexer pubsub messages Content providers announce the availability of indexer data using gossip pubsub. The content providers are not connected directly to indexers, so the pubsub messages are relayed to indexers via chain nodes. This PR makes chain nodes relay gossip pubsub messages, on the /indexer/ingest/ topic. --- build/params_shared_funcs.go | 1 + chain/sub/incoming.go | 21 +++++++++++++++++++++ metrics/metrics.go | 13 +++++++++++++ node/builder.go | 2 ++ node/builder_chain.go | 2 ++ node/modules/lp2p/pubsub.go | 1 + node/modules/services.go | 30 ++++++++++++++++++++++++++++++ 7 files changed, 70 insertions(+) diff --git a/build/params_shared_funcs.go b/build/params_shared_funcs.go index f59fee653..9d5dfa347 100644 --- a/build/params_shared_funcs.go +++ b/build/params_shared_funcs.go @@ -13,6 +13,7 @@ import ( func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) } func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) } +func IngestTopic(netName dtypes.NetworkName) string { return "/indexer/ingest/" + string(netName) } func DhtProtocolName(netName dtypes.NetworkName) protocol.ID { return protocol.ID("/fil/kad/" + string(netName)) } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 2e962a249..dd18f25d0 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -444,3 +444,24 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType ) stats.Record(ctx, metric.M(1)) } + +type IndexerMessageValidator struct { + self peer.ID +} + +func NewIndexerMessageValidator(self peer.ID) *IndexerMessageValidator { + return &IndexerMessageValidator{self: self} +} + +func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + // This chain-node should not be publishing its own messages. These are + // relayed from miner-nodes or index publishers. If a node appears to be + // local, reject it. + if pid == v.self { + log.Warnf("refusing to relay indexer message from self") + stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) + return pubsub.ValidationReject + } + stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1)) + return pubsub.ValidationAccept +} diff --git a/metrics/metrics.go b/metrics/metrics.go index ddd149d8d..745499069 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -76,6 +76,8 @@ var ( ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) + IndexerMessageValidationFailure = stats.Int64("indexer/failure", "Counter for indexer message validation failures", stats.UnitDimensionless) + IndexerMessageValidationSuccess = stats.Int64("indexer/success", "Counter for indexer message validation successes", stats.UnitDimensionless) MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless) MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) @@ -191,6 +193,15 @@ var ( return view.Distribution(bounds...) }(), } + IndexerMessageValidationFailureView = &view.View{ + Measure: IndexerMessageValidationFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{FailureType, Local}, + } + IndexerMessageValidationSuccessView = &view.View{ + Measure: IndexerMessageValidationSuccess, + Aggregation: view.Count(), + } MessagePublishedView = &view.View{ Measure: MessagePublished, Aggregation: view.Count(), @@ -490,6 +501,8 @@ var ChainNodeViews = append([]*view.View{ BlockValidationSuccessView, BlockValidationDurationView, BlockDelayView, + IndexerMessageValidationFailureView, + IndexerMessageValidationSuccessView, MessagePublishedView, MessageReceivedView, MessageValidationFailureView, diff --git a/node/builder.go b/node/builder.go index 3f2e59503..2f1ca1820 100644 --- a/node/builder.go +++ b/node/builder.go @@ -104,6 +104,8 @@ const ( HandleMigrateClientFundsKey HandlePaymentChannelManagerKey + RelayIndexerMessagesKey + // miner GetParamsKey HandleMigrateProviderFundsKey diff --git a/node/builder_chain.go b/node/builder_chain.go index 11283ec3a..1e568397e 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -134,6 +134,8 @@ var ChainNode = Options( Override(new(*full.GasPriceCache), full.NewGasPriceCache), + Override(RelayIndexerMessagesKey, modules.RelayIndexerMessages), + // Lite node API ApplyIf(isLiteNode, Override(new(messagepool.Provider), messagepool.NewProviderLite), diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 32b85daf3..8fa005fcd 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -333,6 +333,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { allowTopics := []string{ build.BlocksTopic(in.Nn), build.MessagesTopic(in.Nn), + build.IngestTopic(in.Nn), } allowTopics = append(allowTopics, drandTopics...) options = append(options, diff --git a/node/modules/services.go b/node/modules/services.go index 17d4a7476..db470aee2 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -198,6 +198,36 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } +func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error { + topicName := build.IngestTopic(nn) + + v := sub.NewIndexerMessageValidator(h.ID()) + + if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { + panic(err) + } + + topicHandle, err := ps.Join(topicName) + if err != nil { + return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err) + } + cancelFunc, err := topicHandle.Relay() + if err != nil { + return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err) + } + + // Cancel message relay on shutdown. + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + cancelFunc() + return nil + }, + }) + + log.Infof("relaying messages for pubsub topic %s", topicName) + return nil +} + func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) { local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local"))) if err != nil {