retrieval: Support multi-root export

This commit is contained in:
Łukasz Magiera 2021-11-11 16:17:39 +01:00
parent b0c043cc2f
commit b26906963b
6 changed files with 182 additions and 100 deletions

View File

@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -937,7 +936,7 @@ type RetrievalOrder struct {
// TODO: make this less unixfs specific // TODO: make this less unixfs specific
Root cid.Cid Root cid.Cid
Piece *cid.Cid Piece *cid.Cid
DatamodelPathSelector *textselector.Expression DataSelector *Selector
Size uint64 Size uint64
Total types.BigInt Total types.BigInt

View File

@ -5,12 +5,10 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/lotus/chain/types"
textselector "github.com/ipld/go-ipld-selector-text-lite"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -204,9 +202,35 @@ type RestrievalRes struct {
DealID retrievalmarket.DealID DealID retrievalmarket.DealID
} }
// Selector specifies ipld selector string
// - if the string starts with '{', it's interpreted as json selector string
// see https://ipld.io/specs/selectors/ and https://ipld.io/specs/selectors/fixtures/selector-fixtures-1/
// - otherwise the string is interpreted as ipld-selector-text-lite (simple ipld path)
// see https://github.com/ipld/go-ipld-selector-text-lite
type Selector string
type DagSpec struct {
// RootSelector specifies root node
// - when using textselector, the path specifies the root node
// - if nil then RootSelector is inferred from DataSelector
// - must match a single node
RootSelector *Selector
// DataSelector matches data to be retrieved
// - when using textselector, the path specifies subtree
DataSelector *Selector
}
type ExportRef struct { type ExportRef struct {
Root cid.Cid Root cid.Cid
DatamodelPathSelector *textselector.Expression
// DAGs array specifies a list of DAGs to export
// - If exporting into a car file, defines car roots
// - If exporting into unixfs files, only one DAG is supported, DataSelector is ignored
// - When not specified defaults to a single DAG:
// - Root - the root node: `{".": {}}`
// - Data - the entire DAG: `{"R":{"l":{"none":{}},":>":{"a":{">":{"@":{}}}}}}`
DAGs []DagSpec
FromLocalCAR string // if specified, get data from a local CARv2 file. FromLocalCAR string // if specified, get data from a local CARv2 file.
DealID retrievalmarket.DealID DealID retrievalmarket.DealID

View File

@ -320,12 +320,20 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder
return return
} }
finish(w.ClientExport(ctx, api.ExportRef{ eref := api.ExportRef{
Root: order.Root, Root: order.Root,
DatamodelPathSelector: order.DatamodelPathSelector,
FromLocalCAR: order.FromLocalCAR, FromLocalCAR: order.FromLocalCAR,
DealID: dealID, DealID: dealID,
}, *ref)) }
if order.DatamodelPathSelector != nil {
s := api.Selector(*order.DatamodelPathSelector)
eref.DAGs = append(eref.DAGs, api.DagSpec{
DataSelector: &s,
})
}
finish(w.ClientExport(ctx, eref, *ref))
} }
var _ FullNode = &WrapperV1Full{} var _ FullNode = &WrapperV1Full{}

View File

