Wire up client import manager

This commit is contained in:
Łukasz Magiera 2020-07-07 10:52:19 +02:00
parent 8942967223
commit 47f0898ce9
10 changed files with 215 additions and 211 deletions

View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address"
@ -356,10 +355,12 @@ type MinerSectors struct {
}
type Import struct {
Status filestore.Status
Key cid.Cid
Key int64
Err error
Root *cid.Cid
Source string
FilePath string
Size uint64
}
type DealInfo struct {

View File

@ -204,7 +204,15 @@ var clientLocalCmd = &cli.Command{
}
for _, v := range list {
fmt.Printf("%s %s %s %s\n", encoder.Encode(v.Key), v.FilePath, types.SizeStr(types.NewInt(v.Size)), v.Status)
cidStr := "<nil>"
if v.Root != nil {
cidStr = encoder.Encode(*v.Root)
}
fmt.Printf("%d: %s @%s (%s)\n", v.Key, cidStr, v.FilePath, v.Source)
if v.Err != nil {
fmt.Printf("\terror: %s\n", v.Err)
}
}
return nil
},

View File

@ -435,8 +435,9 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),
Override(new(dtypes.ClientFilestore), modules.ClientFstore),
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(ci.PrivKey), lp2p.PrivKey),

View File

@ -2,15 +2,8 @@ package client
import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/go-fil-markets/pieceio"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/multiformats/go-multiaddr"
"io"
"os"
@ -18,7 +11,6 @@ import (
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
chunker "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files"
@ -28,10 +20,15 @@ import (
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-car"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/pieceio"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/sector-storage/ffiwrapper"
@ -46,6 +43,7 @@ import (
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/importmgr"
)
@ -64,7 +62,7 @@ type API struct {
Retrieval rm.RetrievalClient
Chain *store.ChainStore
Imports *importmgr.Mgr
Imports dtypes.ClientImportMgr
}
func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch {
@ -75,6 +73,10 @@ func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch a
return minExp + miner.WPoStProvingPeriod - (minExp % miner.WPoStProvingPeriod) + (md.PeriodStart % miner.WPoStProvingPeriod) - 1
}
func (a *API) imgr() *importmgr.Mgr {
return a.Imports
}
func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
exist, err := a.WalletHas(ctx, params.Wallet)
if err != nil {
@ -193,7 +195,7 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo,
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag
offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Bs, offline.Exchange(a.Imports.Bs)))
offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore)))
_, err := offExch.Get(ctx, root)
if err == ipld.ErrNotFound {
return false, nil
@ -258,11 +260,11 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo
}
func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) {
id, st, err := a.Imports.NewStore()
id, st, err := a.imgr().NewStore()
if err != nil {
return cid.Cid{}, err
}
if err := a.Imports.AddLabel(id, "source", "import"); err != nil {
if err := a.imgr().AddLabel(id, "source", "import"); err != nil {
return cid.Cid{}, err
}
@ -278,11 +280,11 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error
func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) {
file := files.NewReaderFile(f)
id, st, err := a.Imports.NewStore()
id, st, err := a.imgr().NewStore()
if err != nil {
return cid.Cid{}, err
}
if err := a.Imports.AddLabel(id, "source", "import-local"); err != nil {
if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil {
return cid.Cid{}, err
}
@ -308,49 +310,38 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro
}
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
if a.Filestore == nil {
return nil, errors.New("listing imports is not supported with in-memory dag yet")
}
next, err := filestore.ListAll(a.Filestore, false)
importIDs := a.imgr().List()
out := make([]api.Import, len(importIDs))
for i, id := range importIDs {
info, err := a.imgr().Info(id)
if err != nil {
return nil, err
}
// TODO: make this less very bad by tracking root cids instead of using ListAll
out := make([]api.Import, 0)
lowest := make([]uint64, 0)
for {
r := next()
if r == nil {
return out, nil
}
matched := false
for i := range out {
if out[i].FilePath == r.FilePath {
matched = true
if lowest[i] > r.Offset {
lowest[i] = r.Offset
out[i] = api.Import{
Status: r.Status,
Key: r.Key,
FilePath: r.FilePath,
Size: r.Size,
Key: id,
Err: xerrors.Errorf("getting info: %w", err),
}
continue
}
ai := api.Import{
Key: id,
Source: info.Labels[importmgr.LSource],
FilePath: info.Labels[importmgr.LFileName],
}
if info.Labels[importmgr.LRootCid] != "" {
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
ai.Err = err
} else {
ai.Root = &c
}
}
break
}
}
if !matched {
out = append(out, api.Import{
Status: r.Status,
Key: r.Key,
FilePath: r.FilePath,
Size: r.Size,
})
lowest = append(lowest, r.Offset)
}
out[i] = ai
}
return out, nil
}
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
@ -367,11 +358,11 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return xerrors.Errorf("cannot make retrieval deal for zero bytes")
}
id, st, err := a.Imports.NewStore()
id, st, err := a.imgr().NewStore()
if err != nil {
return err
}
if err := a.Imports.AddLabel(id, "source", "retrieval"); err != nil {
if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil {
return err
}
@ -505,11 +496,11 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string, miner address.
}
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
id, st, err := a.Imports.NewStore()
id, st, err := a.imgr().NewStore()
if err != nil {
return err
}
if err := a.Imports.AddLabel(id, "source", "gen-car"); err != nil {
if err := a.imgr().AddLabel(id, "source", "gen-car"); err != nil {
return err
}

View File

@ -2,12 +2,9 @@ package modules
import (
"context"
"path/filepath"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/fx"
graphsyncimpl "github.com/filecoin-project/go-data-transfer/impl/graphsync"
@ -21,18 +18,13 @@ import (
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-filestore"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/paychmgr"
@ -53,28 +45,12 @@ func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMult
return mds, nil
}
func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) {
clientds, err := r.Datastore("/client")
if err != nil {
return nil, err
}
blocks := namespace.Wrap(clientds, datastore.NewKey("blocks"))
absPath, err := filepath.Abs(r.Path())
if err != nil {
return nil, err
}
fm := filestore.NewFileManager(clientds, filepath.Dir(absPath))
fm.AllowFiles = true
// TODO: fm.AllowUrls (needs more code in client import)
bs := blockstore.NewBlockstore(blocks)
return filestore.NewFilestore(bs, fm), nil
func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.ClientImportMgr {
return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client")))
}
func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore {
return blockstore.NewIdStore((*filestore.Filestore)(fstore))
func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore {
return blockstore.NewIdStore(imgr.Blockstore)
}
// RegisterClientValidator is an initialization hook that registers the client
@ -103,24 +79,6 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
}
// ClientDAG is a DAGService for the ClientBlockstore
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
bitswapNetwork := network.NewFromIpfsHost(h, rt)
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs, bitswapOptions...)
bsvc := blockservice.New(ibs, exch)
dag := merkledag.NewDAGService(bsvc)
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return bsvc.Close()
},
})
return dag
}
func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator {
return requestvalidation.NewUnifiedRequestValidator(nil, deals)
}

