From c0566399c6498ba538acc4ccb43eebb856894563 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 28 Aug 2019 00:10:23 +0200 Subject: [PATCH] retrieval: Way to get the data --- api/api.go | 2 +- api/struct.go | 6 +- build/params.go | 2 + cli/client.go | 8 +- cmd/lotus-storage-miner/init.go | 2 +- node/client/import.go | 1 - node/impl/full/client.go | 17 ++++- retrieval/client.go | 27 ++++--- retrieval/verify.go | 131 ++++++++++++++++++++++++++++++-- 9 files changed, 168 insertions(+), 28 deletions(-) delete mode 100644 node/client/import.go diff --git a/api/api.go b/api/api.go index 466bb6229..902fe90d2 100644 --- a/api/api.go +++ b/api/api.go @@ -93,7 +93,7 @@ type FullNode interface { ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now) - ClientRetrieve(ctx context.Context, order RetrievalOrder) error // TODO: maybe just allow putting this straight into some file + ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error // ClientUnimport removes references to the specified file from filestore //ClientUnimport(path string) diff --git a/api/struct.go b/api/struct.go index 53551010d..aeb1857d9 100644 --- a/api/struct.go +++ b/api/struct.go @@ -73,7 +73,7 @@ type FullNodeStruct struct { ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"` - ClientRetrieve func(ctx context.Context, order RetrievalOrder) error `perm:"admin"` + ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` @@ -167,8 +167,8 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, mine return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration) } -func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder) error { - return c.Internal.ClientRetrieve(ctx, order) +func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error { + return c.Internal.ClientRetrieve(ctx, order, path) } func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { diff --git a/build/params.go b/build/params.go index 1be30939f..0901bebc7 100644 --- a/build/params.go +++ b/build/params.go @@ -3,6 +3,8 @@ package build // Core network constants const UnixfsChunkSize uint64 = 1 << 20 +const UnixfsLinksPerLevel = 1024 + const SectorSize = 1024 // TODO: Move other important consts here diff --git a/cli/client.go b/cli/client.go index 4d7f92476..8a3525e46 100644 --- a/cli/client.go +++ b/cli/client.go @@ -163,8 +163,8 @@ var clientRetrieveCmd = &cli.Command{ Name: "retrieve", Usage: "retrieve data from network", Action: func(cctx *cli.Context) error { - if !cctx.Args().Present() { - fmt.Println("Usage: retrieve [CID]") + if cctx.NArg() != 2 { + fmt.Println("Usage: retrieve [CID] [outfile]") return nil } @@ -189,7 +189,7 @@ var clientRetrieveCmd = &cli.Command{ if has { fmt.Println("Success: Already in local storage") return nil - }*/ // TODO: uncomment before merge + }*/ // TODO: fix offers, err := api.ClientFindData(ctx, file) if err != nil { @@ -199,7 +199,7 @@ var clientRetrieveCmd = &cli.Command{ // TODO: parse offer strings from `client find`, make this smarter order := offers[0].Order() - err = api.ClientRetrieve(ctx, order) + err = api.ClientRetrieve(ctx, order, cctx.Args().Get(1)) if err == nil { fmt.Println("Success") } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 0567f51b0..ee49589fb 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -227,7 +227,7 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) ( params, err := actors.SerializeParams(actors.CreateStorageMinerParams{ Owner: defOwner, Worker: k, - SectorSize: types.NewInt(actors.SectorSize), + SectorSize: types.NewInt(build.SectorSize), PeerID: peerid, }) if err != nil { diff --git a/node/client/import.go b/node/client/import.go deleted file mode 100644 index da13c8ef3..000000000 --- a/node/client/import.go +++ /dev/null @@ -1 +0,0 @@ -package client diff --git a/node/impl/full/client.go b/node/impl/full/client.go index f64202965..f70fa21ad 100644 --- a/node/impl/full/client.go +++ b/node/impl/full/client.go @@ -171,7 +171,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) params := ihelper.DagBuilderParams{ - Maxlinks: ihelper.DefaultLinksPerBlock, + Maxlinks: build.UnixfsLinksPerLevel, RawLeaves: true, CidBuilder: nil, Dagserv: bufferedDS, @@ -219,6 +219,17 @@ func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error) } } -func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder) error { - return a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner) +func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error { + outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return err + } + + err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner, outFile) + if err != nil { + _ = outFile.Close() + return err + } + + return outFile.Close() } diff --git a/retrieval/client.go b/retrieval/client.go index 9d052ec30..baaca9d5d 100644 --- a/retrieval/client.go +++ b/retrieval/client.go @@ -2,6 +2,7 @@ package retrieval import ( "context" + "io" "io/ioutil" pb "github.com/ipfs/go-bitswap/message/pb" @@ -92,7 +93,7 @@ type clientStream struct { // < ..Blocks // > Deal(...) // < ... -func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address) error { +func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error { s, err := c.h.NewStream(ctx, miner, ProtocolID) if err != nil { return err @@ -108,7 +109,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, // TODO: Allow client to specify this windowSize: build.UnixfsChunkSize, - verifier: &OptimisticVerifier{}, // TODO: Use a real verifier + verifier: &UnixFs0Verifier{Root: root}, } for cst.offset != size { @@ -116,8 +117,9 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, if toFetch+cst.offset > size { toFetch = size - cst.offset } + log.Infof("Retrieve %dB @%d", toFetch, cst.offset) - err := cst.doOneExchange(toFetch) + err := cst.doOneExchange(toFetch, out) if err != nil { return err } @@ -128,7 +130,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, return nil } -func (cst *clientStream) doOneExchange(toFetch uint64) error { +func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error { deal := Deal{Unixfs0: &Unixfs0Offer{ Root: cst.root, Offset: cst.offset, @@ -158,18 +160,21 @@ func (cst *clientStream) doOneExchange(toFetch uint64) error { return xerrors.New("storage deal response had no Accepted section") } - return cst.fetchBlocks(toFetch) + log.Info("Retrieval accepted, fetching blocks") + + return cst.fetchBlocks(toFetch, out) // TODO: maybe increase miner window size after success } -func (cst *clientStream) fetchBlocks(toFetch uint64) error { +func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error { blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize // TODO: put msgio into spec reader := msgio.NewVarintReaderSize(cst.stream, network.MessageSizeMax) for i := uint64(0); i < blocksToFetch; { + log.Infof("block %d of %d", i+1, blocksToFetch) msg, err := reader.ReadMsg() if err != nil { return err @@ -180,7 +185,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error { return err } - dataBlocks, err := cst.consumeBlockMessage(pb) + dataBlocks, err := cst.consumeBlockMessage(pb, out) if err != nil { return err } @@ -193,7 +198,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error { return nil } -func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block) (uint64, error) { +func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block, out io.Writer) (uint64, error) { prefix, err := cid.PrefixFromBytes(pb.GetPrefix()) if err != nil { return 0, err @@ -205,15 +210,17 @@ func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block) (uint64, error return 0, err } - internal, err := cst.verifier.Verify(blk) + internal, err := cst.verifier.Verify(context.TODO(), blk, out) if err != nil { return 0, err } - // TODO: Persist block + // TODO: Smarter out, maybe add to filestore automagically + // (Also, persist intermediate nodes) if internal { return 0, nil } + return 1, nil } diff --git a/retrieval/verify.go b/retrieval/verify.go index 0c0dca227..e232a06ac 100644 --- a/retrieval/verify.go +++ b/retrieval/verify.go @@ -1,19 +1,140 @@ package retrieval -import blocks "github.com/ipfs/go-block-format" +import ( + "context" + "io" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/ipfs/go-unixfs" + pb "github.com/ipfs/go-unixfs/pb" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/build" +) type BlockVerifier interface { - Verify(blocks.Block) (internal bool, err error) + Verify(context.Context, blocks.Block, io.Writer) (internal bool, err error) } -// TODO: BalancedUnixFs0Verifier - type OptimisticVerifier struct { } -func (o *OptimisticVerifier) Verify(blocks.Block) (bool, error) { +func (o *OptimisticVerifier) Verify(context.Context, blocks.Block, io.Writer) (bool, error) { // It's probably fine return false, nil } +type UnixFs0Verifier struct { + Root cid.Cid + rootBlk blocks.Block + + expect int + seen int + + sub *UnixFs0Verifier +} + +func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.Writer) (last bool, internal bool, err error) { + if b.sub != nil { + subLast, internal, err := b.sub.verify(ctx, blk, out) + if err != nil { + return false, false, err + } + if subLast { + b.sub = nil + b.seen++ + } + + return b.seen == b.expect, internal, nil + } + + if b.seen >= b.expect { // this is probably impossible + return false, false, xerrors.New("unixfs verifier: too many nodes in level") + } + + links, err := b.checkInternal(blk, out) + if err != nil { + return false, false, err + } + + if links > 0 { // TODO: check if all links are intermediate (or all aren't) + if links > build.UnixfsLinksPerLevel { + return false, false, xerrors.New("unixfs verifier: too many links in intermediate node") + } + + if b.seen+1 == b.expect && links != build.UnixfsLinksPerLevel { + return false, false, xerrors.New("unixfs verifier: too few nodes in level") + } + + b.sub = &UnixFs0Verifier{ + Root: blk.Cid(), + rootBlk: blk, + expect: links, + } + + // don't mark as seen yet + return false, true, nil + } + + b.seen++ + return b.seen == b.expect, false, nil +} + +func (b *UnixFs0Verifier) checkInternal(blk blocks.Block, out io.Writer) (int, error) { + nd, err := ipld.Decode(blk) + if err != nil { + return 0, err + } + + // TODO: check size + switch nd.(type) { + case *merkledag.ProtoNode: + fsn, err := unixfs.FSNodeFromBytes(nd.RawData()) + if err != nil { + return 0, err + } + if fsn.Type() != pb.Data_File { + return 0, xerrors.New("internal nodes must be a file") + } + if len(fsn.Data()) > 0 { + return 0, xerrors.New("internal node with data") + } + if len(nd.Links()) == 0 { + return 0, xerrors.New("internal node with no links") + } + return len(nd.Links()), nil + + case *merkledag.RawNode: + // TODO: do we check the hash before writing? + _, err := out.Write(nd.RawData()) + return 0, err + default: + return 0, xerrors.New("verifier: unknown node type") + } +} + +func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block, w io.Writer) (bool, error) { + // root is special + if b.rootBlk == nil { + if !b.Root.Equals(blk.Cid()) { + return false, xerrors.Errorf("unixfs verifier: root block CID didn't match: valid %s, got %s", b.Root, blk.Cid()) + } + b.rootBlk = blk + links, err := b.checkInternal(blk, w) + if err != nil { + return false, err + } + + b.expect = links + return links != 0, nil + } + + _, internal, err := b.verify(ctx, blk, w) + return internal, err +} + var _ BlockVerifier = &OptimisticVerifier{} +var _ BlockVerifier = &UnixFs0Verifier{}