deals: Set correct Refs

This commit is contained in:
Łukasz Magiera 2019-11-06 18:38:42 +01:00
parent 40b1f91843
commit 68c2d4f58a
11 changed files with 123 additions and 67 deletions

View File

@ -118,7 +118,7 @@ func (t *Proposal) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull) _, err := w.Write(cbg.CborNull)
return err return err
} }
if _, err := w.Write([]byte{129}); err != nil { if _, err := w.Write([]byte{130}); err != nil {
return err return err
} }
@ -126,6 +126,13 @@ func (t *Proposal) MarshalCBOR(w io.Writer) error {
if err := t.DealProposal.MarshalCBOR(w); err != nil { if err := t.DealProposal.MarshalCBOR(w); err != nil {
return err return err
} }
// t.t.Piece (cid.Cid) (struct)
if err := cbg.WriteCid(w, t.Piece); err != nil {
return xerrors.Errorf("failed to write cid field t.Piece: %w", err)
}
return nil return nil
} }
@ -140,7 +147,7 @@ func (t *Proposal) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array") return fmt.Errorf("cbor input should be of type array")
} }
if extra != 1 { if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields") return fmt.Errorf("cbor input had wrong number of fields")
} }
@ -148,9 +155,33 @@ func (t *Proposal) UnmarshalCBOR(r io.Reader) error {
{ {
if err := t.DealProposal.UnmarshalCBOR(br); err != nil { pb, err := br.PeekByte()
if err != nil {
return err return err
} }
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.DealProposal = new(actors.StorageDealProposal)
if err := t.DealProposal.UnmarshalCBOR(br); err != nil {
return err
}
}
}
// t.t.Piece (cid.Cid) (struct)
{
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.Piece: %w", err)
}
t.Piece = c
} }
return nil return nil

View File

@ -201,25 +201,25 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
} }
} }
dataSize, err := c.dataSize(ctx, p.Data) commP, pieceSize, err := c.commP(ctx, p.Data)
proposal := &actors.StorageDealProposal{ dealProposal := &actors.StorageDealProposal{
PieceRef: p.Data.Bytes(), PieceRef: commP,
PieceSize: uint64(dataSize), PieceSize: uint64(pieceSize),
PieceSerialization: actors.SerializationUnixFSv0, PieceSerialization: actors.SerializationUnixFSv0,
Client: p.Client, Client: p.Client,
Provider: p.ProviderAddress, Provider: p.ProviderAddress,
ProposalExpiration: p.ProposalExpiration, ProposalExpiration: p.ProposalExpiration,
Duration: p.Duration, Duration: p.Duration,
StoragePricePerEpoch: p.PricePerEpoch, StoragePricePerEpoch: p.PricePerEpoch,
StorageCollateral: types.NewInt(uint64(dataSize)), // TODO: real calc StorageCollateral: types.NewInt(uint64(pieceSize)), // TODO: real calc
} }
if err := api.SignWith(ctx, c.w.Sign, p.Client, proposal); err != nil { if err := api.SignWith(ctx, c.w.Sign, p.Client, dealProposal); err != nil {
return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err) return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err)
} }
proposalNd, err := cborrpc.AsIpld(proposal) proposalNd, err := cborrpc.AsIpld(dealProposal)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err) return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err)
} }
@ -230,6 +230,11 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err) return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err)
} }
proposal := &Proposal{
DealProposal: dealProposal,
Piece: p.Data,
}
if err := cborrpc.WriteCborRPC(s, proposal); err != nil { if err := cborrpc.WriteCborRPC(s, proposal); err != nil {
s.Reset() s.Reset()
return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err) return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err)
@ -237,7 +242,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
deal := &ClientDeal{ deal := &ClientDeal{
ProposalCid: proposalNd.Cid(), ProposalCid: proposalNd.Cid(),
Proposal: *proposal, Proposal: *dealProposal,
State: api.DealUnknown, State: api.DealUnknown,
Miner: p.MinerID, Miner: p.MinerID,
@ -247,7 +252,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
c.incoming <- deal c.incoming <- deal
return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{ return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{
Address: proposal.Provider, Address: dealProposal.Provider,
ID: deal.Miner, ID: deal.Miner,
}) })
} }

