From f79b755c5843b6df2e6347a2f93156d6bc207cdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 29 Aug 2019 17:43:45 +0200 Subject: [PATCH] retrieval: Make types more spec complaiant --- go.mod | 1 - lib/cborrpc/rpc.go | 17 ++++++++++++++++- retrieval/client.go | 18 +++++++++++------- retrieval/miner.go | 38 ++++++++++++++++++++++---------------- retrieval/types.go | 14 ++++++++++---- 5 files changed, 59 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index db54da84a..7aa974af9 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,6 @@ require ( github.com/libp2p/go-libp2p-tls v0.1.0 github.com/libp2p/go-libp2p-yamux v0.2.1 github.com/libp2p/go-maddr-filter v0.0.5 - github.com/libp2p/go-msgio v0.0.4 github.com/miekg/dns v1.1.16 // indirect github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/mitchellh/go-homedir v1.1.0 diff --git a/lib/cborrpc/rpc.go b/lib/cborrpc/rpc.go index db3553212..c02fc5298 100644 --- a/lib/cborrpc/rpc.go +++ b/lib/cborrpc/rpc.go @@ -1,16 +1,27 @@ package cborrpc import ( + "encoding/hex" "io" cbor "github.com/ipfs/go-ipld-cbor" + logging "github.com/ipfs/go-log" cbg "github.com/whyrusleeping/cbor-gen" ) -const MessageSizeLimit = 1 << 20 +var log = logging.Logger("cborrrpc") + +const Debug = false + +func init() { + if Debug { + log.Warn("CBOR-RPC Debugging enabled") + } +} func WriteCborRPC(w io.Writer, obj interface{}) error { if m, ok := obj.(cbg.CBORMarshaler); ok { + // TODO: impl debug return m.MarshalCBOR(w) } data, err := cbor.DumpObject(obj) @@ -18,6 +29,10 @@ func WriteCborRPC(w io.Writer, obj interface{}) error { return err } + if Debug { + log.Infof("> %s", hex.EncodeToString(data)) + } + _, err = w.Write(data) return err } diff --git a/retrieval/client.go b/retrieval/client.go index 8b21078d3..d46d9620c 100644 --- a/retrieval/client.go +++ b/retrieval/client.go @@ -83,13 +83,13 @@ type clientStream struct { // // Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size // -// > Deal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)} +// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)} // < Resp{Accept} // < ..(Intermediate Block) // < ..Blocks // < ..(Intermediate Block) // < ..Blocks -// > Deal(...) +// > DealProposal(...) // < ... func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error { s, err := c.h.NewStream(ctx, miner, ProtocolID) @@ -131,11 +131,15 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, } func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error { - deal := Deal{Unixfs0: &Unixfs0Offer{ - Root: cst.root, - Offset: cst.offset, - Size: toFetch, - }} + deal := DealProposal{ + Ref: cst.root, + Params: RetParams{ + Unixfs0: &Unixfs0Offer{ + Offset: cst.offset, + Size: toFetch, + }, + }, + } if err := cborrpc.WriteCborRPC(cst.stream, deal); err != nil { return err diff --git a/retrieval/miner.go b/retrieval/miner.go index 9bb3f429d..bc1a95886 100644 --- a/retrieval/miner.go +++ b/retrieval/miner.go @@ -104,7 +104,7 @@ func (m *Miner) HandleDealStream(stream network.Stream) { } func (hnd *handlerDeal) handleNext() (bool, error) { - var deal Deal + var deal DealProposal if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil { if err == io.EOF { // client sent all deals err = nil @@ -112,22 +112,24 @@ func (hnd *handlerDeal) handleNext() (bool, error) { return false, err } - if deal.Unixfs0 == nil { + if deal.Params.Unixfs0 == nil { return false, xerrors.New("unknown deal type") } + unixfs0 := deal.Params.Unixfs0 + // TODO: Verify payment, check how much we can send based on that // Or reject (possibly returning the payment to retain reputation with the client) - if hnd.open != deal.Unixfs0.Root || hnd.at != deal.Unixfs0.Offset { - log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, deal.Unixfs0.Offset) + if hnd.open != deal.Ref || hnd.at != unixfs0.Offset { + log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, unixfs0.Offset) if err := hnd.openFile(deal); err != nil { return false, err } } - if deal.Unixfs0.Offset+deal.Unixfs0.Size > hnd.size { - return false, xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, hnd.size) + if unixfs0.Offset+unixfs0.Size > hnd.size { + return false, xerrors.Errorf("tried to read too much %d+%d > %d", unixfs0.Offset, unixfs0.Size, hnd.size) } err := hnd.accept(deal) @@ -137,19 +139,21 @@ func (hnd *handlerDeal) handleNext() (bool, error) { return true, nil } -func (hnd *handlerDeal) openFile(deal Deal) error { - if deal.Unixfs0.Offset != 0 { +func (hnd *handlerDeal) openFile(deal DealProposal) error { + unixfs0 := deal.Params.Unixfs0 + + if unixfs0.Offset != 0 { // TODO: Implement SeekBlock (like ReadBlock) in go-unixfs return xerrors.New("sending merkle proofs for nonzero offset not supported yet") } - hnd.at = deal.Unixfs0.Offset + hnd.at = unixfs0.Offset bstore := hnd.m.sectorBlocks.SealedBlockstore(func() error { return nil // TODO: approve unsealing based on amount paid }) ds := merkledag.NewDAGService(blockservice.New(bstore, nil)) - rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root) + rootNd, err := ds.Get(context.TODO(), deal.Ref) if err != nil { return err } @@ -162,7 +166,7 @@ func (hnd *handlerDeal) openFile(deal Deal) error { var ok bool hnd.ufsr, ok = fsr.(sectorblocks.UnixfsReader) if !ok { - return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root) + return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Ref) } isize, err := hnd.ufsr.Size() @@ -171,12 +175,14 @@ func (hnd *handlerDeal) openFile(deal Deal) error { } hnd.size = uint64(isize) - hnd.open = deal.Unixfs0.Root + hnd.open = deal.Ref return nil } -func (hnd *handlerDeal) accept(deal Deal) error { +func (hnd *handlerDeal) accept(deal DealProposal) error { + unixfs0 := deal.Params.Unixfs0 + resp := DealResponse{ Status: Accepted, } @@ -185,7 +191,7 @@ func (hnd *handlerDeal) accept(deal Deal) error { return err } - blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize + blocksToSend := (unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize for i := uint64(0); i < blocksToSend; { data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO()) if err != nil { @@ -194,8 +200,8 @@ func (hnd *handlerDeal) accept(deal Deal) error { log.Infof("sending block for a deal: %s", nd.Cid()) - if offset != deal.Unixfs0.Offset { - return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset) + if offset != unixfs0.Offset { + return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", unixfs0.Offset, offset) } /*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too) diff --git a/retrieval/types.go b/retrieval/types.go index 1392a815f..968c5eaad 100644 --- a/retrieval/types.go +++ b/retrieval/types.go @@ -21,10 +21,11 @@ const ( Accepted = iota Error Rejected + Unsealing ) func init() { - cbor.RegisterCborType(Deal{}) + cbor.RegisterCborType(DealProposal{}) cbor.RegisterCborType(Query{}) cbor.RegisterCborType(QueryResponse{}) @@ -50,17 +51,22 @@ type QueryResponse struct { } type Unixfs0Offer struct { // UNBORK - Root cid.Cid Offset uint64 Size uint64 } -type Deal struct { +type RetParams struct { Unixfs0 *Unixfs0Offer } +type DealProposal struct { + Ref cid.Cid + Params RetParams + // TODO: payment +} + type DealResponse struct { - Status int // TODO: make this more spec complainant + Status int Message string }