changes as per review

This commit is contained in:
aarshkshah1992 2021-07-07 14:10:59 +05:30
parent ba236fe258
commit f8d32f5328
7 changed files with 92 additions and 28 deletions

View File

@ -326,6 +326,8 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager(false)),
Override(new(ci.PrivKey), lp2p.PrivKey), Override(new(ci.PrivKey), lp2p.PrivKey),
Override(new(ci.PubKey), ci.PrivKey.GetPublic), Override(new(ci.PubKey), ci.PrivKey.GetPublic),
Override(new(peer.ID), peer.IDFromPublicKey), Override(new(peer.ID), peer.IDFromPublicKey),

View File

@ -169,7 +169,7 @@ func ConfigFullNode(c interface{}) Option {
If(cfg.Client.UseIpfs, If(cfg.Client.UseIpfs,
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)), Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)),
If(cfg.Client.IpfsUseForRetrieval, If(cfg.Client.IpfsUseForRetrieval,
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager), Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager(true)),
), ),
), ),
Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)), Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)),

View File

@ -84,6 +84,8 @@ type API struct {
DataTransfer dtypes.ClientDataTransfer DataTransfer dtypes.ClientDataTransfer
Host host.Host Host host.Host
RetrievalStoreMgr dtypes.ClientRetrievalStoreManager
} }
func calcDealExpiration(minDuration uint64, md *dline.Info, startEpoch abi.ChainEpoch) abi.ChainEpoch { func calcDealExpiration(minDuration uint64, md *dline.Info, startEpoch abi.ChainEpoch) abi.ChainEpoch {
@ -619,10 +621,6 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmar
} }
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
if ref == nil || ref.Path == "" {
return xerrors.New("must pass output file path for the retrieval deal")
}
events := make(chan marketevents.RetrievalEvent) events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events) go a.clientRetrieve(ctx, order, ref, events)
@ -643,10 +641,6 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
} }
func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
if ref == nil || ref.Path == "" {
return nil, xerrors.New("must pass output file path for the retrieval deal")
}
events := make(chan marketevents.RetrievalEvent) events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events) go a.clientRetrieve(ctx, order, ref, events)
return events, nil return events, nil
@ -783,6 +777,37 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
carV2FilePath = order.LocalCARV2FilePath carV2FilePath = order.LocalCARV2FilePath
} }
// TODO We only support this currently for the IPFS Retrieval use case
// where users want to write out filecoin retrievals directly to IPFS.
// If users haven' configured the Ipfs retrieval flag, the blockstore we get here will be a "no-op" blockstore.
// write out the CARv2 file to the retrieval block-store (which is really an IPFS node behind the scenes).
rs, err := a.RetrievalStoreMgr.NewStore()
defer a.RetrievalStoreMgr.ReleaseStore(rs) //nolint:errcheck
if err != nil {
finish(xerrors.Errorf("Error setting up new store: %w", err))
return
}
if rs.IsIPFSRetrieval() {
// write out the CARv1 blocks of the CARv2 file to the IPFS blockstore.
carv2Reader, err := carv2.NewReaderMmap(carV2FilePath)
if err != nil {
finish(err)
return
}
defer carv2Reader.Close() //nolint:errcheck
if _, err := car.LoadCar(rs.Blockstore(), carv2Reader.CarV1Reader()); err != nil {
finish(err)
return
}
}
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
finish(nil)
return
}
if ref.IsCAR { if ref.IsCAR {
// user wants a CAR file, transform the CARv2 to a CARv1 and write it out. // user wants a CAR file, transform the CARv2 to a CARv1 and write it out.
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
@ -806,13 +831,13 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return return
} }
rw, err := blockstore.OpenReadOnly(carV2FilePath) readOnly, err := blockstore.OpenReadOnly(carV2FilePath)
if err != nil { if err != nil {
finish(err) finish(err)
return return
} }
defer rw.Close() //nolint:errcheck defer readOnly.Close() //nolint:errcheck
bsvc := blockservice.New(rw, offline.Exchange(rw)) bsvc := blockservice.New(readOnly, offline.Exchange(readOnly))
dag := merkledag.NewDAGService(bsvc) dag := merkledag.NewDAGService(bsvc)
nd, err := dag.Get(ctx, order.Root) nd, err := dag.Get(ctx, order.Root)

View File

