retrieval: Way to get the data

This commit is contained in:
Łukasz Magiera 2019-08-28 00:10:23 +02:00
parent 433550e9a4
commit c0566399c6
9 changed files with 168 additions and 28 deletions

View File

@ -93,7 +93,7 @@ type FullNode interface {
ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error)
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now)
ClientRetrieve(ctx context.Context, order RetrievalOrder) error // TODO: maybe just allow putting this straight into some file
ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error
// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)

View File

@ -73,7 +73,7 @@ type FullNodeStruct struct {
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"`
ClientRetrieve func(ctx context.Context, order RetrievalOrder) error `perm:"admin"`
ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@ -167,8 +167,8 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, mine
return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration)
}
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder) error {
return c.Internal.ClientRetrieve(ctx, order)
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error {
return c.Internal.ClientRetrieve(ctx, order, path)
}
func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {

View File

@ -3,6 +3,8 @@ package build
// Core network constants
// UnixfsChunkSize is the size of each unixfs leaf block: 1 << 20 = 1 MiB.
const UnixfsChunkSize uint64 = 1 << 20
// UnixfsLinksPerLevel is the maximum fan-out of an intermediate unixfs node.
const UnixfsLinksPerLevel = 1024
// SectorSize is the miner sector size in bytes. NOTE(review): 1024 bytes
// looks like a devnet/test placeholder — confirm before production use.
const SectorSize = 1024
// TODO: Move other important consts here

View File

@ -163,8 +163,8 @@ var clientRetrieveCmd = &cli.Command{
Name: "retrieve",
Usage: "retrieve data from network",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
fmt.Println("Usage: retrieve [CID]")
if cctx.NArg() != 2 {
fmt.Println("Usage: retrieve [CID] [outfile]")
return nil
}
@ -189,7 +189,7 @@ var clientRetrieveCmd = &cli.Command{
if has {
fmt.Println("Success: Already in local storage")
return nil
}*/ // TODO: uncomment before merge
}*/ // TODO: fix
offers, err := api.ClientFindData(ctx, file)
if err != nil {
@ -199,7 +199,7 @@ var clientRetrieveCmd = &cli.Command{
// TODO: parse offer strings from `client find`, make this smarter
order := offers[0].Order()
err = api.ClientRetrieve(ctx, order)
err = api.ClientRetrieve(ctx, order, cctx.Args().Get(1))
if err == nil {
fmt.Println("Success")
}

View File

@ -227,7 +227,7 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) (
params, err := actors.SerializeParams(actors.CreateStorageMinerParams{
Owner: defOwner,
Worker: k,
SectorSize: types.NewInt(actors.SectorSize),
SectorSize: types.NewInt(build.SectorSize),
PeerID: peerid,
})
if err != nil {

View File

@ -1 +0,0 @@
package client

View File

@ -171,7 +171,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err
bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG)
params := ihelper.DagBuilderParams{
Maxlinks: ihelper.DefaultLinksPerBlock,
Maxlinks: build.UnixfsLinksPerLevel,
RawLeaves: true,
CidBuilder: nil,
Dagserv: bufferedDS,
@ -219,6 +219,17 @@ func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error)
}
}
func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder) error {
return a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner)
// ClientRetrieve fetches the content behind order.Root from order.Miner and
// writes the reassembled unixfs file to a newly created file at path.
//
// The output file is created up front (truncated if it already exists); on a
// failed retrieval it is closed but left on disk, possibly holding partial
// data. On success the Close error is returned so buffered-write failures are
// not silently dropped.
func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error {
	// 0666 (before umask), not 0777: retrieved payloads are plain data and
	// must not be created world-executable.
	outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0666)
	if err != nil {
		return err
	}

	err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner, outFile)
	if err != nil {
		_ = outFile.Close() // best-effort: the retrieval error takes precedence
		return err
	}

	return outFile.Close()
}

View File

