Lotus chain nodes relay indexer pubsub messages

Content providers announce the availability of indexer data using gossip pubsub.  The content providers are not connected directly to indexers, so the pubsub messages are relayed to indexers via chain nodes. This PR makes chain nodes relay gossip pubsub messages, on the /indexer/ingest/<netname> topic.
This commit is contained in:
gammazero 2022-02-03 14:56:21 -08:00
parent 43cde484f4
commit c084130d3e
7 changed files with 70 additions and 0 deletions

View File

@ -13,6 +13,7 @@ import (
func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) } func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) }
func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) } func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) }
func IngestTopic(netName dtypes.NetworkName) string { return "/indexer/ingest/" + string(netName) }
func DhtProtocolName(netName dtypes.NetworkName) protocol.ID { func DhtProtocolName(netName dtypes.NetworkName) protocol.ID {
return protocol.ID("/fil/kad/" + string(netName)) return protocol.ID("/fil/kad/" + string(netName))
} }

View File

@ -444,3 +444,24 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType
) )
stats.Record(ctx, metric.M(1)) stats.Record(ctx, metric.M(1))
} }
type IndexerMessageValidator struct {
self peer.ID
}
func NewIndexerMessageValidator(self peer.ID) *IndexerMessageValidator {
return &IndexerMessageValidator{self: self}
}
func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
// This chain-node should not be publishing its own messages. These are
// relayed from miner-nodes or index publishers. If a node appears to be
// local, reject it.
if pid == v.self {
log.Warnf("refusing to relay indexer message from self")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
return pubsub.ValidationReject
}
stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}

View File

@ -76,6 +76,8 @@ var (
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless) ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
IndexerMessageValidationFailure = stats.Int64("indexer/failure", "Counter for indexer message validation failures", stats.UnitDimensionless)
IndexerMessageValidationSuccess = stats.Int64("indexer/success", "Counter for indexer message validation successes", stats.UnitDimensionless)
MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless) MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless)
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
@ -191,6 +193,15 @@ var (
return view.Distribution(bounds...) return view.Distribution(bounds...)
}(), }(),
} }
IndexerMessageValidationFailureView = &view.View{
Measure: IndexerMessageValidationFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{FailureType, Local},
}
IndexerMessageValidationSuccessView = &view.View{
Measure: IndexerMessageValidationSuccess,
Aggregation: view.Count(),
}
MessagePublishedView = &view.View{ MessagePublishedView = &view.View{
Measure: MessagePublished, Measure: MessagePublished,
Aggregation: view.Count(), Aggregation: view.Count(),
@ -490,6 +501,8 @@ var ChainNodeViews = append([]*view.View{
BlockValidationSuccessView, BlockValidationSuccessView,
BlockValidationDurationView, BlockValidationDurationView,
BlockDelayView, BlockDelayView,
IndexerMessageValidationFailureView,
IndexerMessageValidationSuccessView,
MessagePublishedView, MessagePublishedView,
MessageReceivedView, MessageReceivedView,
MessageValidationFailureView, MessageValidationFailureView,

View File

@ -104,6 +104,8 @@ const (
HandleMigrateClientFundsKey HandleMigrateClientFundsKey
HandlePaymentChannelManagerKey HandlePaymentChannelManagerKey
RelayIndexerMessagesKey
// miner // miner
GetParamsKey GetParamsKey
HandleMigrateProviderFundsKey HandleMigrateProviderFundsKey

View File

@ -134,6 +134,8 @@ var ChainNode = Options(
Override(new(*full.GasPriceCache), full.NewGasPriceCache), Override(new(*full.GasPriceCache), full.NewGasPriceCache),
Override(RelayIndexerMessagesKey, modules.RelayIndexerMessages),
// Lite node API // Lite node API
ApplyIf(isLiteNode, ApplyIf(isLiteNode,
Override(new(messagepool.Provider), messagepool.NewProviderLite), Override(new(messagepool.Provider), messagepool.NewProviderLite),

View File

@ -333,6 +333,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
allowTopics := []string{ allowTopics := []string{
build.BlocksTopic(in.Nn), build.BlocksTopic(in.Nn),
build.MessagesTopic(in.Nn), build.MessagesTopic(in.Nn),
build.IngestTopic(in.Nn),
} }
allowTopics = append(allowTopics, drandTopics...) allowTopics = append(allowTopics, drandTopics...)
options = append(options, options = append(options,

View File

@ -198,6 +198,36 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
} }
func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error {
topicName := build.IngestTopic(nn)
v := sub.NewIndexerMessageValidator(h.ID())
if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil {
panic(err)
}
topicHandle, err := ps.Join(topicName)
if err != nil {
return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err)
}
cancelFunc, err := topicHandle.Relay()
if err != nil {
return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err)
}
// Cancel message relay on shutdown.
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
cancelFunc()
return nil
},
})
log.Infof("relaying messages for pubsub topic %s", topicName)
return nil
}
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) { func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local"))) local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
if err != nil { if err != nil {