Use the same host and datatransfer as markets for index provider

Remove the bespoke instantiation of libp2p host and datatransfer manager
for index provider and reuse the existing instances used by markets.

The rationale for reuse is the following:
 1. Separation of host introduces a discovery problem, where without
    gossipsub the index provider endpoint will not be discoverable.
    Using the same host as markets would mean the chain can be used to
    discover addresses, putting less empassis on criticality of
    gossipsub considering its set-up cost and lack of message delivery
    guarantees.

 2. Only a single instance of graphsync/datatransfer can be instantiated
    per libp2p host; therefore, if the host is shared, so should
    datatransfer manager.

 3. it is not clear if the assumptions under which separation was
    decided still hold.
This commit is contained in:
Masih H. Derkani 2022-02-16 12:46:03 +00:00
parent 031bfaf120
commit 2ebc111b70
4 changed files with 28 additions and 142 deletions

View File

@ -1,5 +0,0 @@
package idxprov
import "github.com/libp2p/go-libp2p-core/host"
type Host host.Host

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/lotus/api/v1api"
@ -13,21 +14,23 @@ import (
var log = logging.Logger("idxprov")
const protectTag = "index-provider-gossipsub"
type MeshCreator interface {
Connect(ctx context.Context) error
}
type Libp2pMeshCreator struct {
fullnodeApi v1api.FullNode
idxProvHost Host
marketsHost host.Host
}
func (mc Libp2pMeshCreator) Connect(ctx context.Context) error {
// Add the index provider's host ID to list of protected peers first, before any attempt to
// connect to full node.
idxProvID := mc.idxProvHost.ID()
if err := mc.fullnodeApi.NetProtectAdd(ctx, []peer.ID{idxProvID}); err != nil {
// Add the markets host ID to list of daemon's protected peers first, before any attempt to
// connect to full node over libp2p.
marketsPeerID := mc.marketsHost.ID()
if err := mc.fullnodeApi.NetProtectAdd(ctx, []peer.ID{marketsPeerID}); err != nil {
return fmt.Errorf("failed to call NetProtectAdd on the full node, err: %w", err)
}
@ -38,17 +41,17 @@ func (mc Libp2pMeshCreator) Connect(ctx context.Context) error {
// Connect to the full node, ask it to protect the connection and protect the connection on
// markets end too.
if err := mc.idxProvHost.Connect(ctx, faddrs); err != nil {
if err := mc.marketsHost.Connect(ctx, faddrs); err != nil {
return fmt.Errorf("failed to connect index provider host with the full node: %w", err)
}
mc.idxProvHost.ConnManager().Protect(faddrs.ID, "index-provider-gossipsub")
mc.marketsHost.ConnManager().Protect(faddrs.ID, protectTag)
log.Debugw("successfully connected to full node and asked it protect indexer provider peer conn", "fullNodeInfo", faddrs.String(),
"idxProviderPeerId", idxProvID)
"peerId", marketsPeerID)
return nil
}
func NewMeshCreator(fullnodeApi v1api.FullNode, idxProvHost Host) MeshCreator {
return Libp2pMeshCreator{fullnodeApi, idxProvHost}
func NewMeshCreator(fullnodeApi v1api.FullNode, marketsHost host.Host) MeshCreator {
return Libp2pMeshCreator{fullnodeApi, marketsHost}
}

View File

@ -170,7 +170,6 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.ProviderTransport), modules.NewProviderTransport),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDataTransfer),
Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator),
Override(new(idxprov.Host), modules.IndexProviderHost(cfg.IndexProvider)),
Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),

View File

