retrieval: Address review

This commit is contained in:
Łukasz Magiera 2019-08-29 01:01:28 +02:00
parent 985ca7d439
commit d8bcb37f9f
4 changed files with 28 additions and 14 deletions

View File

@ -116,7 +116,7 @@ var clientFindCmd = &cli.Command{
Usage: "find data in the network", Usage: "find data in the network",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() { if !cctx.Args().Present() {
fmt.Println("Usage: retrieve [CID]") fmt.Println("Usage: find [CID]")
return nil return nil
} }

View File

@ -100,19 +100,21 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
} }
defer s.Close() defer s.Close()
initialOffset := uint64(0) // TODO: Check how much data we have locally
// TODO: Support in handler
// TODO: Allow client to specify this
cst := clientStream{ cst := clientStream{
stream: s, stream: s,
root: root, root: root,
offset: 0, // TODO: Check how much data we have locally offset: initialOffset,
// TODO: Support in handler
// TODO: Allow client to specify this
windowSize: build.UnixfsChunkSize, windowSize: build.UnixfsChunkSize,
verifier: &UnixFs0Verifier{Root: root}, verifier: &UnixFs0Verifier{Root: root},
} }
for cst.offset != size { for cst.offset != size + initialOffset {
toFetch := cst.windowSize toFetch := cst.windowSize
if toFetch+cst.offset > size { if toFetch+cst.offset > size {
toFetch = size - cst.offset toFetch = size - cst.offset

View File

@ -6,6 +6,7 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-msgio" "github.com/libp2p/go-msgio"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"io"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/lib/cborrpc"
@ -90,8 +91,11 @@ func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we blo
stream: stream, stream: stream,
} }
for { var err error
err := hnd.handleNext() // TODO: 'more' bool more := true
for more {
more, err = hnd.handleNext() // TODO: 'more' bool
if err != nil { if err != nil {
writeErr(stream, err) writeErr(stream, err)
return return
@ -100,14 +104,17 @@ func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we blo
} }
func (hnd *handlerDeal) handleNext() error { func (hnd *handlerDeal) handleNext() (bool, error) {
var deal Deal var deal Deal
if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil { if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil {
return err if err == io.EOF { // client sent all deals
err = nil
}
return false, err
} }
if deal.Unixfs0 == nil { if deal.Unixfs0 == nil {
return xerrors.New("unknown deal type") return false, xerrors.New("unknown deal type")
} }
// TODO: Verify payment, check how much we can send based on that // TODO: Verify payment, check how much we can send based on that
@ -116,15 +123,19 @@ func (hnd *handlerDeal) handleNext() error {
if hnd.open != deal.Unixfs0.Root || hnd.at != deal.Unixfs0.Offset { if hnd.open != deal.Unixfs0.Root || hnd.at != deal.Unixfs0.Offset {
log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, deal.Unixfs0.Offset) log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, deal.Unixfs0.Offset)
if err := hnd.openFile(deal); err != nil { if err := hnd.openFile(deal); err != nil {
return err return false, err
} }
} }
if deal.Unixfs0.Offset+deal.Unixfs0.Size > hnd.size { if deal.Unixfs0.Offset+deal.Unixfs0.Size > hnd.size {
return xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, hnd.size) return false, xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, hnd.size)
} }
return hnd.accept(deal) err := hnd.accept(deal)
if err != nil {
return false, err
}
return true, nil
} }
func (hnd *handlerDeal) openFile(deal Deal) error { func (hnd *handlerDeal) openFile(deal Deal) error {

View File

@ -39,6 +39,8 @@ type UnixFs0Verifier struct {
func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.Writer) (last bool, internal bool, err error) { func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.Writer) (last bool, internal bool, err error) {
if b.sub != nil { if b.sub != nil {
// TODO: check links here (iff b.sub.sub == nil)
subLast, internal, err := b.sub.verify(ctx, blk, out) subLast, internal, err := b.sub.verify(ctx, blk, out)
if err != nil { if err != nil {
return false, false, err return false, false, err
@ -110,7 +112,6 @@ func (b *UnixFs0Verifier) checkInternal(blk blocks.Block, out io.Writer) (int, e
return len(nd.Links()), nil return len(nd.Links()), nil
case *merkledag.RawNode: case *merkledag.RawNode:
// TODO: do we check the hash before writing?
_, err := out.Write(nd.RawData()) _, err := out.Write(nd.RawData())
return 0, err return 0, err
default: default: