diff --git a/chain/deals/client.go b/chain/deals/client.go index 427565890..7be56801e 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -4,25 +4,20 @@ import ( "context" "math" - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" - files "github.com/ipfs/go-ipfs-files" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log" - unixfile "github.com/ipfs/go-unixfs/file" "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/wallet" - "github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/retrieval/discovery" ) @@ -33,18 +28,13 @@ func init() { var log = logging.Logger("deals") -const ProtocolID = "/fil/storage/mk/1.0.0" - -type DealStatus int - -const ( - DealResolvingMiner = DealStatus(iota) -) - type ClientDeal struct { ProposalCid cid.Cid - Status DealStatus + Proposal StorageDealProposal + State DealState Miner peer.ID + + s inet.Stream } type Client struct { @@ -55,13 +45,21 @@ type Client struct { discovery *discovery.Local deals StateStore + conns map[cid.Cid]inet.Stream incoming chan ClientDeal + updated chan clientDealUpdate stop chan struct{} stopped chan struct{} } +type clientDealUpdate struct { + newState DealState + id cid.Cid + err error +} + func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client { c := &Client{ cs: cs, @@ -71,8 +69,10 @@ func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.Me discovery: discovery, deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, + conns: map[cid.Cid]inet.Stream{}, incoming: make(chan ClientDeal, 16), + updated: make(chan clientDealUpdate, 16), stop: make(chan struct{}), stopped: make(chan struct{}), @@ -81,21 +81,16 @@ func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.Me return c } -func (c *Client) Run() { +func (c *Client) Run(ctx context.Context) { go func() { defer close(c.stopped) for { select { case deal := <-c.incoming: - log.Info("incoming deal") - - // TODO: track in datastore - if err := c.deals.Begin(deal.ProposalCid, deal); err != nil { - log.Errorf("deal state begin failed: %s", err) - continue - } - + c.onIncoming(deal) + case update := <-c.updated: + c.onUpdated(ctx, update) case <-c.stop: return } @@ -103,87 +98,59 @@ func (c *Client) Run() { }() } -func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) { - root, err := c.dag.Get(ctx, data) - if err != nil { - log.Errorf("failed to get file root for deal: %s", err) - return nil, 0, err +func (c *Client) onIncoming(deal ClientDeal) { + log.Info("incoming deal") + + if _, ok := c.conns[deal.ProposalCid]; ok { + log.Errorf("tracking deal connection: already tracking connection for deal %s", deal.ProposalCid) + return + } + c.conns[deal.ProposalCid] = deal.s + + if err := c.deals.Begin(deal.ProposalCid, deal); err != nil { + // We may have re-sent the proposal + log.Errorf("deal tracking failed: %s", err) + c.failDeal(deal.ProposalCid, err) + return } - n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) - if err != nil { - log.Errorf("cannot open unixfs file: %s", err) - return nil, 0, err - } + go func() { + c.updated <- clientDealUpdate{ + newState: Unknown, + id: deal.ProposalCid, + err: nil, + } + }() +} - uf, ok := n.(files.File) - if !ok { - // TODO: we probably got directory, how should we handle this in unixfs mode? - return nil, 0, xerrors.New("unsupported unixfs type") +func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) { + log.Infof("Deal %s updated state to %d", update.id, update.newState) + if update.err != nil { + log.Errorf("deal %s failed: %s", update.id, update.err) + c.failDeal(update.id, update.err) + return } - - size, err := uf.Size() - if err != nil { - return nil, 0, err - } - - var commP [sectorbuilder.CommitmentBytesLen]byte - err = withTemp(uf, func(f string) error { - commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size)) - return err + var deal ClientDeal + err := c.deals.MutateClient(update.id, func(d *ClientDeal) error { + d.State = update.newState + deal = *d + return nil }) - return commP[:], size, err -} - -func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error { - log.Info("Sending deal proposal") - - msg, err := cbor.DumpObject(proposal) if err != nil { - return err - } - sig, err := c.w.Sign(context.TODO(), from, msg) - if err != nil { - return err + c.failDeal(update.id, err) + return } - signedProposal := &SignedStorageDealProposal{ - Proposal: proposal, - Signature: sig, + switch update.newState { + case Unknown: // new + c.handle(ctx, deal, c.new, Accepted) + case Accepted: + c.handle(ctx, deal, c.accepted, Staged) + case Staged: + c.handle(ctx, deal, c.staged, Sealing) + case Sealing: + c.handle(ctx, deal, c.sealing, Complete) } - - return cborrpc.WriteCborRPC(s, signedProposal) -} - -func (c *Client) waitAccept(s inet.Stream, proposal StorageDealProposal, minerID peer.ID) (ClientDeal, error) { - log.Info("Waiting for response") - - var resp SignedStorageDealResponse - if err := cborrpc.ReadCborRPC(s, &resp); err != nil { - log.Errorw("failed to read StorageDealResponse message", "error", err) - return ClientDeal{}, err - } - - // TODO: verify signature - - if resp.Response.State != Accepted { - return ClientDeal{}, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State) - } - - proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) - if err != nil { - return ClientDeal{}, err - } - - if resp.Response.Proposal != proposalNd.Cid() { - return ClientDeal{}, xerrors.New("miner responded to a wrong proposal") - } - - return ClientDeal{ - ProposalCid: proposalNd.Cid(), - Status: DealResolvingMiner, - Miner: minerID, - }, nil } type ClientDealProposal struct { @@ -228,21 +195,27 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie if err != nil { return cid.Undef, err } - defer s.Reset() // TODO: handle other updates if err := c.sendProposal(s, proposal, p.ClientAddress); err != nil { return cid.Undef, err } - deal, err := c.waitAccept(s, proposal, p.MinerID) + proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) if err != nil { return cid.Undef, err } - log.Info("DEAL ACCEPTED!") + deal := ClientDeal{ + ProposalCid: proposalNd.Cid(), + Proposal: proposal, + State: Unknown, + Miner: p.MinerID, + + s: s, + } // TODO: actually care about what happens with the deal after it was accepted - //c.incoming <- deal + c.incoming <- deal // TODO: start tracking after the deal is sealed return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{ diff --git a/chain/deals/client_states.go b/chain/deals/client_states.go new file mode 100644 index 000000000..0139252e3 --- /dev/null +++ b/chain/deals/client_states.go @@ -0,0 +1,99 @@ +package deals + +import ( + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" +) + +type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error + +func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next DealState) { + go func() { + err := cb(ctx, deal) + select { + case c.updated <- clientDealUpdate{ + newState: next, + id: deal.ProposalCid, + err: err, + }: + case <-c.stop: + } + }() +} + +func (c *Client) new(ctx context.Context, deal ClientDeal) error { + resp, err := c.readStorageDealResp(deal) + if err != nil { + return err + } + + if resp.State != Accepted { + return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State) + } + + log.Info("DEAL ACCEPTED!") + + return nil +} + +func (c *Client) accepted(ctx context.Context, deal ClientDeal) error { + /* data transfer happens */ + + resp, err := c.readStorageDealResp(deal) + if err != nil { + return err + } + + if resp.State != Staged { + return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State) + } + + log.Info("DEAL STAGED!") + + return nil +} + +func (c *Client) staged(ctx context.Context, deal ClientDeal) error { + /* miner seals our data, hopefully */ + + resp, err := c.readStorageDealResp(deal) + if err != nil { + return err + } + + if resp.State != Sealing { + return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State) + } + + log.Info("DEAL SEALED!") + + ok, err := sectorbuilder.VerifyPieceInclusionProof(build.SectorSize, deal.Proposal.Size, deal.Proposal.CommP, resp.CommD, resp.PieceInclusionProof.ProofElements) + if err != nil { + return xerrors.Errorf("verifying piece inclusion proof in staged deal %s: %w", deal.ProposalCid, err) + } + if !ok { + return xerrors.Errorf("verifying piece inclusion proof in staged deal %s failed", deal.ProposalCid) + } + + return nil +} + +func (c *Client) sealing(ctx context.Context, deal ClientDeal) error { + resp, err := c.readStorageDealResp(deal) + if err != nil { + return err + } + + if resp.State != Complete { + return xerrors.Errorf("deal wasn't complete (State=%d)", resp.State) + } + + // TODO: look for the commit message on chain, negotiate better payment vouchers + + log.Info("DEAL COMPLETE!!") + return nil +} diff --git a/chain/deals/client_utils.go b/chain/deals/client_utils.go new file mode 100644 index 000000000..94aa58f9b --- /dev/null +++ b/chain/deals/client_utils.go @@ -0,0 +1,111 @@ +package deals + +import ( + "context" + "runtime" + + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/ipfs/go-cid" + files "github.com/ipfs/go-ipfs-files" + cbor "github.com/ipfs/go-ipld-cbor" + unixfile "github.com/ipfs/go-unixfs/file" + inet "github.com/libp2p/go-libp2p-core/network" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/lib/cborrpc" +) + +func (c *Client) failDeal(id cid.Cid, cerr error) { + if err := c.deals.End(id); err != nil { + log.Warnf("deals.End: %s", err) + } + + if cerr == nil { + _, f, l, _ := runtime.Caller(1) + cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l) + } + + s, ok := c.conns[id] + if ok { + _ = s.Reset() + delete(c.conns, id) + } + + // TODO: store in some sort of audit log + log.Errorf("deal %s failed: %s", id, cerr) +} + +func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) { + root, err := c.dag.Get(ctx, data) + if err != nil { + log.Errorf("failed to get file root for deal: %s", err) + return nil, 0, err + } + + n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) + if err != nil { + log.Errorf("cannot open unixfs file: %s", err) + return nil, 0, err + } + + uf, ok := n.(files.File) + if !ok { + // TODO: we probably got directory, how should we handle this in unixfs mode? + return nil, 0, xerrors.New("unsupported unixfs type") + } + + size, err := uf.Size() + if err != nil { + return nil, 0, err + } + + var commP [sectorbuilder.CommitmentBytesLen]byte + err = withTemp(uf, func(f string) error { + commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size)) + return err + }) + return commP[:], size, err +} + +func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error { + log.Info("Sending deal proposal") + + msg, err := cbor.DumpObject(proposal) + if err != nil { + return err + } + sig, err := c.w.Sign(context.TODO(), from, msg) + if err != nil { + return err + } + + signedProposal := &SignedStorageDealProposal{ + Proposal: proposal, + Signature: sig, + } + + return cborrpc.WriteCborRPC(s, signedProposal) +} + +func (c *Client) readStorageDealResp(deal ClientDeal) (*StorageDealResponse, error) { + s, ok := c.conns[deal.ProposalCid] + if !ok { + // TODO: Try to re-establish the connection using query protocol + return nil, xerrors.Errorf("no connection to miner") + } + + var resp SignedStorageDealResponse + if err := cborrpc.ReadCborRPC(s, &resp); err != nil { + log.Errorw("failed to read StorageDealResponse message", "error", err) + return nil, err + } + + // TODO: verify signature + + if resp.Response.Proposal != deal.ProposalCid { + return nil, xerrors.New("miner responded to a wrong proposal") + } + + return &resp.Response, nil +} diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 602ccc15f..c98cc7f4c 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -2,19 +2,20 @@ package deals import ( "context" - "github.com/filecoin-project/go-lotus/chain/types" - "github.com/filecoin-project/go-lotus/storage/sectorblocks" "math" - "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/chain/address" - "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" cbor "github.com/ipfs/go-ipld-cbor" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" + "github.com/filecoin-project/go-lotus/storage/sectorblocks" ) func init() { @@ -50,12 +51,12 @@ type Handler struct { actor address.Address incoming chan MinerDeal - updated chan dealUpdate + updated chan minerDealUpdate stop chan struct{} stopped chan struct{} } -type dealUpdate struct { +type minerDealUpdate struct { newState DealState id cid.Cid err error @@ -82,7 +83,7 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp conns: map[cid.Cid]inet.Stream{}, incoming: make(chan MinerDeal), - updated: make(chan dealUpdate), + updated: make(chan minerDealUpdate), stop: make(chan struct{}), stopped: make(chan struct{}), @@ -125,7 +126,7 @@ func (h *Handler) onIncoming(deal MinerDeal) { } go func() { - h.updated <- dealUpdate{ + h.updated <- minerDealUpdate{ newState: Accepted, id: deal.ProposalCid, err: nil, @@ -133,7 +134,7 @@ func (h *Handler) onIncoming(deal MinerDeal) { }() } -func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) { +func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) { log.Infof("Deal %s updated state to %d", update.id, update.newState) if update.err != nil { log.Errorf("deal %s failed: %s", update.id, update.err) diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go index ec4f279a8..503b503a2 100644 --- a/chain/deals/handler_states.go +++ b/chain/deals/handler_states.go @@ -16,13 +16,13 @@ import ( "github.com/filecoin-project/go-lotus/storage/sectorblocks" ) -type handlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) +type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) -func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, next DealState) { +func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next DealState) { go func() { mut, err := cb(ctx, deal) select { - case h.updated <- dealUpdate{ + case h.updated <- minerDealUpdate{ newState: next, id: deal.ProposalCid, err: err, @@ -237,6 +237,7 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal) State: Sealing, Proposal: deal.ProposalCid, PieceInclusionProof: ip, + CommD: status.CommD[:], }) if err != nil { log.Warnf("Sending deal response failed: %s", err) diff --git a/chain/deals/handler_utils.go b/chain/deals/handler_utils.go index d4abe2861..e5d652a5e 100644 --- a/chain/deals/handler_utils.go +++ b/chain/deals/handler_utils.go @@ -35,7 +35,7 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) { s, ok := h.conns[id] if ok { - _ = s.Close() + _ = s.Reset() delete(h.conns, id) } diff --git a/chain/deals/types.go b/chain/deals/types.go index f17fb1cac..5197024a0 100644 --- a/chain/deals/types.go +++ b/chain/deals/types.go @@ -19,6 +19,8 @@ func init() { cbor.RegisterCborType(SignedStorageDealResponse{}) } +const ProtocolID = "/fil/storage/mk/1.0.0" + type SerializationMode string const ( @@ -77,6 +79,7 @@ type StorageDealResponse struct { // Sealing PieceInclusionProof PieceInclusionProof + CommD []byte // TODO: not in spec // Complete SectorCommitMessage *cid.Cid diff --git a/node/modules/services.go b/node/modules/services.go index 658ac5fd8..95d25961b 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -59,10 +59,12 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu go sub.HandleIncomingMessages(ctx, mpool, msgsub) } -func RunDealClient(lc fx.Lifecycle, c *deals.Client) { +func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) { + ctx := helpers.LifecycleCtx(mctx, lc) + lc.Append(fx.Hook{ OnStart: func(context.Context) error { - c.Run() + c.Run(ctx) return nil }, OnStop: func(context.Context) error {