lotus/chain/deals/client_states.go

189 lines
5.2 KiB
Go
Raw Normal View History

2019-09-10 12:35:43 +00:00
package deals
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
2019-09-10 12:35:43 +00:00
)
// clientHandlerFunc is the signature shared by the per-state deal handlers in
// this file. A handler receives the deal being processed and returns a non-nil
// error when the deal should not advance to its next state.
type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error
2019-09-10 14:13:24 +00:00
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next api.DealState) {
2019-09-10 12:35:43 +00:00
go func() {
err := cb(ctx, deal)
2019-09-13 19:43:33 +00:00
if err != nil {
next = api.DealError
}
2019-09-10 12:35:43 +00:00
select {
case c.updated <- clientDealUpdate{
newState: next,
id: deal.ProposalCid,
err: err,
}:
case <-c.stop:
}
}()
}
func (c *Client) new(ctx context.Context, deal ClientDeal) error {
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
2019-09-10 14:13:24 +00:00
if resp.State != api.DealAccepted {
2019-09-10 12:35:43 +00:00
return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
}
// TODO: spec says it's optional
pubmsg, err := c.chain.GetMessage(*resp.PublishMessage)
if err != nil {
return xerrors.Errorf("getting deal pubsish message: %w", err)
}
pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider)
if err != nil {
return xerrors.Errorf("getting miner worker failed: %w", err)
}
if pubmsg.From != pw {
return xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider)
}
if pubmsg.To != actors.StorageMarketAddress {
return xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To)
}
if pubmsg.Method != actors.SMAMethods.PublishStorageDeals {
return xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method)
}
// TODO: timeout
_, ret, err := c.sm.WaitForMessage(ctx, *resp.PublishMessage)
if err != nil {
return xerrors.Errorf("waiting for deal publish message: %w", err)
}
if ret.ExitCode != 0 {
return xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode)
}
// TODO: persist dealId
2019-09-10 12:35:43 +00:00
log.Info("DEAL ACCEPTED!")
return nil
}
func (c *Client) accepted(ctx context.Context, deal ClientDeal) error {
/* data transfer happens */
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
2019-09-10 14:13:24 +00:00
if resp.State != api.DealStaged {
2019-09-10 12:35:43 +00:00
return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State)
}
log.Info("DEAL STAGED!")
return nil
}
func (c *Client) staged(ctx context.Context, deal ClientDeal) error {
/* miner seals our data, hopefully */
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
2019-09-10 14:13:24 +00:00
if resp.State != api.DealSealing {
2019-09-10 12:35:43 +00:00
return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State)
}
log.Info("DEAL SEALED!")
// TODO: want?
/*ssize, err := stmgr.GetMinerSectorSize(ctx, c.sm, nil, deal.Proposal.MinerAddress)
if err != nil {
return xerrors.Errorf("failed to get miner sector size: %w", err)
}
ok, err := sectorbuilder.VerifyPieceInclusionProof(ssize, deal.Proposal.Size, deal.Proposal.CommP, resp.CommD, resp.PieceInclusionProof.ProofElements)
2019-09-10 12:35:43 +00:00
if err != nil {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s: %w", deal.ProposalCid, err)
}
if !ok {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s failed", deal.ProposalCid)
}*/
2019-09-10 12:35:43 +00:00
return nil
}
func (c *Client) sealing(ctx context.Context, deal ClientDeal) error {
//func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, actor address.Address, method uint64) error {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := stmgr.GetStorageDeal(ctx, c.stmgr, deal.DealID, ts)
if err != nil {
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch > 0 {
// Deal is active already!
panic("handle me")
return true, false, nil
}
return false, true, nil
}
called := func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
// To ask Magik: Does this trigger when the message in question is part of the parent state execution? Or just when its included in the block (aka, not executed)
// main thing i want to ensure is that ts.ParentState is the result of the execution of msg
if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
}
// TODO: can check msg.Params to see if we should even bother checking the state
sd, err := stmgr.GetStorageDeal(ctx, c.stmgr, deal.DealID, ts)
if err != nil {
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch == 0 {
return true, nil
}
// Deal is active!
panic("handle me")
return false, nil
}
if err := c.events.Called(checkFunc, handler, rev, 3, 100, actors.StorageMarketAddress, actors.SMAMethods.ActivateStorageDeals); err != nil {
return xerrors.Errorf("failed to set up called handler")
}
2019-09-10 12:35:43 +00:00
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
2019-09-10 14:13:24 +00:00
if resp.State != api.DealComplete {
2019-09-10 12:35:43 +00:00
return xerrors.Errorf("deal wasn't complete (State=%d)", resp.State)
}
// TODO: look for the commit message on chain, negotiate better payment vouchers
log.Info("DEAL COMPLETE!!")
return nil
}