Merge pull request #7706 from filecoin-project/feat/ret-matchding-cars
retrieval: Only output matching nodes, MatchPath dagspec
This commit is contained in:
commit
f39942283b
@ -214,6 +214,12 @@ type DagSpec struct {
|
||||
// - when using textselector, the path specifies subtree
|
||||
// - the matched graph must have a single root
|
||||
DataSelector *Selector
|
||||
|
||||
// ExportMerkleProof is applicable only when exporting to a CAR file via a path textselector
|
||||
// When true, in addition to the selection target, the resulting CAR will contain every block along the
|
||||
// path back to, and including the original root
|
||||
// When false the resulting CAR contains only the blocks of the target subdag
|
||||
ExportMerkleProof bool
|
||||
}
|
||||
|
||||
type ExportRef struct {
|
||||
|
@ -329,7 +329,8 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder
|
||||
if order.DatamodelPathSelector != nil {
|
||||
s := api.Selector(*order.DatamodelPathSelector)
|
||||
eref.DAGs = append(eref.DAGs, api.DagSpec{
|
||||
DataSelector: &s,
|
||||
DataSelector: &s,
|
||||
ExportMerkleProof: true,
|
||||
})
|
||||
}
|
||||
|
||||
|
Binary file not shown.
@ -273,19 +273,29 @@ Examples:
|
||||
Flags: append([]cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "car",
|
||||
Usage: "export to a car file instead of a regular file",
|
||||
Usage: "Export to a car file instead of a regular file",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "data-selector",
|
||||
Aliases: []string{"data-selector-selector"},
|
||||
Aliases: []string{"datamodel-path-selector"},
|
||||
Usage: "IPLD datamodel text-path selector, or IPLD json selector",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "car-export-merkle-proof",
|
||||
Usage: "(requires --data-selector and --car) Export data-selector merkle proof",
|
||||
},
|
||||
}, retrFlagsCommon...),
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if cctx.NArg() != 2 {
|
||||
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
|
||||
}
|
||||
|
||||
if cctx.Bool("car-export-merkle-proof") {
|
||||
if !cctx.Bool("car") || !cctx.IsSet("data-selector") {
|
||||
return ShowHelp(cctx, fmt.Errorf("--car-export-merkle-proof requires --car and --data-selector"))
|
||||
}
|
||||
}
|
||||
|
||||
fapi, closer, err := GetFullNodeAPIV1(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -305,7 +315,7 @@ Examples:
|
||||
}
|
||||
|
||||
if s != nil {
|
||||
eref.DAGs = append(eref.DAGs, lapi.DagSpec{DataSelector: s})
|
||||
eref.DAGs = append(eref.DAGs, lapi.DagSpec{DataSelector: s, ExportMerkleProof: cctx.Bool("car-export-merkle-proof")})
|
||||
}
|
||||
|
||||
err = fapi.ClientExport(ctx, *eref, lapi.FileRef{
|
||||
@ -434,8 +444,8 @@ var clientRetrieveCatCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
func pathToSel(psel string, sub builder.SelectorSpec) (lapi.Selector, error) {
|
||||
rs, err := textselector.SelectorSpecFromPath(textselector.Expression(psel), sub)
|
||||
func pathToSel(psel string, matchTraversal bool, sub builder.SelectorSpec) (lapi.Selector, error) {
|
||||
rs, err := textselector.SelectorSpecFromPath(textselector.Expression(psel), matchTraversal, sub)
|
||||
if err != nil {
|
||||
return "", xerrors.Errorf("failed to parse path-selector: %w", err)
|
||||
}
|
||||
@ -489,7 +499,7 @@ var clientRetrieveLsCmd = &cli.Command{
|
||||
|
||||
if cctx.IsSet("data-selector") {
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
|
||||
dataSelector, err = pathToSel(cctx.String("data-selector"),
|
||||
dataSelector, err = pathToSel(cctx.String("data-selector"), cctx.Bool("ipld"),
|
||||
ssb.ExploreUnion(
|
||||
ssb.Matcher(),
|
||||
ssb.ExploreAll(
|
||||
@ -554,7 +564,7 @@ var clientRetrieveLsCmd = &cli.Command{
|
||||
|
||||
if cctx.IsSet("data-selector") {
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
|
||||
jsel, err = pathToSel(cctx.String("data-selector"),
|
||||
jsel, err = pathToSel(cctx.String("data-selector"), false,
|
||||
ssb.ExploreRecursive(selector.RecursionLimitDepth(int64(cctx.Int("depth"))), ssb.ExploreAll(ssb.ExploreUnion(ssb.Matcher(), ssb.ExploreRecursiveEdge()))),
|
||||
)
|
||||
}
|
||||
|
@ -581,14 +581,15 @@ Examples:
|
||||
|
||||
|
||||
OPTIONS:
|
||||
--car export to a car file instead of a regular file (default: false)
|
||||
--data-selector value, --data-selector-selector value IPLD datamodel text-path selector, or IPLD json selector
|
||||
--from value address to send transactions from
|
||||
--provider value, --miner value provider to use for retrieval, if not present it'll use local discovery
|
||||
--maxPrice value maximum price the client is willing to consider (default: 0 FIL)
|
||||
--pieceCid value require data to be retrieved from a specific Piece CID
|
||||
--allow-local (default: false)
|
||||
--help, -h show help (default: false)
|
||||
--car Export to a car file instead of a regular file (default: false)
|
||||
--data-selector value, --datamodel-path-selector value IPLD datamodel text-path selector, or IPLD json selector
|
||||
--car-export-merkle-proof (requires --data-selector and --car) Export data-selector merkle proof (default: false)
|
||||
--from value address to send transactions from
|
||||
--provider value, --miner value provider to use for retrieval, if not present it'll use local discovery
|
||||
--maxPrice value maximum price the client is willing to consider (default: 0 FIL)
|
||||
--pieceCid value require data to be retrieved from a specific Piece CID
|
||||
--allow-local (default: false)
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -101,7 +101,7 @@ require (
|
||||
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7
|
||||
github.com/ipld/go-codec-dagpb v1.3.0
|
||||
github.com/ipld/go-ipld-prime v0.12.3
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.1
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/libp2p/go-buffer-pool v0.0.2
|
||||
github.com/libp2p/go-eventbus v0.2.1
|
||||
|
4
go.sum
4
go.sum
@ -885,8 +885,8 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0=
|
||||
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0 h1:MLU1YUAgd3Z+RfVCXUbvxH1RQjEe+larJ9jmlW1aMgA=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.1 h1:lNqFsQpBHc3p5xHob2KvEg/iM5dIFn6iw4L/Hh+kS1Y=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.1/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
||||
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
|
||||
|
@ -32,7 +32,7 @@ var (
|
||||
dmTextSelector = textselector.Expression(dmSelector)
|
||||
dmExpectedResult = "NO ADL"
|
||||
dmExpectedCarBlockCount = 4
|
||||
dmDagSpec = []api.DagSpec{{DataSelector: &dmSelector}}
|
||||
dmDagSpec = []api.DagSpec{{DataSelector: &dmSelector, ExportMerkleProof: true}}
|
||||
)
|
||||
|
||||
func TestDMLevelPartialRetrieval(t *testing.T) {
|
||||
|
@ -29,6 +29,7 @@ var (
|
||||
sourceCar = "../build/genesis/mainnet.car"
|
||||
carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2")
|
||||
carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina")
|
||||
selectedCid, _ = cid.Parse("bafkqaetgnfwc6mjpon2g64tbm5sxa33xmvza")
|
||||
carPieceSize = abi.PaddedPieceSize(2097152)
|
||||
textSelector = api.Selector("8/1/8/1/0/1/0")
|
||||
textSelectorNonLink = api.Selector("8/1/8/1/0/1")
|
||||
@ -54,76 +55,79 @@ func TestPartialRetrieval(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// first test retrieval from local car, then do an actual deal
|
||||
for _, fullCycle := range []bool{false, true} {
|
||||
for _, exportMerkleProof := range []bool{false, true} {
|
||||
for _, fullCycle := range []bool{false, true} {
|
||||
|
||||
var retOrder api.RetrievalOrder
|
||||
var eref api.ExportRef
|
||||
var retOrder api.RetrievalOrder
|
||||
var eref api.ExportRef
|
||||
|
||||
if !fullCycle {
|
||||
eref.FromLocalCAR = sourceCar
|
||||
} else {
|
||||
dp := dh.DefaultStartDealParams()
|
||||
dp.Data = &storagemarket.DataRef{
|
||||
// FIXME: figure out how to do this with an online partial transfer
|
||||
TransferType: storagemarket.TTManual,
|
||||
Root: carRoot,
|
||||
PieceCid: &carCommp,
|
||||
PieceSize: carPieceSize.Unpadded(),
|
||||
if !fullCycle {
|
||||
eref.FromLocalCAR = sourceCar
|
||||
} else {
|
||||
dp := dh.DefaultStartDealParams()
|
||||
dp.Data = &storagemarket.DataRef{
|
||||
// FIXME: figure out how to do this with an online partial transfer
|
||||
TransferType: storagemarket.TTManual,
|
||||
Root: carRoot,
|
||||
PieceCid: &carCommp,
|
||||
PieceSize: carPieceSize.Unpadded(),
|
||||
}
|
||||
proposalCid := dh.StartDeal(ctx, dp)
|
||||
|
||||
// Wait for the deal to reach StorageDealCheckForAcceptance on the client
|
||||
cd, err := client.ClientGetDealInfo(ctx, *proposalCid)
|
||||
require.NoError(t, err)
|
||||
require.Eventually(t, func() bool {
|
||||
cd, _ := client.ClientGetDealInfo(ctx, *proposalCid)
|
||||
return cd.State == storagemarket.StorageDealCheckForAcceptance
|
||||
}, 30*time.Second, 1*time.Second, "actual deal status is %s", storagemarket.DealStates[cd.State])
|
||||
|
||||
err = miner.DealsImportData(ctx, *proposalCid, sourceCar)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for the deal to be published, we should be able to start retrieval right away
|
||||
dh.WaitDealPublished(ctx, proposalCid)
|
||||
|
||||
offers, err := client.ClientFindData(ctx, carRoot, nil)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, offers, "no offers")
|
||||
|
||||
retOrder = offers[0].Order(caddr)
|
||||
}
|
||||
proposalCid := dh.StartDeal(ctx, dp)
|
||||
|
||||
// Wait for the deal to reach StorageDealCheckForAcceptance on the client
|
||||
cd, err := client.ClientGetDealInfo(ctx, *proposalCid)
|
||||
require.NoError(t, err)
|
||||
require.Eventually(t, func() bool {
|
||||
cd, _ := client.ClientGetDealInfo(ctx, *proposalCid)
|
||||
return cd.State == storagemarket.StorageDealCheckForAcceptance
|
||||
}, 30*time.Second, 1*time.Second, "actual deal status is %s", storagemarket.DealStates[cd.State])
|
||||
retOrder.DataSelector = &textSelector
|
||||
eref.DAGs = append(eref.DAGs, api.DagSpec{
|
||||
DataSelector: &textSelector,
|
||||
ExportMerkleProof: exportMerkleProof,
|
||||
})
|
||||
eref.Root = carRoot
|
||||
|
||||
err = miner.DealsImportData(ctx, *proposalCid, sourceCar)
|
||||
require.NoError(t, err)
|
||||
// test retrieval of either data or constructing a partial selective-car
|
||||
for _, retrieveAsCar := range []bool{false, true} {
|
||||
outFile, err := ioutil.TempFile(t.TempDir(), "ret-file")
|
||||
require.NoError(t, err)
|
||||
defer outFile.Close() //nolint:errcheck
|
||||
|
||||
// Wait for the deal to be published, we should be able to start retrieval right away
|
||||
dh.WaitDealPublished(ctx, proposalCid)
|
||||
require.NoError(t, testGenesisRetrieval(
|
||||
ctx,
|
||||
client,
|
||||
retOrder,
|
||||
eref,
|
||||
&api.FileRef{
|
||||
Path: outFile.Name(),
|
||||
IsCAR: retrieveAsCar,
|
||||
},
|
||||
outFile,
|
||||
))
|
||||
|
||||
offers, err := client.ClientFindData(ctx, carRoot, nil)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, offers, "no offers")
|
||||
|
||||
retOrder = offers[0].Order(caddr)
|
||||
}
|
||||
|
||||
retOrder.DataSelector = &textSelector
|
||||
eref.DAGs = append(eref.DAGs, api.DagSpec{
|
||||
DataSelector: &textSelector,
|
||||
})
|
||||
eref.Root = carRoot
|
||||
|
||||
// test retrieval of either data or constructing a partial selective-car
|
||||
for _, retrieveAsCar := range []bool{false, true} {
|
||||
outFile, err := ioutil.TempFile(t.TempDir(), "ret-file")
|
||||
require.NoError(t, err)
|
||||
defer outFile.Close() //nolint:errcheck
|
||||
|
||||
require.NoError(t, testGenesisRetrieval(
|
||||
ctx,
|
||||
client,
|
||||
retOrder,
|
||||
eref,
|
||||
&api.FileRef{
|
||||
Path: outFile.Name(),
|
||||
IsCAR: retrieveAsCar,
|
||||
},
|
||||
outFile,
|
||||
))
|
||||
|
||||
// UGH if I do not sleep here, I get things like:
|
||||
/*
|
||||
retrieval failed: Retrieve failed: there is an active retrieval deal with peer 12D3KooWK9fB9a3HZ4PQLVmEQ6pweMMn5CAyKtumB71CPTnuBDi6 for payload CID bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2 (retrieval deal ID 1631259332180384709, state DealStatusFinalizingBlockstore) - existing deal must be cancelled before starting a new retrieval deal:
|
||||
github.com/filecoin-project/lotus/node/impl/client.(*API).ClientRetrieve
|
||||
/home/circleci/project/node/impl/client/client.go:774
|
||||
*/
|
||||
time.Sleep(time.Second)
|
||||
// UGH if I do not sleep here, I get things like:
|
||||
/*
|
||||
retrieval failed: Retrieve failed: there is an active retrieval deal with peer 12D3KooWK9fB9a3HZ4PQLVmEQ6pweMMn5CAyKtumB71CPTnuBDi6 for payload CID bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2 (retrieval deal ID 1631259332180384709, state DealStatusFinalizingBlockstore) - existing deal must be cancelled before starting a new retrieval deal:
|
||||
github.com/filecoin-project/lotus/node/impl/client.(*API).ClientRetrieve
|
||||
/home/circleci/project/node/impl/client/client.go:774
|
||||
*/
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,8 +217,10 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
|
||||
|
||||
if len(cr.Header.Roots) != 1 {
|
||||
return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots))
|
||||
} else if cr.Header.Roots[0].String() != carRoot.String() {
|
||||
} else if eref.DAGs[0].ExportMerkleProof && cr.Header.Roots[0].String() != carRoot.String() {
|
||||
return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String())
|
||||
} else if !eref.DAGs[0].ExportMerkleProof && cr.Header.Roots[0].String() != selectedCid.String() {
|
||||
return fmt.Errorf("expected root cid '%s', got '%s'", selectedCid.String(), cr.Header.Roots[0].String())
|
||||
}
|
||||
|
||||
blks := make([]blocks.Block, 0)
|
||||
@ -229,11 +235,11 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
|
||||
blks = append(blks, b)
|
||||
}
|
||||
|
||||
if len(blks) != 3 {
|
||||
return fmt.Errorf("expected a car file with 3 blocks, got one with %d instead", len(blks))
|
||||
if (eref.DAGs[0].ExportMerkleProof && len(blks) != 3) || (!eref.DAGs[0].ExportMerkleProof && len(blks) != 1) {
|
||||
return fmt.Errorf("expected a car file with 3/1 blocks, got one with %d instead", len(blks))
|
||||
}
|
||||
|
||||
data = blks[2].RawData()
|
||||
data = blks[len(blks)-1].RawData()
|
||||
}
|
||||
|
||||
if string(data) != expectedResult {
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
format "github.com/ipfs/go-ipld-format"
|
||||
unixfile "github.com/ipfs/go-unixfs/file"
|
||||
"github.com/ipld/go-car"
|
||||
"github.com/ipld/go-car/util"
|
||||
carv2 "github.com/ipld/go-car/v2"
|
||||
carv2bs "github.com/ipld/go-car/v2/blockstore"
|
||||
"github.com/ipld/go-ipld-prime/datamodel"
|
||||
@ -763,7 +764,7 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) e
|
||||
}
|
||||
}
|
||||
|
||||
func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
|
||||
func getDataSelector(dps *api.Selector, matchPath bool) (datamodel.Node, error) {
|
||||
sel := selectorparse.CommonSelector_ExploreAllRecursively
|
||||
if dps != nil {
|
||||
|
||||
@ -777,13 +778,11 @@ func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
|
||||
|
||||
selspec, err := textselector.SelectorSpecFromPath(
|
||||
textselector.Expression(*dps),
|
||||
textselector.Expression(*dps), matchPath,
|
||||
|
||||
// URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16
|
||||
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
|
||||
ssb.ExploreRecursive(
|
||||
selector.RecursionLimitNone(),
|
||||
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
|
||||
ssb.ExploreUnion(ssb.Matcher(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
@ -799,7 +798,7 @@ func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
|
||||
}
|
||||
|
||||
func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) {
|
||||
sel, err := getDataSelector(params.DataSelector)
|
||||
sel, err := getDataSelector(params.DataSelector, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -973,24 +972,14 @@ func (a *API) ClientExportInto(ctx context.Context, exportRef api.ExportRef, car
|
||||
if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 && dest.Writer == nil {
|
||||
return carv2.ExtractV1File(carPath, dest.Path)
|
||||
}
|
||||
|
||||
// if this is a path-selector, the user expects the car to start from the
|
||||
// root they asked for ( full merkle proof, no heuristic )
|
||||
if len(exportRef.DAGs) == 1 && exportRef.DAGs[0].DataSelector != nil && !strings.HasPrefix(string(*exportRef.DAGs[0].DataSelector), "{") {
|
||||
sel, err := getDataSelector(exportRef.DAGs[0].DataSelector)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing dag spec: %w", err)
|
||||
}
|
||||
return a.outputCAR(ctx, []dagSpec{{root: exportRef.Root, selector: sel}}, retrievalBs, dest)
|
||||
}
|
||||
}
|
||||
|
||||
roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv, !car)
|
||||
roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv, car)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing dag spec: %w", err)
|
||||
}
|
||||
if car {
|
||||
return a.outputCAR(ctx, roots, retrievalBs, dest)
|
||||
return a.outputCAR(ctx, dserv, retrievalBs, exportRef.Root, roots, dest)
|
||||
}
|
||||
|
||||
if len(roots) != 1 {
|
||||
@ -1000,32 +989,101 @@ func (a *API) ClientExportInto(ctx context.Context, exportRef api.ExportRef, car
|
||||
return a.outputUnixFS(ctx, roots[0].root, dserv, dest)
|
||||
}
|
||||
|
||||
func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, dest ExportDest) error {
|
||||
func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blockstore, root cid.Cid, dags []dagSpec, dest ExportDest) error {
|
||||
// generating a CARv1 from the configured blockstore
|
||||
carDags := make([]car.Dag, len(dags))
|
||||
roots := make([]cid.Cid, len(dags))
|
||||
for i, dag := range dags {
|
||||
carDags[i] = car.Dag{
|
||||
Root: dag.root,
|
||||
Selector: dag.selector,
|
||||
}
|
||||
roots[i] = dag.root
|
||||
}
|
||||
|
||||
return dest.doWrite(func(w io.Writer) error {
|
||||
return car.NewSelectiveCar(
|
||||
ctx,
|
||||
bs,
|
||||
carDags,
|
||||
car.MaxTraversalLinks(config.MaxTraversalLinks),
|
||||
).Write(w)
|
||||
|
||||
if err := car.WriteHeader(&car.CarHeader{
|
||||
Roots: roots,
|
||||
Version: 1,
|
||||
}, w); err != nil {
|
||||
return fmt.Errorf("failed to write car header: %s", err)
|
||||
}
|
||||
|
||||
cs := cid.NewSet()
|
||||
|
||||
for _, dagSpec := range dags {
|
||||
if err := utils.TraverseDag(
|
||||
ctx,
|
||||
ds,
|
||||
root,
|
||||
dagSpec.selector,
|
||||
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
|
||||
if r == traversal.VisitReason_SelectionMatch {
|
||||
var c cid.Cid
|
||||
if p.LastBlock.Link == nil {
|
||||
c = root
|
||||
} else {
|
||||
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
|
||||
if !castOK {
|
||||
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link)
|
||||
}
|
||||
|
||||
c = cidLnk.Cid
|
||||
}
|
||||
|
||||
if cs.Visit(c) {
|
||||
nb, err := bs.Get(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting block data: %w", err)
|
||||
}
|
||||
|
||||
err = util.LdWrite(w, c.Bytes(), nb.RawData())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("writing block data: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
return xerrors.Errorf("error while traversing car dag: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error {
|
||||
nd, err := ds.Get(ctx, root)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
|
||||
if dest.Writer == nil {
|
||||
return files.WriteTo(file, dest.Path)
|
||||
}
|
||||
|
||||
switch f := file.(type) {
|
||||
case files.File:
|
||||
_, err = io.Copy(dest.Writer, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("file type %T is not supported", nd)
|
||||
}
|
||||
}
|
||||
|
||||
type dagSpec struct {
|
||||
root cid.Cid
|
||||
selector ipld.Node
|
||||
}
|
||||
|
||||
func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, rootOnNodeBoundary bool) ([]dagSpec, error) {
|
||||
func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, car bool) ([]dagSpec, error) {
|
||||
if len(dsp) == 0 {
|
||||
return []dagSpec{
|
||||
{
|
||||
@ -1044,7 +1102,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
|
||||
// reify selector
|
||||
var err error
|
||||
out[i].selector, err = getDataSelector(spec.DataSelector)
|
||||
out[i].selector, err = getDataSelector(spec.DataSelector, car && spec.ExportMerkleProof)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1059,7 +1117,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *spec.DataSelector, err)
|
||||
}
|
||||
} else {
|
||||
selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.DataSelector), nil) //nolint:errcheck
|
||||
selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.DataSelector), car && spec.ExportMerkleProof, nil) //nolint:errcheck
|
||||
rsn = selspec.Node()
|
||||
}
|
||||
|
||||
@ -1072,7 +1130,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
rsn,
|
||||
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
|
||||
if r == traversal.VisitReason_SelectionMatch {
|
||||
if rootOnNodeBoundary && p.LastBlock.Path.String() != p.Path.String() {
|
||||
if !car && p.LastBlock.Path.String() != p.Path.String() {
|
||||
return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String())
|
||||
}
|
||||
|
||||
@ -1107,32 +1165,6 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error {
|
||||
nd, err := ds.Get(ctx, root)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
|
||||
if dest.Writer == nil {
|
||||
return files.WriteTo(file, dest.Path)
|
||||
}
|
||||
|
||||
switch f := file.(type) {
|
||||
case files.File:
|
||||
_, err = io.Copy(dest.Writer, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("file type %T is not supported", nd)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
|
||||
deals, err := a.Retrieval.ListDeals()
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user