refactor indexprovider libp2p host connection to fullnode with meshcreator

This commit is contained in:
Anton Evangelatov 2022-02-03 15:44:18 +01:00
parent 20fc5ffa38
commit 595ad44ee7
16 changed files with 122 additions and 57 deletions

View File

@ -257,27 +257,27 @@
#Path = ""
[IndexerProvider]
# env var: LOTUS_INDEXERPROVIDER_LINKCACHESIZE
[IndexProvider]
# env var: LOTUS_INDEXPROVIDER_LINKCACHESIZE
#LinkCacheSize = 1024
# env var: LOTUS_INDEXERPROVIDER_LINKEDCHUNKSIZE
# env var: LOTUS_INDEXPROVIDER_LINKEDCHUNKSIZE
#LinkedChunkSize = 16384
# env var: LOTUS_INDEXERPROVIDER_PUBSUBTOPIC
# env var: LOTUS_INDEXPROVIDER_PUBSUBTOPIC
#PubSubTopic = "/indexer/ingest/mainnet"
# env var: LOTUS_INDEXERPROVIDER_PURGELINKCACHE
# env var: LOTUS_INDEXPROVIDER_PURGELINKCACHE
#PurgeLinkCache = false
# env var: LOTUS_INDEXERPROVIDER_PUBLISHERKIND
# env var: LOTUS_INDEXPROVIDER_PUBLISHERKIND
#PublisherKind = "dtsync"
# Binding address for the libp2p host - 0 means random port.
# Format: multiaddress; see https://multiformats.io/multiaddr/
#
# type: []string
# env var: LOTUS_INDEXERPROVIDER_LISTENADDRESSES
# env var: LOTUS_INDEXPROVIDER_LISTENADDRESSES
#ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"]
# Addresses to explicitly announce to other peers. If not specified,
@ -285,18 +285,18 @@
# Format: multiaddress
#
# type: []string
# env var: LOTUS_INDEXERPROVIDER_ANNOUNCEADDRESSES
# env var: LOTUS_INDEXPROVIDER_ANNOUNCEADDRESSES
#AnnounceAddresses = []
# The maximum number of simultaneous data transfers between the indexers
# and the indexer provider
#
# type: uint64
# env var: LOTUS_INDEXERPROVIDER_MAXSIMULTANEOUSTRANSFERS
# env var: LOTUS_INDEXPROVIDER_MAXSIMULTANEOUSTRANSFERS
#MaxSimultaneousTransfers = 20
[IndexerProvider.HttpPublisher]
# env var: LOTUS_INDEXERPROVIDER_HTTPPUBLISHER_LISTENMULTIADDR
[IndexProvider.HttpPublisher]
# env var: LOTUS_INDEXPROVIDER_HTTPPUBLISHER_LISTENMULTIADDR
#ListenMultiaddr = "/ip4/0.0.0.0/tcp/3104/http"

2
go.mod
View File

@ -36,7 +36,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.14.0
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220202151058-ea0c8f1c8630
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220203143446-7ec33d5dc6ee
github.com/filecoin-project/go-indexer-core v0.2.8
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-padreader v0.0.1

2
go.sum
View File

@ -338,6 +338,8 @@ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7L
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220202151058-ea0c8f1c8630 h1:h46LVtyElHCjLt7JtIPHjLId+H65izL0ytZ9EC/hA44=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220202151058-ea0c8f1c8630/go.mod h1:gnZw2JAtaGcAwGA7ASD3cvO7Laz1N06VLP9FVEQfnRg=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220203143446-7ec33d5dc6ee h1:mL4jV1+c5jA9823E4DUfUVSSqPS/jGz75QuIOXVLqvE=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220203143446-7ec33d5dc6ee/go.mod h1:gnZw2JAtaGcAwGA7ASD3cvO7Laz1N06VLP9FVEQfnRg=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=

View File

@ -55,11 +55,11 @@ func TestDealWithMarketAndMinerNode(t *testing.T) {
dh := kit.NewDealHarness(t, client, main, market)
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{
N: n,
FastRetrieval: fastRetrieval,
CarExport: carExport,
StartEpoch: startEpoch,
IndexerProvider: idxProv,
N: n,
FastRetrieval: fastRetrieval,
CarExport: carExport,
StartEpoch: startEpoch,
IndexProvider: idxProv,
})
}

View File

