Prepare the retrieval codepath for selectors

Slight import-name shuffle, no functional changes
This commit is contained in:
Peter Rabbitson 2021-06-04 11:16:39 +00:00
parent 92c12f9bde
commit 2c5f4386ba

View File

@ -14,7 +14,7 @@ import (
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
carv2bs "github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-padreader"
@ -41,7 +41,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/discovery"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
@ -91,7 +90,7 @@ type API struct {
// accessors for imports and retrievals.
Imports dtypes.ClientImportMgr
StorageBlockstoreAccessor storagemarket.BlockstoreAccessor
RtvlBlockstoreAccessor retrievalmarket.BlockstoreAccessor
RtvlBlockstoreAccessor rm.BlockstoreAccessor
DataTransfer dtypes.ClientDataTransfer
Host host.Host
@ -619,7 +618,7 @@ func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("failed to calculate placeholder root: %w", err)
}
bs, err := blockstore.OpenReadWrite(path, []cid.Cid{placeholderRoot}, blockstore.UseWholeCIDs(true))
bs, err := carv2bs.OpenReadWrite(path, []cid.Cid{placeholderRoot}, carv2bs.UseWholeCIDs(true))
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create carv2 read/write blockstore: %w", err)
}
@ -730,7 +729,7 @@ func (a *API) ClientListImports(_ context.Context) ([]api.Import, error) {
return out, nil
}
func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmarket.DealID) error {
func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) error {
cerr := make(chan error)
go func() {
err := a.Retrieval.CancelDeal(dealID)
@ -784,7 +783,7 @@ type retrievalSubscribeEvent struct {
state rm.ClientDealState
}
func consumeAllEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent retrievalSubscribeEvent
select {
@ -835,24 +834,29 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
}
}
sel := shared.AllSelector()
// summary:
// 1. if we're retrieving from an import, FromLocalCAR will be informed.
// Open as a Filestore and populate the target CAR or UnixFS export from it.
// (cannot use ExtractV1File because user wants a dense CAR, not a ref CAR/filestore)
// 1. if we're retrieving from an import, FromLocalCAR will be set.
// Skip the retrieval itself, and use the provided car as a blockstore further down
// to extract a CAR or UnixFS export from.
// 2. if we're using an IPFS blockstore for retrieval, retrieve into it,
// then extract the CAR or UnixFS export from it.
// 3. if we have to retrieve, perform a CARv2 retrieval, then extract
// the CARv1 (with ExtractV1File) or UnixFS export from it.
// then use the virtual blockstore to extract a CAR or UnixFS export from it.
// 3. if we have to retrieve, perform a CARv2 retrieval, then either
// extract the CARv1 (with ExtractV1File) or use it as a blockstore further down.
// this indicates we're proxying to IPFS.
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
carPath := order.FromLocalCAR
// we actually need to retrieve from the network
if carPath == "" {
if !retrieveIntoIPFS && !retrieveIntoCAR {
// we actually need to retrieve from the network, but we don't
// recognize the blockstore accessor.
// we don't recognize the blockstore accessor.
finish(xerrors.Errorf("unsupported retrieval blockstore accessor"))
return
}
@ -864,7 +868,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
order.MinerPeer = &retrievalmarket.RetrievalPeer{
order.MinerPeer = &rm.RetrievalPeer{
ID: *mi.PeerId,
Address: order.Miner,
}
@ -882,7 +886,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice)
if err != nil {
finish(xerrors.Errorf("Error in retrieval params: %s", err))
return
@ -940,37 +944,10 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
// Are we outputting a CAR?
if ref.IsCAR {
if retrieveIntoIPFS {
// generating a CARv1 from IPFS.
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
finish(err)
return
}
bs := proxyBss.Blockstore
dags := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
err = car.WriteCar(ctx, dags, []cid.Cid{order.Root}, f)
if err != nil {
finish(err)
return
}
finish(f.Close())
return
}
// generating a CARv1 from the CARv2 where we stored the retrieval.
err := carv2.ExtractV1File(carPath, ref.Path)
finish(err)
return
}
// we are extracting a UnixFS file.
var bs bstore.Blockstore
// determine where did the retrieval go
var retrievalBs bstore.Blockstore
if retrieveIntoIPFS {
bs = proxyBss.Blockstore
retrievalBs = proxyBss.Blockstore
} else {
cbs, err := stores.ReadOnlyFilestore(carPath)
if err != nil {
@ -978,18 +955,52 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
defer cbs.Close() //nolint:errcheck
bs = cbs
retrievalBs = cbs
}
bsvc := blockservice.New(bs, offline.Exchange(bs))
dag := merkledag.NewDAGService(bsvc)
// Are we outputting a CAR?
if ref.IsCAR {
nd, err := dag.Get(ctx, order.Root)
// not IPFS - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS {
finish(carv2.ExtractV1File(carPath, ref.Path))
return
}
// generating a CARv1 from the configured blockstore
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
finish(err)
return
}
err = car.NewSelectiveCar(
ctx,
retrievalBs,
[]car.Dag{{
Root: order.Root,
Selector: sel,
}},
).Write(f)
if err != nil {
finish(err)
return
}
finish(f.Close())
return
}
// we are extracting a UnixFS file.
ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
root := order.Root
nd, err := ds.Get(ctx, root)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
}
file, err := unixfile.NewUnixfsFile(ctx, dag, nd)
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return