lotus/node/impl/client/client.go

1138 lines
34 KiB
Go
Raw Normal View History

2019-09-16 13:46:05 +00:00
package client
2019-08-20 16:48:33 +00:00
import (
"bufio"
2019-08-20 16:48:33 +00:00
"context"
"fmt"
2019-10-23 09:18:22 +00:00
"io"
2021-07-03 07:25:20 +00:00
"math/rand"
2019-08-20 16:48:33 +00:00
"os"
"sort"
2021-05-11 02:26:04 +00:00
"time"
2019-08-20 16:48:33 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
2021-07-03 04:40:56 +00:00
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"
2020-10-26 14:16:10 +00:00
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/dline"
2019-09-06 22:39:47 +00:00
"github.com/ipfs/go-blockservice"
2019-08-20 16:48:33 +00:00
"github.com/ipfs/go-cid"
2019-09-06 22:39:47 +00:00
offline "github.com/ipfs/go-ipfs-exchange-offline"
2019-08-20 16:48:33 +00:00
files "github.com/ipfs/go-ipfs-files"
2019-09-06 22:39:47 +00:00
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car"
2020-07-07 08:52:19 +00:00
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
2019-08-20 16:48:33 +00:00
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
2020-07-07 09:12:32 +00:00
mh "github.com/multiformats/go-multihash"
2019-08-20 16:48:33 +00:00
"go.uber.org/fx"
2019-09-06 22:39:47 +00:00
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-commp-utils/ffiwrapper"
"github.com/filecoin-project/go-commp-utils/writer"
datatransfer "github.com/filecoin-project/go-data-transfer"
2020-09-29 11:53:30 +00:00
"github.com/filecoin-project/go-fil-markets/discovery"
2020-08-05 22:35:59 +00:00
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
2020-06-23 19:22:33 +00:00
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
2020-02-13 00:15:33 +00:00
2020-08-11 20:04:00 +00:00
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/paych"
2020-07-07 08:52:19 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
2020-07-06 23:39:30 +00:00
"github.com/filecoin-project/lotus/node/repo/importmgr"
2019-08-20 16:48:33 +00:00
)
2020-07-07 09:12:32 +00:00
// DefaultHashFunction is the multihash function code used by default, computed
// as mh.BLAKE2B_MIN + 31 (presumably blake2b-256 — confirm against the
// go-multihash code table).
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
2020-07-07 09:38:22 +00:00
// dealStartBufferHours is how many hours past the current chain head a deal's
// start epoch is placed when the caller does not supply one.
// 8 days ~= SealDuration + PreCommit + MaxProveCommitDuration + 8 hour buffer
const dealStartBufferHours uint64 = 8 * 24
2019-09-16 13:46:05 +00:00
type API struct {
2019-08-20 16:48:33 +00:00
fx.In
2019-09-16 13:46:05 +00:00
full.ChainAPI
full.WalletAPI
paych.PaychAPI
full.StateAPI
2019-08-20 16:48:33 +00:00
SMDealClient storagemarket.StorageClient
2020-09-29 11:53:30 +00:00
RetDiscovery discovery.PeerResolver
2020-06-23 19:22:33 +00:00
Retrieval rm.RetrievalClient
2019-09-06 22:39:47 +00:00
Chain *store.ChainStore
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
Imports dtypes.ClientImportMgr
2020-07-07 12:35:02 +00:00
2021-07-03 08:31:32 +00:00
DataTransfer dtypes.ClientDataTransfer
Host host.Host
// TODO How do we inject the Repo Path here ?
2019-08-20 16:48:33 +00:00
}
2020-09-10 06:30:47 +00:00
func calcDealExpiration(minDuration uint64, md *dline.Info, startEpoch abi.ChainEpoch) abi.ChainEpoch {
2020-04-21 21:38:26 +00:00
// Make sure we give some time for the miner to seal
2020-04-30 17:42:16 +00:00
minExp := startEpoch + abi.ChainEpoch(minDuration)
2020-04-21 21:38:26 +00:00
// Align on miners ProvingPeriodBoundary
exp := minExp + md.WPoStProvingPeriod - (minExp % md.WPoStProvingPeriod) + (md.PeriodStart % md.WPoStProvingPeriod) - 1
// Should only be possible for miners created around genesis
for exp < minExp {
exp += md.WPoStProvingPeriod
}
return exp
2020-04-21 21:38:26 +00:00
}
2020-07-07 08:52:19 +00:00
// imgr returns the injected import manager (the Imports field) as an
// *importmgr.Mgr.
func (a *API) imgr() *importmgr.Mgr {
	return a.Imports
}
// ClientStartDeal proposes a storage deal through the regular flow by
// delegating to dealStarter with isStateless set to false. It returns the CID
// of the deal proposal.
func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
	return a.dealStarter(ctx, params, false)
}
// ClientStatelessDeal proposes a storage deal through the stateless flow by
// delegating to dealStarter with isStateless set to true. dealStarter only
// accepts manual-transfer, zero-price deals in this mode.
func (a *API) ClientStatelessDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
	return a.dealStarter(ctx, params, true)
}
func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isStateless bool) (*cid.Cid, error) {
var CARV2FilePath string
if isStateless {
if params.Data.TransferType != storagemarket.TTManual {
return nil, xerrors.Errorf("invalid transfer type %s for stateless storage deal", params.Data.TransferType)
}
if !params.EpochPrice.IsZero() {
return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0")
}
} else if params.Data.TransferType == storagemarket.TTGraphsync {
2021-07-03 08:31:32 +00:00
c, err := a.imgr().CARV2FilePathFor(params.Data.Root)
2021-07-03 04:40:56 +00:00
if err != nil {
2021-07-03 08:31:32 +00:00
return nil, xerrors.Errorf("failed to find CARv2 file path: %w", err)
2021-07-03 04:40:56 +00:00
}
2021-07-03 08:31:32 +00:00
if c == "" {
return nil, xerrors.New("no CARv2 file path for deal")
}
2021-07-03 08:31:32 +00:00
CARV2FilePath = c
}
walletKey, err := a.StateAccountKey(ctx, params.Wallet, types.EmptyTSK)
if err != nil {
2021-06-25 08:48:47 +00:00
return nil, xerrors.Errorf("failed resolving params.Wallet addr (%s): %w", params.Wallet, err)
}
exist, err := a.WalletHas(ctx, walletKey)
2019-08-20 16:48:33 +00:00
if err != nil {
2021-06-25 08:48:47 +00:00
return nil, xerrors.Errorf("failed getting addr from wallet (%s): %w", params.Wallet, err)
}
if !exist {
return nil, xerrors.Errorf("provided address doesn't exist in wallet")
2019-08-20 16:48:33 +00:00
}
mi, err := a.StateMinerInfo(ctx, params.Miner, types.EmptyTSK)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, xerrors.Errorf("failed getting peer ID: %w", err)
}
md, err := a.StateMinerProvingDeadline(ctx, params.Miner, types.EmptyTSK)
if err != nil {
2020-07-07 22:29:45 +00:00
return nil, xerrors.Errorf("failed getting miner's deadline info: %w", err)
}
if uint64(params.Data.PieceSize.Padded()) > uint64(mi.SectorSize) {
return nil, xerrors.New("data doesn't fit in a sector")
}
2020-04-30 17:42:16 +00:00
dealStart := params.DealStartEpoch
if dealStart <= 0 { // unset, or explicitly 'epoch undefined'
ts, err := a.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("failed getting chain height: %w", err)
}
blocksPerHour := 60 * 60 / build.BlockDelaySecs
2020-10-02 17:31:21 +00:00
dealStart = ts.Height() + abi.ChainEpoch(dealStartBufferHours*blocksPerHour) // TODO: Get this from storage ask
}
networkVersion, err := a.StateNetworkVersion(ctx, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed to get network version: %w", err)
}
st, err := miner.PreferredSealProofTypeFromWindowPoStType(networkVersion, mi.WindowPoStProofType)
if err != nil {
return nil, xerrors.Errorf("failed to get seal proof type: %w", err)
}
// regular flow
if !isStateless {
providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs)
result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{
Addr: params.Wallet,
Info: &providerInfo,
Data: params.Data,
StartEpoch: dealStart,
EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart),
Price: params.EpochPrice,
Collateral: params.ProviderCollateral,
Rt: st,
FastRetrieval: params.FastRetrieval,
VerifiedDeal: params.VerifiedDeal,
CARV2FilePath: CARV2FilePath,
})
if err != nil {
return nil, xerrors.Errorf("failed to start deal: %w", err)
}
2019-08-20 16:48:33 +00:00
return &result.ProposalCid, nil
}
//
// stateless flow from here to the end
//
dealProposal := &market.DealProposal{
PieceCID: *params.Data.PieceCid,
PieceSize: params.Data.PieceSize.Padded(),
Client: walletKey,
Provider: params.Miner,
Label: params.Data.Root.Encode(multibase.MustNewEncoder('u')),
StartEpoch: dealStart,
EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart),
StoragePricePerEpoch: big.Zero(),
ProviderCollateral: params.ProviderCollateral,
ClientCollateral: big.Zero(),
VerifiedDeal: params.VerifiedDeal,
}
if dealProposal.ProviderCollateral.IsZero() {
networkCollateral, err := a.StateDealProviderCollateralBounds(ctx, params.Data.PieceSize.Padded(), params.VerifiedDeal, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed to determine minimum provider collateral: %w", err)
}
dealProposal.ProviderCollateral = networkCollateral.Min
}
dealProposalSerialized, err := cborutil.Dump(dealProposal)
if err != nil {
return nil, xerrors.Errorf("failed to serialize deal proposal: %w", err)
}
dealProposalSig, err := a.WalletSign(ctx, walletKey, dealProposalSerialized)
if err != nil {
return nil, xerrors.Errorf("failed to sign proposal : %w", err)
}
dealProposalSigned := &market.ClientDealProposal{
Proposal: *dealProposal,
ClientSignature: *dealProposalSig,
}
dStream, err := network.NewFromLibp2pHost(a.Host,
2021-05-11 02:26:04 +00:00
// params duplicated from .../node/modules/client.go
// https://github.com/filecoin-project/lotus/pull/5961#discussion_r629768011
network.RetryParameters(time.Second, 5*time.Minute, 15, 5),
).NewDealStream(ctx, *mi.PeerId)
if err != nil {
return nil, xerrors.Errorf("opening dealstream to %s/%s failed: %w", params.Miner, *mi.PeerId, err)
}
if err = dStream.WriteDealProposal(network.Proposal{
FastRetrieval: true,
DealProposal: dealProposalSigned,
Piece: &storagemarket.DataRef{
TransferType: storagemarket.TTManual,
Root: params.Data.Root,
PieceCid: params.Data.PieceCid,
PieceSize: params.Data.PieceSize,
},
}); err != nil {
return nil, xerrors.Errorf("sending deal proposal failed: %w", err)
}
resp, _, err := dStream.ReadDealResponse()
if err != nil {
return nil, xerrors.Errorf("reading proposal response failed: %w", err)
}
dealProposalIpld, err := cborutil.AsIpld(dealProposalSigned)
if err != nil {
return nil, xerrors.Errorf("serializing proposal node failed: %w", err)
}
if !dealProposalIpld.Cid().Equals(resp.Response.Proposal) {
return nil, xerrors.Errorf("provider returned proposal cid %s but we expected %s", resp.Response.Proposal, dealProposalIpld.Cid())
}
if resp.Response.State != storagemarket.StorageDealWaitingForData {
return nil, xerrors.Errorf("provider returned unexpected state %d for proposal %s, with message: %s", resp.Response.State, resp.Response.Proposal, resp.Response.Message)
}
return &resp.Response.Proposal, nil
2019-08-20 16:48:33 +00:00
}
2019-09-16 13:46:05 +00:00
func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
deals, err := a.SMDealClient.ListLocalDeals(ctx)
2019-09-10 14:13:24 +00:00
if err != nil {
return nil, err
}
// Get a map of transfer ID => DataTransfer
dataTransfersByID, err := a.transfersByID(ctx)
if err != nil {
return nil, err
}
2019-09-10 14:13:24 +00:00
out := make([]api.DealInfo, len(deals))
for k, v := range deals {
// Find the data transfer associated with this deal
var transferCh *api.DataTransferChannel
if v.TransferChannelID != nil {
if ch, ok := dataTransfersByID[*v.TransferChannelID]; ok {
transferCh = &ch
}
2019-09-10 14:13:24 +00:00
}
out[k] = a.newDealInfoWithTransfer(transferCh, v)
2019-09-10 14:13:24 +00:00
}
return out, nil
}
// transfersByID returns the in-progress data transfer channels, converted to
// api.DataTransferChannel and keyed by channel ID.
func (a *API) transfersByID(ctx context.Context) (map[datatransfer.ChannelID]api.DataTransferChannel, error) {
	active, err := a.DataTransfer.InProgressChannels(ctx)
	if err != nil {
		return nil, err
	}

	self := a.Host.ID()
	byID := make(map[datatransfer.ChannelID]api.DataTransferChannel, len(active))
	for chanID, st := range active {
		byID[chanID] = api.NewDataTransferChannel(self, st)
	}
	return byID, nil
}
2019-11-06 19:44:28 +00:00
func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) {
v, err := a.SMDealClient.GetLocalDeal(ctx, d)
2019-11-06 19:44:28 +00:00
if err != nil {
return nil, err
}
di := a.newDealInfo(ctx, v)
return &di, nil
2019-11-06 19:44:28 +00:00
}
2020-08-27 18:32:51 +00:00
func (a *API) ClientGetDealUpdates(ctx context.Context) (<-chan api.DealInfo, error) {
updates := make(chan api.DealInfo)
unsub := a.SMDealClient.SubscribeToEvents(func(_ storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
updates <- a.newDealInfo(ctx, deal)
2020-08-27 18:32:51 +00:00
})
go func() {
defer unsub()
<-ctx.Done()
}()
return updates, nil
}
// newDealInfo builds a DealInfo for a single deal. It looks up the state of
// the deal's data transfer channel (if any) and copies over the deal stages.
func (a *API) newDealInfo(ctx context.Context, v storagemarket.ClientDeal) api.DealInfo {
	var transfer *api.DataTransferChannel

	if chanID := v.TransferChannelID; chanID != nil {
		// A lookup error is deliberately ignored: the transfer may simply not
		// be found because it is no longer active.
		if st, err := a.DataTransfer.ChannelState(ctx, *chanID); err == nil {
			ch := api.NewDataTransferChannel(a.Host.ID(), st)
			ch.Stages = st.Stages()
			transfer = &ch
		}
	}

	info := a.newDealInfoWithTransfer(transfer, v)
	info.DealStages = v.DealStages
	return info
}
// newDealInfoWithTransfer maps a ClientDeal and its (possibly nil) data
// transfer channel onto an api.DealInfo.
func (a *API) newDealInfoWithTransfer(transferCh *api.DataTransferChannel, v storagemarket.ClientDeal) api.DealInfo {
	info := api.DealInfo{
		// Deal-level fields.
		ProposalCid:  v.ProposalCid,
		DataRef:      v.DataRef,
		State:        v.State,
		Message:      v.Message,
		DealID:       v.DealID,
		CreationTime: v.CreationTime.Time(),

		// Fields taken from the proposal.
		Provider:      v.Proposal.Provider,
		PieceCID:      v.Proposal.PieceCID,
		Size:          uint64(v.Proposal.PieceSize.Unpadded()),
		PricePerEpoch: v.Proposal.StoragePricePerEpoch,
		Duration:      uint64(v.Proposal.Duration()),
		Verified:      v.Proposal.VerifiedDeal,

		// Data transfer.
		TransferChannelID: v.TransferChannelID,
		DataTransfer:      transferCh,
	}
	return info
}
2019-09-16 13:46:05 +00:00
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
2019-08-26 13:45:36 +00:00
// TODO: check if we have the ENTIRE dag
2021-07-03 08:31:32 +00:00
carv2Path, err := a.imgr().CARV2FilePathFor(root)
2019-08-26 13:45:36 +00:00
if err != nil {
2021-07-03 08:31:32 +00:00
return false, err
2021-07-03 07:25:20 +00:00
}
2021-07-03 08:31:32 +00:00
if len(carv2Path) == 0 {
return false, nil
2019-08-26 13:45:36 +00:00
}
2021-07-03 08:31:32 +00:00
return true, nil
2019-08-26 13:45:36 +00:00
}
2020-07-09 16:29:57 +00:00
func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
2019-08-26 13:45:36 +00:00
peers, err := a.RetDiscovery.GetPeers(root)
if err != nil {
return nil, err
}
2020-07-09 16:29:57 +00:00
out := make([]api.QueryOffer, 0, len(peers))
for _, p := range peers {
if piece != nil && !piece.Equals(*p.PieceCID) {
continue
}
out = append(out, a.makeRetrievalQuery(ctx, p, root, piece, rm.QueryParams{}))
2019-08-26 13:45:36 +00:00
}
return out, nil
}
func (a *API) ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) {
mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil {
return api.QueryOffer{}, err
}
2020-06-23 19:22:33 +00:00
rp := rm.RetrievalPeer{
Address: miner,
ID: *mi.PeerId,
}
return a.makeRetrievalQuery(ctx, rp, root, piece, rm.QueryParams{}), nil
}
2020-07-09 16:29:57 +00:00
func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, piece *cid.Cid, qp rm.QueryParams) api.QueryOffer {
queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp)
if err != nil {
2020-08-05 22:35:59 +00:00
return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeer: rp}
}
var errStr string
switch queryResponse.Status {
2020-06-23 19:22:33 +00:00
case rm.QueryResponseAvailable:
errStr = ""
2020-06-23 19:22:33 +00:00
case rm.QueryResponseUnavailable:
errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message)
2020-06-23 19:22:33 +00:00
case rm.QueryResponseError:
errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message)
}
return api.QueryOffer{
Root: payload,
2020-07-09 16:29:57 +00:00
Piece: piece,
Size: queryResponse.Size,
MinPrice: queryResponse.PieceRetrievalPrice(),
UnsealPrice: queryResponse.UnsealPrice,
PaymentInterval: queryResponse.MaxPaymentInterval,
PaymentIntervalIncrease: queryResponse.MaxPaymentIntervalIncrease,
Miner: queryResponse.PaymentAddress, // TODO: check
2020-08-05 22:35:59 +00:00
MinerPeer: rp,
Err: errStr,
}
}
func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.ImportRes, finalErr error) {
2021-07-03 07:25:20 +00:00
id, err := a.imgr().NewStore()
2020-07-06 23:39:30 +00:00
if err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-06 23:39:30 +00:00
}
2021-07-03 04:40:56 +00:00
carV2File, err := a.imgr().NewTempFile(id)
if err != nil {
return nil, xerrors.Errorf("failed to create temp CARv2 file: %w", err)
}
// make sure to remove the CARv2 file if anything goes wrong from here on.
defer func() {
if finalErr != nil {
_ = os.Remove(carV2File)
}
}()
var root cid.Cid
if ref.IsCAR {
root, err = transformCarToCARv2(ref.Path, carV2File)
if err != nil {
return nil, xerrors.Errorf("failed to import CAR file: %w", err)
}
} else {
2021-07-03 07:25:20 +00:00
root, err = a.importNormalFileToCARv2(ctx, id, ref.Path, carV2File)
if err != nil {
return nil, xerrors.Errorf("failed to import normal file to CARv2: %w", err)
}
}
2020-07-07 09:38:09 +00:00
if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-06 23:39:30 +00:00
}
2020-07-07 09:38:09 +00:00
if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-07 09:38:09 +00:00
}
if err := a.imgr().AddLabel(id, importmgr.LCARv2FilePath, carV2File); err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2019-08-20 16:48:33 +00:00
}
if err := a.imgr().AddLabel(id, importmgr.LRootCid, root.String()); err != nil {
2020-07-07 11:45:02 +00:00
return nil, err
2020-07-07 09:38:09 +00:00
}
2020-07-07 11:45:02 +00:00
return &api.ImportRes{
Root: root,
2020-07-07 11:45:02 +00:00
ImportID: id,
}, nil
}
2021-07-03 07:25:20 +00:00
func (a *API) ClientRemoveImport(ctx context.Context, importID uint64) error {
info, err := a.imgr().Info(importID)
2020-07-06 23:39:30 +00:00
if err != nil {
2021-07-03 07:25:20 +00:00
return xerrors.Errorf("failed to fetch import info: %w", err)
2020-07-06 23:39:30 +00:00
}
// remove the CARv2 file if we've created one.
if path := info.Labels[importmgr.LCARv2FilePath]; path != "" {
_ = os.Remove(path)
2020-07-06 23:39:30 +00:00
}
return a.imgr().Remove(importID)
}
2019-10-23 09:18:22 +00:00
2021-07-03 07:25:20 +00:00
func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, error) {
// write payload to temp file
tmpPath, err := a.imgr().NewTempFile(rand.Uint64())
2020-07-07 09:12:32 +00:00
if err != nil {
return cid.Undef, err
}
2021-07-03 07:25:20 +00:00
defer os.Remove(tmpPath) //nolint:errcheck
tmpF, err := os.Open(tmpPath)
if err != nil {
2021-07-03 07:25:20 +00:00
return cid.Undef, err
2019-10-23 09:18:22 +00:00
}
2021-07-03 07:25:20 +00:00
defer tmpF.Close() //nolint:errcheck
if _, err := io.Copy(tmpF, r); err != nil {
return cid.Undef, err
}
2021-07-03 07:25:20 +00:00
if err := tmpF.Close(); err != nil {
return cid.Undef, err
}
2019-10-23 09:18:22 +00:00
2021-07-03 07:25:20 +00:00
res, err := a.ClientImport(ctx, api.FileRef{
Path: tmpPath,
IsCAR: false,
})
if err != nil {
return cid.Undef, err
}
return res.Root, nil
2019-10-23 09:18:22 +00:00
}
2019-09-16 13:46:05 +00:00
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
2021-07-03 04:40:56 +00:00
importIDs, err := a.imgr().List()
if err != nil {
return nil, xerrors.Errorf("failed to fetch imports: %w", err)
}
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
out := make([]api.Import, len(importIDs))
for i, id := range importIDs {
info, err := a.imgr().Info(id)
if err != nil {
out[i] = api.Import{
Key: id,
2020-07-07 09:12:32 +00:00
Err: xerrors.Errorf("getting info: %w", err).Error(),
2020-07-07 08:52:19 +00:00
}
continue
}
2019-08-20 16:48:33 +00:00
2020-07-07 08:52:19 +00:00
ai := api.Import{
2021-07-03 04:40:56 +00:00
Key: id,
Source: info.Labels[importmgr.LSource],
FilePath: info.Labels[importmgr.LFileName],
CARv2FilePath: info.Labels[importmgr.LCARv2FilePath],
2019-08-20 16:48:33 +00:00
}
2020-07-07 08:52:19 +00:00
if info.Labels[importmgr.LRootCid] != "" {
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
2020-07-07 09:12:32 +00:00
ai.Err = err.Error()
2020-07-07 08:52:19 +00:00
} else {
ai.Root = &c
}
}
2020-07-07 08:52:19 +00:00
out[i] = ai
2019-08-20 16:48:33 +00:00
}
2020-07-07 08:52:19 +00:00
return out, nil
2019-08-20 16:48:33 +00:00
}
2019-08-27 18:45:21 +00:00
func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmarket.DealID) error {
2021-03-24 12:36:21 +00:00
cerr := make(chan error)
go func() {
err := a.Retrieval.CancelDeal(dealID)
2021-03-24 12:36:21 +00:00
select {
case cerr <- err:
case <-ctx.Done():
}
}()
select {
case err := <-cerr:
if err != nil {
return xerrors.Errorf("failed to cancel retrieval deal: %w", err)
2021-03-24 12:36:21 +00:00
}
return nil
case <-ctx.Done():
return xerrors.Errorf("context timeout while canceling retrieval deal: %w", ctx.Err())
2021-03-24 12:36:21 +00:00
}
}
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
2021-07-03 04:40:56 +00:00
if ref == nil || ref.Path == "" {
return xerrors.New("must pass output file path for the retrieval deal")
}
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
2020-08-18 13:27:56 +00:00
for {
select {
case evt, ok := <-events:
if !ok { // done successfully
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
}
func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
2021-07-03 04:40:56 +00:00
if ref == nil || ref.Path == "" {
return nil, xerrors.New("must pass output file path for the retrieval deal")
}
2020-08-11 20:04:00 +00:00
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
return events, nil
}
// retrievalSubscribeEvent pairs a retrieval client event with the deal state
// delivered alongside it by the retrieval client's event subscription.
type retrievalSubscribeEvent struct {
	event rm.ClientEvent
	state rm.ClientDealState
}
// readSubscribeEvents forwards the subscription events belonging to dealID
// onto events until the deal reaches a terminal status or ctx is done.
// It returns nil when the deal completes, and an error when the deal is
// rejected, not found, errored, or when ctx is done first.
func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
	for {
		var incoming retrievalSubscribeEvent
		select {
		case <-ctx.Done():
			return xerrors.New("Retrieval Timed Out")
		case incoming = <-subscribeEvents:
		}

		// We can't check the deal ID ahead of time because:
		// 1. We need to subscribe before retrieving.
		// 2. We won't know the deal ID until after retrieving.
		if incoming.state.ID != dealID {
			continue
		}

		outgoing := marketevents.RetrievalEvent{
			Event:         incoming.event,
			Status:        incoming.state.Status,
			BytesReceived: incoming.state.TotalReceived,
			FundsSpent:    incoming.state.FundsSpent,
		}
		select {
		case <-ctx.Done():
			return xerrors.New("Retrieval Timed Out")
		case events <- outgoing:
		}

		// Stop on terminal statuses; any other status keeps the loop going.
		switch incoming.state.Status {
		case rm.DealStatusCompleted:
			return nil
		case rm.DealStatusRejected:
			return xerrors.Errorf("Retrieval Proposal Rejected: %s", incoming.state.Message)
		case rm.DealStatusDealNotFound, rm.DealStatusErrored:
			return xerrors.Errorf("Retrieval Error: %s", incoming.state.Message)
		}
	}
}
2020-08-11 20:04:00 +00:00
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
finish := func(e error) {
if e != nil {
events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
}
2020-08-11 20:04:00 +00:00
}
2021-07-03 04:40:56 +00:00
var carV2FilePath string
if order.LocalCARV2FilePath == "" {
2021-03-30 19:28:54 +00:00
if order.MinerPeer == nil || order.MinerPeer.ID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
finish(err)
return
}
2019-09-16 20:11:17 +00:00
2021-03-30 19:28:54 +00:00
order.MinerPeer = &retrievalmarket.RetrievalPeer{
ID: *mi.PeerId,
Address: order.Miner,
}
2019-09-16 20:11:17 +00:00
}
if order.Total.Int == nil {
finish(xerrors.Errorf("cannot make retrieval deal for null total"))
return
}
2021-04-23 08:10:51 +00:00
2021-03-30 19:28:54 +00:00
if order.Size == 0 {
finish(xerrors.Errorf("cannot make retrieval deal for zero bytes"))
return
2020-08-05 22:35:59 +00:00
}
2019-09-16 20:11:17 +00:00
2021-03-30 19:28:54 +00:00
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
2020-07-06 23:39:30 +00:00
2021-03-30 19:28:54 +00:00
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
if err != nil {
finish(xerrors.Errorf("Error in retrieval params: %s", err))
return
}
2021-03-30 19:28:54 +00:00
// Subscribe to events before retrieving to avoid losing events.
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside readSubscribeEvents.
if state.PayloadCID.Equals(order.Root) {
select {
case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
}
}
2021-03-30 19:28:54 +00:00
})
2021-07-03 04:40:56 +00:00
resp, err := a.Retrieval.Retrieve(
2021-03-30 19:28:54 +00:00
ctx,
order.Root,
params,
order.Total,
*order.MinerPeer,
order.Client,
2021-07-03 04:40:56 +00:00
order.Miner)
2021-03-30 19:28:54 +00:00
if err != nil {
unsubscribe()
finish(xerrors.Errorf("Retrieve failed: %w", err))
return
}
2021-07-03 04:40:56 +00:00
err = readSubscribeEvents(ctx, resp.DealID, subscribeEvents, events)
unsubscribe()
2021-03-30 19:28:54 +00:00
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
2019-08-27 22:10:23 +00:00
2021-07-03 04:40:56 +00:00
carV2FilePath = resp.CarFilePath
2021-07-03 07:25:20 +00:00
// remove the temp CARv2 file when retrieval is complete
defer os.Remove(carV2FilePath) //nolint:errcheck
2021-07-03 04:40:56 +00:00
} else {
carV2FilePath = order.LocalCARV2FilePath
}
if ref.IsCAR {
2021-07-03 04:40:56 +00:00
// user wants a CAR file, transform the CARv2 to a CARv1 and write it out.
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
2020-08-11 20:04:00 +00:00
finish(err)
return
}
2021-07-03 04:40:56 +00:00
carv2Reader, err := carv2.NewReaderMmap(carV2FilePath)
if err != nil {
2020-08-11 20:04:00 +00:00
finish(err)
return
}
2021-07-03 07:25:20 +00:00
defer carv2Reader.Close() //nolint:errcheck
2021-07-03 04:40:56 +00:00
if _, err := io.Copy(f, carv2Reader.CarV1Reader()); err != nil {
finish(err)
return
}
2020-08-11 20:04:00 +00:00
finish(f.Close())
return
}
2021-07-03 04:40:56 +00:00
rw, err := blockstore.OpenReadOnly(carV2FilePath, false)
if err != nil {
finish(err)
return
}
2021-07-03 07:25:20 +00:00
defer rw.Close() //nolint:errcheck
2021-07-03 04:40:56 +00:00
bsvc := blockservice.New(rw, offline.Exchange(rw))
dag := merkledag.NewDAGService(bsvc)
nd, err := dag.Get(ctx, order.Root)
2019-08-27 22:10:23 +00:00
if err != nil {
2020-08-11 20:04:00 +00:00
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
2019-08-27 22:10:23 +00:00
}
2021-07-03 04:40:56 +00:00
file, err := unixfile.NewUnixfsFile(ctx, dag, nd)
if err != nil {
2020-08-11 20:04:00 +00:00
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
}
2020-08-11 20:04:00 +00:00
finish(files.WriteTo(file, ref.Path))
return
2019-08-27 18:45:21 +00:00
}
2019-09-13 21:00:36 +00:00
// ClientListRetrievals returns one RetrievalInfo per retrieval deal, sorted
// by ascending deal ID. Each entry carries the matching in-progress data
// transfer channel when the deal has one.
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
	deals, err := a.Retrieval.ListDeals()
	if err != nil {
		return nil, err
	}

	channels, err := a.transfersByID(ctx)
	if err != nil {
		return nil, err
	}

	infos := make([]api.RetrievalInfo, 0, len(deals))
	for _, deal := range deals {
		// Find the data transfer associated with this deal.
		var transfer *api.DataTransferChannel
		if id := deal.ChannelID; id != nil {
			if ch, found := channels[*id]; found {
				transfer = &ch
			}
		}
		infos = append(infos, a.newRetrievalInfoWithTransfer(transfer, deal))
	}

	sort.Slice(infos, func(i, j int) bool {
		return infos[i].ID < infos[j].ID
	})

	return infos, nil
}
// ClientGetRetrievalUpdates subscribes to retrieval deal events and returns a
// channel on which a RetrievalInfo is delivered for every event. The
// subscription is removed once ctx is done.
//
// The channel is unbuffered, so each send also waits on ctx: once ctx is
// cancelled the event callback returns instead of blocking forever on a
// consumer that has gone away.
func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) {
	updates := make(chan api.RetrievalInfo)

	unsub := a.Retrieval.SubscribeToEvents(func(_ rm.ClientEvent, deal rm.ClientDealState) {
		select {
		case updates <- a.newRetrievalInfo(ctx, deal):
		case <-ctx.Done():
		}
	})

	go func() {
		defer unsub()
		<-ctx.Done()
	}()

	return updates, nil
}
// newRetrievalInfoWithTransfer maps a retrieval deal state and its (possibly
// nil) data transfer channel onto an api.RetrievalInfo.
func (a *API) newRetrievalInfoWithTransfer(ch *api.DataTransferChannel, deal rm.ClientDealState) api.RetrievalInfo {
	info := api.RetrievalInfo{
		// Identity.
		ID:         deal.ID,
		PayloadCID: deal.PayloadCID,
		PieceCID:   deal.PieceCID,
		Provider:   deal.Sender,

		// Pricing and payment progress.
		PricePerByte:  deal.PricePerByte,
		UnsealPrice:   deal.UnsealPrice,
		BytesReceived: deal.TotalReceived,
		BytesPaidFor:  deal.BytesPaidFor,
		TotalPaid:     deal.FundsSpent,

		// Status.
		Status:  deal.Status,
		Message: deal.Message,

		// Data transfer.
		TransferChannelID: deal.ChannelID,
		DataTransfer:      ch,
	}
	return info
}
// newRetrievalInfo builds a RetrievalInfo for a single retrieval deal,
// looking up the state of the deal's data transfer channel if it has one.
func (a *API) newRetrievalInfo(ctx context.Context, v rm.ClientDealState) api.RetrievalInfo {
	var transfer *api.DataTransferChannel

	if chanID := v.ChannelID; chanID != nil {
		// A lookup error is deliberately ignored: the transfer may simply not
		// be found because it is no longer active.
		if st, err := a.DataTransfer.ChannelState(ctx, *chanID); err == nil {
			ch := api.NewDataTransferChannel(a.Host.ID(), st)
			ch.Stages = st.Stages()
			transfer = &ch
		}
	}

	return a.newRetrievalInfoWithTransfer(transfer, v)
}
2020-09-29 11:53:30 +00:00
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
2020-08-05 20:54:45 +00:00
mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed getting miner info: %w", err)
}
info := utils.NewStorageProviderInfo(miner, mi.Worker, mi.SectorSize, p, mi.Multiaddrs)
2020-09-29 11:53:30 +00:00
ask, err := a.SMDealClient.GetAsk(ctx, info)
if err != nil {
return nil, err
}
2020-09-29 11:53:30 +00:00
return ask, nil
2019-09-13 21:00:36 +00:00
}
func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {
// Hard-code the sector type to 32GiBV1_1, because:
// - ffiwrapper.GeneratePieceCIDFromFile requires a RegisteredSealProof
// - commP itself is sector-size independent, with rather low probability of that changing
// ( note how the final rust call is identical for every RegSP type )
// https://github.com/filecoin-project/rust-filecoin-proofs-api/blob/v5.0.0/src/seal.rs#L1040-L1050
//
// IF/WHEN this changes in the future we will have to be able to calculate
// "old style" commP, and thus will need to introduce a version switch or similar
arbitraryProofType := abi.RegisteredSealProof_StackedDrg32GiBV1_1
rdr, err := os.Open(inpath)
if err != nil {
return nil, err
}
defer rdr.Close() //nolint:errcheck
stat, err := rdr.Stat()
if err != nil {
return nil, err
}
2021-02-16 19:48:31 +00:00
// check that the data is a car file; if it's not, retrieval won't work
_, _, err = car.ReadHeader(bufio.NewReader(rdr))
if err != nil {
return nil, xerrors.Errorf("not a car file: %w", err)
}
if _, err := rdr.Seek(0, io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek to start: %w", err)
}
pieceReader, pieceSize := padreader.New(rdr, uint64(stat.Size()))
commP, err := ffiwrapper.GeneratePieceCIDFromFile(arbitraryProofType, pieceReader, pieceSize)
if err != nil {
return nil, xerrors.Errorf("computing commP failed: %w", err)
}
return &api.CommPRet{
Root: commP,
Size: pieceSize,
}, nil
}
2020-07-31 16:22:04 +00:00
// lenWriter is a writer that discards everything written to it and only keeps
// a running total of the number of bytes it has been given.
type lenWriter int64

