Merge pull request #7306 from filecoin-project/chore/prep-retrieval-for-selectors_no-func-changes
Prep retrieval for selectors: no functional changes
This commit is contained in:
commit
1ce7c250d7
@ -14,7 +14,7 @@ import (
|
|||||||
unixfile "github.com/ipfs/go-unixfs/file"
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
"github.com/ipld/go-car"
|
"github.com/ipld/go-car"
|
||||||
carv2 "github.com/ipld/go-car/v2"
|
carv2 "github.com/ipld/go-car/v2"
|
||||||
"github.com/ipld/go-car/v2/blockstore"
|
carv2bs "github.com/ipld/go-car/v2/blockstore"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-padreader"
|
"github.com/filecoin-project/go-padreader"
|
||||||
@ -41,7 +41,6 @@ import (
|
|||||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-fil-markets/discovery"
|
"github.com/filecoin-project/go-fil-markets/discovery"
|
||||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
||||||
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||||
"github.com/filecoin-project/go-fil-markets/shared"
|
"github.com/filecoin-project/go-fil-markets/shared"
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
@ -91,7 +90,7 @@ type API struct {
|
|||||||
// accessors for imports and retrievals.
|
// accessors for imports and retrievals.
|
||||||
Imports dtypes.ClientImportMgr
|
Imports dtypes.ClientImportMgr
|
||||||
StorageBlockstoreAccessor storagemarket.BlockstoreAccessor
|
StorageBlockstoreAccessor storagemarket.BlockstoreAccessor
|
||||||
RtvlBlockstoreAccessor retrievalmarket.BlockstoreAccessor
|
RtvlBlockstoreAccessor rm.BlockstoreAccessor
|
||||||
|
|
||||||
DataTransfer dtypes.ClientDataTransfer
|
DataTransfer dtypes.ClientDataTransfer
|
||||||
Host host.Host
|
Host host.Host
|
||||||
@ -619,7 +618,7 @@ func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, erro
|
|||||||
return cid.Undef, xerrors.Errorf("failed to calculate placeholder root: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to calculate placeholder root: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err := blockstore.OpenReadWrite(path, []cid.Cid{placeholderRoot}, blockstore.UseWholeCIDs(true))
|
bs, err := carv2bs.OpenReadWrite(path, []cid.Cid{placeholderRoot}, carv2bs.UseWholeCIDs(true))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("failed to create carv2 read/write blockstore: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to create carv2 read/write blockstore: %w", err)
|
||||||
}
|
}
|
||||||
@ -730,7 +729,7 @@ func (a *API) ClientListImports(_ context.Context) ([]api.Import, error) {
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmarket.DealID) error {
|
func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) error {
|
||||||
cerr := make(chan error)
|
cerr := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
err := a.Retrieval.CancelDeal(dealID)
|
err := a.Retrieval.CancelDeal(dealID)
|
||||||
@ -784,7 +783,7 @@ type retrievalSubscribeEvent struct {
|
|||||||
state rm.ClientDealState
|
state rm.ClientDealState
|
||||||
}
|
}
|
||||||
|
|
||||||
func consumeAllEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
|
func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
|
||||||
for {
|
for {
|
||||||
var subscribeEvent retrievalSubscribeEvent
|
var subscribeEvent retrievalSubscribeEvent
|
||||||
select {
|
select {
|
||||||
@ -835,24 +834,29 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sel := shared.AllSelector()
|
||||||
|
|
||||||
// summary:
|
// summary:
|
||||||
// 1. if we're retrieving from an import, FromLocalCAR will be informed.
|
// 1. if we're retrieving from an import, FromLocalCAR will be set.
|
||||||
// Open as a Filestore and populate the target CAR or UnixFS export from it.
|
// Skip the retrieval itself, and use the provided car as a blockstore further down
|
||||||
// (cannot use ExtractV1File because user wants a dense CAR, not a ref CAR/filestore)
|
// to extract a CAR or UnixFS export from.
|
||||||
// 2. if we're using an IPFS blockstore for retrieval, retrieve into it,
|
// 2. if we're using an IPFS blockstore for retrieval, retrieve into it,
|
||||||
// then extract the CAR or UnixFS export from it.
|
// then use the virtual blockstore to extract a CAR or UnixFS export from it.
|
||||||
// 3. if we have to retrieve, perform a CARv2 retrieval, then extract
|
// 3. if we have to retrieve, perform a CARv2 retrieval, then either
|
||||||
// the CARv1 (with ExtractV1File) or UnixFS export from it.
|
// extract the CARv1 (with ExtractV1File) or use it as a blockstore further down.
|
||||||
|
|
||||||
// this indicates we're proxying to IPFS.
|
// this indicates we're proxying to IPFS.
|
||||||
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
|
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
|
||||||
|
|
||||||
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
|
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
|
||||||
|
|
||||||
carPath := order.FromLocalCAR
|
carPath := order.FromLocalCAR
|
||||||
|
|
||||||
|
// we actually need to retrieve from the network
|
||||||
if carPath == "" {
|
if carPath == "" {
|
||||||
|
|
||||||
if !retrieveIntoIPFS && !retrieveIntoCAR {
|
if !retrieveIntoIPFS && !retrieveIntoCAR {
|
||||||
// we actually need to retrieve from the network, but we don't
|
// we don't recognize the blockstore accessor.
|
||||||
// recognize the blockstore accessor.
|
|
||||||
finish(xerrors.Errorf("unsupported retrieval blockstore accessor"))
|
finish(xerrors.Errorf("unsupported retrieval blockstore accessor"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -864,7 +868,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
order.MinerPeer = &retrievalmarket.RetrievalPeer{
|
order.MinerPeer = &rm.RetrievalPeer{
|
||||||
ID: *mi.PeerId,
|
ID: *mi.PeerId,
|
||||||
Address: order.Miner,
|
Address: order.Miner,
|
||||||
}
|
}
|
||||||
@ -882,7 +886,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
|||||||
|
|
||||||
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
|
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
|
||||||
|
|
||||||
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
|
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
finish(xerrors.Errorf("Error in retrieval params: %s", err))
|
finish(xerrors.Errorf("Error in retrieval params: %s", err))
|
||||||
return
|
return
|
||||||
@ -940,37 +944,10 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Are we outputting a CAR?
|
// determine where did the retrieval go
|
||||||
if ref.IsCAR {
|
var retrievalBs bstore.Blockstore
|
||||||
if retrieveIntoIPFS {
|
|
||||||
// generating a CARv1 from IPFS.
|
|
||||||
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
|
|
||||||
if err != nil {
|
|
||||||
finish(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
bs := proxyBss.Blockstore
|
|
||||||
dags := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
|
|
||||||
err = car.WriteCar(ctx, dags, []cid.Cid{order.Root}, f)
|
|
||||||
if err != nil {
|
|
||||||
finish(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
finish(f.Close())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// generating a CARv1 from the CARv2 where we stored the retrieval.
|
|
||||||
err := carv2.ExtractV1File(carPath, ref.Path)
|
|
||||||
finish(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// we are extracting a UnixFS file.
|
|
||||||
var bs bstore.Blockstore
|
|
||||||
if retrieveIntoIPFS {
|
if retrieveIntoIPFS {
|
||||||
bs = proxyBss.Blockstore
|
retrievalBs = proxyBss.Blockstore
|
||||||
} else {
|
} else {
|
||||||
cbs, err := stores.ReadOnlyFilestore(carPath)
|
cbs, err := stores.ReadOnlyFilestore(carPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -978,18 +955,52 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer cbs.Close() //nolint:errcheck
|
defer cbs.Close() //nolint:errcheck
|
||||||
bs = cbs
|
retrievalBs = cbs
|
||||||
}
|
}
|
||||||
|
|
||||||
bsvc := blockservice.New(bs, offline.Exchange(bs))
|
// Are we outputting a CAR?
|
||||||
dag := merkledag.NewDAGService(bsvc)
|
if ref.IsCAR {
|
||||||
|
|
||||||
nd, err := dag.Get(ctx, order.Root)
|
// not IPFS - just extract the CARv1 from the CARv2 we stored the retrieval in
|
||||||
|
if !retrieveIntoIPFS {
|
||||||
|
finish(carv2.ExtractV1File(carPath, ref.Path))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// generating a CARv1 from the configured blockstore
|
||||||
|
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
finish(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = car.NewSelectiveCar(
|
||||||
|
ctx,
|
||||||
|
retrievalBs,
|
||||||
|
[]car.Dag{{
|
||||||
|
Root: order.Root,
|
||||||
|
Selector: sel,
|
||||||
|
}},
|
||||||
|
).Write(f)
|
||||||
|
if err != nil {
|
||||||
|
finish(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
finish(f.Close())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we are extracting a UnixFS file.
|
||||||
|
ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
|
||||||
|
root := order.Root
|
||||||
|
|
||||||
|
nd, err := ds.Get(ctx, root)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
finish(xerrors.Errorf("ClientRetrieve: %w", err))
|
finish(xerrors.Errorf("ClientRetrieve: %w", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
file, err := unixfile.NewUnixfsFile(ctx, dag, nd)
|
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
finish(xerrors.Errorf("ClientRetrieve: %w", err))
|
finish(xerrors.Errorf("ClientRetrieve: %w", err))
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user