diff --git a/node/builder.go b/node/builder.go index 961216c62..a1c32c9ce 100644 --- a/node/builder.go +++ b/node/builder.go @@ -326,6 +326,8 @@ func Repo(r repo.Repo) Option { Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), + Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager(false)), + Override(new(ci.PrivKey), lp2p.PrivKey), Override(new(ci.PubKey), ci.PrivKey.GetPublic), Override(new(peer.ID), peer.IDFromPublicKey), diff --git a/node/builder_chain.go b/node/builder_chain.go index 31cb95984..cfc1933b2 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -169,7 +169,7 @@ func ConfigFullNode(c interface{}) Option { If(cfg.Client.UseIpfs, Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)), If(cfg.Client.IpfsUseForRetrieval, - Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager), + Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager(true)), ), ), Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 97d4b4f79..00b439256 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -84,6 +84,8 @@ type API struct { DataTransfer dtypes.ClientDataTransfer Host host.Host + + RetrievalStoreMgr dtypes.ClientRetrievalStoreManager } func calcDealExpiration(minDuration uint64, md *dline.Info, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -619,10 +621,6 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmar } func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { - if ref == nil || ref.Path == "" { - return xerrors.New("must pass output file path for the retrieval deal") - } - events := make(chan marketevents.RetrievalEvent) go a.clientRetrieve(ctx, order, ref, events) @@ -643,10 +641,6 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref } func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { - if ref == nil || ref.Path == "" { - return nil, xerrors.New("must pass output file path for the retrieval deal") - } - events := make(chan marketevents.RetrievalEvent) go a.clientRetrieve(ctx, order, ref, events) return events, nil @@ -783,6 +777,37 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref carV2FilePath = order.LocalCARV2FilePath } + // TODO We only support this currently for the IPFS Retrieval use case + // where users want to write out filecoin retrievals directly to IPFS. + // If users haven' configured the Ipfs retrieval flag, the blockstore we get here will be a "no-op" blockstore. + // write out the CARv2 file to the retrieval block-store (which is really an IPFS node behind the scenes). + rs, err := a.RetrievalStoreMgr.NewStore() + defer a.RetrievalStoreMgr.ReleaseStore(rs) //nolint:errcheck + if err != nil { + finish(xerrors.Errorf("Error setting up new store: %w", err)) + return + } + if rs.IsIPFSRetrieval() { + // write out the CARv1 blocks of the CARv2 file to the IPFS blockstore. + carv2Reader, err := carv2.NewReaderMmap(carV2FilePath) + if err != nil { + finish(err) + return + } + defer carv2Reader.Close() //nolint:errcheck + + if _, err := car.LoadCar(rs.Blockstore(), carv2Reader.CarV1Reader()); err != nil { + finish(err) + return + } + } + + // If ref is nil, it only fetches the data into the configured blockstore. + if ref == nil { + finish(nil) + return + } + if ref.IsCAR { // user wants a CAR file, transform the CARv2 to a CARv1 and write it out. f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) @@ -806,13 +831,13 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - rw, err := blockstore.OpenReadOnly(carV2FilePath) + readOnly, err := blockstore.OpenReadOnly(carV2FilePath) if err != nil { finish(err) return } - defer rw.Close() //nolint:errcheck - bsvc := blockservice.New(rw, offline.Exchange(rw)) + defer readOnly.Close() //nolint:errcheck + bsvc := blockservice.New(readOnly, offline.Exchange(readOnly)) dag := merkledag.NewDAGService(bsvc) nd, err := dag.Get(ctx, order.Root) diff --git a/node/impl/client/import_test.go b/node/impl/client/import_test.go index b481c7636..f90c9db8a 100644 --- a/node/impl/client/import_test.go +++ b/node/impl/client/import_test.go @@ -73,7 +73,7 @@ func TestImportNormalFileToCARv2(t *testing.T) { a := &API{ Imports: &importmgr.Mgr{}, } - importID := rand.Uint64() + importID := importmgr.ImportID(rand.Uint64()) inputFilePath, inputContents := genNormalInputFile(t) defer os.Remove(inputFilePath) //nolint:errcheck @@ -148,6 +148,31 @@ func TestTransformCarv1ToCARv2(t *testing.T) { require.Equal(t, bzin, bzout) } +func TestLoadCARv2ToBlockstore(t *testing.T) { + inputFilePath, _ := genNormalInputFile(t) + defer os.Remove(inputFilePath) //nolint:errcheck + + carv1FilePath := genCARv1(t, inputFilePath) + defer os.Remove(carv1FilePath) //nolint:errcheck + + outputCARv2 := genTmpFile(t) + defer os.Remove(outputCARv2) //nolint:errcheck + + root, err := transformCarToCARv2(carv1FilePath, outputCARv2) + require.NoError(t, err) + require.NotEqual(t, cid.Undef, root) + + bs := bstore.NewMemorySync() + + carv2, err := carv2.NewReaderMmap(outputCARv2) + require.NoError(t, err) + defer carv2.Close() //nolint:errcheck + header, err := car.LoadCar(bs, carv2.CarV1Reader()) + require.NoError(t, err) + require.EqualValues(t, root, header.Roots[0]) + require.EqualValues(t, 1, header.Version) +} + func genCARv1(t *testing.T, normalFilePath string) string { ctx := context.Background() bs := bstore.NewMemorySync() diff --git a/node/modules/client.go b/node/modules/client.go index 0e3a2e320..221eca815 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -212,6 +212,8 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes. } // ClientBlockstoreRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore -func ClientBlockstoreRetrievalStoreManager(bs dtypes.ClientBlockstore) dtypes.ClientRetrievalStoreManager { - return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs) +func ClientBlockstoreRetrievalStoreManager(isIpfsRetrieval bool) func(bs dtypes.ClientBlockstore) (dtypes.ClientRetrievalStoreManager, error) { + return func(bs dtypes.ClientBlockstore) (dtypes.ClientRetrievalStoreManager, error) { + return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs, isIpfsRetrieval), nil + } } diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 6a5ba7f61..e0b101c82 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -9,12 +9,15 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore/query" + logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" ) +var log = logging.Logger("importmgr") + type ImportID uint64 type Mgr struct { @@ -135,6 +138,7 @@ func (m *Mgr) CARV2FilePathFor(dagRoot cid.Cid) (string, error) { for _, importID := range importIDs { info, err := m.Info(importID) if err != nil { + log.Errorf("failed to fetch info, importID=%d: %s", importID, err) continue } if info.Labels[LRootCid] == "" { @@ -142,6 +146,7 @@ func (m *Mgr) CARV2FilePathFor(dagRoot cid.Cid) (string, error) { } c, err := cid.Parse(info.Labels[LRootCid]) if err != nil { + log.Errorf("failed to parse Root cid %s: %w", info.Labels[LRootCid], err) continue } if c.Equals(dagRoot) { diff --git a/node/repo/retrievalstoremgr/retrievalstoremgr.go b/node/repo/retrievalstoremgr/retrievalstoremgr.go index c7fe812c5..118062258 100644 --- a/node/repo/retrievalstoremgr/retrievalstoremgr.go +++ b/node/repo/retrievalstoremgr/retrievalstoremgr.go @@ -2,15 +2,12 @@ package retrievalstoremgr import ( "github.com/filecoin-project/lotus/blockstore" - "github.com/ipfs/go-blockservice" - offline "github.com/ipfs/go-ipfs-exchange-offline" - ipldformat "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" ) // RetrievalStore references a store for a retrieval deal. type RetrievalStore interface { - DAGService() ipldformat.DAGService + IsIPFSRetrieval() bool + Blockstore() blockstore.BasicBlockstore } // RetrievalStoreManager manages stores for retrieval deals, abstracting @@ -20,24 +17,27 @@ type RetrievalStoreManager interface { ReleaseStore(RetrievalStore) error } -// BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores +// BlockstoreRetrievalStoreManager is a blockstore used for retrieval. type BlockstoreRetrievalStoreManager struct { - bs blockstore.BasicBlockstore + bs blockstore.BasicBlockstore + isIpfsRetrieval bool } var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{} // NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager -func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore) RetrievalStoreManager { +func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore, isIpfsRetrieval bool) RetrievalStoreManager { return &BlockstoreRetrievalStoreManager{ - bs: bs, + bs: bs, + isIpfsRetrieval: isIpfsRetrieval, } } // NewStore creates a new store (just uses underlying blockstore) func (brsm *BlockstoreRetrievalStoreManager) NewStore() (RetrievalStore, error) { return &blockstoreRetrievalStore{ - dagService: merkledag.NewDAGService(blockservice.New(brsm.bs, offline.Exchange(brsm.bs))), + bs: brsm.bs, + isIpfsRetrieval: brsm.isIpfsRetrieval, }, nil } @@ -47,9 +47,14 @@ func (brsm *BlockstoreRetrievalStoreManager) ReleaseStore(RetrievalStore) error } type blockstoreRetrievalStore struct { - dagService ipldformat.DAGService + bs blockstore.BasicBlockstore + isIpfsRetrieval bool } -func (brs *blockstoreRetrievalStore) DAGService() ipldformat.DAGService { - return brs.dagService +func (brs *blockstoreRetrievalStore) Blockstore() blockstore.BasicBlockstore { + return brs.bs +} + +func (brs *blockstoreRetrievalStore) IsIPFSRetrieval() bool { + return brs.isIpfsRetrieval }