Introduce stateless offline dealflow, bypassing the FSM/deallists

This is aproposal for an additional flag --manual-stateless-deal and a
corresponding API endpoint ClientStatelessDeal. This allows firing off
an offline-style deal against a miner without keeping further track of
it locally.

Not keeping any local state introduces the limitation of requiring free
storage deals, as there is nothing to tie the payment channel setup to.

Rationale/need for this type of flow is the case of incredibly large
sets of data nd deals, where the client and providers have prearranged
payment ahead of time, and the client has a separate-from-lotus database
of deal inventory. This way the client can use their lotus node merely
as a network gateway, without running into any limitations currently
present in both lotus as a whole and go-fil-markets in particular.

Specific context for this work is filecoin-discover, where the requirement
is to onboard ~ 12,000,000 individual deals against a pool of miners
with whom the client has prearranged a relationship.
This commit is contained in:
Peter Rabbitson 2021-04-05 13:11:10 +02:00
parent 3e6dcb578b
commit 7fddbb528d
4 changed files with 145 additions and 21 deletions

View File

@ -289,6 +289,8 @@ type FullNode interface {
ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error
// ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error)
// ClientStatelessDeal fire-and-forget-proposes an offline deal to a miner without subsequent tracking.
ClientStatelessDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error)
// ClientGetDealInfo returns the latest information about a given deal.
ClientGetDealInfo(context.Context, cid.Cid) (*DealInfo, error)
// ClientListDeals returns information about the deals made by the local client.

View File

@ -163,6 +163,7 @@ type FullNodeStruct struct {
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientStatelessDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"write"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientGetDealStatus func(context.Context, uint64) (string, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
@ -604,6 +605,10 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, params *api.StartD
return c.Internal.ClientStartDeal(ctx, params)
}
func (c *FullNodeStruct) ClientStatelessDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
return c.Internal.ClientStatelessDeal(ctx, params)
}
func (c *FullNodeStruct) ClientGetDealInfo(ctx context.Context, deal cid.Cid) (*api.DealInfo, error) {
return c.Internal.ClientGetDealInfo(ctx, deal)
}

View File