@ -2,6 +2,7 @@ package retrieval
import (
"context"
"io"
"io/ioutil"
pb "github.com/ipfs/go-bitswap/message/pb"
@ -92,7 +93,7 @@ type clientStream struct {
// < ..Blocks
// > Deal(...)
// < ...
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address) error {
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error {
s, err := c.h.NewStream(ctx, miner, ProtocolID)
if err != nil {
return err
@ -108,7 +109,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
// TODO: Allow client to specify this
windowSize: build.UnixfsChunkSize,
verifier: &OptimisticVerifier{}, // TODO: Use a real verifier
verifier: &UnixFs0Verifier{Root: root},
}
for cst.offset != size {
@ -116,8 +117,9 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
if toFetch+cst.offset > size {
toFetch = size - cst.offset
}
log.Infof("Retrieve %dB @%d", toFetch, cst.offset)
err := cst.doOneExchange(toFetch)
err := cst.doOneExchange(toFetch, out)
if err != nil {
return err
}
@ -128,7 +130,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
return nil
}
func (cst *clientStream) doOneExchange(toFetch uint64) error {
func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error {
deal := Deal{Unixfs0: &Unixfs0Offer{
Root: cst.root,
Offset: cst.offset,
@ -158,18 +160,21 @@ func (cst *clientStream) doOneExchange(toFetch uint64) error {
return xerrors.New("storage deal response had no Accepted section")
}
return cst.fetchBlocks(toFetch)
log.Info("Retrieval accepted, fetching blocks")
return cst.fetchBlocks(toFetch, out)
// TODO: maybe increase miner window size after success
}
func (cst *clientStream) fetchBlocks(toFetch uint64) error {
func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error {
blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
// TODO: put msgio into spec
reader := msgio.NewVarintReaderSize(cst.stream, network.MessageSizeMax)
for i := uint64(0); i < blocksToFetch; {
log.Infof("block %d of %d", i+1, blocksToFetch)
msg, err := reader.ReadMsg()
if err != nil {
return err
@ -180,7 +185,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error {
return err
}
dataBlocks, err := cst.consumeBlockMessage(pb)
dataBlocks, err := cst.consumeBlockMessage(pb, out)
if err != nil {
return err
}
@ -193,7 +198,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error {
return nil
}
func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block) (uint64, error) {
func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block, out io.Writer) (uint64, error) {
prefix, err := cid.PrefixFromBytes(pb.GetPrefix())
if err != nil {
return 0, err
@ -205,15 +210,17 @@ func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block) (uint64, error
return 0, err
}
internal, err := cst.verifier.Verify(blk)
internal, err := cst.verifier.Verify(context.TODO(), blk, out)
if err != nil {
return 0, err
}
// TODO: Persist block
// TODO: Smarter out, maybe add to filestore automagically
// (Also, persist intermediate nodes)
if internal {
return 0, nil
}
return 1, nil
}

View File

@ -1,19 +1,140 @@
package retrieval
import blocks "github.com/ipfs/go-block-format"
import (
"context"
"io"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs"
pb "github.com/ipfs/go-unixfs/pb"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/build"
)
type BlockVerifier interface {
Verify(blocks.Block) (internal bool, err error)
Verify(context.Context, blocks.Block, io.Writer) (internal bool, err error)
}
// TODO: BalancedUnixFs0Verifier
type OptimisticVerifier struct {
}
func (o *OptimisticVerifier) Verify(blocks.Block) (bool, error) {
func (o *OptimisticVerifier) Verify(context.Context, blocks.Block, io.Writer) (bool, error) {
// It's probably fine
return false, nil
}
// UnixFs0Verifier incrementally verifies that a stream of blocks reproduces
// the unixfs DAG rooted at Root, writing raw leaf data to the caller's
// writer as it goes. Each instance tracks one level of the DAG; descending
// into an intermediate node spawns a child verifier in sub for the level
// below.
type UnixFs0Verifier struct {
// Root is the CID the first block of this level must match.
Root cid.Cid
// rootBlk records that block once it has been seen (nil until then).
rootBlk blocks.Block
// expect is how many child nodes this level should contain.
expect int
// seen counts fully-verified children consumed so far at this level.
seen int
// sub is the verifier for the level below while an intermediate child
// is being walked; nil otherwise.
sub *UnixFs0Verifier
}
// verify consumes one block for this level (or, recursively, for a level
// below it). It reports whether this level is now complete (last), whether
// the block was an intermediate node rather than a data leaf (internal),
// and any verification error. Raw leaf payloads are written to out by
// checkInternal.
func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.Writer) (last bool, internal bool, err error) {
// While a child verifier is active, the block belongs to the level below;
// delegate, and absorb the child into our own progress when it finishes.
if b.sub != nil {
subLast, internal, err := b.sub.verify(ctx, blk, out)
if err != nil {
return false, false, err
}
if subLast {
b.sub = nil
b.seen++
}
return b.seen == b.expect, internal, nil
}
if b.seen >= b.expect { // this is probably impossible
return false, false, xerrors.New("unixfs verifier: too many nodes in level")
}
// Classify the block: >0 links means intermediate node, 0 means data leaf
// (already written to out inside checkInternal).
links, err := b.checkInternal(blk, out)
if err != nil {
return false, false, err
}
if links > 0 { // TODO: check if all links are intermediate (or all aren't)
if links > build.UnixfsLinksPerLevel {
return false, false, xerrors.New("unixfs verifier: too many links in intermediate node")
}
// NOTE(review): this rejects the LAST intermediate child of a level when
// it is not fully linked, but a balanced unixfs layout normally allows a
// partial final node (it is the non-last children that must be full).
// The condition looks inverted — confirm against the importer's layout.
if b.seen+1 == b.expect && links != build.UnixfsLinksPerLevel {
return false, false, xerrors.New("unixfs verifier: too few nodes in level")
}
// Descend: subsequent blocks must prove the subtree rooted at this node.
b.sub = &UnixFs0Verifier{
Root: blk.Cid(),
rootBlk: blk,
expect: links,
}
// don't mark as seen yet
return false, true, nil
}
// Data leaf: counts directly toward this level's completion.
b.seen++
return b.seen == b.expect, false, nil
}
// checkInternal classifies blk as either an intermediate unixfs node or a
// raw data leaf. For an intermediate node it validates the unixfs metadata
// and returns its link count; for a raw leaf it writes the payload to out
// and returns zero links.
func (b *UnixFs0Verifier) checkInternal(blk blocks.Block, out io.Writer) (int, error) {
	node, err := ipld.Decode(blk)
	if err != nil {
		return 0, err
	}

	// TODO: check size
	switch n := node.(type) {
	case *merkledag.ProtoNode:
		// Intermediate nodes must be link-only unixfs file nodes.
		fsn, err := unixfs.FSNodeFromBytes(n.RawData())
		if err != nil {
			return 0, err
		}
		switch {
		case fsn.Type() != pb.Data_File:
			return 0, xerrors.New("internal nodes must be a file")
		case len(fsn.Data()) > 0:
			return 0, xerrors.New("internal node with data")
		case len(n.Links()) == 0:
			return 0, xerrors.New("internal node with no links")
		}
		return len(n.Links()), nil

	case *merkledag.RawNode:
		// Raw leaf: stream the bytes straight to the output.
		// TODO: do we check the hash before writing?
		_, err := out.Write(n.RawData())
		return 0, err

	default:
		return 0, xerrors.New("verifier: unknown node type")
	}
}
// Verify consumes one block of the retrieval stream, checking it against the
// unixfs DAG rooted at b.Root and writing leaf data to w. It reports whether
// the block was an internal (non-leaf) node.
func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block, w io.Writer) (bool, error) {
	// Once the root has been consumed, every further block is handled by the
	// recursive level walker.
	if b.rootBlk != nil {
		_, internal, err := b.verify(ctx, blk, w)
		return internal, err
	}

	// First block: it must be the root itself.
	if !b.Root.Equals(blk.Cid()) {
		return false, xerrors.Errorf("unixfs verifier: root block CID didn't match: valid %s, got %s", b.Root, blk.Cid())
	}
	b.rootBlk = blk

	links, err := b.checkInternal(blk, w)
	if err != nil {
		return false, err
	}
	b.expect = links

	// A root with links is internal; a linkless root was a single data leaf.
	return links != 0, nil
}
var _ BlockVerifier = &OptimisticVerifier{}
var _ BlockVerifier = &UnixFs0Verifier{}