lotus/chain/deals/client_states.go

234 lines
5.8 KiB
Go
Raw Normal View History

2019-09-10 12:35:43 +00:00
package deals
import (
2019-11-07 13:29:43 +00:00
"bytes"
2019-09-10 12:35:43 +00:00
"context"
"golang.org/x/xerrors"
2020-01-07 14:00:10 +00:00
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/api"
2019-11-08 20:11:56 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
2019-09-10 12:35:43 +00:00
)
2019-11-07 13:29:43 +00:00
// clientHandlerFunc is the signature of a per-state deal handler run by
// (*Client).handle. It receives a copy of the deal and returns an optional
// mutation function plus an error; handle forwards both on the client's
// update channel as the `mut` and `err` fields of a clientDealUpdate.
type clientHandlerFunc func(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error)
2019-09-10 12:35:43 +00:00
2019-09-10 14:13:24 +00:00
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next api.DealState) {
2019-09-10 12:35:43 +00:00
go func() {
2019-11-07 13:29:43 +00:00
mut, err := cb(ctx, deal)
2019-09-13 19:43:33 +00:00
if err != nil {
next = api.DealError
}
if err == nil && next == api.DealNoUpdate {
return
}
2019-09-10 12:35:43 +00:00
select {
case c.updated <- clientDealUpdate{
newState: next,
id: deal.ProposalCid,
err: err,
2019-11-07 13:29:43 +00:00
mut: mut,
2019-09-10 12:35:43 +00:00
}:
case <-c.stop:
}
}()
}
2019-11-07 13:29:43 +00:00
func (c *Client) new(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
2019-09-10 12:35:43 +00:00
resp, err := c.readStorageDealResp(deal)
if err != nil {
2019-11-07 13:29:43 +00:00
return nil, err
2019-09-10 12:35:43 +00:00
}
2019-12-07 14:12:10 +00:00
// TODO: verify StorageDealSubmission
2019-11-07 12:57:00 +00:00
if err := c.disconnect(deal); err != nil {
2019-11-07 13:29:43 +00:00
return nil, err
2019-11-07 12:57:00 +00:00
}
2019-09-10 12:35:43 +00:00
2019-11-07 13:29:43 +00:00
/* data transfer happens */
2019-09-10 14:13:24 +00:00
if resp.State != api.DealAccepted {
2019-11-07 13:29:43 +00:00
return nil, xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
2019-09-10 12:35:43 +00:00
}
2019-11-07 13:29:43 +00:00
return func(info *ClientDeal) {
2019-12-07 14:12:10 +00:00
info.PublishMessage = resp.StorageDealSubmission
2019-11-07 13:29:43 +00:00
}, nil
}
func (c *Client) accepted(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
log.Infow("DEAL ACCEPTED!")
2019-12-07 14:12:10 +00:00
pubmsg := deal.PublishMessage.Message
pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider)
if err != nil {
2019-11-07 13:29:43 +00:00
return nil, xerrors.Errorf("getting miner worker failed: %w", err)
}
if pubmsg.From != pw {
2019-11-07 13:29:43 +00:00
return nil, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider)
}
if pubmsg.To != actors.StorageMarketAddress {
2019-11-07 13:29:43 +00:00
return nil, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To)
}
if pubmsg.Method != actors.SMAMethods.PublishStorageDeals {
2019-11-07 13:29:43 +00:00
return nil, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method)
}
2019-11-07 13:29:43 +00:00
var params actors.PublishStorageDealsParams
if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil {
return nil, err
}
2019-09-10 12:35:43 +00:00
2019-11-07 13:29:43 +00:00
dealIdx := -1
for i, storageDeal := range params.Deals {
// TODO: make it less hacky
sd := storageDeal
eq, err := cborutil.Equals(&deal.Proposal, &sd)
2019-11-07 13:29:43 +00:00
if err != nil {
return nil, err
}
if eq {
dealIdx = i
break
2019-11-07 13:29:43 +00:00
}
}
2019-09-10 12:35:43 +00:00
2019-11-07 13:29:43 +00:00
if dealIdx == -1 {
2019-12-07 14:12:10 +00:00
return nil, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage.Cid())
2019-11-07 13:29:43 +00:00
}
2019-09-10 12:35:43 +00:00
2019-11-07 13:29:43 +00:00
// TODO: timeout
2019-12-07 14:12:10 +00:00
_, ret, err := c.sm.WaitForMessage(ctx, deal.PublishMessage.Cid())
2019-09-10 12:35:43 +00:00
if err != nil {
2019-11-07 13:29:43 +00:00
return nil, xerrors.Errorf("waiting for deal publish message: %w", err)
2019-09-10 12:35:43 +00:00
}
2019-11-07 13:29:43 +00:00
if ret.ExitCode != 0 {
return nil, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode)
2019-09-10 12:35:43 +00:00
}
2019-11-07 13:29:43 +00:00
var res actors.PublishStorageDealResponse
if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil {
return nil, err
}
2019-09-10 12:35:43 +00:00
2019-11-07 13:29:43 +00:00
return func(info *ClientDeal) {
info.DealID = res.DealIDs[dealIdx]
}, nil
2019-09-10 12:35:43 +00:00
}
2019-11-07 13:29:43 +00:00
func (c *Client) staged(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
// TODO: Maybe wait for pre-commit
2019-09-10 12:35:43 +00:00
2019-11-07 13:29:43 +00:00
return nil, nil
2019-09-10 12:35:43 +00:00
}
2019-11-07 13:29:43 +00:00
func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
if err != nil {
// TODO: This may be fine for some errors
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch > 0 {
select {
case c.updated <- clientDealUpdate{
newState: api.DealComplete,
id: deal.ProposalCid,
}:
case <-c.stop:
}
return true, false, nil
}
return false, true, nil
}
2019-11-19 21:27:25 +00:00
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
defer func() {
if err != nil {
select {
case c.updated <- clientDealUpdate{
newState: api.DealComplete,
id: deal.ProposalCid,
err: xerrors.Errorf("handling applied event: %w", err),
}:
case <-c.stop:
}
}
}()
if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
}
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch == 0 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", deal.DealID, ts.ParentState(), ts.Height())
}
log.Infof("Storage deal %d activated at epoch %d", deal.DealID, sd.ActivationEpoch)
select {
case c.updated <- clientDealUpdate{
newState: api.DealComplete,
id: deal.ProposalCid,
}:
case <-c.stop:
}
return false, nil
}
revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warn("deal activation reverted; TODO: actually handle this!")
// TODO: Just go back to DealSealing?
return nil
2019-09-10 12:35:43 +00:00
}
2019-11-19 16:17:26 +00:00
matchEvent := func(msg *types.Message) (bool, error) {
if msg.To != deal.Proposal.Provider {
return false, nil
}
if msg.Method != actors.MAMethods.ProveCommitSector {
return false, nil
}
var params actors.SectorProveCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, err
}
var found bool
for _, dealID := range params.DealIDs {
if dealID == deal.DealID {
found = true
break
}
}
return found, nil
}
if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil {
2019-11-07 13:29:43 +00:00
return nil, xerrors.Errorf("failed to set up called handler")
2019-09-10 12:35:43 +00:00
}
2019-11-07 13:29:43 +00:00
return nil, nil
2019-09-10 12:35:43 +00:00
}