diff --git a/retrieval/client.go b/retrieval/client.go index 1ba21f46a..8b21078d3 100644 --- a/retrieval/client.go +++ b/retrieval/client.go @@ -5,7 +5,6 @@ import ( "io" "io/ioutil" - pb "github.com/ipfs/go-bitswap/message/pb" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -13,7 +12,6 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-msgio" "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/api" @@ -172,42 +170,34 @@ func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error { func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error { blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize - // TODO: put msgio into spec - reader := msgio.NewVarintReaderSize(cst.stream, network.MessageSizeMax) - for i := uint64(0); i < blocksToFetch; { log.Infof("block %d of %d", i+1, blocksToFetch) - msg, err := reader.ReadMsg() - if err != nil { + + var block Block + if err := cborrpc.ReadCborRPC(cst.stream, &block); err != nil { return err } - var pb pb.Message_Block - if err := pb.Unmarshal(msg); err != nil { - return err - } - - dataBlocks, err := cst.consumeBlockMessage(pb, out) + dataBlocks, err := cst.consumeBlockMessage(block, out) if err != nil { return err } i += dataBlocks - - reader.ReleaseMsg(msg) } return nil } -func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block, out io.Writer) (uint64, error) { - prefix, err := cid.PrefixFromBytes(pb.GetPrefix()) +func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64, error) { + prefix, err := cid.PrefixFromBytes(block.Prefix) if err != nil { return 0, err } - cid, err := prefix.Sum(pb.GetData()) - blk, err := blocks.NewBlockWithCid(pb.GetData(), cid) + cid, err := prefix.Sum(block.Data) + + blk, err := blocks.NewBlockWithCid(block.Data, cid) if err != nil { return 0, err } diff --git a/retrieval/miner.go b/retrieval/miner.go index 92f27aa8b..88fba6a46 100644 --- a/retrieval/miner.go +++ b/retrieval/miner.go @@ -2,20 +2,19 @@ package retrieval import ( "context" - "github.com/filecoin-project/go-lotus/build" - "github.com/ipfs/go-cid" - "github.com/libp2p/go-msgio" - "golang.org/x/xerrors" "io" - "github.com/filecoin-project/go-lotus/chain/types" - "github.com/filecoin-project/go-lotus/lib/cborrpc" - "github.com/filecoin-project/go-lotus/storage/sectorblocks" - pb "github.com/ipfs/go-bitswap/message/pb" "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" "github.com/ipfs/go-merkledag" unixfile "github.com/ipfs/go-unixfs/file" "github.com/libp2p/go-libp2p-core/network" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/lib/cborrpc" + "github.com/filecoin-project/go-lotus/storage/sectorblocks" ) type Miner struct { @@ -186,9 +185,6 @@ func (hnd *handlerDeal) accept(deal Deal) error { return err } - buf := make([]byte, network.MessageSizeMax) - msgw := msgio.NewVarintWriter(hnd.stream) - blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize for i := uint64(0); i < blocksToSend; { data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO()) @@ -207,18 +203,12 @@ func (hnd *handlerDeal) accept(deal Deal) error { return }*/ - block := pb.Message_Block{ + block := Block{ Prefix: nd.Cid().Prefix().Bytes(), Data: nd.RawData(), } - n, err := block.MarshalTo(buf) - if err != nil { - return err - } - - if err := msgw.WriteMsg(buf[:n]); err != nil { - log.Error(err) + if err := cborrpc.WriteCborRPC(hnd.stream, block); err != nil { return err } diff --git a/retrieval/types.go b/retrieval/types.go index 765c583af..bef07c697 100644 --- a/retrieval/types.go +++ b/retrieval/types.go @@ -31,6 +31,7 @@ func init() { cbor.RegisterCborType(Unixfs0Offer{}) cbor.RegisterCborType(DealResponse{}) + cbor.RegisterCborType(Block{}) } type Query struct { @@ -62,3 +63,8 @@ type DealResponse struct { Status int // TODO: make this more spec complainant Message string } + +type Block struct { // TODO: put in spec + Prefix []byte // TODO: fix cid.Prefix marshaling somehow + Data []byte +}