View File

@ -3,7 +3,6 @@ package dtypes
import (
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-graphsync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
@ -29,7 +28,7 @@ type ChainExchange exchange.Interface
type ChainBlockService bserv.BlockService
type ClientMultiDstore *importmgr.MultiStore
type ClientFilestore *filestore.Filestore
type ClientImportMgr *importmgr.Mgr
type ClientBlockstore blockstore.Blockstore
type ClientDealStore *statestore.StateStore
type ClientRequestValidator *requestvalidation.UnifiedRequestValidator

View File

@ -4,7 +4,6 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/multiformats/go-multiaddr"
@ -18,8 +17,8 @@ import (
// If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration.
// If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress.
// The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals.
func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) {
func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
var err error
var ipfsbs *ipfsbstore.IpfsBstore
if ipfsMaddr != "" {
@ -38,7 +37,7 @@ func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.M
var ws blockstore.Blockstore
ws = ipfsbs
if !useForRetrieval {
ws = blockstore.NewIdStore((*filestore.Filestore)(fstore))
ws = blockstore.NewIdStore(localStore.Blockstore)
}
return bufbstore.NewTieredBstore(ipfsbs, ws), nil
}

View File

@ -1 +1,96 @@
package importmgr
import (
"context"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
type multiReadBs struct {
// TODO: some caching
mds *MultiStore
}
func (m *multiReadBs) Has(cid cid.Cid) (bool, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
return true, nil
}
return false, merr
}
func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
val, err := store.Bstore.Get(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err))
continue
}
return val, nil
}
if merr == nil {
return nil, datastore.ErrNotFound
}
return nil, merr
}
func (m *multiReadBs) DeleteBlock(cid cid.Cid) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) {
return 0, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) Put(block blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) HashOnRead(enabled bool) {
return
}
var _ blockstore.Blockstore = &multiReadBs{}

