removed combined blockstore

This commit is contained in:
aarshkshah1992 2021-07-03 14:01:32 +05:30
parent 40b30d1fb1
commit 303d5d6ff6
2 changed files with 69 additions and 46 deletions

View File

@ -82,9 +82,8 @@ type API struct {
Imports dtypes.ClientImportMgr
CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
DataTransfer dtypes.ClientDataTransfer
Host host.Host
DataTransfer dtypes.ClientDataTransfer
Host host.Host
// TODO How do we inject the Repo Path here ?
}
@ -125,28 +124,14 @@ func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isSt
return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0")
}
} else if params.Data.TransferType == storagemarket.TTGraphsync {
importIDs, err := a.imgr().List()
c, err := a.imgr().CARV2FilePathFor(params.Data.Root)
if err != nil {
return nil, xerrors.Errorf("failed to fetch import IDs: %w", err)
return nil, xerrors.Errorf("failed to find CARv2 file path: %w", err)
}
for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
if err != nil {
continue
}
if info.Labels[importmgr.LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
continue
}
if c.Equals(params.Data.Root) {
CARV2FilePath = info.Labels[importmgr.LCARv2FilePath]
break
}
if c == "" {
return nil, xerrors.New("no CARv2 file path for deal")
}
CARV2FilePath = c
}
walletKey, err := a.StateAccountKey(ctx, params.Wallet, types.EmptyTSK)
@ -413,29 +398,14 @@ func (a *API) newDealInfoWithTransfer(transferCh *api.DataTransferChannel, v sto
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag
importIDs, err := a.imgr().List()
carv2Path, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return false, xerrors.Errorf("failed to list imports: %w", err)
return false, err
}
for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
if err != nil {
continue
}
if info.Labels[importmgr.LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
continue
}
if c.Equals(root) {
return true, nil
}
if len(carv2Path) == 0 {
return false, nil
}
return false, nil
return true, nil
}
func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
@ -1006,11 +976,24 @@ func (w *lenWriter) Write(p []byte) (n int, err error) {
}
func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) {
dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))
carv2FilePath, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(carv2FilePath) == 0 {
return api.DataSize{}, xerrors.New("no CARv2 file for root")
}
rdOnly, err := blockstore.OpenReadOnly(carv2FilePath, false)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck
dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
w := lenWriter(0)
err := car.WriteCar(ctx, dag, []cid.Cid{root}, &w)
err = car.WriteCar(ctx, dag, []cid.Cid{root}, &w)
if err != nil {
return api.DataSize{}, err
}
@ -1024,12 +1007,25 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e
}
func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))
carv2FilePath, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(carv2FilePath) == 0 {
return api.DataCIDSize{}, xerrors.New("no CARv2 file for root")
}
rdOnly, err := blockstore.OpenReadOnly(carv2FilePath, false)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck
dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
w := &writer.Writer{}
bw := bufio.NewWriterSize(w, int(writer.CommPBuf))
err := car.WriteCar(ctx, dag, []cid.Cid{root}, w)
err = car.WriteCar(ctx, dag, []cid.Cid{root}, w)
if err != nil {
return api.DataCIDSize{}, err
}

View File

@ -7,6 +7,7 @@ import (
"strconv"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
@ -123,6 +124,32 @@ func (m *Mgr) Remove(id uint64) error {
return nil
}
func (m *Mgr) CARV2FilePathFor(dagRoot cid.Cid) (string, error) {
importIDs, err := m.List()
if err != nil {
return "", xerrors.Errorf("failed to fetch import IDs: %w", err)
}
for _, importID := range importIDs {
info, err := m.Info(importID)
if err != nil {
continue
}
if info.Labels[LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[LRootCid])
if err != nil {
continue
}
if c.Equals(dagRoot) {
return info.Labels[LCARv2FilePath], nil
}
}
return "", nil
}
func (m *Mgr) NewTempFile(id uint64) (string, error) {
file, err := ioutil.TempFile(m.repoPath, fmt.Sprintf("%d", id))
if err != nil {