Merge pull request #8526 from masih/idxprov-topic-by-netname

fix: market: Infer index provider topic from network name by default
This commit is contained in:
Jiaying Wang 2022-04-22 15:32:58 +02:00 committed by GitHub
commit 2be9452ced
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 134 additions and 15 deletions

View File

@ -291,11 +291,13 @@
#EntriesChunkSize = 16384 #EntriesChunkSize = 16384
# TopicName sets the topic name on which the changes to the advertised content are announced. # TopicName sets the topic name on which the changes to the advertised content are announced.
# Defaults to '/indexer/ingest/mainnet' if not specified. # If not explicitly specified, the topic name is automatically inferred from the network name
# in following format: '/indexer/ingest/<network-name>'
# Defaults to empty, which implies the topic name is inferred from network name.
# #
# type: string # type: string
# env var: LOTUS_INDEXPROVIDER_TOPICNAME # env var: LOTUS_INDEXPROVIDER_TOPICNAME
#TopicName = "/indexer/ingest/mainnet" #TopicName = ""
# PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine # PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine
# starts. By default, the cache is rehydrated from previously cached entries stored in # starts. By default, the cache is rehydrated from previously cached entries stored in

View File

@ -199,8 +199,10 @@ func DefaultStorageMiner() *StorageMiner {
Enable: true, Enable: true,
EntriesCacheCapacity: 1024, EntriesCacheCapacity: 1024,
EntriesChunkSize: 16384, EntriesChunkSize: 16384,
TopicName: "/indexer/ingest/mainnet", // The default empty TopicName means it is inferred from network name, in the following
PurgeCacheOnStart: false, // format: "/indexer/ingest/<network-name>"
TopicName: "",
PurgeCacheOnStart: false,
}, },
Subsystems: MinerSubsystemConfig{ Subsystems: MinerSubsystemConfig{

View File

@ -74,8 +74,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) {
require.True(t, reflect.DeepEqual(c, c2)) require.True(t, reflect.DeepEqual(c, c2))
} }
func TestDefaultStorageMiner_SetsIndexIngestTopic(t *testing.T) { func TestDefaultStorageMiner_IsEmpty(t *testing.T) {
subject := DefaultStorageMiner() subject := DefaultStorageMiner()
require.True(t, subject.IndexProvider.Enable) require.True(t, subject.IndexProvider.Enable)
require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.TopicName) require.Equal(t, "", subject.IndexProvider.TopicName)
} }

View File

@ -404,7 +404,9 @@ advertisements that include more multihashes than the configured EntriesChunkSiz
Type: "string", Type: "string",
Comment: `TopicName sets the topic name on which the changes to the advertised content are announced. Comment: `TopicName sets the topic name on which the changes to the advertised content are announced.
Defaults to '/indexer/ingest/mainnet' if not specified.`, If not explicitly specified, the topic name is automatically inferred from the network name
in following format: '/indexer/ingest/<network-name>'
Defaults to empty, which implies the topic name is inferred from network name.`,
}, },
{ {
Name: "PurgeCacheOnStart", Name: "PurgeCacheOnStart",

View File

@ -186,7 +186,9 @@ type IndexProviderConfig struct {
EntriesChunkSize int EntriesChunkSize int
// TopicName sets the topic name on which the changes to the advertised content are announced. // TopicName sets the topic name on which the changes to the advertised content are announced.
// Defaults to '/indexer/ingest/mainnet' if not specified. // If not explicitly specified, the topic name is automatically inferred from the network name
// in following format: '/indexer/ingest/<network-name>'
// Defaults to empty, which implies the topic name is inferred from network name.
TopicName string TopicName string
// PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine // PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine

View File

@ -13,6 +13,7 @@ import (
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
) )
@ -24,8 +25,20 @@ type IdxProv struct {
Datastore dtypes.MetadataDS Datastore dtypes.MetadataDS
} }
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) { return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) {
topicName := cfg.TopicName
// If indexer topic name is left empty, infer it from the network name.
if topicName == "" {
// Use the same mechanism as the Dependency Injection (DI) to construct the topic name,
// so that we are certain it is consistent with the name allowed by the subscription
// filter.
//
// See: lp2p.GossipSub.
topicName = build.IndexerIngestTopic(nn)
log.Debugw("Inferred indexer topic from network name", "topic", topicName)
}
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider")) ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
var opts = []engine.Option{ var opts = []engine.Option{
engine.WithDatastore(ipds), engine.WithDatastore(ipds),
@ -33,24 +46,24 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
engine.WithRetrievalAddrs(marketHost.Addrs()...), engine.WithRetrievalAddrs(marketHost.Addrs()...),
engine.WithEntriesCacheCapacity(cfg.EntriesCacheCapacity), engine.WithEntriesCacheCapacity(cfg.EntriesCacheCapacity),
engine.WithEntriesChunkSize(cfg.EntriesChunkSize), engine.WithEntriesChunkSize(cfg.EntriesChunkSize),
engine.WithTopicName(cfg.TopicName), engine.WithTopicName(topicName),
engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart), engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart),
} }
llog := log.With( llog := log.With(
"idxProvEnabled", cfg.Enable, "idxProvEnabled", cfg.Enable,
"pid", marketHost.ID(), "pid", marketHost.ID(),
"topic", cfg.TopicName, "topic", topicName,
"retAddrs", marketHost.Addrs()) "retAddrs", marketHost.Addrs())
// If announcements to the network are enabled, then set options for datatransfer publisher. // If announcements to the network are enabled, then set options for datatransfer publisher.
if cfg.Enable { if cfg.Enable {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider // Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has // engine would create its own instance of pubsub down the line in go-legs, which has
// no validators by default. // no validators by default.
t, err := ps.Join(cfg.TopicName) t, err := ps.Join(topicName)
if err != nil { if err != nil {
llog.Errorw("Failed to join indexer topic", "err", err) llog.Errorw("Failed to join indexer topic", "err", err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err) return nil, xerrors.Errorf("joining indexer topic %s: %w", topicName, err)
} }
// Get the miner ID and set as extra gossip data. // Get the miner ID and set as extra gossip data.
@ -62,9 +75,10 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
engine.WithExtraGossipData(ma.Bytes()), engine.WithExtraGossipData(ma.Bytes()),
engine.WithTopic(t), engine.WithTopic(t),
) )
llog = llog.With("extraGossipData", ma) llog = llog.With("extraGossipData", ma, "publisher", "data-transfer")
} else { } else {
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher)) opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))
llog = llog.With("publisher", "none")
} }
// Instantiate the index provider engine. // Instantiate the index provider engine.