@ -2,181 +2,70 @@ package modules
import (
"context"
"fmt"
"os"
"path/filepath"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-graphsync"
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
provider "github.com/filecoin-project/index-provider"
"github.com/filecoin-project/index-provider/engine"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/markets/idxprov"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/modules/lp2p"
"github.com/filecoin-project/lotus/node/repo"
)
type IdxProv struct {
fx.In
helpers.MetricsCtx
fx.Lifecycle
Repo repo.LockedRepo
Datastore dtypes.MetadataDS
PeerID peer.ID
peerstore.Peerstore
}
func IndexProviderHost(cfg config.IndexProviderConfig) func(IdxProv) (idxprov.Host, error) {
return func(args IdxProv) (idxprov.Host, error) {
pkey := args.Peerstore.PrivKey(args.PeerID)
if pkey == nil {
return nil, fmt.Errorf("missing private key for node ID: %s", args.PeerID.Pretty())
}
h, err := createIndexProviderHost(args.MetricsCtx, args.Lifecycle, pkey, args.Peerstore, cfg.ListenAddresses, cfg.AnnounceAddresses)
if err != nil {
return nil, xerrors.Errorf("creating indexer provider host: %w", err)
}
return h, nil
}
}
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, h idxprov.Host, maddr dtypes.MinerAddress) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, h idxprov.Host, maddr dtypes.MinerAddress) (provider.Interface, error) {
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
pkey := args.Peerstore.PrivKey(args.PeerID)
if pkey == nil {
return nil, fmt.Errorf("missing private key for node ID: %s", args.PeerID.Pretty())
return nil, xerrors.Errorf("missing private key for node ID: %s", args.PeerID)
}
dt, err := newIndexProviderDataTransfer(cfg, args.MetricsCtx, args.Lifecycle, args.Repo, h, ipds)
if err != nil {
return nil, err
}
var maddrs []string
var retAdds []string
for _, a := range marketHost.Addrs() {
maddrs = append(maddrs, a.String())
retAdds = append(retAdds, a.String())
}
// Get the miner ID and set as extra gossip data.
// The extra data is required by the lotus-specific index-provider gossip message validators.
ma := address.Address(maddr)
log.Infof("Using extra gossip data in index provider engine: %s", ma.String())
log.Infof("Using extra gossip data in index provider engine: %s", ma)
e, err := engine.New(cfg.Ingest, pkey, dt, h, ipds, maddrs, engine.WithExtraGossipData(ma.Bytes()))
e, err := engine.New(cfg.Ingest, pkey, dt, marketHost, ipds, retAdds, engine.WithExtraGossipData(ma.Bytes()))
if err != nil {
return nil, xerrors.Errorf("creating indexer provider engine: %w", err)
}
args.Lifecycle.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
// Start the engine
err := e.Start(ctx)
if err != nil {
return xerrors.Errorf("starting indexer provider engine: %s", err)
if err := e.Start(ctx); err != nil {
return xerrors.Errorf("starting indexer provider engine: %w", err)
}
return nil
},
OnStop: func(ctx context.Context) error {
return e.Shutdown()
OnStop: func(_ context.Context) error {
if err := e.Shutdown(); err != nil {
return xerrors.Errorf("shutting down indexer provider engine: %w", err)
}
return nil
},
})
return e, nil
}
}
func createIndexProviderHost(mctx helpers.MetricsCtx, lc fx.Lifecycle, pkey ci.PrivKey, pstore peerstore.Peerstore, listenAddrs []string, announceAddrs []string) (host.Host, error) {
addrsFactory, err := lp2p.MakeAddrsFactory(announceAddrs, nil)
if err != nil {
return nil, err
}
opts := []libp2p.Option{
libp2p.Identity(pkey),
libp2p.Peerstore(pstore),
libp2p.ListenAddrStrings(listenAddrs...),
libp2p.AddrsFactory(addrsFactory),
libp2p.Ping(true),
libp2p.UserAgent("lotus-indexer-provider-" + build.UserVersion()),
}
h, err := libp2p.New(opts...)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return h.Close()
},
})
return h, nil
}
func newIndexProviderDataTransfer(cfg config.IndexProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, h host.Host, ds datastore.Batching) (datatransfer.Manager, error) {
net := dtnet.NewFromLibp2pHost(h)
// Set up graphsync
gs := newIndexProviderGraphsync(cfg, mctx, lc, h)
// Set up data transfer
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/transfers"))
transport := dtgstransport.NewTransport(h.ID(), gs)
dtPath := filepath.Join(r.Path(), "indexer-provider", "data-transfer")
err := os.MkdirAll(dtPath, 0755) //nolint: gosec
if err != nil && !os.IsExist(err) {
return nil, xerrors.Errorf("creating indexer provider data transfer dir %s: %w", dtPath, err)
}
dt, err := dtimpl.NewDataTransfer(dtDs, net, transport)
if err != nil {
return nil, xerrors.Errorf("creating indexer provider data transfer module: %w", err)
}
dt.OnReady(marketevents.ReadyLogger("indexer-provider data transfer"))
lc.Append(fx.Hook{
OnStart: dt.Start,
OnStop: dt.Stop,
})
return dt, nil
}
func newIndexProviderGraphsync(cfg config.IndexProviderConfig, mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host) graphsync.GraphExchange {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
return graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc),
graphsyncNetwork,
cidlink.DefaultLinkSystem(),
graphsyncimpl.MaxInProgressIncomingRequests(cfg.MaxSimultaneousTransfers),
graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
}