View File

@ -3,22 +3,33 @@ package importmgr
import (
"encoding/json"
"fmt"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"
)
type Mgr struct {
mds *MultiStore
Bs blockstore.Blockstore
ds datastore.Batching
Blockstore blockstore.Blockstore
}
type Label string
const (
LSource = "source" // Function which created the import
LRootCid = "root" // Root CID
LFileName = "filename" // Local file path
LMTime = "mtime" // File modification timestamp
)
func New(mds *MultiStore, ds datastore.Batching) *Mgr {
return &Mgr{
mds: mds,
bs: &multiReadBs{
Blockstore: &multiReadBs{
mds: mds,
},
@ -26,7 +37,7 @@ func New(mds *MultiStore, ds datastore.Batching) *Mgr {
}
}
type storeMeta struct {
type StoreMeta struct {
Labels map[string]string
}
@ -37,7 +48,7 @@ func (m *Mgr) NewStore() (int64, *Store, error) {
return 0, nil, err
}
meta, err := json.Marshal(&storeMeta{Labels: map[string]string{
meta, err := json.Marshal(&StoreMeta{Labels: map[string]string{
"source": "unknown",
}})
if err != nil {
@ -54,14 +65,14 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path
return xerrors.Errorf("getting metadata form datastore: %w", err)
}
var sm storeMeta
var sm StoreMeta
if err := json.Unmarshal(meta, &sm); err != nil {
return xerrors.Errorf("unmarshaling store meta: %w", err)
}
sm.Labels[key] = value
meta, err = json.Marshal(&storeMeta{})
meta, err = json.Marshal(&StoreMeta{})
if err != nil {
return xerrors.Errorf("marshaling store meta: %w", err)
}
@ -69,6 +80,23 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
}
// m.List
func (m *Mgr) List() []int64 {
return m.mds.List()
}
func (m *Mgr) Info(id int64) (*StoreMeta, error) {
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return nil, xerrors.Errorf("getting metadata form datastore: %w", err)
}
var sm StoreMeta
if err := json.Unmarshal(meta, &sm); err != nil {
return nil, xerrors.Errorf("unmarshaling store meta: %w", err)
}
return &sm, nil
}
// m.Info
// m.Delete

View File

@ -1,16 +1,12 @@
package importmgr
import (
"context"
"fmt"
"path"
"sync"
"sync/atomic"
"github.com/hashicorp/go-multierror"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
@ -83,6 +79,17 @@ func (mds *MultiStore) Get(i int64) (*Store, error) {
return mds.open[i], err
}
func (mds *MultiStore) List() []int64 {
mds.lk.RLock()
defer mds.lk.RUnlock()
out := make([]int64, 0, len(mds.open))
for i := range mds.open {
out = append(out, i)
}
return out
}
func (mds *MultiStore) Delete(i int64) error {
mds.lk.Lock()
defer mds.lk.Unlock()
@ -113,86 +120,3 @@ func (mds *MultiStore) Close() error {
return err
}
type multiReadBs struct {
// TODO: some caching
mds *MultiStore
}
func (m *multiReadBs) Has(cid cid.Cid) (bool, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
return true, nil
}
return false, merr
}
func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
val, err := store.Bstore.Get(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err))
continue
}
return val, nil
}
if merr == nil {
return nil, datastore.ErrNotFound
}
return nil, merr
}
func (m *multiReadBs) DeleteBlock(cid cid.Cid) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) {
return 0, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) Put(block blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) HashOnRead(enabled bool) {
return
}
var _ blockstore.Blockstore = &multiReadBs{}