// Write adds len(p) to the running total. It always reports the full length of
// p as written and never returns an error.
func (w *lenWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	*w = *w + lenWriter(n)
	return n, nil
}
func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) {
2021-07-03 08:31:32 +00:00
carv2FilePath, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(carv2FilePath) == 0 {
return api.DataSize{}, xerrors.New("no CARv2 file for root")
}
2020-07-31 16:22:04 +00:00
2021-07-03 08:31:32 +00:00
rdOnly, err := blockstore.OpenReadOnly(carv2FilePath, false)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck
dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
2020-07-31 16:22:04 +00:00
w := lenWriter(0)
2021-07-03 08:31:32 +00:00
err = car.WriteCar(ctx, dag, []cid.Cid{root}, &w)
2020-07-31 16:22:04 +00:00
if err != nil {
return api.DataSize{}, err
}
up := padreader.PaddedSize(uint64(w))
return api.DataSize{
PayloadSize: int64(w),
PieceSize: up.Padded(),
}, nil
}
func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
2021-07-03 08:31:32 +00:00
carv2FilePath, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(carv2FilePath) == 0 {
return api.DataCIDSize{}, xerrors.New("no CARv2 file for root")
}
rdOnly, err := blockstore.OpenReadOnly(carv2FilePath, false)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck
2021-07-03 08:31:32 +00:00
dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
w := &writer.Writer{}
bw := bufio.NewWriterSize(w, int(writer.CommPBuf))
2021-07-03 08:31:32 +00:00
err = car.WriteCar(ctx, dag, []cid.Cid{root}, w)
if err != nil {
return api.DataCIDSize{}, err
}
if err := bw.Flush(); err != nil {
return api.DataCIDSize{}, err
}
dataCIDSize, err := w.Sum()
return api.DataCIDSize(dataCIDSize), err
}
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
2021-07-03 07:25:20 +00:00
id := rand.Uint64()
tmpCARv2File, err := a.imgr().NewTempFile(id)
2020-07-06 23:39:30 +00:00
if err != nil {
2021-07-03 07:25:20 +00:00
return xerrors.Errorf("failed to create temp file: %w", err)
2020-07-06 23:39:30 +00:00
}
2021-07-03 07:25:20 +00:00
defer os.Remove(tmpCARv2File) //nolint:errcheck
2021-07-03 07:25:20 +00:00
root, err := a.importNormalFileToCARv2(ctx, id, ref.Path, tmpCARv2File)
if err != nil {
2021-07-03 07:25:20 +00:00
return xerrors.Errorf("failed to import normal file to CARv2")
}
// generate a deterministic CARv1 payload from the UnixFS DAG by doing an IPLD
2021-07-03 07:25:20 +00:00
// traversal over the Unixfs DAG in the CARv2 file using the "all selector" i.e the entire DAG selector.
rdOnly, err := blockstore.OpenReadOnly(tmpCARv2File, true)
if err != nil {
return xerrors.Errorf("failed to open read only CARv2 blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck
2020-09-23 19:16:26 +00:00
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
2021-07-03 07:25:20 +00:00
sc := car.NewSelectiveCar(ctx, rdOnly, []car.Dag{{Root: root, Selector: allSelector}})
f, err := os.Create(outputPath)
if err != nil {
return err
}
if err = sc.Write(f); err != nil {
2021-07-03 04:40:56 +00:00
return xerrors.Errorf("failed to write CAR to output file: %w", err)
}
return f.Close()
}
// ClientListDataTransfers returns one api.DataTransferChannel for every
// channel the data transfer manager reports as in progress.
func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
	inProgress, err := a.DataTransfer.InProgressChannels(ctx)
	if err != nil {
		return nil, err
	}

	// Size the result up front and fill it by position.
	result := make([]api.DataTransferChannel, len(inProgress))
	next := 0
	for _, state := range inProgress {
		result[next] = api.NewDataTransferChannel(a.Host.ID(), state)
		next++
	}
	return result, nil
}
// ClientDataTransferUpdates subscribes to data transfer events and forwards
// the channel state of each event, converted to api.DataTransferChannel, on
// the returned channel. The subscription is dropped once ctx is done. The
// returned channel is unbuffered and is not closed by this function.
func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
	updates := make(chan api.DataTransferChannel)

	forward := func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
		update := api.NewDataTransferChannel(a.Host.ID(), channelState)
		// Give up on delivery once the caller has gone away, so the event
		// callback does not block on a receiver that will never read.
		select {
		case updates <- update:
		case <-ctx.Done():
		}
	}
	unsubscribe := a.DataTransfer.SubscribeToEvents(forward)

	// Tie the lifetime of the subscription to ctx.
	go func() {
		<-ctx.Done()
		unsubscribe()
	}()

	return updates, nil
}
2020-08-27 18:32:51 +00:00
func (a *API) ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
selfPeer := a.Host.ID()
if isInitiator {
return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
}
return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
2020-10-22 20:40:26 +00:00
}
// ClientCancelDataTransfer closes the data transfer channel identified by
// transferID between this node and otherPeer. isInitiator says which side of
// the channel ID this node occupies: the initiator when true, the responder
// otherwise.
func (a *API) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
	self := a.Host.ID()

	// Default to the remote peer having initiated; swap when we did.
	initiator, responder := otherPeer, self
	if isInitiator {
		initiator, responder = self, otherPeer
	}

	chid := datatransfer.ChannelID{Initiator: initiator, Responder: responder, ID: transferID}
	return a.DataTransfer.CloseDataTransferChannel(ctx, chid)
}
2020-09-04 05:34:59 +00:00
// ClientRetrieveTryRestartInsufficientFunds forwards paymentChannel to the
// retrieval client's TryRestartInsufficientFunds and returns its result.
func (a *API) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error {
	// ctx is accepted as part of the method signature but is not passed on:
	// the underlying call takes only the payment channel address.
	err := a.Retrieval.TryRestartInsufficientFunds(paymentChannel)
	return err
}
// ClientGetDealStatus returns the entry of storagemarket.DealStates for
// statusCode, or an error when the table has no entry for that code.
func (a *API) ClientGetDealStatus(ctx context.Context, statusCode uint64) (string, error) {
	if name, known := storagemarket.DealStates[statusCode]; known {
		return name, nil
	}
	return "", fmt.Errorf("no such deal state %d", statusCode)
}