retrieval: send blocks as cbor
This commit is contained in:
parent
c437b6884d
commit
a43890dc76
@ -5,7 +5,6 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
||||
pb "github.com/ipfs/go-bitswap/message/pb"
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
@ -13,7 +12,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-msgio"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/api"
|
||||
@ -172,42 +170,34 @@ func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error {
|
||||
func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error {
|
||||
blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
||||
|
||||
// TODO: put msgio into spec
|
||||
reader := msgio.NewVarintReaderSize(cst.stream, network.MessageSizeMax)
|
||||
|
||||
for i := uint64(0); i < blocksToFetch; {
|
||||
log.Infof("block %d of %d", i+1, blocksToFetch)
|
||||
msg, err := reader.ReadMsg()
|
||||
if err != nil {
|
||||
|
||||
var block Block
|
||||
if err := cborrpc.ReadCborRPC(cst.stream, &block); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var pb pb.Message_Block
|
||||
if err := pb.Unmarshal(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dataBlocks, err := cst.consumeBlockMessage(pb, out)
|
||||
dataBlocks, err := cst.consumeBlockMessage(block, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i += dataBlocks
|
||||
|
||||
reader.ReleaseMsg(msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block, out io.Writer) (uint64, error) {
|
||||
prefix, err := cid.PrefixFromBytes(pb.GetPrefix())
|
||||
func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64, error) {
|
||||
prefix, err := cid.PrefixFromBytes(block.Prefix)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
cid, err := prefix.Sum(pb.GetData())
|
||||
|
||||
blk, err := blocks.NewBlockWithCid(pb.GetData(), cid)
|
||||
cid, err := prefix.Sum(block.Data)
|
||||
|
||||
blk, err := blocks.NewBlockWithCid(block.Data, cid)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -2,20 +2,19 @@ package retrieval
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/go-lotus/build"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/libp2p/go-msgio"
|
||||
"golang.org/x/xerrors"
|
||||
"io"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/filecoin-project/go-lotus/lib/cborrpc"
|
||||
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
|
||||
pb "github.com/ipfs/go-bitswap/message/pb"
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
unixfile "github.com/ipfs/go-unixfs/file"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/build"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/filecoin-project/go-lotus/lib/cborrpc"
|
||||
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
type Miner struct {
|
||||
@ -186,9 +185,6 @@ func (hnd *handlerDeal) accept(deal Deal) error {
|
||||
return err
|
||||
}
|
||||
|
||||
buf := make([]byte, network.MessageSizeMax)
|
||||
msgw := msgio.NewVarintWriter(hnd.stream)
|
||||
|
||||
blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
||||
for i := uint64(0); i < blocksToSend; {
|
||||
data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO())
|
||||
@ -207,18 +203,12 @@ func (hnd *handlerDeal) accept(deal Deal) error {
|
||||
return
|
||||
}*/
|
||||
|
||||
block := pb.Message_Block{
|
||||
block := Block{
|
||||
Prefix: nd.Cid().Prefix().Bytes(),
|
||||
Data: nd.RawData(),
|
||||
}
|
||||
|
||||
n, err := block.MarshalTo(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := msgw.WriteMsg(buf[:n]); err != nil {
|
||||
log.Error(err)
|
||||
if err := cborrpc.WriteCborRPC(hnd.stream, block); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -31,6 +31,7 @@ func init() {
|
||||
cbor.RegisterCborType(Unixfs0Offer{})
|
||||
|
||||
cbor.RegisterCborType(DealResponse{})
|
||||
cbor.RegisterCborType(Block{})
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
@ -62,3 +63,8 @@ type DealResponse struct {
|
||||
Status int // TODO: make this more spec complainant
|
||||
Message string
|
||||
}
|
||||
|
||||
type Block struct { // TODO: put in spec
|
||||
Prefix []byte // TODO: fix cid.Prefix marshaling somehow
|
||||
Data []byte
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user