lotus/chain/deals/handler_states.go

256 lines
8.3 KiB
Go
Raw Normal View History

package deals
import (
"bytes"
"context"
2019-09-10 14:13:24 +00:00
"github.com/filecoin-project/go-lotus/api"
2019-09-09 19:21:37 +00:00
"github.com/filecoin-project/go-lotus/build"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
"golang.org/x/xerrors"
2019-08-26 08:02:26 +00:00
2019-09-06 22:39:47 +00:00
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/types"
2019-08-26 08:02:26 +00:00
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
2019-08-26 10:04:57 +00:00
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
)
2019-09-10 12:35:43 +00:00
type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
2019-09-10 14:13:24 +00:00
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next api.DealState) {
go func() {
2019-08-12 21:48:18 +00:00
mut, err := cb(ctx, deal)
select {
2019-09-10 12:35:43 +00:00
case h.updated <- minerDealUpdate{
newState: next,
id: deal.ProposalCid,
err: err,
2019-08-12 21:48:18 +00:00
mut: mut,
}:
case <-h.stop:
}
}()
}
// ACCEPTED
2019-09-09 19:21:37 +00:00
func (h *Handler) validateVouchers(ctx context.Context, deal MinerDeal) error {
2019-09-06 22:39:47 +00:00
curHead, err := h.full.ChainHead(ctx)
if err != nil {
2019-09-09 19:21:37 +00:00
return err
2019-09-06 22:39:47 +00:00
}
for i, voucher := range deal.Proposal.Payment.Vouchers {
err := h.full.PaychVoucherCheckValid(ctx, deal.Proposal.Payment.PayChActor, voucher)
if err != nil {
2019-09-09 19:21:37 +00:00
return xerrors.Errorf("validating payment voucher %d: %w", i, err)
}
if voucher.Extra == nil {
return xerrors.Errorf("validating payment voucher %d: voucher.Extra not set")
2019-09-06 22:39:47 +00:00
}
2019-09-09 19:21:37 +00:00
if voucher.Extra.Actor != deal.Proposal.MinerAddress {
return xerrors.Errorf("validating payment voucher %d: extra params actor didn't match miner address in proposal: '%s' != '%s'", i, voucher.Extra.Actor, deal.Proposal.MinerAddress)
}
if voucher.Extra.Method != actors.MAMethods.PaymentVerifyInclusion {
return xerrors.Errorf("validating payment voucher %d: expected extra method %d, got %d", i, actors.MAMethods.PaymentVerifyInclusion, voucher.Extra.Method)
}
var inclChallenge actors.PieceInclVoucherData
if err := cbor.DecodeInto(voucher.Extra.Data, &inclChallenge); err != nil {
return xerrors.Errorf("validating payment voucher %d: failed to decode storage voucher data for verification: %w", i, err)
}
if inclChallenge.PieceSize.Uint64() != deal.Proposal.Size {
return xerrors.Errorf("validating payment voucher %d: paych challenge piece size didn't match deal proposal size: %d != %d", i, inclChallenge.PieceSize.Uint64(), deal.Proposal.Size)
}
if !bytes.Equal(inclChallenge.CommP, deal.Proposal.CommP) {
return xerrors.Errorf("validating payment voucher %d: paych challenge commP didn't match deal proposal", i)
2019-09-06 22:39:47 +00:00
}
2019-09-09 19:21:37 +00:00
maxClose := curHead.Height() + deal.Proposal.Duration + build.DealVoucherSkewLimit
if voucher.MinCloseHeight > maxClose {
return xerrors.Errorf("validating payment voucher %d: MinCloseHeight too high (%d), max expected: %d", i, voucher.MinCloseHeight, maxClose)
2019-09-06 22:39:47 +00:00
}
2019-09-09 19:21:37 +00:00
if voucher.TimeLock > maxClose {
return xerrors.Errorf("validating payment voucher %d: TimeLock too high (%d), max expected: %d", i, voucher.TimeLock, maxClose)
2019-09-06 22:39:47 +00:00
}
if len(voucher.Merges) > 0 {
2019-09-09 19:21:37 +00:00
return xerrors.Errorf("validating payment voucher %d: didn't expect any merges", i)
2019-09-06 22:39:47 +00:00
}
// TODO: make sure that current laneStatus.Amount == 0
if types.BigCmp(voucher.Amount, deal.Proposal.TotalPrice) < 0 {
2019-09-09 19:21:37 +00:00
return xerrors.Errorf("validating payment voucher %d: not enough funds in the voucher", i)
2019-09-06 22:39:47 +00:00
}
minPrice := types.BigMul(types.BigMul(h.pricePerByteBlock, types.NewInt(deal.Proposal.Size)), types.NewInt(deal.Proposal.Duration))
if types.BigCmp(minPrice, deal.Proposal.TotalPrice) > 0 {
2019-09-09 19:21:37 +00:00
return xerrors.Errorf("validating payment voucher %d: minimum price: %s", i, minPrice)
2019-09-06 22:39:47 +00:00
}
}
2019-09-09 19:21:37 +00:00
return nil
}
func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
switch deal.Proposal.SerializationMode {
//case SerializationRaw:
//case SerializationIPLD:
case SerializationUnixFs:
default:
return nil, xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode)
}
if deal.Proposal.Payment.ChannelMessage != nil {
log.Info("waiting for channel message to appear on chain")
2019-09-13 20:50:07 +00:00
if _, err := h.full.ChainWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil {
return nil, xerrors.Errorf("waiting for paych message: %w", err)
}
}
2019-09-09 19:21:37 +00:00
if err := h.validateVouchers(ctx, deal); err != nil {
return nil, err
}
2019-09-06 22:39:47 +00:00
for i, voucher := range deal.Proposal.Payment.Vouchers {
if err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, voucher, nil); err != nil {
2019-09-06 22:39:47 +00:00
return nil, xerrors.Errorf("consuming payment voucher %d: %w", i, err)
}
}
2019-08-07 20:16:26 +00:00
log.Info("fetching data for a deal")
2019-09-09 19:21:37 +00:00
err := h.sendSignedResponse(StorageDealResponse{
2019-09-10 14:13:24 +00:00
State: api.DealAccepted,
Message: "",
Proposal: deal.ProposalCid,
})
if err != nil {
2019-08-12 21:48:18 +00:00
return nil, err
}
2019-08-12 21:48:18 +00:00
return nil, merkledag.FetchGraph(ctx, deal.Ref, h.dag)
}
// STAGED
2019-08-12 21:48:18 +00:00
func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
err := h.sendSignedResponse(StorageDealResponse{
2019-09-10 14:13:24 +00:00
State: api.DealStaged,
Proposal: deal.ProposalCid,
})
if err != nil {
2019-08-07 19:48:53 +00:00
log.Warnf("Sending deal response failed: %s", err)
}
root, err := h.dag.Get(ctx, deal.Ref)
if err != nil {
2019-08-12 21:48:18 +00:00
return nil, xerrors.Errorf("failed to get file root for deal: %s", err)
}
// TODO: abstract this away into ReadSizeCloser + implement different modes
n, err := unixfile.NewUnixfsFile(ctx, h.dag, root)
if err != nil {
2019-08-12 21:48:18 +00:00
return nil, xerrors.Errorf("cannot open unixfs file: %s", err)
}
2019-08-26 10:04:57 +00:00
uf, ok := n.(sectorblocks.UnixfsReader)
if !ok {
// we probably got directory, unsupported for now
2019-08-26 10:04:57 +00:00
return nil, xerrors.Errorf("unsupported unixfs file type")
}
2019-08-26 08:02:26 +00:00
sectorID, err := h.secst.AddUnixfsPiece(deal.Proposal.PieceRef, uf, deal.Proposal.Duration)
if err != nil {
2019-08-12 21:48:18 +00:00
return nil, xerrors.Errorf("AddPiece failed: %s", err)
}
log.Warnf("New Sector: %d", sectorID)
2019-08-12 21:48:18 +00:00
return func(deal *MinerDeal) {
deal.SectorID = sectorID
}, nil
}
// SEALING
2019-08-12 21:48:18 +00:00
func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (PieceInclusionProof, error) {
for i, p := range status.Pieces {
if p.Key == ref {
return PieceInclusionProof{
Position: uint64(i),
ProofElements: p.InclusionProof,
}, nil
}
}
return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID)
}
2019-08-14 20:27:10 +00:00
func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) {
status, err := h.secst.WaitSeal(context.TODO(), deal.SectorID)
if err != nil {
return sectorbuilder.SectorSealingStatus{}, err
}
2019-08-12 21:48:18 +00:00
2019-08-14 20:27:10 +00:00
switch status.SealStatusCode {
case 0: // sealed
case 2: // failed
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
case 1: // pending
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID)
case 3: // sealing
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID)
default:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
2019-08-12 21:48:18 +00:00
}
2019-08-14 20:27:10 +00:00
2019-08-12 21:48:18 +00:00
return status, nil
}
func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
2019-08-14 20:27:10 +00:00
status, err := h.waitSealed(deal)
2019-08-12 21:48:18 +00:00
if err != nil {
return nil, err
}
2019-08-28 21:11:29 +00:00
// TODO: don't hardcode unixfs
2019-08-28 23:01:38 +00:00
ip, err := getInclusionProof(string(sectorblocks.SerializationUnixfs0)+deal.Ref.String(), status)
2019-08-12 21:48:18 +00:00
if err != nil {
return nil, err
}
proof := &actors.InclusionProof{
Sector: deal.SectorID,
Proof: ip.ProofElements,
}
proofB, err := cbor.DumpObject(proof)
if err != nil {
return nil, err
}
// store proofs for channels
for i, v := range deal.Proposal.Payment.Vouchers {
if v.Extra.Method == actors.MAMethods.PaymentVerifyInclusion {
if err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, v, proofB); err != nil {
return nil, xerrors.Errorf("storing payment voucher %d proof: %w", i, err)
}
}
}
2019-08-12 21:48:18 +00:00
err = h.sendSignedResponse(StorageDealResponse{
2019-09-10 14:13:24 +00:00
State: api.DealSealing,
2019-08-12 21:48:18 +00:00
Proposal: deal.ProposalCid,
PieceInclusionProof: ip,
2019-09-10 12:35:43 +00:00
CommD: status.CommD[:],
2019-08-12 21:48:18 +00:00
})
if err != nil {
log.Warnf("Sending deal response failed: %s", err)
}
return nil, nil
}