package client import ( "bufio" "context" "fmt" "io" "os" "golang.org/x/xerrors" "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/dline" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" files "github.com/ipfs/go-ipfs-files" ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-merkledag" unixfile "github.com/ipfs/go-unixfs/file" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" "github.com/ipld/go-car" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" mh "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-commp-utils/ffiwrapper" "github.com/filecoin-project/go-commp-utils/writer" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/discovery" "github.com/filecoin-project/go-fil-markets/retrievalmarket" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-state-types/abi" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/importmgr" ) var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) const dealStartBufferHours uint64 = 49 type API struct { fx.In full.ChainAPI full.WalletAPI paych.PaychAPI full.StateAPI SMDealClient storagemarket.StorageClient RetDiscovery discovery.PeerResolver Retrieval rm.RetrievalClient Chain *store.ChainStore Imports dtypes.ClientImportMgr CombinedBstore dtypes.ClientBlockstore // TODO: try to remove RetrievalStoreMgr dtypes.ClientRetrievalStoreManager DataTransfer dtypes.ClientDataTransfer Host host.Host } func calcDealExpiration(minDuration uint64, md *dline.Info, startEpoch abi.ChainEpoch) abi.ChainEpoch { // Make sure we give some time for the miner to seal minExp := startEpoch + abi.ChainEpoch(minDuration) // Align on miners ProvingPeriodBoundary return minExp + md.WPoStProvingPeriod - (minExp % md.WPoStProvingPeriod) + (md.PeriodStart % md.WPoStProvingPeriod) - 1 } func (a *API) imgr() *importmgr.Mgr { return a.Imports } func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { var storeID *multistore.StoreID if params.Data.TransferType == storagemarket.TTGraphsync { importIDs := a.imgr().List() for _, importID := range importIDs { info, err := a.imgr().Info(importID) if err != nil { continue } if info.Labels[importmgr.LRootCid] == "" { continue } c, err := cid.Parse(info.Labels[importmgr.LRootCid]) if err != nil { continue } if c.Equals(params.Data.Root) { storeID = &importID //nolint break } } } walletKey, err := a.StateAccountKey(ctx, params.Wallet, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("failed resolving params.Wallet addr: %w", params.Wallet) } exist, err := a.WalletHas(ctx, walletKey) if err != nil { return nil, xerrors.Errorf("failed getting addr from wallet: %w", params.Wallet) } if !exist { return nil, xerrors.Errorf("provided address doesn't exist in wallet") } mi, err := a.StateMinerInfo(ctx, params.Miner, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("failed getting peer ID: %w", err) } md, err := a.StateMinerProvingDeadline(ctx, params.Miner, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("failed getting miner's deadline info: %w", err) } if uint64(params.Data.PieceSize.Padded()) > uint64(mi.SectorSize) { return nil, xerrors.New("data doesn't fit in a sector") } providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs) dealStart := params.DealStartEpoch if dealStart <= 0 { // unset, or explicitly 'epoch undefined' ts, err := a.ChainHead(ctx) if err != nil { return nil, xerrors.Errorf("failed getting chain height: %w", err) } blocksPerHour := 60 * 60 / build.BlockDelaySecs dealStart = ts.Height() + abi.ChainEpoch(dealStartBufferHours*blocksPerHour) // TODO: Get this from storage ask } result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{ Addr: params.Wallet, Info: &providerInfo, Data: params.Data, StartEpoch: dealStart, EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), Price: params.EpochPrice, Collateral: params.ProviderCollateral, Rt: mi.SealProofType, FastRetrieval: params.FastRetrieval, VerifiedDeal: params.VerifiedDeal, StoreID: storeID, }) if err != nil { return nil, xerrors.Errorf("failed to start deal: %w", err) } return &result.ProposalCid, nil } func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { deals, err := a.SMDealClient.ListLocalDeals(ctx) if err != nil { return nil, err } // Get a map of transfer ID => DataTransfer dataTransfersByID, err := a.transfersByID(ctx) if err != nil { return nil, err } out := make([]api.DealInfo, len(deals)) for k, v := range deals { // Find the data transfer associated with this deal var transferCh *api.DataTransferChannel if v.TransferChannelID != nil { if ch, ok := dataTransfersByID[*v.TransferChannelID]; ok { transferCh = &ch } } out[k] = a.newDealInfoWithTransfer(transferCh, v) } return out, nil } func (a *API) transfersByID(ctx context.Context) (map[datatransfer.ChannelID]api.DataTransferChannel, error) { inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx) if err != nil { return nil, err } dataTransfersByID := make(map[datatransfer.ChannelID]api.DataTransferChannel, len(inProgressChannels)) for id, channelState := range inProgressChannels { ch := api.NewDataTransferChannel(a.Host.ID(), channelState) dataTransfersByID[id] = ch } return dataTransfersByID, nil } func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) { v, err := a.SMDealClient.GetLocalDeal(ctx, d) if err != nil { return nil, err } di := a.newDealInfo(ctx, v) return &di, nil } func (a *API) ClientGetDealUpdates(ctx context.Context) (<-chan api.DealInfo, error) { updates := make(chan api.DealInfo) unsub := a.SMDealClient.SubscribeToEvents(func(_ storagemarket.ClientEvent, deal storagemarket.ClientDeal) { updates <- a.newDealInfo(ctx, deal) }) go func() { defer unsub() <-ctx.Done() }() return updates, nil } func (a *API) newDealInfo(ctx context.Context, v storagemarket.ClientDeal) api.DealInfo { // Find the data transfer associated with this deal var transferCh *api.DataTransferChannel if v.TransferChannelID != nil { state, err := a.DataTransfer.ChannelState(ctx, *v.TransferChannelID) // Note: If there was an error just ignore it, as the data transfer may // be not found if it's no longer active if err == nil { ch := api.NewDataTransferChannel(a.Host.ID(), state) transferCh = &ch } } return a.newDealInfoWithTransfer(transferCh, v) } func (a *API) newDealInfoWithTransfer(transferCh *api.DataTransferChannel, v storagemarket.ClientDeal) api.DealInfo { return api.DealInfo{ ProposalCid: v.ProposalCid, DataRef: v.DataRef, State: v.State, Message: v.Message, Provider: v.Proposal.Provider, PieceCID: v.Proposal.PieceCID, Size: uint64(v.Proposal.PieceSize.Unpadded()), PricePerEpoch: v.Proposal.StoragePricePerEpoch, Duration: uint64(v.Proposal.Duration()), DealID: v.DealID, CreationTime: v.CreationTime.Time(), Verified: v.Proposal.VerifiedDeal, TransferChannelID: v.TransferChannelID, DataTransfer: transferCh, } } func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { // TODO: check if we have the ENTIRE dag offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore))) _, err := offExch.Get(ctx, root) if err == ipld.ErrNotFound { return false, nil } if err != nil { return false, err } return true, nil } func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) { peers, err := a.RetDiscovery.GetPeers(root) if err != nil { return nil, err } out := make([]api.QueryOffer, 0, len(peers)) for _, p := range peers { if piece != nil && !piece.Equals(*p.PieceCID) { continue } out = append(out, a.makeRetrievalQuery(ctx, p, root, piece, rm.QueryParams{})) } return out, nil } func (a *API) ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) { mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK) if err != nil { return api.QueryOffer{}, err } rp := rm.RetrievalPeer{ Address: miner, ID: *mi.PeerId, } return a.makeRetrievalQuery(ctx, rp, root, piece, rm.QueryParams{}), nil } func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, piece *cid.Cid, qp rm.QueryParams) api.QueryOffer { queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp) if err != nil { return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeer: rp} } var errStr string switch queryResponse.Status { case rm.QueryResponseAvailable: errStr = "" case rm.QueryResponseUnavailable: errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message) case rm.QueryResponseError: errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message) } return api.QueryOffer{ Root: payload, Piece: piece, Size: queryResponse.Size, MinPrice: queryResponse.PieceRetrievalPrice(), UnsealPrice: queryResponse.UnsealPrice, PaymentInterval: queryResponse.MaxPaymentInterval, PaymentIntervalIncrease: queryResponse.MaxPaymentIntervalIncrease, Miner: queryResponse.PaymentAddress, // TODO: check MinerPeer: rp, Err: errStr, } } func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) { id, st, err := a.imgr().NewStore() if err != nil { return nil, err } if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil { return nil, err } if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil { return nil, err } nd, err := a.clientImport(ctx, ref, st) if err != nil { return nil, err } if err := a.imgr().AddLabel(id, importmgr.LRootCid, nd.String()); err != nil { return nil, err } return &api.ImportRes{ Root: nd, ImportID: id, }, nil } func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error { return a.imgr().Remove(importID) } func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) { file := files.NewReaderFile(f) id, st, err := a.imgr().NewStore() if err != nil { return cid.Undef, err } if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil { return cid.Cid{}, err } bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG) prefix, err := merkledag.PrefixForCidVersion(1) if err != nil { return cid.Undef, err } prefix.MhType = DefaultHashFunction params := ihelper.DagBuilderParams{ Maxlinks: build.UnixfsLinksPerLevel, RawLeaves: true, CidBuilder: cidutil.InlineBuilder{ Builder: prefix, Limit: 126, }, Dagserv: bufferedDS, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) if err != nil { return cid.Undef, err } nd, err := balanced.Layout(db) if err != nil { return cid.Undef, err } if err := a.imgr().AddLabel(id, "root", nd.Cid().String()); err != nil { return cid.Cid{}, err } return nd.Cid(), bufferedDS.Commit() } func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { importIDs := a.imgr().List() out := make([]api.Import, len(importIDs)) for i, id := range importIDs { info, err := a.imgr().Info(id) if err != nil { out[i] = api.Import{ Key: id, Err: xerrors.Errorf("getting info: %w", err).Error(), } continue } ai := api.Import{ Key: id, Source: info.Labels[importmgr.LSource], FilePath: info.Labels[importmgr.LFileName], } if info.Labels[importmgr.LRootCid] != "" { c, err := cid.Parse(info.Labels[importmgr.LRootCid]) if err != nil { ai.Err = err.Error() } else { ai.Root = &c } } out[i] = ai } return out, nil } func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { events := make(chan marketevents.RetrievalEvent) go a.clientRetrieve(ctx, order, ref, events) for { select { case evt, ok := <-events: if !ok { // done successfully return nil } if evt.Err != "" { return xerrors.Errorf("retrieval failed: %s", evt.Err) } case <-ctx.Done(): return xerrors.Errorf("retrieval timed out") } } } func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { events := make(chan marketevents.RetrievalEvent) go a.clientRetrieve(ctx, order, ref, events) return events, nil } type retrievalSubscribeEvent struct { event rm.ClientEvent state rm.ClientDealState } func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error { for { var subscribeEvent retrievalSubscribeEvent select { case <-ctx.Done(): return xerrors.New("Retrieval Timed Out") case subscribeEvent = <-subscribeEvents: if subscribeEvent.state.ID != dealID { // we can't check the deal ID ahead of time because: // 1. We need to subscribe before retrieving. // 2. We won't know the deal ID until after retrieving. continue } } select { case <-ctx.Done(): return xerrors.New("Retrieval Timed Out") case events <- marketevents.RetrievalEvent{ Event: subscribeEvent.event, Status: subscribeEvent.state.Status, BytesReceived: subscribeEvent.state.TotalReceived, FundsSpent: subscribeEvent.state.FundsSpent, }: } state := subscribeEvent.state switch state.Status { case rm.DealStatusCompleted: return nil case rm.DealStatusRejected: return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) case rm.DealStatusDealNotFound, rm.DealStatusErrored: return xerrors.Errorf("Retrieval Error: %s", state.Message) } } } func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) { defer close(events) finish := func(e error) { if e != nil { events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()} } } if order.MinerPeer.ID == "" { mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) if err != nil { finish(err) return } order.MinerPeer = retrievalmarket.RetrievalPeer{ ID: *mi.PeerId, Address: order.Miner, } } if order.Size == 0 { finish(xerrors.Errorf("cannot make retrieval deal for zero bytes")) return } /*id, st, err := a.imgr().NewStore() if err != nil { return err } if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { return err }*/ ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) if err != nil { finish(xerrors.Errorf("Error in retrieval params: %s", err)) return } store, err := a.RetrievalStoreMgr.NewStore() if err != nil { finish(xerrors.Errorf("Error setting up new store: %w", err)) return } defer func() { _ = a.RetrievalStoreMgr.ReleaseStore(store) }() // Subscribe to events before retrieving to avoid losing events. subscribeEvents := make(chan retrievalSubscribeEvent, 1) subscribeCtx, cancel := context.WithCancel(ctx) defer cancel() unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { // We'll check the deal IDs inside readSubscribeEvents. if state.PayloadCID.Equals(order.Root) { select { case <-subscribeCtx.Done(): case subscribeEvents <- retrievalSubscribeEvent{event, state}: } } }) dealID, err := a.Retrieval.Retrieve( ctx, order.Root, params, order.Total, order.MinerPeer, order.Client, order.Miner, store.StoreID()) if err != nil { unsubscribe() finish(xerrors.Errorf("Retrieve failed: %w", err)) return } err = readSubscribeEvents(ctx, dealID, subscribeEvents, events) unsubscribe() if err != nil { finish(xerrors.Errorf("Retrieve: %w", err)) return } // If ref is nil, it only fetches the data into the configured blockstore. if ref == nil { finish(nil) return } rdag := store.DAGService() if ref.IsCAR { f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { finish(err) return } err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f) if err != nil { finish(err) return } finish(f.Close()) return } nd, err := rdag.Get(ctx, order.Root) if err != nil { finish(xerrors.Errorf("ClientRetrieve: %w", err)) return } file, err := unixfile.NewUnixfsFile(ctx, rdag, nd) if err != nil { finish(xerrors.Errorf("ClientRetrieve: %w", err)) return } finish(files.WriteTo(file, ref.Path)) return } func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) { mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("failed getting miner info: %w", err) } info := utils.NewStorageProviderInfo(miner, mi.Worker, mi.SectorSize, p, mi.Multiaddrs) ask, err := a.SMDealClient.GetAsk(ctx, info) if err != nil { return nil, err } return ask, nil } func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) { // Hard-code the sector type to 32GiBV1_1, because: // - ffiwrapper.GeneratePieceCIDFromFile requires a RegisteredSealProof // - commP itself is sector-size independent, with rather low probability of that changing // ( note how the final rust call is identical for every RegSP type ) // https://github.com/filecoin-project/rust-filecoin-proofs-api/blob/v5.0.0/src/seal.rs#L1040-L1050 // // IF/WHEN this changes in the future we will have to be able to calculate // "old style" commP, and thus will need to introduce a version switch or similar arbitraryProofType := abi.RegisteredSealProof_StackedDrg32GiBV1_1 rdr, err := os.Open(inpath) if err != nil { return nil, err } defer rdr.Close() //nolint:errcheck stat, err := rdr.Stat() if err != nil { return nil, err } pieceReader, pieceSize := padreader.New(rdr, uint64(stat.Size())) commP, err := ffiwrapper.GeneratePieceCIDFromFile(arbitraryProofType, pieceReader, pieceSize) if err != nil { return nil, xerrors.Errorf("computing commP failed: %w", err) } return &api.CommPRet{ Root: commP, Size: pieceSize, }, nil } type lenWriter int64 func (w *lenWriter) Write(p []byte) (n int, err error) { *w += lenWriter(len(p)) return len(p), nil } func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) { dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore))) w := lenWriter(0) err := car.WriteCar(ctx, dag, []cid.Cid{root}, &w) if err != nil { return api.DataSize{}, err } up := padreader.PaddedSize(uint64(w)) return api.DataSize{ PayloadSize: int64(w), PieceSize: up.Padded(), }, nil } func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) { dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore))) w := &writer.Writer{} bw := bufio.NewWriterSize(w, int(writer.CommPBuf)) err := car.WriteCar(ctx, dag, []cid.Cid{root}, w) if err != nil { return api.DataCIDSize{}, err } if err := bw.Flush(); err != nil { return api.DataCIDSize{}, err } dataCIDSize, err := w.Sum() return api.DataCIDSize(dataCIDSize), err } func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { id, st, err := a.imgr().NewStore() if err != nil { return err } if err := a.imgr().AddLabel(id, "source", "gen-car"); err != nil { return err } bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG) c, err := a.clientImport(ctx, ref, st) if err != nil { return err } // TODO: does that defer mean to remove the whole blockstore? defer bufferedDS.Remove(ctx, c) //nolint:errcheck ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) // entire DAG selector allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() f, err := os.Create(outputPath) if err != nil { return err } sc := car.NewSelectiveCar(ctx, st.Bstore, []car.Dag{{Root: c, Selector: allSelector}}) if err = sc.Write(f); err != nil { return err } return f.Close() } func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multistore.Store) (cid.Cid, error) { f, err := os.Open(ref.Path) if err != nil { return cid.Undef, err } defer f.Close() //nolint:errcheck stat, err := f.Stat() if err != nil { return cid.Undef, err } file, err := files.NewReaderPathFile(ref.Path, f, stat) if err != nil { return cid.Undef, err } if ref.IsCAR { var st car.Store if store.Fstore == nil { st = store.Bstore } else { st = store.Fstore } result, err := car.LoadCar(st, file) if err != nil { return cid.Undef, err } if len(result.Roots) != 1 { return cid.Undef, xerrors.New("cannot import car with more than one root") } return result.Roots[0], nil } bufDs := ipld.NewBufferedDAG(ctx, store.DAG) prefix, err := merkledag.PrefixForCidVersion(1) if err != nil { return cid.Undef, err } prefix.MhType = DefaultHashFunction params := ihelper.DagBuilderParams{ Maxlinks: build.UnixfsLinksPerLevel, RawLeaves: true, CidBuilder: cidutil.InlineBuilder{ Builder: prefix, Limit: 126, }, Dagserv: bufDs, NoCopy: true, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) if err != nil { return cid.Undef, err } nd, err := balanced.Layout(db) if err != nil { return cid.Undef, err } if err := bufDs.Commit(); err != nil { return cid.Undef, err } return nd.Cid(), nil } func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx) if err != nil { return nil, err } apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels)) for _, channelState := range inProgressChannels { apiChannels = append(apiChannels, api.NewDataTransferChannel(a.Host.ID(), channelState)) } return apiChannels, nil } func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { channels := make(chan api.DataTransferChannel) unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) { channel := api.NewDataTransferChannel(a.Host.ID(), channelState) select { case <-ctx.Done(): case channels <- channel: } }) go func() { defer unsub() <-ctx.Done() }() return channels, nil } func (a *API) ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { selfPeer := a.Host.ID() if isInitiator { return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID}) } return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID}) } func (a *API) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error { selfPeer := a.Host.ID() if isInitiator { return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID}) } return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID}) } func (a *API) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error { return a.Retrieval.TryRestartInsufficientFunds(paymentChannel) } func (a *API) ClientGetDealStatus(ctx context.Context, statusCode uint64) (string, error) { ststr, ok := storagemarket.DealStates[statusCode] if !ok { return "", fmt.Errorf("no such deal state %d", statusCode) } return ststr, nil }