View File

@ -0,0 +1,97 @@
package modules_test
import (
"context"
"strings"
"testing"
"time"
"github.com/filecoin-project/go-address"
provider "github.com/filecoin-project/index-provider"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func Test_IndexProviderTopic(t *testing.T) {
tests := []struct {
name string
givenAllowedTopics []string
givenConfiguredTopic string
givenNetworkName dtypes.NetworkName
wantErr string
}{
{
name: "Joins configured topic when allowed",
givenAllowedTopics: []string{"fish"},
givenConfiguredTopic: "fish",
},
{
name: "Joins topic inferred from network name when allowed",
givenAllowedTopics: []string{"/indexer/ingest/fish"},
givenNetworkName: "fish",
},
{
name: "Fails to join configured topic when disallowed",
givenAllowedTopics: []string{"/indexer/ingest/fish"},
givenConfiguredTopic: "lobster",
wantErr: "joining indexer topic lobster: topic is not allowed by the subscription filter",
},
{
name: "Fails to join topic inferred from network name when disallowed",
givenAllowedTopics: []string{"/indexer/ingest/fish"},
givenNetworkName: "lobster",
wantErr: "joining indexer topic /indexer/ingest/lobster: topic is not allowed by the subscription filter",
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
h, err := libp2p.New()
require.NoError(t, err)
defer func() {
require.NoError(t, h.Close())
}()
filter := pubsub.WithSubscriptionFilter(pubsub.NewAllowlistSubscriptionFilter(test.givenAllowedTopics...))
ps, err := pubsub.NewGossipSub(ctx, h, filter)
require.NoError(t, err)
app := fx.New(
fx.Provide(
func() host.Host { return h },
func() dtypes.NetworkName { return test.givenNetworkName },
func() dtypes.MinerAddress { return dtypes.MinerAddress(address.TestAddress) },
func() dtypes.ProviderDataTransfer { return nil },
func() *pubsub.PubSub { return ps },
func() dtypes.MetadataDS { return datastore.NewMapDatastore() },
modules.IndexProvider(config.IndexProviderConfig{
Enable: true,
TopicName: test.givenConfiguredTopic,
}),
),
fx.Invoke(func(p provider.Interface) {}),
)
err = app.Start(ctx)
if test.wantErr == "" {
require.NoError(t, err)
err = app.Stop(ctx)
require.NoError(t, err)
} else {
require.True(t, strings.HasSuffix(err.Error(), test.wantErr))
}
})
}
}