on chain deals: Fix some serialization bugs

This commit is contained in:
Łukasz Magiera 2019-10-23 14:59:57 +02:00
parent bd77bba676
commit 61e14d0f4c
10 changed files with 106 additions and 58 deletions

View File

@ -114,7 +114,7 @@ func (sdp *StorageDealProposal) Verify() error {
unsigned := *sdp
unsigned.ProposerSignature = nil
var buf bytes.Buffer
if err := sdp.MarshalCBOR(&buf); err != nil {
if err := unsigned.MarshalCBOR(&buf); err != nil {
return err
}

View File

@ -4,11 +4,11 @@ import (
"fmt"
"io"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
)
/* This file was generated by github.com/whyrusleeping/cbor-gen */
@ -165,7 +165,7 @@ func (t *Response) MarshalCBOR(w io.Writer) error {
return err
}
// t.t.State (api.DealState)
// t.t.State (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil {
return err
}
@ -191,14 +191,26 @@ func (t *Response) MarshalCBOR(w io.Writer) error {
// t.t.PublishMessage (cid.Cid)
if err := cbg.WriteCid(w, t.PublishMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err)
if t.PublishMessage == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.PublishMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err)
}
}
// t.t.CommitMessage (cid.Cid)
if err := cbg.WriteCid(w, t.CommitMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err)
if t.CommitMessage == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.CommitMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err)
}
}
return nil
@ -219,7 +231,7 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.State (api.DealState)
// t.t.State (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
@ -255,33 +267,69 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error {
{
if err := t.StorageDeal.UnmarshalCBOR(br); err != nil {
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.StorageDeal = new(actors.StorageDeal)
if err := t.StorageDeal.UnmarshalCBOR(br); err != nil {
return err
}
}
}
// t.t.PublishMessage (cid.Cid)
{
c, err := cbg.ReadCid(br)
pb, err := br.PeekByte()
if err != nil {
return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err)
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.PublishMessage = c
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err)
}
t.PublishMessage = &c
}
}
// t.t.CommitMessage (cid.Cid)
{
c, err := cbg.ReadCid(br)
pb, err := br.PeekByte()
if err != nil {
return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err)
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.CommitMessage = c
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err)
}
t.CommitMessage = &c
}
}
return nil
@ -528,7 +576,7 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error {
return err
}
// t.t.State (api.DealState)
// t.t.State (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil {
return err
}
@ -579,7 +627,7 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error {
}
}
// t.t.State (api.DealState)
// t.t.State (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
@ -630,7 +678,7 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error {
return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err)
}
// t.t.State (api.DealState)
// t.t.State (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil {
return err
}
@ -694,7 +742,7 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error {
t.ProposalCid = c
}
// t.t.State (api.DealState)
// t.t.State (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {

View File

