From 2c5f4386ba082db2f35b2b431306770aa86e71fb Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Fri, 4 Jun 2021 11:16:39 +0000 Subject: [PATCH] Prepare the retrieval codepath for selectors Slight import-name shuffle, no functional changes --- node/impl/client/client.go | 113 ++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 51 deletions(-) diff --git a/node/impl/client/client.go b/node/impl/client/client.go index f06a62f90..5f08f93cb 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -14,7 +14,7 @@ import ( unixfile "github.com/ipfs/go-unixfs/file" "github.com/ipld/go-car" carv2 "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/blockstore" + carv2bs "github.com/ipld/go-car/v2/blockstore" "golang.org/x/xerrors" "github.com/filecoin-project/go-padreader" @@ -41,7 +41,6 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/discovery" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -91,7 +90,7 @@ type API struct { // accessors for imports and retrievals. Imports dtypes.ClientImportMgr StorageBlockstoreAccessor storagemarket.BlockstoreAccessor - RtvlBlockstoreAccessor retrievalmarket.BlockstoreAccessor + RtvlBlockstoreAccessor rm.BlockstoreAccessor DataTransfer dtypes.ClientDataTransfer Host host.Host @@ -619,7 +618,7 @@ func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, erro return cid.Undef, xerrors.Errorf("failed to calculate placeholder root: %w", err) } - bs, err := blockstore.OpenReadWrite(path, []cid.Cid{placeholderRoot}, blockstore.UseWholeCIDs(true)) + bs, err := carv2bs.OpenReadWrite(path, []cid.Cid{placeholderRoot}, carv2bs.UseWholeCIDs(true)) if err != nil { return cid.Undef, xerrors.Errorf("failed to create carv2 read/write blockstore: %w", err) } @@ -730,7 +729,7 @@ func (a *API) ClientListImports(_ context.Context) ([]api.Import, error) { return out, nil } -func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmarket.DealID) error { +func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) error { cerr := make(chan error) go func() { err := a.Retrieval.CancelDeal(dealID) @@ -784,7 +783,7 @@ type retrievalSubscribeEvent struct { state rm.ClientDealState } -func consumeAllEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error { +func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error { for { var subscribeEvent retrievalSubscribeEvent select { @@ -835,24 +834,29 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref } } + sel := shared.AllSelector() + // summary: - // 1. if we're retrieving from an import, FromLocalCAR will be informed. - // Open as a Filestore and populate the target CAR or UnixFS export from it. - // (cannot use ExtractV1File because user wants a dense CAR, not a ref CAR/filestore) + // 1. if we're retrieving from an import, FromLocalCAR will be set. + // Skip the retrieval itself, and use the provided car as a blockstore further down + // to extract a CAR or UnixFS export from. // 2. if we're using an IPFS blockstore for retrieval, retrieve into it, - // then extract the CAR or UnixFS export from it. - // 3. if we have to retrieve, perform a CARv2 retrieval, then extract - // the CARv1 (with ExtractV1File) or UnixFS export from it. + // then use the virtual blockstore to extract a CAR or UnixFS export from it. + // 3. if we have to retrieve, perform a CARv2 retrieval, then either + // extract the CARv1 (with ExtractV1File) or use it as a blockstore further down. // this indicates we're proxying to IPFS. proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor) + carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) carPath := order.FromLocalCAR + + // we actually need to retrieve from the network if carPath == "" { + if !retrieveIntoIPFS && !retrieveIntoCAR { - // we actually need to retrieve from the network, but we don't - // recognize the blockstore accessor. + // we don't recognize the blockstore accessor. finish(xerrors.Errorf("unsupported retrieval blockstore accessor")) return } @@ -864,7 +868,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - order.MinerPeer = &retrievalmarket.RetrievalPeer{ + order.MinerPeer = &rm.RetrievalPeer{ ID: *mi.PeerId, Address: order.Miner, } @@ -882,7 +886,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) - params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) + params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice) if err != nil { finish(xerrors.Errorf("Error in retrieval params: %s", err)) return @@ -940,37 +944,10 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - // Are we outputting a CAR? - if ref.IsCAR { - if retrieveIntoIPFS { - // generating a CARv1 from IPFS. - f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - finish(err) - return - } - - bs := proxyBss.Blockstore - dags := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - err = car.WriteCar(ctx, dags, []cid.Cid{order.Root}, f) - if err != nil { - finish(err) - return - } - finish(f.Close()) - return - } - - // generating a CARv1 from the CARv2 where we stored the retrieval. - err := carv2.ExtractV1File(carPath, ref.Path) - finish(err) - return - } - - // we are extracting a UnixFS file. - var bs bstore.Blockstore + // determine where did the retrieval go + var retrievalBs bstore.Blockstore if retrieveIntoIPFS { - bs = proxyBss.Blockstore + retrievalBs = proxyBss.Blockstore } else { cbs, err := stores.ReadOnlyFilestore(carPath) if err != nil { @@ -978,18 +955,52 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } defer cbs.Close() //nolint:errcheck - bs = cbs + retrievalBs = cbs } - bsvc := blockservice.New(bs, offline.Exchange(bs)) - dag := merkledag.NewDAGService(bsvc) + // Are we outputting a CAR? + if ref.IsCAR { - nd, err := dag.Get(ctx, order.Root) + // not IPFS - just extract the CARv1 from the CARv2 we stored the retrieval in + if !retrieveIntoIPFS { + finish(carv2.ExtractV1File(carPath, ref.Path)) + return + } + + // generating a CARv1 from the configured blockstore + f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + finish(err) + return + } + + err = car.NewSelectiveCar( + ctx, + retrievalBs, + []car.Dag{{ + Root: order.Root, + Selector: sel, + }}, + ).Write(f) + if err != nil { + finish(err) + return + } + + finish(f.Close()) + return + } + + // we are extracting a UnixFS file. + ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs))) + root := order.Root + + nd, err := ds.Get(ctx, root) if err != nil { finish(xerrors.Errorf("ClientRetrieve: %w", err)) return } - file, err := unixfile.NewUnixfsFile(ctx, dag, nd) + file, err := unixfile.NewUnixfsFile(ctx, ds, nd) if err != nil { finish(xerrors.Errorf("ClientRetrieve: %w", err)) return