Reuse the market process PubSub
instance in index provider engine
The markets process instantiates its own `PubSub` instance with all validators, peer scoring, etc. set up. Use that instane to join the indexing topic, otherwise the default topic instantiated by index-provider internally (via go-legs) has no validators.
This commit is contained in:
parent
f378c6d3ee
commit
7e7e88e330
@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -23,8 +24,8 @@ type IdxProv struct {
|
|||||||
Datastore dtypes.MetadataDS
|
Datastore dtypes.MetadataDS
|
||||||
}
|
}
|
||||||
|
|
||||||
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
|
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
|
||||||
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
|
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
|
||||||
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
|
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
|
||||||
var opts = []engine.Option{
|
var opts = []engine.Option{
|
||||||
engine.WithDatastore(ipds),
|
engine.WithDatastore(ipds),
|
||||||
@ -36,16 +37,31 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
|
|||||||
engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart),
|
engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart),
|
||||||
}
|
}
|
||||||
|
|
||||||
llog := log.With("idxProvEnabled", cfg.Enable, "pid", marketHost.ID(), "retAddrs", marketHost.Addrs())
|
llog := log.With(
|
||||||
|
"idxProvEnabled", cfg.Enable,
|
||||||
|
"pid", marketHost.ID(),
|
||||||
|
"topic", cfg.TopicName,
|
||||||
|
"retAddrs", marketHost.Addrs())
|
||||||
// If announcements to the network are enabled, then set options for datatransfer publisher.
|
// If announcements to the network are enabled, then set options for datatransfer publisher.
|
||||||
if cfg.Enable {
|
if cfg.Enable {
|
||||||
|
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
|
||||||
|
// engine would create its own instance of pubsub down the line in go-legs, which has
|
||||||
|
// no validators by default.
|
||||||
|
t, err := ps.Join(cfg.TopicName)
|
||||||
|
if err != nil {
|
||||||
|
llog.Errorw("Failed to join indexer topic", "err", err)
|
||||||
|
return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Get the miner ID and set as extra gossip data.
|
// Get the miner ID and set as extra gossip data.
|
||||||
// The extra data is required by the lotus-specific index-provider gossip message validators.
|
// The extra data is required by the lotus-specific index-provider gossip message validators.
|
||||||
ma := address.Address(maddr)
|
ma := address.Address(maddr)
|
||||||
opts = append(opts,
|
opts = append(opts,
|
||||||
engine.WithPublisherKind(engine.DataTransferPublisher),
|
engine.WithPublisherKind(engine.DataTransferPublisher),
|
||||||
engine.WithDataTransfer(dt),
|
engine.WithDataTransfer(dt),
|
||||||
engine.WithExtraGossipData(ma.Bytes()))
|
engine.WithExtraGossipData(ma.Bytes()),
|
||||||
|
engine.WithTopic(t),
|
||||||
|
)
|
||||||
llog = llog.With("extraGossipData", ma)
|
llog = llog.With("extraGossipData", ma)
|
||||||
} else {
|
} else {
|
||||||
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))
|
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))
|
||||||
|
Loading…
Reference in New Issue
Block a user