2019-09-16 13:46:05 +00:00
package client
2019-08-20 16:48:33 +00:00
import (
2020-10-22 11:59:08 +00:00
"bufio"
2019-08-20 16:48:33 +00:00
"context"
2020-06-16 20:52:47 +00:00
"fmt"
2019-10-23 09:18:22 +00:00
"io"
2019-08-20 16:48:33 +00:00
"os"
2021-05-11 02:26:04 +00:00
"time"
2019-08-20 16:48:33 +00:00
2021-01-16 06:42:56 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
2019-11-05 03:42:13 +00:00
"golang.org/x/xerrors"
2020-10-26 14:16:10 +00:00
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/dline"
2019-09-06 22:39:47 +00:00
"github.com/ipfs/go-blockservice"
2019-08-20 16:48:33 +00:00
"github.com/ipfs/go-cid"
2020-07-07 09:12:32 +00:00
"github.com/ipfs/go-cidutil"
2019-08-20 16:48:33 +00:00
chunker "github.com/ipfs/go-ipfs-chunker"
2019-09-06 22:39:47 +00:00
offline "github.com/ipfs/go-ipfs-exchange-offline"
2019-08-20 16:48:33 +00:00
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
2019-09-06 22:39:47 +00:00
"github.com/ipfs/go-merkledag"
2019-12-10 04:19:59 +00:00
unixfile "github.com/ipfs/go-unixfs/file"
2019-08-20 16:48:33 +00:00
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
2020-05-05 01:31:56 +00:00
"github.com/ipld/go-car"
2020-07-07 08:52:19 +00:00
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
2020-08-18 23:26:21 +00:00
"github.com/libp2p/go-libp2p-core/host"
2019-08-20 16:48:33 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2021-04-05 11:11:10 +00:00
"github.com/multiformats/go-multibase"
2020-07-07 09:12:32 +00:00
mh "github.com/multiformats/go-multihash"
2019-08-20 16:48:33 +00:00
"go.uber.org/fx"
2019-09-06 22:39:47 +00:00
2019-12-19 20:13:17 +00:00
"github.com/filecoin-project/go-address"
2021-04-05 11:11:10 +00:00
cborutil "github.com/filecoin-project/go-cbor-util"
2020-11-20 00:28:18 +00:00
"github.com/filecoin-project/go-commp-utils/ffiwrapper"
"github.com/filecoin-project/go-commp-utils/writer"
2020-10-13 10:37:00 +00:00
datatransfer "github.com/filecoin-project/go-data-transfer"
2020-09-29 11:53:30 +00:00
"github.com/filecoin-project/go-fil-markets/discovery"
2020-08-05 22:35:59 +00:00
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
2020-06-23 19:22:33 +00:00
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
2020-07-23 21:01:34 +00:00
"github.com/filecoin-project/go-fil-markets/shared"
2019-12-17 10:46:39 +00:00
"github.com/filecoin-project/go-fil-markets/storagemarket"
2021-04-05 11:11:10 +00:00
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
2020-07-24 21:47:22 +00:00
"github.com/filecoin-project/go-multistore"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2021-04-05 11:11:10 +00:00
"github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
2020-02-13 00:15:33 +00:00
2020-08-11 20:04:00 +00:00
marketevents "github.com/filecoin-project/lotus/markets/loggers"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
2020-01-10 18:21:46 +00:00
"github.com/filecoin-project/lotus/markets/utils"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/paych"
2020-07-07 08:52:19 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
2020-07-06 23:39:30 +00:00
"github.com/filecoin-project/lotus/node/repo/importmgr"
2021-03-30 19:28:54 +00:00
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
2019-08-20 16:48:33 +00:00
)
2020-07-07 09:12:32 +00:00
var DefaultHashFunction = uint64 ( mh . BLAKE2B_MIN + 31 )
2020-07-07 09:38:22 +00:00
2020-10-02 17:31:21 +00:00
const dealStartBufferHours uint64 = 49
2020-02-12 22:32:26 +00:00
2019-09-16 13:46:05 +00:00
type API struct {
2019-08-20 16:48:33 +00:00
fx . In
2019-09-16 13:46:05 +00:00
full . ChainAPI
full . WalletAPI
paych . PaychAPI
2020-10-23 14:51:27 +00:00
full . StateAPI
2019-08-20 16:48:33 +00:00
2019-11-04 19:57:54 +00:00
SMDealClient storagemarket . StorageClient
2020-09-29 11:53:30 +00:00
RetDiscovery discovery . PeerResolver
2020-06-23 19:22:33 +00:00
Retrieval rm . RetrievalClient
2019-09-06 22:39:47 +00:00
Chain * store . ChainStore
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
Imports dtypes . ClientImportMgr
2021-03-30 19:28:54 +00:00
Mds dtypes . ClientMultiDstore
2020-07-07 12:35:02 +00:00
2020-08-01 07:40:25 +00:00
CombinedBstore dtypes . ClientBlockstore // TODO: try to remove
2020-07-31 20:24:50 +00:00
RetrievalStoreMgr dtypes . ClientRetrievalStoreManager
2020-08-18 23:26:21 +00:00
DataTransfer dtypes . ClientDataTransfer
Host host . Host
2019-08-20 16:48:33 +00:00
}
2020-09-10 06:30:47 +00:00
func calcDealExpiration ( minDuration uint64 , md * dline . Info , startEpoch abi . ChainEpoch ) abi . ChainEpoch {
2020-04-21 21:38:26 +00:00
// Make sure we give some time for the miner to seal
2020-04-30 17:42:16 +00:00
minExp := startEpoch + abi . ChainEpoch ( minDuration )
2020-04-21 21:38:26 +00:00
// Align on miners ProvingPeriodBoundary
2020-09-30 20:17:56 +00:00
return minExp + md . WPoStProvingPeriod - ( minExp % md . WPoStProvingPeriod ) + ( md . PeriodStart % md . WPoStProvingPeriod ) - 1
2020-04-21 21:38:26 +00:00
}
2020-07-07 08:52:19 +00:00
func ( a * API ) imgr ( ) * importmgr . Mgr {
return a . Imports
}
2020-03-03 00:36:01 +00:00
func ( a * API ) ClientStartDeal ( ctx context . Context , params * api . StartDealParams ) ( * cid . Cid , error ) {
2021-04-05 11:11:10 +00:00
return a . dealStarter ( ctx , params , false )
}
func ( a * API ) ClientStatelessDeal ( ctx context . Context , params * api . StartDealParams ) ( * cid . Cid , error ) {
return a . dealStarter ( ctx , params , true )
}
func ( a * API ) dealStarter ( ctx context . Context , params * api . StartDealParams , isStateless bool ) ( * cid . Cid , error ) {
2020-07-28 06:13:10 +00:00
var storeID * multistore . StoreID
2021-04-05 11:11:10 +00:00
if isStateless {
if params . Data . TransferType != storagemarket . TTManual {
return nil , xerrors . Errorf ( "invalid transfer type %s for stateless storage deal" , params . Data . TransferType )
}
if ! params . EpochPrice . IsZero ( ) {
return nil , xerrors . New ( "stateless storage deals can only be initiated with storage price of 0" )
}
} else if params . Data . TransferType == storagemarket . TTGraphsync {
2020-07-28 06:13:10 +00:00
importIDs := a . imgr ( ) . List ( )
for _ , importID := range importIDs {
info , err := a . imgr ( ) . Info ( importID )
if err != nil {
continue
}
if info . Labels [ importmgr . LRootCid ] == "" {
continue
}
c , err := cid . Parse ( info . Labels [ importmgr . LRootCid ] )
if err != nil {
continue
}
if c . Equals ( params . Data . Root ) {
2020-08-20 04:49:10 +00:00
storeID = & importID //nolint
2020-07-28 06:13:10 +00:00
break
}
}
}
2020-10-01 08:30:34 +00:00
2020-10-23 14:51:27 +00:00
walletKey , err := a . StateAccountKey ( ctx , params . Wallet , types . EmptyTSK )
2020-10-01 08:30:34 +00:00
if err != nil {
return nil , xerrors . Errorf ( "failed resolving params.Wallet addr: %w" , params . Wallet )
}
exist , err := a . WalletHas ( ctx , walletKey )
2019-08-20 16:48:33 +00:00
if err != nil {
2020-03-03 00:36:01 +00:00
return nil , xerrors . Errorf ( "failed getting addr from wallet: %w" , params . Wallet )
2019-12-13 19:15:56 +00:00
}
if ! exist {
return nil , xerrors . Errorf ( "provided address doesn't exist in wallet" )
2019-08-20 16:48:33 +00:00
}
2020-04-16 17:36:36 +00:00
mi , err := a . StateMinerInfo ( ctx , params . Miner , types . EmptyTSK )
2019-08-20 16:48:33 +00:00
if err != nil {
2019-11-06 06:26:50 +00:00
return nil , xerrors . Errorf ( "failed getting peer ID: %w" , err )
}
2020-04-24 17:12:30 +00:00
md , err := a . StateMinerProvingDeadline ( ctx , params . Miner , types . EmptyTSK )
if err != nil {
2020-07-07 22:29:45 +00:00
return nil , xerrors . Errorf ( "failed getting miner's deadline info: %w" , err )
2020-04-24 17:12:30 +00:00
}
2020-04-17 16:38:20 +00:00
if uint64 ( params . Data . PieceSize . Padded ( ) ) > uint64 ( mi . SectorSize ) {
2020-04-07 18:33:12 +00:00
return nil , xerrors . New ( "data doesn't fit in a sector" )
}
2020-04-30 17:42:16 +00:00
dealStart := params . DealStartEpoch
if dealStart <= 0 { // unset, or explicitly 'epoch undefined'
ts , err := a . ChainHead ( ctx )
if err != nil {
return nil , xerrors . Errorf ( "failed getting chain height: %w" , err )
}
2020-07-29 01:18:21 +00:00
blocksPerHour := 60 * 60 / build . BlockDelaySecs
2020-10-02 17:31:21 +00:00
dealStart = ts . Height ( ) + abi . ChainEpoch ( dealStartBufferHours * blocksPerHour ) // TODO: Get this from storage ask
2020-02-12 22:32:26 +00:00
}
2020-03-04 02:19:28 +00:00
2021-01-20 02:06:00 +00:00
networkVersion , err := a . StateNetworkVersion ( ctx , types . EmptyTSK )
if err != nil {
return nil , xerrors . Errorf ( "failed to get network version: %w" , err )
}
st , err := miner . PreferredSealProofTypeFromWindowPoStType ( networkVersion , mi . WindowPoStProofType )
2021-01-16 06:42:56 +00:00
if err != nil {
return nil , xerrors . Errorf ( "failed to get seal proof type: %w" , err )
}
2021-04-05 11:11:10 +00:00
// regular flow
if ! isStateless {
providerInfo := utils . NewStorageProviderInfo ( params . Miner , mi . Worker , mi . SectorSize , * mi . PeerId , mi . Multiaddrs )
result , err := a . SMDealClient . ProposeStorageDeal ( ctx , storagemarket . ProposeStorageDealParams {
Addr : params . Wallet ,
Info : & providerInfo ,
Data : params . Data ,
StartEpoch : dealStart ,
EndEpoch : calcDealExpiration ( params . MinBlocksDuration , md , dealStart ) ,
Price : params . EpochPrice ,
Collateral : params . ProviderCollateral ,
Rt : st ,
FastRetrieval : params . FastRetrieval ,
VerifiedDeal : params . VerifiedDeal ,
StoreID : storeID ,
} )
if err != nil {
return nil , xerrors . Errorf ( "failed to start deal: %w" , err )
}
2019-08-20 16:48:33 +00:00
2021-04-05 11:11:10 +00:00
return & result . ProposalCid , nil
}
//
// stateless flow from here to the end
//
dealProposal := & market . DealProposal {
PieceCID : * params . Data . PieceCid ,
PieceSize : params . Data . PieceSize . Padded ( ) ,
Client : walletKey ,
Provider : params . Miner ,
Label : params . Data . Root . Encode ( multibase . MustNewEncoder ( 'u' ) ) ,
StartEpoch : dealStart ,
EndEpoch : calcDealExpiration ( params . MinBlocksDuration , md , dealStart ) ,
StoragePricePerEpoch : big . Zero ( ) ,
ProviderCollateral : params . ProviderCollateral ,
ClientCollateral : big . Zero ( ) ,
VerifiedDeal : params . VerifiedDeal ,
}
if dealProposal . ProviderCollateral . IsZero ( ) {
networkCollateral , err := a . StateDealProviderCollateralBounds ( ctx , params . Data . PieceSize . Padded ( ) , params . VerifiedDeal , types . EmptyTSK )
if err != nil {
return nil , xerrors . Errorf ( "failed to determine minimum provider collateral: %w" , err )
}
dealProposal . ProviderCollateral = networkCollateral . Min
}
dealProposalSerialized , err := cborutil . Dump ( dealProposal )
2019-11-06 06:26:50 +00:00
if err != nil {
2021-04-05 11:11:10 +00:00
return nil , xerrors . Errorf ( "failed to serialize deal proposal: %w" , err )
}
dealProposalSig , err := a . WalletSign ( ctx , walletKey , dealProposalSerialized )
if err != nil {
return nil , xerrors . Errorf ( "failed to sign proposal : %w" , err )
}
dealProposalSigned := & market . ClientDealProposal {
Proposal : * dealProposal ,
ClientSignature : * dealProposalSig ,
}
dStream , err := network . NewFromLibp2pHost ( a . Host ,
2021-05-11 02:26:04 +00:00
// params duplicated from .../node/modules/client.go
// https://github.com/filecoin-project/lotus/pull/5961#discussion_r629768011
network . RetryParameters ( time . Second , 5 * time . Minute , 15 , 5 ) ,
2021-04-05 11:11:10 +00:00
) . NewDealStream ( ctx , * mi . PeerId )
if err != nil {
return nil , xerrors . Errorf ( "opening dealstream to %s/%s failed: %w" , params . Miner , * mi . PeerId , err )
}
if err = dStream . WriteDealProposal ( network . Proposal {
FastRetrieval : true ,
DealProposal : dealProposalSigned ,
Piece : & storagemarket . DataRef {
TransferType : storagemarket . TTManual ,
Root : params . Data . Root ,
PieceCid : params . Data . PieceCid ,
PieceSize : params . Data . PieceSize ,
} ,
} ) ; err != nil {
return nil , xerrors . Errorf ( "sending deal proposal failed: %w" , err )
}
resp , _ , err := dStream . ReadDealResponse ( )
if err != nil {
return nil , xerrors . Errorf ( "reading proposal response failed: %w" , err )
}
dealProposalIpld , err := cborutil . AsIpld ( dealProposalSigned )
if err != nil {
return nil , xerrors . Errorf ( "serializing proposal node failed: %w" , err )
}
if ! dealProposalIpld . Cid ( ) . Equals ( resp . Response . Proposal ) {
return nil , xerrors . Errorf ( "provider returned proposal cid %s but we expected %s" , resp . Response . Proposal , dealProposalIpld . Cid ( ) )
}
if resp . Response . State != storagemarket . StorageDealWaitingForData {
return nil , xerrors . Errorf ( "provider returned unexpected state %d for proposal %s, with message: %s" , resp . Response . State , resp . Response . Proposal , resp . Response . Message )
2019-11-06 06:26:50 +00:00
}
2021-04-05 11:11:10 +00:00
return & resp . Response . Proposal , nil
2019-08-20 16:48:33 +00:00
}
2019-09-16 13:46:05 +00:00
func ( a * API ) ClientListDeals ( ctx context . Context ) ( [ ] api . DealInfo , error ) {
2020-04-07 02:17:02 +00:00
deals , err := a . SMDealClient . ListLocalDeals ( ctx )
2019-09-10 14:13:24 +00:00
if err != nil {
return nil , err
}
2020-12-08 14:18:47 +00:00
// Get a map of transfer ID => DataTransfer
dataTransfersByID , err := a . transfersByID ( ctx )
if err != nil {
return nil , err
}
2019-09-10 14:13:24 +00:00
out := make ( [ ] api . DealInfo , len ( deals ) )
for k , v := range deals {
2020-12-08 14:18:47 +00:00
// Find the data transfer associated with this deal
var transferCh * api . DataTransferChannel
if v . TransferChannelID != nil {
if ch , ok := dataTransfersByID [ * v . TransferChannelID ] ; ok {
transferCh = & ch
}
2019-09-10 14:13:24 +00:00
}
2020-12-08 14:18:47 +00:00
out [ k ] = a . newDealInfoWithTransfer ( transferCh , v )
2019-09-10 14:13:24 +00:00
}
return out , nil
}
2020-12-08 14:18:47 +00:00
func ( a * API ) transfersByID ( ctx context . Context ) ( map [ datatransfer . ChannelID ] api . DataTransferChannel , error ) {
inProgressChannels , err := a . DataTransfer . InProgressChannels ( ctx )
if err != nil {
return nil , err
}
dataTransfersByID := make ( map [ datatransfer . ChannelID ] api . DataTransferChannel , len ( inProgressChannels ) )
for id , channelState := range inProgressChannels {
ch := api . NewDataTransferChannel ( a . Host . ID ( ) , channelState )
dataTransfersByID [ id ] = ch
}
return dataTransfersByID , nil
}
2019-11-06 19:44:28 +00:00
func ( a * API ) ClientGetDealInfo ( ctx context . Context , d cid . Cid ) ( * api . DealInfo , error ) {
2020-04-07 02:17:02 +00:00
v , err := a . SMDealClient . GetLocalDeal ( ctx , d )
2019-11-06 19:44:28 +00:00
if err != nil {
return nil , err
}
2019-11-04 19:57:54 +00:00
2020-12-08 14:18:47 +00:00
di := a . newDealInfo ( ctx , v )
return & di , nil
2019-11-06 19:44:28 +00:00
}
2020-08-27 18:32:51 +00:00
func ( a * API ) ClientGetDealUpdates ( ctx context . Context ) ( <- chan api . DealInfo , error ) {
updates := make ( chan api . DealInfo )
unsub := a . SMDealClient . SubscribeToEvents ( func ( _ storagemarket . ClientEvent , deal storagemarket . ClientDeal ) {
2020-12-08 14:18:47 +00:00
updates <- a . newDealInfo ( ctx , deal )
2020-08-27 18:32:51 +00:00
} )
go func ( ) {
defer unsub ( )
<- ctx . Done ( )
} ( )
return updates , nil
}
2020-12-08 14:18:47 +00:00
func ( a * API ) newDealInfo ( ctx context . Context , v storagemarket . ClientDeal ) api . DealInfo {
// Find the data transfer associated with this deal
var transferCh * api . DataTransferChannel
if v . TransferChannelID != nil {
state , err := a . DataTransfer . ChannelState ( ctx , * v . TransferChannelID )
// Note: If there was an error just ignore it, as the data transfer may
// be not found if it's no longer active
if err == nil {
ch := api . NewDataTransferChannel ( a . Host . ID ( ) , state )
2021-04-01 13:57:39 +00:00
ch . Stages = state . Stages ( )
2020-12-08 14:18:47 +00:00
transferCh = & ch
}
}
2021-04-01 13:57:39 +00:00
di := a . newDealInfoWithTransfer ( transferCh , v )
di . DealStages = v . DealStages
return di
2020-12-08 14:18:47 +00:00
}
func ( a * API ) newDealInfoWithTransfer ( transferCh * api . DataTransferChannel , v storagemarket . ClientDeal ) api . DealInfo {
return api . DealInfo {
ProposalCid : v . ProposalCid ,
DataRef : v . DataRef ,
State : v . State ,
Message : v . Message ,
Provider : v . Proposal . Provider ,
PieceCID : v . Proposal . PieceCID ,
Size : uint64 ( v . Proposal . PieceSize . Unpadded ( ) ) ,
PricePerEpoch : v . Proposal . StoragePricePerEpoch ,
Duration : uint64 ( v . Proposal . Duration ( ) ) ,
DealID : v . DealID ,
CreationTime : v . CreationTime . Time ( ) ,
Verified : v . Proposal . VerifiedDeal ,
TransferChannelID : v . TransferChannelID ,
DataTransfer : transferCh ,
}
}
2019-09-16 13:46:05 +00:00
func ( a * API ) ClientHasLocal ( ctx context . Context , root cid . Cid ) ( bool , error ) {
2019-08-26 13:45:36 +00:00
// TODO: check if we have the ENTIRE dag
2020-07-07 08:52:19 +00:00
offExch := merkledag . NewDAGService ( blockservice . New ( a . Imports . Blockstore , offline . Exchange ( a . Imports . Blockstore ) ) )
2019-08-26 13:45:36 +00:00
_ , err := offExch . Get ( ctx , root )
if err == ipld . ErrNotFound {
return false , nil
}
if err != nil {
return false , err
}
return true , nil
}
2020-07-09 16:29:57 +00:00
func ( a * API ) ClientFindData ( ctx context . Context , root cid . Cid , piece * cid . Cid ) ( [ ] api . QueryOffer , error ) {
2019-08-26 13:45:36 +00:00
peers , err := a . RetDiscovery . GetPeers ( root )
if err != nil {
return nil , err
}
2020-07-09 16:29:57 +00:00
out := make ( [ ] api . QueryOffer , 0 , len ( peers ) )
for _ , p := range peers {
if piece != nil && ! piece . Equals ( * p . PieceCID ) {
continue
}
out = append ( out , a . makeRetrievalQuery ( ctx , p , root , piece , rm . QueryParams { } ) )
2019-08-26 13:45:36 +00:00
}
return out , nil
}
2020-07-09 20:02:12 +00:00
func ( a * API ) ClientMinerQueryOffer ( ctx context . Context , miner address . Address , root cid . Cid , piece * cid . Cid ) ( api . QueryOffer , error ) {
2020-06-16 15:22:44 +00:00
mi , err := a . StateMinerInfo ( ctx , miner , types . EmptyTSK )
if err != nil {
return api . QueryOffer { } , err
}
2020-06-23 19:22:33 +00:00
rp := rm . RetrievalPeer {
2020-06-16 14:14:49 +00:00
Address : miner ,
2020-08-18 18:17:06 +00:00
ID : * mi . PeerId ,
2020-06-16 14:14:49 +00:00
}
2020-07-09 20:02:12 +00:00
return a . makeRetrievalQuery ( ctx , rp , root , piece , rm . QueryParams { } ) , nil
2020-06-16 14:14:49 +00:00
}
2020-07-09 16:29:57 +00:00
func ( a * API ) makeRetrievalQuery ( ctx context . Context , rp rm . RetrievalPeer , payload cid . Cid , piece * cid . Cid , qp rm . QueryParams ) api . QueryOffer {
2020-06-16 14:14:49 +00:00
queryResponse , err := a . Retrieval . Query ( ctx , rp , payload , qp )
if err != nil {
2020-08-05 22:35:59 +00:00
return api . QueryOffer { Err : err . Error ( ) , Miner : rp . Address , MinerPeer : rp }
2020-06-16 14:14:49 +00:00
}
2020-06-16 20:32:03 +00:00
var errStr string
switch queryResponse . Status {
2020-06-23 19:22:33 +00:00
case rm . QueryResponseAvailable :
2020-06-16 20:32:03 +00:00
errStr = ""
2020-06-23 19:22:33 +00:00
case rm . QueryResponseUnavailable :
2020-06-16 20:52:47 +00:00
errStr = fmt . Sprintf ( "retrieval query offer was unavailable: %s" , queryResponse . Message )
2020-06-23 19:22:33 +00:00
case rm . QueryResponseError :
2020-06-16 20:52:47 +00:00
errStr = fmt . Sprintf ( "retrieval query offer errored: %s" , queryResponse . Message )
2020-06-16 20:32:03 +00:00
}
2020-06-16 14:14:49 +00:00
return api . QueryOffer {
Root : payload ,
2020-07-09 16:29:57 +00:00
Piece : piece ,
2020-06-16 14:14:49 +00:00
Size : queryResponse . Size ,
MinPrice : queryResponse . PieceRetrievalPrice ( ) ,
2020-07-23 21:01:34 +00:00
UnsealPrice : queryResponse . UnsealPrice ,
2020-06-16 14:14:49 +00:00
PaymentInterval : queryResponse . MaxPaymentInterval ,
PaymentIntervalIncrease : queryResponse . MaxPaymentIntervalIncrease ,
Miner : queryResponse . PaymentAddress , // TODO: check
2020-08-05 22:35:59 +00:00
MinerPeer : rp ,
2020-06-16 20:32:03 +00:00
Err : errStr ,
2020-06-16 14:14:49 +00:00
}
}
2020-07-07 11:45:02 +00:00
func ( a * API ) ClientImport ( ctx context . Context , ref api . FileRef ) ( * api . ImportRes , error ) {
2020-07-07 08:52:19 +00:00
id , st , err := a . imgr ( ) . NewStore ( )
2020-07-06 23:39:30 +00:00
if err != nil {
2020-07-07 11:45:02 +00:00
return nil , err
2020-07-06 23:39:30 +00:00
}
2020-07-07 09:38:09 +00:00
if err := a . imgr ( ) . AddLabel ( id , importmgr . LSource , "import" ) ; err != nil {
2020-07-07 11:45:02 +00:00
return nil , err
2020-07-06 23:39:30 +00:00
}
2020-03-03 04:13:08 +00:00
2020-07-07 09:38:09 +00:00
if err := a . imgr ( ) . AddLabel ( id , importmgr . LFileName , ref . Path ) ; err != nil {
2020-07-07 11:45:02 +00:00
return nil , err
2020-07-07 09:38:09 +00:00
}
2019-08-20 16:48:33 +00:00
2020-07-07 09:38:09 +00:00
nd , err := a . clientImport ( ctx , ref , st )
2019-08-20 16:48:33 +00:00
if err != nil {
2020-07-07 11:45:02 +00:00
return nil , err
2019-08-20 16:48:33 +00:00
}
2020-07-07 09:38:09 +00:00
if err := a . imgr ( ) . AddLabel ( id , importmgr . LRootCid , nd . String ( ) ) ; err != nil {
2020-07-07 11:45:02 +00:00
return nil , err
2020-07-07 09:38:09 +00:00
}
2020-07-07 11:45:02 +00:00
return & api . ImportRes {
Root : nd ,
ImportID : id ,
} , nil
}
2020-07-28 06:13:10 +00:00
func ( a * API ) ClientRemoveImport ( ctx context . Context , importID multistore . StoreID ) error {
2020-07-07 11:45:02 +00:00
return a . imgr ( ) . Remove ( importID )
2019-08-20 16:48:33 +00:00
}
2019-10-23 09:18:22 +00:00
func ( a * API ) ClientImportLocal ( ctx context . Context , f io . Reader ) ( cid . Cid , error ) {
file := files . NewReaderFile ( f )
2020-07-07 08:52:19 +00:00
id , st , err := a . imgr ( ) . NewStore ( )
2020-07-06 23:39:30 +00:00
if err != nil {
2020-07-07 09:12:32 +00:00
return cid . Undef , err
2020-07-06 23:39:30 +00:00
}
2020-07-07 08:52:19 +00:00
if err := a . imgr ( ) . AddLabel ( id , "source" , "import-local" ) ; err != nil {
2020-07-06 23:39:30 +00:00
return cid . Cid { } , err
}
bufferedDS := ipld . NewBufferedDAG ( ctx , st . DAG )
2019-10-23 09:18:22 +00:00
2020-07-07 09:12:32 +00:00
prefix , err := merkledag . PrefixForCidVersion ( 1 )
if err != nil {
return cid . Undef , err
}
prefix . MhType = DefaultHashFunction
2019-10-23 09:18:22 +00:00
params := ihelper . DagBuilderParams {
2020-07-07 09:38:22 +00:00
Maxlinks : build . UnixfsLinksPerLevel ,
RawLeaves : true ,
2020-07-07 09:12:32 +00:00
CidBuilder : cidutil . InlineBuilder {
Builder : prefix ,
Limit : 126 ,
} ,
2020-07-07 09:38:22 +00:00
Dagserv : bufferedDS ,
2019-10-23 09:18:22 +00:00
}
db , err := params . New ( chunker . NewSizeSplitter ( file , int64 ( build . UnixfsChunkSize ) ) )
if err != nil {
return cid . Undef , err
}
nd , err := balanced . Layout ( db )
if err != nil {
return cid . Undef , err
}
2020-07-28 06:13:10 +00:00
if err := a . imgr ( ) . AddLabel ( id , "root" , nd . Cid ( ) . String ( ) ) ; err != nil {
return cid . Cid { } , err
}
2019-10-23 09:18:22 +00:00
return nd . Cid ( ) , bufferedDS . Commit ( )
}
2019-09-16 13:46:05 +00:00
func ( a * API ) ClientListImports ( ctx context . Context ) ( [ ] api . Import , error ) {
2020-07-07 08:52:19 +00:00
importIDs := a . imgr ( ) . List ( )
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
out := make ( [ ] api . Import , len ( importIDs ) )
for i , id := range importIDs {
info , err := a . imgr ( ) . Info ( id )
if err != nil {
out [ i ] = api . Import {
Key : id ,
2020-07-07 09:12:32 +00:00
Err : xerrors . Errorf ( "getting info: %w" , err ) . Error ( ) ,
2020-07-07 08:52:19 +00:00
}
continue
}
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
ai := api . Import {
Key : id ,
Source : info . Labels [ importmgr . LSource ] ,
FilePath : info . Labels [ importmgr . LFileName ] ,
2019-08-20 16:48:33 +00:00
}
2020-07-07 08:52:19 +00:00
if info . Labels [ importmgr . LRootCid ] != "" {
c , err := cid . Parse ( info . Labels [ importmgr . LRootCid ] )
if err != nil {
2020-07-07 09:12:32 +00:00
ai . Err = err . Error ( )
2020-07-07 08:52:19 +00:00
} else {
ai . Root = & c
2020-03-04 02:31:35 +00:00
}
}
2020-07-07 08:52:19 +00:00
out [ i ] = ai
2019-08-20 16:48:33 +00:00
}
2020-07-07 08:52:19 +00:00
return out , nil
2019-08-20 16:48:33 +00:00
}
2019-08-27 18:45:21 +00:00
2021-03-26 16:37:46 +00:00
func ( a * API ) ClientCancelRetrievalDeal ( ctx context . Context , dealID retrievalmarket . DealID ) error {
2021-03-24 12:36:21 +00:00
cerr := make ( chan error )
go func ( ) {
2021-03-26 16:37:46 +00:00
err := a . Retrieval . CancelDeal ( dealID )
2021-03-24 12:36:21 +00:00
select {
case cerr <- err :
case <- ctx . Done ( ) :
}
} ( )
select {
case err := <- cerr :
if err != nil {
2021-03-26 08:50:52 +00:00
return xerrors . Errorf ( "failed to cancel retrieval deal: %w" , err )
2021-03-24 12:36:21 +00:00
}
return nil
case <- ctx . Done ( ) :
2021-03-26 08:50:52 +00:00
return xerrors . Errorf ( "context timeout while canceling retrieval deal: %w" , ctx . Err ( ) )
2021-03-24 12:36:21 +00:00
}
}
2020-08-18 09:49:56 +00:00
func ( a * API ) ClientRetrieve ( ctx context . Context , order api . RetrievalOrder , ref * api . FileRef ) error {
events := make ( chan marketevents . RetrievalEvent )
go a . clientRetrieve ( ctx , order , ref , events )
2020-08-18 13:27:56 +00:00
for {
select {
case evt , ok := <- events :
if ! ok { // done successfully
return nil
}
if evt . Err != "" {
return xerrors . Errorf ( "retrieval failed: %s" , evt . Err )
}
case <- ctx . Done ( ) :
return xerrors . Errorf ( "retrieval timed out" )
}
}
2020-08-18 09:49:56 +00:00
}
func ( a * API ) ClientRetrieveWithEvents ( ctx context . Context , order api . RetrievalOrder , ref * api . FileRef ) ( <- chan marketevents . RetrievalEvent , error ) {
2020-08-11 20:04:00 +00:00
events := make ( chan marketevents . RetrievalEvent )
go a . clientRetrieve ( ctx , order , ref , events )
return events , nil
}
2020-08-26 17:48:23 +00:00
type retrievalSubscribeEvent struct {
event rm . ClientEvent
state rm . ClientDealState
}
2020-10-09 22:35:44 +00:00
func readSubscribeEvents ( ctx context . Context , dealID retrievalmarket . DealID , subscribeEvents chan retrievalSubscribeEvent , events chan marketevents . RetrievalEvent ) error {
2020-08-26 17:48:23 +00:00
for {
var subscribeEvent retrievalSubscribeEvent
select {
case <- ctx . Done ( ) :
return xerrors . New ( "Retrieval Timed Out" )
case subscribeEvent = <- subscribeEvents :
2020-10-09 22:35:44 +00:00
if subscribeEvent . state . ID != dealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
2020-08-26 17:48:23 +00:00
}
select {
case <- ctx . Done ( ) :
return xerrors . New ( "Retrieval Timed Out" )
case events <- marketevents . RetrievalEvent {
Event : subscribeEvent . event ,
Status : subscribeEvent . state . Status ,
BytesReceived : subscribeEvent . state . TotalReceived ,
FundsSpent : subscribeEvent . state . FundsSpent ,
} :
}
state := subscribeEvent . state
switch state . Status {
case rm . DealStatusCompleted :
return nil
case rm . DealStatusRejected :
return xerrors . Errorf ( "Retrieval Proposal Rejected: %s" , state . Message )
case
rm . DealStatusDealNotFound ,
rm . DealStatusErrored :
return xerrors . Errorf ( "Retrieval Error: %s" , state . Message )
}
}
}
2020-08-11 20:04:00 +00:00
func ( a * API ) clientRetrieve ( ctx context . Context , order api . RetrievalOrder , ref * api . FileRef , events chan marketevents . RetrievalEvent ) {
defer close ( events )
finish := func ( e error ) {
2020-08-11 23:49:11 +00:00
if e != nil {
2020-08-17 21:38:50 +00:00
events <- marketevents . RetrievalEvent { Err : e . Error ( ) , FundsSpent : big . Zero ( ) }
2020-08-11 23:49:11 +00:00
}
2020-08-11 20:04:00 +00:00
}
2021-03-30 19:28:54 +00:00
var store retrievalstoremgr . RetrievalStore
2019-09-16 20:11:17 +00:00
2021-03-30 19:28:54 +00:00
if order . LocalStore == nil {
if order . MinerPeer == nil || order . MinerPeer . ID == "" {
mi , err := a . StateMinerInfo ( ctx , order . Miner , types . EmptyTSK )
if err != nil {
finish ( err )
return
}
2019-09-16 20:11:17 +00:00
2021-03-30 19:28:54 +00:00
order . MinerPeer = & retrievalmarket . RetrievalPeer {
ID : * mi . PeerId ,
Address : order . Miner ,
}
2019-09-16 20:11:17 +00:00
}
2021-04-23 07:43:41 +00:00
if order . Total . Int == nil {
finish ( xerrors . Errorf ( "cannot make retrieval deal for null total" ) )
return
}
2021-04-23 08:10:51 +00:00
2021-03-30 19:28:54 +00:00
if order . Size == 0 {
finish ( xerrors . Errorf ( "cannot make retrieval deal for zero bytes" ) )
return
2020-08-05 22:35:59 +00:00
}
2019-09-16 20:11:17 +00:00
2021-03-30 19:28:54 +00:00
/ * id , st , err := a . imgr ( ) . NewStore ( )
if err != nil {
return err
}
if err := a . imgr ( ) . AddLabel ( id , "source" , "retrieval" ) ; err != nil {
return err
} * /
2020-02-28 18:01:43 +00:00
2021-03-30 19:28:54 +00:00
ppb := types . BigDiv ( order . Total , types . NewInt ( order . Size ) )
2020-07-06 23:39:30 +00:00
2021-03-30 19:28:54 +00:00
params , err := rm . NewParamsV1 ( ppb , order . PaymentInterval , order . PaymentIntervalIncrease , shared . AllSelector ( ) , order . Piece , order . UnsealPrice )
if err != nil {
finish ( xerrors . Errorf ( "Error in retrieval params: %s" , err ) )
return
}
2020-02-28 18:01:43 +00:00
2021-03-31 17:38:02 +00:00
store , err = a . RetrievalStoreMgr . NewStore ( )
2021-03-30 19:28:54 +00:00
if err != nil {
finish ( xerrors . Errorf ( "Error setting up new store: %w" , err ) )
return
}
2020-07-31 20:24:50 +00:00
2021-03-30 19:28:54 +00:00
defer func ( ) {
_ = a . RetrievalStoreMgr . ReleaseStore ( store )
} ( )
// Subscribe to events before retrieving to avoid losing events.
subscribeEvents := make ( chan retrievalSubscribeEvent , 1 )
subscribeCtx , cancel := context . WithCancel ( ctx )
defer cancel ( )
unsubscribe := a . Retrieval . SubscribeToEvents ( func ( event rm . ClientEvent , state rm . ClientDealState ) {
// We'll check the deal IDs inside readSubscribeEvents.
if state . PayloadCID . Equals ( order . Root ) {
select {
case <- subscribeCtx . Done ( ) :
case subscribeEvents <- retrievalSubscribeEvent { event , state } :
}
2020-10-09 22:35:44 +00:00
}
2021-03-30 19:28:54 +00:00
} )
2020-07-31 20:24:50 +00:00
2021-03-30 19:28:54 +00:00
dealID , err := a . Retrieval . Retrieve (
ctx ,
order . Root ,
params ,
order . Total ,
* order . MinerPeer ,
order . Client ,
order . Miner ,
store . StoreID ( ) )
2020-07-31 20:24:50 +00:00
2021-03-30 19:28:54 +00:00
if err != nil {
unsubscribe ( )
finish ( xerrors . Errorf ( "Retrieve failed: %w" , err ) )
return
2020-10-09 22:35:44 +00:00
}
2021-03-30 19:28:54 +00:00
err = readSubscribeEvents ( ctx , dealID , subscribeEvents , events )
2020-07-31 16:27:44 +00:00
2020-10-09 22:35:44 +00:00
unsubscribe ( )
2021-03-30 19:28:54 +00:00
if err != nil {
finish ( xerrors . Errorf ( "Retrieve: %w" , err ) )
return
}
} else {
// local retrieval
st , err := ( ( * multistore . MultiStore ) ( a . Mds ) ) . Get ( * order . LocalStore )
if err != nil {
finish ( xerrors . Errorf ( "Retrieve: %w" , err ) )
return
}
2020-08-26 17:48:23 +00:00
2021-03-30 19:28:54 +00:00
store = & multiStoreRetrievalStore {
storeID : * order . LocalStore ,
store : st ,
}
2019-08-27 22:10:23 +00:00
}
2020-05-26 15:36:21 +00:00
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
2020-08-11 20:04:00 +00:00
finish ( nil )
return
2020-05-26 15:36:21 +00:00
}
2020-07-31 20:24:50 +00:00
rdag := store . DAGService ( )
2020-07-31 16:27:44 +00:00
2020-03-03 04:13:08 +00:00
if ref . IsCAR {
2020-03-30 14:08:20 +00:00
f , err := os . OpenFile ( ref . Path , os . O_CREATE | os . O_WRONLY , 0644 )
2020-03-03 04:13:08 +00:00
if err != nil {
2020-08-11 20:04:00 +00:00
finish ( err )
return
2020-03-03 04:13:08 +00:00
}
2020-07-31 16:27:44 +00:00
err = car . WriteCar ( ctx , rdag , [ ] cid . Cid { order . Root } , f )
2020-03-03 04:13:08 +00:00
if err != nil {
2020-08-11 20:04:00 +00:00
finish ( err )
return
2020-03-03 04:13:08 +00:00
}
2020-08-11 20:04:00 +00:00
finish ( f . Close ( ) )
return
2020-03-03 04:13:08 +00:00
}
2020-07-31 16:27:44 +00:00
nd , err := rdag . Get ( ctx , order . Root )
2019-08-27 22:10:23 +00:00
if err != nil {
2020-08-11 20:04:00 +00:00
finish ( xerrors . Errorf ( "ClientRetrieve: %w" , err ) )
return
2019-08-27 22:10:23 +00:00
}
2020-07-31 16:27:44 +00:00
file , err := unixfile . NewUnixfsFile ( ctx , rdag , nd )
2019-12-10 04:19:59 +00:00
if err != nil {
2020-08-11 20:04:00 +00:00
finish ( xerrors . Errorf ( "ClientRetrieve: %w" , err ) )
return
2019-12-10 04:19:59 +00:00
}
2020-08-11 20:04:00 +00:00
finish ( files . WriteTo ( file , ref . Path ) )
return
2019-08-27 18:45:21 +00:00
}
2019-09-13 21:00:36 +00:00
2021-03-30 19:28:54 +00:00
type multiStoreRetrievalStore struct {
storeID multistore . StoreID
store * multistore . Store
}
func ( mrs * multiStoreRetrievalStore ) StoreID ( ) * multistore . StoreID {
return & mrs . storeID
}
func ( mrs * multiStoreRetrievalStore ) DAGService ( ) ipld . DAGService {
return mrs . store . DAG
}
2020-09-29 11:53:30 +00:00
func ( a * API ) ClientQueryAsk ( ctx context . Context , p peer . ID , miner address . Address ) ( * storagemarket . StorageAsk , error ) {
2020-08-05 20:54:45 +00:00
mi , err := a . StateMinerInfo ( ctx , miner , types . EmptyTSK )
if err != nil {
return nil , xerrors . Errorf ( "failed getting miner info: %w" , err )
}
info := utils . NewStorageProviderInfo ( miner , mi . Worker , mi . SectorSize , p , mi . Multiaddrs )
2020-09-29 11:53:30 +00:00
ask , err := a . SMDealClient . GetAsk ( ctx , info )
2019-12-17 10:46:39 +00:00
if err != nil {
return nil , err
}
2020-09-29 11:53:30 +00:00
return ask , nil
2019-09-13 21:00:36 +00:00
}
2020-04-03 22:17:57 +00:00
2020-08-12 19:40:25 +00:00
func ( a * API ) ClientCalcCommP ( ctx context . Context , inpath string ) ( * api . CommPRet , error ) {
2020-11-04 20:29:08 +00:00
// Hard-code the sector type to 32GiBV1_1, because:
2020-11-20 00:28:18 +00:00
// - ffiwrapper.GeneratePieceCIDFromFile requires a RegisteredSealProof
2020-08-12 19:40:25 +00:00
// - commP itself is sector-size independent, with rather low probability of that changing
// ( note how the final rust call is identical for every RegSP type )
// https://github.com/filecoin-project/rust-filecoin-proofs-api/blob/v5.0.0/src/seal.rs#L1040-L1050
//
// IF/WHEN this changes in the future we will have to be able to calculate
// "old style" commP, and thus will need to introduce a version switch or similar
2020-11-04 20:29:08 +00:00
arbitraryProofType := abi . RegisteredSealProof_StackedDrg32GiBV1_1
2020-04-03 22:17:57 +00:00
rdr , err := os . Open ( inpath )
if err != nil {
return nil , err
}
2020-08-20 04:49:10 +00:00
defer rdr . Close ( ) //nolint:errcheck
2020-04-03 22:17:57 +00:00
stat , err := rdr . Stat ( )
if err != nil {
return nil , err
}
2021-02-16 19:48:31 +00:00
// check that the data is a car file; if it's not, retrieval won't work
_ , _ , err = car . ReadHeader ( bufio . NewReader ( rdr ) )
if err != nil {
return nil , xerrors . Errorf ( "not a car file: %w" , err )
}
if _ , err := rdr . Seek ( 0 , io . SeekStart ) ; err != nil {
return nil , xerrors . Errorf ( "seek to start: %w" , err )
}
2020-11-20 00:28:18 +00:00
pieceReader , pieceSize := padreader . New ( rdr , uint64 ( stat . Size ( ) ) )
commP , err := ffiwrapper . GeneratePieceCIDFromFile ( arbitraryProofType , pieceReader , pieceSize )
2020-04-03 22:17:57 +00:00
if err != nil {
return nil , xerrors . Errorf ( "computing commP failed: %w" , err )
}
return & api . CommPRet {
Root : commP ,
Size : pieceSize ,
} , nil
}
2020-07-31 16:22:04 +00:00
type lenWriter int64
func ( w * lenWriter ) Write ( p [ ] byte ) ( n int , err error ) {
* w += lenWriter ( len ( p ) )
return len ( p ) , nil
}
func ( a * API ) ClientDealSize ( ctx context . Context , root cid . Cid ) ( api . DataSize , error ) {
dag := merkledag . NewDAGService ( blockservice . New ( a . CombinedBstore , offline . Exchange ( a . CombinedBstore ) ) )
w := lenWriter ( 0 )
err := car . WriteCar ( ctx , dag , [ ] cid . Cid { root } , & w )
if err != nil {
return api . DataSize { } , err
}
up := padreader . PaddedSize ( uint64 ( w ) )
return api . DataSize {
PayloadSize : int64 ( w ) ,
PieceSize : up . Padded ( ) ,
} , nil
}
2020-10-22 11:59:08 +00:00
func ( a * API ) ClientDealPieceCID ( ctx context . Context , root cid . Cid ) ( api . DataCIDSize , error ) {
dag := merkledag . NewDAGService ( blockservice . New ( a . CombinedBstore , offline . Exchange ( a . CombinedBstore ) ) )
2020-11-20 00:28:18 +00:00
w := & writer . Writer { }
bw := bufio . NewWriterSize ( w , int ( writer . CommPBuf ) )
2020-10-22 11:59:08 +00:00
2021-02-16 11:32:45 +00:00
err := car . WriteCar ( ctx , dag , [ ] cid . Cid { root } , w )
2020-10-22 11:59:08 +00:00
if err != nil {
return api . DataCIDSize { } , err
}
if err := bw . Flush ( ) ; err != nil {
return api . DataCIDSize { } , err
}
2020-11-20 00:28:18 +00:00
dataCIDSize , err := w . Sum ( )
2021-02-16 11:32:45 +00:00
return api . DataCIDSize ( dataCIDSize ) , err
2020-10-22 11:59:08 +00:00
}
2020-04-03 22:17:57 +00:00
func ( a * API ) ClientGenCar ( ctx context . Context , ref api . FileRef , outputPath string ) error {
2020-07-07 08:52:19 +00:00
id , st , err := a . imgr ( ) . NewStore ( )
2020-07-06 23:39:30 +00:00
if err != nil {
return err
}
2020-07-07 08:52:19 +00:00
if err := a . imgr ( ) . AddLabel ( id , "source" , "gen-car" ) ; err != nil {
2020-07-06 23:39:30 +00:00
return err
}
2020-04-03 22:17:57 +00:00
2020-07-06 23:39:30 +00:00
bufferedDS := ipld . NewBufferedDAG ( ctx , st . DAG )
c , err := a . clientImport ( ctx , ref , st )
2020-04-03 22:17:57 +00:00
if err != nil {
return err
}
2020-07-06 23:39:30 +00:00
// TODO: does that defer mean to remove the whole blockstore?
2020-05-27 20:53:20 +00:00
defer bufferedDS . Remove ( ctx , c ) //nolint:errcheck
2020-09-23 19:16:26 +00:00
ssb := builder . NewSelectorSpecBuilder ( basicnode . Prototype . Any )
2020-04-03 22:17:57 +00:00
// entire DAG selector
allSelector := ssb . ExploreRecursive ( selector . RecursionLimitNone ( ) ,
ssb . ExploreAll ( ssb . ExploreRecursiveEdge ( ) ) ) . Node ( )
f , err := os . Create ( outputPath )
if err != nil {
return err
}
2020-07-06 23:39:30 +00:00
sc := car . NewSelectiveCar ( ctx , st . Bstore , [ ] car . Dag { { Root : c , Selector : allSelector } } )
2020-04-03 22:17:57 +00:00
if err = sc . Write ( f ) ; err != nil {
return err
}
2020-05-27 23:15:19 +00:00
return f . Close ( )
2020-04-03 22:17:57 +00:00
}
2020-07-24 21:47:22 +00:00
func ( a * API ) clientImport ( ctx context . Context , ref api . FileRef , store * multistore . Store ) ( cid . Cid , error ) {
2020-04-03 22:17:57 +00:00
f , err := os . Open ( ref . Path )
if err != nil {
return cid . Undef , err
}
2020-08-20 04:49:10 +00:00
defer f . Close ( ) //nolint:errcheck
2020-04-03 22:17:57 +00:00
stat , err := f . Stat ( )
if err != nil {
return cid . Undef , err
}
file , err := files . NewReaderPathFile ( ref . Path , f , stat )
if err != nil {
return cid . Undef , err
}
if ref . IsCAR {
2020-07-06 23:39:30 +00:00
var st car . Store
if store . Fstore == nil {
st = store . Bstore
2020-04-03 22:17:57 +00:00
} else {
2020-07-06 23:39:30 +00:00
st = store . Fstore
2020-04-03 22:17:57 +00:00
}
2020-07-06 23:39:30 +00:00
result , err := car . LoadCar ( st , file )
2020-04-03 22:17:57 +00:00
if err != nil {
return cid . Undef , err
}
if len ( result . Roots ) != 1 {
return cid . Undef , xerrors . New ( "cannot import car with more than one root" )
}
return result . Roots [ 0 ] , nil
}
2020-07-06 23:39:30 +00:00
bufDs := ipld . NewBufferedDAG ( ctx , store . DAG )
2020-07-07 09:12:32 +00:00
prefix , err := merkledag . PrefixForCidVersion ( 1 )
if err != nil {
return cid . Undef , err
}
prefix . MhType = DefaultHashFunction
2020-04-03 22:17:57 +00:00
params := ihelper . DagBuilderParams {
2020-07-07 09:38:22 +00:00
Maxlinks : build . UnixfsLinksPerLevel ,
RawLeaves : true ,
2020-07-07 09:12:32 +00:00
CidBuilder : cidutil . InlineBuilder {
Builder : prefix ,
Limit : 126 ,
} ,
2020-07-07 09:38:22 +00:00
Dagserv : bufDs ,
2020-07-08 21:05:43 +00:00
NoCopy : true ,
2020-04-03 22:17:57 +00:00
}
db , err := params . New ( chunker . NewSizeSplitter ( file , int64 ( build . UnixfsChunkSize ) ) )
if err != nil {
return cid . Undef , err
}
nd , err := balanced . Layout ( db )
if err != nil {
return cid . Undef , err
}
2020-07-06 23:39:30 +00:00
if err := bufDs . Commit ( ) ; err != nil {
2020-04-03 22:17:57 +00:00
return cid . Undef , err
}
return nd . Cid ( ) , nil
}
2020-08-18 23:26:21 +00:00
func ( a * API ) ClientListDataTransfers ( ctx context . Context ) ( [ ] api . DataTransferChannel , error ) {
inProgressChannels , err := a . DataTransfer . InProgressChannels ( ctx )
if err != nil {
return nil , err
}
apiChannels := make ( [ ] api . DataTransferChannel , 0 , len ( inProgressChannels ) )
2020-08-19 00:36:22 +00:00
for _ , channelState := range inProgressChannels {
2020-08-20 08:18:05 +00:00
apiChannels = append ( apiChannels , api . NewDataTransferChannel ( a . Host . ID ( ) , channelState ) )
2020-08-19 00:36:22 +00:00
}
return apiChannels , nil
}
func ( a * API ) ClientDataTransferUpdates ( ctx context . Context ) ( <- chan api . DataTransferChannel , error ) {
channels := make ( chan api . DataTransferChannel )
unsub := a . DataTransfer . SubscribeToEvents ( func ( evt datatransfer . Event , channelState datatransfer . ChannelState ) {
2020-08-20 08:18:05 +00:00
channel := api . NewDataTransferChannel ( a . Host . ID ( ) , channelState )
2020-08-19 00:36:22 +00:00
select {
case <- ctx . Done ( ) :
case channels <- channel :
2020-08-18 23:26:21 +00:00
}
2020-08-19 00:36:22 +00:00
} )
go func ( ) {
defer unsub ( )
<- ctx . Done ( )
} ( )
return channels , nil
}
2020-08-27 18:32:51 +00:00
2020-10-13 10:37:00 +00:00
func ( a * API ) ClientRestartDataTransfer ( ctx context . Context , transferID datatransfer . TransferID , otherPeer peer . ID , isInitiator bool ) error {
selfPeer := a . Host . ID ( )
if isInitiator {
return a . DataTransfer . RestartDataTransferChannel ( ctx , datatransfer . ChannelID { Initiator : selfPeer , Responder : otherPeer , ID : transferID } )
}
return a . DataTransfer . RestartDataTransferChannel ( ctx , datatransfer . ChannelID { Initiator : otherPeer , Responder : selfPeer , ID : transferID } )
2020-10-22 20:40:26 +00:00
}
func ( a * API ) ClientCancelDataTransfer ( ctx context . Context , transferID datatransfer . TransferID , otherPeer peer . ID , isInitiator bool ) error {
selfPeer := a . Host . ID ( )
if isInitiator {
return a . DataTransfer . CloseDataTransferChannel ( ctx , datatransfer . ChannelID { Initiator : selfPeer , Responder : otherPeer , ID : transferID } )
}
return a . DataTransfer . CloseDataTransferChannel ( ctx , datatransfer . ChannelID { Initiator : otherPeer , Responder : selfPeer , ID : transferID } )
2020-10-13 10:37:00 +00:00
}
2020-09-04 05:34:59 +00:00
func ( a * API ) ClientRetrieveTryRestartInsufficientFunds ( ctx context . Context , paymentChannel address . Address ) error {
return a . Retrieval . TryRestartInsufficientFunds ( paymentChannel )
}
2020-10-07 04:57:51 +00:00
func ( a * API ) ClientGetDealStatus ( ctx context . Context , statusCode uint64 ) ( string , error ) {
ststr , ok := storagemarket . DealStates [ statusCode ]
if ! ok {
return "" , fmt . Errorf ( "no such deal state %d" , statusCode )
}
return ststr , nil
}