From 7fddbb528dff3aa540ca37b27951e2667ec44e36 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Mon, 5 Apr 2021 13:11:10 +0200 Subject: [PATCH] Introduce stateless offline dealflow, bypassing the FSM/deallists This is aproposal for an additional flag --manual-stateless-deal and a corresponding API endpoint ClientStatelessDeal. This allows firing off an offline-style deal against a miner without keeping further track of it locally. Not keeping any local state introduces the limitation of requiring free storage deals, as there is nothing to tie the payment channel setup to. Rationale/need for this type of flow is the case of incredibly large sets of data nd deals, where the client and providers have prearranged payment ahead of time, and the client has a separate-from-lotus database of deal inventory. This way the client can use their lotus node merely as a network gateway, without running into any limitations currently present in both lotus as a whole and go-fil-markets in particular. Specific context for this work is filecoin-discover, where the requirement is to onboard ~ 12,000,000 individual deals against a pool of miners with whom the client has prearranged a relationship. --- api/api_full.go | 2 + api/apistruct/struct.go | 5 ++ cli/client.go | 19 ++++- node/impl/client/client.go | 140 ++++++++++++++++++++++++++++++++----- 4 files changed, 145 insertions(+), 21 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index ca3a02c74..3ed28f429 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -289,6 +289,8 @@ type FullNode interface { ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error // ClientStartDeal proposes a deal with a miner. ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) + // ClientStatelessDeal fire-and-forget-proposes an offline deal to a miner without subsequent tracking. + ClientStatelessDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) // ClientGetDealInfo returns the latest information about a given deal. ClientGetDealInfo(context.Context, cid.Cid) (*DealInfo, error) // ClientListDeals returns information about the deals made by the local client. diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 34b18cd41..fb08c24ee 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -163,6 +163,7 @@ type FullNodeStruct struct { ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` + ClientStatelessDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"write"` ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` ClientGetDealStatus func(context.Context, uint64) (string, error) `perm:"read"` ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` @@ -604,6 +605,10 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, params *api.StartD return c.Internal.ClientStartDeal(ctx, params) } +func (c *FullNodeStruct) ClientStatelessDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { + return c.Internal.ClientStatelessDeal(ctx, params) +} + func (c *FullNodeStruct) ClientGetDealInfo(ctx context.Context, deal cid.Cid) (*api.DealInfo, error) { return c.Internal.ClientGetDealInfo(ctx, deal) } diff --git a/cli/client.go b/cli/client.go index 98f4b0229..189d34882 100644 --- a/cli/client.go +++ b/cli/client.go @@ -311,6 +311,10 @@ var clientDealCmd = &cli.Command{ Name: "manual-piece-size", Usage: "if manually specifying piece cid, used to specify size (dataCid must be to a car file)", }, + &cli.BoolFlag{ + Name: "manual-stateless-deal", + Usage: "instructs the node to send an offline deal without registering it with the deallist/fsm", + }, &cli.StringFlag{ Name: "from", Usage: "specify address to fund the deal with", @@ -447,7 +451,7 @@ var clientDealCmd = &cli.Command{ isVerified = verifiedDealParam } - proposal, err := api.ClientStartDeal(ctx, &lapi.StartDealParams{ + sdParams := &lapi.StartDealParams{ Data: ref, Wallet: a, Miner: miner, @@ -457,7 +461,18 @@ var clientDealCmd = &cli.Command{ FastRetrieval: cctx.Bool("fast-retrieval"), VerifiedDeal: isVerified, ProviderCollateral: provCol, - }) + } + + var proposal *cid.Cid + if cctx.Bool("manual-stateless-deal") { + if ref.TransferType != storagemarket.TTManual { + return xerrors.New("when manual-stateless-deal is enabled, you must also provide a 'price' of 0 and specify 'manual-piece-cid' and 'manual-piece-size'") + } + proposal, err = api.ClientStatelessDeal(ctx, sdParams) + } else { + proposal, err = api.ClientStartDeal(ctx, sdParams) + } + if err != nil { return err } diff --git a/node/impl/client/client.go b/node/impl/client/client.go index ac526ac60..0576fcbf4 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -31,10 +31,12 @@ import ( "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multibase" mh "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-commp-utils/ffiwrapper" "github.com/filecoin-project/go-commp-utils/writer" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -43,8 +45,10 @@ import ( rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" marketevents "github.com/filecoin-project/lotus/markets/loggers" @@ -97,8 +101,23 @@ func (a *API) imgr() *importmgr.Mgr { } func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { + return a.dealStarter(ctx, params, false) +} + +func (a *API) ClientStatelessDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { + return a.dealStarter(ctx, params, true) +} + +func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isStateless bool) (*cid.Cid, error) { var storeID *multistore.StoreID - if params.Data.TransferType == storagemarket.TTGraphsync { + if isStateless { + if params.Data.TransferType != storagemarket.TTManual { + return nil, xerrors.Errorf("invalid transfer type %s for stateless storage deal", params.Data.TransferType) + } + if !params.EpochPrice.IsZero() { + return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0") + } + } else if params.Data.TransferType == storagemarket.TTGraphsync { importIDs := a.imgr().List() for _, importID := range importIDs { info, err := a.imgr().Info(importID) @@ -146,8 +165,6 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) return nil, xerrors.New("data doesn't fit in a sector") } - providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs) - dealStart := params.DealStartEpoch if dealStart <= 0 { // unset, or explicitly 'epoch undefined' ts, err := a.ChainHead(ctx) @@ -169,25 +186,110 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) return nil, xerrors.Errorf("failed to get seal proof type: %w", err) } - result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{ - Addr: params.Wallet, - Info: &providerInfo, - Data: params.Data, - StartEpoch: dealStart, - EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), - Price: params.EpochPrice, - Collateral: params.ProviderCollateral, - Rt: st, - FastRetrieval: params.FastRetrieval, - VerifiedDeal: params.VerifiedDeal, - StoreID: storeID, - }) + // regular flow + if !isStateless { + providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs) - if err != nil { - return nil, xerrors.Errorf("failed to start deal: %w", err) + result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{ + Addr: params.Wallet, + Info: &providerInfo, + Data: params.Data, + StartEpoch: dealStart, + EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), + Price: params.EpochPrice, + Collateral: params.ProviderCollateral, + Rt: st, + FastRetrieval: params.FastRetrieval, + VerifiedDeal: params.VerifiedDeal, + StoreID: storeID, + }) + + if err != nil { + return nil, xerrors.Errorf("failed to start deal: %w", err) + } + + return &result.ProposalCid, nil } - return &result.ProposalCid, nil + // + // stateless flow from here to the end + // + + dealProposal := &market.DealProposal{ + PieceCID: *params.Data.PieceCid, + PieceSize: params.Data.PieceSize.Padded(), + Client: walletKey, + Provider: params.Miner, + Label: params.Data.Root.Encode(multibase.MustNewEncoder('u')), + StartEpoch: dealStart, + EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: params.ProviderCollateral, + ClientCollateral: big.Zero(), + VerifiedDeal: params.VerifiedDeal, + } + + if dealProposal.ProviderCollateral.IsZero() { + networkCollateral, err := a.StateDealProviderCollateralBounds(ctx, params.Data.PieceSize.Padded(), params.VerifiedDeal, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("failed to determine minimum provider collateral: %w", err) + } + dealProposal.ProviderCollateral = networkCollateral.Min + } + + dealProposalSerialized, err := cborutil.Dump(dealProposal) + if err != nil { + return nil, xerrors.Errorf("failed to serialize deal proposal: %w", err) + } + + dealProposalSig, err := a.WalletSign(ctx, walletKey, dealProposalSerialized) + if err != nil { + return nil, xerrors.Errorf("failed to sign proposal : %w", err) + } + + dealProposalSigned := &market.ClientDealProposal{ + Proposal: *dealProposal, + ClientSignature: *dealProposalSig, + } + dStream, err := network.NewFromLibp2pHost(a.Host, + network.RetryParameters(0, 0, 0, 0), + ).NewDealStream(ctx, *mi.PeerId) + if err != nil { + return nil, xerrors.Errorf("opening dealstream to %s/%s failed: %w", params.Miner, *mi.PeerId, err) + } + + if err = dStream.WriteDealProposal(network.Proposal{ + FastRetrieval: true, + DealProposal: dealProposalSigned, + Piece: &storagemarket.DataRef{ + TransferType: storagemarket.TTManual, + Root: params.Data.Root, + PieceCid: params.Data.PieceCid, + PieceSize: params.Data.PieceSize, + }, + }); err != nil { + return nil, xerrors.Errorf("sending deal proposal failed: %w", err) + } + + resp, _, err := dStream.ReadDealResponse() + if err != nil { + return nil, xerrors.Errorf("reading proposal response failed: %w", err) + } + + dealProposalIpld, err := cborutil.AsIpld(dealProposalSigned) + if err != nil { + return nil, xerrors.Errorf("serializing proposal node failed: %w", err) + } + + if !dealProposalIpld.Cid().Equals(resp.Response.Proposal) { + return nil, xerrors.Errorf("provider returned proposal cid %s but we expected %s", resp.Response.Proposal, dealProposalIpld.Cid()) + } + + if resp.Response.State != storagemarket.StorageDealWaitingForData { + return nil, xerrors.Errorf("provider returned unexpected state %d for proposal %s, with message: %s", resp.Response.State, resp.Response.Proposal, resp.Response.Message) + } + + return &resp.Response.Proposal, nil } func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {