package client

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"os"
	"sort"
	"time"

	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-padreader"
	"github.com/filecoin-project/go-state-types/big"
	"github.com/filecoin-project/go-state-types/dline"
	"github.com/ipfs/go-blockservice"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-cidutil"
	chunker "github.com/ipfs/go-ipfs-chunker"
	offline "github.com/ipfs/go-ipfs-exchange-offline"
	files "github.com/ipfs/go-ipfs-files"
	ipld "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/go-merkledag"
	unixfile "github.com/ipfs/go-unixfs/file"
	"github.com/ipfs/go-unixfs/importer/balanced"
	ihelper "github.com/ipfs/go-unixfs/importer/helpers"
	"github.com/ipld/go-car"
	basicnode "github.com/ipld/go-ipld-prime/node/basic"
	"github.com/ipld/go-ipld-prime/traversal/selector"
	"github.com/ipld/go-ipld-prime/traversal/selector/builder"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/multiformats/go-multibase"
	mh "github.com/multiformats/go-multihash"
	"go.uber.org/fx"

	"github.com/filecoin-project/go-address"
	cborutil "github.com/filecoin-project/go-cbor-util"
	"github.com/filecoin-project/go-commp-utils/ffiwrapper"
	"github.com/filecoin-project/go-commp-utils/writer"
	datatransfer "github.com/filecoin-project/go-data-transfer"
	"github.com/filecoin-project/go-fil-markets/discovery"
	"github.com/filecoin-project/go-fil-markets/retrievalmarket"
	rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
	"github.com/filecoin-project/go-fil-markets/shared"
	"github.com/filecoin-project/go-fil-markets/storagemarket"
	"github.com/filecoin-project/go-fil-markets/storagemarket/network"
	"github.com/filecoin-project/go-multistore"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/specs-actors/v3/actors/builtin/market"

	marketevents "github.com/filecoin-project/lotus/markets/loggers"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/markets/utils"
	"github.com/filecoin-project/lotus/node/impl/full"
	"github.com/filecoin-project/lotus/node/impl/paych"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/repo/importmgr"
	"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)

var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)

// 8 days ~= SealDuration + PreCommit + MaxProveCommitDuration + 8 hour buffer
const dealStartBufferHours uint64 = 8 * 24

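// API implements the client-side market API of a Lotus node. It wires
// together the storage and retrieval market clients, the data-transfer
// manager, the import manager, and the chain/state/wallet APIs via fx.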
type API struct {
	fx.In

	full.ChainAPI
	full.WalletAPI
	paych.PaychAPI
	full.StateAPI

	SMDealClient storagemarket.StorageClient
	RetDiscovery discovery.PeerResolver
	Retrieval    rm.RetrievalClient
	Chain        *store.ChainStore

	Imports dtypes.ClientImportMgr
	Mds     dtypes.ClientMultiDstore

	CombinedBstore    dtypes.ClientBlockstore // TODO: try to remove
	RetrievalStoreMgr dtypes.ClientRetrievalStoreManager
	DataTransfer      dtypes.ClientDataTransfer
	Host              host.Host
}

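// calcDealExpiration returns the epoch at which a deal should expire: at
// least minDuration epochs after startEpoch, rounded up to land one epoch
// before the miner's proving-period boundary.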
func calcDealExpiration(minDuration uint64, md *dline.Info, startEpoch abi.ChainEpoch) abi.ChainEpoch {
	// Make sure we give some time for the miner to seal
	minExp := startEpoch + abi.ChainEpoch(minDuration)

	// Align on the miner's ProvingPeriodBoundary
	exp := minExp + md.WPoStProvingPeriod - (minExp % md.WPoStProvingPeriod) + (md.PeriodStart % md.WPoStProvingPeriod) - 1
	// Should only be possible for miners created around genesis
	for exp < minExp {
		exp += md.WPoStProvingPeriod
	}

	return exp
}

func (a *API) imgr() *importmgr.Mgr {
	return a.Imports
}

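// ClientStartDeal proposes a storage deal using the regular (stateful)
// storagemarket client flow.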
func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
	return a.dealStarter(ctx, params, false)
}

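// ClientStatelessDeal proposes a manual-transfer, zero-price storage deal
// directly over a deal stream, bypassing the local storagemarket client
// state machine (hence "stateless").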
func (a *API) ClientStatelessDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
	return a.dealStarter(ctx, params, true)
}

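// dealStarter holds the shared proposal logic for ClientStartDeal and
// ClientStatelessDeal. The regular flow hands the proposal to the
// storagemarket client; the stateless flow signs a market.DealProposal
// itself and sends it to the provider over a libp2p deal stream.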
func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isStateless bool) (*cid.Cid, error) {
	var storeID *multistore.StoreID
	if isStateless {
		if params.Data.TransferType != storagemarket.TTManual {
			return nil, xerrors.Errorf("invalid transfer type %s for stateless storage deal", params.Data.TransferType)
		}
		if !params.EpochPrice.IsZero() {
			return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0")
		}
	} else if params.Data.TransferType == storagemarket.TTGraphsync {
		importIDs := a.imgr().List()
		for _, importID := range importIDs {
			info, err := a.imgr().Info(importID)
			if err != nil {
				continue
			}
			if info.Labels[importmgr.LRootCid] == "" {
				continue
			}
			c, err := cid.Parse(info.Labels[importmgr.LRootCid])
			if err != nil {
				continue
			}
			if c.Equals(params.Data.Root) {
				storeID = &importID //nolint
				break
			}
		}
	}

	walletKey, err := a.StateAccountKey(ctx, params.Wallet, types.EmptyTSK)
	if err != nil {
		return nil, xerrors.Errorf("failed resolving params.Wallet addr %s: %w", params.Wallet, err)
	}

	exist, err := a.WalletHas(ctx, walletKey)
	if err != nil {
		return nil, xerrors.Errorf("failed checking wallet for addr %s: %w", params.Wallet, err)
	}
	if !exist {
		return nil, xerrors.New("provided address doesn't exist in wallet")
	}

	mi, err := a.StateMinerInfo(ctx, params.Miner, types.EmptyTSK)
	if err != nil {
		return nil, xerrors.Errorf("failed getting peer ID: %w", err)
	}

	md, err := a.StateMinerProvingDeadline(ctx, params.Miner, types.EmptyTSK)
	if err != nil {
		return nil, xerrors.Errorf("failed getting miner's deadline info: %w", err)
	}

	if uint64(params.Data.PieceSize.Padded()) > uint64(mi.SectorSize) {
		return nil, xerrors.New("data doesn't fit in a sector")
	}

	dealStart := params.DealStartEpoch
	if dealStart <= 0 { // unset, or explicitly 'epoch undefined'
		ts, err := a.ChainHead(ctx)
		if err != nil {
			return nil, xerrors.Errorf("failed getting chain height: %w", err)
		}

		blocksPerHour := 60 * 60 / build.BlockDelaySecs
		dealStart = ts.Height() + abi.ChainEpoch(dealStartBufferHours*blocksPerHour) // TODO: Get this from storage ask
	}

	networkVersion, err := a.StateNetworkVersion(ctx, types.EmptyTSK)
	if err != nil {
		return nil, xerrors.Errorf("failed to get network version: %w", err)
	}

	st, err := miner.PreferredSealProofTypeFromWindowPoStType(networkVersion, mi.WindowPoStProofType)
	if err != nil {
		return nil, xerrors.Errorf("failed to get seal proof type: %w", err)
	}

	// regular flow
	if !isStateless {
		providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs)

		result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{
			Addr:          params.Wallet,
			Info:          &providerInfo,
			Data:          params.Data,
			StartEpoch:    dealStart,
			EndEpoch:      calcDealExpiration(params.MinBlocksDuration, md, dealStart),
			Price:         params.EpochPrice,
			Collateral:    params.ProviderCollateral,
			Rt:            st,
			FastRetrieval: params.FastRetrieval,
			VerifiedDeal:  params.VerifiedDeal,
			StoreID:       storeID,
		})

		if err != nil {
			return nil, xerrors.Errorf("failed to start deal: %w", err)
		}

		return &result.ProposalCid, nil
	}

	//
	// stateless flow from here to the end
	//

	dealProposal := &market.DealProposal{
		PieceCID:             *params.Data.PieceCid,
		PieceSize:            params.Data.PieceSize.Padded(),
		Client:               walletKey,
		Provider:             params.Miner,
		Label:                params.Data.Root.Encode(multibase.MustNewEncoder('u')),
		StartEpoch:           dealStart,
		EndEpoch:             calcDealExpiration(params.MinBlocksDuration, md, dealStart),
		StoragePricePerEpoch: big.Zero(),
		ProviderCollateral:   params.ProviderCollateral,
		ClientCollateral:     big.Zero(),
		VerifiedDeal:         params.VerifiedDeal,
	}

	if dealProposal.ProviderCollateral.IsZero() {
		networkCollateral, err := a.StateDealProviderCollateralBounds(ctx, params.Data.PieceSize.Padded(), params.VerifiedDeal, types.EmptyTSK)
		if err != nil {
			return nil, xerrors.Errorf("failed to determine minimum provider collateral: %w", err)
		}
		dealProposal.ProviderCollateral = networkCollateral.Min
	}

	dealProposalSerialized, err := cborutil.Dump(dealProposal)
	if err != nil {
		return nil, xerrors.Errorf("failed to serialize deal proposal: %w", err)
	}

	dealProposalSig, err := a.WalletSign(ctx, walletKey, dealProposalSerialized)
	if err != nil {
		return nil, xerrors.Errorf("failed to sign proposal: %w", err)
	}

	dealProposalSigned := &market.ClientDealProposal{
		Proposal:        *dealProposal,
		ClientSignature: *dealProposalSig,
	}
	dStream, err := network.NewFromLibp2pHost(a.Host,
		// params duplicated from .../node/modules/client.go
		// https://github.com/filecoin-project/lotus/pull/5961#discussion_r629768011
		network.RetryParameters(time.Second, 5*time.Minute, 15, 5),
	).NewDealStream(ctx, *mi.PeerId)
	if err != nil {
		return nil, xerrors.Errorf("opening dealstream to %s/%s failed: %w", params.Miner, *mi.PeerId, err)
	}

	if err = dStream.WriteDealProposal(network.Proposal{
		FastRetrieval: true,
		DealProposal:  dealProposalSigned,
		Piece: &storagemarket.DataRef{
			TransferType: storagemarket.TTManual,
			Root:         params.Data.Root,
			PieceCid:     params.Data.PieceCid,
			PieceSize:    params.Data.PieceSize,
		},
	}); err != nil {
		return nil, xerrors.Errorf("sending deal proposal failed: %w", err)
	}

	resp, _, err := dStream.ReadDealResponse()
	if err != nil {
		return nil, xerrors.Errorf("reading proposal response failed: %w", err)
	}

	dealProposalIpld, err := cborutil.AsIpld(dealProposalSigned)
	if err != nil {
		return nil, xerrors.Errorf("serializing proposal node failed: %w", err)
	}

	if !dealProposalIpld.Cid().Equals(resp.Response.Proposal) {
		return nil, xerrors.Errorf("provider returned proposal cid %s but we expected %s", resp.Response.Proposal, dealProposalIpld.Cid())
	}

	if resp.Response.State != storagemarket.StorageDealWaitingForData {
		return nil, xerrors.Errorf("provider returned unexpected state %d for proposal %s, with message: %s", resp.Response.State, resp.Response.Proposal, resp.Response.Message)
	}

	return &resp.Response.Proposal, nil
}

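// ClientListDeals returns all storage deals known to the local storagemarket
// client, each annotated with its in-progress data transfer (if any).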
func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
	deals, err := a.SMDealClient.ListLocalDeals(ctx)
	if err != nil {
		return nil, err
	}

	// Get a map of transfer ID => DataTransfer
	dataTransfersByID, err := a.transfersByID(ctx)
	if err != nil {
		return nil, err
	}

	out := make([]api.DealInfo, len(deals))
	for k, v := range deals {
		// Find the data transfer associated with this deal
		var transferCh *api.DataTransferChannel
		if v.TransferChannelID != nil {
			if ch, ok := dataTransfersByID[*v.TransferChannelID]; ok {
				transferCh = &ch
			}
		}

		out[k] = a.newDealInfoWithTransfer(transferCh, v)
	}

	return out, nil
}

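// transfersByID builds a map from data-transfer channel ID to its current
// API-level channel state, covering all in-progress transfers.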
func (a *API) transfersByID(ctx context.Context) (map[datatransfer.ChannelID]api.DataTransferChannel, error) {
	inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx)
	if err != nil {
		return nil, err
	}

	dataTransfersByID := make(map[datatransfer.ChannelID]api.DataTransferChannel, len(inProgressChannels))
	for id, channelState := range inProgressChannels {
		ch := api.NewDataTransferChannel(a.Host.ID(), channelState)
		dataTransfersByID[id] = ch
	}
	return dataTransfersByID, nil
}

func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) {
	v, err := a.SMDealClient.GetLocalDeal(ctx, d)
	if err != nil {
		return nil, err
	}

	di := a.newDealInfo(ctx, v)
	return &di, nil
}

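// ClientGetDealUpdates returns a channel that receives a DealInfo every time
// the storagemarket client emits an event for a local deal; the subscription
// is torn down when ctx is cancelled.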
func (a *API) ClientGetDealUpdates(ctx context.Context) (<-chan api.DealInfo, error) {
	updates := make(chan api.DealInfo)

	unsub := a.SMDealClient.SubscribeToEvents(func(_ storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
		updates <- a.newDealInfo(ctx, deal)
	})

	go func() {
		defer unsub()
		<-ctx.Done()
	}()

	return updates, nil
}

func (a *API) newDealInfo(ctx context.Context, v storagemarket.ClientDeal) api.DealInfo {
	// Find the data transfer associated with this deal
	var transferCh *api.DataTransferChannel
	if v.TransferChannelID != nil {
		state, err := a.DataTransfer.ChannelState(ctx, *v.TransferChannelID)

		// Note: If there was an error just ignore it, as the data transfer may
		// not be found if it's no longer active
		if err == nil {
			ch := api.NewDataTransferChannel(a.Host.ID(), state)
			ch.Stages = state.Stages()
			transferCh = &ch
		}
	}

	di := a.newDealInfoWithTransfer(transferCh, v)
	di.DealStages = v.DealStages
	return di
}

func (a *API) newDealInfoWithTransfer(transferCh *api.DataTransferChannel, v storagemarket.ClientDeal) api.DealInfo {
	return api.DealInfo{
		ProposalCid:       v.ProposalCid,
		DataRef:           v.DataRef,
		State:             v.State,
		Message:           v.Message,
		Provider:          v.Proposal.Provider,
		PieceCID:          v.Proposal.PieceCID,
		Size:              uint64(v.Proposal.PieceSize.Unpadded()),
		PricePerEpoch:     v.Proposal.StoragePricePerEpoch,
		Duration:          uint64(v.Proposal.Duration()),
		DealID:            v.DealID,
		CreationTime:      v.CreationTime.Time(),
		Verified:          v.Proposal.VerifiedDeal,
		TransferChannelID: v.TransferChannelID,
		DataTransfer:      transferCh,
	}
}

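// ClientHasLocal reports whether the root block of the given DAG is present
// in the local import blockstore (it does not verify the entire DAG).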
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
	// TODO: check if we have the ENTIRE dag

	offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore)))
	_, err := offExch.Get(ctx, root)
	if err == ipld.ErrNotFound {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}

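// ClientFindData asks the retrieval discovery service for providers of the
// given payload root and queries each one for an offer; if piece is non-nil,
// only providers advertising that piece CID are queried.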
func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
	peers, err := a.RetDiscovery.GetPeers(root)
	if err != nil {
		return nil, err
	}

	out := make([]api.QueryOffer, 0, len(peers))
	for _, p := range peers {
		if piece != nil && !piece.Equals(*p.PieceCID) {
			continue
		}
		out = append(out, a.makeRetrievalQuery(ctx, p, root, piece, rm.QueryParams{}))
	}

	return out, nil
}

func (a *API) ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) {
	mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
	if err != nil {
		return api.QueryOffer{}, err
	}
	rp := rm.RetrievalPeer{
		Address: miner,
		ID:      *mi.PeerId,
	}
	return a.makeRetrievalQuery(ctx, rp, root, piece, rm.QueryParams{}), nil
}

func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, piece *cid.Cid, qp rm.QueryParams) api.QueryOffer {
	queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp)
	if err != nil {
		return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeer: rp}
	}
	var errStr string
	switch queryResponse.Status {
	case rm.QueryResponseAvailable:
		errStr = ""
	case rm.QueryResponseUnavailable:
		errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message)
	case rm.QueryResponseError:
		errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message)
	}

	return api.QueryOffer{
		Root:                    payload,
		Piece:                   piece,
		Size:                    queryResponse.Size,
		MinPrice:                queryResponse.PieceRetrievalPrice(),
		UnsealPrice:             queryResponse.UnsealPrice,
		PaymentInterval:         queryResponse.MaxPaymentInterval,
		PaymentIntervalIncrease: queryResponse.MaxPaymentIntervalIncrease,
		Miner:                   queryResponse.PaymentAddress, // TODO: check
		MinerPeer:               rp,
		Err:                     errStr,
	}
}

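// ClientImport imports data from the file (or CAR) at ref.Path into a new
// multistore, records source, filename and root-CID labels in the import
// manager, and returns the root CID together with the import ID.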
func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) {
	id, st, err := a.imgr().NewStore()
	if err != nil {
		return nil, err
	}
	if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil {
		return nil, err
	}

	if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil {
		return nil, err
	}

	nd, err := a.clientImport(ctx, ref, st)
	if err != nil {
		return nil, err
	}

	if err := a.imgr().AddLabel(id, importmgr.LRootCid, nd.String()); err != nil {
		return nil, err
	}

	return &api.ImportRes{
		Root:     nd,
		ImportID: id,
	}, nil
}

func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error {
	return a.imgr().Remove(importID)
}

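// ClientImportLocal imports data from the given reader into a new multistore,
// building a UnixFS DAG with CIDv1, raw leaves and the default hash function,
// and returns the root CID.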
func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) {
	file := files.NewReaderFile(f)

	id, st, err := a.imgr().NewStore()
	if err != nil {
		return cid.Undef, err
	}
	if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil {
		return cid.Undef, err
	}

	bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG)

	prefix, err := merkledag.PrefixForCidVersion(1)
	if err != nil {
		return cid.Undef, err
	}
	prefix.MhType = DefaultHashFunction

	params := ihelper.DagBuilderParams{
		Maxlinks:  build.UnixfsLinksPerLevel,
		RawLeaves: true,
		CidBuilder: cidutil.InlineBuilder{
			Builder: prefix,
			Limit:   126,
		},
		Dagserv: bufferedDS,
	}

	db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
	if err != nil {
		return cid.Undef, err
	}
	nd, err := balanced.Layout(db)
	if err != nil {
		return cid.Undef, err
	}
	if err := a.imgr().AddLabel(id, "root", nd.Cid().String()); err != nil {
		return cid.Undef, err
	}

	return nd.Cid(), bufferedDS.Commit()
}

func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
	importIDs := a.imgr().List()

	out := make([]api.Import, len(importIDs))
	for i, id := range importIDs {
		info, err := a.imgr().Info(id)
		if err != nil {
			out[i] = api.Import{
				Key: id,
				Err: xerrors.Errorf("getting info: %w", err).Error(),
			}
			continue
		}

		ai := api.Import{
			Key:      id,
			Source:   info.Labels[importmgr.LSource],
			FilePath: info.Labels[importmgr.LFileName],
		}

		if info.Labels[importmgr.LRootCid] != "" {
			c, err := cid.Parse(info.Labels[importmgr.LRootCid])
			if err != nil {
				ai.Err = err.Error()
			} else {
				ai.Root = &c
			}
		}

		out[i] = ai
	}

	return out, nil
}

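// ClientCancelRetrievalDeal cancels an ongoing retrieval deal, bounding the
// wait on the caller's context.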
func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmarket.DealID) error {
	cerr := make(chan error)
	go func() {
		err := a.Retrieval.CancelDeal(dealID)

		select {
		case cerr <- err:
		case <-ctx.Done():
		}
	}()

	select {
	case err := <-cerr:
		if err != nil {
			return xerrors.Errorf("failed to cancel retrieval deal: %w", err)
		}

		return nil
	case <-ctx.Done():
		return xerrors.Errorf("context timeout while canceling retrieval deal: %w", ctx.Err())
	}
}

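// ClientRetrieve executes a retrieval order and blocks until it completes,
// fails, or the context is cancelled.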
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
	events := make(chan marketevents.RetrievalEvent)
	go a.clientRetrieve(ctx, order, ref, events)

	for {
		select {
		case evt, ok := <-events:
			if !ok { // done successfully
				return nil
			}

			if evt.Err != "" {
				return xerrors.Errorf("retrieval failed: %s", evt.Err)
			}
		case <-ctx.Done():
			return xerrors.Errorf("retrieval timed out")
		}
	}
}

func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
	events := make(chan marketevents.RetrievalEvent)
	go a.clientRetrieve(ctx, order, ref, events)
	return events, nil
}

type retrievalSubscribeEvent struct {
	event rm.ClientEvent
	state rm.ClientDealState
}

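// readSubscribeEvents forwards retrieval client events for the given deal ID
// onto the events channel until the deal completes, is rejected, errors, or
// the context is cancelled.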
func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
	for {
		var subscribeEvent retrievalSubscribeEvent
		select {
		case <-ctx.Done():
			return xerrors.New("Retrieval Timed Out")
		case subscribeEvent = <-subscribeEvents:
			if subscribeEvent.state.ID != dealID {
				// we can't check the deal ID ahead of time because:
				// 1. We need to subscribe before retrieving.
				// 2. We won't know the deal ID until after retrieving.
				continue
			}
		}

		select {
		case <-ctx.Done():
			return xerrors.New("Retrieval Timed Out")
		case events <- marketevents.RetrievalEvent{
			Event:         subscribeEvent.event,
			Status:        subscribeEvent.state.Status,
			BytesReceived: subscribeEvent.state.TotalReceived,
			FundsSpent:    subscribeEvent.state.FundsSpent,
		}:
		}

		state := subscribeEvent.state
		switch state.Status {
		case rm.DealStatusCompleted:
			return nil
		case rm.DealStatusRejected:
			return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
		case
			rm.DealStatusDealNotFound,
			rm.DealStatusErrored:
			return xerrors.Errorf("Retrieval Error: %s", state.Message)
		}
	}
}

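// clientRetrieve drives a single retrieval: it resolves the provider, starts
// the retrieval deal (or uses a local store), streams progress onto events,
// and finally writes the data to ref.Path (as a CAR or a UnixFS file) when
// ref is non-nil. The events channel is closed when the function returns.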
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
	defer close(events)

	finish := func(e error) {
		if e != nil {
			events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
		}
	}

	var store retrievalstoremgr.RetrievalStore

	if order.LocalStore == nil {
		if order.MinerPeer == nil || order.MinerPeer.ID == "" {
			mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
			if err != nil {
				finish(err)
				return
			}

			order.MinerPeer = &retrievalmarket.RetrievalPeer{
				ID:      *mi.PeerId,
				Address: order.Miner,
			}
		}

		if order.Total.Int == nil {
			finish(xerrors.Errorf("cannot make retrieval deal for null total"))
			return
		}

		if order.Size == 0 {
			finish(xerrors.Errorf("cannot make retrieval deal for zero bytes"))
			return
		}

		/*id, st, err := a.imgr().NewStore()
		if err != nil {
			return err
		}
		if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil {
			return err
		}*/

		ppb := types.BigDiv(order.Total, types.NewInt(order.Size))

		params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
		if err != nil {
			finish(xerrors.Errorf("Error in retrieval params: %s", err))
			return
		}

		store, err = a.RetrievalStoreMgr.NewStore()
		if err != nil {
			finish(xerrors.Errorf("Error setting up new store: %w", err))
			return
		}

		defer func() {
			_ = a.RetrievalStoreMgr.ReleaseStore(store)
		}()

		// Subscribe to events before retrieving to avoid losing events.
		subscribeEvents := make(chan retrievalSubscribeEvent, 1)
		subscribeCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
			// We'll check the deal IDs inside readSubscribeEvents.
			if state.PayloadCID.Equals(order.Root) {
				select {
				case <-subscribeCtx.Done():
				case subscribeEvents <- retrievalSubscribeEvent{event, state}:
				}
			}
		})

		dealID, err := a.Retrieval.Retrieve(
			ctx,
			order.Root,
			params,
			order.Total,
			*order.MinerPeer,
			order.Client,
			order.Miner,
			store.StoreID())

		if err != nil {
			unsubscribe()
			finish(xerrors.Errorf("Retrieve failed: %w", err))
			return
		}

		err = readSubscribeEvents(ctx, dealID, subscribeEvents, events)

		unsubscribe()
		if err != nil {
			finish(xerrors.Errorf("Retrieve: %w", err))
			return
		}
	} else {
		// local retrieval
		st, err := ((*multistore.MultiStore)(a.Mds)).Get(*order.LocalStore)
		if err != nil {
			finish(xerrors.Errorf("Retrieve: %w", err))
			return
		}

		store = &multiStoreRetrievalStore{
			storeID: *order.LocalStore,
			store:   st,
		}
	}

	// If ref is nil, it only fetches the data into the configured blockstore.
	if ref == nil {
		finish(nil)
		return
	}

	rdag := store.DAGService()

	if ref.IsCAR {
		f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			finish(err)
			return
		}
		err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
		if err != nil {
			finish(err)
			return
		}
		finish(f.Close())
		return
	}

	nd, err := rdag.Get(ctx, order.Root)
	if err != nil {
		finish(xerrors.Errorf("ClientRetrieve: %w", err))
		return
	}
	file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
	if err != nil {
		finish(xerrors.Errorf("ClientRetrieve: %w", err))
		return
	}
	finish(files.WriteTo(file, ref.Path))
}

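// ClientListRetrievals returns all retrieval deals known to the retrieval
// client, each annotated with its in-progress data transfer (if any),
// sorted by deal ID.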
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
	deals, err := a.Retrieval.ListDeals()
	if err != nil {
		return nil, err
	}
	dataTransfersByID, err := a.transfersByID(ctx)
	if err != nil {
		return nil, err
	}
	out := make([]api.RetrievalInfo, 0, len(deals))
	for _, v := range deals {
		// Find the data transfer associated with this deal
		var transferCh *api.DataTransferChannel
		if v.ChannelID != nil {
			if ch, ok := dataTransfersByID[*v.ChannelID]; ok {
				transferCh = &ch
			}
		}
		out = append(out, a.newRetrievalInfoWithTransfer(transferCh, v))
	}
	sort.Slice(out, func(a, b int) bool {
		return out[a].ID < out[b].ID
	})
	return out, nil
}

func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) {
	updates := make(chan api.RetrievalInfo)

	unsub := a.Retrieval.SubscribeToEvents(func(_ rm.ClientEvent, deal rm.ClientDealState) {
		updates <- a.newRetrievalInfo(ctx, deal)
	})

	go func() {
		defer unsub()
		<-ctx.Done()
	}()

	return updates, nil
}

func (a *API) newRetrievalInfoWithTransfer(ch *api.DataTransferChannel, deal rm.ClientDealState) api.RetrievalInfo {
	return api.RetrievalInfo{
		PayloadCID:        deal.PayloadCID,
		ID:                deal.ID,
		PieceCID:          deal.PieceCID,
		PricePerByte:      deal.PricePerByte,
		UnsealPrice:       deal.UnsealPrice,
		Status:            deal.Status,
		Message:           deal.Message,
		Provider:          deal.Sender,
		BytesReceived:     deal.TotalReceived,
		BytesPaidFor:      deal.BytesPaidFor,
		TotalPaid:         deal.FundsSpent,
		TransferChannelID: deal.ChannelID,
		DataTransfer:      ch,
	}
}

func (a *API) newRetrievalInfo(ctx context.Context, v rm.ClientDealState) api.RetrievalInfo {
	// Find the data transfer associated with this deal
	var transferCh *api.DataTransferChannel
	if v.ChannelID != nil {
		state, err := a.DataTransfer.ChannelState(ctx, *v.ChannelID)

		// Note: If there was an error just ignore it, as the data transfer may
		// not be found if it's no longer active
		if err == nil {
			ch := api.NewDataTransferChannel(a.Host.ID(), state)
			ch.Stages = state.Stages()
			transferCh = &ch
		}
	}

	return a.newRetrievalInfoWithTransfer(transferCh, v)
}

type multiStoreRetrievalStore struct {
	storeID multistore.StoreID
	store   *multistore.Store
}

func (mrs *multiStoreRetrievalStore) StoreID() *multistore.StoreID {
	return &mrs.storeID
}

func (mrs *multiStoreRetrievalStore) DAGService() ipld.DAGService {
	return mrs.store.DAG
}

func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
	mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
	if err != nil {
		return nil, xerrors.Errorf("failed getting miner info: %w", err)
	}

	info := utils.NewStorageProviderInfo(miner, mi.Worker, mi.SectorSize, p, mi.Multiaddrs)
	ask, err := a.SMDealClient.GetAsk(ctx, info)
	if err != nil {
		return nil, err
	}
	return ask, nil
}

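// ClientCalcCommP computes the piece commitment (commP) and padded piece size
// for the CAR file at inpath.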
func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {

	// Hard-code the sector type to 32GiBV1_1, because:
	// - ffiwrapper.GeneratePieceCIDFromFile requires a RegisteredSealProof
	// - commP itself is sector-size independent, with rather low probability of that changing
	//   ( note how the final rust call is identical for every RegSP type )
	//   https://github.com/filecoin-project/rust-filecoin-proofs-api/blob/v5.0.0/src/seal.rs#L1040-L1050
	//
	// IF/WHEN this changes in the future we will have to be able to calculate
	// "old style" commP, and thus will need to introduce a version switch or similar
	arbitraryProofType := abi.RegisteredSealProof_StackedDrg32GiBV1_1

	rdr, err := os.Open(inpath)
	if err != nil {
		return nil, err
	}
	defer rdr.Close() //nolint:errcheck

	stat, err := rdr.Stat()
	if err != nil {
		return nil, err
	}

	// check that the data is a car file; if it's not, retrieval won't work
	_, _, err = car.ReadHeader(bufio.NewReader(rdr))
	if err != nil {
		return nil, xerrors.Errorf("not a car file: %w", err)
	}

	if _, err := rdr.Seek(0, io.SeekStart); err != nil {
		return nil, xerrors.Errorf("seek to start: %w", err)
	}

	pieceReader, pieceSize := padreader.New(rdr, uint64(stat.Size()))
	commP, err := ffiwrapper.GeneratePieceCIDFromFile(arbitraryProofType, pieceReader, pieceSize)

	if err != nil {
		return nil, xerrors.Errorf("computing commP failed: %w", err)
	}

	return &api.CommPRet{
		Root: commP,
		Size: pieceSize,
	}, nil
}

type lenWriter int64

func (w *lenWriter) Write(p []byte) (n int, err error) {
	*w += lenWriter(len(p))
	return len(p), nil
}

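// ClientDealSize measures the payload size of the DAG rooted at root by
// writing it out as a CAR, and returns both that size and the padded piece
// size it would occupy in a deal.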
func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) {
	dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))

	w := lenWriter(0)

	err := car.WriteCar(ctx, dag, []cid.Cid{root}, &w)
	if err != nil {
		return api.DataSize{}, err
	}

	up := padreader.PaddedSize(uint64(w))

	return api.DataSize{
		PayloadSize: int64(w),
		PieceSize:   up.Padded(),
	}, nil
}

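// ClientDealPieceCID streams the DAG rooted at root as a CAR through a commP
// writer and returns the resulting piece CID together with the payload and
// piece sizes.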
func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
	dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))

	w := &writer.Writer{}
	bw := bufio.NewWriterSize(w, int(writer.CommPBuf))

	// Write the CAR through the buffered writer so the buffer is actually used.
	err := car.WriteCar(ctx, dag, []cid.Cid{root}, bw)
	if err != nil {
		return api.DataCIDSize{}, err
	}

	if err := bw.Flush(); err != nil {
		return api.DataCIDSize{}, err
	}

	dataCIDSize, err := w.Sum()
	return api.DataCIDSize(dataCIDSize), err
}

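// ClientGenCar imports the data referenced by ref into a new import store and
// writes it out as a selective CAR (covering the entire DAG) at outputPath.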
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
	id, st, err := a.imgr().NewStore()
	if err != nil {
		return err
	}
	if err := a.imgr().AddLabel(id, "source", "gen-car"); err != nil {
		return err
	}

	bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG)
	c, err := a.clientImport(ctx, ref, st)

	if err != nil {
		return err
	}

	// TODO: does that defer mean to remove the whole blockstore?
	defer bufferedDS.Remove(ctx, c) //nolint:errcheck
	ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)

	// entire DAG selector
	allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
		ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

	f, err := os.Create(outputPath)
	if err != nil {
		return err
	}

	sc := car.NewSelectiveCar(ctx, st.Bstore, []car.Dag{{Root: c, Selector: allSelector}})
	if err = sc.Write(f); err != nil {
		return err
	}

	return f.Close()
}

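// clientImport imports the file referenced by ref into the given multistore:
// CAR files are loaded directly (single root only); anything else is chunked
// into a UnixFS DAG with CIDv1, raw leaves and the default hash function.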
func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multistore.Store) (cid.Cid, error) {
	f, err := os.Open(ref.Path)
	if err != nil {
		return cid.Undef, err
	}
	defer f.Close() //nolint:errcheck

	stat, err := f.Stat()
	if err != nil {
		return cid.Undef, err
	}

	file, err := files.NewReaderPathFile(ref.Path, f, stat)
	if err != nil {
		return cid.Undef, err
	}

	if ref.IsCAR {
		var st car.Store
		if store.Fstore == nil {
			st = store.Bstore
		} else {
			st = store.Fstore
		}
		result, err := car.LoadCar(st, file)
		if err != nil {
			return cid.Undef, err
		}

		if len(result.Roots) != 1 {
			return cid.Undef, xerrors.New("cannot import car with more than one root")
		}

		return result.Roots[0], nil
	}

	bufDs := ipld.NewBufferedDAG(ctx, store.DAG)

	prefix, err := merkledag.PrefixForCidVersion(1)
	if err != nil {
		return cid.Undef, err
	}
	prefix.MhType = DefaultHashFunction

	params := ihelper.DagBuilderParams{
		Maxlinks:  build.UnixfsLinksPerLevel,
		RawLeaves: true,
		CidBuilder: cidutil.InlineBuilder{
			Builder: prefix,
			Limit:   126,
		},
		Dagserv: bufDs,
		NoCopy:  true,
	}

	db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
	if err != nil {
		return cid.Undef, err
	}
	nd, err := balanced.Layout(db)
	if err != nil {
		return cid.Undef, err
	}

	if err := bufDs.Commit(); err != nil {
		return cid.Undef, err
	}

	return nd.Cid(), nil
}

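// ClientListDataTransfers returns all in-progress data transfer channels
// known to the data transfer manager.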
func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
	inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx)
	if err != nil {
		return nil, err
	}

	apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
	for _, channelState := range inProgressChannels {
		apiChannels = append(apiChannels, api.NewDataTransferChannel(a.Host.ID(), channelState))
	}

	return apiChannels, nil
}

func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
	channels := make(chan api.DataTransferChannel)

	unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
		channel := api.NewDataTransferChannel(a.Host.ID(), channelState)
		select {
		case <-ctx.Done():
		case channels <- channel:
		}
	})

	go func() {
		defer unsub()
		<-ctx.Done()
	}()

	return channels, nil
}

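// ClientRestartDataTransfer restarts a stalled data transfer channel; the
// channel ID is reconstructed from the transfer ID, the remote peer, and
// whether the local node was the initiator.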
func (a *API) ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
	selfPeer := a.Host.ID()
	if isInitiator {
		return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
	}
	return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}

func (a *API) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
	selfPeer := a.Host.ID()
	if isInitiator {
		return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
	}
	return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}

func (a *API) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error {
	return a.Retrieval.TryRestartInsufficientFunds(paymentChannel)
}

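// ClientGetDealStatus translates a storage deal status code into its
// human-readable name.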
func (a *API) ClientGetDealStatus(ctx context.Context, statusCode uint64) (string, error) {
	ststr, ok := storagemarket.DealStates[statusCode]
	if !ok {
		return "", fmt.Errorf("no such deal state %d", statusCode)
	}

	return ststr, nil
}