@ -73,7 +73,7 @@ func TestImportNormalFileToCARv2(t *testing.T) {
a := &API{ a := &API{
Imports: &importmgr.Mgr{}, Imports: &importmgr.Mgr{},
} }
importID := rand.Uint64() importID := importmgr.ImportID(rand.Uint64())
inputFilePath, inputContents := genNormalInputFile(t) inputFilePath, inputContents := genNormalInputFile(t)
defer os.Remove(inputFilePath) //nolint:errcheck defer os.Remove(inputFilePath) //nolint:errcheck
@ -148,6 +148,31 @@ func TestTransformCarv1ToCARv2(t *testing.T) {
require.Equal(t, bzin, bzout) require.Equal(t, bzin, bzout)
} }
func TestLoadCARv2ToBlockstore(t *testing.T) {
inputFilePath, _ := genNormalInputFile(t)
defer os.Remove(inputFilePath) //nolint:errcheck
carv1FilePath := genCARv1(t, inputFilePath)
defer os.Remove(carv1FilePath) //nolint:errcheck
outputCARv2 := genTmpFile(t)
defer os.Remove(outputCARv2) //nolint:errcheck
root, err := transformCarToCARv2(carv1FilePath, outputCARv2)
require.NoError(t, err)
require.NotEqual(t, cid.Undef, root)
bs := bstore.NewMemorySync()
carv2, err := carv2.NewReaderMmap(outputCARv2)
require.NoError(t, err)
defer carv2.Close() //nolint:errcheck
header, err := car.LoadCar(bs, carv2.CarV1Reader())
require.NoError(t, err)
require.EqualValues(t, root, header.Roots[0])
require.EqualValues(t, 1, header.Version)
}
func genCARv1(t *testing.T, normalFilePath string) string { func genCARv1(t *testing.T, normalFilePath string) string {
ctx := context.Background() ctx := context.Background()
bs := bstore.NewMemorySync() bs := bstore.NewMemorySync()

View File

@ -212,6 +212,8 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.
} }
// ClientBlockstoreRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore // ClientBlockstoreRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
func ClientBlockstoreRetrievalStoreManager(bs dtypes.ClientBlockstore) dtypes.ClientRetrievalStoreManager { func ClientBlockstoreRetrievalStoreManager(isIpfsRetrieval bool) func(bs dtypes.ClientBlockstore) (dtypes.ClientRetrievalStoreManager, error) {
return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs) return func(bs dtypes.ClientBlockstore) (dtypes.ClientRetrievalStoreManager, error) {
return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs, isIpfsRetrieval), nil
}
} }

View File

@ -9,12 +9,15 @@ import (
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
) )
var log = logging.Logger("importmgr")
type ImportID uint64 type ImportID uint64
type Mgr struct { type Mgr struct {
@ -135,6 +138,7 @@ func (m *Mgr) CARV2FilePathFor(dagRoot cid.Cid) (string, error) {
for _, importID := range importIDs { for _, importID := range importIDs {
info, err := m.Info(importID) info, err := m.Info(importID)
if err != nil { if err != nil {
log.Errorf("failed to fetch info, importID=%d: %s", importID, err)
continue continue
} }
if info.Labels[LRootCid] == "" { if info.Labels[LRootCid] == "" {
@ -142,6 +146,7 @@ func (m *Mgr) CARV2FilePathFor(dagRoot cid.Cid) (string, error) {
} }
c, err := cid.Parse(info.Labels[LRootCid]) c, err := cid.Parse(info.Labels[LRootCid])
if err != nil { if err != nil {
log.Errorf("failed to parse Root cid %s: %w", info.Labels[LRootCid], err)
continue continue
} }
if c.Equals(dagRoot) { if c.Equals(dagRoot) {

View File

@ -2,15 +2,12 @@ package retrievalstoremgr
import ( import (
"github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/blockstore"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
) )
// RetrievalStore references a store for a retrieval deal. // RetrievalStore references a store for a retrieval deal.
type RetrievalStore interface { type RetrievalStore interface {
DAGService() ipldformat.DAGService IsIPFSRetrieval() bool
Blockstore() blockstore.BasicBlockstore
} }
// RetrievalStoreManager manages stores for retrieval deals, abstracting // RetrievalStoreManager manages stores for retrieval deals, abstracting
@ -20,24 +17,27 @@ type RetrievalStoreManager interface {
ReleaseStore(RetrievalStore) error ReleaseStore(RetrievalStore) error
} }
// BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores // BlockstoreRetrievalStoreManager is a blockstore used for retrieval.
type BlockstoreRetrievalStoreManager struct { type BlockstoreRetrievalStoreManager struct {
bs blockstore.BasicBlockstore bs blockstore.BasicBlockstore
isIpfsRetrieval bool
} }
var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{} var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{}
// NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager // NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager
func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore) RetrievalStoreManager { func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore, isIpfsRetrieval bool) RetrievalStoreManager {
return &BlockstoreRetrievalStoreManager{ return &BlockstoreRetrievalStoreManager{
bs: bs, bs: bs,
isIpfsRetrieval: isIpfsRetrieval,
} }
} }
// NewStore creates a new store (just uses underlying blockstore) // NewStore creates a new store (just uses underlying blockstore)
func (brsm *BlockstoreRetrievalStoreManager) NewStore() (RetrievalStore, error) { func (brsm *BlockstoreRetrievalStoreManager) NewStore() (RetrievalStore, error) {
return &blockstoreRetrievalStore{ return &blockstoreRetrievalStore{
dagService: merkledag.NewDAGService(blockservice.New(brsm.bs, offline.Exchange(brsm.bs))), bs: brsm.bs,
isIpfsRetrieval: brsm.isIpfsRetrieval,
}, nil }, nil
} }
@ -47,9 +47,14 @@ func (brsm *BlockstoreRetrievalStoreManager) ReleaseStore(RetrievalStore) error
} }
type blockstoreRetrievalStore struct { type blockstoreRetrievalStore struct {
dagService ipldformat.DAGService bs blockstore.BasicBlockstore
isIpfsRetrieval bool
} }
func (brs *blockstoreRetrievalStore) DAGService() ipldformat.DAGService { func (brs *blockstoreRetrievalStore) Blockstore() blockstore.BasicBlockstore {
return brs.dagService return brs.bs
}
func (brs *blockstoreRetrievalStore) IsIPFSRetrieval() bool {
return brs.isIpfsRetrieval
} }