@ -46,7 +46,7 @@ type Client struct {
deals ClientStateStore
conns map[cid.Cid]inet.Stream
incoming chan ClientDeal
incoming chan *ClientDeal
updated chan clientDealUpdate
stop chan struct{}
@ -71,7 +71,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *
deals: ClientStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}},
conns: map[cid.Cid]inet.Stream{},
incoming: make(chan ClientDeal, 16),
incoming: make(chan *ClientDeal, 16),
updated: make(chan clientDealUpdate, 16),
stop: make(chan struct{}),
@ -98,7 +98,7 @@ func (c *Client) Run(ctx context.Context) {
}()
}
func (c *Client) onIncoming(deal ClientDeal) {
func (c *Client) onIncoming(deal *ClientDeal) {
log.Info("incoming deal")
if _, ok := c.conns[deal.ProposalCid]; ok {
@ -167,7 +167,7 @@ type ClientDealProposal struct {
}
func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) {
proposal := actors.StorageDealProposal{
proposal := &actors.StorageDealProposal{
PieceRef: p.Data.Bytes(),
PieceSize: p.DataSize,
PieceSerialization: actors.SerializationUnixFSv0,
@ -179,7 +179,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
StorageCollateral: types.NewInt(p.DataSize), // TODO: real calc
}
if err := api.SignWith(ctx, c.w.Sign, p.Client, &proposal); err != nil {
if err := api.SignWith(ctx, c.w.Sign, p.Client, proposal); err != nil {
return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err)
}
@ -199,9 +199,9 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err)
}
deal := ClientDeal{
deal := &ClientDeal{
ProposalCid: proposalNd.Cid(),
Proposal: proposal,
Proposal: *proposal,
State: api.DealUnknown,
Miner: p.MinerID,

View File

@ -38,7 +38,7 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error {
}
// TODO: spec says it's optional
pubmsg, err := c.chain.GetMessage(resp.PublishMessage)
pubmsg, err := c.chain.GetMessage(*resp.PublishMessage)
if err != nil {
return xerrors.Errorf("getting deal pubsish message: %w", err)
}
@ -56,7 +56,7 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error {
}
// TODO: timeout
_, ret, err := c.sm.WaitForMessage(ctx, resp.PublishMessage)
_, ret, err := c.sm.WaitForMessage(ctx, *resp.PublishMessage)
if err != nil {
return xerrors.Errorf("Waiting for deal publish message: %w", err)
}

View File

@ -141,7 +141,7 @@ func (p *Provider) onIncoming(deal MinerDeal) {
p.conns[deal.ProposalCid] = deal.s
if err := p.deals.Begin(deal.ProposalCid, deal); err != nil {
if err := p.deals.Begin(deal.ProposalCid, &deal); err != nil {
// This can happen when client re-sends proposal
p.failDeal(deal.ProposalCid, err)
log.Errorf("deal tracking failed: %s", err)
@ -191,7 +191,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
}
func (p *Provider) newDeal(s inet.Stream, proposal actors.StorageDealProposal) (MinerDeal, error) {
proposalNd, err := cborrpc.AsIpld(proposal)
proposalNd, err := cborrpc.AsIpld(&proposal)
if err != nil {
return MinerDeal{}, err
}

View File

@ -80,7 +80,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// check market funds
clientMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil)
if err != nil {
return nil, err
return nil, xerrors.Errorf("getting client market balance failed: %w", err)
}
// This doesn't guarantee that the client won't withdraw / lock those funds
@ -89,9 +89,9 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
return nil, xerrors.New("clientMarketBalance.Available too small")
}
providerMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil)
providerMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Provider, nil)
if err != nil {
return nil, err
return nil, xerrors.Errorf("getting provider market balance failed: %w", err)
}
// TODO: this needs to be atomic
@ -124,11 +124,12 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
}
log.Info("fetching data for a deal")
err = p.sendSignedResponse(Response{
mcid := smsg.Cid()
err = p.sendSignedResponse(&Response{
State: api.DealAccepted,
Message: "",
Proposal: deal.ProposalCid,
PublishMessage: smsg.Cid(),
PublishMessage: &mcid,
})
if err != nil {
return nil, err
@ -140,7 +141,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// STAGED
func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
err := p.sendSignedResponse(Response{
err := p.sendSignedResponse(&Response{
State: api.DealStaged,
Proposal: deal.ProposalCid,
})
@ -205,7 +206,7 @@ func (p *Provider) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilde
}
func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
err := p.sendSignedResponse(Response{
err := p.sendSignedResponse(&Response{
State: api.DealSealing,
Proposal: deal.ProposalCid,
})
@ -231,11 +232,11 @@ func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDea
log.Warnf("Waiting for sector commitment message: %s", err)
}
err = p.sendSignedResponse(Response{
err = p.sendSignedResponse(&Response{
State: api.DealComplete,
Proposal: deal.ProposalCid,
CommitMessage: mcid,
CommitMessage: &mcid,
})
if err != nil {
log.Warnf("Sending deal response failed: %s", err)

View File

@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"
)
@ -29,7 +28,7 @@ func (p *Provider) failDeal(id cid.Cid, cerr error) {
log.Errorf("deal %s failed: %s", id, cerr)
err := p.sendSignedResponse(Response{
err := p.sendSignedResponse(&Response{
State: api.DealFailed,
Message: cerr.Error(),
Proposal: id,
@ -67,13 +66,13 @@ func (p *Provider) readProposal(s inet.Stream) (proposal actors.StorageDealPropo
return
}
func (p *Provider) sendSignedResponse(resp Response) error {
func (p *Provider) sendSignedResponse(resp *Response) error {
s, ok := p.conns[resp.Proposal]
if !ok {
return xerrors.New("couldn't send response: not connected")
}
msg, err := cbor.DumpObject(&resp)
msg, err := cborrpc.Dump(resp)
if err != nil {
return xerrors.Errorf("serializing response: %w", err)
}
@ -88,8 +87,8 @@ func (p *Provider) sendSignedResponse(resp Response) error {
return xerrors.Errorf("failed to sign response message: %w", err)
}
signedResponse := SignedResponse{
Response: resp,
signedResponse := &SignedResponse{
Response: *resp,
Signature: sig,
}

View File

@ -76,13 +76,13 @@ func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal MinerDeal
err := cborrpc.ReadCborRPC(bytes.NewReader(in), &deal)
deal := new(MinerDeal)
err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
if err := m(deal); err != nil {
return nil, err
}
@ -100,13 +100,13 @@ func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) er
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal ClientDeal
err := cborrpc.ReadCborRPC(bytes.NewReader(in), &deal)
deal := new(ClientDeal)
err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
if err := m(deal); err != nil {
return nil, err
}

View File

@ -23,11 +23,11 @@ type Response struct {
Proposal cid.Cid
// DealAccepted
StorageDeal actors.StorageDeal
PublishMessage cid.Cid
StorageDeal *actors.StorageDeal
PublishMessage *cid.Cid
// DealComplete
CommitMessage cid.Cid
CommitMessage *cid.Cid
}
// TODO: Do we actually need this to be signed?

View File

@ -233,7 +233,7 @@ func (a *StateAPI) StateListActors(ctx context.Context, ts *types.TipSet) ([]add
func (a *StateAPI) StateMarketBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (actors.StorageParticipantBalance, error) {
var state actors.StorageMarketState
if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil {
if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil {
return actors.StorageParticipantBalance{}, err
}
cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore())
@ -249,7 +249,7 @@ func (a *StateAPI) StateMarketParticipants(ctx context.Context, ts *types.TipSet
out := map[string]actors.StorageParticipantBalance{}
var state actors.StorageMarketState
if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil {
if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil {
return nil, err
}
cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore())
@ -281,7 +281,7 @@ func (a *StateAPI) StateMarketDeals(ctx context.Context, ts *types.TipSet) (map[
out := map[string]actors.OnChainDeal{}
var state actors.StorageMarketState
if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil {
if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil {
return nil, err
}