client: cleanup clientRetrieve
This commit is contained in:
parent
08e297a217
commit
60ea33b1c7
@ -835,31 +835,21 @@ func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents cha
|
||||
}
|
||||
}
|
||||
|
||||
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
|
||||
defer close(events)
|
||||
|
||||
finish := func(e error) {
|
||||
if e != nil {
|
||||
events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
|
||||
}
|
||||
}
|
||||
|
||||
func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) {
|
||||
sel := selectorparse.CommonSelector_ExploreAllRecursively
|
||||
if order.DatamodelPathSelector != nil {
|
||||
if dps != nil {
|
||||
|
||||
if strings.HasPrefix(string(*order.DatamodelPathSelector), "{") {
|
||||
if strings.HasPrefix(string(*dps), "{") {
|
||||
var err error
|
||||
sel, err = selectorparse.ParseJSONSelector(string(*order.DatamodelPathSelector))
|
||||
sel, err = selectorparse.ParseJSONSelector(string(*dps))
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("failed to parse json-selector '%s': %w", *order.DatamodelPathSelector, err))
|
||||
return
|
||||
return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *dps, err)
|
||||
}
|
||||
} else {
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
|
||||
|
||||
selspec, err := textselector.SelectorSpecFromPath(
|
||||
|
||||
*order.DatamodelPathSelector,
|
||||
*dps,
|
||||
|
||||
// URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16
|
||||
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
|
||||
@ -869,15 +859,179 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("failed to parse text-selector '%s': %w", *order.DatamodelPathSelector, err))
|
||||
return
|
||||
return nil, xerrors.Errorf("failed to parse text-selector '%s': %w", *dps, err)
|
||||
}
|
||||
|
||||
sel = selspec.Node()
|
||||
log.Infof("partial retrieval of datamodel-path-selector %s/*", *order.DatamodelPathSelector)
|
||||
log.Infof("partial retrieval of datamodel-path-selector %s/*", *dps)
|
||||
}
|
||||
}
|
||||
|
||||
return sel, nil
|
||||
}
|
||||
|
||||
func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, events chan marketevents.RetrievalEvent) (rm.DealID, error) {
|
||||
if order.MinerPeer == nil || order.MinerPeer.ID == "" {
|
||||
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
order.MinerPeer = &rm.RetrievalPeer{
|
||||
ID: *mi.PeerId,
|
||||
Address: order.Miner,
|
||||
}
|
||||
}
|
||||
|
||||
if order.Total.Int == nil {
|
||||
return 0, xerrors.Errorf("cannot make retrieval deal for null total")
|
||||
}
|
||||
|
||||
if order.Size == 0 {
|
||||
return 0, xerrors.Errorf("cannot make retrieval deal for zero bytes")
|
||||
}
|
||||
|
||||
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
|
||||
|
||||
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("Error in retrieval params: %s", err)
|
||||
}
|
||||
|
||||
// Subscribe to events before retrieving to avoid losing events.
|
||||
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
|
||||
subscribeCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
|
||||
// We'll check the deal IDs inside consumeAllEvents.
|
||||
if state.PayloadCID.Equals(order.Root) {
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
id := a.Retrieval.NextID()
|
||||
id, err = a.Retrieval.Retrieve(
|
||||
ctx,
|
||||
id,
|
||||
order.Root,
|
||||
params,
|
||||
order.Total,
|
||||
*order.MinerPeer,
|
||||
order.Client,
|
||||
order.Miner,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
unsubscribe()
|
||||
return 0, xerrors.Errorf("Retrieve failed: %w", err)
|
||||
}
|
||||
|
||||
err = consumeAllEvents(ctx, id, subscribeEvents, events)
|
||||
|
||||
unsubscribe()
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("Retrieve: %w", err)
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (a *API) outputCAR(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, bs bstore.Blockstore, ref *api.FileRef) error {
|
||||
// generating a CARv1 from the configured blockstore
|
||||
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = car.NewSelectiveCar(
|
||||
ctx,
|
||||
bs,
|
||||
[]car.Dag{{
|
||||
Root: order.Root,
|
||||
Selector: sel,
|
||||
}},
|
||||
car.MaxTraversalLinks(config.MaxTraversalLinks),
|
||||
).Write(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
func (a *API) outputUnixFS(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, bs bstore.Blockstore, ref *api.FileRef) error {
|
||||
ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
|
||||
root := order.Root
|
||||
|
||||
// if we used a selector - need to find the sub-root the user actually wanted to retrieve
|
||||
if order.DatamodelPathSelector != nil {
|
||||
var subRootFound bool
|
||||
|
||||
if err := utils.TraverseDag(
|
||||
ctx,
|
||||
ds,
|
||||
root,
|
||||
sel,
|
||||
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
|
||||
if r == traversal.VisitReason_SelectionMatch {
|
||||
|
||||
if p.LastBlock.Path.String() != p.Path.String() {
|
||||
return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String())
|
||||
}
|
||||
|
||||
if p.LastBlock.Link == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
|
||||
if !castOK {
|
||||
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link)
|
||||
}
|
||||
|
||||
root = cidLnk.Cid
|
||||
subRootFound = true
|
||||
}
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
return xerrors.Errorf("error while locating partial retrieval sub-root: %w", err)
|
||||
}
|
||||
|
||||
if !subRootFound {
|
||||
return xerrors.Errorf("path selection '%s' does not match a node within %s", *order.DatamodelPathSelector, root)
|
||||
}
|
||||
}
|
||||
|
||||
nd, err := ds.Get(ctx, root)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
|
||||
return files.WriteTo(file, ref.Path)
|
||||
}
|
||||
|
||||
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
|
||||
defer close(events)
|
||||
|
||||
finish := func(e error) {
|
||||
if e != nil {
|
||||
events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
|
||||
}
|
||||
}
|
||||
|
||||
sel, err := getRetrievalSelector(order.DatamodelPathSelector)
|
||||
if err != nil {
|
||||
finish(err)
|
||||
return
|
||||
}
|
||||
|
||||
// summary:
|
||||
// 1. if we're retrieving from an import, FromLocalCAR will be set.
|
||||
// Skip the retrieval itself, and use the provided car as a blockstore further down
|
||||
@ -889,88 +1043,20 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
|
||||
// this indicates we're proxying to IPFS.
|
||||
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
|
||||
|
||||
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
|
||||
|
||||
carPath := order.FromLocalCAR
|
||||
|
||||
// we actually need to retrieve from the network
|
||||
if carPath == "" {
|
||||
|
||||
if !retrieveIntoIPFS && !retrieveIntoCAR {
|
||||
// we don't recognize the blockstore accessor.
|
||||
finish(xerrors.Errorf("unsupported retrieval blockstore accessor"))
|
||||
return
|
||||
}
|
||||
|
||||
if order.MinerPeer == nil || order.MinerPeer.ID == "" {
|
||||
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
|
||||
if err != nil {
|
||||
finish(err)
|
||||
return
|
||||
}
|
||||
|
||||
order.MinerPeer = &rm.RetrievalPeer{
|
||||
ID: *mi.PeerId,
|
||||
Address: order.Miner,
|
||||
}
|
||||
}
|
||||
|
||||
if order.Total.Int == nil {
|
||||
finish(xerrors.Errorf("cannot make retrieval deal for null total"))
|
||||
return
|
||||
}
|
||||
|
||||
if order.Size == 0 {
|
||||
finish(xerrors.Errorf("cannot make retrieval deal for zero bytes"))
|
||||
return
|
||||
}
|
||||
|
||||
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
|
||||
|
||||
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice)
|
||||
id, err := a.doRetrieval(ctx, order, sel, events)
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("Error in retrieval params: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Subscribe to events before retrieving to avoid losing events.
|
||||
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
|
||||
subscribeCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
|
||||
// We'll check the deal IDs inside consumeAllEvents.
|
||||
if state.PayloadCID.Equals(order.Root) {
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
id := a.Retrieval.NextID()
|
||||
id, err = a.Retrieval.Retrieve(
|
||||
ctx,
|
||||
id,
|
||||
order.Root,
|
||||
params,
|
||||
order.Total,
|
||||
*order.MinerPeer,
|
||||
order.Client,
|
||||
order.Miner,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
unsubscribe()
|
||||
finish(xerrors.Errorf("Retrieve failed: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
err = consumeAllEvents(ctx, id, subscribeEvents, events)
|
||||
|
||||
unsubscribe()
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("Retrieve: %w", err))
|
||||
finish(err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -1002,106 +1088,17 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
|
||||
// Are we outputting a CAR?
|
||||
if ref.IsCAR {
|
||||
|
||||
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
|
||||
if !retrieveIntoIPFS && order.DatamodelPathSelector == nil {
|
||||
finish(carv2.ExtractV1File(carPath, ref.Path))
|
||||
return
|
||||
}
|
||||
|
||||
// generating a CARv1 from the configured blockstore
|
||||
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
finish(err)
|
||||
return
|
||||
}
|
||||
|
||||
err = car.NewSelectiveCar(
|
||||
ctx,
|
||||
retrievalBs,
|
||||
[]car.Dag{{
|
||||
Root: order.Root,
|
||||
Selector: sel,
|
||||
}},
|
||||
car.MaxTraversalLinks(config.MaxTraversalLinks),
|
||||
).Write(f)
|
||||
if err != nil {
|
||||
finish(err)
|
||||
return
|
||||
}
|
||||
|
||||
finish(f.Close())
|
||||
finish(a.outputCAR(ctx, order, sel, retrievalBs, ref))
|
||||
return
|
||||
}
|
||||
|
||||
// we are extracting a UnixFS file.
|
||||
ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
|
||||
root := order.Root
|
||||
|
||||
// if we used a selector - need to find the sub-root the user actually wanted to retrieve
|
||||
if order.DatamodelPathSelector != nil {
|
||||
|
||||
var subRootFound bool
|
||||
|
||||
var sel datamodel.Node
|
||||
|
||||
// no err check - we just compiled this before starting, but now we do not wrap a `*`
|
||||
if strings.HasPrefix(string(*order.DatamodelPathSelector), "{") {
|
||||
sel, _ = selectorparse.ParseJSONSelector(string(*order.DatamodelPathSelector))
|
||||
} else {
|
||||
selspec, _ := textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, nil) //nolint:errcheck
|
||||
sel = selspec.Node()
|
||||
}
|
||||
|
||||
if err := utils.TraverseDag(
|
||||
ctx,
|
||||
ds,
|
||||
root,
|
||||
sel,
|
||||
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
|
||||
if r == traversal.VisitReason_SelectionMatch {
|
||||
|
||||
if p.LastBlock.Path.String() != p.Path.String() {
|
||||
return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String())
|
||||
}
|
||||
|
||||
if p.LastBlock.Link == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
|
||||
if !castOK {
|
||||
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link)
|
||||
}
|
||||
|
||||
root = cidLnk.Cid
|
||||
subRootFound = true
|
||||
}
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
finish(xerrors.Errorf("error while locating partial retrieval sub-root: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
if !subRootFound {
|
||||
finish(xerrors.Errorf("path selection '%s' does not match a node within %s", *order.DatamodelPathSelector, root))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
nd, err := ds.Get(ctx, root)
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("ClientRetrieve: %w", err))
|
||||
return
|
||||
}
|
||||
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("ClientRetrieve: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
finish(files.WriteTo(file, ref.Path))
|
||||
finish(a.outputUnixFS(ctx, order, sel, retrievalBs, ref))
|
||||
}
|
||||
|
||||
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
|
||||
|
Loading…
Reference in New Issue
Block a user