Merge pull request #8443 from filecoin-project/masih/idxprov-reuse-pubsub

fix: market: Reuse the market `PubSub` in index provider
This commit is contained in:
Łukasz Magiera 2022-04-06 17:51:46 -04:00 committed by GitHub
commit 9f98d0ae4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 4 deletions

View File

@ -75,6 +75,11 @@ func ConfigStorageMiner(c interface{}) Option {
enableLibp2pNode := cfg.Subsystems.EnableMarkets // we enable libp2p nodes if the storage market subsystem is enabled, otherwise we don't enableLibp2pNode := cfg.Subsystems.EnableMarkets // we enable libp2p nodes if the storage market subsystem is enabled, otherwise we don't
return Options( return Options(
// Needed to instantiate pubsub used by index provider via ConfigCommon
Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig),
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap),
ConfigCommon(&cfg.Common, enableLibp2pNode), ConfigCommon(&cfg.Common, enableLibp2pNode),
Override(CheckFDLimit, modules.CheckFdLimit(build.MinerFDLimit)), // recommend at least 100k FD limit to miners Override(CheckFDLimit, modules.CheckFdLimit(build.MinerFDLimit)), // recommend at least 100k FD limit to miners

View File

@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -23,8 +24,8 @@ type IdxProv struct {
Datastore dtypes.MetadataDS Datastore dtypes.MetadataDS
} }
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) { func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) { return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider")) ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
var opts = []engine.Option{ var opts = []engine.Option{
engine.WithDatastore(ipds), engine.WithDatastore(ipds),
@ -36,16 +37,31 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart), engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart),
} }
llog := log.With("idxProvEnabled", cfg.Enable, "pid", marketHost.ID(), "retAddrs", marketHost.Addrs()) llog := log.With(
"idxProvEnabled", cfg.Enable,
"pid", marketHost.ID(),
"topic", cfg.TopicName,
"retAddrs", marketHost.Addrs())
// If announcements to the network are enabled, then set options for datatransfer publisher. // If announcements to the network are enabled, then set options for datatransfer publisher.
if cfg.Enable { if cfg.Enable {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has
// no validators by default.
t, err := ps.Join(cfg.TopicName)
if err != nil {
llog.Errorw("Failed to join indexer topic", "err", err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err)
}
// Get the miner ID and set as extra gossip data. // Get the miner ID and set as extra gossip data.
// The extra data is required by the lotus-specific index-provider gossip message validators. // The extra data is required by the lotus-specific index-provider gossip message validators.
ma := address.Address(maddr) ma := address.Address(maddr)
opts = append(opts, opts = append(opts,
engine.WithPublisherKind(engine.DataTransferPublisher), engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithDataTransfer(dt), engine.WithDataTransfer(dt),
engine.WithExtraGossipData(ma.Bytes())) engine.WithExtraGossipData(ma.Bytes()),
engine.WithTopic(t),
)
llog = llog.With("extraGossipData", ma) llog = llog.With("extraGossipData", ma)
} else { } else {
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher)) opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))