diff --git a/go.mod b/go.mod index 8d71c3637..06a6799b6 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,6 @@ require ( github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-markets v1.13.3-0.20211117072527-8713155662ff - github.com/filecoin-project/go-indexer-core v0.2.4 github.com/filecoin-project/go-jsonrpc v0.1.5 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.2 @@ -45,6 +44,7 @@ require ( github.com/filecoin-project/go-statemachine v1.0.1 github.com/filecoin-project/go-statestore v0.1.1 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b + github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b github.com/filecoin-project/specs-actors v0.9.14 github.com/filecoin-project/specs-actors/v2 v2.3.5 github.com/filecoin-project/specs-actors/v3 v3.1.1 diff --git a/go.sum b/go.sum index f6da54a9c..4831d889b 100644 --- a/go.sum +++ b/go.sum @@ -351,9 +351,8 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGy github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= github.com/filecoin-project/go-indexer-core v0.2.2/go.mod h1:wV+NmrF8fHG6Xii3ecoZf2JW3laGTe5xtsWz609jo+Y= +github.com/filecoin-project/go-indexer-core v0.2.3 h1:kaUL2r8CuihK53lhmtCScffb7Bzs+N1yRGpwvxzCN+U= github.com/filecoin-project/go-indexer-core v0.2.3/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc= -github.com/filecoin-project/go-indexer-core v0.2.4 h1:90vvxoBeNZN+h4W+vZ+VsoxKaDBr/bfZJJNByapGeM0= -github.com/filecoin-project/go-indexer-core v0.2.4/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc= github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk= github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-legs v0.0.0-20211013165050-9ab325b6d2eb/go.mod h1:lKwBnslfNGG7JnsP9uQZl3yK7f74fit1MyHcwuuOP3k= @@ -383,8 +382,9 @@ github.com/filecoin-project/go-statestore v0.1.1 h1:ufMFq00VqnT2CAuDpcGnwLnCX1I/ github.com/filecoin-project/go-statestore v0.1.1/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= -github.com/filecoin-project/index-provider v0.0.0-20211116211010-ae6b83454d89 h1:QwKK+eB+7jKbdtQxkBcoiWF2z3LusoPIj2N5UcZsV0w= github.com/filecoin-project/index-provider v0.0.0-20211116211010-ae6b83454d89/go.mod h1:wu0yi7NbT3VzYr3s0n2zheg3mpdSP09A0hBFIQfUs44= +github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b h1:qVQpqoguf9+vPONSMQZ3xYVzxzwAITyBHjM238zAr6c= +github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b/go.mod h1:wu0yi7NbT3VzYr3s0n2zheg3mpdSP09A0hBFIQfUs44= github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4= github.com/filecoin-project/specs-actors v0.9.12/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= @@ -427,7 +427,6 @@ github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/gammazero/keymutex v0.0.2 h1:cmpLBJHdEwn+WlR5Z/o9/BN92znSZTp5AKPQDpu1QcI= github.com/gammazero/keymutex v0.0.2/go.mod h1:qtzWCCLMisQUmVa4dvqHVgwfh4BP2YB7JxNDGXnsKrs= github.com/gammazero/radixtree v0.2.5 h1:muPQ4eEgCkUymFWPiVQRuXOQv4IhWg8YXH2r71MoqPM= github.com/gammazero/radixtree v0.2.5/go.mod h1:VPqqCDZ3YZZxAzUUsIF/ytFBigVWV7JIV1Stld8hri0= @@ -910,7 +909,6 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1 github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE= github.com/ipld/go-ipld-selector-text-lite v0.0.0 h1:MLU1YUAgd3Z+RfVCXUbvxH1RQjEe+larJ9jmlW1aMgA= github.com/ipld/go-ipld-selector-text-lite v0.0.0/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM= -github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c h1:izfvqCuEqk2V7BRkh7GCm7lyKC2ItyAbzUu4WgNmggc= github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c/go.mod h1:PwE6iq8TiWJRI3zMGA1RtkFAnrDMK93dLA5SUeu0lH8= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= diff --git a/itests/deals_concurrent_test.go b/itests/deals_concurrent_test.go index c0458e8d1..6507a5f27 100644 --- a/itests/deals_concurrent_test.go +++ b/itests/deals_concurrent_test.go @@ -7,13 +7,15 @@ import ( "testing" "time" - "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-state-types/abi" + provider "github.com/filecoin-project/index-provider" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" @@ -46,15 +48,18 @@ func TestDealWithMarketAndMinerNode(t *testing.T) { runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) { api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me - client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC()) + idxProv := shared_testutil.NewMockIndexProvider() + idxProvOpt := kit.ConstructorOpts(node.Override(new(provider.Interface), idxProv)) + client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC(), idxProvOpt) dh := kit.NewDealHarness(t, client, main, market) dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{ - N: n, - FastRetrieval: fastRetrieval, - CarExport: carExport, - StartEpoch: startEpoch, + N: n, + FastRetrieval: fastRetrieval, + CarExport: carExport, + StartEpoch: startEpoch, + IndexerProvider: idxProv, }) } diff --git a/itests/kit/deals.go b/itests/kit/deals.go index c96d18acf..09527525f 100644 --- a/itests/kit/deals.go +++ b/itests/kit/deals.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" @@ -374,6 +375,7 @@ type RunConcurrentDealsOpts struct { CarExport bool StartEpoch abi.ChainEpoch UseCARFileForStorageDeal bool + IndexerProvider *shared_testutil.MockIndexProvider } func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) { @@ -400,6 +402,13 @@ func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) { UseCARFileForStorageDeal: opts.UseCARFileForStorageDeal, }) + // Check that the storage provider announced the deal to indexers + if opts.IndexerProvider != nil { + notifs := opts.IndexerProvider.GetNotifs() + _, ok := notifs[string(deal.Bytes())] + require.True(dh.t, ok) + } + dh.t.Logf("retrieving deal %d/%d", i, opts.N) outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, opts.CarExport) diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index b6c8801de..98c63361f 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -12,8 +12,6 @@ import ( carindex "github.com/ipld/go-car/v2/index" - "github.com/filecoin-project/go-indexer-core/store/storethehash" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" levelds "github.com/ipfs/go-ds-leveldb" @@ -87,11 +85,11 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo: %w", err) } - store, err := storethehash.New(indexDir) - if err != nil { - return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err) - } - topIndex := index.NewInverted(store) + //store, err := storethehash.New(indexDir) + //if err != nil { + //return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err) + //} + //topIndex := index.NewInverted(store) dcfg := dagstore.Config{ TransientsDir: transientsDir, @@ -100,7 +98,7 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto MountRegistry: registry, FailureCh: failureCh, TraceCh: traceCh, - TopLevelIndex: topIndex, + //TopLevelIndex: topIndex, // not limiting fetches globally, as the Lotus mount does // conditional throttling. MaxConcurrentIndex: cfg.MaxConcurrentIndex, @@ -281,11 +279,6 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s return nil } -func (w *Wrapper) GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error) { - key := shard.KeyFromCID(pieceCid) - return w.dagst.GetIterableIndex(key) -} - func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) { log := log.Named("migrator") @@ -438,6 +431,10 @@ func (w *Wrapper) GetPiecesContainingBlock(blockCID cid.Cid) ([]cid.Cid, error) return pieceCids, nil } +func (w *Wrapper) GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error) { + return w.dagst.GetIterableIndex(shard.KeyFromCID(pieceCid)) +} + func (w *Wrapper) Close() error { // Cancel the context w.cancel() diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 578dc0b74..4cf537373 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -8,6 +8,10 @@ import ( "testing" "time" + mh "github.com/multiformats/go-multihash" + + carindex "github.com/ipld/go-car/v2/index" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/node/config" @@ -132,6 +136,14 @@ type mockDagStore struct { close chan struct{} } +func (m *mockDagStore) GetIterableIndex(key shard.Key) (carindex.IterableIndex, error) { + return nil, nil +} + +func (m *mockDagStore) ShardsContainingMultihash(h mh.Multihash) ([]shard.Key, error) { + return nil, nil +} + func (m *mockDagStore) GetShardKeysForCid(c cid.Cid) ([]shard.Key, error) { panic("implement me") } diff --git a/node/builder_miner.go b/node/builder_miner.go index 3447eb3e6..aeb12eb9a 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" "github.com/filecoin-project/go-state-types/abi" + provider "github.com/filecoin-project/index-provider" storage2 "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" @@ -163,6 +164,7 @@ func ConfigStorageMiner(c interface{}) Option { Override(HandleRetrievalKey, modules.HandleRetrieval), // Markets (storage) + Override(new(provider.Interface), modules.IndexerProvider(cfg.IndexerProvider)), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(*storedask.StoredAsk), modules.NewStorageAsk), Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)), diff --git a/node/config/def.go b/node/config/def.go index 735107e29..6c518d209 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -176,6 +176,15 @@ func DefaultStorageMiner() *StorageMiner { }, }, + IndexerProvider: IndexerProviderConfig{ + ListenAddresses: []string{ + "/ip4/0.0.0.0/tcp/0", + "/ip6/::/tcp/0", + }, + + MaxSimultaneousTransfers: DefaultSimultaneousTransfers, + }, + Subsystems: MinerSubsystemConfig{ EnableMining: true, EnableSealing: true, diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 296501edc..394f8ab09 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -339,6 +339,22 @@ see https://docs.filecoin.io/mine/lotus/miner-configuration/#using-filters-for-f Comment: ``, }, }, + "IndexerProviderConfig": []DocField{ + { + Name: "ListenAddresses", + Type: "[]string", + + Comment: `Binding address for the libp2p host - 0 means random port. +Format: multiaddress; see https://multiformats.io/multiaddr/`, + }, + { + Name: "MaxSimultaneousTransfers", + Type: "uint64", + + Comment: `The maximum number of simultaneous data transfers between the indexers +and the indexer provider`, + }, + }, "Libp2p": []DocField{ { Name: "ListenAddresses", @@ -814,6 +830,12 @@ Default is 20 (about once a week).`, Comment: ``, }, + { + Name: "IndexerProvider", + Type: "IndexerProviderConfig", + + Comment: ``, + }, { Name: "Sealing", Type: "SealingConfig", diff --git a/node/config/types.go b/node/config/types.go index 1be40029e..44c40e306 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -3,6 +3,8 @@ package config import ( "github.com/ipfs/go-cid" + "github.com/filecoin-project/index-provider/config" + "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" ) @@ -43,13 +45,14 @@ type Backup struct { type StorageMiner struct { Common - Subsystems MinerSubsystemConfig - Dealmaking DealmakingConfig - Sealing SealingConfig - Storage sectorstorage.SealerConfig - Fees MinerFeeConfig - Addresses MinerAddressConfig - DAGStore DAGStoreConfig + Subsystems MinerSubsystemConfig + Dealmaking DealmakingConfig + IndexerProvider IndexerProviderConfig + Sealing SealingConfig + Storage sectorstorage.SealerConfig + Fees MinerFeeConfig + Addresses MinerAddressConfig + DAGStore DAGStoreConfig } type DAGStoreConfig struct { @@ -146,6 +149,18 @@ type DealmakingConfig struct { RetrievalPricing *RetrievalPricing } +type IndexerProviderConfig struct { + config.Ingest + + // Binding address for the libp2p host - 0 means random port. + // Format: multiaddress; see https://multiformats.io/multiaddr/ + ListenAddresses []string + + // The maximum number of simultaneous data transfers between the indexers + // and the indexer provider + MaxSimultaneousTransfers uint64 +} + type RetrievalPricing struct { Strategy string // possible values: "default", "external" diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 1a2dfc19f..3389c2bee 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -34,6 +34,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" + provider "github.com/filecoin-project/index-provider" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -581,6 +582,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, h host.Host, ds dtypes.MetadataDS, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, + indexer provider.Interface, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode, df dtypes.StorageDealFilter, @@ -609,6 +611,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, dsw, + indexer, pieceStore, dataTransfer, spn, diff --git a/node/modules/storageminer_idxprov.go b/node/modules/storageminer_idxprov.go new file mode 100644 index 000000000..7022823a8 --- /dev/null +++ b/node/modules/storageminer_idxprov.go @@ -0,0 +1,158 @@ +package modules + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "golang.org/x/xerrors" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-graphsync" + graphsyncimpl "github.com/ipfs/go-graphsync/impl" + gsnet "github.com/ipfs/go-graphsync/network" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/libp2p/go-libp2p" + ci "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "go.uber.org/fx" + + datatransfer "github.com/filecoin-project/go-data-transfer" + dtimpl "github.com/filecoin-project/go-data-transfer/impl" + dtnet "github.com/filecoin-project/go-data-transfer/network" + dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" + provider "github.com/filecoin-project/index-provider" + "github.com/filecoin-project/index-provider/engine" + p2pserver "github.com/filecoin-project/index-provider/server/provider/libp2p" + "github.com/ipfs/go-datastore/namespace" + "github.com/libp2p/go-libp2p-core/host" + + "github.com/filecoin-project/lotus/build" + marketevents "github.com/filecoin-project/lotus/markets/loggers" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/repo" +) + +type IdxProv struct { + fx.In + + helpers.MetricsCtx + fx.Lifecycle + Repo repo.LockedRepo + Datastore dtypes.MetadataDS + PeerID peer.ID + peerstore.Peerstore +} + +func IndexerProvider(cfg config.IndexerProviderConfig) func(params IdxProv) (provider.Interface, error) { + return func(args IdxProv) (provider.Interface, error) { + ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/indexer-provider")) + + pkey := args.Peerstore.PrivKey(args.PeerID) + if pkey == nil { + return nil, fmt.Errorf("missing private key for node ID: %s", args.PeerID.Pretty()) + } + + h, err := createIndexerProviderHost(args.MetricsCtx, args.Lifecycle, pkey, args.Peerstore, cfg.ListenAddresses) + if err != nil { + return nil, xerrors.Errorf("creating indexer provider host: %w", err) + } + + dt, err := newIndexerProviderDataTransfer(cfg, args.MetricsCtx, args.Lifecycle, args.Repo, h, ipds) + if err != nil { + return nil, err + } + + e, err := engine.New(cfg.Ingest, pkey, dt, h, ipds, nil) + if err != nil { + return nil, xerrors.Errorf("creating indexer provider engine: %w", err) + } + + args.Lifecycle.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + // Start the engine + err := e.Start(ctx) + if err != nil { + return xerrors.Errorf("starting indexer provider engine: %s", err) + } + + // Add a handler to libp2p that listens for incoming index requests + p2pserver.New(ctx, h, e) + + return nil + }, + OnStop: func(ctx context.Context) error { + return e.Shutdown(ctx) + }, + }) + + return e, nil + } +} + +func createIndexerProviderHost(mctx helpers.MetricsCtx, lc fx.Lifecycle, pkey ci.PrivKey, pstore peerstore.Peerstore, listenAddrs []string) (host.Host, error) { + ctx := helpers.LifecycleCtx(mctx, lc) + + opts := []libp2p.Option{ + libp2p.Identity(pkey), + libp2p.Peerstore(pstore), + libp2p.ListenAddrStrings(listenAddrs...), + libp2p.Ping(true), + libp2p.UserAgent("lotus-indexer-provider-" + build.UserVersion()), + } + + h, err := libp2p.New(ctx, opts...) + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return h.Close() + }, + }) + + return h, nil +} + +func newIndexerProviderDataTransfer(cfg config.IndexerProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, h host.Host, ds datastore.Batching) (datatransfer.Manager, error) { + net := dtnet.NewFromLibp2pHost(h) + + // Set up graphsync + gs := newIndexerProviderGraphsync(cfg, mctx, lc, h) + + // Set up data transfer + dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/transfers")) + transport := dtgstransport.NewTransport(h.ID(), gs, net) + dtPath := filepath.Join(r.Path(), "indexer-provider", "data-transfer") + err := os.MkdirAll(dtPath, 0755) //nolint: gosec + if err != nil && !os.IsExist(err) { + return nil, xerrors.Errorf("creating indexer provider data transfer dir %s: %w", dtPath, err) + } + + dt, err := dtimpl.NewDataTransfer(dtDs, dtPath, net, transport) + if err != nil { + return nil, xerrors.Errorf("creating indexer provider data transfer module: %w", err) + } + + dt.OnReady(marketevents.ReadyLogger("indexer-provider data transfer")) + lc.Append(fx.Hook{ + OnStart: dt.Start, + OnStop: dt.Stop, + }) + return dt, nil +} + +func newIndexerProviderGraphsync(cfg config.IndexerProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host) graphsync.GraphExchange { + graphsyncNetwork := gsnet.NewFromLibp2pHost(h) + return graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), + graphsyncNetwork, + cidlink.DefaultLinkSystem(), + graphsyncimpl.MaxInProgressIncomingRequests(cfg.MaxSimultaneousTransfers), + graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks), + graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks)) +}