lotus/node/impl/client/client.go

638 lines
16 KiB
Go
Raw Normal View History

2019-09-16 13:46:05 +00:00
package client
2019-08-20 16:48:33 +00:00
import (
"context"
"fmt"
2019-10-23 09:18:22 +00:00
"io"
2019-08-20 16:48:33 +00:00
"os"
"golang.org/x/xerrors"
2019-09-06 22:39:47 +00:00
"github.com/ipfs/go-blockservice"
2019-08-20 16:48:33 +00:00
"github.com/ipfs/go-cid"
2020-07-07 09:12:32 +00:00
"github.com/ipfs/go-cidutil"
2019-08-20 16:48:33 +00:00
chunker "github.com/ipfs/go-ipfs-chunker"
2019-09-06 22:39:47 +00:00
offline "github.com/ipfs/go-ipfs-exchange-offline"
2019-08-20 16:48:33 +00:00
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
2019-09-06 22:39:47 +00:00
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
2019-08-20 16:48:33 +00:00
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-car"
2020-07-07 08:52:19 +00:00
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
2019-08-20 16:48:33 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2020-07-07 08:52:19 +00:00
"github.com/multiformats/go-multiaddr"
2020-07-07 09:12:32 +00:00
mh "github.com/multiformats/go-multihash"
2019-08-20 16:48:33 +00:00
"go.uber.org/fx"
2019-09-06 22:39:47 +00:00
"github.com/filecoin-project/go-address"
2020-07-07 08:52:19 +00:00
"github.com/filecoin-project/go-fil-markets/pieceio"
2020-06-23 19:22:33 +00:00
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
2020-04-16 21:43:39 +00:00
"github.com/filecoin-project/sector-storage/ffiwrapper"
2020-02-13 00:15:33 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
2020-04-16 21:43:39 +00:00
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
2020-02-13 00:15:33 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/paych"
2020-07-07 08:52:19 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
2020-07-06 23:39:30 +00:00
"github.com/filecoin-project/lotus/node/repo/importmgr"
2019-08-20 16:48:33 +00:00
)
2020-07-07 09:12:32 +00:00
// DefaultHashFunction is the multihash code assigned to the CID prefix
// (prefix.MhType) when this package builds UnixFS DAGs for imported data.
// NOTE(review): BLAKE2B_MIN + 31 presumably selects blake2b-256; confirm
// against the go-multihash code table.
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
2020-07-07 09:38:22 +00:00
2020-02-13 00:15:33 +00:00
// dealStartBuffer is the number of epochs added to the current chain height
// to derive a deal start epoch when the caller does not provide one.
const dealStartBuffer abi.ChainEpoch = 10000 // TODO: allow setting
2019-09-16 13:46:05 +00:00
type API struct {
2019-08-20 16:48:33 +00:00
fx.In
2019-09-16 13:46:05 +00:00
full.ChainAPI
full.StateAPI
full.WalletAPI
paych.PaychAPI
2019-08-20 16:48:33 +00:00
SMDealClient storagemarket.StorageClient
2020-06-23 19:22:33 +00:00
RetDiscovery rm.PeerResolver
Retrieval rm.RetrievalClient
2019-09-06 22:39:47 +00:00
Chain *store.ChainStore
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
Imports dtypes.ClientImportMgr
2020-07-07 12:35:02 +00:00
RetBstore dtypes.ClientBlockstore // TODO: try to remove
2019-08-20 16:48:33 +00:00
}
2020-04-30 17:42:16 +00:00
func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch {
2020-04-21 21:38:26 +00:00
// Make sure we give some time for the miner to seal
2020-04-30 17:42:16 +00:00
minExp := startEpoch + abi.ChainEpoch(minDuration)
2020-04-21 21:38:26 +00:00
// Align on miners ProvingPeriodBoundary
return minExp + miner.WPoStProvingPeriod - (minExp % miner.WPoStProvingPeriod) + (md.PeriodStart % miner.WPoStProvingPeriod) - 1
2020-04-21 21:38:26 +00:00
}
2020-07-07 08:52:19 +00:00
// imgr exposes the injected import manager as a plain *importmgr.Mgr.
func (a *API) imgr() *importmgr.Mgr {
	return (*importmgr.Mgr)(a.Imports)
}
func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
exist, err := a.WalletHas(ctx, params.Wallet)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, xerrors.Errorf("failed getting addr from wallet: %w", params.Wallet)
}
if !exist {
return nil, xerrors.Errorf("provided address doesn't exist in wallet")
2019-08-20 16:48:33 +00:00
}
mi, err := a.StateMinerInfo(ctx, params.Miner, types.EmptyTSK)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, xerrors.Errorf("failed getting peer ID: %w", err)
}
multiaddrs := make([]multiaddr.Multiaddr, 0, len(mi.Multiaddrs))
for _, a := range mi.Multiaddrs {
maddr, err := multiaddr.NewMultiaddrBytes(a)
if err != nil {
return nil, err
}
multiaddrs = append(multiaddrs, maddr)
}
md, err := a.StateMinerProvingDeadline(ctx, params.Miner, types.EmptyTSK)
if err != nil {
2020-07-07 22:29:45 +00:00
return nil, xerrors.Errorf("failed getting miner's deadline info: %w", err)
}
rt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize)
2020-02-27 21:45:31 +00:00
if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err)
}
if uint64(params.Data.PieceSize.Padded()) > uint64(mi.SectorSize) {
return nil, xerrors.New("data doesn't fit in a sector")
}
providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, mi.PeerId, multiaddrs)
2020-04-30 17:42:16 +00:00
dealStart := params.DealStartEpoch
if dealStart <= 0 { // unset, or explicitly 'epoch undefined'
ts, err := a.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("failed getting chain height: %w", err)
}
dealStart = ts.Height() + dealStartBuffer
}
result, err := a.SMDealClient.ProposeStorageDeal(
ctx,
params.Wallet,
&providerInfo,
params.Data,
2020-04-30 17:42:16 +00:00
dealStart,
calcDealExpiration(params.MinBlocksDuration, md, dealStart),
params.EpochPrice,
2020-02-27 21:45:31 +00:00
big.Zero(),
rt,
params.FastRetrieval,
params.VerifiedDeal,
2020-02-27 21:45:31 +00:00
)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, xerrors.Errorf("failed to start deal: %w", err)
}
return &result.ProposalCid, nil
2019-08-20 16:48:33 +00:00
}
2019-09-16 13:46:05 +00:00
func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
deals, err := a.SMDealClient.ListLocalDeals(ctx)
2019-09-10 14:13:24 +00:00
if err != nil {
return nil, err
}
out := make([]api.DealInfo, len(deals))
for k, v := range deals {
out[k] = api.DealInfo{
ProposalCid: v.ProposalCid,
State: v.State,
Message: v.Message,
2020-01-10 18:01:48 +00:00
Provider: v.Proposal.Provider,
2019-09-10 14:48:54 +00:00
PieceCID: v.Proposal.PieceCID,
2020-02-13 00:15:33 +00:00
Size: uint64(v.Proposal.PieceSize.Unpadded()),
2019-09-10 14:48:54 +00:00
PricePerEpoch: v.Proposal.StoragePricePerEpoch,
2020-02-13 00:15:33 +00:00
Duration: uint64(v.Proposal.Duration()),
DealID: v.DealID,
2019-09-10 14:13:24 +00:00
}
}
return out, nil
}
2019-11-06 19:44:28 +00:00
func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) {
v, err := a.SMDealClient.GetLocalDeal(ctx, d)
2019-11-06 19:44:28 +00:00
if err != nil {
return nil, err
}
2019-11-06 19:44:28 +00:00
return &api.DealInfo{
ProposalCid: v.ProposalCid,
State: v.State,
Message: v.Message,
2020-01-10 18:01:48 +00:00
Provider: v.Proposal.Provider,
PieceCID: v.Proposal.PieceCID,
2020-02-13 00:15:33 +00:00
Size: uint64(v.Proposal.PieceSize.Unpadded()),
PricePerEpoch: v.Proposal.StoragePricePerEpoch,
2020-02-13 00:15:33 +00:00
Duration: uint64(v.Proposal.Duration()),
DealID: v.DealID,
2019-11-06 19:44:28 +00:00
}, nil
}
2019-09-16 13:46:05 +00:00
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
2019-08-26 13:45:36 +00:00
// TODO: check if we have the ENTIRE dag
2020-07-07 08:52:19 +00:00
offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore)))
2019-08-26 13:45:36 +00:00
_, err := offExch.Get(ctx, root)
if err == ipld.ErrNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
2020-07-09 16:29:57 +00:00
func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
2019-08-26 13:45:36 +00:00
peers, err := a.RetDiscovery.GetPeers(root)
if err != nil {
return nil, err
}
2020-07-09 16:29:57 +00:00
out := make([]api.QueryOffer, 0, len(peers))
for _, p := range peers {
if piece != nil && !piece.Equals(*p.PieceCID) {
continue
}
out = append(out, a.makeRetrievalQuery(ctx, p, root, piece, rm.QueryParams{}))
2019-08-26 13:45:36 +00:00
}
return out, nil
}
func (a *API) ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) {
mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil {
return api.QueryOffer{}, err
}
2020-06-23 19:22:33 +00:00
rp := rm.RetrievalPeer{
Address: miner,
ID: mi.PeerId,
}
return a.makeRetrievalQuery(ctx, rp, root, piece, rm.QueryParams{}), nil
}
2020-07-09 16:29:57 +00:00
func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, piece *cid.Cid, qp rm.QueryParams) api.QueryOffer {
queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp)
if err != nil {
return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeerID: rp.ID}
}
var errStr string
switch queryResponse.Status {
2020-06-23 19:22:33 +00:00
case rm.QueryResponseAvailable:
errStr = ""
2020-06-23 19:22:33 +00:00
case rm.QueryResponseUnavailable:
errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message)
2020-06-23 19:22:33 +00:00
case rm.QueryResponseError:
errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message)
}
return api.QueryOffer{
Root: payload,
2020-07-09 16:29:57 +00:00
Piece: piece,
Size: queryResponse.Size,
MinPrice: queryResponse.PieceRetrievalPrice(),
PaymentInterval: queryResponse.MaxPaymentInterval,
PaymentIntervalIncrease: queryResponse.MaxPaymentIntervalIncrease,
Miner: queryResponse.PaymentAddress, // TODO: check
MinerPeerID: rp.ID,
Err: errStr,
}
}
2020-07-07 11:45:02 +00:00
func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) {
2020-07-07 08:52:19 +00:00
id, st, err := a.imgr().NewStore()
2020-07-06 23:39:30 +00:00
if err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-06 23:39:30 +00:00
}
2020-07-07 09:38:09 +00:00
if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-06 23:39:30 +00:00
}
2020-07-07 09:38:09 +00:00
if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-07 09:38:09 +00:00
}
2019-08-20 16:48:33 +00:00
2020-07-07 09:38:09 +00:00
nd, err := a.clientImport(ctx, ref, st)
2019-08-20 16:48:33 +00:00
if err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2019-08-20 16:48:33 +00:00
}
2020-07-07 09:38:09 +00:00
if err := a.imgr().AddLabel(id, importmgr.LRootCid, nd.String()); err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-07 09:38:09 +00:00
}
2020-07-07 11:45:02 +00:00
return &api.ImportRes{
Root: nd,
ImportID: id,
}, nil
}
func (a *API) ClientRemoveImport(ctx context.Context, importID int64) error {
return a.imgr().Remove(importID)
2019-08-20 16:48:33 +00:00
}
2019-10-23 09:18:22 +00:00
func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) {
file := files.NewReaderFile(f)
2020-07-07 08:52:19 +00:00
id, st, err := a.imgr().NewStore()
2020-07-06 23:39:30 +00:00
if err != nil {
2020-07-07 09:12:32 +00:00
return cid.Undef, err
2020-07-06 23:39:30 +00:00
}
2020-07-07 08:52:19 +00:00
if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil {
2020-07-06 23:39:30 +00:00
return cid.Cid{}, err
}
bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG)
2019-10-23 09:18:22 +00:00
2020-07-07 09:12:32 +00:00
prefix, err := merkledag.PrefixForCidVersion(1)
if err != nil {
return cid.Undef, err
}
prefix.MhType = DefaultHashFunction
2019-10-23 09:18:22 +00:00
params := ihelper.DagBuilderParams{
2020-07-07 09:38:22 +00:00
Maxlinks: build.UnixfsLinksPerLevel,
RawLeaves: true,
2020-07-07 09:12:32 +00:00
CidBuilder: cidutil.InlineBuilder{
Builder: prefix,
Limit: 126,
},
2020-07-07 09:38:22 +00:00
Dagserv: bufferedDS,
2019-10-23 09:18:22 +00:00
}
db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
if err != nil {
return cid.Undef, err
}
nd, err := balanced.Layout(db)
if err != nil {
return cid.Undef, err
}
return nd.Cid(), bufferedDS.Commit()
}
2019-09-16 13:46:05 +00:00
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
2020-07-07 08:52:19 +00:00
importIDs := a.imgr().List()
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
out := make([]api.Import, len(importIDs))
for i, id := range importIDs {
info, err := a.imgr().Info(id)
if err != nil {
out[i] = api.Import{
Key: id,
2020-07-07 09:12:32 +00:00
Err: xerrors.Errorf("getting info: %w", err).Error(),
2020-07-07 08:52:19 +00:00
}
continue
}
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
ai := api.Import{
Key: id,
Source: info.Labels[importmgr.LSource],
FilePath: info.Labels[importmgr.LFileName],
2019-08-20 16:48:33 +00:00
}
2020-07-07 08:52:19 +00:00
if info.Labels[importmgr.LRootCid] != "" {
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
2020-07-07 09:12:32 +00:00
ai.Err = err.Error()
2020-07-07 08:52:19 +00:00
} else {
ai.Root = &c
}
}
2020-07-07 08:52:19 +00:00
out[i] = ai
2019-08-20 16:48:33 +00:00
}
2020-07-07 08:52:19 +00:00
return out, nil
2019-08-20 16:48:33 +00:00
}
2019-08-27 18:45:21 +00:00
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
2019-09-16 20:11:17 +00:00
if order.MinerPeerID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
2019-09-16 20:11:17 +00:00
if err != nil {
return err
}
2020-07-06 23:39:30 +00:00
order.MinerPeerID = mi.PeerId
2019-09-16 20:11:17 +00:00
}
if order.Size == 0 {
return xerrors.Errorf("cannot make retrieval deal for zero bytes")
}
2020-07-07 12:35:02 +00:00
/*id, st, err := a.imgr().NewStore()
2020-07-06 23:39:30 +00:00
if err != nil {
return err
}
2020-07-07 08:52:19 +00:00
if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil {
2020-07-06 23:39:30 +00:00
return err
2020-07-07 12:35:02 +00:00
}*/
2020-07-06 23:39:30 +00:00
retrievalResult := make(chan error, 1)
2020-06-23 19:22:33 +00:00
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) {
switch state.Status {
2020-06-23 19:22:33 +00:00
case rm.DealStatusCompleted:
retrievalResult <- nil
2020-06-23 19:22:33 +00:00
case rm.DealStatusRejected:
retrievalResult <- xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case
2020-06-23 19:22:33 +00:00
rm.DealStatusDealNotFound,
rm.DealStatusErrored,
rm.DealStatusFailed:
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
case
2020-06-23 19:22:33 +00:00
rm.DealStatusAccepted,
rm.DealStatusAwaitingAcceptance,
rm.DealStatusBlocksComplete,
rm.DealStatusFinalizing,
rm.DealStatusFundsNeeded,
rm.DealStatusFundsNeededLastPayment,
rm.DealStatusNew,
rm.DealStatusOngoing,
rm.DealStatusPaymentChannelAddingFunds,
rm.DealStatusPaymentChannelAllocatingLane,
rm.DealStatusPaymentChannelCreating,
rm.DealStatusPaymentChannelReady,
rm.DealStatusVerified:
return
default:
retrievalResult <- xerrors.Errorf("Unhandled Retrieval Status: %+v", state.Status)
}
}
})
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
2020-07-07 12:35:02 +00:00
_, err := a.Retrieval.Retrieve(
ctx,
order.Root,
2020-06-23 19:22:33 +00:00
rm.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease),
order.Total,
order.MinerPeerID,
2020-01-10 18:01:48 +00:00
order.Client,
2020-07-06 23:39:30 +00:00
order.Miner) // TODO: pass the store here somehow
2020-05-20 17:24:42 +00:00
if err != nil {
return xerrors.Errorf("Retrieve failed: %w", err)
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case err := <-retrievalResult:
if err != nil {
return xerrors.Errorf("RetrieveUnixfs: %w", err)
}
2019-08-27 22:10:23 +00:00
}
unsubscribe()
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
return nil
}
2020-07-07 12:35:02 +00:00
rdag := merkledag.NewDAGService(blockservice.New(a.RetBstore, offline.Exchange(a.RetBstore)))
if ref.IsCAR {
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
2020-07-07 12:35:02 +00:00
err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
if err != nil {
return err
}
return f.Close()
}
2020-07-07 12:35:02 +00:00
nd, err := rdag.Get(ctx, order.Root)
2019-08-27 22:10:23 +00:00
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
2019-08-27 22:10:23 +00:00
}
2020-07-07 12:35:02 +00:00
file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
}
return files.WriteTo(file, ref.Path)
2019-08-27 18:45:21 +00:00
}
2019-09-13 21:00:36 +00:00
2020-02-13 00:15:33 +00:00
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {
info := utils.NewStorageProviderInfo(miner, address.Undef, 0, p, nil)
signedAsk, err := a.SMDealClient.GetAsk(ctx, info)
if err != nil {
return nil, err
}
2020-02-13 00:15:33 +00:00
return signedAsk, nil
2019-09-13 21:00:36 +00:00
}
// ClientCalcCommP computes the piece commitment and piece size of the file at
// inpath, using the seal proof type that corresponds to miner's sector size.
// The input file is closed before returning.
func (a *API) ClientCalcCommP(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) {
	mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
	if err != nil {
		return nil, xerrors.Errorf("failed checking miners sector size: %w", err)
	}

	rt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize)
	if err != nil {
		return nil, xerrors.Errorf("bad sector size: %w", err)
	}

	rdr, err := os.Open(inpath)
	if err != nil {
		return nil, err
	}
	// Opened read-only, so a close error carries no data-loss risk.
	defer rdr.Close() //nolint:errcheck

	stat, err := rdr.Stat()
	if err != nil {
		return nil, err
	}

	commP, pieceSize, err := pieceio.GeneratePieceCommitment(rt, rdr, uint64(stat.Size()))
	if err != nil {
		return nil, xerrors.Errorf("computing commP failed: %w", err)
	}

	return &api.CommPRet{
		Root: commP,
		Size: pieceSize,
	}, nil
}
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
2020-07-07 08:52:19 +00:00
id, st, err := a.imgr().NewStore()
2020-07-06 23:39:30 +00:00
if err != nil {
return err
}
2020-07-07 08:52:19 +00:00
if err := a.imgr().AddLabel(id, "source", "gen-car"); err != nil {
2020-07-06 23:39:30 +00:00
return err
}
2020-07-06 23:39:30 +00:00
bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG)
c, err := a.clientImport(ctx, ref, st)
if err != nil {
return err
}
2020-07-06 23:39:30 +00:00
// TODO: does that defer mean to remove the whole blockstore?
defer bufferedDS.Remove(ctx, c) //nolint:errcheck
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
// entire DAG selector
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
f, err := os.Create(outputPath)
if err != nil {
return err
}
2020-07-06 23:39:30 +00:00
sc := car.NewSelectiveCar(ctx, st.Bstore, []car.Dag{{Root: c, Selector: allSelector}})
if err = sc.Write(f); err != nil {
return err
}
return f.Close()
}
2020-07-06 23:39:30 +00:00
func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmgr.Store) (cid.Cid, error) {
f, err := os.Open(ref.Path)
if err != nil {
return cid.Undef, err
}
stat, err := f.Stat()
if err != nil {
return cid.Undef, err
}
file, err := files.NewReaderPathFile(ref.Path, f, stat)
if err != nil {
return cid.Undef, err
}
if ref.IsCAR {
2020-07-06 23:39:30 +00:00
var st car.Store
if store.Fstore == nil {
st = store.Bstore
} else {
2020-07-06 23:39:30 +00:00
st = store.Fstore
}
2020-07-06 23:39:30 +00:00
result, err := car.LoadCar(st, file)
if err != nil {
return cid.Undef, err
}
if len(result.Roots) != 1 {
return cid.Undef, xerrors.New("cannot import car with more than one root")
}
return result.Roots[0], nil
}
2020-07-06 23:39:30 +00:00
bufDs := ipld.NewBufferedDAG(ctx, store.DAG)
2020-07-07 09:12:32 +00:00
prefix, err := merkledag.PrefixForCidVersion(1)
if err != nil {
return cid.Undef, err
}
prefix.MhType = DefaultHashFunction
params := ihelper.DagBuilderParams{
2020-07-07 09:38:22 +00:00
Maxlinks: build.UnixfsLinksPerLevel,
RawLeaves: true,
2020-07-07 09:12:32 +00:00
CidBuilder: cidutil.InlineBuilder{
Builder: prefix,
Limit: 126,
},
2020-07-07 09:38:22 +00:00
Dagserv: bufDs,
2020-07-08 21:05:43 +00:00
NoCopy: true,
}
db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
if err != nil {
return cid.Undef, err
}
nd, err := balanced.Layout(db)
if err != nil {
return cid.Undef, err
}
2020-07-06 23:39:30 +00:00
if err := bufDs.Commit(); err != nil {
return cid.Undef, err
}
return nd.Cid(), nil
}