Expose basic text-based datamodel selector on retrieval

Syntaxt of selection is located at
https://pkg.go.dev/github.com/ipld/go-ipld-selector-text-lite#SelectorSpecFromPath

Example use, assuming that:
  - The root of the deal is a plain dag-pb unixfs directory
  - The directory is not sharded
  - The user wants to retrieve the first entry in that directory

  lotus client retrieve --miner f0XXXXX --datamodel-path-selector 'Links/0/Hash' bafyROOTCID ~/output

For a much more elaborate example see the top of ./itests/deals_partial_retrieval_test.go
This commit is contained in:
Peter Rabbitson 2021-06-04 11:20:34 +00:00
parent 0a04302fb2
commit 0444435589
13 changed files with 382 additions and 13 deletions

View File

@ -835,6 +835,11 @@ workflows:
suite: itest-deals_padding
target: "./itests/deals_padding_test.go"
- test:
name: test-itest-deals_partial_retrieval
suite: itest-deals_partial_retrieval
target: "./itests/deals_partial_retrieval_test.go"
- test:
name: test-itest-deals_power
suite: itest-deals_power

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/ipfs/go-cid"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address"
@ -931,9 +932,10 @@ type MarketDeal struct {
type RetrievalOrder struct {
// TODO: make this less unixfs specific
Root cid.Cid
Piece *cid.Cid
Size uint64
Root cid.Cid
Piece *cid.Cid
DatamodelPathSelector *textselector.Expression
Size uint64
FromLocalCAR string // if specified, get data from a local CARv2 file.
// TODO: support offset

View File

@ -27,6 +27,7 @@ import (
filestore2 "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-jsonrpc/auth"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
@ -90,6 +91,7 @@ func init() {
addExample(&pid)
storeIDExample := imports.ID(50)
textSelExample := textselector.Expression("Links/21/Hash/Links/42/Hash")
addExample(bitfield.NewFromSet([]uint64{5}))
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1)
@ -124,6 +126,7 @@ func init() {
addExample(&storeIDExample)
addExample(retrievalmarket.ClientEventDealAccepted)
addExample(retrievalmarket.DealStatusNew)
addExample(&textSelExample)
addExample(network.ReachabilityPublic)
addExample(build.NewestNetworkVersion)
addExample(map[string]int{"name": 42})

Binary file not shown.

View File

@ -26,6 +26,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2"
@ -1047,6 +1048,10 @@ var clientRetrieveCmd = &cli.Command{
Name: "miner",
Usage: "miner address for retrieval, if not present it'll use local discovery",
},
&cli.StringFlag{
Name: "datamodel-path-selector",
Usage: "a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal",
},
&cli.StringFlag{
Name: "maxPrice",
Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice),
@ -1182,6 +1187,10 @@ var clientRetrieveCmd = &cli.Command{
IsCAR: cctx.Bool("car"),
}
if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" {
order.DatamodelPathSelector = &sel
}
updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref)
if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err)

View File

@ -1467,6 +1467,7 @@ Inputs:
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42,
"FromLocalCAR": "string value",
"Total": "0",
@ -1521,6 +1522,7 @@ Inputs:
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42,
"FromLocalCAR": "string value",
"Total": "0",

View File

@ -1531,6 +1531,7 @@ Inputs:
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42,
"FromLocalCAR": "string value",
"Total": "0",
@ -1585,6 +1586,7 @@ Inputs:
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42,
"FromLocalCAR": "string value",
"Total": "0",

View File

@ -544,13 +544,14 @@ CATEGORY:
RETRIEVAL
OPTIONS:
--from value address to send transactions from
--car export to a car file instead of a regular file (default: false)
--miner value miner address for retrieval, if not present it'll use local discovery
--maxPrice value maximum price the client is willing to consider (default: 0.01 FIL)
--pieceCid value require data to be retrieved from a specific Piece CID
--allow-local (default: false)
--help, -h show help (default: false)
--from value address to send transactions from
--car export to a car file instead of a regular file (default: false)
--miner value miner address for retrieval, if not present it'll use local discovery
--datamodel-path-selector value a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal
--maxPrice value maximum price the client is willing to consider (default: 0.01 FIL)
--pieceCid value require data to be retrieved from a specific Piece CID
--allow-local (default: false)
--help, -h show help (default: false)
```

2
go.mod
View File

@ -99,7 +99,9 @@ require (
github.com/ipfs/interface-go-ipfs-core v0.2.3
github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.0
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210817134355-4c190a2bb825
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1

3
go.sum
View File

@ -784,6 +784,7 @@ github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVI
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.10.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.12.0 h1:JapyKWTsJgmhrPI7hfx4V798c/RClr85sXfBZnH1VIw=
github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc=
@ -791,6 +792,8 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1
github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs=
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0=
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210817134355-4c190a2bb825 h1:sGlmVUuWEhuJpVsErFqCHWy9XTsIy511hZWRWI/Lc4I=
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210817134355-4c190a2bb825/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=

View File

@ -0,0 +1,185 @@
package itests
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"testing"
"time"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/itests/kit"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/stretchr/testify/require"
)
// use the mainnet carfile as text fixture: it will always be here
// https://dweb.link/ipfs/bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2/8/1/8/1/0/1/0
var (
sourceCar = "../build/genesis/mainnet.car"
carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2")
carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina")
carPieceSize = abi.PaddedPieceSize(2097152)
textSelector = textselector.Expression("8/1/8/1/0/1/0")
expectedResult = "fil/1/storagepower"
)
func TestPartialRetrieval(t *testing.T) {
ctx := context.Background()
policy.SetPreCommitChallengeDelay(2)
kit.EnableLargeSectors(t)
kit.QuietMiningLogs()
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs(), kit.SectorSize(512<<20))
dh := kit.NewDealHarness(t, client, miner, miner)
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
_, err := client.ClientImport(ctx, api.FileRef{Path: sourceCar, IsCAR: true})
require.NoError(t, err)
caddr, err := client.WalletDefaultAddress(ctx)
require.NoError(t, err)
// first test retrieval from local car, then do an actual deal
for _, fullCycle := range []bool{false, true} {
var retOrder api.RetrievalOrder
if !fullCycle {
retOrder.FromLocalCAR = sourceCar
retOrder.Root = carRoot
} else {
dp := dh.DefaultStartDealParams()
dp.Data = &storagemarket.DataRef{
// FIXME: figure out how to do this with an online partial transfer
TransferType: storagemarket.TTManual,
Root: carRoot,
PieceCid: &carCommp,
PieceSize: carPieceSize.Unpadded(),
}
proposalCid := dh.StartDeal(ctx, dp)
// Wait for the deal to reach StorageDealCheckForAcceptance on the client
cd, err := client.ClientGetDealInfo(ctx, *proposalCid)
require.NoError(t, err)
require.Eventually(t, func() bool {
cd, _ := client.ClientGetDealInfo(ctx, *proposalCid)
return cd.State == storagemarket.StorageDealCheckForAcceptance
}, 30*time.Second, 1*time.Second, "actual deal status is %s", storagemarket.DealStates[cd.State])
err = miner.DealsImportData(ctx, *proposalCid, sourceCar)
require.NoError(t, err)
// Wait for the deal to be published, we should be able to start retrieval right away
dh.WaitDealPublished(ctx, proposalCid)
offers, err := client.ClientFindData(ctx, carRoot, nil)
require.NoError(t, err)
require.NotEmpty(t, offers, "no offers")
retOrder = offers[0].Order(caddr)
}
retOrder.DatamodelPathSelector = &textSelector
// test retrieval of either data or constructing a partial selective-car
for _, retrieveAsCar := range []bool{false, true} {
outFile, err := ioutil.TempFile(t.TempDir(), "ret-file")
require.NoError(t, err)
defer outFile.Close() //nolint:errcheck
require.NoError(t, testGenesisRetrieval(
ctx,
client,
retOrder,
&api.FileRef{
Path: outFile.Name(),
IsCAR: retrieveAsCar,
},
outFile,
))
// UGH if I do not sleep here, I get things like:
/*
retrieval failed: Retrieve failed: there is an active retrieval deal with peer 12D3KooWK9fB9a3HZ4PQLVmEQ6pweMMn5CAyKtumB71CPTnuBDi6 for payload CID bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2 (retrieval deal ID 1631259332180384709, state DealStatusFinalizingBlockstore) - existing deal must be cancelled before starting a new retrieval deal:
github.com/filecoin-project/lotus/node/impl/client.(*API).ClientRetrieve
/home/circleci/project/node/impl/client/client.go:774
*/
time.Sleep(time.Second)
}
}
}
func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, retRef *api.FileRef, outFile *os.File) error {
if retOrder.Total.Nil() {
retOrder.Total = big.Zero()
}
if retOrder.UnsealPrice.Nil() {
retOrder.UnsealPrice = big.Zero()
}
err := client.ClientRetrieve(ctx, retOrder, retRef)
if err != nil {
return err
}
var data []byte
if !retRef.IsCAR {
data, err = io.ReadAll(outFile)
if err != nil {
return err
}
} else {
cr, err := car.NewCarReader(outFile)
if err != nil {
return err
}
if len(cr.Header.Roots) != 1 {
return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots))
} else if cr.Header.Roots[0].String() != carRoot.String() {
return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String())
}
blks := make([]blocks.Block, 0)
for {
b, err := cr.Next()
if err == io.EOF {
break
} else if err != nil {
return err
}
blks = append(blks, b)
}
if len(blks) != 3 {
return fmt.Errorf("expected a car file with 3 blocks, got one with %d instead", len(blks))
}
data = blks[2].RawData()
}
if string(data) != expectedResult {
return fmt.Errorf("retrieved data mismatch: expected '%s' got '%s'", expectedResult, data)
}
return nil
}

View File

@ -0,0 +1,91 @@
package utils
import (
"bytes"
"context"
"fmt"
"io"
// must be imported to init() raw-codec support
_ "github.com/ipld/go-ipld-prime/codec/raw"
"github.com/ipfs/go-cid"
mdagipld "github.com/ipfs/go-ipld-format"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
)
func TraverseDag(
ctx context.Context,
ds mdagipld.DAGService,
startFrom cid.Cid,
optionalSelector ipld.Node,
visitCallback traversal.AdvVisitFn,
) error {
var parsedSelector selector.Selector
if optionalSelector == nil {
parsedSelector = selectorparse.CommonSelector_MatchAllRecursively
} else {
var err error
parsedSelector, err = selector.ParseSelector(optionalSelector)
if err != nil {
return err
}
}
// not sure what this is for TBH: we also provide ctx in &traversal.Config{}
linkContext := ipld.LinkContext{Ctx: ctx}
// this is what allows us to understand dagpb
nodePrototypeChooser := dagpb.AddSupportToChooser(
func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) {
return basicnode.Prototype.Any, nil
},
)
// this is how we implement GETs
linkSystem := cidlink.DefaultLinkSystem()
linkSystem.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
cl, isCid := lnk.(cidlink.Link)
if !isCid {
return nil, fmt.Errorf("unexpected link type %#v", lnk)
}
node, err := ds.Get(lctx.Ctx, cl.Cid)
if err != nil {
return nil, err
}
return bytes.NewBuffer(node.RawData()), nil
}
// this is how we pull the start node out of the DS
startLink := cidlink.Link{Cid: startFrom}
startNodePrototype, err := nodePrototypeChooser(startLink, linkContext)
if err != nil {
return err
}
startNode, err := linkSystem.Load(
linkContext,
startLink,
startNodePrototype,
)
if err != nil {
return err
}
// this is the actual execution, invoking the supplied callback
return traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: linkSystem,
LinkTargetNodePrototypeChooser: nodePrototypeChooser,
},
}.WalkAdv(startNode, parsedSelector, visitCallback)
}

View File

@ -24,10 +24,15 @@ import (
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-merkledag"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
@ -68,6 +73,8 @@ import (
"github.com/filecoin-project/lotus/node/repo"
)
var log = logging.Logger("client")
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
// 8 days ~= SealDuration + PreCommit + MaxProveCommitDuration + 8 hour buffer
@ -500,7 +507,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
}
if ref.IsCAR {
// user gave us a CAR fil, use it as-is
// user gave us a CAR file, use it as-is
// validate that it's either a carv1 or carv2, and has one root.
f, err := os.Open(ref.Path)
if err != nil {
@ -835,6 +842,29 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
}
sel := shared.AllSelector()
if order.DatamodelPathSelector != nil {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
selspec, err := textselector.SelectorSpecFromPath(
*order.DatamodelPathSelector,
// URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
),
)
if err != nil {
finish(xerrors.Errorf("failed to parse text-selector '%s': %w", *order.DatamodelPathSelector, err))
return
}
sel = selspec.Node()
log.Infof("partial retrieval of datamodel-path-selector %s/*", *order.DatamodelPathSelector)
}
// summary:
// 1. if we're retrieving from an import, FromLocalCAR will be set.
@ -961,8 +991,8 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
// Are we outputting a CAR?
if ref.IsCAR {
// not IPFS - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS {
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS && order.DatamodelPathSelector == nil {
finish(carv2.ExtractV1File(carPath, ref.Path))
return
}
@ -995,6 +1025,40 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
root := order.Root
// if we used a selector - need to find the sub-root the user actually wanted to retrieve
if order.DatamodelPathSelector != nil {
var subRootFound bool
// no err check - we just compiled this before starting, but now we do not wrap a `*`
selspec, _ := textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, nil) //nolint:errcheck
if err := utils.TraverseDag(
ctx,
ds,
root,
selspec.Node(),
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
if !castOK {
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link.String())
}
root = cidLnk.Cid
subRootFound = true
}
return nil
},
); err != nil {
finish(xerrors.Errorf("Finding partial retrieval sub-root: %w", err))
return
}
if !subRootFound {
finish(xerrors.Errorf("Path selection '%s' does not match a node within %s", order.DatamodelPathSelector, root))
return
}
}
nd, err := ds.Get(ctx, root)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))