From b26906963b5a19c69c340861b61af86fe4fa4bb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 11 Nov 2021 16:17:39 +0100 Subject: [PATCH] retrieval: Support multi-root export --- api/api_full.go | 7 +- api/types.go | 34 ++++- api/v0api/v1_wrapper.go | 14 +- cli/client.go | 13 +- itests/deals_partial_retrieval_test.go | 43 ++++--- node/impl/client/client.go | 171 +++++++++++++++---------- 6 files changed, 182 insertions(+), 100 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 48b5d0d3c..139f0326c 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ipfs/go-cid" - textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" @@ -936,9 +935,9 @@ type MarketDeal struct { type RetrievalOrder struct { // TODO: make this less unixfs specific Root cid.Cid - Piece *cid.Cid - DatamodelPathSelector *textselector.Expression - Size uint64 + Piece *cid.Cid + DataSelector *Selector + Size uint64 Total types.BigInt UnsealPrice types.BigInt diff --git a/api/types.go b/api/types.go index a4c477545..05f86fa88 100644 --- a/api/types.go +++ b/api/types.go @@ -5,12 +5,10 @@ import ( "fmt" "time" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/lotus/chain/types" - textselector "github.com/ipld/go-ipld-selector-text-lite" - datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" @@ -204,9 +202,35 @@ type RestrievalRes struct { DealID retrievalmarket.DealID } +// Selector specifies ipld selector string +// - if the string starts with '{', it's interpreted as json selector string +// see https://ipld.io/specs/selectors/ and https://ipld.io/specs/selectors/fixtures/selector-fixtures-1/ +// - otherwise the string is interpreted as ipld-selector-text-lite (simple ipld path) +// see https://github.com/ipld/go-ipld-selector-text-lite +type Selector string + +type DagSpec struct { + // RootSelector specifies root node + // - when using textselector, the path specifies the root node + // - if nil then RootSelector is inferred from DataSelector + // - must match a single node + RootSelector *Selector + + // DataSelector matches data to be retrieved + // - when using textselector, the path specifies subtree + DataSelector *Selector +} + type ExportRef struct { Root cid.Cid - DatamodelPathSelector *textselector.Expression + + // DAGs array specifies a list of DAGs to export + // - If exporting into a car file, defines car roots + // - If exporting into unixfs files, only one DAG is supported, DataSelector is ignored + // - When not specified defaults to a single DAG: + // - Root - the root node: `{".": {}}` + // - Data - the entire DAG: `{"R":{"l":{"none":{}},":>":{"a":{">":{"@":{}}}}}}` + DAGs []DagSpec FromLocalCAR string // if specified, get data from a local CARv2 file. DealID retrievalmarket.DealID diff --git a/api/v0api/v1_wrapper.go b/api/v0api/v1_wrapper.go index 0a1a463e5..5418d99c7 100644 --- a/api/v0api/v1_wrapper.go +++ b/api/v0api/v1_wrapper.go @@ -320,12 +320,20 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder return } - finish(w.ClientExport(ctx, api.ExportRef{ + eref := api.ExportRef{ Root: order.Root, - DatamodelPathSelector: order.DatamodelPathSelector, FromLocalCAR: order.FromLocalCAR, DealID: dealID, - }, *ref)) + } + + if order.DatamodelPathSelector != nil { + s := api.Selector(*order.DatamodelPathSelector) + eref.DAGs = append(eref.DAGs, api.DagSpec{ + DataSelector: &s, + }) + } + + finish(w.ClientExport(ctx, eref, *ref)) } var _ FullNode = &WrapperV1Full{} diff --git a/cli/client.go b/cli/client.go index 4aa64ef55..91e431eb0 100644 --- a/cli/client.go +++ b/cli/client.go @@ -26,7 +26,6 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" - textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multibase" "github.com/urfave/cli/v2" @@ -1202,10 +1201,16 @@ var clientRetrieveCmd = &cli.Command{ continue } } + + event := "New" + if evt.Event != nil { + event = retrievalmarket.ClientEvents[*evt.Event] + } + afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n", types.SizeStr(types.NewInt(evt.BytesReceived)), types.FIL(evt.TotalPaid), - retrievalmarket.ClientEvents[*evt.Event], + event, retrievalmarket.DealStatuses[evt.Status], ) switch evt.Status { @@ -1226,8 +1231,8 @@ var clientRetrieveCmd = &cli.Command{ } } - if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" { - eref.DatamodelPathSelector = &sel + if sel := api.Selector(cctx.String("datamodel-path-selector")); sel != "" { + eref.DAGs = append(eref.DAGs, api.DagSpec{DataSelector: &sel}) } err = fapi.ClientExport(ctx, *eref, lapi.FileRef{ diff --git a/itests/deals_partial_retrieval_test.go b/itests/deals_partial_retrieval_test.go index 4b3b90f02..9eeae3692 100644 --- a/itests/deals_partial_retrieval_test.go +++ b/itests/deals_partial_retrieval_test.go @@ -20,7 +20,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipld/go-car" - textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/stretchr/testify/require" ) @@ -31,9 +30,10 @@ var ( carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2") carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina") carPieceSize = abi.PaddedPieceSize(2097152) - textSelector = textselector.Expression("8/1/8/1/0/1/0") - textSelectorNonLink = textselector.Expression("8/1/8/1/0/1") - textSelectorNonexistent = textselector.Expression("42") + textSelector = api.Selector("8/1/8/1/0/1/0") + storPowCid, _ = cid.Parse("bafkqaetgnfwc6mjpon2g64tbm5sxa33xmvza") + textSelectorNonLink = api.Selector("8/1/8/1/0/1") + textSelectorNonexistent = api.Selector("42") expectedResult = "fil/1/storagepower" ) @@ -94,8 +94,10 @@ func TestPartialRetrieval(t *testing.T) { retOrder = offers[0].Order(caddr) } - retOrder.DatamodelPathSelector = &textSelector - eref.DatamodelPathSelector = &textSelector + retOrder.DataSelector = &textSelector + eref.DAGs = append(eref.DAGs, api.DagSpec{ + DataSelector: &textSelector, + }) eref.Root = carRoot // test retrieval of either data or constructing a partial selective-car @@ -113,6 +115,7 @@ func TestPartialRetrieval(t *testing.T) { Path: outFile.Name(), IsCAR: retrieveAsCar, }, + storPowCid, outFile, )) @@ -133,18 +136,19 @@ func TestPartialRetrieval(t *testing.T) { ctx, client, api.RetrievalOrder{ - Root: carRoot, - DatamodelPathSelector: &textSelectorNonexistent, + Root: carRoot, + DataSelector: &textSelectorNonexistent, }, api.ExportRef{ Root: carRoot, FromLocalCAR: sourceCar, - DatamodelPathSelector: &textSelectorNonexistent, + DAGs: []api.DagSpec{{DataSelector: &textSelectorNonexistent}}, }, &api.FileRef{}, + storPowCid, nil, ), - fmt.Sprintf("path selection does not match a node within %s", carRoot), + fmt.Sprintf("parsing dag spec: path selection does not match a node within %s", carRoot), ) // ensure non-boundary retrievals fail @@ -154,22 +158,23 @@ func TestPartialRetrieval(t *testing.T) { ctx, client, api.RetrievalOrder{ - Root: carRoot, - DatamodelPathSelector: &textSelectorNonLink, + Root: carRoot, + DataSelector: &textSelectorNonLink, }, api.ExportRef{ Root: carRoot, FromLocalCAR: sourceCar, - DatamodelPathSelector: &textSelectorNonLink, + DAGs: []api.DagSpec{{DataSelector: &textSelectorNonLink}}, }, &api.FileRef{}, + storPowCid, nil, ), - fmt.Sprintf("error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink), + fmt.Sprintf("parsing dag spec: error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink), ) } -func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef, outFile *os.File) error { +func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef, expRootCid cid.Cid, outFile *os.File) error { if retOrder.Total.Nil() { retOrder.Total = big.Zero() @@ -212,7 +217,7 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde if len(cr.Header.Roots) != 1 { return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots)) - } else if cr.Header.Roots[0].String() != carRoot.String() { + } else if cr.Header.Roots[0].String() != expRootCid.String() { return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String()) } @@ -228,11 +233,11 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde blks = append(blks, b) } - if len(blks) != 3 { - return fmt.Errorf("expected a car file with 3 blocks, got one with %d instead", len(blks)) + if len(blks) != 1 { + return fmt.Errorf("expected a car file with 1 blocks, got one with %d instead", len(blks)) } - data = blks[2].RawData() + data = blks[0].RawData() } if string(data) != expectedResult { diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 97ed9f289..5ea620110 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -12,6 +12,7 @@ import ( "time" bstore "github.com/ipfs/go-ipfs-blockstore" + format "github.com/ipfs/go-ipld-format" unixfile "github.com/ipfs/go-unixfs/file" "github.com/ipld/go-car" carv2 "github.com/ipld/go-car/v2" @@ -761,7 +762,7 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) e } } -func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) { +func getDataSelector(dps *api.Selector) (datamodel.Node, error) { sel := selectorparse.CommonSelector_ExploreAllRecursively if dps != nil { @@ -775,7 +776,7 @@ func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) selspec, err := textselector.SelectorSpecFromPath( - *dps, + textselector.Expression(*dps), // URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16 // Unable to use it because we need the SelectorSpec, and markets exposes just a reified node @@ -797,7 +798,7 @@ func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) } func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) { - sel, err := getRetrievalSelector(params.DatamodelPathSelector) + sel, err := getDataSelector(params.DataSelector) if err != nil { return nil, err } @@ -914,11 +915,6 @@ func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) carPath := exportRef.FromLocalCAR - sel, err := getRetrievalSelector(exportRef.DatamodelPathSelector) - if err != nil { - return err - } - if carPath == "" { if !retrieveIntoIPFS && !retrieveIntoCAR { return xerrors.Errorf("unsupported retrieval blockstore accessor") @@ -941,33 +937,48 @@ func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api retrievalBs = cbs } + dserv := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs))) + roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv) + if err != nil { + return xerrors.Errorf("parsing dag spec: %w", err) + } + // Are we outputting a CAR? if ref.IsCAR { // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in - if !retrieveIntoIPFS && exportRef.DatamodelPathSelector == nil { + if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 { return carv2.ExtractV1File(carPath, ref.Path) } - return a.outputCAR(ctx, exportRef.Root, sel, retrievalBs, ref) + return a.outputCAR(ctx, roots, retrievalBs, ref) } - return a.outputUnixFS(ctx, exportRef.Root, exportRef.DatamodelPathSelector, retrievalBs, ref) + if len(roots) != 1 { + return xerrors.Errorf("unixfs retrieval requires one root node, got %d", len(roots)) + } + + return a.outputUnixFS(ctx, roots[0].root, dserv, ref) } -func (a *API) outputCAR(ctx context.Context, root cid.Cid, sel datamodel.Node, bs bstore.Blockstore, ref api.FileRef) error { +func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, ref api.FileRef) error { // generating a CARv1 from the configured blockstore f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } + carDags := make([]car.Dag, len(dags)) + for i, dag := range dags { + carDags[i] = car.Dag{ + Root: dag.root, + Selector: dag.selector, + } + } + err = car.NewSelectiveCar( ctx, bs, - []car.Dag{{ - Root: root, - Selector: sel, - }}, + carDags, car.MaxTraversalLinks(config.MaxTraversalLinks), ).Write(f) if err != nil { @@ -977,60 +988,88 @@ func (a *API) outputCAR(ctx context.Context, root cid.Cid, sel datamodel.Node, b return f.Close() } -func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, sels *textselector.Expression, bs bstore.Blockstore, ref api.FileRef) error { - ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) +type dagSpec struct { + root cid.Cid + selector ipld.Node +} - // if we used a selector - need to find the sub-root the user actually wanted to retrieve - if sels != nil { - var subRootFound bool - sel := selectorparse.CommonSelector_ExploreAllRecursively - - if strings.HasPrefix(string(*sels), "{") { - var err error - sel, err = selectorparse.ParseJSONSelector(string(*sels)) - if err != nil { - return xerrors.Errorf("failed to parse json-selector '%s': %w", *sels, err) - } - } else { - selspec, _ := textselector.SelectorSpecFromPath(*sels, nil) //nolint:errcheck - sel = selspec.Node() - } - - if err := utils.TraverseDag( - ctx, - ds, - root, - sel, - func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { - if r == traversal.VisitReason_SelectionMatch { - - if p.LastBlock.Path.String() != p.Path.String() { - return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String()) - } - - if p.LastBlock.Link == nil { - return nil - } - - cidLnk, castOK := p.LastBlock.Link.(cidlink.Link) - if !castOK { - return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link) - } - - root = cidLnk.Cid - subRootFound = true - } - return nil +func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService) ([]dagSpec, error) { + if len(dsp) == 0 { + return []dagSpec{ + { + root: root, + selector: nil, }, - ); err != nil { - return xerrors.Errorf("error while locating partial retrieval sub-root: %w", err) + }, nil + } + + out := make([]dagSpec, len(dsp)) + for i, spec := range dsp { + if spec.RootSelector == nil { + spec.RootSelector = spec.DataSelector } - if !subRootFound { - return xerrors.Errorf("path selection does not match a node within %s", root) + if spec.RootSelector != nil { + var rsn ipld.Node + + if strings.HasPrefix(string(*spec.RootSelector), "{") { + var err error + rsn, err = selectorparse.ParseJSONSelector(string(*spec.RootSelector)) + if err != nil { + return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *spec.RootSelector, err) + } + } else { + selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.RootSelector), nil) //nolint:errcheck + rsn = selspec.Node() + } + + if err := utils.TraverseDag( + ctx, + ds, + root, + rsn, + func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { + if r == traversal.VisitReason_SelectionMatch { + + if p.LastBlock.Path.String() != p.Path.String() { + return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String()) + } + + if p.LastBlock.Link == nil { + return nil + } + + cidLnk, castOK := p.LastBlock.Link.(cidlink.Link) + if !castOK { + return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link) + } + + out[i].root = cidLnk.Cid + } + return nil + }, + ); err != nil { + return nil, xerrors.Errorf("error while locating partial retrieval sub-root: %w", err) + } + + if out[i].root == cid.Undef { + return nil, xerrors.Errorf("path selection does not match a node within %s", root) + } + } + + if spec.DataSelector != nil { + var err error + out[i].selector, err = getDataSelector(spec.DataSelector) + if err != nil { + return nil, err + } } } + return out, nil +} + +func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, ref api.FileRef) error { nd, err := ds.Get(ctx, root) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) @@ -1072,8 +1111,10 @@ func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, er func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) { updates := make(chan api.RetrievalInfo) - unsub := a.Retrieval.SubscribeToEvents(func(_ rm.ClientEvent, deal rm.ClientDealState) { - updates <- a.newRetrievalInfo(ctx, deal) + unsub := a.Retrieval.SubscribeToEvents(func(evt rm.ClientEvent, deal rm.ClientDealState) { + update := a.newRetrievalInfo(ctx, deal) + update.Event = &evt + updates <- update }) go func() {