retrieval: Make types more spec complaiant

This commit is contained in:
Łukasz Magiera 2019-08-29 17:43:45 +02:00
parent fe6d5ff3a8
commit f79b755c58
5 changed files with 59 additions and 29 deletions

1
go.mod
View File

@ -53,7 +53,6 @@ require (
github.com/libp2p/go-libp2p-tls v0.1.0 github.com/libp2p/go-libp2p-tls v0.1.0
github.com/libp2p/go-libp2p-yamux v0.2.1 github.com/libp2p/go-libp2p-yamux v0.2.1
github.com/libp2p/go-maddr-filter v0.0.5 github.com/libp2p/go-maddr-filter v0.0.5
github.com/libp2p/go-msgio v0.0.4
github.com/miekg/dns v1.1.16 // indirect github.com/miekg/dns v1.1.16 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0

View File

@ -1,16 +1,27 @@
package cborrpc package cborrpc
import ( import (
"encoding/hex"
"io" "io"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
) )
const MessageSizeLimit = 1 << 20 var log = logging.Logger("cborrrpc")
const Debug = false
func init() {
if Debug {
log.Warn("CBOR-RPC Debugging enabled")
}
}
func WriteCborRPC(w io.Writer, obj interface{}) error { func WriteCborRPC(w io.Writer, obj interface{}) error {
if m, ok := obj.(cbg.CBORMarshaler); ok { if m, ok := obj.(cbg.CBORMarshaler); ok {
// TODO: impl debug
return m.MarshalCBOR(w) return m.MarshalCBOR(w)
} }
data, err := cbor.DumpObject(obj) data, err := cbor.DumpObject(obj)
@ -18,6 +29,10 @@ func WriteCborRPC(w io.Writer, obj interface{}) error {
return err return err
} }
if Debug {
log.Infof("> %s", hex.EncodeToString(data))
}
_, err = w.Write(data) _, err = w.Write(data)
return err return err
} }

View File