@ -311,6 +311,10 @@ var clientDealCmd = &cli.Command{
Name: "manual-piece-size",
Usage: "if manually specifying piece cid, used to specify size (dataCid must be to a car file)",
},
&cli.BoolFlag{
Name: "manual-stateless-deal",
Usage: "instructs the node to send an offline deal without registering it with the deallist/fsm",
},
&cli.StringFlag{
Name: "from",
Usage: "specify address to fund the deal with",
@ -447,7 +451,7 @@ var clientDealCmd = &cli.Command{
isVerified = verifiedDealParam
}
proposal, err := api.ClientStartDeal(ctx, &lapi.StartDealParams{
sdParams := &lapi.StartDealParams{
Data: ref,
Wallet: a,
Miner: miner,
@ -457,7 +461,18 @@ var clientDealCmd = &cli.Command{
FastRetrieval: cctx.Bool("fast-retrieval"),
VerifiedDeal: isVerified,
ProviderCollateral: provCol,
})
}
var proposal *cid.Cid
if cctx.Bool("manual-stateless-deal") {
if ref.TransferType != storagemarket.TTManual {
return xerrors.New("when manual-stateless-deal is enabled, you must also provide a 'price' of 0 and specify 'manual-piece-cid' and 'manual-piece-size'")
}
proposal, err = api.ClientStatelessDeal(ctx, sdParams)
} else {
proposal, err = api.ClientStartDeal(ctx, sdParams)
}
if err != nil {
return err
}

View File

@ -31,10 +31,12 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
mh "github.com/multiformats/go-multihash"
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-commp-utils/ffiwrapper"
"github.com/filecoin-project/go-commp-utils/writer"
datatransfer "github.com/filecoin-project/go-data-transfer"
@ -43,8 +45,10 @@ import (
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
@ -97,8 +101,23 @@ func (a *API) imgr() *importmgr.Mgr {
}
func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
return a.dealStarter(ctx, params, false)
}
func (a *API) ClientStatelessDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
return a.dealStarter(ctx, params, true)
}
func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isStateless bool) (*cid.Cid, error) {
var storeID *multistore.StoreID
if params.Data.TransferType == storagemarket.TTGraphsync {
if isStateless {
if params.Data.TransferType != storagemarket.TTManual {
return nil, xerrors.Errorf("invalid transfer type %s for stateless storage deal", params.Data.TransferType)
}
if !params.EpochPrice.IsZero() {
return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0")
}
} else if params.Data.TransferType == storagemarket.TTGraphsync {
importIDs := a.imgr().List()
for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
@ -146,8 +165,6 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
return nil, xerrors.New("data doesn't fit in a sector")
}
providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs)
dealStart := params.DealStartEpoch
if dealStart <= 0 { // unset, or explicitly 'epoch undefined'
ts, err := a.ChainHead(ctx)
@ -169,25 +186,110 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
return nil, xerrors.Errorf("failed to get seal proof type: %w", err)
}
result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{
Addr: params.Wallet,
Info: &providerInfo,
Data: params.Data,
StartEpoch: dealStart,
EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart),
Price: params.EpochPrice,
Collateral: params.ProviderCollateral,
Rt: st,
FastRetrieval: params.FastRetrieval,
VerifiedDeal: params.VerifiedDeal,
StoreID: storeID,
})
// regular flow
if !isStateless {
providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs)
if err != nil {
return nil, xerrors.Errorf("failed to start deal: %w", err)
result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{
Addr: params.Wallet,
Info: &providerInfo,
Data: params.Data,
StartEpoch: dealStart,
EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart),
Price: params.EpochPrice,
Collateral: params.ProviderCollateral,
Rt: st,
FastRetrieval: params.FastRetrieval,
VerifiedDeal: params.VerifiedDeal,
StoreID: storeID,
})
if err != nil {
return nil, xerrors.Errorf("failed to start deal: %w", err)
}
return &result.ProposalCid, nil
}
return &result.ProposalCid, nil
//
// stateless flow from here to the end
//
dealProposal := &market.DealProposal{
PieceCID: *params.Data.PieceCid,
PieceSize: params.Data.PieceSize.Padded(),
Client: walletKey,
Provider: params.Miner,
Label: params.Data.Root.Encode(multibase.MustNewEncoder('u')),
StartEpoch: dealStart,
EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart),
StoragePricePerEpoch: big.Zero(),
ProviderCollateral: params.ProviderCollateral,
ClientCollateral: big.Zero(),
VerifiedDeal: params.VerifiedDeal,
}
if dealProposal.ProviderCollateral.IsZero() {
networkCollateral, err := a.StateDealProviderCollateralBounds(ctx, params.Data.PieceSize.Padded(), params.VerifiedDeal, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed to determine minimum provider collateral: %w", err)
}
dealProposal.ProviderCollateral = networkCollateral.Min
}
dealProposalSerialized, err := cborutil.Dump(dealProposal)
if err != nil {
return nil, xerrors.Errorf("failed to serialize deal proposal: %w", err)
}
dealProposalSig, err := a.WalletSign(ctx, walletKey, dealProposalSerialized)
if err != nil {
return nil, xerrors.Errorf("failed to sign proposal : %w", err)
}
dealProposalSigned := &market.ClientDealProposal{
Proposal: *dealProposal,
ClientSignature: *dealProposalSig,
}
dStream, err := network.NewFromLibp2pHost(a.Host,
network.RetryParameters(0, 0, 0, 0),
).NewDealStream(ctx, *mi.PeerId)
if err != nil {
return nil, xerrors.Errorf("opening dealstream to %s/%s failed: %w", params.Miner, *mi.PeerId, err)
}
if err = dStream.WriteDealProposal(network.Proposal{
FastRetrieval: true,
DealProposal: dealProposalSigned,
Piece: &storagemarket.DataRef{
TransferType: storagemarket.TTManual,
Root: params.Data.Root,
PieceCid: params.Data.PieceCid,
PieceSize: params.Data.PieceSize,
},
}); err != nil {
return nil, xerrors.Errorf("sending deal proposal failed: %w", err)
}
resp, _, err := dStream.ReadDealResponse()
if err != nil {
return nil, xerrors.Errorf("reading proposal response failed: %w", err)
}
dealProposalIpld, err := cborutil.AsIpld(dealProposalSigned)
if err != nil {
return nil, xerrors.Errorf("serializing proposal node failed: %w", err)
}
if !dealProposalIpld.Cid().Equals(resp.Response.Proposal) {
return nil, xerrors.Errorf("provider returned proposal cid %s but we expected %s", resp.Response.Proposal, dealProposalIpld.Cid())
}
if resp.Response.State != storagemarket.StorageDealWaitingForData {
return nil, xerrors.Errorf("provider returned unexpected state %d for proposal %s, with message: %s", resp.Response.State, resp.Response.Proposal, resp.Response.Message)
}
return &resp.Response.Proposal, nil
}
func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {