package modules

import (
	"context"
	"path/filepath"
	"time"

	blockstore "github.com/ipfs/go-ipfs-blockstore"
	"github.com/ipfs/go-merkledag"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/routing"
	"go.uber.org/fx"

	dtimpl "github.com/filecoin-project/go-data-transfer/impl"
	dtnet "github.com/filecoin-project/go-data-transfer/network"
	dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
	"github.com/filecoin-project/go-fil-markets/retrievalmarket"
	"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
	retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
	rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
	"github.com/filecoin-project/go-fil-markets/storagemarket"
	storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
	"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
	smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
	"github.com/filecoin-project/go-statestore"
	"github.com/filecoin-project/go-storedcounter"
	"github.com/ipfs/go-bitswap"
	"github.com/ipfs/go-bitswap/network"
	"github.com/ipfs/go-blockservice"
	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	"github.com/ipfs/go-filestore"

	"github.com/filecoin-project/lotus/markets/retrievaladapter"
	"github.com/filecoin-project/lotus/node/impl/full"
	payapi "github.com/filecoin-project/lotus/node/impl/paych"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/modules/helpers"
	"github.com/filecoin-project/lotus/node/repo"
	"github.com/filecoin-project/lotus/paychmgr"
)

// ClientFstore builds the client filestore on top of the repo's "/client"
// datastore. Blocks are kept under the "blocks" namespace of that datastore,
// while the file manager uses the un-namespaced datastore and is rooted at
// the parent directory of the repo's absolute path.
//
// It returns an error if the datastore cannot be opened or the repo path
// cannot be made absolute.
func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) {
	ds, err := r.Datastore("/client")
	if err != nil {
		return nil, err
	}

	repoPath, err := filepath.Abs(r.Path())
	if err != nil {
		return nil, err
	}

	fileMgr := filestore.NewFileManager(ds, filepath.Dir(repoPath))
	fileMgr.AllowFiles = true
	// TODO: fm.AllowUrls (needs more code in client import)

	blockDs := namespace.Wrap(ds, datastore.NewKey("blocks"))
	return filestore.NewFilestore(blockstore.NewBlockstore(blockDs), fileMgr), nil
}

// ClientBlockstore wraps the client filestore in an ID store so it can be
// used as the client's blockstore.
func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore {
	fs := (*filestore.Filestore)(fstore)
	return blockstore.NewIdStore(fs)
}

// RegisterClientValidator is an initialization hook that installs the client
// request validator on the data transfer module as the validator for
// StorageDataTransferVoucher types. It panics if registration fails.
func RegisterClientValidator(crv dtypes.ClientRequestValidator, dtm dtypes.ClientDataTransfer) {
	voucher := &requestvalidation.StorageDataTransferVoucher{}
	validator := (*requestvalidation.UnifiedRequestValidator)(crv)

	err := dtm.RegisterVoucherType(voucher, validator)
	if err != nil {
		panic(err)
	}
}

// NewClientGraphsyncDataTransfer returns a data transfer manager for the
// client that runs over the supplied graphsync instance. Its transfer counter
// and transfer state are stored in the metadata datastore under the
// "/datatransfer/client/..." keys, and the manager is started and stopped
// through the fx lifecycle.
func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Graphsync, ds dtypes.MetadataDS) (dtypes.ClientDataTransfer, error) {
	counter := storedcounter.New(ds, datastore.NewKey("/datatransfer/client/counter"))
	dtNetwork := dtnet.NewFromLibp2pHost(h)
	transfersDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/client/transfers"))
	gsTransport := dtgstransport.NewTransport(h.ID(), gs)

	dt, err := dtimpl.NewDataTransfer(transfersDs, dtNetwork, gsTransport, counter)
	if err != nil {
		return nil, err
	}

	start := func(ctx context.Context) error {
		return dt.Start(ctx)
	}
	stop := func(context.Context) error {
		return dt.Stop()
	}
	lc.Append(fx.Hook{OnStart: start, OnStop: stop})

	return dt, nil
}

// NewClientDealStore creates a statestore in which the client keeps its
// deals.
func NewClientDealStore(ds dtypes.ClientDatastore) dtypes.ClientDealStore {
	return statestore.New(ds)
}

// NewClientDatastore creates the client's deal datastore by namespacing the
// metadata datastore under "/deals/client".
func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
	return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
}

// ClientDAG is a DAGService for the ClientBlockstore. Blocks are served
// through a block service that combines the blockstore with a bitswap
// exchange created with providing disabled; the block service is closed when
// the fx lifecycle stops.
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
	bsNetwork := network.NewFromIpfsHost(h, rt)

	// The exchange runs on a context derived from the metrics context and the
	// fx lifecycle.
	ctx := helpers.LifecycleCtx(mctx, lc)
	exchange := bitswap.New(ctx, bsNetwork, ibs, bitswap.ProvideEnabled(false))

	blockSvc := blockservice.New(ibs, exchange)
	dagSvc := merkledag.NewDAGService(blockSvc)

	closeBlockSvc := func(_ context.Context) error {
		return blockSvc.Close()
	}
	lc.Append(fx.Hook{OnStop: closeBlockSvc})

	return dagSvc
}

// NewClientRequestValidator creates the unified request validator used by the
// client. Only the client deal store is supplied; the first constructor
// argument is nil.
func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator {
	return requestvalidation.NewUnifiedRequestValidator(nil, deals)
}

// StorageClient creates the storage market client on top of the libp2p host,
// configured with a one-second deal polling interval, and ties its start and
// stop to the fx lifecycle.
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) {
	marketNet := smnet.NewFromLibp2pHost(h)
	pollEvery := storageimpl.DealPollingInterval(time.Second)

	client, err := storageimpl.NewClient(marketNet, ibs, dataTransfer, discovery, deals, scn, pollEvery)
	if err != nil {
		return nil, err
	}

	start := func(ctx context.Context) error {
		return client.Start(ctx)
	}
	stop := func(context.Context) error {
		client.Stop()
		return nil
	}
	lc.Append(fx.Hook{OnStart: start, OnStop: stop})

	return client, nil
}

// RetrievalClient creates a new retrieval client attached to the client
// blockstore. Its stored counter lives under the "/retr" key of the metadata
// datastore and its state under the "/retrievals/client" namespace.
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paychmgr.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver, ds dtypes.MetadataDS, chainapi full.ChainAPI) (retrievalmarket.RetrievalClient, error) {
	node := retrievaladapter.NewRetrievalClientNode(pmgr, payapi, chainapi)
	retrievalNet := rmnet.NewFromLibp2pHost(h)
	counter := storedcounter.New(ds, datastore.NewKey("/retr"))
	retrievalDs := namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))

	return retrievalimpl.NewClient(retrievalNet, bs, node, resolver, retrievalDs, counter)
}