@ -83,13 +83,13 @@ type clientStream struct {
// //
// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size // Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size
// //
// > Deal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)} // > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)}
// < Resp{Accept} // < Resp{Accept}
// < ..(Intermediate Block) // < ..(Intermediate Block)
// < ..Blocks // < ..Blocks
// < ..(Intermediate Block) // < ..(Intermediate Block)
// < ..Blocks // < ..Blocks
// > Deal(...) // > DealProposal(...)
// < ... // < ...
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error { func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error {
s, err := c.h.NewStream(ctx, miner, ProtocolID) s, err := c.h.NewStream(ctx, miner, ProtocolID)
@ -131,11 +131,15 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
} }
func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error { func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error {
deal := Deal{Unixfs0: &Unixfs0Offer{ deal := DealProposal{
Root: cst.root, Ref: cst.root,
Offset: cst.offset, Params: RetParams{
Size: toFetch, Unixfs0: &Unixfs0Offer{
}} Offset: cst.offset,
Size: toFetch,
},
},
}
if err := cborrpc.WriteCborRPC(cst.stream, deal); err != nil { if err := cborrpc.WriteCborRPC(cst.stream, deal); err != nil {
return err return err

View File

@ -104,7 +104,7 @@ func (m *Miner) HandleDealStream(stream network.Stream) {
} }
func (hnd *handlerDeal) handleNext() (bool, error) { func (hnd *handlerDeal) handleNext() (bool, error) {
var deal Deal var deal DealProposal
if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil { if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil {
if err == io.EOF { // client sent all deals if err == io.EOF { // client sent all deals
err = nil err = nil
@ -112,22 +112,24 @@ func (hnd *handlerDeal) handleNext() (bool, error) {
return false, err return false, err
} }
if deal.Unixfs0 == nil { if deal.Params.Unixfs0 == nil {
return false, xerrors.New("unknown deal type") return false, xerrors.New("unknown deal type")
} }
unixfs0 := deal.Params.Unixfs0
// TODO: Verify payment, check how much we can send based on that // TODO: Verify payment, check how much we can send based on that
// Or reject (possibly returning the payment to retain reputation with the client) // Or reject (possibly returning the payment to retain reputation with the client)
if hnd.open != deal.Unixfs0.Root || hnd.at != deal.Unixfs0.Offset { if hnd.open != deal.Ref || hnd.at != unixfs0.Offset {
log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, deal.Unixfs0.Offset) log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, unixfs0.Offset)
if err := hnd.openFile(deal); err != nil { if err := hnd.openFile(deal); err != nil {
return false, err return false, err
} }
} }
if deal.Unixfs0.Offset+deal.Unixfs0.Size > hnd.size { if unixfs0.Offset+unixfs0.Size > hnd.size {
return false, xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, hnd.size) return false, xerrors.Errorf("tried to read too much %d+%d > %d", unixfs0.Offset, unixfs0.Size, hnd.size)
} }
err := hnd.accept(deal) err := hnd.accept(deal)
@ -137,19 +139,21 @@ func (hnd *handlerDeal) handleNext() (bool, error) {
return true, nil return true, nil
} }
func (hnd *handlerDeal) openFile(deal Deal) error { func (hnd *handlerDeal) openFile(deal DealProposal) error {
if deal.Unixfs0.Offset != 0 { unixfs0 := deal.Params.Unixfs0
if unixfs0.Offset != 0 {
// TODO: Implement SeekBlock (like ReadBlock) in go-unixfs // TODO: Implement SeekBlock (like ReadBlock) in go-unixfs
return xerrors.New("sending merkle proofs for nonzero offset not supported yet") return xerrors.New("sending merkle proofs for nonzero offset not supported yet")
} }
hnd.at = deal.Unixfs0.Offset hnd.at = unixfs0.Offset
bstore := hnd.m.sectorBlocks.SealedBlockstore(func() error { bstore := hnd.m.sectorBlocks.SealedBlockstore(func() error {
return nil // TODO: approve unsealing based on amount paid return nil // TODO: approve unsealing based on amount paid
}) })
ds := merkledag.NewDAGService(blockservice.New(bstore, nil)) ds := merkledag.NewDAGService(blockservice.New(bstore, nil))
rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root) rootNd, err := ds.Get(context.TODO(), deal.Ref)
if err != nil { if err != nil {
return err return err
} }
@ -162,7 +166,7 @@ func (hnd *handlerDeal) openFile(deal Deal) error {
var ok bool var ok bool
hnd.ufsr, ok = fsr.(sectorblocks.UnixfsReader) hnd.ufsr, ok = fsr.(sectorblocks.UnixfsReader)
if !ok { if !ok {
return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root) return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Ref)
} }
isize, err := hnd.ufsr.Size() isize, err := hnd.ufsr.Size()
@ -171,12 +175,14 @@ func (hnd *handlerDeal) openFile(deal Deal) error {
} }
hnd.size = uint64(isize) hnd.size = uint64(isize)
hnd.open = deal.Unixfs0.Root hnd.open = deal.Ref
return nil return nil
} }
func (hnd *handlerDeal) accept(deal Deal) error { func (hnd *handlerDeal) accept(deal DealProposal) error {
unixfs0 := deal.Params.Unixfs0
resp := DealResponse{ resp := DealResponse{
Status: Accepted, Status: Accepted,
} }
@ -185,7 +191,7 @@ func (hnd *handlerDeal) accept(deal Deal) error {
return err return err
} }
blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize blocksToSend := (unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
for i := uint64(0); i < blocksToSend; { for i := uint64(0); i < blocksToSend; {
data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO()) data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO())
if err != nil { if err != nil {
@ -194,8 +200,8 @@ func (hnd *handlerDeal) accept(deal Deal) error {
log.Infof("sending block for a deal: %s", nd.Cid()) log.Infof("sending block for a deal: %s", nd.Cid())
if offset != deal.Unixfs0.Offset { if offset != unixfs0.Offset {
return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset) return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", unixfs0.Offset, offset)
} }
/*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too) /*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too)

View File

@ -21,10 +21,11 @@ const (
Accepted = iota Accepted = iota
Error Error
Rejected Rejected
Unsealing
) )
func init() { func init() {
cbor.RegisterCborType(Deal{}) cbor.RegisterCborType(DealProposal{})
cbor.RegisterCborType(Query{}) cbor.RegisterCborType(Query{})
cbor.RegisterCborType(QueryResponse{}) cbor.RegisterCborType(QueryResponse{})
@ -50,17 +51,22 @@ type QueryResponse struct {
} }
type Unixfs0Offer struct { // UNBORK type Unixfs0Offer struct { // UNBORK
Root cid.Cid
Offset uint64 Offset uint64
Size uint64 Size uint64
} }
type Deal struct { type RetParams struct {
Unixfs0 *Unixfs0Offer Unixfs0 *Unixfs0Offer
} }
type DealProposal struct {
Ref cid.Cid
Params RetParams
// TODO: payment
}
type DealResponse struct { type DealResponse struct {
Status int // TODO: make this more spec complainant Status int
Message string Message string
} }