View File

@ -10,6 +10,8 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/cborrpc" "github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
func (c *Client) failDeal(id cid.Cid, cerr error) { func (c *Client) failDeal(id cid.Cid, cerr error) {
@ -28,26 +30,38 @@ func (c *Client) failDeal(id cid.Cid, cerr error) {
log.Errorf("deal %s failed: %+v", id, cerr) log.Errorf("deal %s failed: %+v", id, cerr)
} }
func (c *Client) dataSize(ctx context.Context, data cid.Cid) (int64, error) { func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, uint64, error) {
root, err := c.dag.Get(ctx, data) root, err := c.dag.Get(ctx, data)
if err != nil { if err != nil {
log.Errorf("failed to get file root for deal: %s", err) log.Errorf("failed to get file root for deal: %s", err)
return 0, err return nil, 0, err
} }
n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) n, err := unixfile.NewUnixfsFile(ctx, c.dag, root)
if err != nil { if err != nil {
log.Errorf("cannot open unixfs file: %s", err) log.Errorf("cannot open unixfs file: %s", err)
return 0, err return nil, 0, err
} }
uf, ok := n.(files.File) uf, ok := n.(files.File)
if !ok { if !ok {
// TODO: we probably got directory, how should we handle this in unixfs mode? // TODO: we probably got directory, how should we handle this in unixfs mode?
return 0, xerrors.New("unsupported unixfs type") return nil, 0, xerrors.New("unsupported unixfs type")
} }
return uf.Size() s, err := uf.Size()
if err != nil {
return nil, 0, err
}
pr, psize := padreader.New(uf, uint64(s))
commp, err := sectorbuilder.GeneratePieceCommitment(pr, psize)
if err != nil {
return nil, 0, xerrors.Errorf("generating CommP: %w", err)
}
return commp[:], psize, nil
} }
func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) { func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) {

View File

@ -192,24 +192,19 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
} }
} }
func (p *Provider) newDeal(s inet.Stream, proposal actors.StorageDealProposal) (MinerDeal, error) { func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
proposalNd, err := cborrpc.AsIpld(&proposal) proposalNd, err := cborrpc.AsIpld(&proposal)
if err != nil { if err != nil {
return MinerDeal{}, err return MinerDeal{}, err
} }
ref, err := cid.Cast(proposal.PieceRef)
if err != nil {
return MinerDeal{}, err
}
return MinerDeal{ return MinerDeal{
Client: s.Conn().RemotePeer(), Client: s.Conn().RemotePeer(),
Proposal: proposal, Proposal: *proposal.DealProposal,
ProposalCid: proposalNd.Cid(), ProposalCid: proposalNd.Cid(),
State: api.DealUnknown, State: api.DealUnknown,
Ref: ref, Ref: proposal.Piece,
s: s, s: s,
}, nil }, nil

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"context" "context"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file" unixfile "github.com/ipfs/go-unixfs/file"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -223,12 +222,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match unixfs file size") return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match unixfs file size")
} }
pcid, err := cid.Cast(deal.Proposal.PieceRef) sectorID, err := p.secb.AddUnixfsPiece(deal.Ref, uf, deal.DealID)
if err != nil {
return nil, err
}
sectorID, err := p.secb.AddUnixfsPiece(pcid, uf, deal.DealID)
if err != nil { if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err) return nil, xerrors.Errorf("AddPiece failed: %s", err)
} }

View File