@ -406,7 +406,7 @@ type RunConcurrentDealsOpts struct {
CarExport bool
StartEpoch abi.ChainEpoch
UseCARFileForStorageDeal bool
IndexerProvider *shared_testutil.MockIndexProvider
IndexProvider *shared_testutil.MockIndexProvider
}
func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) {
@ -434,8 +434,8 @@ func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) {
})
// Check that the storage provider announced the deal to indexers
if opts.IndexerProvider != nil {
notifs := opts.IndexerProvider.GetNotifs()
if opts.IndexProvider != nil {
notifs := opts.IndexProvider.GetNotifs()
_, ok := notifs[string(deal.Bytes())]
require.True(dh.t, ok)
}

View File

@ -36,6 +36,8 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/mock"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/markets/idxprov"
idxprov_test "github.com/filecoin-project/lotus/markets/idxprov/idxprov_test"
lotusminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
@ -543,7 +545,7 @@ func (n *Ensemble) Start() *Ensemble {
if m.options.subsystems.Has(SMarkets) {
opts = append(opts,
node.Override(new(*modules.IdxProvHost), node.MockHost(n.mn)),
node.Override(new(idxprov.MeshCreator), idxprov_test.NewNoopMeshCreator),
)
}

5
markets/idxprov/host.go Normal file
View File

@ -0,0 +1,5 @@
package idxprov
import "github.com/libp2p/go-libp2p-core/host"
type Host host.Host

View File

@ -0,0 +1,16 @@
package idxprov_test
import (
"context"
)
type NoopMeshCreator struct {
}
func NewNoopMeshCreator() *NoopMeshCreator {
return &NoopMeshCreator{}
}
func (mc NoopMeshCreator) Connect(ctx context.Context) error {
return nil
}

40
markets/idxprov/mesh.go Normal file
View File

@ -0,0 +1,40 @@
package idxprov
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/api/v1api"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("idxprov")
type MeshCreator interface {
Connect(ctx context.Context) error
}
type Libp2pMeshCreator struct {
fullnodeApi v1api.FullNode
idxProvHost Host
}
func (mc Libp2pMeshCreator) Connect(ctx context.Context) error {
addrs, err := mc.fullnodeApi.NetAddrsListen(ctx)
if err != nil {
return err
}
if err := mc.idxProvHost.Connect(ctx, addrs); err != nil {
return fmt.Errorf("failed to connect index provider host with the full node: %w", err)
}
mc.idxProvHost.ConnManager().Protect(addrs.ID, "markets")
log.Debugw("successfully connected to full node", "fullNodeInfo", addrs.String())
return nil
}
func NewMeshCreator(fullnodeApi v1api.FullNode, idxProvHost Host) MeshCreator {
return Libp2pMeshCreator{fullnodeApi, idxProvHost}
}

View File

@ -26,6 +26,7 @@ import (
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/markets/dealfilter"
"github.com/filecoin-project/lotus/markets/idxprov"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/markets/sectoraccessor"
"github.com/filecoin-project/lotus/markets/storageadapter"
@ -168,8 +169,9 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.ProviderTransferNetwork), modules.NewProviderTransferNetwork),
Override(new(dtypes.ProviderTransport), modules.NewProviderTransport),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDataTransfer),
Override(new(*modules.IdxProvHost), modules.IndexerProviderHost(cfg.IndexerProvider)),
Override(new(provider.Interface), modules.IndexerProvider(cfg.IndexerProvider)),
Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator),
Override(new(idxprov.Host), modules.IndexProviderHost(cfg.IndexProvider)),
Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),

View File

