Merge pull request #8026 from gammazero/feat/cid-to-piece-idx

Lotus chain nodes relay indexer pubsub messages
This commit is contained in:
Aarsh Shah 2022-02-04 12:05:46 +04:00 committed by GitHub
commit 3ecf478ff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 70 additions and 0 deletions

View File

@ -13,6 +13,7 @@ import (
func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) }
func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) }
func IngestTopic(netName dtypes.NetworkName) string { return "/indexer/ingest/" + string(netName) }
func DhtProtocolName(netName dtypes.NetworkName) protocol.ID {
return protocol.ID("/fil/kad/" + string(netName))
}

View File

@ -444,3 +444,24 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType
)
stats.Record(ctx, metric.M(1))
}
type IndexerMessageValidator struct {
self peer.ID
}
func NewIndexerMessageValidator(self peer.ID) *IndexerMessageValidator {
return &IndexerMessageValidator{self: self}
}
func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
// This chain-node should not be publishing its own messages. These are
// relayed from miner-nodes or index publishers. If a node appears to be
// local, reject it.
if pid == v.self {
log.Warnf("refusing to relay indexer message from self")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
return pubsub.ValidationReject
}
stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}

View File

@ -76,6 +76,8 @@ var (
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
IndexerMessageValidationFailure = stats.Int64("indexer/failure", "Counter for indexer message validation failures", stats.UnitDimensionless)
IndexerMessageValidationSuccess = stats.Int64("indexer/success", "Counter for indexer message validation successes", stats.UnitDimensionless)
MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless)
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
@ -200,6 +202,15 @@ var (
return view.Distribution(bounds...)
}(),
}
IndexerMessageValidationFailureView = &view.View{
Measure: IndexerMessageValidationFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{FailureType, Local},
}
IndexerMessageValidationSuccessView = &view.View{
Measure: IndexerMessageValidationSuccess,
Aggregation: view.Count(),
}
MessagePublishedView = &view.View{
Measure: MessagePublished,
Aggregation: view.Count(),
@ -532,6 +543,8 @@ var ChainNodeViews = append([]*view.View{
BlockValidationSuccessView,
BlockValidationDurationView,
BlockDelayView,
IndexerMessageValidationFailureView,
IndexerMessageValidationSuccessView,
MessagePublishedView,
MessageReceivedView,
MessageValidationFailureView,

View File

@ -105,6 +105,8 @@ const (
HandleMigrateClientFundsKey
HandlePaymentChannelManagerKey
RelayIndexerMessagesKey
// miner
GetParamsKey
HandleMigrateProviderFundsKey

View File

@ -134,6 +134,8 @@ var ChainNode = Options(
Override(new(*full.GasPriceCache), full.NewGasPriceCache),
Override(RelayIndexerMessagesKey, modules.RelayIndexerMessages),
// Lite node API
ApplyIf(isLiteNode,
Override(new(messagepool.Provider), messagepool.NewProviderLite),

View File

@ -333,6 +333,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
allowTopics := []string{
build.BlocksTopic(in.Nn),
build.MessagesTopic(in.Nn),
build.IngestTopic(in.Nn),
}
allowTopics = append(allowTopics, drandTopics...)
options = append(options,

View File

@ -198,6 +198,36 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
}
func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error {
topicName := build.IngestTopic(nn)
v := sub.NewIndexerMessageValidator(h.ID())
if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil {
panic(err)
}
topicHandle, err := ps.Join(topicName)
if err != nil {
return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err)
}
cancelFunc, err := topicHandle.Relay()
if err != nil {
return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err)
}
// Cancel message relay on shutdown.
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
cancelFunc()
return nil
},
})
log.Infof("relaying messages for pubsub topic %s", topicName)
return nil
}
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
if err != nil {