diff --git a/node/modules/storageminer_idxprov.go b/node/modules/storageminer_idxprov.go index 2c656193d..365648691 100644 --- a/node/modules/storageminer_idxprov.go +++ b/node/modules/storageminer_idxprov.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-libp2p-core/host" + pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "golang.org/x/xerrors" @@ -23,8 +24,8 @@ type IdxProv struct { Datastore dtypes.MetadataDS } -func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) { - return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) { +func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { + return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider")) var opts = []engine.Option{ engine.WithDatastore(ipds), @@ -36,16 +37,31 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart), } - llog := log.With("idxProvEnabled", cfg.Enable, "pid", marketHost.ID(), "retAddrs", marketHost.Addrs()) + llog := log.With( + "idxProvEnabled", cfg.Enable, + "pid", marketHost.ID(), + "topic", cfg.TopicName, + "retAddrs", marketHost.Addrs()) // If announcements to the network are enabled, then set options for datatransfer publisher. if cfg.Enable { + // Join the indexer topic using the market's pubsub instance. Otherwise, the provider + // engine would create its own instance of pubsub down the line in go-legs, which has + // no validators by default. + t, err := ps.Join(cfg.TopicName) + if err != nil { + llog.Errorw("Failed to join indexer topic", "err", err) + return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err) + } + // Get the miner ID and set as extra gossip data. // The extra data is required by the lotus-specific index-provider gossip message validators. ma := address.Address(maddr) opts = append(opts, engine.WithPublisherKind(engine.DataTransferPublisher), engine.WithDataTransfer(dt), - engine.WithExtraGossipData(ma.Bytes())) + engine.WithExtraGossipData(ma.Bytes()), + engine.WithTopic(t), + ) llog = llog.With("extraGossipData", ma) } else { opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))