@ -181,7 +181,7 @@ func DefaultStorageMiner() *StorageMiner {
},
},
IndexerProvider: IndexerProviderConfig{
IndexProvider: IndexProviderConfig{
ListenAddresses: []string{
"/ip4/0.0.0.0/tcp/0",
"/ip6/::/tcp/0",
@ -234,7 +234,7 @@ func DefaultStorageMiner() *StorageMiner {
// TODO: Remove hardcoded defaults once provider library exposes them.
// See: https://github.com/filecoin-project/index-provider/issues/108
cfg.IndexerProvider.Ingest = ipconfig.NewIngest()
cfg.IndexProvider.Ingest = ipconfig.NewIngest()
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"

View File

@ -55,5 +55,5 @@ func TestDefaultMinerRoundtrip(t *testing.T) {
func TestDefaultStorageMiner_SetsIndexIngestTopic(t *testing.T) {
subject := DefaultStorageMiner()
require.Equal(t, "/indexer/ingest/mainnet", subject.IndexerProvider.PubSubTopic)
require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.PubSubTopic)
}

View File

@ -358,7 +358,7 @@ see https://docs.filecoin.io/mine/lotus/miner-configuration/#using-filters-for-f
Comment: ``,
},
},
"IndexerProviderConfig": []DocField{
"IndexProviderConfig": []DocField{
{
Name: "ListenAddresses",
Type: "[]string",
@ -858,8 +858,8 @@ Default is 20 (about once a week).`,
Comment: ``,
},
{
Name: "IndexerProvider",
Type: "IndexerProviderConfig",
Name: "IndexProvider",
Type: "IndexProviderConfig",
Comment: ``,
},

View File

@ -45,14 +45,14 @@ type Backup struct {
type StorageMiner struct {
Common
Subsystems MinerSubsystemConfig
Dealmaking DealmakingConfig
IndexerProvider IndexerProviderConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
DAGStore DAGStoreConfig
Subsystems MinerSubsystemConfig
Dealmaking DealmakingConfig
IndexProvider IndexProviderConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
DAGStore DAGStoreConfig
}
type DAGStoreConfig struct {
@ -161,7 +161,7 @@ type DealmakingConfig struct {
RetrievalPricing *RetrievalPricing
}
type IndexerProviderConfig struct {
type IndexProviderConfig struct {
config.Ingest
// Binding address for the libp2p host - 0 means random port.

View File

@ -62,6 +62,7 @@ import (
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/markets"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/markets/idxprov"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/markets/pricing"
lotusminer "github.com/filecoin-project/lotus/miner"
@ -590,8 +591,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
spn storagemarket.StorageProviderNode,
df dtypes.StorageDealFilter,
dsw *dagstore.Wrapper,
fullnodeApi v1api.FullNode,
idxProvHost *IdxProvHost,
meshCreator idxprov.MeshCreator,
) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h)
@ -622,8 +622,9 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
spn,
address.Address(minerAddress),
storedAsk,
fullnodeApi,
idxProvHost.Host,
meshCreator,
//fullnodeApi,
//idxProvHost,
opt,
)
}

View File

@ -29,6 +29,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/markets/idxprov"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -48,28 +49,24 @@ type IdxProv struct {
peerstore.Peerstore
}
type IdxProvHost struct {
host.Host
}
func IndexerProviderHost(cfg config.IndexerProviderConfig) func(IdxProv) (*IdxProvHost, error) {
return func(args IdxProv) (*IdxProvHost, error) {
func IndexProviderHost(cfg config.IndexProviderConfig) func(IdxProv) (idxprov.Host, error) {
return func(args IdxProv) (idxprov.Host, error) {
pkey := args.Peerstore.PrivKey(args.PeerID)
if pkey == nil {
return nil, fmt.Errorf("missing private key for node ID: %s", args.PeerID.Pretty())
}
h, err := createIndexerProviderHost(args.MetricsCtx, args.Lifecycle, pkey, args.Peerstore, cfg.ListenAddresses, cfg.AnnounceAddresses)
h, err := createIndexProviderHost(args.MetricsCtx, args.Lifecycle, pkey, args.Peerstore, cfg.ListenAddresses, cfg.AnnounceAddresses)
if err != nil {
return nil, xerrors.Errorf("creating indexer provider host: %w", err)
}
return &IdxProvHost{h}, nil
return h, nil
}
}
func IndexerProvider(cfg config.IndexerProviderConfig) func(params IdxProv, marketHost host.Host, h *IdxProvHost) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, h *IdxProvHost) (provider.Interface, error) {
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, h idxprov.Host) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, h idxprov.Host) (provider.Interface, error) {
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/indexer-provider"))
pkey := args.Peerstore.PrivKey(args.PeerID)
@ -77,7 +74,7 @@ func IndexerProvider(cfg config.IndexerProviderConfig) func(params IdxProv, mark
return nil, fmt.Errorf("missing private key for node ID: %s", args.PeerID.Pretty())
}
dt, err := newIndexerProviderDataTransfer(cfg, args.MetricsCtx, args.Lifecycle, args.Repo, h, ipds)
dt, err := newIndexProviderDataTransfer(cfg, args.MetricsCtx, args.Lifecycle, args.Repo, h, ipds)
if err != nil {
return nil, err
}
@ -111,7 +108,7 @@ func IndexerProvider(cfg config.IndexerProviderConfig) func(params IdxProv, mark
}
}
func createIndexerProviderHost(mctx helpers.MetricsCtx, lc fx.Lifecycle, pkey ci.PrivKey, pstore peerstore.Peerstore, listenAddrs []string, announceAddrs []string) (host.Host, error) {
func createIndexProviderHost(mctx helpers.MetricsCtx, lc fx.Lifecycle, pkey ci.PrivKey, pstore peerstore.Peerstore, listenAddrs []string, announceAddrs []string) (host.Host, error) {
addrsFactory, err := lp2p.MakeAddrsFactory(announceAddrs, nil)
if err != nil {
return nil, err
@ -140,11 +137,11 @@ func createIndexerProviderHost(mctx helpers.MetricsCtx, lc fx.Lifecycle, pkey ci
return h, nil
}
func newIndexerProviderDataTransfer(cfg config.IndexerProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, h host.Host, ds datastore.Batching) (datatransfer.Manager, error) {
func newIndexProviderDataTransfer(cfg config.IndexProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, h host.Host, ds datastore.Batching) (datatransfer.Manager, error) {
net := dtnet.NewFromLibp2pHost(h)
// Set up graphsync
gs := newIndexerProviderGraphsync(cfg, mctx, lc, h)
gs := newIndexProviderGraphsync(cfg, mctx, lc, h)
// Set up data transfer
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/transfers"))
@ -168,7 +165,7 @@ func newIndexerProviderDataTransfer(cfg config.IndexerProviderConfig, mctx helpe
return dt, nil
}
func newIndexerProviderGraphsync(cfg config.IndexerProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host) graphsync.GraphExchange {
func newIndexProviderGraphsync(cfg config.IndexProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host) graphsync.GraphExchange {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
return graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc),
graphsyncNetwork,