Merge pull request #6393 from filecoin-project/feat/datamodel-selector-retrieval
Feat/datamodel selector retrieval
This commit is contained in:
commit
26ae8bb3a7
@ -809,6 +809,11 @@ workflows:
|
||||
suite: itest-deals_padding
|
||||
target: "./itests/deals_padding_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-deals_partial_retrieval
|
||||
suite: itest-deals_partial_retrieval
|
||||
target: "./itests/deals_partial_retrieval_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-deals_power
|
||||
suite: itest-deals_power
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
textselector "github.com/ipld/go-ipld-selector-text-lite"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
@ -932,6 +933,7 @@ type RetrievalOrder struct {
|
||||
// TODO: make this less unixfs specific
|
||||
Root cid.Cid
|
||||
Piece *cid.Cid
|
||||
DatamodelPathSelector *textselector.Expression
|
||||
Size uint64
|
||||
|
||||
FromLocalCAR string // if specified, get data from a local CARv2 file.
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
filestore2 "github.com/filecoin-project/go-fil-markets/filestore"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-jsonrpc/auth"
|
||||
textselector "github.com/ipld/go-ipld-selector-text-lite"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
@ -90,6 +91,7 @@ func init() {
|
||||
addExample(&pid)
|
||||
|
||||
storeIDExample := imports.ID(50)
|
||||
textSelExample := textselector.Expression("Links/21/Hash/Links/42/Hash")
|
||||
|
||||
addExample(bitfield.NewFromSet([]uint64{5}))
|
||||
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1)
|
||||
@ -124,6 +126,7 @@ func init() {
|
||||
addExample(&storeIDExample)
|
||||
addExample(retrievalmarket.ClientEventDealAccepted)
|
||||
addExample(retrievalmarket.DealStatusNew)
|
||||
addExample(&textSelExample)
|
||||
addExample(network.ReachabilityPublic)
|
||||
addExample(build.NewestNetworkVersion)
|
||||
addExample(map[string]int{"name": 42})
|
||||
|
Binary file not shown.
@ -26,6 +26,7 @@ import (
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-cidutil/cidenc"
|
||||
textselector "github.com/ipld/go-ipld-selector-text-lite"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/multiformats/go-multibase"
|
||||
"github.com/urfave/cli/v2"
|
||||
@ -1047,6 +1048,10 @@ var clientRetrieveCmd = &cli.Command{
|
||||
Name: "miner",
|
||||
Usage: "miner address for retrieval, if not present it'll use local discovery",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "datamodel-path-selector",
|
||||
Usage: "a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "maxPrice",
|
||||
Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice),
|
||||
@ -1182,6 +1187,10 @@ var clientRetrieveCmd = &cli.Command{
|
||||
IsCAR: cctx.Bool("car"),
|
||||
}
|
||||
|
||||
if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" {
|
||||
order.DatamodelPathSelector = &sel
|
||||
}
|
||||
|
||||
updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error setting up retrieval: %w", err)
|
||||
|
@ -1469,6 +1469,7 @@ Inputs:
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
"Piece": null,
|
||||
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
|
||||
"Size": 42,
|
||||
"FromLocalCAR": "string value",
|
||||
"Total": "0",
|
||||
@ -1523,6 +1524,7 @@ Inputs:
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
"Piece": null,
|
||||
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
|
||||
"Size": 42,
|
||||
"FromLocalCAR": "string value",
|
||||
"Total": "0",
|
||||
|
@ -1481,6 +1481,7 @@ Inputs:
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
"Piece": null,
|
||||
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
|
||||
"Size": 42,
|
||||
"FromLocalCAR": "string value",
|
||||
"Total": "0",
|
||||
@ -1535,6 +1536,7 @@ Inputs:
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
"Piece": null,
|
||||
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
|
||||
"Size": 42,
|
||||
"FromLocalCAR": "string value",
|
||||
"Total": "0",
|
||||
|
@ -548,6 +548,7 @@ OPTIONS:
|
||||
--from value address to send transactions from
|
||||
--car export to a car file instead of a regular file (default: false)
|
||||
--miner value miner address for retrieval, if not present it'll use local discovery
|
||||
--datamodel-path-selector value a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal
|
||||
--maxPrice value maximum price the client is willing to consider (default: 0.01 FIL)
|
||||
--pieceCid value require data to be retrieved from a specific Piece CID
|
||||
--allow-local (default: false)
|
||||
|
2
go.mod
2
go.mod
@ -101,7 +101,9 @@ require (
|
||||
github.com/ipfs/interface-go-ipfs-core v0.4.0
|
||||
github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823
|
||||
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7
|
||||
github.com/ipld/go-codec-dagpb v1.3.0
|
||||
github.com/ipld/go-ipld-prime v0.12.3
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210817134355-4c190a2bb825
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/libp2p/go-buffer-pool v0.0.2
|
||||
github.com/libp2p/go-eventbus v0.2.1
|
||||
|
3
go.sum
3
go.sum
@ -815,6 +815,7 @@ github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVI
|
||||
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
|
||||
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
|
||||
github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
|
||||
github.com/ipld/go-ipld-prime v0.10.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
|
||||
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
|
||||
github.com/ipld/go-ipld-prime v0.12.3-0.20210930132912-0b3aef3ca569/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8=
|
||||
github.com/ipld/go-ipld-prime v0.12.3 h1:furVobw7UBLQZwlEwfE26tYORy3PAK8VYSgZOSr3JMQ=
|
||||
@ -823,6 +824,8 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0=
|
||||
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210817134355-4c190a2bb825 h1:sGlmVUuWEhuJpVsErFqCHWy9XTsIy511hZWRWI/Lc4I=
|
||||
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210817134355-4c190a2bb825/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
||||
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
|
||||
|
221
itests/deals_partial_retrieval_test.go
Normal file
221
itests/deals_partial_retrieval_test.go
Normal file
@ -0,0 +1,221 @@
|
||||
package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipld/go-car"
|
||||
textselector "github.com/ipld/go-ipld-selector-text-lite"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// use the mainnet carfile as text fixture: it will always be here
|
||||
// https://dweb.link/ipfs/bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2/8/1/8/1/0/1/0
|
||||
var (
|
||||
sourceCar = "../build/genesis/mainnet.car"
|
||||
carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2")
|
||||
carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina")
|
||||
carPieceSize = abi.PaddedPieceSize(2097152)
|
||||
textSelector = textselector.Expression("8/1/8/1/0/1/0")
|
||||
textSelectorNonLink = textselector.Expression("8/1/8/1/0/1")
|
||||
textSelectorNonexistent = textselector.Expression("42")
|
||||
expectedResult = "fil/1/storagepower"
|
||||
)
|
||||
|
||||
func TestPartialRetrieval(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
policy.SetPreCommitChallengeDelay(2)
|
||||
kit.EnableLargeSectors(t)
|
||||
kit.QuietMiningLogs()
|
||||
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs(), kit.SectorSize(512<<20))
|
||||
dh := kit.NewDealHarness(t, client, miner, miner)
|
||||
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
||||
|
||||
_, err := client.ClientImport(ctx, api.FileRef{Path: sourceCar, IsCAR: true})
|
||||
require.NoError(t, err)
|
||||
|
||||
caddr, err := client.WalletDefaultAddress(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// first test retrieval from local car, then do an actual deal
|
||||
for _, fullCycle := range []bool{false, true} {
|
||||
|
||||
var retOrder api.RetrievalOrder
|
||||
|
||||
if !fullCycle {
|
||||
|
||||
retOrder.FromLocalCAR = sourceCar
|
||||
retOrder.Root = carRoot
|
||||
|
||||
} else {
|
||||
|
||||
dp := dh.DefaultStartDealParams()
|
||||
dp.Data = &storagemarket.DataRef{
|
||||
// FIXME: figure out how to do this with an online partial transfer
|
||||
TransferType: storagemarket.TTManual,
|
||||
Root: carRoot,
|
||||
PieceCid: &carCommp,
|
||||
PieceSize: carPieceSize.Unpadded(),
|
||||
}
|
||||
proposalCid := dh.StartDeal(ctx, dp)
|
||||
|
||||
// Wait for the deal to reach StorageDealCheckForAcceptance on the client
|
||||
cd, err := client.ClientGetDealInfo(ctx, *proposalCid)
|
||||
require.NoError(t, err)
|
||||
require.Eventually(t, func() bool {
|
||||
cd, _ := client.ClientGetDealInfo(ctx, *proposalCid)
|
||||
return cd.State == storagemarket.StorageDealCheckForAcceptance
|
||||
}, 30*time.Second, 1*time.Second, "actual deal status is %s", storagemarket.DealStates[cd.State])
|
||||
|
||||
err = miner.DealsImportData(ctx, *proposalCid, sourceCar)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for the deal to be published, we should be able to start retrieval right away
|
||||
dh.WaitDealPublished(ctx, proposalCid)
|
||||
|
||||
offers, err := client.ClientFindData(ctx, carRoot, nil)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, offers, "no offers")
|
||||
|
||||
retOrder = offers[0].Order(caddr)
|
||||
}
|
||||
|
||||
retOrder.DatamodelPathSelector = &textSelector
|
||||
|
||||
// test retrieval of either data or constructing a partial selective-car
|
||||
for _, retrieveAsCar := range []bool{false, true} {
|
||||
outFile, err := ioutil.TempFile(t.TempDir(), "ret-file")
|
||||
require.NoError(t, err)
|
||||
defer outFile.Close() //nolint:errcheck
|
||||
|
||||
require.NoError(t, testGenesisRetrieval(
|
||||
ctx,
|
||||
client,
|
||||
retOrder,
|
||||
&api.FileRef{
|
||||
Path: outFile.Name(),
|
||||
IsCAR: retrieveAsCar,
|
||||
},
|
||||
outFile,
|
||||
))
|
||||
|
||||
// UGH if I do not sleep here, I get things like:
|
||||
/*
|
||||
retrieval failed: Retrieve failed: there is an active retrieval deal with peer 12D3KooWK9fB9a3HZ4PQLVmEQ6pweMMn5CAyKtumB71CPTnuBDi6 for payload CID bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2 (retrieval deal ID 1631259332180384709, state DealStatusFinalizingBlockstore) - existing deal must be cancelled before starting a new retrieval deal:
|
||||
github.com/filecoin-project/lotus/node/impl/client.(*API).ClientRetrieve
|
||||
/home/circleci/project/node/impl/client/client.go:774
|
||||
*/
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// ensure non-existent paths fail
|
||||
require.EqualError(
|
||||
t,
|
||||
testGenesisRetrieval(
|
||||
ctx,
|
||||
client,
|
||||
api.RetrievalOrder{
|
||||
FromLocalCAR: sourceCar,
|
||||
Root: carRoot,
|
||||
DatamodelPathSelector: &textSelectorNonexistent,
|
||||
},
|
||||
&api.FileRef{},
|
||||
nil,
|
||||
),
|
||||
fmt.Sprintf("retrieval failed: path selection '%s' does not match a node within %s", textSelectorNonexistent, carRoot),
|
||||
)
|
||||
|
||||
// ensure non-boundary retrievals fail
|
||||
require.EqualError(
|
||||
t,
|
||||
testGenesisRetrieval(
|
||||
ctx,
|
||||
client,
|
||||
api.RetrievalOrder{
|
||||
FromLocalCAR: sourceCar,
|
||||
Root: carRoot,
|
||||
DatamodelPathSelector: &textSelectorNonLink,
|
||||
},
|
||||
&api.FileRef{},
|
||||
nil,
|
||||
),
|
||||
fmt.Sprintf("retrieval failed: error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink),
|
||||
)
|
||||
}
|
||||
|
||||
func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, retRef *api.FileRef, outFile *os.File) error {
|
||||
|
||||
if retOrder.Total.Nil() {
|
||||
retOrder.Total = big.Zero()
|
||||
}
|
||||
if retOrder.UnsealPrice.Nil() {
|
||||
retOrder.UnsealPrice = big.Zero()
|
||||
}
|
||||
|
||||
err := client.ClientRetrieve(ctx, retOrder, retRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var data []byte
|
||||
if !retRef.IsCAR {
|
||||
|
||||
data, err = io.ReadAll(outFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
cr, err := car.NewCarReader(outFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(cr.Header.Roots) != 1 {
|
||||
return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots))
|
||||
} else if cr.Header.Roots[0].String() != carRoot.String() {
|
||||
return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String())
|
||||
}
|
||||
|
||||
blks := make([]blocks.Block, 0)
|
||||
for {
|
||||
b, err := cr.Next()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blks = append(blks, b)
|
||||
}
|
||||
|
||||
if len(blks) != 3 {
|
||||
return fmt.Errorf("expected a car file with 3 blocks, got one with %d instead", len(blks))
|
||||
}
|
||||
|
||||
data = blks[2].RawData()
|
||||
}
|
||||
|
||||
if string(data) != expectedResult {
|
||||
return fmt.Errorf("retrieved data mismatch: expected '%s' got '%s'", expectedResult, data)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
89
markets/utils/selectors.go
Normal file
89
markets/utils/selectors.go
Normal file
@ -0,0 +1,89 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
// must be imported to init() raw-codec support
|
||||
_ "github.com/ipld/go-ipld-prime/codec/raw"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
mdagipld "github.com/ipfs/go-ipld-format"
|
||||
dagpb "github.com/ipld/go-codec-dagpb"
|
||||
"github.com/ipld/go-ipld-prime"
|
||||
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
|
||||
basicnode "github.com/ipld/go-ipld-prime/node/basic"
|
||||
"github.com/ipld/go-ipld-prime/traversal"
|
||||
"github.com/ipld/go-ipld-prime/traversal/selector"
|
||||
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
|
||||
)
|
||||
|
||||
func TraverseDag(
|
||||
ctx context.Context,
|
||||
ds mdagipld.DAGService,
|
||||
startFrom cid.Cid,
|
||||
optionalSelector ipld.Node,
|
||||
visitCallback traversal.AdvVisitFn,
|
||||
) error {
|
||||
|
||||
if optionalSelector == nil {
|
||||
optionalSelector = selectorparse.CommonSelector_MatchAllRecursively
|
||||
}
|
||||
|
||||
parsedSelector, err := selector.ParseSelector(optionalSelector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// not sure what this is for TBH: we also provide ctx in &traversal.Config{}
|
||||
linkContext := ipld.LinkContext{Ctx: ctx}
|
||||
|
||||
// this is what allows us to understand dagpb
|
||||
nodePrototypeChooser := dagpb.AddSupportToChooser(
|
||||
func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) {
|
||||
return basicnode.Prototype.Any, nil
|
||||
},
|
||||
)
|
||||
|
||||
// this is how we implement GETs
|
||||
linkSystem := cidlink.DefaultLinkSystem()
|
||||
linkSystem.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
|
||||
cl, isCid := lnk.(cidlink.Link)
|
||||
if !isCid {
|
||||
return nil, fmt.Errorf("unexpected link type %#v", lnk)
|
||||
}
|
||||
|
||||
node, err := ds.Get(lctx.Ctx, cl.Cid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return bytes.NewBuffer(node.RawData()), nil
|
||||
}
|
||||
|
||||
// this is how we pull the start node out of the DS
|
||||
startLink := cidlink.Link{Cid: startFrom}
|
||||
startNodePrototype, err := nodePrototypeChooser(startLink, linkContext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
startNode, err := linkSystem.Load(
|
||||
linkContext,
|
||||
startLink,
|
||||
startNodePrototype,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// this is the actual execution, invoking the supplied callback
|
||||
return traversal.Progress{
|
||||
Cfg: &traversal.Config{
|
||||
Ctx: ctx,
|
||||
LinkSystem: linkSystem,
|
||||
LinkTargetNodePrototypeChooser: nodePrototypeChooser,
|
||||
},
|
||||
}.WalkAdv(startNode, parsedSelector, visitCallback)
|
||||
}
|
@ -24,10 +24,15 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
||||
files "github.com/ipfs/go-ipfs-files"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
"github.com/ipld/go-ipld-prime"
|
||||
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
|
||||
basicnode "github.com/ipld/go-ipld-prime/node/basic"
|
||||
"github.com/ipld/go-ipld-prime/traversal"
|
||||
"github.com/ipld/go-ipld-prime/traversal/selector"
|
||||
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
|
||||
textselector "github.com/ipld/go-ipld-selector-text-lite"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/multiformats/go-multibase"
|
||||
@ -69,6 +74,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
var log = logging.Logger("client")
|
||||
|
||||
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
|
||||
|
||||
// 8 days ~= SealDuration + PreCommit + MaxProveCommitDuration + 8 hour buffer
|
||||
@ -501,7 +508,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
|
||||
}
|
||||
|
||||
if ref.IsCAR {
|
||||
// user gave us a CAR fil, use it as-is
|
||||
// user gave us a CAR file, use it as-is
|
||||
// validate that it's either a carv1 or carv2, and has one root.
|
||||
f, err := os.Open(ref.Path)
|
||||
if err != nil {
|
||||
@ -836,6 +843,29 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
}
|
||||
|
||||
sel := shared.AllSelector()
|
||||
if order.DatamodelPathSelector != nil {
|
||||
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
|
||||
|
||||
selspec, err := textselector.SelectorSpecFromPath(
|
||||
|
||||
*order.DatamodelPathSelector,
|
||||
|
||||
// URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16
|
||||
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
|
||||
ssb.ExploreRecursive(
|
||||
selector.RecursionLimitNone(),
|
||||
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("failed to parse text-selector '%s': %w", *order.DatamodelPathSelector, err))
|
||||
return
|
||||
}
|
||||
|
||||
sel = selspec.Node()
|
||||
log.Infof("partial retrieval of datamodel-path-selector %s/*", *order.DatamodelPathSelector)
|
||||
}
|
||||
|
||||
// summary:
|
||||
// 1. if we're retrieving from an import, FromLocalCAR will be set.
|
||||
@ -962,8 +992,8 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
// Are we outputting a CAR?
|
||||
if ref.IsCAR {
|
||||
|
||||
// not IPFS - just extract the CARv1 from the CARv2 we stored the retrieval in
|
||||
if !retrieveIntoIPFS {
|
||||
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
|
||||
if !retrieveIntoIPFS && order.DatamodelPathSelector == nil {
|
||||
finish(carv2.ExtractV1File(carPath, ref.Path))
|
||||
return
|
||||
}
|
||||
@ -997,6 +1027,46 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
|
||||
root := order.Root
|
||||
|
||||
// if we used a selector - need to find the sub-root the user actually wanted to retrieve
|
||||
if order.DatamodelPathSelector != nil {
|
||||
|
||||
var subRootFound bool
|
||||
|
||||
// no err check - we just compiled this before starting, but now we do not wrap a `*`
|
||||
selspec, _ := textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, nil) //nolint:errcheck
|
||||
if err := utils.TraverseDag(
|
||||
ctx,
|
||||
ds,
|
||||
root,
|
||||
selspec.Node(),
|
||||
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
|
||||
if r == traversal.VisitReason_SelectionMatch {
|
||||
|
||||
if p.LastBlock.Path.String() != p.Path.String() {
|
||||
return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String())
|
||||
}
|
||||
|
||||
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
|
||||
if !castOK {
|
||||
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link.String())
|
||||
}
|
||||
|
||||
root = cidLnk.Cid
|
||||
subRootFound = true
|
||||
}
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
finish(xerrors.Errorf("error while locating partial retrieval sub-root: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
if !subRootFound {
|
||||
finish(xerrors.Errorf("path selection '%s' does not match a node within %s", *order.DatamodelPathSelector, root))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
nd, err := ds.Get(ctx, root)
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("ClientRetrieve: %w", err))
|
||||
|
Loading…
Reference in New Issue
Block a user