From 61e14d0f4c1679ffa424e3cce43a23df0147c06b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 23 Oct 2019 14:59:57 +0200 Subject: [PATCH] on chain deals: Fix some serialization bugs --- chain/actors/actor_storagemarket.go | 2 +- chain/deals/cbor_gen.go | 86 ++++++++++++++++++++++------- chain/deals/client.go | 14 ++--- chain/deals/client_states.go | 4 +- chain/deals/provider.go | 4 +- chain/deals/provider_states.go | 19 ++++--- chain/deals/provider_utils.go | 11 ++-- chain/deals/state_store.go | 12 ++-- chain/deals/types.go | 6 +- node/impl/full/state.go | 6 +- 10 files changed, 106 insertions(+), 58 deletions(-) diff --git a/chain/actors/actor_storagemarket.go b/chain/actors/actor_storagemarket.go index 0007fb0b3..651b02b77 100644 --- a/chain/actors/actor_storagemarket.go +++ b/chain/actors/actor_storagemarket.go @@ -114,7 +114,7 @@ func (sdp *StorageDealProposal) Verify() error { unsigned := *sdp unsigned.ProposerSignature = nil var buf bytes.Buffer - if err := sdp.MarshalCBOR(&buf); err != nil { + if err := unsigned.MarshalCBOR(&buf); err != nil { return err } diff --git a/chain/deals/cbor_gen.go b/chain/deals/cbor_gen.go index 9952630e3..41342d56f 100644 --- a/chain/deals/cbor_gen.go +++ b/chain/deals/cbor_gen.go @@ -4,11 +4,11 @@ import ( "fmt" "io" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/chain/types" ) /* This file was generated by github.com/whyrusleeping/cbor-gen */ @@ -165,7 +165,7 @@ func (t *Response) MarshalCBOR(w io.Writer) error { return err } - // t.t.State (api.DealState) + // t.t.State (uint64) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil { return err } @@ -191,14 +191,26 @@ func (t *Response) MarshalCBOR(w io.Writer) error { // t.t.PublishMessage (cid.Cid) - if err := cbg.WriteCid(w, t.PublishMessage); err != nil { - return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } } // t.t.CommitMessage (cid.Cid) - if err := cbg.WriteCid(w, t.CommitMessage); err != nil { - return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err) + if t.CommitMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.CommitMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err) + } } return nil @@ -219,7 +231,7 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.t.State (api.DealState) + // t.t.State (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -255,33 +267,69 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error { { - if err := t.StorageDeal.UnmarshalCBOR(br); err != nil { + pb, err := br.PeekByte() + if err != nil { return err } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.StorageDeal = new(actors.StorageDeal) + if err := t.StorageDeal.UnmarshalCBOR(br); err != nil { + return err + } + } } // t.t.PublishMessage (cid.Cid) { - c, err := cbg.ReadCid(br) + pb, err := br.PeekByte() if err != nil { - return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + return err } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { - t.PublishMessage = c + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + } + + t.PublishMessage = &c + } } // t.t.CommitMessage (cid.Cid) { - c, err := cbg.ReadCid(br) + pb, err := br.PeekByte() if err != nil { - return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err) + return err } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { - t.CommitMessage = c + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err) + } + + t.CommitMessage = &c + } } return nil @@ -528,7 +576,7 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { return err } - // t.t.State (api.DealState) + // t.t.State (uint64) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil { return err } @@ -579,7 +627,7 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { } } - // t.t.State (api.DealState) + // t.t.State (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -630,7 +678,7 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) } - // t.t.State (api.DealState) + // t.t.State (uint64) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil { return err } @@ -694,7 +742,7 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { t.ProposalCid = c } - // t.t.State (api.DealState) + // t.t.State (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { diff --git a/chain/deals/client.go b/chain/deals/client.go index ac3d5d54c..05ac4d60f 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -46,7 +46,7 @@ type Client struct { deals ClientStateStore conns map[cid.Cid]inet.Stream - incoming chan ClientDeal + incoming chan *ClientDeal updated chan clientDealUpdate stop chan struct{} @@ -71,7 +71,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w * deals: ClientStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}}, conns: map[cid.Cid]inet.Stream{}, - incoming: make(chan ClientDeal, 16), + incoming: make(chan *ClientDeal, 16), updated: make(chan clientDealUpdate, 16), stop: make(chan struct{}), @@ -98,7 +98,7 @@ func (c *Client) Run(ctx context.Context) { }() } -func (c *Client) onIncoming(deal ClientDeal) { +func (c *Client) onIncoming(deal *ClientDeal) { log.Info("incoming deal") if _, ok := c.conns[deal.ProposalCid]; ok { @@ -167,7 +167,7 @@ type ClientDealProposal struct { } func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) { - proposal := actors.StorageDealProposal{ + proposal := &actors.StorageDealProposal{ PieceRef: p.Data.Bytes(), PieceSize: p.DataSize, PieceSerialization: actors.SerializationUnixFSv0, @@ -179,7 +179,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro StorageCollateral: types.NewInt(p.DataSize), // TODO: real calc } - if err := api.SignWith(ctx, c.w.Sign, p.Client, &proposal); err != nil { + if err := api.SignWith(ctx, c.w.Sign, p.Client, proposal); err != nil { return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err) } @@ -199,9 +199,9 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err) } - deal := ClientDeal{ + deal := &ClientDeal{ ProposalCid: proposalNd.Cid(), - Proposal: proposal, + Proposal: *proposal, State: api.DealUnknown, Miner: p.MinerID, diff --git a/chain/deals/client_states.go b/chain/deals/client_states.go index 28164c61f..26f16cc51 100644 --- a/chain/deals/client_states.go +++ b/chain/deals/client_states.go @@ -38,7 +38,7 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error { } // TODO: spec says it's optional - pubmsg, err := c.chain.GetMessage(resp.PublishMessage) + pubmsg, err := c.chain.GetMessage(*resp.PublishMessage) if err != nil { return xerrors.Errorf("getting deal pubsish message: %w", err) } @@ -56,7 +56,7 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error { } // TODO: timeout - _, ret, err := c.sm.WaitForMessage(ctx, resp.PublishMessage) + _, ret, err := c.sm.WaitForMessage(ctx, *resp.PublishMessage) if err != nil { return xerrors.Errorf("Waiting for deal publish message: %w", err) } diff --git a/chain/deals/provider.go b/chain/deals/provider.go index caf73ca84..69b39cc42 100644 --- a/chain/deals/provider.go +++ b/chain/deals/provider.go @@ -141,7 +141,7 @@ func (p *Provider) onIncoming(deal MinerDeal) { p.conns[deal.ProposalCid] = deal.s - if err := p.deals.Begin(deal.ProposalCid, deal); err != nil { + if err := p.deals.Begin(deal.ProposalCid, &deal); err != nil { // This can happen when client re-sends proposal p.failDeal(deal.ProposalCid, err) log.Errorf("deal tracking failed: %s", err) @@ -191,7 +191,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { } func (p *Provider) newDeal(s inet.Stream, proposal actors.StorageDealProposal) (MinerDeal, error) { - proposalNd, err := cborrpc.AsIpld(proposal) + proposalNd, err := cborrpc.AsIpld(&proposal) if err != nil { return MinerDeal{}, err } diff --git a/chain/deals/provider_states.go b/chain/deals/provider_states.go index ed7baea86..02a058c3c 100644 --- a/chain/deals/provider_states.go +++ b/chain/deals/provider_states.go @@ -80,7 +80,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) // check market funds clientMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil) if err != nil { - return nil, err + return nil, xerrors.Errorf("getting client market balance failed: %w", err) } // This doesn't guarantee that the client won't withdraw / lock those funds @@ -89,9 +89,9 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) return nil, xerrors.New("clientMarketBalance.Available too small") } - providerMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil) + providerMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Provider, nil) if err != nil { - return nil, err + return nil, xerrors.Errorf("getting provider market balance failed: %w", err) } // TODO: this needs to be atomic @@ -124,11 +124,12 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) } log.Info("fetching data for a deal") - err = p.sendSignedResponse(Response{ + mcid := smsg.Cid() + err = p.sendSignedResponse(&Response{ State: api.DealAccepted, Message: "", Proposal: deal.ProposalCid, - PublishMessage: smsg.Cid(), + PublishMessage: &mcid, }) if err != nil { return nil, err @@ -140,7 +141,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) // STAGED func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - err := p.sendSignedResponse(Response{ + err := p.sendSignedResponse(&Response{ State: api.DealStaged, Proposal: deal.ProposalCid, }) @@ -205,7 +206,7 @@ func (p *Provider) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilde } func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - err := p.sendSignedResponse(Response{ + err := p.sendSignedResponse(&Response{ State: api.DealSealing, Proposal: deal.ProposalCid, }) @@ -231,11 +232,11 @@ func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDea log.Warnf("Waiting for sector commitment message: %s", err) } - err = p.sendSignedResponse(Response{ + err = p.sendSignedResponse(&Response{ State: api.DealComplete, Proposal: deal.ProposalCid, - CommitMessage: mcid, + CommitMessage: &mcid, }) if err != nil { log.Warnf("Sending deal response failed: %s", err) diff --git a/chain/deals/provider_utils.go b/chain/deals/provider_utils.go index 3a9f96233..5931cefc9 100644 --- a/chain/deals/provider_utils.go +++ b/chain/deals/provider_utils.go @@ -12,7 +12,6 @@ import ( "github.com/filecoin-project/lotus/lib/cborrpc" "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" ) @@ -29,7 +28,7 @@ func (p *Provider) failDeal(id cid.Cid, cerr error) { log.Errorf("deal %s failed: %s", id, cerr) - err := p.sendSignedResponse(Response{ + err := p.sendSignedResponse(&Response{ State: api.DealFailed, Message: cerr.Error(), Proposal: id, @@ -67,13 +66,13 @@ func (p *Provider) readProposal(s inet.Stream) (proposal actors.StorageDealPropo return } -func (p *Provider) sendSignedResponse(resp Response) error { +func (p *Provider) sendSignedResponse(resp *Response) error { s, ok := p.conns[resp.Proposal] if !ok { return xerrors.New("couldn't send response: not connected") } - msg, err := cbor.DumpObject(&resp) + msg, err := cborrpc.Dump(resp) if err != nil { return xerrors.Errorf("serializing response: %w", err) } @@ -88,8 +87,8 @@ func (p *Provider) sendSignedResponse(resp Response) error { return xerrors.Errorf("failed to sign response message: %w", err) } - signedResponse := SignedResponse{ - Response: resp, + signedResponse := &SignedResponse{ + Response: *resp, Signature: sig, } diff --git a/chain/deals/state_store.go b/chain/deals/state_store.go index 32b73835b..ae959146f 100644 --- a/chain/deals/state_store.go +++ b/chain/deals/state_store.go @@ -76,13 +76,13 @@ func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) { return func(in []byte) ([]byte, error) { - var deal MinerDeal - err := cborrpc.ReadCborRPC(bytes.NewReader(in), &deal) + deal := new(MinerDeal) + err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal) if err != nil { return nil, err } - if err := m(&deal); err != nil { + if err := m(deal); err != nil { return nil, err } @@ -100,13 +100,13 @@ func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) er func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) { return func(in []byte) ([]byte, error) { - var deal ClientDeal - err := cborrpc.ReadCborRPC(bytes.NewReader(in), &deal) + deal := new(ClientDeal) + err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal) if err != nil { return nil, err } - if err := m(&deal); err != nil { + if err := m(deal); err != nil { return nil, err } diff --git a/chain/deals/types.go b/chain/deals/types.go index d4b445328..27bf31a8b 100644 --- a/chain/deals/types.go +++ b/chain/deals/types.go @@ -23,11 +23,11 @@ type Response struct { Proposal cid.Cid // DealAccepted - StorageDeal actors.StorageDeal - PublishMessage cid.Cid + StorageDeal *actors.StorageDeal + PublishMessage *cid.Cid // DealComplete - CommitMessage cid.Cid + CommitMessage *cid.Cid } // TODO: Do we actually need this to be signed? diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 44f0e5220..115d73d86 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -233,7 +233,7 @@ func (a *StateAPI) StateListActors(ctx context.Context, ts *types.TipSet) ([]add func (a *StateAPI) StateMarketBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (actors.StorageParticipantBalance, error) { var state actors.StorageMarketState - if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil { + if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil { return actors.StorageParticipantBalance{}, err } cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore()) @@ -249,7 +249,7 @@ func (a *StateAPI) StateMarketParticipants(ctx context.Context, ts *types.TipSet out := map[string]actors.StorageParticipantBalance{} var state actors.StorageMarketState - if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil { + if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil { return nil, err } cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore()) @@ -281,7 +281,7 @@ func (a *StateAPI) StateMarketDeals(ctx context.Context, ts *types.TipSet) (map[ out := map[string]actors.OnChainDeal{} var state actors.StorageMarketState - if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil { + if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil { return nil, err }