deals: disconnect early, rely on chain

This commit is contained in:
Łukasz Magiera 2019-11-07 13:57:00 +01:00
parent eca5501923
commit 79c9fb719e
6 changed files with 35 additions and 89 deletions

View File

@ -192,7 +192,7 @@ func (t *Response) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{134}); err != nil {
if _, err := w.Write([]byte{133}); err != nil {
return err
}
@ -232,18 +232,6 @@ func (t *Response) MarshalCBOR(w io.Writer) error {
}
}
// t.t.CommitMessage (cid.Cid) (struct)
if t.CommitMessage == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.CommitMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err)
}
}
return nil
}
@ -258,7 +246,7 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 6 {
if extra != 5 {
return fmt.Errorf("cbor input had wrong number of fields")
}
@ -338,30 +326,6 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error {
t.PublishMessage = &c
}
}
// t.t.CommitMessage (cid.Cid) (struct)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err)
}
t.CommitMessage = &c
}
}
return nil
}

View File

@ -41,6 +41,9 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error {
if err != nil {
return err
}
if err := c.disconnect(deal); err != nil {
return err
}
if resp.State != api.DealAccepted {
return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
@ -102,40 +105,12 @@ func (c *Client) accepted(ctx context.Context, deal ClientDeal) error {
}
func (c *Client) staged(ctx context.Context, deal ClientDeal) error {
/* miner seals our data, hopefully */
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != api.DealSealing {
return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State)
}
log.Info("DEAL SEALED!")
// TODO: want?
/*ssize, err := stmgr.GetMinerSectorSize(ctx, c.sm, nil, deal.Proposal.MinerAddress)
if err != nil {
return xerrors.Errorf("failed to get miner sector size: %w", err)
}
ok, err := sectorbuilder.VerifyPieceInclusionProof(ssize, deal.Proposal.Size, deal.Proposal.CommP, resp.CommD, resp.PieceInclusionProof.ProofElements)
if err != nil {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s: %w", deal.ProposalCid, err)
}
if !ok {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s failed", deal.ProposalCid)
}*/
// wait
return nil
}
func (c *Client) sealing(ctx context.Context, deal ClientDeal) error {
// TODO: disconnect
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
if err != nil {

View File

@ -85,3 +85,14 @@ func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) {
return &resp.Response, nil
}
func (c *Client) disconnect(deal ClientDeal) error {
s, ok := c.conns[deal.ProposalCid]
if !ok {
return nil
}
err := s.Close()
delete(c.conns, deal.ProposalCid)
return err
}

View File

@ -171,15 +171,20 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
log.Info("fetching data for a deal")
mcid := smsg.Cid()
err = p.sendSignedResponse(&Response{
State: api.DealAccepted,
Message: "",
State: api.DealAccepted,
Proposal: deal.ProposalCid,
PublishMessage: &mcid,
StorageDeal: &storageDeal,
})
if err != nil {
return nil, err
}
if err := p.disconnect(deal); err != nil {
log.Warnf("closing client connection: %+v", err)
}
return func(deal *MinerDeal) {
deal.DealID = resp.DealIDs[0]
}, merkledag.FetchGraph(ctx, deal.Ref, p.dag)
@ -188,14 +193,6 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// STAGED
func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
err := p.sendSignedResponse(&Response{
State: api.DealStaged,
Proposal: deal.ProposalCid,
})
if err != nil {
log.Warnf("Sending deal response failed: %s", err)
}
root, err := p.dag.Get(ctx, deal.Ref)
if err != nil {
return nil, xerrors.Errorf("failed to get file root for deal: %s", err)
@ -238,15 +235,6 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// SEALING
func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
err := p.sendSignedResponse(&Response{
State: api.DealSealing,
Proposal: deal.ProposalCid,
// TODO: Send sector ID
})
if err != nil {
log.Warnf("Sending deal response failed: %s", err)
}
log.Info("About to seal sector!", deal.ProposalCid, deal.SectorID)
if err := p.sminer.SealSector(ctx, deal.SectorID); err != nil {
return nil, xerrors.Errorf("sealing sector failed: %w", err)

View File

@ -100,6 +100,17 @@ func (p *Provider) sendSignedResponse(resp *Response) error {
return err
}
func (p *Provider) disconnect(deal MinerDeal) error {
s, ok := p.conns[deal.ProposalCid]
if !ok {
return nil
}
err := s.Close()
delete(p.conns, deal.ProposalCid)
return err
}
func (p *Provider) getWorker(miner address.Address) (address.Address, error) {
getworker := &types.Message{
To: miner,

View File

@ -27,9 +27,6 @@ type Response struct {
// DealAccepted
StorageDeal *actors.StorageDeal
PublishMessage *cid.Cid
// DealComplete
CommitMessage *cid.Cid
}
// TODO: Do we actually need this to be signed?