diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index ef2c4f343..1e1b0369d 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -291,11 +291,13 @@ #EntriesChunkSize = 16384 # TopicName sets the topic name on which the changes to the advertised content are announced. - # Defaults to '/indexer/ingest/mainnet' if not specified. + # If not explicitly specified, the topic name is automatically inferred from the network name + # in following format: '/indexer/ingest/' + # Defaults to empty, which implies the topic name is inferred from network name. # # type: string # env var: LOTUS_INDEXPROVIDER_TOPICNAME - #TopicName = "/indexer/ingest/mainnet" + #TopicName = "" # PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine # starts. By default, the cache is rehydrated from previously cached entries stored in diff --git a/node/config/def.go b/node/config/def.go index 0401b0e44..04c512082 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -199,8 +199,10 @@ func DefaultStorageMiner() *StorageMiner { Enable: true, EntriesCacheCapacity: 1024, EntriesChunkSize: 16384, - TopicName: "/indexer/ingest/mainnet", - PurgeCacheOnStart: false, + // The default empty TopicName means it is inferred from network name, in the following + // format: "/indexer/ingest/" + TopicName: "", + PurgeCacheOnStart: false, }, Subsystems: MinerSubsystemConfig{ diff --git a/node/config/def_test.go b/node/config/def_test.go index 9a450e66b..d644ae336 100644 --- a/node/config/def_test.go +++ b/node/config/def_test.go @@ -74,8 +74,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) { require.True(t, reflect.DeepEqual(c, c2)) } -func TestDefaultStorageMiner_SetsIndexIngestTopic(t *testing.T) { +func TestDefaultStorageMiner_IsEmpty(t *testing.T) { subject := DefaultStorageMiner() require.True(t, subject.IndexProvider.Enable) - require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.TopicName) + require.Equal(t, "", subject.IndexProvider.TopicName) } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 7d3306d91..cf51fb13e 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -404,7 +404,9 @@ advertisements that include more multihashes than the configured EntriesChunkSiz Type: "string", Comment: `TopicName sets the topic name on which the changes to the advertised content are announced. -Defaults to '/indexer/ingest/mainnet' if not specified.`, +If not explicitly specified, the topic name is automatically inferred from the network name +in following format: '/indexer/ingest/' +Defaults to empty, which implies the topic name is inferred from network name.`, }, { Name: "PurgeCacheOnStart", diff --git a/node/config/types.go b/node/config/types.go index 3d50a14e6..b5b1fae7e 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -186,7 +186,9 @@ type IndexProviderConfig struct { EntriesChunkSize int // TopicName sets the topic name on which the changes to the advertised content are announced. - // Defaults to '/indexer/ingest/mainnet' if not specified. + // If not explicitly specified, the topic name is automatically inferred from the network name + // in following format: '/indexer/ingest/' + // Defaults to empty, which implies the topic name is inferred from network name. TopicName string // PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine diff --git a/node/modules/storageminer_idxprov.go b/node/modules/storageminer_idxprov.go index 365648691..1102f8295 100644 --- a/node/modules/storageminer_idxprov.go +++ b/node/modules/storageminer_idxprov.go @@ -13,6 +13,7 @@ import ( "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" ) @@ -24,8 +25,20 @@ type IdxProv struct { Datastore dtypes.MetadataDS } -func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { - return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { +func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) { + return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) { + topicName := cfg.TopicName + // If indexer topic name is left empty, infer it from the network name. + if topicName == "" { + // Use the same mechanism as the Dependency Injection (DI) to construct the topic name, + // so that we are certain it is consistent with the name allowed by the subscription + // filter. + // + // See: lp2p.GossipSub. + topicName = build.IndexerIngestTopic(nn) + log.Debugw("Inferred indexer topic from network name", "topic", topicName) + } + ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider")) var opts = []engine.Option{ engine.WithDatastore(ipds), @@ -33,24 +46,24 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo engine.WithRetrievalAddrs(marketHost.Addrs()...), engine.WithEntriesCacheCapacity(cfg.EntriesCacheCapacity), engine.WithEntriesChunkSize(cfg.EntriesChunkSize), - engine.WithTopicName(cfg.TopicName), + engine.WithTopicName(topicName), engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart), } llog := log.With( "idxProvEnabled", cfg.Enable, "pid", marketHost.ID(), - "topic", cfg.TopicName, + "topic", topicName, "retAddrs", marketHost.Addrs()) // If announcements to the network are enabled, then set options for datatransfer publisher. if cfg.Enable { // Join the indexer topic using the market's pubsub instance. Otherwise, the provider // engine would create its own instance of pubsub down the line in go-legs, which has // no validators by default. - t, err := ps.Join(cfg.TopicName) + t, err := ps.Join(topicName) if err != nil { llog.Errorw("Failed to join indexer topic", "err", err) - return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err) + return nil, xerrors.Errorf("joining indexer topic %s: %w", topicName, err) } // Get the miner ID and set as extra gossip data. @@ -62,9 +75,10 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo engine.WithExtraGossipData(ma.Bytes()), engine.WithTopic(t), ) - llog = llog.With("extraGossipData", ma) + llog = llog.With("extraGossipData", ma, "publisher", "data-transfer") } else { opts = append(opts, engine.WithPublisherKind(engine.NoPublisher)) + llog = llog.With("publisher", "none") } // Instantiate the index provider engine. diff --git a/node/modules/storageminer_idxprov_test.go b/node/modules/storageminer_idxprov_test.go new file mode 100644 index 000000000..c9189349b --- /dev/null +++ b/node/modules/storageminer_idxprov_test.go @@ -0,0 +1,97 @@ +package modules_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/filecoin-project/go-address" + provider "github.com/filecoin-project/index-provider" + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/host" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" +) + +func Test_IndexProviderTopic(t *testing.T) { + tests := []struct { + name string + givenAllowedTopics []string + givenConfiguredTopic string + givenNetworkName dtypes.NetworkName + wantErr string + }{ + { + name: "Joins configured topic when allowed", + givenAllowedTopics: []string{"fish"}, + givenConfiguredTopic: "fish", + }, + { + name: "Joins topic inferred from network name when allowed", + givenAllowedTopics: []string{"/indexer/ingest/fish"}, + givenNetworkName: "fish", + }, + { + name: "Fails to join configured topic when disallowed", + givenAllowedTopics: []string{"/indexer/ingest/fish"}, + givenConfiguredTopic: "lobster", + wantErr: "joining indexer topic lobster: topic is not allowed by the subscription filter", + }, + { + name: "Fails to join topic inferred from network name when disallowed", + givenAllowedTopics: []string{"/indexer/ingest/fish"}, + givenNetworkName: "lobster", + wantErr: "joining indexer topic /indexer/ingest/lobster: topic is not allowed by the subscription filter", + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + h, err := libp2p.New() + require.NoError(t, err) + defer func() { + require.NoError(t, h.Close()) + }() + + filter := pubsub.WithSubscriptionFilter(pubsub.NewAllowlistSubscriptionFilter(test.givenAllowedTopics...)) + ps, err := pubsub.NewGossipSub(ctx, h, filter) + require.NoError(t, err) + + app := fx.New( + fx.Provide( + func() host.Host { return h }, + func() dtypes.NetworkName { return test.givenNetworkName }, + func() dtypes.MinerAddress { return dtypes.MinerAddress(address.TestAddress) }, + func() dtypes.ProviderDataTransfer { return nil }, + func() *pubsub.PubSub { return ps }, + func() dtypes.MetadataDS { return datastore.NewMapDatastore() }, + modules.IndexProvider(config.IndexProviderConfig{ + Enable: true, + TopicName: test.givenConfiguredTopic, + }), + ), + fx.Invoke(func(p provider.Interface) {}), + ) + err = app.Start(ctx) + + if test.wantErr == "" { + require.NoError(t, err) + err = app.Stop(ctx) + require.NoError(t, err) + } else { + require.True(t, strings.HasSuffix(err.Error(), test.wantErr)) + } + }) + } +}