deals: cleanup client state machine
This commit is contained in:
parent
79c9fb719e
commit
8ec37e8728
@ -541,7 +541,7 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error {
|
|||||||
_, err := w.Write(cbg.CborNull)
|
_, err := w.Write(cbg.CborNull)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := w.Write([]byte{133}); err != nil {
|
if _, err := w.Write([]byte{134}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -573,6 +573,19 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error {
|
|||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// t.t.PublishMessage (cid.Cid) (struct)
|
||||||
|
|
||||||
|
if t.PublishMessage == nil {
|
||||||
|
if _, err := w.Write(cbg.CborNull); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := cbg.WriteCid(w, *t.PublishMessage); err != nil {
|
||||||
|
return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -587,7 +600,7 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return fmt.Errorf("cbor input should be of type array")
|
return fmt.Errorf("cbor input should be of type array")
|
||||||
}
|
}
|
||||||
|
|
||||||
if extra != 5 {
|
if extra != 6 {
|
||||||
return fmt.Errorf("cbor input had wrong number of fields")
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -642,6 +655,30 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return fmt.Errorf("wrong type for uint64 field")
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
}
|
}
|
||||||
t.DealID = uint64(extra)
|
t.DealID = uint64(extra)
|
||||||
|
// t.t.PublishMessage (cid.Cid) (struct)
|
||||||
|
|
||||||
|
{
|
||||||
|
|
||||||
|
pb, err := br.PeekByte()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if pb == cbg.CborNull[0] {
|
||||||
|
var nbuf [1]byte
|
||||||
|
if _, err := br.Read(nbuf[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
|
||||||
|
c, err := cbg.ReadCid(br)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.PublishMessage = &c
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,6 +37,8 @@ type ClientDeal struct {
|
|||||||
Miner peer.ID
|
Miner peer.ID
|
||||||
DealID uint64
|
DealID uint64
|
||||||
|
|
||||||
|
PublishMessage *cid.Cid
|
||||||
|
|
||||||
s inet.Stream
|
s inet.Stream
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,6 +66,7 @@ type clientDealUpdate struct {
|
|||||||
newState api.DealState
|
newState api.DealState
|
||||||
id cid.Cid
|
id cid.Cid
|
||||||
err error
|
err error
|
||||||
|
mut func(*ClientDeal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local, mpool full.MpoolAPI, chainapi full.ChainAPI) *Client {
|
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local, mpool full.MpoolAPI, chainapi full.ChainAPI) *Client {
|
||||||
@ -137,6 +140,9 @@ func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
|
|||||||
var deal ClientDeal
|
var deal ClientDeal
|
||||||
err := c.deals.Mutate(update.id, func(d *ClientDeal) error {
|
err := c.deals.Mutate(update.id, func(d *ClientDeal) error {
|
||||||
d.State = update.newState
|
d.State = update.newState
|
||||||
|
if update.mut != nil {
|
||||||
|
update.mut(d)
|
||||||
|
}
|
||||||
deal = *d
|
deal = *d
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
package deals
|
package deals
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/lib/cborrpc"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -12,11 +14,11 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error
|
type clientHandlerFunc func(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error)
|
||||||
|
|
||||||
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next api.DealState) {
|
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next api.DealState) {
|
||||||
go func() {
|
go func() {
|
||||||
err := cb(ctx, deal)
|
mut, err := cb(ctx, deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
next = api.DealError
|
next = api.DealError
|
||||||
}
|
}
|
||||||
@ -30,87 +32,104 @@ func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFu
|
|||||||
newState: next,
|
newState: next,
|
||||||
id: deal.ProposalCid,
|
id: deal.ProposalCid,
|
||||||
err: err,
|
err: err,
|
||||||
|
mut: mut,
|
||||||
}:
|
}:
|
||||||
case <-c.stop:
|
case <-c.stop:
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) new(ctx context.Context, deal ClientDeal) error {
|
func (c *Client) new(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
|
||||||
resp, err := c.readStorageDealResp(deal)
|
resp, err := c.readStorageDealResp(deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := c.disconnect(deal); err != nil {
|
if err := c.disconnect(deal); err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* data transfer happens */
|
||||||
if resp.State != api.DealAccepted {
|
if resp.State != api.DealAccepted {
|
||||||
return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
|
return nil, xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: spec says it's optional
|
return func(info *ClientDeal) {
|
||||||
pubmsg, err := c.chain.GetMessage(*resp.PublishMessage)
|
info.PublishMessage = resp.PublishMessage
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) accepted(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
|
||||||
|
log.Infow("DEAL ACCEPTED!")
|
||||||
|
|
||||||
|
pubmsg, err := c.chain.GetMessage(*deal.PublishMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting deal pubsish message: %w", err)
|
return nil, xerrors.Errorf("getting deal pubsish message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider)
|
pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting miner worker failed: %w", err)
|
return nil, xerrors.Errorf("getting miner worker failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if pubmsg.From != pw {
|
if pubmsg.From != pw {
|
||||||
return xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider)
|
return nil, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider)
|
||||||
}
|
}
|
||||||
|
|
||||||
if pubmsg.To != actors.StorageMarketAddress {
|
if pubmsg.To != actors.StorageMarketAddress {
|
||||||
return xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To)
|
return nil, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To)
|
||||||
}
|
}
|
||||||
|
|
||||||
if pubmsg.Method != actors.SMAMethods.PublishStorageDeals {
|
if pubmsg.Method != actors.SMAMethods.PublishStorageDeals {
|
||||||
return xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method)
|
return nil, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method)
|
||||||
|
}
|
||||||
|
|
||||||
|
var params actors.PublishStorageDealsParams
|
||||||
|
if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dealIdx := -1
|
||||||
|
for i, storageDeal := range params.Deals {
|
||||||
|
// TODO: make it less hacky
|
||||||
|
eq, err := cborrpc.Equals(&deal.Proposal, &storageDeal.Proposal)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if eq {
|
||||||
|
dealIdx = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if dealIdx == -1 {
|
||||||
|
return nil, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: timeout
|
// TODO: timeout
|
||||||
_, ret, err := c.sm.WaitForMessage(ctx, *resp.PublishMessage)
|
_, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("waiting for deal publish message: %w", err)
|
return nil, xerrors.Errorf("waiting for deal publish message: %w", err)
|
||||||
}
|
}
|
||||||
if ret.ExitCode != 0 {
|
if ret.ExitCode != 0 {
|
||||||
return xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode)
|
return nil, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode)
|
||||||
}
|
|
||||||
// TODO: persist dealId
|
|
||||||
|
|
||||||
log.Info("DEAL ACCEPTED!")
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) accepted(ctx context.Context, deal ClientDeal) error {
|
|
||||||
/* data transfer happens */
|
|
||||||
|
|
||||||
resp, err := c.readStorageDealResp(deal)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.State != api.DealStaged {
|
var res actors.PublishStorageDealResponse
|
||||||
return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State)
|
if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("DEAL STAGED!")
|
return func(info *ClientDeal) {
|
||||||
|
info.DealID = res.DealIDs[dealIdx]
|
||||||
return nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) staged(ctx context.Context, deal ClientDeal) error {
|
func (c *Client) staged(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
|
||||||
// wait
|
// TODO: Maybe wait for pre-commit
|
||||||
|
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) sealing(ctx context.Context, deal ClientDeal) error {
|
func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
|
||||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
|
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -172,9 +191,9 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, deal.Proposal.Provider, actors.MAMethods.ProveCommitSector); err != nil {
|
if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, deal.Proposal.Provider, actors.MAMethods.ProveCommitSector); err != nil {
|
||||||
return xerrors.Errorf("failed to set up called handler")
|
return nil, xerrors.Errorf("failed to set up called handler")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("DEAL COMPLETE!!")
|
log.Info("DEAL COMPLETE!!")
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -66,3 +66,15 @@ func AsIpld(obj interface{}) (ipld.Node, error) {
|
|||||||
}
|
}
|
||||||
return cbor.WrapObject(obj, math.MaxUint64, -1)
|
return cbor.WrapObject(obj, math.MaxUint64, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Equals(a cbg.CBORMarshaler, b cbg.CBORMarshaler) (bool, error) {
|
||||||
|
ab, err := Dump(a)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
bb, err := Dump(b)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return bytes.Equal(ab, bb), nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user