From 92e4507cf7ad9c16a4535b61183445bd903acfac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Jul 2020 22:03:37 +0200 Subject: [PATCH 01/12] fsrepo: multi-datastores --- node/repo/fsrepo.go | 14 ++-- node/repo/fsrepo_ds.go | 170 +++++++++++++++++++++++++++++++++++++---- node/repo/interface.go | 1 + node/repo/memrepo.go | 5 ++ 4 files changed, 172 insertions(+), 18 deletions(-) diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index b223731d9..aba088e54 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -225,9 +225,11 @@ type fsLockedRepo struct { repoType RepoType closer io.Closer - ds datastore.Batching - dsErr error - dsOnce sync.Once + ds map[string]datastore.Batching + multiDs map[string]map[int64]datastore.Batching + dsErr error + dsOnce sync.Once + dsLk sync.Mutex storageLk sync.Mutex configLk sync.Mutex @@ -244,8 +246,10 @@ func (fsr *fsLockedRepo) Close() error { return xerrors.Errorf("could not remove API file: %w", err) } if fsr.ds != nil { - if err := fsr.ds.Close(); err != nil { - return xerrors.Errorf("could not close datastore: %w", err) + for _, ds := range fsr.ds { + if err := ds.Close(); err != nil { + return xerrors.Errorf("could not close datastore: %w", err) + } } } diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index 034635c4f..cadb85d45 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -1,12 +1,13 @@ package repo import ( + "fmt" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/mount" - "github.com/ipfs/go-datastore/namespace" "golang.org/x/xerrors" + "io/ioutil" "os" "path/filepath" + "strconv" badger "github.com/ipfs/go-ds-badger2" levelds "github.com/ipfs/go-ds-leveldb" @@ -14,13 +15,18 @@ import ( ldbopts "github.com/syndtr/goleveldb/leveldb/opt" ) -var fsDatastores = map[string]func(path string) (datastore.Batching, error){ +type dsCtor func(path string) (datastore.Batching, error) + +var fsDatastores = map[string]dsCtor{ "chain": badgerDs, "metadata": levelDs, // Those need to be fast for large writes... but also need a really good GC :c "staging": badgerDs, // miner specific - "client": badgerDs, // client specific +} + +var fsMultiDatastores = map[string]dsCtor{ + "client": badgerDs, // client specific } func badgerDs(path string) (datastore.Batching, error) { @@ -36,12 +42,12 @@ func levelDs(path string) (datastore.Batching, error) { }) } -func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) { +func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error) { if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil { return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err) } - var mounts []mount.Mount + out := map[string]datastore.Batching{} for p, ctor := range fsDatastores { prefix := datastore.NewKey(p) @@ -54,21 +60,159 @@ func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) { ds = measure.New("fsrepo."+p, ds) - mounts = append(mounts, mount.Mount{ - Prefix: prefix, - Datastore: ds, - }) + out[datastore.NewKey(p).String()] = ds } - return mount.New(mounts), nil + return out, nil +} + +func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) { + out := map[string]map[int64]datastore.Batching{} + + for p, ctor := range fsMultiDatastores { + path := fsr.join(filepath.Join(fsDatastore, p)) + if err := os.MkdirAll(path, 0755); err != nil { + return nil, xerrors.Errorf("mkdir %s: %w", path, err) + } + + di, err := ioutil.ReadDir(path) + if err != nil { + return nil, xerrors.Errorf("readdir '%s': %w", path, err) + } + + out[p] = map[int64]datastore.Batching{} + + for _, info := range di { + path = filepath.Join(path, info.Name()) + + prefix := datastore.NewKey(p) + + id, err := strconv.ParseInt(info.Name(), 10, 64) + if err != nil { + log.Errorf("error parsing multi-datastore id for '%s': %w", path, err) + continue + } + + // TODO: optimization: don't init datastores we don't need + ds, err := ctor(path) + if err != nil { + return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err) + } + + ds = measure.New("fsrepo."+p+"."+info.Name(), ds) + + out[p][id] = ds + } + } + + return out, nil +} + +func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) { + ctor, ok := fsMultiDatastores[ns] + if !ok { + return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns) + } + + si := fmt.Sprintf("%d", idx) + path := fsr.join(filepath.Join(fsDatastore, ns, si)) + + ds, err := ctor(path) + if err != nil { + return nil, xerrors.Errorf("opening datastore %s: %w", path, err) + } + + ds = measure.New("fsrepo."+ns+"."+si, ds) + + return ds, nil } func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { fsr.dsOnce.Do(func() { - fsr.ds, fsr.dsErr = fsr.openDatastore() + var err error + fsr.ds, err = fsr.openDatastores() + if err != nil { + fsr.dsErr = err + return + } + + fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores() }) if fsr.dsErr != nil { return nil, fsr.dsErr } - return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil + ds, ok := fsr.ds[ns] + if ok { + return ds, nil + } + + k := datastore.NewKey(ns) + parts := k.List() + if len(parts) != 2 { + return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts") + } + + fsr.dsLk.Lock() + defer fsr.dsLk.Unlock() + + mds, ok := fsr.multiDs[parts[0]] + if !ok { + return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns) + } + + idx, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return nil, xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err) + } + + ds, ok = mds[idx] + if !ok { + ds, err = fsr.openMultiDatastore(parts[0], idx) + if err != nil { + return nil, xerrors.Errorf("opening multi-datastore: %w", err) + } + + mds[idx] = ds + } + + return ds, nil } + +func (fsr *fsLockedRepo) DeleteDatastore(ns string) error { + k := datastore.NewKey(ns) + parts := k.List() + if len(parts) != 2 { + return xerrors.Errorf("expected multi-datastore namespace to have 2 parts") + } + + mds, ok := fsr.multiDs[parts[0]] + if !ok { + return xerrors.Errorf("no multi-datastore with namespace %s", ns) + } + + idx, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err) + } + + fsr.dsLk.Lock() + defer fsr.dsLk.Unlock() + + ds, ok := mds[idx] + if !ok { + return xerrors.Errorf("no multi-datastore with at index (namespace %s)", ns) + } + + if err := ds.Close(); err != nil { + return xerrors.Errorf("closing datastore: %w", err) + } + + path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1])) + + log.Warnw("removing sub-datastore", "path", path, "namespace", ns) + if err := os.RemoveAll(path); err != nil { + return xerrors.Errorf("remove '%s': %w", path, err) + } + + return nil +} \ No newline at end of file diff --git a/node/repo/interface.go b/node/repo/interface.go index 5950f813f..8699ae20a 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -35,6 +35,7 @@ type LockedRepo interface { // Returns datastore defined in this repo. Datastore(namespace string) (datastore.Batching, error) + DeleteDatastore(namespace string) error // Returns config in this repo Config() (interface{}, error) diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index 399b239c1..e99342c55 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -233,6 +233,11 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) { return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil } +func (lmem *lockedMemRepo) DeleteDatastore(ns string) error { + /** poof **/ + return nil +} + func (lmem *lockedMemRepo) Config() (interface{}, error) { if err := lmem.checkToken(); err != nil { return nil, err From 89429672239aedcf1b75f32c3080f6e531044954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 01:39:30 +0200 Subject: [PATCH 02/12] Client Import manager --- node/builder.go | 3 +- node/impl/client/client.go | 84 +++++++++---- node/modules/client.go | 16 +++ node/modules/dtypes/storage.go | 6 +- node/modules/testing/storage.go | 39 ------ node/repo/fsrepo_ds.go | 23 ++++ node/repo/importmgr/mbstore.go | 1 + node/repo/importmgr/mgr.go | 74 +++++++++++ node/repo/importmgr/multistore.go | 198 ++++++++++++++++++++++++++++++ node/repo/importmgr/store.go | 54 ++++++++ node/repo/interface.go | 1 + node/repo/memrepo.go | 4 + 12 files changed, 433 insertions(+), 70 deletions(-) delete mode 100644 node/modules/testing/storage.go create mode 100644 node/repo/importmgr/mbstore.go create mode 100644 node/repo/importmgr/mgr.go create mode 100644 node/repo/importmgr/multistore.go create mode 100644 node/repo/importmgr/store.go diff --git a/node/builder.go b/node/builder.go index 2ffe88921..e1c82f626 100644 --- a/node/builder.go +++ b/node/builder.go @@ -232,7 +232,6 @@ func Online() Option { Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore), Override(new(dtypes.ChainExchange), modules.ChainExchange), Override(new(dtypes.ChainBlockService), modules.ChainBlockservice), - Override(new(dtypes.ClientDAG), testing.MemoryClientDag), // Filecoin services Override(new(*chain.Syncer), modules.NewSyncer), @@ -436,9 +435,9 @@ func Repo(r repo.Repo) Option { Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), + Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore), Override(new(dtypes.ClientFilestore), modules.ClientFstore), Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), - Override(new(dtypes.ClientDAG), modules.ClientDAG), Override(new(ci.PrivKey), lp2p.PrivKey), Override(new(ci.PubKey), ci.PrivKey.GetPublic), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index ca3dda5e4..07cd4f5c0 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -46,7 +46,7 @@ import ( "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" - "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo/importmgr" ) const dealStartBuffer abi.ChainEpoch = 10000 // TODO: allow setting @@ -64,9 +64,7 @@ type API struct { Retrieval rm.RetrievalClient Chain *store.ChainStore - LocalDAG dtypes.ClientDAG - Blockstore dtypes.ClientBlockstore - Filestore dtypes.ClientFilestore `optional:"true"` + Imports *importmgr.Mgr } func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -195,7 +193,7 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { // TODO: check if we have the ENTIRE dag - offExch := merkledag.NewDAGService(blockservice.New(a.Blockstore, offline.Exchange(a.Blockstore))) + offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Bs, offline.Exchange(a.Imports.Bs))) _, err := offExch.Get(ctx, root) if err == ipld.ErrNotFound { return false, nil @@ -260,9 +258,15 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo } func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { + id, st, err := a.Imports.NewStore() + if err != nil { + return cid.Cid{}, err + } + if err := a.Imports.AddLabel(id, "source", "import"); err != nil { + return cid.Cid{}, err + } - bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) - nd, err := a.clientImport(ref, bufferedDS) + nd, err := a.clientImport(ctx, ref, st) if err != nil { return cid.Undef, err @@ -274,12 +278,20 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) { file := files.NewReaderFile(f) - bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) + id, st, err := a.Imports.NewStore() + if err != nil { + return cid.Cid{}, err + } + if err := a.Imports.AddLabel(id, "source", "import-local"); err != nil { + return cid.Cid{}, err + } + + bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG) params := ihelper.DagBuilderParams{ Maxlinks: build.UnixfsLinksPerLevel, RawLeaves: true, - CidBuilder: nil, + CidBuilder: cid.V1Builder{}, Dagserv: bufferedDS, } @@ -348,13 +360,21 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return err } - order.MinerPeerID = peer.ID(mi.PeerId) + order.MinerPeerID = mi.PeerId } if order.Size == 0 { return xerrors.Errorf("cannot make retrieval deal for zero bytes") } + id, st, err := a.Imports.NewStore() + if err != nil { + return err + } + if err := a.Imports.AddLabel(id, "source", "retrieval"); err != nil { + return err + } + retrievalResult := make(chan error, 1) unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { @@ -392,14 +412,14 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) - _, err := a.Retrieval.Retrieve( + _, err = a.Retrieval.Retrieve( ctx, order.Root, rm.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease), order.Total, order.MinerPeerID, order.Client, - order.Miner) + order.Miner) // TODO: pass the store here somehow if err != nil { return xerrors.Errorf("Retrieve failed: %w", err) } @@ -424,18 +444,18 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref if err != nil { return err } - err = car.WriteCar(ctx, a.LocalDAG, []cid.Cid{order.Root}, f) + err = car.WriteCar(ctx, st.DAG, []cid.Cid{order.Root}, f) if err != nil { return err } return f.Close() } - nd, err := a.LocalDAG.Get(ctx, order.Root) + nd, err := st.DAG.Get(ctx, order.Root) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } - file, err := unixfile.NewUnixfsFile(ctx, a.LocalDAG, nd) + file, err := unixfile.NewUnixfsFile(ctx, st.DAG, nd) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } @@ -485,14 +505,22 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string, miner address. } func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { + id, st, err := a.Imports.NewStore() + if err != nil { + return err + } + if err := a.Imports.AddLabel(id, "source", "gen-car"); err != nil { + return err + } - bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) - c, err := a.clientImport(ref, bufferedDS) + bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG) + c, err := a.clientImport(ctx, ref, st) if err != nil { return err } + // TODO: does that defer mean to remove the whole blockstore? defer bufferedDS.Remove(ctx, c) //nolint:errcheck ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any) @@ -505,7 +533,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri return err } - sc := car.NewSelectiveCar(ctx, a.Blockstore, []car.Dag{{Root: c, Selector: allSelector}}) + sc := car.NewSelectiveCar(ctx, st.Bstore, []car.Dag{{Root: c, Selector: allSelector}}) if err = sc.Write(f); err != nil { return err } @@ -513,7 +541,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri return f.Close() } -func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.Cid, error) { +func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmgr.Store) (cid.Cid, error) { f, err := os.Open(ref.Path) if err != nil { return cid.Undef, err @@ -530,13 +558,13 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C } if ref.IsCAR { - var store car.Store - if a.Filestore == nil { - store = a.Blockstore + var st car.Store + if store.Fstore == nil { + st = store.Bstore } else { - store = (*filestore.Filestore)(a.Filestore) + st = store.Fstore } - result, err := car.LoadCar(store, file) + result, err := car.LoadCar(st, file) if err != nil { return cid.Undef, err } @@ -548,11 +576,13 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C return result.Roots[0], nil } + bufDs := ipld.NewBufferedDAG(ctx, store.DAG) + params := ihelper.DagBuilderParams{ Maxlinks: build.UnixfsLinksPerLevel, RawLeaves: true, - CidBuilder: nil, - Dagserv: bufferedDS, + CidBuilder: cid.V1Builder{}, + Dagserv: bufDs, NoCopy: true, } @@ -565,7 +595,7 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C return cid.Undef, err } - if err := bufferedDS.Commit(); err != nil { + if err := bufDs.Commit(); err != nil { return cid.Undef, err } diff --git a/node/modules/client.go b/node/modules/client.go index b4618015a..dc4637a1f 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -34,9 +34,25 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/filecoin-project/lotus/paychmgr" ) +func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) { + mds, err := importmgr.NewMultiDstore(r, "/client") + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return mds.Close() + }, + }) + + return mds, nil +} + func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) { clientds, err := r.Datastore("/client") if err != nil { diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 3eba07e4e..15c5c35e4 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -1,7 +1,6 @@ package dtypes import ( - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/ipfs/go-filestore" @@ -10,9 +9,12 @@ import ( exchange "github.com/ipfs/go-ipfs-exchange-interface" format "github.com/ipfs/go-ipld-format" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/lotus/node/repo/importmgr" ) // MetadataDS stores metadata @@ -26,9 +28,9 @@ type ChainGCBlockstore blockstore.GCBlockstore type ChainExchange exchange.Interface type ChainBlockService bserv.BlockService +type ClientMultiDstore *importmgr.MultiStore type ClientFilestore *filestore.Filestore type ClientBlockstore blockstore.Blockstore -type ClientDAG format.DAGService type ClientDealStore *statestore.StateStore type ClientRequestValidator *requestvalidation.UnifiedRequestValidator type ClientDatastore datastore.Batching diff --git a/node/modules/testing/storage.go b/node/modules/testing/storage.go deleted file mode 100644 index 525d86b06..000000000 --- a/node/modules/testing/storage.go +++ /dev/null @@ -1,39 +0,0 @@ -package testing - -import ( - "context" - - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-datastore" - dsync "github.com/ipfs/go-datastore/sync" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" - "go.uber.org/fx" -) - -func MapBlockstore() blockstore.Blockstore { - // TODO: proper datastore - bds := dsync.MutexWrap(datastore.NewMapDatastore()) - bs := blockstore.NewBlockstore(bds) - return blockstore.NewIdStore(bs) -} - -func MapDatastore() datastore.Batching { - return dsync.MutexWrap(datastore.NewMapDatastore()) -} - -func MemoryClientDag(lc fx.Lifecycle) ipld.DAGService { - ibs := blockstore.NewBlockstore(datastore.NewMapDatastore()) - bsvc := blockservice.New(ibs, offline.Exchange(ibs)) - dag := merkledag.NewDAGService(bsvc) - - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return bsvc.Close() - }, - }) - - return dag -} diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index cadb85d45..c0a57d5af 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -178,6 +178,29 @@ func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { return ds, nil } +func (fsr *fsLockedRepo) ListDatastores(ns string) ([]int64, error) { + k := datastore.NewKey(ns) + parts := k.List() + if len(parts) != 1 { + return nil, xerrors.Errorf("expected multi-datastore namespace to have 1 part") + } + + fsr.dsLk.Lock() + defer fsr.dsLk.Unlock() + + mds, ok := fsr.multiDs[parts[0]] + if !ok { + return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns) + } + + out := make([]int64, 0, len(mds)) + for i := range mds { + out = append(out, i) + } + + return out, nil +} + func (fsr *fsLockedRepo) DeleteDatastore(ns string) error { k := datastore.NewKey(ns) parts := k.List() diff --git a/node/repo/importmgr/mbstore.go b/node/repo/importmgr/mbstore.go new file mode 100644 index 000000000..78d4c8572 --- /dev/null +++ b/node/repo/importmgr/mbstore.go @@ -0,0 +1 @@ +package importmgr diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go new file mode 100644 index 000000000..57b58212c --- /dev/null +++ b/node/repo/importmgr/mgr.go @@ -0,0 +1,74 @@ +package importmgr + +import ( + "encoding/json" + "fmt" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + blockstore "github.com/ipfs/go-ipfs-blockstore" + "golang.org/x/xerrors" +) + +type Mgr struct { + mds *MultiStore + Bs blockstore.Blockstore + ds datastore.Batching +} + +func New(mds *MultiStore, ds datastore.Batching) *Mgr { + return &Mgr{ + mds: mds, + bs: &multiReadBs{ + mds: mds, + }, + + ds: namespace.Wrap(ds, datastore.NewKey("/stores")), + } +} + +type storeMeta struct { + Labels map[string]string +} + +func (m *Mgr) NewStore() (int64, *Store, error) { + id := m.mds.Next() + st, err := m.mds.Get(id) + if err != nil { + return 0, nil, err + } + + meta, err := json.Marshal(&storeMeta{Labels: map[string]string{ + "source": "unknown", + }}) + if err != nil { + return 0, nil, xerrors.Errorf("marshaling empty store metadata: %w", err) + } + + err = m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) + return id, st, err +} + +func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path, data CID.. + meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) + if err != nil { + return xerrors.Errorf("getting metadata form datastore: %w", err) + } + + var sm storeMeta + if err := json.Unmarshal(meta, &sm); err != nil { + return xerrors.Errorf("unmarshaling store meta: %w", err) + } + + sm.Labels[key] = value + + meta, err = json.Marshal(&storeMeta{}) + if err != nil { + return xerrors.Errorf("marshaling store meta: %w", err) + } + + return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) +} + +// m.List +// m.Info +// m.Delete \ No newline at end of file diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go new file mode 100644 index 000000000..1852cf54a --- /dev/null +++ b/node/repo/importmgr/multistore.go @@ -0,0 +1,198 @@ +package importmgr + +import ( + "context" + "fmt" + "path" + "sync" + "sync/atomic" + + "github.com/hashicorp/go-multierror" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + "golang.org/x/xerrors" + + "github.com/ipfs/go-datastore" +) + +type dsProvider interface { + Datastore(namespace string) (datastore.Batching, error) + ListDatastores(namespace string) ([]int64, error) + DeleteDatastore(namespace string) error +} + +type MultiStore struct { + provider dsProvider + namespace string + + open map[int64]*Store + next int64 + + lk sync.RWMutex +} + +func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) { + ids, err := provider.ListDatastores(namespace) + if err != nil { + return nil, xerrors.Errorf("listing datastores: %w", err) + } + + mds := &MultiStore{ + provider: provider, + namespace: namespace, + } + + for _, i := range ids { + if i > mds.next { + mds.next = i + } + + _, err := mds.Get(i) + if err != nil { + return nil, xerrors.Errorf("open store %d: %w", i, err) + } + } + + return mds, nil +} + +func (mds *MultiStore) path(i int64) string { + return path.Join("/", mds.namespace, fmt.Sprintf("%d", i)) +} + +func (mds *MultiStore) Next() int64 { + return atomic.AddInt64(&mds.next, 1) +} + +func (mds *MultiStore) Get(i int64) (*Store, error) { + mds.lk.Lock() + defer mds.lk.Unlock() + + store, ok := mds.open[i] + if ok { + return store, nil + } + + ds, err := mds.provider.Datastore(mds.path(i)) + if err != nil { + return nil, err + } + + mds.open[i], err = openStore(ds) + return mds.open[i], err +} + +func (mds *MultiStore) Delete(i int64) error { + mds.lk.Lock() + defer mds.lk.Unlock() + + store, ok := mds.open[i] + if ok { + if err := store.Close(); err != nil { + return xerrors.Errorf("closing sub-datastore %d: %w", i, err) + } + + delete(mds.open, i) + } + + return mds.provider.DeleteDatastore(mds.path(i)) +} + +func (mds *MultiStore) Close() error { + mds.lk.Lock() + defer mds.lk.Unlock() + + var err error + for i, store := range mds.open { + cerr := store.Close() + if cerr != nil { + err = multierror.Append(err, xerrors.Errorf("closing sub-datastore %d: %w", i, cerr)) + } + } + + return err +} + +type multiReadBs struct { + // TODO: some caching + mds *MultiStore +} + +func (m *multiReadBs) Has(cid cid.Cid) (bool, error) { + m.mds.lk.RLock() + defer m.mds.lk.RUnlock() + + var merr error + for i, store := range m.mds.open { + has, err := store.Bstore.Has(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) + continue + } + if !has { + continue + } + + return true, nil + } + + return false, merr +} + +func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) { + m.mds.lk.RLock() + defer m.mds.lk.RUnlock() + + var merr error + for i, store := range m.mds.open { + has, err := store.Bstore.Has(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) + continue + } + if !has { + continue + } + + val, err := store.Bstore.Get(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err)) + continue + } + + return val, nil + } + + if merr == nil { + return nil, datastore.ErrNotFound + } + + return nil, merr +} + +func (m *multiReadBs) DeleteBlock(cid cid.Cid) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) { + return 0, xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) Put(block blocks.Block) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) PutMany(blocks []blocks.Block) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) HashOnRead(enabled bool) { + return +} + +var _ blockstore.Blockstore = &multiReadBs{} \ No newline at end of file diff --git a/node/repo/importmgr/store.go b/node/repo/importmgr/store.go new file mode 100644 index 000000000..a08974262 --- /dev/null +++ b/node/repo/importmgr/store.go @@ -0,0 +1,54 @@ +package importmgr + +import ( + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-filestore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" +) + +type Store struct { + ds datastore.Batching + + fm *filestore.FileManager + Fstore *filestore.Filestore + + Bstore blockstore.Blockstore + + bsvc blockservice.BlockService + DAG ipld.DAGService +} + +func openStore(ds datastore.Batching) (*Store, error) { + blocks := namespace.Wrap(ds, datastore.NewKey("blocks")) + bs := blockstore.NewBlockstore(blocks) + + fm := filestore.NewFileManager(ds, "/") + fm.AllowFiles = true + + fstore := filestore.NewFilestore(bs, fm) + ibs := blockstore.NewIdStore(fstore) + + bsvc := blockservice.New(ibs, offline.Exchange(ibs)) + dag := merkledag.NewDAGService(bsvc) + + return &Store{ + ds: ds, + + fm: fm, + Fstore: fstore, + + Bstore: ibs, + + bsvc: bsvc, + DAG: dag, + }, nil +} + +func (s *Store) Close() error { + return s.bsvc.Close() +} diff --git a/node/repo/interface.go b/node/repo/interface.go index 8699ae20a..a4c09e1ee 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -35,6 +35,7 @@ type LockedRepo interface { // Returns datastore defined in this repo. Datastore(namespace string) (datastore.Batching, error) + ListDatastores(namespace string) ([]int64, error) DeleteDatastore(namespace string) error // Returns config in this repo diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index e99342c55..7e4cf71dc 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -233,6 +233,10 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) { return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil } +func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) { + return nil, nil +} + func (lmem *lockedMemRepo) DeleteDatastore(ns string) error { /** poof **/ return nil From 47f0898ce94120239e20736cbce9897741057ca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 10:52:19 +0200 Subject: [PATCH 03/12] Wire up client import manager --- api/api_full.go | 9 +-- cli/client.go | 10 ++- node/builder.go | 3 +- node/impl/client/client.go | 103 ++++++++++++++---------------- node/modules/client.go | 50 ++------------- node/modules/dtypes/storage.go | 3 +- node/modules/ipfsclient.go | 7 +- node/repo/importmgr/mbstore.go | 95 +++++++++++++++++++++++++++ node/repo/importmgr/mgr.go | 48 +++++++++++--- node/repo/importmgr/multistore.go | 98 ++++------------------------ 10 files changed, 215 insertions(+), 211 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 9212f4d21..3c0407069 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -5,7 +5,6 @@ import ( "time" "github.com/ipfs/go-cid" - "github.com/ipfs/go-filestore" "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" @@ -356,10 +355,12 @@ type MinerSectors struct { } type Import struct { - Status filestore.Status - Key cid.Cid + Key int64 + Err error + + Root *cid.Cid + Source string FilePath string - Size uint64 } type DealInfo struct { diff --git a/cli/client.go b/cli/client.go index 0b2df500c..04a583590 100644 --- a/cli/client.go +++ b/cli/client.go @@ -204,7 +204,15 @@ var clientLocalCmd = &cli.Command{ } for _, v := range list { - fmt.Printf("%s %s %s %s\n", encoder.Encode(v.Key), v.FilePath, types.SizeStr(types.NewInt(v.Size)), v.Status) + cidStr := "" + if v.Root != nil { + cidStr = encoder.Encode(*v.Root) + } + + fmt.Printf("%d: %s @%s (%s)\n", v.Key, cidStr, v.FilePath, v.Source) + if v.Err != nil { + fmt.Printf("\terror: %s\n", v.Err) + } } return nil }, diff --git a/node/builder.go b/node/builder.go index e1c82f626..76ba46a39 100644 --- a/node/builder.go +++ b/node/builder.go @@ -435,8 +435,9 @@ func Repo(r repo.Repo) Option { Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), + Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore), - Override(new(dtypes.ClientFilestore), modules.ClientFstore), + Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), Override(new(ci.PrivKey), lp2p.PrivKey), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 07cd4f5c0..8c6f10896 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -2,15 +2,8 @@ package client import ( "context" - "errors" "fmt" - "github.com/filecoin-project/go-fil-markets/pieceio" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" - "github.com/multiformats/go-multiaddr" - "io" "os" @@ -18,7 +11,6 @@ import ( "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" - "github.com/ipfs/go-filestore" chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" files "github.com/ipfs/go-ipfs-files" @@ -28,10 +20,15 @@ import ( "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" "github.com/ipld/go-car" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" "go.uber.org/fx" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/pieceio" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/sector-storage/ffiwrapper" @@ -46,6 +43,7 @@ import ( "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" + "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/importmgr" ) @@ -64,7 +62,7 @@ type API struct { Retrieval rm.RetrievalClient Chain *store.ChainStore - Imports *importmgr.Mgr + Imports dtypes.ClientImportMgr } func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -75,6 +73,10 @@ func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch a return minExp + miner.WPoStProvingPeriod - (minExp % miner.WPoStProvingPeriod) + (md.PeriodStart % miner.WPoStProvingPeriod) - 1 } +func (a *API) imgr() *importmgr.Mgr { + return a.Imports +} + func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { exist, err := a.WalletHas(ctx, params.Wallet) if err != nil { @@ -193,7 +195,7 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { // TODO: check if we have the ENTIRE dag - offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Bs, offline.Exchange(a.Imports.Bs))) + offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore))) _, err := offExch.Get(ctx, root) if err == ipld.ErrNotFound { return false, nil @@ -258,11 +260,11 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo } func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { - id, st, err := a.Imports.NewStore() + id, st, err := a.imgr().NewStore() if err != nil { return cid.Cid{}, err } - if err := a.Imports.AddLabel(id, "source", "import"); err != nil { + if err := a.imgr().AddLabel(id, "source", "import"); err != nil { return cid.Cid{}, err } @@ -278,11 +280,11 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) { file := files.NewReaderFile(f) - id, st, err := a.Imports.NewStore() + id, st, err := a.imgr().NewStore() if err != nil { return cid.Cid{}, err } - if err := a.Imports.AddLabel(id, "source", "import-local"); err != nil { + if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil { return cid.Cid{}, err } @@ -308,49 +310,38 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro } func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { - if a.Filestore == nil { - return nil, errors.New("listing imports is not supported with in-memory dag yet") - } - next, err := filestore.ListAll(a.Filestore, false) - if err != nil { - return nil, err - } + importIDs := a.imgr().List() - // TODO: make this less very bad by tracking root cids instead of using ListAll - - out := make([]api.Import, 0) - lowest := make([]uint64, 0) - for { - r := next() - if r == nil { - return out, nil + out := make([]api.Import, len(importIDs)) + for i, id := range importIDs { + info, err := a.imgr().Info(id) + if err != nil { + out[i] = api.Import{ + Key: id, + Err: xerrors.Errorf("getting info: %w", err), + } + continue } - matched := false - for i := range out { - if out[i].FilePath == r.FilePath { - matched = true - if lowest[i] > r.Offset { - lowest[i] = r.Offset - out[i] = api.Import{ - Status: r.Status, - Key: r.Key, - FilePath: r.FilePath, - Size: r.Size, - } - } - break + + ai := api.Import{ + Key: id, + Source: info.Labels[importmgr.LSource], + FilePath: info.Labels[importmgr.LFileName], + } + + if info.Labels[importmgr.LRootCid] != "" { + c, err := cid.Parse(info.Labels[importmgr.LRootCid]) + if err != nil { + ai.Err = err + } else { + ai.Root = &c } } - if !matched { - out = append(out, api.Import{ - Status: r.Status, - Key: r.Key, - FilePath: r.FilePath, - Size: r.Size, - }) - lowest = append(lowest, r.Offset) - } + + out[i] = ai } + + return out, nil } func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { @@ -367,11 +358,11 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return xerrors.Errorf("cannot make retrieval deal for zero bytes") } - id, st, err := a.Imports.NewStore() + id, st, err := a.imgr().NewStore() if err != nil { return err } - if err := a.Imports.AddLabel(id, "source", "retrieval"); err != nil { + if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { return err } @@ -505,11 +496,11 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string, miner address. } func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { - id, st, err := a.Imports.NewStore() + id, st, err := a.imgr().NewStore() if err != nil { return err } - if err := a.Imports.AddLabel(id, "source", "gen-car"); err != nil { + if err := a.imgr().AddLabel(id, "source", "gen-car"); err != nil { return err } diff --git a/node/modules/client.go b/node/modules/client.go index dc4637a1f..da0f3be06 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -2,12 +2,9 @@ package modules import ( "context" - "path/filepath" blockstore "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipfs/go-merkledag" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/routing" "go.uber.org/fx" graphsyncimpl "github.com/filecoin-project/go-data-transfer/impl/graphsync" @@ -21,18 +18,13 @@ import ( smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" - "github.com/ipfs/go-bitswap" - "github.com/ipfs/go-bitswap/network" - "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-filestore" "github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/node/impl/full" payapi "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/filecoin-project/lotus/paychmgr" @@ -53,28 +45,12 @@ func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMult return mds, nil } -func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) { - clientds, err := r.Datastore("/client") - if err != nil { - return nil, err - } - blocks := namespace.Wrap(clientds, datastore.NewKey("blocks")) - - absPath, err := filepath.Abs(r.Path()) - if err != nil { - return nil, err - } - - fm := filestore.NewFileManager(clientds, filepath.Dir(absPath)) - fm.AllowFiles = true - // TODO: fm.AllowUrls (needs more code in client import) - - bs := blockstore.NewBlockstore(blocks) - return filestore.NewFilestore(bs, fm), nil +func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.ClientImportMgr { + return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client"))) } -func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore { - return blockstore.NewIdStore((*filestore.Filestore)(fstore)) +func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore { + return blockstore.NewIdStore(imgr.Blockstore) } // RegisterClientValidator is an initialization hook that registers the client @@ -103,24 +79,6 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore { return namespace.Wrap(ds, datastore.NewKey("/deals/client")) } -// ClientDAG is a DAGService for the ClientBlockstore -func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG { - bitswapNetwork := network.NewFromIpfsHost(h, rt) - bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)} - exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs, bitswapOptions...) - - bsvc := blockservice.New(ibs, exch) - dag := merkledag.NewDAGService(bsvc) - - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return bsvc.Close() - }, - }) - - return dag -} - func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator { return requestvalidation.NewUnifiedRequestValidator(nil, deals) } diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 15c5c35e4..7ede8aaab 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -3,7 +3,6 @@ package dtypes import ( bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-filestore" "github.com/ipfs/go-graphsync" blockstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" @@ -29,7 +28,7 @@ type ChainExchange exchange.Interface type ChainBlockService bserv.BlockService type ClientMultiDstore *importmgr.MultiStore -type ClientFilestore *filestore.Filestore +type ClientImportMgr *importmgr.Mgr type ClientBlockstore blockstore.Blockstore type ClientDealStore *statestore.StateStore type ClientRequestValidator *requestvalidation.UnifiedRequestValidator diff --git a/node/modules/ipfsclient.go b/node/modules/ipfsclient.go index f405cb4c6..93bedf598 100644 --- a/node/modules/ipfsclient.go +++ b/node/modules/ipfsclient.go @@ -4,7 +4,6 @@ import ( "go.uber.org/fx" "golang.org/x/xerrors" - "github.com/ipfs/go-filestore" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/multiformats/go-multiaddr" @@ -18,8 +17,8 @@ import ( // If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration. // If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress. // The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals. -func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) { +func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { var err error var ipfsbs *ipfsbstore.IpfsBstore if ipfsMaddr != "" { @@ -38,7 +37,7 @@ func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.M var ws blockstore.Blockstore ws = ipfsbs if !useForRetrieval { - ws = blockstore.NewIdStore((*filestore.Filestore)(fstore)) + ws = blockstore.NewIdStore(localStore.Blockstore) } return bufbstore.NewTieredBstore(ipfsbs, ws), nil } diff --git a/node/repo/importmgr/mbstore.go b/node/repo/importmgr/mbstore.go index 78d4c8572..889752cf2 100644 --- a/node/repo/importmgr/mbstore.go +++ b/node/repo/importmgr/mbstore.go @@ -1 +1,96 @@ package importmgr + +import ( + "context" + + "github.com/hashicorp/go-multierror" + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +type multiReadBs struct { + // TODO: some caching + mds *MultiStore +} + +func (m *multiReadBs) Has(cid cid.Cid) (bool, error) { + m.mds.lk.RLock() + defer m.mds.lk.RUnlock() + + var merr error + for i, store := range m.mds.open { + has, err := store.Bstore.Has(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) + continue + } + if !has { + continue + } + + return true, nil + } + + return false, merr +} + +func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) { + m.mds.lk.RLock() + defer m.mds.lk.RUnlock() + + var merr error + for i, store := range m.mds.open { + has, err := store.Bstore.Has(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) + continue + } + if !has { + continue + } + + val, err := store.Bstore.Get(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err)) + continue + } + + return val, nil + } + + if merr == nil { + return nil, datastore.ErrNotFound + } + + return nil, merr +} + +func (m *multiReadBs) DeleteBlock(cid cid.Cid) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) { + return 0, xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) Put(block blocks.Block) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) PutMany(blocks []blocks.Block) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) HashOnRead(enabled bool) { + return +} + +var _ blockstore.Blockstore = &multiReadBs{} diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 57b58212c..c1550689a 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -3,22 +3,33 @@ package importmgr import ( "encoding/json" "fmt" + + "golang.org/x/xerrors" + "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" blockstore "github.com/ipfs/go-ipfs-blockstore" - "golang.org/x/xerrors" ) type Mgr struct { - mds *MultiStore - Bs blockstore.Blockstore - ds datastore.Batching + mds *MultiStore + ds datastore.Batching + + Blockstore blockstore.Blockstore } +type Label string +const ( + LSource = "source" // Function which created the import + LRootCid = "root" // Root CID + LFileName = "filename" // Local file path + LMTime = "mtime" // File modification timestamp +) + func New(mds *MultiStore, ds datastore.Batching) *Mgr { return &Mgr{ mds: mds, - bs: &multiReadBs{ + Blockstore: &multiReadBs{ mds: mds, }, @@ -26,7 +37,7 @@ func New(mds *MultiStore, ds datastore.Batching) *Mgr { } } -type storeMeta struct { +type StoreMeta struct { Labels map[string]string } @@ -37,7 +48,7 @@ func (m *Mgr) NewStore() (int64, *Store, error) { return 0, nil, err } - meta, err := json.Marshal(&storeMeta{Labels: map[string]string{ + meta, err := json.Marshal(&StoreMeta{Labels: map[string]string{ "source": "unknown", }}) if err != nil { @@ -54,14 +65,14 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path return xerrors.Errorf("getting metadata form datastore: %w", err) } - var sm storeMeta + var sm StoreMeta if err := json.Unmarshal(meta, &sm); err != nil { return xerrors.Errorf("unmarshaling store meta: %w", err) } sm.Labels[key] = value - meta, err = json.Marshal(&storeMeta{}) + meta, err = json.Marshal(&StoreMeta{}) if err != nil { return xerrors.Errorf("marshaling store meta: %w", err) } @@ -69,6 +80,23 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) } -// m.List +func (m *Mgr) List() []int64 { + return m.mds.List() +} + +func (m *Mgr) Info(id int64) (*StoreMeta, error) { + meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) + if err != nil { + return nil, xerrors.Errorf("getting metadata form datastore: %w", err) + } + + var sm StoreMeta + if err := json.Unmarshal(meta, &sm); err != nil { + return nil, xerrors.Errorf("unmarshaling store meta: %w", err) + } + + return &sm, nil +} + // m.Info // m.Delete \ No newline at end of file diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go index 1852cf54a..43f51b768 100644 --- a/node/repo/importmgr/multistore.go +++ b/node/repo/importmgr/multistore.go @@ -1,16 +1,12 @@ package importmgr import ( - "context" "fmt" "path" "sync" "sync/atomic" "github.com/hashicorp/go-multierror" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" "golang.org/x/xerrors" "github.com/ipfs/go-datastore" @@ -83,6 +79,17 @@ func (mds *MultiStore) Get(i int64) (*Store, error) { return mds.open[i], err } +func (mds *MultiStore) List() []int64 { + mds.lk.RLock() + defer mds.lk.RUnlock() + out := make([]int64, 0, len(mds.open)) + for i := range mds.open { + out = append(out, i) + } + + return out +} + func (mds *MultiStore) Delete(i int64) error { mds.lk.Lock() defer mds.lk.Unlock() @@ -113,86 +120,3 @@ func (mds *MultiStore) Close() error { return err } - -type multiReadBs struct { - // TODO: some caching - mds *MultiStore -} - -func (m *multiReadBs) Has(cid cid.Cid) (bool, error) { - m.mds.lk.RLock() - defer m.mds.lk.RUnlock() - - var merr error - for i, store := range m.mds.open { - has, err := store.Bstore.Has(cid) - if err != nil { - merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) - continue - } - if !has { - continue - } - - return true, nil - } - - return false, merr -} - -func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) { - m.mds.lk.RLock() - defer m.mds.lk.RUnlock() - - var merr error - for i, store := range m.mds.open { - has, err := store.Bstore.Has(cid) - if err != nil { - merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) - continue - } - if !has { - continue - } - - val, err := store.Bstore.Get(cid) - if err != nil { - merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err)) - continue - } - - return val, nil - } - - if merr == nil { - return nil, datastore.ErrNotFound - } - - return nil, merr -} - -func (m *multiReadBs) DeleteBlock(cid cid.Cid) error { - return xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) { - return 0, xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) Put(block blocks.Block) error { - return xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) PutMany(blocks []blocks.Block) error { - return xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return nil, xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) HashOnRead(enabled bool) { - return -} - -var _ blockstore.Blockstore = &multiReadBs{} \ No newline at end of file From f59eb94d9267dcc58d6be8d28bd3674cef035b23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 11:12:32 +0200 Subject: [PATCH 04/12] client: Set correct dag builder params --- api/api_full.go | 2 +- cli/client.go | 2 +- node/impl/client/client.go | 33 ++++++++++++++++++++++++------- node/repo/importmgr/multistore.go | 2 ++ 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 3c0407069..ca4e1d721 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -356,7 +356,7 @@ type MinerSectors struct { type Import struct { Key int64 - Err error + Err string Root *cid.Cid Source string diff --git a/cli/client.go b/cli/client.go index 04a583590..6de47c314 100644 --- a/cli/client.go +++ b/cli/client.go @@ -210,7 +210,7 @@ var clientLocalCmd = &cli.Command{ } fmt.Printf("%d: %s @%s (%s)\n", v.Key, cidStr, v.FilePath, v.Source) - if v.Err != nil { + if v.Err != "" { fmt.Printf("\terror: %s\n", v.Err) } } diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 8c6f10896..8d92f659b 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -3,7 +3,6 @@ package client import ( "context" "fmt" - "io" "os" @@ -11,6 +10,7 @@ import ( "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" + "github.com/ipfs/go-cidutil" chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" files "github.com/ipfs/go-ipfs-files" @@ -25,6 +25,7 @@ import ( "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" + mh "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/filecoin-project/go-address" @@ -47,6 +48,7 @@ import ( "github.com/filecoin-project/lotus/node/repo/importmgr" ) +var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) const dealStartBuffer abi.ChainEpoch = 10000 // TODO: allow setting type API struct { @@ -282,7 +284,7 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro id, st, err := a.imgr().NewStore() if err != nil { - return cid.Cid{}, err + return cid.Undef, err } if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil { return cid.Cid{}, err @@ -290,10 +292,19 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG) + prefix, err := merkledag.PrefixForCidVersion(1) + if err != nil { + return cid.Undef, err + } + prefix.MhType = DefaultHashFunction + params := ihelper.DagBuilderParams{ Maxlinks: build.UnixfsLinksPerLevel, RawLeaves: true, - CidBuilder: cid.V1Builder{}, + CidBuilder: cidutil.InlineBuilder{ + Builder: prefix, + Limit: 126, + }, Dagserv: bufferedDS, } @@ -318,7 +329,7 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { if err != nil { out[i] = api.Import{ Key: id, - Err: xerrors.Errorf("getting info: %w", err), + Err: xerrors.Errorf("getting info: %w", err).Error(), } continue } @@ -332,7 +343,7 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { if info.Labels[importmgr.LRootCid] != "" { c, err := cid.Parse(info.Labels[importmgr.LRootCid]) if err != nil { - ai.Err = err + ai.Err = err.Error() } else { ai.Root = &c } @@ -569,12 +580,20 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmg bufDs := ipld.NewBufferedDAG(ctx, store.DAG) + prefix, err := merkledag.PrefixForCidVersion(1) + if err != nil { + return cid.Undef, err + } + prefix.MhType = DefaultHashFunction + params := ihelper.DagBuilderParams{ Maxlinks: build.UnixfsLinksPerLevel, RawLeaves: true, - CidBuilder: cid.V1Builder{}, + CidBuilder: cidutil.InlineBuilder{ + Builder: prefix, + Limit: 126, + }, Dagserv: bufDs, - NoCopy: true, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go index 43f51b768..32853148e 100644 --- a/node/repo/importmgr/multistore.go +++ b/node/repo/importmgr/multistore.go @@ -37,6 +37,8 @@ func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) mds := &MultiStore{ provider: provider, namespace: namespace, + + open: map[int64]*Store{}, } for _, i := range ids { From 24ed43d54101905d33c8851b9bf0a52296de32d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 11:38:09 +0200 Subject: [PATCH 05/12] client: Fix import labeling --- node/impl/client/client.go | 15 +++++++++++---- node/repo/importmgr/mgr.go | 4 ++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 8d92f659b..6df7dcbeb 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -264,18 +264,25 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { id, st, err := a.imgr().NewStore() if err != nil { - return cid.Cid{}, err + return cid.Undef, err } - if err := a.imgr().AddLabel(id, "source", "import"); err != nil { - return cid.Cid{}, err + if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil { + return cid.Undef, err + } + + if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil { + return cid.Undef, err } nd, err := a.clientImport(ctx, ref, st) - if err != nil { return cid.Undef, err } + if err := a.imgr().AddLabel(id, importmgr.LRootCid, nd.String()); err != nil { + return cid.Undef, err + } + return nd, nil } diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index c1550689a..9d658c24d 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -33,7 +33,7 @@ func New(mds *MultiStore, ds datastore.Batching) *Mgr { mds: mds, }, - ds: namespace.Wrap(ds, datastore.NewKey("/stores")), + ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"), } } @@ -72,7 +72,7 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path sm.Labels[key] = value - meta, err = json.Marshal(&StoreMeta{}) + meta, err = json.Marshal(&sm) if err != nil { return xerrors.Errorf("marshaling store meta: %w", err) } From 7175b1dd650f96effb3c6c51500a7dc95770e764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 11:38:22 +0200 Subject: [PATCH 06/12] gofmt --- api/api_full.go | 4 ++-- node/impl/client/client.go | 13 +++++++------ node/repo/fsrepo_ds.go | 2 +- node/repo/importmgr/mgr.go | 15 ++++++++------- node/repo/importmgr/multistore.go | 2 +- node/repo/importmgr/store.go | 6 +++--- storage/wdpost_run.go | 8 ++++---- 7 files changed, 26 insertions(+), 24 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index ca4e1d721..97d846ca4 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -355,10 +355,10 @@ type MinerSectors struct { } type Import struct { - Key int64 + Key int64 Err string - Root *cid.Cid + Root *cid.Cid Source string FilePath string } diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 6df7dcbeb..27210cbab 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -49,6 +49,7 @@ import ( ) var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) + const dealStartBuffer abi.ChainEpoch = 10000 // TODO: allow setting type API struct { @@ -306,13 +307,13 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro prefix.MhType = DefaultHashFunction params := ihelper.DagBuilderParams{ - Maxlinks: build.UnixfsLinksPerLevel, - RawLeaves: true, + Maxlinks: build.UnixfsLinksPerLevel, + RawLeaves: true, CidBuilder: cidutil.InlineBuilder{ Builder: prefix, Limit: 126, }, - Dagserv: bufferedDS, + Dagserv: bufferedDS, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) @@ -594,13 +595,13 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmg prefix.MhType = DefaultHashFunction params := ihelper.DagBuilderParams{ - Maxlinks: build.UnixfsLinksPerLevel, - RawLeaves: true, + Maxlinks: build.UnixfsLinksPerLevel, + RawLeaves: true, CidBuilder: cidutil.InlineBuilder{ Builder: prefix, Limit: 126, }, - Dagserv: bufDs, + Dagserv: bufDs, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index c0a57d5af..a92431307 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -238,4 +238,4 @@ func (fsr *fsLockedRepo) DeleteDatastore(ns string) error { } return nil -} \ No newline at end of file +} diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 9d658c24d..74aadad40 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -12,24 +12,25 @@ import ( ) type Mgr struct { - mds *MultiStore - ds datastore.Batching + mds *MultiStore + ds datastore.Batching Blockstore blockstore.Blockstore } type Label string + const ( - LSource = "source" // Function which created the import - LRootCid = "root" // Root CID + LSource = "source" // Function which created the import + LRootCid = "root" // Root CID LFileName = "filename" // Local file path - LMTime = "mtime" // File modification timestamp + LMTime = "mtime" // File modification timestamp ) func New(mds *MultiStore, ds datastore.Batching) *Mgr { return &Mgr{ mds: mds, - Blockstore: &multiReadBs{ + Blockstore: &multiReadBs{ mds: mds, }, @@ -99,4 +100,4 @@ func (m *Mgr) Info(id int64) (*StoreMeta, error) { } // m.Info -// m.Delete \ No newline at end of file +// m.Delete diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go index 32853148e..705a3c947 100644 --- a/node/repo/importmgr/multistore.go +++ b/node/repo/importmgr/multistore.go @@ -19,7 +19,7 @@ type dsProvider interface { } type MultiStore struct { - provider dsProvider + provider dsProvider namespace string open map[int64]*Store diff --git a/node/repo/importmgr/store.go b/node/repo/importmgr/store.go index a08974262..78bb7462b 100644 --- a/node/repo/importmgr/store.go +++ b/node/repo/importmgr/store.go @@ -37,15 +37,15 @@ func openStore(ds datastore.Batching) (*Store, error) { dag := merkledag.NewDAGService(bsvc) return &Store{ - ds: ds, + ds: ds, fm: fm, Fstore: fstore, Bstore: ibs, - bsvc: bsvc, - DAG: dag, + bsvc: bsvc, + DAG: dag, }, nil } diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 5ee74915f..f6ec64583 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -150,16 +150,16 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, deadline if err != nil { return xerrors.Errorf("checking unrecovered sectors: %w", err) } - + // if all sectors failed to recover, don't declare recoveries sbfCount, err := sbf.Count() if err != nil { return xerrors.Errorf("counting recovered sectors: %w", err) } - + if sbfCount == 0 { - log.Warnw("No recoveries to declare", "deadline", deadline, "faulty", uc) - return nil + log.Warnw("No recoveries to declare", "deadline", deadline, "faulty", uc) + return nil } params := &miner.DeclareFaultsRecoveredParams{ From 18fc3337ff033d2beef9bf433a48533b7b907e9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 13:45:02 +0200 Subject: [PATCH 07/12] client: API/Command to drop imports --- api/api_full.go | 9 ++++++- api/apistruct/struct.go | 9 +++++-- cli/client.go | 52 +++++++++++++++++++++++++++++++++++++- node/impl/client/client.go | 21 ++++++++++----- node/repo/fsrepo_ds.go | 2 ++ node/repo/importmgr/mgr.go | 13 ++++++++-- 6 files changed, 93 insertions(+), 13 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 97d846ca4..b2539537c 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -190,7 +190,9 @@ type FullNode interface { // retrieval markets as a client // ClientImport imports file under the specified path into filestore. - ClientImport(ctx context.Context, ref FileRef) (cid.Cid, error) + ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) + // ClientRemoveImport removes file import + ClientRemoveImport(ctx context.Context, importID int64) error // ClientStartDeal proposes a deal with a miner. ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) // ClientGetDealInfo returns the latest information about a given deal. @@ -354,6 +356,11 @@ type MinerSectors struct { Pset uint64 } +type ImportRes struct { + Root cid.Cid + ImportID int64 +} + type Import struct { Key int64 Err string diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index e93707a8e..2d49a6bf5 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -109,8 +109,9 @@ type FullNodeStruct struct { WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletDelete func(context.Context, address.Address) error `perm:"write"` - ClientImport func(ctx context.Context, ref api.FileRef) (cid.Cid, error) `perm:"admin"` + ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` + ClientRemoveImport func(ctx context.Context, importID int64) error `perm:"admin"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientFindData func(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, root cid.Cid, miner address.Address) (api.QueryOffer, error) `perm:"read"` @@ -340,7 +341,11 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e return c.Internal.ClientListImports(ctx) } -func (c *FullNodeStruct) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { +func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int64) error { + return c.Internal.ClientRemoveImport(ctx, importID) +} + +func (c *FullNodeStruct) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) { return c.Internal.ClientImport(ctx, ref) } diff --git a/cli/client.go b/cli/client.go index 6de47c314..aec65ba03 100644 --- a/cli/client.go +++ b/cli/client.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sort" "strconv" "text/tabwriter" @@ -55,6 +56,7 @@ var clientCmd = &cli.Command{ Usage: "Make deals, store data, retrieve data", Subcommands: []*cli.Command{ clientImportCmd, + clientDropCmd, clientCommPCmd, clientLocalCmd, clientDealCmd, @@ -75,6 +77,11 @@ var clientImportCmd = &cli.Command{ Name: "car", Usage: "import from a car file instead of a regular file", }, + &cli.BoolFlag{ + Name: "quiet", + Aliases: []string{"q"}, + Usage: "Output root CID only", + }, &CidBaseFlag, }, Action: func(cctx *cli.Context) error { @@ -103,7 +110,46 @@ var clientImportCmd = &cli.Command{ return err } - fmt.Println(encoder.Encode(c)) + if !cctx.Bool("quiet") { + fmt.Printf("Import %d, Root ", c.ImportID) + } + fmt.Println(encoder.Encode(c.Root)) + + return nil + }, +} + +var clientDropCmd = &cli.Command{ + Name: "drop", + Usage: "Remove import", + ArgsUsage: "[import ID...]", + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return xerrors.Errorf("no imports specified") + } + + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + var ids []int64 + for i, s := range cctx.Args().Slice() { + id, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return xerrors.Errorf("parsing %d-th import ID: %w", i, err) + } + + ids = append(ids, id) + } + + for _, id := range ids { + if err := api.ClientRemoveImport(ctx, id); err != nil { + return xerrors.Errorf("removing import %d: %w", id, err) + } + } return nil }, @@ -203,6 +249,10 @@ var clientLocalCmd = &cli.Command{ return err } + sort.Slice(list, func(i, j int) bool { + return list[i].Key < list[j].Key + }) + for _, v := range list { cidStr := "" if v.Root != nil { diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 27210cbab..8026d7e7c 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -262,29 +262,36 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo } } -func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { +func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) { id, st, err := a.imgr().NewStore() if err != nil { - return cid.Undef, err + return nil, err } if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil { - return cid.Undef, err + return nil, err } if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil { - return cid.Undef, err + return nil, err } nd, err := a.clientImport(ctx, ref, st) if err != nil { - return cid.Undef, err + return nil, err } if err := a.imgr().AddLabel(id, importmgr.LRootCid, nd.String()); err != nil { - return cid.Undef, err + return nil, err } - return nd, nil + return &api.ImportRes{ + Root: nd, + ImportID: id, + }, nil +} + +func (a *API) ClientRemoveImport(ctx context.Context, importID int64) error { + return a.imgr().Remove(importID) } func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) { diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index a92431307..ba3e567b0 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -226,6 +226,8 @@ func (fsr *fsLockedRepo) DeleteDatastore(ns string) error { return xerrors.Errorf("no multi-datastore with at index (namespace %s)", ns) } + delete(mds, idx) + if err := ds.Close(); err != nil { return xerrors.Errorf("closing datastore: %w", err) } diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 74aadad40..464f4ac33 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -99,5 +99,14 @@ func (m *Mgr) Info(id int64) (*StoreMeta, error) { return &sm, nil } -// m.Info -// m.Delete +func (m *Mgr) Remove(id int64) error { + if err := m.mds.Delete(id); err != nil { + return xerrors.Errorf("removing import: %w", err) + } + + if err := m.ds.Delete(datastore.NewKey(fmt.Sprintf("%d", id))); err != nil { + return xerrors.Errorf("removing import metadata: %w", err) + } + + return nil +} From d370e0ae2db3fc7aa3382fd8054426ad2f7d2fb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 13:48:13 +0200 Subject: [PATCH 08/12] Go is hard --- node/repo/fsrepo_ds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index ba3e567b0..745645c9b 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -83,7 +83,7 @@ func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.B out[p] = map[int64]datastore.Batching{} for _, info := range di { - path = filepath.Join(path, info.Name()) + path := filepath.Join(path, info.Name()) prefix := datastore.NewKey(p) From 55f089d997aafab258f5d151e8efc9852c3a834d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 14:35:02 +0200 Subject: [PATCH 09/12] client: Unbreak retrieval --- node/impl/client/client.go | 16 ++++++++++------ node/modules/client.go | 9 +++++++-- node/repo/importmgr/multistore.go | 2 +- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 8026d7e7c..e4e1aa7f9 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -66,6 +66,8 @@ type API struct { Chain *store.ChainStore Imports dtypes.ClientImportMgr + + RetBstore dtypes.ClientBlockstore // TODO: try to remove } func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -384,13 +386,13 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return xerrors.Errorf("cannot make retrieval deal for zero bytes") } - id, st, err := a.imgr().NewStore() + /*id, st, err := a.imgr().NewStore() if err != nil { return err } if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { return err - } + }*/ retrievalResult := make(chan error, 1) @@ -429,7 +431,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) - _, err = a.Retrieval.Retrieve( + _, err := a.Retrieval.Retrieve( ctx, order.Root, rm.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease), @@ -456,23 +458,25 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return nil } + rdag := merkledag.NewDAGService(blockservice.New(a.RetBstore, offline.Exchange(a.RetBstore))) + if ref.IsCAR { f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } - err = car.WriteCar(ctx, st.DAG, []cid.Cid{order.Root}, f) + err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f) if err != nil { return err } return f.Close() } - nd, err := st.DAG.Get(ctx, order.Root) + nd, err := rdag.Get(ctx, order.Root) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } - file, err := unixfile.NewUnixfsFile(ctx, st.DAG, nd) + file, err := unixfile.NewUnixfsFile(ctx, rdag, nd) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } diff --git a/node/modules/client.go b/node/modules/client.go index da0f3be06..55a6ae196 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -2,7 +2,7 @@ package modules import ( "context" - + "github.com/filecoin-project/lotus/lib/bufbstore" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/host" "go.uber.org/fx" @@ -50,7 +50,12 @@ func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes. } func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore { - return blockstore.NewIdStore(imgr.Blockstore) + // TODO: This isn't.. the best + // - If it's easy to pass per-retrieval blockstores with markets we don't need this + // - If it's not easy, we need to store this in a separate datastore on disk + defaultWrite := blockstore.NewBlockstore(datastore.NewMapDatastore()) + + return blockstore.NewIdStore(bufbstore.NewTieredBstore(imgr.Blockstore, defaultWrite)) } // RegisterClientValidator is an initialization hook that registers the client diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go index 705a3c947..31bf7be35 100644 --- a/node/repo/importmgr/multistore.go +++ b/node/repo/importmgr/multistore.go @@ -18,7 +18,7 @@ type dsProvider interface { DeleteDatastore(namespace string) error } -type MultiStore struct { +type MultiStore struct { provider dsProvider namespace string From 802f16c542721f5f4d2f7acce6bb7c306c4c8344 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jul 2020 23:05:43 +0200 Subject: [PATCH 10/12] client: Reenable nocopy on import --- node/impl/client/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/node/impl/client/client.go b/node/impl/client/client.go index f8d3ec45e..4afea8e30 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -323,6 +323,7 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro Limit: 126, }, Dagserv: bufferedDS, + NoCopy: true, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) @@ -613,6 +614,7 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmg Limit: 126, }, Dagserv: bufDs, + NoCopy: true, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) From 5a117d8edff4e47c3bad507b9593df17e72d9009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jul 2020 23:05:54 +0200 Subject: [PATCH 11/12] gofmt --- node/impl/client/client.go | 2 +- node/repo/importmgr/multistore.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 4afea8e30..da6e9ba0a 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -323,7 +323,7 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro Limit: 126, }, Dagserv: bufferedDS, - NoCopy: true, + NoCopy: true, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go index 31bf7be35..705a3c947 100644 --- a/node/repo/importmgr/multistore.go +++ b/node/repo/importmgr/multistore.go @@ -18,7 +18,7 @@ type dsProvider interface { DeleteDatastore(namespace string) error } -type MultiStore struct { +type MultiStore struct { provider dsProvider namespace string From cc09c5b6d90bf9dfb4fabeafd314113f25690a63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jul 2020 23:13:14 +0200 Subject: [PATCH 12/12] client: Don't use filestore for local improts --- node/impl/client/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/node/impl/client/client.go b/node/impl/client/client.go index da6e9ba0a..afffb9094 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -323,7 +323,6 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro Limit: 126, }, Dagserv: bufferedDS, - NoCopy: true, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))