From 821e03bcd78596c02505861416fd299317ec8ac6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 7 Aug 2019 21:48:53 +0200 Subject: [PATCH] deals: Cleanup client a bit --- chain/deals/client.go | 137 ++++++++++++++++++---------------- chain/deals/handler_states.go | 2 +- chain/deals/handler_utils.go | 4 +- 3 files changed, 76 insertions(+), 67 deletions(-) diff --git a/chain/deals/client.go b/chain/deals/client.go index 5476c5e8c..8c5e35a54 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -2,10 +2,7 @@ package deals import ( "context" - "io" - "io/ioutil" "math" - "os" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" @@ -16,6 +13,7 @@ import ( logging "github.com/ipfs/go-log" unixfile "github.com/ipfs/go-unixfs/file" "github.com/libp2p/go-libp2p-core/host" + inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" @@ -101,46 +99,92 @@ func (c *Client) Run() { }() } -func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) { - // TODO: Eww +func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) { root, err := c.dag.Get(ctx, data) if err != nil { log.Errorf("failed to get file root for deal: %s", err) - return cid.Undef, err + return nil, 0, err } n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) if err != nil { log.Errorf("cannot open unixfs file: %s", err) - return cid.Undef, err + return nil, 0, err } uf, ok := n.(files.File) if !ok { // TODO: we probably got directory, how should we handle this in unixfs mode? - return cid.Undef, xerrors.New("unsupported unixfs type") + return nil, 0, xerrors.New("unsupported unixfs type") } size, err := uf.Size() if err != nil { - return cid.Cid{}, err + return nil, 0, err } - f, err := ioutil.TempFile(os.TempDir(), "commP-temp-") + var commP [sectorbuilder.CommitmentBytesLen]byte + err = withTemp(uf, func(f string) error { + commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size)) + return err + }) + return commP[:], size, err +} + +func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error { + log.Info("Sending deal proposal") + + msg, err := cbor.DumpObject(proposal) if err != nil { - return cid.Undef, err + return err } - if _, err := io.Copy(f, uf); err != nil { - return cid.Undef, err - } - if err := f.Close(); err != nil { - return cid.Undef, err - } - commP, err := sectorbuilder.GeneratePieceCommitment(f.Name(), uint64(size)) + sig, err := c.w.Sign(from, msg) if err != nil { - return cid.Undef, err + return err } - if err := os.Remove(f.Name()); err != nil { + + signedProposal := &SignedStorageDealProposal{ + Proposal: proposal, + Signature: sig, + } + + return cborrpc.WriteCborRPC(s, signedProposal) +} + +func (c *Client) waitAccept(s inet.Stream, proposal StorageDealProposal, minerID peer.ID) (ClientDeal, error) { + log.Info("Waiting for response") + + var resp SignedStorageDealResponse + if err := cborrpc.ReadCborRPC(s, &resp); err != nil { + log.Errorw("failed to read StorageDealResponse message", "error", err) + return ClientDeal{}, err + } + + // TODO: verify signature + + if resp.Response.State != Accepted { + return ClientDeal{}, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State) + } + + proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) + if err != nil { + return ClientDeal{}, err + } + + if resp.Response.Proposal != proposalNd.Cid() { + return ClientDeal{}, xerrors.New("miner responded to a wrong proposal") + } + + return ClientDeal{ + ProposalCid: proposalNd.Cid(), + Status: DealResolvingMiner, + Miner: minerID, + }, nil +} + +func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) { + commP, size, err := c.commP(ctx, data) + if err != nil { return cid.Undef, err } @@ -151,7 +195,7 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn PieceRef: data.String(), SerializationMode: SerializationUnixFs, CommP: commP[:], - Size: 6, + Size: uint64(size), TotalPrice: totalPrice, Duration: blocksDuration, Payment: PaymentInfo{ @@ -168,57 +212,22 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn if err != nil { return cid.Undef, err } - defer s.Close() // TODO: not too soon? + defer s.Reset() // TODO: handle other updates - log.Info("Sending deal proposal") - - msg, err := cbor.DumpObject(proposal) - if err != nil { + if err := c.sendProposal(s, proposal, from); err != nil { return cid.Undef, err } - sig, err := c.w.Sign(from, msg) + + deal, err := c.waitAccept(s, proposal, minerID) if err != nil { return cid.Undef, err } - signedProposal := &SignedStorageDealProposal{ - Proposal: proposal, - Signature: sig, - } + log.Info("DEAL ACCEPTED!") - if err := cborrpc.WriteCborRPC(s, signedProposal); err != nil { - return cid.Undef, err - } - - log.Info("Reading response") - - var resp SignedStorageDealResponse - if err := cborrpc.ReadCborRPC(s, &resp); err != nil { - log.Errorw("failed to read StorageDealResponse message", "error", err) - return cid.Undef, err - } - - // TODO: verify signature - - if resp.Response.State != Accepted { - return cid.Undef, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State) - } - - log.Info("Registering deal") - - proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) - if err != nil { - return cid.Undef, err - } - - deal := ClientDeal{ - ProposalCid: proposalNd.Cid(), - Status: DealResolvingMiner, - Miner: minerID, - } - - c.incoming <- deal - return proposalNd.Cid(), nil + // TODO: actually care about what happens with the deal after it was accepted + //c.incoming <- deal + return deal.ProposalCid, nil } func (c *Client) Stop() { diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go index 620d43201..fb9c411e5 100644 --- a/chain/deals/handler_states.go +++ b/chain/deals/handler_states.go @@ -59,7 +59,7 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) error { Proposal: deal.ProposalCid, }) if err != nil { - return err + log.Warnf("Sending deal response failed: %s", err) } root, err := h.dag.Get(ctx, deal.Ref) diff --git a/chain/deals/handler_utils.go b/chain/deals/handler_utils.go index 00e70fe69..c359bac67 100644 --- a/chain/deals/handler_utils.go +++ b/chain/deals/handler_utils.go @@ -21,7 +21,7 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) { cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l) } - log.Error("Deal %s failed: %s", id, cerr) + log.Errorf("deal %s failed: %s", id, cerr) err := h.sendSignedResponse(StorageDealResponse{ State: Failed, @@ -76,7 +76,7 @@ func (h *Handler) sendSignedResponse(resp StorageDealResponse) error { if len(def) != 1 { // NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor // TODO: implement with GetWorker on the miner actor - return xerrors.Errorf("Expected only 1 address in wallet, got %d", len(def)) + return xerrors.Errorf("expected only 1 address in wallet, got %d", len(def)) } sig, err := h.w.Sign(def[0], msg)