@ -5,7 +5,6 @@ import (
"runtime" "runtime"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -45,21 +44,21 @@ func (p *Provider) failDeal(id cid.Cid, cerr error) {
} }
} }
func (p *Provider) readProposal(s inet.Stream) (proposal actors.StorageDealProposal, err error) { func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) {
if err := cborrpc.ReadCborRPC(s, &proposal); err != nil { if err := cborrpc.ReadCborRPC(s, &proposal); err != nil {
log.Errorw("failed to read proposal message", "error", err) log.Errorw("failed to read proposal message", "error", err)
return proposal, err return proposal, err
} }
if err := proposal.Verify(); err != nil { if err := proposal.DealProposal.Verify(); err != nil {
return proposal, xerrors.Errorf("verifying StorageDealProposal: %w", err) return proposal, xerrors.Errorf("verifying StorageDealProposal: %w", err)
} }
// TODO: Validate proposal maybe // TODO: Validate proposal maybe
// (and signature, obviously) // (and signature, obviously)
if proposal.Provider != p.actor { if proposal.DealProposal.Provider != p.actor {
log.Errorf("proposal with wrong ProviderAddress: %s", proposal.Provider) log.Errorf("proposal with wrong ProviderAddress: %s", proposal.DealProposal.Provider)
return proposal, err return proposal, err
} }

View File

@ -8,11 +8,13 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
) )
const DealProtocolID = "/fil/storage/mk/1.0.0" const DealProtocolID = "/fil/storage/mk/1.0.1"
const AskProtocolID = "/fil/storage/ask/1.0.0" const AskProtocolID = "/fil/storage/ask/1.0.1"
type Proposal struct { type Proposal struct {
DealProposal actors.StorageDealProposal DealProposal *actors.StorageDealProposal
Piece cid.Cid // Used for retrieving from the client
} }
type Response struct { type Response struct {

View File

@ -0,0 +1,38 @@
package padreader
import (
"io"
"math/bits"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
)
func PaddedSize(size uint64) uint64 {
logv := 64 - bits.LeadingZeros64(size)
sectSize := uint64(1 << logv)
bound := sectorbuilder.GetMaxUserBytesPerStagedSector(sectSize)
if size <= bound {
return bound
}
return sectorbuilder.GetMaxUserBytesPerStagedSector(1 << (logv + 1))
}
type nullReader struct{}
func (nr nullReader) Read(b []byte) (int, error) {
for i := range b {
b[i] = 0
}
return len(b), nil
}
func New(r io.Reader, size uint64) (io.Reader, uint64) {
padSize := PaddedSize(size)
return io.MultiReader(
io.LimitReader(r, int64(size)),
io.LimitReader(nullReader{}, int64(padSize-size)),
), padSize
}

View File

@ -88,7 +88,6 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.A
} }
c, err := a.DealClient.Start(ctx, proposal) c, err := a.DealClient.Start(ctx, proposal)
// TODO: send updated voucher with PaymentVerifySector for cheaper validation (validate the sector the miner sent us first!)
return &c, err return &c, err
} }

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math/bits"
"sync" "sync"
"github.com/filecoin-project/go-sectorbuilder/sealing_state" "github.com/filecoin-project/go-sectorbuilder/sealing_state"
@ -60,32 +59,8 @@ func (s *Store) SectorStatus(sid uint64) (*sectorbuilder.SectorSealingStatus, er
return &status, nil return &status, nil
} }
func computePaddedSize(size uint64) uint64 {
logv := 64 - bits.LeadingZeros64(size)
sectSize := uint64(1 << logv)
bound := sectorbuilder.UserBytesForSectorSize(sectSize)
if size <= bound {
return bound
}
return sectorbuilder.UserBytesForSectorSize(1 << (logv + 1))
}
type nullReader struct{}
func (nr nullReader) Read(b []byte) (int, error) {
for i := range b {
b[i] = 0
}
return len(b), nil
}
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) { func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) {
padSize := computePaddedSize(size) sectorID, err = s.sb.AddPiece(ref, size, r)
r = io.MultiReader(r, io.LimitReader(nullReader{}, int64(padSize-size)))
sectorID, err = s.sb.AddPiece(ref, padSize, r)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"errors" "errors"
"io"
"sync" "sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -130,6 +131,9 @@ func (r *refStorer) Read(p []byte) (n int, err error) {
for { for {
data, offset, nd, err := r.blockReader.ReadBlock(context.TODO()) data, offset, nd, err := r.blockReader.ReadBlock(context.TODO())
if err != nil { if err != nil {
if err == io.EOF {
return 0, io.EOF
}
return 0, xerrors.Errorf("reading block: %w", err) return 0, xerrors.Errorf("reading block: %w", err)
} }