retrieval: Only output matching nodes, MatchPath dagspec
This commit is contained in:
parent
77d75b7739
commit
61791b90ea
@ -214,6 +214,10 @@ type DagSpec struct {
|
||||
// - when using textselector, the path specifies subtree
|
||||
// - the matched graph must have a single root
|
||||
DataSelector *Selector
|
||||
|
||||
// MatchPath matches the path traversal when DataSelector is a textselector.
|
||||
// Ignored when DataSelector is a json selector and in non-car retrieval
|
||||
MatchPath bool
|
||||
}
|
||||
|
||||
type ExportRef struct {
|
||||
|
@ -330,6 +330,7 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder
|
||||
s := api.Selector(*order.DatamodelPathSelector)
|
||||
eref.DAGs = append(eref.DAGs, api.DagSpec{
|
||||
DataSelector: &s,
|
||||
MatchPath: true,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -435,7 +435,7 @@ var clientRetrieveCatCmd = &cli.Command{
|
||||
}
|
||||
|
||||
func pathToSel(psel string, sub builder.SelectorSpec) (lapi.Selector, error) {
|
||||
rs, err := textselector.SelectorSpecFromPath(textselector.Expression(psel), sub)
|
||||
rs, err := textselector.SelectorSpecFromPath(textselector.Expression(psel), true, sub)
|
||||
if err != nil {
|
||||
return "", xerrors.Errorf("failed to parse path-selector: %w", err)
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -101,7 +101,7 @@ require (
|
||||
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7
|
||||
github.com/ipld/go-codec-dagpb v1.3.0
|
||||
github.com/ipld/go-ipld-prime v0.12.3
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.1-0.20211129193845-ce1872a97d94
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/libp2p/go-buffer-pool v0.0.2
|
||||
github.com/libp2p/go-eventbus v0.2.1
|
||||
|
2
go.sum
2
go.sum
@ -887,6 +887,8 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1
|
||||
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0 h1:MLU1YUAgd3Z+RfVCXUbvxH1RQjEe+larJ9jmlW1aMgA=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.1-0.20211129193845-ce1872a97d94 h1:2OKA5W46CmLOp62m4gam9AdzhGGs708m62BW5CybhhU=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.1-0.20211129193845-ce1872a97d94/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
||||
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
|
||||
|
@ -32,7 +32,7 @@ var (
|
||||
dmTextSelector = textselector.Expression(dmSelector)
|
||||
dmExpectedResult = "NO ADL"
|
||||
dmExpectedCarBlockCount = 4
|
||||
dmDagSpec = []api.DagSpec{{DataSelector: &dmSelector}}
|
||||
dmDagSpec = []api.DagSpec{{DataSelector: &dmSelector, MatchPath: true}}
|
||||
)
|
||||
|
||||
func TestDMLevelPartialRetrieval(t *testing.T) {
|
||||
|
@ -96,6 +96,7 @@ func TestPartialRetrieval(t *testing.T) {
|
||||
retOrder.DataSelector = &textSelector
|
||||
eref.DAGs = append(eref.DAGs, api.DagSpec{
|
||||
DataSelector: &textSelector,
|
||||
MatchPath: true,
|
||||
})
|
||||
eref.Root = carRoot
|
||||
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
format "github.com/ipfs/go-ipld-format"
|
||||
unixfile "github.com/ipfs/go-unixfs/file"
|
||||
"github.com/ipld/go-car"
|
||||
"github.com/ipld/go-car/util"
|
||||
carv2 "github.com/ipld/go-car/v2"
|
||||
carv2bs "github.com/ipld/go-car/v2/blockstore"
|
||||
"github.com/ipld/go-ipld-prime/datamodel"
|
||||
@ -763,7 +764,7 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) e
|
||||
}
|
||||
}
|
||||
|
||||
func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
|
||||
func getDataSelector(dps *api.Selector, matchPath bool) (datamodel.Node, error) {
|
||||
sel := selectorparse.CommonSelector_ExploreAllRecursively
|
||||
if dps != nil {
|
||||
|
||||
@ -777,13 +778,13 @@ func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
|
||||
|
||||
selspec, err := textselector.SelectorSpecFromPath(
|
||||
textselector.Expression(*dps),
|
||||
textselector.Expression(*dps), matchPath,
|
||||
|
||||
// URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16
|
||||
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
|
||||
ssb.ExploreRecursive(
|
||||
selector.RecursionLimitNone(),
|
||||
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
|
||||
ssb.ExploreUnion(ssb.Matcher(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
@ -799,7 +800,7 @@ func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
|
||||
}
|
||||
|
||||
func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) {
|
||||
sel, err := getDataSelector(params.DataSelector)
|
||||
sel, err := getDataSelector(params.DataSelector, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -973,24 +974,14 @@ func (a *API) ClientExportInto(ctx context.Context, exportRef api.ExportRef, car
|
||||
if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 && dest.Writer == nil {
|
||||
return carv2.ExtractV1File(carPath, dest.Path)
|
||||
}
|
||||
|
||||
// if this is a path-selector, the user expects the car to start from the
|
||||
// root they asked for ( full merkle proof, no heuristic )
|
||||
if len(exportRef.DAGs) == 1 && exportRef.DAGs[0].DataSelector != nil && !strings.HasPrefix(string(*exportRef.DAGs[0].DataSelector), "{") {
|
||||
sel, err := getDataSelector(exportRef.DAGs[0].DataSelector)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing dag spec: %w", err)
|
||||
}
|
||||
return a.outputCAR(ctx, []dagSpec{{root: exportRef.Root, selector: sel}}, retrievalBs, dest)
|
||||
}
|
||||
}
|
||||
|
||||
roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv, !car)
|
||||
roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv, car)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing dag spec: %w", err)
|
||||
}
|
||||
if car {
|
||||
return a.outputCAR(ctx, roots, retrievalBs, dest)
|
||||
return a.outputCAR(ctx, dserv, retrievalBs, exportRef.Root, roots, dest)
|
||||
}
|
||||
|
||||
if len(roots) != 1 {
|
||||
@ -1000,32 +991,101 @@ func (a *API) ClientExportInto(ctx context.Context, exportRef api.ExportRef, car
|
||||
return a.outputUnixFS(ctx, roots[0].root, dserv, dest)
|
||||
}
|
||||
|
||||
func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, dest ExportDest) error {
|
||||
func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blockstore, root cid.Cid, dags []dagSpec, dest ExportDest) error {
|
||||
// generating a CARv1 from the configured blockstore
|
||||
carDags := make([]car.Dag, len(dags))
|
||||
roots := make([]cid.Cid, len(dags))
|
||||
for i, dag := range dags {
|
||||
carDags[i] = car.Dag{
|
||||
Root: dag.root,
|
||||
Selector: dag.selector,
|
||||
}
|
||||
roots[i] = dag.root
|
||||
}
|
||||
|
||||
return dest.doWrite(func(w io.Writer) error {
|
||||
return car.NewSelectiveCar(
|
||||
ctx,
|
||||
bs,
|
||||
carDags,
|
||||
car.MaxTraversalLinks(config.MaxTraversalLinks),
|
||||
).Write(w)
|
||||
|
||||
if err := car.WriteHeader(&car.CarHeader{
|
||||
Roots: roots,
|
||||
Version: 1,
|
||||
}, w); err != nil {
|
||||
return fmt.Errorf("failed to write car header: %s", err)
|
||||
}
|
||||
|
||||
cs := cid.NewSet()
|
||||
|
||||
for _, dagSpec := range dags {
|
||||
if err := utils.TraverseDag(
|
||||
ctx,
|
||||
ds,
|
||||
root,
|
||||
dagSpec.selector,
|
||||
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
|
||||
if r == traversal.VisitReason_SelectionMatch {
|
||||
var c cid.Cid
|
||||
if p.LastBlock.Link == nil {
|
||||
c = root
|
||||
} else {
|
||||
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
|
||||
if !castOK {
|
||||
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link)
|
||||
}
|
||||
|
||||
c = cidLnk.Cid
|
||||
}
|
||||
|
||||
if cs.Visit(c) {
|
||||
nb, err := bs.Get(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting block data: %w", err)
|
||||
}
|
||||
|
||||
err = util.LdWrite(w, c.Bytes(), nb.RawData())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("writing block data: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
return xerrors.Errorf("error while traversing car dag: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error {
|
||||
nd, err := ds.Get(ctx, root)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
|
||||
if dest.Writer == nil {
|
||||
return files.WriteTo(file, dest.Path)
|
||||
}
|
||||
|
||||
switch f := file.(type) {
|
||||
case files.File:
|
||||
_, err = io.Copy(dest.Writer, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("file type %T is not supported", nd)
|
||||
}
|
||||
}
|
||||
|
||||
type dagSpec struct {
|
||||
root cid.Cid
|
||||
selector ipld.Node
|
||||
}
|
||||
|
||||
func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, rootOnNodeBoundary bool) ([]dagSpec, error) {
|
||||
func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, car bool) ([]dagSpec, error) {
|
||||
if len(dsp) == 0 {
|
||||
return []dagSpec{
|
||||
{
|
||||
@ -1044,7 +1104,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
|
||||
// reify selector
|
||||
var err error
|
||||
out[i].selector, err = getDataSelector(spec.DataSelector)
|
||||
out[i].selector, err = getDataSelector(spec.DataSelector, car && spec.MatchPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1059,7 +1119,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *spec.DataSelector, err)
|
||||
}
|
||||
} else {
|
||||
selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.DataSelector), nil) //nolint:errcheck
|
||||
selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.DataSelector), car && spec.MatchPath, nil) //nolint:errcheck
|
||||
rsn = selspec.Node()
|
||||
}
|
||||
|
||||
@ -1072,7 +1132,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
rsn,
|
||||
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
|
||||
if r == traversal.VisitReason_SelectionMatch {
|
||||
if rootOnNodeBoundary && p.LastBlock.Path.String() != p.Path.String() {
|
||||
if !car && p.LastBlock.Path.String() != p.Path.String() {
|
||||
return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String())
|
||||
}
|
||||
|
||||
@ -1107,32 +1167,6 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error {
|
||||
nd, err := ds.Get(ctx, root)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
|
||||
if dest.Writer == nil {
|
||||
return files.WriteTo(file, dest.Path)
|
||||
}
|
||||
|
||||
switch f := file.(type) {
|
||||
case files.File:
|
||||
_, err = io.Copy(dest.Writer, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("file type %T is not supported", nd)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
|
||||
deals, err := a.Retrieval.ListDeals()
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user