lotus/chain/deals/provider_states.go

232 lines
6.8 KiB
Go
Raw Normal View History

package deals
import (
"bytes"
"context"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
unixfile "github.com/ipfs/go-unixfs/file"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
2019-11-06 19:00:04 +00:00
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandlerFunc, next api.DealState) {
go func() {
mut, err := cb(ctx, deal)
if err == nil && next == api.DealNoUpdate {
return
}
select {
case p.updated <- minerDealUpdate{
newState: next,
id: deal.ProposalCid,
err: err,
mut: mut,
}:
case <-p.stop:
}
}()
}
// ACCEPTED
func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
switch deal.Proposal.PieceSerialization {
//case SerializationRaw:
//case SerializationIPLD:
case actors.SerializationUnixFSv0:
default:
return nil, xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.PieceSerialization)
}
head, err := p.full.ChainHead(ctx)
if err != nil {
return nil, err
}
if head.Height() >= deal.Proposal.ProposalExpiration {
return nil, xerrors.Errorf("deal proposal already expired")
}
2019-10-25 12:44:34 +00:00
// TODO: check StorageCollateral
2019-10-29 12:02:24 +00:00
minPrice := types.BigDiv(types.BigMul(p.ask.Ask.Price, types.NewInt(deal.Proposal.PieceSize)), types.NewInt(1<<30))
2019-10-29 10:01:18 +00:00
if deal.Proposal.StoragePricePerEpoch.LessThan(minPrice) {
return nil, xerrors.Errorf("storage price per epoch less than asking price: %s < %s", deal.Proposal.StoragePricePerEpoch, minPrice)
2019-10-25 12:44:34 +00:00
}
if deal.Proposal.PieceSize < p.ask.Ask.MinPieceSize {
return nil, xerrors.Errorf("piece size less than minimum required size: %d < %d", deal.Proposal.PieceSize, p.ask.Ask.MinPieceSize)
}
// check market funds
2019-10-22 10:09:36 +00:00
clientMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil)
if err != nil {
return nil, xerrors.Errorf("getting client market balance failed: %w", err)
}
// This doesn't guarantee that the client won't withdraw / lock those funds
// but it's a decent first filter
2019-10-29 10:01:18 +00:00
if clientMarketBalance.Available.LessThan(deal.Proposal.TotalStoragePrice()) {
return nil, xerrors.New("clientMarketBalance.Available too small")
}
waddr, err := p.full.StateMinerWorker(ctx, deal.Proposal.Provider, nil)
if err != nil {
return nil, err
}
2019-11-08 17:15:38 +00:00
// TODO: check StorageCollateral (may be too large (or too small))
if err := p.full.MarketEnsureAvailable(ctx, waddr, deal.Proposal.StorageCollateral); err != nil {
return nil, err
}
log.Info("publishing deal")
storageDeal := actors.StorageDeal{
Proposal: deal.Proposal,
}
if err := api.SignWith(ctx, p.full.WalletSign, waddr, &storageDeal); err != nil {
return nil, xerrors.Errorf("signing storage deal failed: ", err)
}
params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{
Deals: []actors.StorageDeal{storageDeal},
})
if err != nil {
return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err)
}
// TODO: We may want this to happen after fetching data
smsg, err := p.full.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: waddr,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: actors.SMAMethods.PublishStorageDeals,
Params: params,
})
if err != nil {
return nil, err
}
r, err := p.full.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, err
}
if r.Receipt.ExitCode != 0 {
return nil, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)
}
var resp actors.PublishStorageDealResponse
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
return nil, err
}
if len(resp.DealIDs) != 1 {
return nil, xerrors.Errorf("got unexpected number of DealIDs from SMA")
}
log.Infof("fetching data for a deal %d", resp.DealIDs[0])
mcid := smsg.Cid()
err = p.sendSignedResponse(&Response{
2019-11-07 12:57:00 +00:00
State: api.DealAccepted,
2019-10-22 10:09:36 +00:00
Proposal: deal.ProposalCid,
PublishMessage: &mcid,
2019-11-07 12:57:00 +00:00
StorageDeal: &storageDeal,
})
if err != nil {
return nil, err
}
2019-11-07 12:57:00 +00:00
if err := p.disconnect(deal); err != nil {
log.Warnf("closing client connection: %+v", err)
}
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
// this is the selector for "get the whole DAG"
// TODO: support storage deals with custom payload selectors
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
// initiate a pull data transfer. This will complete asynchronously and the
// completion of the data transfer will trigger a change in deal state
// (see onDataTransferEvent)
_, err = p.dataTransfer.OpenPullDataChannel(ctx,
deal.Client,
&StorageDataTransferVoucher{Proposal: deal.ProposalCid, DealID: resp.DealIDs[0]},
deal.Ref,
allSelector,
)
2019-12-05 05:14:19 +00:00
if err != nil {
return nil, xerrors.Errorf("failed to open pull data channel: %w", err)
}
return nil, nil
}
// STAGED
func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
root, err := p.dag.Get(ctx, deal.Ref)
if err != nil {
return nil, xerrors.Errorf("failed to get file root for deal: %s", err)
}
// TODO: abstract this away into ReadSizeCloser + implement different modes
n, err := unixfile.NewUnixfsFile(ctx, p.dag, root)
if err != nil {
return nil, xerrors.Errorf("cannot open unixfs file: %s", err)
}
uf, ok := n.(sectorblocks.UnixfsReader)
if !ok {
// we probably got directory, unsupported for now
return nil, xerrors.Errorf("unsupported unixfs file type")
}
2019-10-23 18:04:07 +00:00
// TODO: uf.Size() is user input, not trusted
// This won't be useful / here after we migrate to putting CARs into sectors
size, err := uf.Size()
if err != nil {
return nil, xerrors.Errorf("getting unixfs file size: %w", err)
}
2019-11-06 19:00:04 +00:00
if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize {
return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
2019-10-23 18:04:07 +00:00
}
2019-12-01 17:58:31 +00:00
sectorID, err := p.secb.AddUnixfsPiece(ctx, uf, deal.DealID)
2019-11-05 18:40:51 +00:00
if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err)
}
log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID)
2019-11-05 18:40:51 +00:00
return func(deal *MinerDeal) {
deal.SectorID = sectorID
}, nil
}
// SEALING
func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
2019-11-07 18:22:59 +00:00
// TODO: consider waiting for seal to happen
return nil, nil
}
func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
// TODO: observe sector lifecycle, status, expiration..
return nil, nil
}