Merge pull request #7622 from filecoin-project/feat/indexer-provider
Integrate indexer provider
This commit is contained in:
commit
21e27b8034
2
go.mod
2
go.mod
@ -37,7 +37,6 @@ require (
|
||||
github.com/filecoin-project/go-fil-commcid v0.1.0
|
||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
|
||||
github.com/filecoin-project/go-fil-markets v1.13.3-0.20211117072527-8713155662ff
|
||||
github.com/filecoin-project/go-indexer-core v0.2.4
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.5
|
||||
github.com/filecoin-project/go-padreader v0.0.1
|
||||
github.com/filecoin-project/go-paramfetch v0.0.2
|
||||
@ -45,6 +44,7 @@ require (
|
||||
github.com/filecoin-project/go-statemachine v1.0.1
|
||||
github.com/filecoin-project/go-statestore v0.1.1
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
|
||||
github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b
|
||||
github.com/filecoin-project/specs-actors v0.9.14
|
||||
github.com/filecoin-project/specs-actors/v2 v2.3.5
|
||||
github.com/filecoin-project/specs-actors/v3 v3.1.1
|
||||
|
8
go.sum
8
go.sum
@ -351,9 +351,8 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGy
|
||||
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI=
|
||||
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g=
|
||||
github.com/filecoin-project/go-indexer-core v0.2.2/go.mod h1:wV+NmrF8fHG6Xii3ecoZf2JW3laGTe5xtsWz609jo+Y=
|
||||
github.com/filecoin-project/go-indexer-core v0.2.3 h1:kaUL2r8CuihK53lhmtCScffb7Bzs+N1yRGpwvxzCN+U=
|
||||
github.com/filecoin-project/go-indexer-core v0.2.3/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc=
|
||||
github.com/filecoin-project/go-indexer-core v0.2.4 h1:90vvxoBeNZN+h4W+vZ+VsoxKaDBr/bfZJJNByapGeM0=
|
||||
github.com/filecoin-project/go-indexer-core v0.2.4/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
|
||||
github.com/filecoin-project/go-legs v0.0.0-20211013165050-9ab325b6d2eb/go.mod h1:lKwBnslfNGG7JnsP9uQZl3yK7f74fit1MyHcwuuOP3k=
|
||||
@ -383,8 +382,9 @@ github.com/filecoin-project/go-statestore v0.1.1 h1:ufMFq00VqnT2CAuDpcGnwLnCX1I/
|
||||
github.com/filecoin-project/go-statestore v0.1.1/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
|
||||
github.com/filecoin-project/index-provider v0.0.0-20211116211010-ae6b83454d89 h1:QwKK+eB+7jKbdtQxkBcoiWF2z3LusoPIj2N5UcZsV0w=
|
||||
github.com/filecoin-project/index-provider v0.0.0-20211116211010-ae6b83454d89/go.mod h1:wu0yi7NbT3VzYr3s0n2zheg3mpdSP09A0hBFIQfUs44=
|
||||
github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b h1:qVQpqoguf9+vPONSMQZ3xYVzxzwAITyBHjM238zAr6c=
|
||||
github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b/go.mod h1:wu0yi7NbT3VzYr3s0n2zheg3mpdSP09A0hBFIQfUs44=
|
||||
github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4=
|
||||
github.com/filecoin-project/specs-actors v0.9.12/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
|
||||
github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
|
||||
@ -427,7 +427,6 @@ github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/gammazero/keymutex v0.0.2 h1:cmpLBJHdEwn+WlR5Z/o9/BN92znSZTp5AKPQDpu1QcI=
|
||||
github.com/gammazero/keymutex v0.0.2/go.mod h1:qtzWCCLMisQUmVa4dvqHVgwfh4BP2YB7JxNDGXnsKrs=
|
||||
github.com/gammazero/radixtree v0.2.5 h1:muPQ4eEgCkUymFWPiVQRuXOQv4IhWg8YXH2r71MoqPM=
|
||||
github.com/gammazero/radixtree v0.2.5/go.mod h1:VPqqCDZ3YZZxAzUUsIF/ytFBigVWV7JIV1Stld8hri0=
|
||||
@ -910,7 +909,6 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1
|
||||
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0 h1:MLU1YUAgd3Z+RfVCXUbvxH1RQjEe+larJ9jmlW1aMgA=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
|
||||
github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c h1:izfvqCuEqk2V7BRkh7GCm7lyKC2ItyAbzUu4WgNmggc=
|
||||
github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c/go.mod h1:PwE6iq8TiWJRI3zMGA1RtkFAnrDMK93dLA5SUeu0lH8=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
||||
|
@ -7,13 +7,15 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
"github.com/filecoin-project/go-fil-markets/shared_testutil"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
provider "github.com/filecoin-project/index-provider"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
@ -46,15 +48,18 @@ func TestDealWithMarketAndMinerNode(t *testing.T) {
|
||||
runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) {
|
||||
api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me
|
||||
|
||||
client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC())
|
||||
idxProv := shared_testutil.NewMockIndexProvider()
|
||||
idxProvOpt := kit.ConstructorOpts(node.Override(new(provider.Interface), idxProv))
|
||||
client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC(), idxProvOpt)
|
||||
|
||||
dh := kit.NewDealHarness(t, client, main, market)
|
||||
|
||||
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{
|
||||
N: n,
|
||||
FastRetrieval: fastRetrieval,
|
||||
CarExport: carExport,
|
||||
StartEpoch: startEpoch,
|
||||
N: n,
|
||||
FastRetrieval: fastRetrieval,
|
||||
CarExport: carExport,
|
||||
StartEpoch: startEpoch,
|
||||
IndexerProvider: idxProv,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-fil-markets/shared_testutil"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -374,6 +375,7 @@ type RunConcurrentDealsOpts struct {
|
||||
CarExport bool
|
||||
StartEpoch abi.ChainEpoch
|
||||
UseCARFileForStorageDeal bool
|
||||
IndexerProvider *shared_testutil.MockIndexProvider
|
||||
}
|
||||
|
||||
func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) {
|
||||
@ -400,6 +402,13 @@ func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) {
|
||||
UseCARFileForStorageDeal: opts.UseCARFileForStorageDeal,
|
||||
})
|
||||
|
||||
// Check that the storage provider announced the deal to indexers
|
||||
if opts.IndexerProvider != nil {
|
||||
notifs := opts.IndexerProvider.GetNotifs()
|
||||
_, ok := notifs[string(deal.Bytes())]
|
||||
require.True(dh.t, ok)
|
||||
}
|
||||
|
||||
dh.t.Logf("retrieving deal %d/%d", i, opts.N)
|
||||
|
||||
outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, opts.CarExport)
|
||||
|
@ -12,8 +12,6 @@ import (
|
||||
|
||||
carindex "github.com/ipld/go-car/v2/index"
|
||||
|
||||
"github.com/filecoin-project/go-indexer-core/store/storethehash"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
levelds "github.com/ipfs/go-ds-leveldb"
|
||||
@ -87,11 +85,11 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto
|
||||
return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo: %w", err)
|
||||
}
|
||||
|
||||
store, err := storethehash.New(indexDir)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err)
|
||||
}
|
||||
topIndex := index.NewInverted(store)
|
||||
//store, err := storethehash.New(indexDir)
|
||||
//if err != nil {
|
||||
//return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err)
|
||||
//}
|
||||
//topIndex := index.NewInverted(store)
|
||||
|
||||
dcfg := dagstore.Config{
|
||||
TransientsDir: transientsDir,
|
||||
@ -100,7 +98,7 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto
|
||||
MountRegistry: registry,
|
||||
FailureCh: failureCh,
|
||||
TraceCh: traceCh,
|
||||
TopLevelIndex: topIndex,
|
||||
//TopLevelIndex: topIndex,
|
||||
// not limiting fetches globally, as the Lotus mount does
|
||||
// conditional throttling.
|
||||
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
|
||||
@ -281,11 +279,6 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Wrapper) GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error) {
|
||||
key := shard.KeyFromCID(pieceCid)
|
||||
return w.dagst.GetIterableIndex(key)
|
||||
}
|
||||
|
||||
func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) {
|
||||
log := log.Named("migrator")
|
||||
|
||||
@ -438,6 +431,10 @@ func (w *Wrapper) GetPiecesContainingBlock(blockCID cid.Cid) ([]cid.Cid, error)
|
||||
return pieceCids, nil
|
||||
}
|
||||
|
||||
func (w *Wrapper) GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error) {
|
||||
return w.dagst.GetIterableIndex(shard.KeyFromCID(pieceCid))
|
||||
}
|
||||
|
||||
func (w *Wrapper) Close() error {
|
||||
// Cancel the context
|
||||
w.cancel()
|
||||
|
@ -8,6 +8,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
|
||||
carindex "github.com/ipld/go-car/v2/index"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
@ -132,6 +136,14 @@ type mockDagStore struct {
|
||||
close chan struct{}
|
||||
}
|
||||
|
||||
func (m *mockDagStore) GetIterableIndex(key shard.Key) (carindex.IterableIndex, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockDagStore) ShardsContainingMultihash(h mh.Multihash) ([]shard.Key, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockDagStore) GetShardKeysForCid(c cid.Cid) ([]shard.Key, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
provider "github.com/filecoin-project/index-provider"
|
||||
storage2 "github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -163,6 +164,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
||||
|
||||
// Markets (storage)
|
||||
Override(new(provider.Interface), modules.IndexerProvider(cfg.IndexerProvider)),
|
||||
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
||||
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
|
||||
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),
|
||||
|
@ -176,6 +176,15 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
},
|
||||
},
|
||||
|
||||
IndexerProvider: IndexerProviderConfig{
|
||||
ListenAddresses: []string{
|
||||
"/ip4/0.0.0.0/tcp/0",
|
||||
"/ip6/::/tcp/0",
|
||||
},
|
||||
|
||||
MaxSimultaneousTransfers: DefaultSimultaneousTransfers,
|
||||
},
|
||||
|
||||
Subsystems: MinerSubsystemConfig{
|
||||
EnableMining: true,
|
||||
EnableSealing: true,
|
||||
|
@ -339,6 +339,22 @@ see https://docs.filecoin.io/mine/lotus/miner-configuration/#using-filters-for-f
|
||||
Comment: ``,
|
||||
},
|
||||
},
|
||||
"IndexerProviderConfig": []DocField{
|
||||
{
|
||||
Name: "ListenAddresses",
|
||||
Type: "[]string",
|
||||
|
||||
Comment: `Binding address for the libp2p host - 0 means random port.
|
||||
Format: multiaddress; see https://multiformats.io/multiaddr/`,
|
||||
},
|
||||
{
|
||||
Name: "MaxSimultaneousTransfers",
|
||||
Type: "uint64",
|
||||
|
||||
Comment: `The maximum number of simultaneous data transfers between the indexers
|
||||
and the indexer provider`,
|
||||
},
|
||||
},
|
||||
"Libp2p": []DocField{
|
||||
{
|
||||
Name: "ListenAddresses",
|
||||
@ -814,6 +830,12 @@ Default is 20 (about once a week).`,
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "IndexerProvider",
|
||||
Type: "IndexerProviderConfig",
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "Sealing",
|
||||
Type: "SealingConfig",
|
||||
|
@ -3,6 +3,8 @@ package config
|
||||
import (
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/index-provider/config"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||
)
|
||||
@ -43,13 +45,14 @@ type Backup struct {
|
||||
type StorageMiner struct {
|
||||
Common
|
||||
|
||||
Subsystems MinerSubsystemConfig
|
||||
Dealmaking DealmakingConfig
|
||||
Sealing SealingConfig
|
||||
Storage sectorstorage.SealerConfig
|
||||
Fees MinerFeeConfig
|
||||
Addresses MinerAddressConfig
|
||||
DAGStore DAGStoreConfig
|
||||
Subsystems MinerSubsystemConfig
|
||||
Dealmaking DealmakingConfig
|
||||
IndexerProvider IndexerProviderConfig
|
||||
Sealing SealingConfig
|
||||
Storage sectorstorage.SealerConfig
|
||||
Fees MinerFeeConfig
|
||||
Addresses MinerAddressConfig
|
||||
DAGStore DAGStoreConfig
|
||||
}
|
||||
|
||||
type DAGStoreConfig struct {
|
||||
@ -146,6 +149,18 @@ type DealmakingConfig struct {
|
||||
RetrievalPricing *RetrievalPricing
|
||||
}
|
||||
|
||||
type IndexerProviderConfig struct {
|
||||
config.Ingest
|
||||
|
||||
// Binding address for the libp2p host - 0 means random port.
|
||||
// Format: multiaddress; see https://multiformats.io/multiaddr/
|
||||
ListenAddresses []string
|
||||
|
||||
// The maximum number of simultaneous data transfers between the indexers
|
||||
// and the indexer provider
|
||||
MaxSimultaneousTransfers uint64
|
||||
}
|
||||
|
||||
type RetrievalPricing struct {
|
||||
Strategy string // possible values: "default", "external"
|
||||
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
"github.com/filecoin-project/go-storedcounter"
|
||||
provider "github.com/filecoin-project/index-provider"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
@ -581,6 +582,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
|
||||
h host.Host, ds dtypes.MetadataDS,
|
||||
r repo.LockedRepo,
|
||||
pieceStore dtypes.ProviderPieceStore,
|
||||
indexer provider.Interface,
|
||||
dataTransfer dtypes.ProviderDataTransfer,
|
||||
spn storagemarket.StorageProviderNode,
|
||||
df dtypes.StorageDealFilter,
|
||||
@ -609,6 +611,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
|
||||
namespace.Wrap(ds, datastore.NewKey("/deals/provider")),
|
||||
store,
|
||||
dsw,
|
||||
indexer,
|
||||
pieceStore,
|
||||
dataTransfer,
|
||||
spn,
|
||||
|
158
node/modules/storageminer_idxprov.go
Normal file
158
node/modules/storageminer_idxprov.go
Normal file
@ -0,0 +1,158 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-graphsync"
|
||||
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
|
||||
gsnet "github.com/ipfs/go-graphsync/network"
|
||||
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
ci "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"go.uber.org/fx"
|
||||
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
|
||||
dtnet "github.com/filecoin-project/go-data-transfer/network"
|
||||
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
|
||||
provider "github.com/filecoin-project/index-provider"
|
||||
"github.com/filecoin-project/index-provider/engine"
|
||||
p2pserver "github.com/filecoin-project/index-provider/server/provider/libp2p"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
type IdxProv struct {
|
||||
fx.In
|
||||
|
||||
helpers.MetricsCtx
|
||||
fx.Lifecycle
|
||||
Repo repo.LockedRepo
|
||||
Datastore dtypes.MetadataDS
|
||||
PeerID peer.ID
|
||||
peerstore.Peerstore
|
||||
}
|
||||
|
||||
func IndexerProvider(cfg config.IndexerProviderConfig) func(params IdxProv) (provider.Interface, error) {
|
||||
return func(args IdxProv) (provider.Interface, error) {
|
||||
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/indexer-provider"))
|
||||
|
||||
pkey := args.Peerstore.PrivKey(args.PeerID)
|
||||
if pkey == nil {
|
||||
return nil, fmt.Errorf("missing private key for node ID: %s", args.PeerID.Pretty())
|
||||
}
|
||||
|
||||
h, err := createIndexerProviderHost(args.MetricsCtx, args.Lifecycle, pkey, args.Peerstore, cfg.ListenAddresses)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("creating indexer provider host: %w", err)
|
||||
}
|
||||
|
||||
dt, err := newIndexerProviderDataTransfer(cfg, args.MetricsCtx, args.Lifecycle, args.Repo, h, ipds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e, err := engine.New(cfg.Ingest, pkey, dt, h, ipds, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("creating indexer provider engine: %w", err)
|
||||
}
|
||||
|
||||
args.Lifecycle.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Start the engine
|
||||
err := e.Start(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("starting indexer provider engine: %s", err)
|
||||
}
|
||||
|
||||
// Add a handler to libp2p that listens for incoming index requests
|
||||
p2pserver.New(ctx, h, e)
|
||||
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return e.Shutdown(ctx)
|
||||
},
|
||||
})
|
||||
|
||||
return e, nil
|
||||
}
|
||||
}
|
||||
|
||||
func createIndexerProviderHost(mctx helpers.MetricsCtx, lc fx.Lifecycle, pkey ci.PrivKey, pstore peerstore.Peerstore, listenAddrs []string) (host.Host, error) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
opts := []libp2p.Option{
|
||||
libp2p.Identity(pkey),
|
||||
libp2p.Peerstore(pstore),
|
||||
libp2p.ListenAddrStrings(listenAddrs...),
|
||||
libp2p.Ping(true),
|
||||
libp2p.UserAgent("lotus-indexer-provider-" + build.UserVersion()),
|
||||
}
|
||||
|
||||
h, err := libp2p.New(ctx, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return h.Close()
|
||||
},
|
||||
})
|
||||
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func newIndexerProviderDataTransfer(cfg config.IndexerProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, h host.Host, ds datastore.Batching) (datatransfer.Manager, error) {
|
||||
net := dtnet.NewFromLibp2pHost(h)
|
||||
|
||||
// Set up graphsync
|
||||
gs := newIndexerProviderGraphsync(cfg, mctx, lc, h)
|
||||
|
||||
// Set up data transfer
|
||||
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/transfers"))
|
||||
transport := dtgstransport.NewTransport(h.ID(), gs, net)
|
||||
dtPath := filepath.Join(r.Path(), "indexer-provider", "data-transfer")
|
||||
err := os.MkdirAll(dtPath, 0755) //nolint: gosec
|
||||
if err != nil && !os.IsExist(err) {
|
||||
return nil, xerrors.Errorf("creating indexer provider data transfer dir %s: %w", dtPath, err)
|
||||
}
|
||||
|
||||
dt, err := dtimpl.NewDataTransfer(dtDs, dtPath, net, transport)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("creating indexer provider data transfer module: %w", err)
|
||||
}
|
||||
|
||||
dt.OnReady(marketevents.ReadyLogger("indexer-provider data transfer"))
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: dt.Start,
|
||||
OnStop: dt.Stop,
|
||||
})
|
||||
return dt, nil
|
||||
}
|
||||
|
||||
func newIndexerProviderGraphsync(cfg config.IndexerProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host) graphsync.GraphExchange {
|
||||
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
|
||||
return graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc),
|
||||
graphsyncNetwork,
|
||||
cidlink.DefaultLinkSystem(),
|
||||
graphsyncimpl.MaxInProgressIncomingRequests(cfg.MaxSimultaneousTransfers),
|
||||
graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
|
||||
graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
|
||||
}
|
Loading…
Reference in New Issue
Block a user