@ -26,7 +26,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc" "github.com/ipfs/go-cidutil/cidenc"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase" "github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -1202,10 +1201,16 @@ var clientRetrieveCmd = &cli.Command{
continue continue
} }
} }
event := "New"
if evt.Event != nil {
event = retrievalmarket.ClientEvents[*evt.Event]
}
afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n", afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n",
types.SizeStr(types.NewInt(evt.BytesReceived)), types.SizeStr(types.NewInt(evt.BytesReceived)),
types.FIL(evt.TotalPaid), types.FIL(evt.TotalPaid),
retrievalmarket.ClientEvents[*evt.Event], event,
retrievalmarket.DealStatuses[evt.Status], retrievalmarket.DealStatuses[evt.Status],
) )
switch evt.Status { switch evt.Status {
@ -1226,8 +1231,8 @@ var clientRetrieveCmd = &cli.Command{
} }
} }
if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" { if sel := api.Selector(cctx.String("datamodel-path-selector")); sel != "" {
eref.DatamodelPathSelector = &sel eref.DAGs = append(eref.DAGs, api.DagSpec{DataSelector: &sel})
} }
err = fapi.ClientExport(ctx, *eref, lapi.FileRef{ err = fapi.ClientExport(ctx, *eref, lapi.FileRef{

View File

@ -20,7 +20,6 @@ import (
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipld/go-car" "github.com/ipld/go-car"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -31,9 +30,10 @@ var (
carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2") carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2")
carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina") carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina")
carPieceSize = abi.PaddedPieceSize(2097152) carPieceSize = abi.PaddedPieceSize(2097152)
textSelector = textselector.Expression("8/1/8/1/0/1/0") textSelector = api.Selector("8/1/8/1/0/1/0")
textSelectorNonLink = textselector.Expression("8/1/8/1/0/1") storPowCid, _ = cid.Parse("bafkqaetgnfwc6mjpon2g64tbm5sxa33xmvza")
textSelectorNonexistent = textselector.Expression("42") textSelectorNonLink = api.Selector("8/1/8/1/0/1")
textSelectorNonexistent = api.Selector("42")
expectedResult = "fil/1/storagepower" expectedResult = "fil/1/storagepower"
) )
@ -94,8 +94,10 @@ func TestPartialRetrieval(t *testing.T) {
retOrder = offers[0].Order(caddr) retOrder = offers[0].Order(caddr)
} }
retOrder.DatamodelPathSelector = &textSelector retOrder.DataSelector = &textSelector
eref.DatamodelPathSelector = &textSelector eref.DAGs = append(eref.DAGs, api.DagSpec{
DataSelector: &textSelector,
})
eref.Root = carRoot eref.Root = carRoot
// test retrieval of either data or constructing a partial selective-car // test retrieval of either data or constructing a partial selective-car
@ -113,6 +115,7 @@ func TestPartialRetrieval(t *testing.T) {
Path: outFile.Name(), Path: outFile.Name(),
IsCAR: retrieveAsCar, IsCAR: retrieveAsCar,
}, },
storPowCid,
outFile, outFile,
)) ))
@ -134,17 +137,18 @@ func TestPartialRetrieval(t *testing.T) {
client, client,
api.RetrievalOrder{ api.RetrievalOrder{
Root: carRoot, Root: carRoot,
DatamodelPathSelector: &textSelectorNonexistent, DataSelector: &textSelectorNonexistent,
}, },
api.ExportRef{ api.ExportRef{
Root: carRoot, Root: carRoot,
FromLocalCAR: sourceCar, FromLocalCAR: sourceCar,
DatamodelPathSelector: &textSelectorNonexistent, DAGs: []api.DagSpec{{DataSelector: &textSelectorNonexistent}},
}, },
&api.FileRef{}, &api.FileRef{},
storPowCid,
nil, nil,
), ),
fmt.Sprintf("path selection does not match a node within %s", carRoot), fmt.Sprintf("parsing dag spec: path selection does not match a node within %s", carRoot),
) )
// ensure non-boundary retrievals fail // ensure non-boundary retrievals fail
@ -155,21 +159,22 @@ func TestPartialRetrieval(t *testing.T) {
client, client,
api.RetrievalOrder{ api.RetrievalOrder{
Root: carRoot, Root: carRoot,
DatamodelPathSelector: &textSelectorNonLink, DataSelector: &textSelectorNonLink,
}, },
api.ExportRef{ api.ExportRef{
Root: carRoot, Root: carRoot,
FromLocalCAR: sourceCar, FromLocalCAR: sourceCar,
DatamodelPathSelector: &textSelectorNonLink, DAGs: []api.DagSpec{{DataSelector: &textSelectorNonLink}},
}, },
&api.FileRef{}, &api.FileRef{},
storPowCid,
nil, nil,
), ),
fmt.Sprintf("error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink), fmt.Sprintf("parsing dag spec: error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink),
) )
} }
func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef, outFile *os.File) error { func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef, expRootCid cid.Cid, outFile *os.File) error {
if retOrder.Total.Nil() { if retOrder.Total.Nil() {
retOrder.Total = big.Zero() retOrder.Total = big.Zero()
@ -212,7 +217,7 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
if len(cr.Header.Roots) != 1 { if len(cr.Header.Roots) != 1 {
return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots)) return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots))
} else if cr.Header.Roots[0].String() != carRoot.String() { } else if cr.Header.Roots[0].String() != expRootCid.String() {
return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String()) return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String())
} }
@ -228,11 +233,11 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
blks = append(blks, b) blks = append(blks, b)
} }
if len(blks) != 3 { if len(blks) != 1 {
return fmt.Errorf("expected a car file with 3 blocks, got one with %d instead", len(blks)) return fmt.Errorf("expected a car file with 1 blocks, got one with %d instead", len(blks))
} }
data = blks[2].RawData() data = blks[0].RawData()
} }
if string(data) != expectedResult { if string(data) != expectedResult {

View File

@ -12,6 +12,7 @@ import (
"time" "time"
bstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore"
format "github.com/ipfs/go-ipld-format"
unixfile "github.com/ipfs/go-unixfs/file" unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car" "github.com/ipld/go-car"
carv2 "github.com/ipld/go-car/v2" carv2 "github.com/ipld/go-car/v2"
@ -761,7 +762,7 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) e
} }
} }
func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) { func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
sel := selectorparse.CommonSelector_ExploreAllRecursively sel := selectorparse.CommonSelector_ExploreAllRecursively
if dps != nil { if dps != nil {
@ -775,7 +776,7 @@ func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error)
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
selspec, err := textselector.SelectorSpecFromPath( selspec, err := textselector.SelectorSpecFromPath(
*dps, textselector.Expression(*dps),
// URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16 // URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node // Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
@ -797,7 +798,7 @@ func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error)
} }
func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) { func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) {
sel, err := getRetrievalSelector(params.DatamodelPathSelector) sel, err := getDataSelector(params.DataSelector)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -914,11 +915,6 @@ func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
carPath := exportRef.FromLocalCAR carPath := exportRef.FromLocalCAR
sel, err := getRetrievalSelector(exportRef.DatamodelPathSelector)
if err != nil {
return err
}
if carPath == "" { if carPath == "" {
if !retrieveIntoIPFS && !retrieveIntoCAR { if !retrieveIntoIPFS && !retrieveIntoCAR {
return xerrors.Errorf("unsupported retrieval blockstore accessor") return xerrors.Errorf("unsupported retrieval blockstore accessor")
@ -941,33 +937,48 @@ func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api
retrievalBs = cbs retrievalBs = cbs
} }
dserv := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv)
if err != nil {
return xerrors.Errorf("parsing dag spec: %w", err)
}
// Are we outputting a CAR? // Are we outputting a CAR?
if ref.IsCAR { if ref.IsCAR {
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS && exportRef.DatamodelPathSelector == nil { if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 {
return carv2.ExtractV1File(carPath, ref.Path) return carv2.ExtractV1File(carPath, ref.Path)
} }
return a.outputCAR(ctx, exportRef.Root, sel, retrievalBs, ref) return a.outputCAR(ctx, roots, retrievalBs, ref)
} }
return a.outputUnixFS(ctx, exportRef.Root, exportRef.DatamodelPathSelector, retrievalBs, ref) if len(roots) != 1 {
return xerrors.Errorf("unixfs retrieval requires one root node, got %d", len(roots))
}
return a.outputUnixFS(ctx, roots[0].root, dserv, ref)
} }
func (a *API) outputCAR(ctx context.Context, root cid.Cid, sel datamodel.Node, bs bstore.Blockstore, ref api.FileRef) error { func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, ref api.FileRef) error {
// generating a CARv1 from the configured blockstore // generating a CARv1 from the configured blockstore
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return err return err
} }
carDags := make([]car.Dag, len(dags))
for i, dag := range dags {
carDags[i] = car.Dag{
Root: dag.root,
Selector: dag.selector,
}
}
err = car.NewSelectiveCar( err = car.NewSelectiveCar(
ctx, ctx,
bs, bs,
[]car.Dag{{ carDags,
Root: root,
Selector: sel,
}},
car.MaxTraversalLinks(config.MaxTraversalLinks), car.MaxTraversalLinks(config.MaxTraversalLinks),
).Write(f) ).Write(f)
if err != nil { if err != nil {
@ -977,30 +988,46 @@ func (a *API) outputCAR(ctx context.Context, root cid.Cid, sel datamodel.Node, b
return f.Close() return f.Close()
} }
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, sels *textselector.Expression, bs bstore.Blockstore, ref api.FileRef) error { type dagSpec struct {
ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) root cid.Cid
selector ipld.Node
}
// if we used a selector - need to find the sub-root the user actually wanted to retrieve func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService) ([]dagSpec, error) {
if sels != nil { if len(dsp) == 0 {
var subRootFound bool return []dagSpec{
sel := selectorparse.CommonSelector_ExploreAllRecursively {
root: root,
selector: nil,
},
}, nil
}
if strings.HasPrefix(string(*sels), "{") { out := make([]dagSpec, len(dsp))
for i, spec := range dsp {
if spec.RootSelector == nil {
spec.RootSelector = spec.DataSelector
}
if spec.RootSelector != nil {
var rsn ipld.Node
if strings.HasPrefix(string(*spec.RootSelector), "{") {
var err error var err error
sel, err = selectorparse.ParseJSONSelector(string(*sels)) rsn, err = selectorparse.ParseJSONSelector(string(*spec.RootSelector))
if err != nil { if err != nil {
return xerrors.Errorf("failed to parse json-selector '%s': %w", *sels, err) return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *spec.RootSelector, err)
} }
} else { } else {
selspec, _ := textselector.SelectorSpecFromPath(*sels, nil) //nolint:errcheck selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.RootSelector), nil) //nolint:errcheck
sel = selspec.Node() rsn = selspec.Node()
} }
if err := utils.TraverseDag( if err := utils.TraverseDag(
ctx, ctx,
ds, ds,
root, root,
sel, rsn,
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch { if r == traversal.VisitReason_SelectionMatch {
@ -1017,20 +1044,32 @@ func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, sels *textselector
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link) return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link)
} }
root = cidLnk.Cid out[i].root = cidLnk.Cid
subRootFound = true
} }
return nil return nil
}, },
); err != nil { ); err != nil {
return xerrors.Errorf("error while locating partial retrieval sub-root: %w", err) return nil, xerrors.Errorf("error while locating partial retrieval sub-root: %w", err)
} }
if !subRootFound { if out[i].root == cid.Undef {
return xerrors.Errorf("path selection does not match a node within %s", root) return nil, xerrors.Errorf("path selection does not match a node within %s", root)
} }
} }
if spec.DataSelector != nil {
var err error
out[i].selector, err = getDataSelector(spec.DataSelector)
if err != nil {
return nil, err
}
}
}
return out, nil
}
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, ref api.FileRef) error {
nd, err := ds.Get(ctx, root) nd, err := ds.Get(ctx, root)
if err != nil { if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err) return xerrors.Errorf("ClientRetrieve: %w", err)
@ -1072,8 +1111,10 @@ func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, er
func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) { func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) {
updates := make(chan api.RetrievalInfo) updates := make(chan api.RetrievalInfo)
unsub := a.Retrieval.SubscribeToEvents(func(_ rm.ClientEvent, deal rm.ClientDealState) { unsub := a.Retrieval.SubscribeToEvents(func(evt rm.ClientEvent, deal rm.ClientDealState) {
updates <- a.newRetrievalInfo(ctx, deal) update := a.newRetrievalInfo(ctx, deal)
update.Event = &evt
updates <- update
}) })
go func() { go func() {