diff --git a/api/api_full.go b/api/api_full.go index 01c15176b..849b165d7 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -5,7 +5,6 @@ import ( "time" "github.com/ipfs/go-cid" - "github.com/ipfs/go-filestore" "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" @@ -192,7 +191,9 @@ type FullNode interface { // retrieval markets as a client // ClientImport imports file under the specified path into filestore. - ClientImport(ctx context.Context, ref FileRef) (cid.Cid, error) + ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) + // ClientRemoveImport removes file import + ClientRemoveImport(ctx context.Context, importID int64) error // ClientStartDeal proposes a deal with a miner. ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) // ClientGetDealInfo returns the latest information about a given deal. @@ -360,11 +361,18 @@ type MinerSectors struct { Pset uint64 } +type ImportRes struct { + Root cid.Cid + ImportID int64 +} + type Import struct { - Status filestore.Status - Key cid.Cid + Key int64 + Err string + + Root *cid.Cid + Source string FilePath string - Size uint64 } type DealInfo struct { diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 2f7d17539..c23c7329a 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -111,8 +111,9 @@ type FullNodeStruct struct { WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletDelete func(context.Context, address.Address) error `perm:"write"` - ClientImport func(ctx context.Context, ref api.FileRef) (cid.Cid, error) `perm:"admin"` + ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` + ClientRemoveImport func(ctx context.Context, importID int64) error `perm:"admin"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientFindData func(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, root cid.Cid, miner address.Address) (api.QueryOffer, error) `perm:"read"` @@ -345,7 +346,11 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e return c.Internal.ClientListImports(ctx) } -func (c *FullNodeStruct) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { +func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int64) error { + return c.Internal.ClientRemoveImport(ctx, importID) +} + +func (c *FullNodeStruct) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) { return c.Internal.ClientImport(ctx, ref) } diff --git a/cli/client.go b/cli/client.go index 5e8d61e83..4aaa789ba 100644 --- a/cli/client.go +++ b/cli/client.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sort" "strconv" "text/tabwriter" @@ -55,6 +56,7 @@ var clientCmd = &cli.Command{ Usage: "Make deals, store data, retrieve data", Subcommands: []*cli.Command{ clientImportCmd, + clientDropCmd, clientCommPCmd, clientLocalCmd, clientDealCmd, @@ -75,6 +77,11 @@ var clientImportCmd = &cli.Command{ Name: "car", Usage: "import from a car file instead of a regular file", }, + &cli.BoolFlag{ + Name: "quiet", + Aliases: []string{"q"}, + Usage: "Output root CID only", + }, &CidBaseFlag, }, Action: func(cctx *cli.Context) error { @@ -103,7 +110,46 @@ var clientImportCmd = &cli.Command{ return err } - fmt.Println(encoder.Encode(c)) + if !cctx.Bool("quiet") { + fmt.Printf("Import %d, Root ", c.ImportID) + } + fmt.Println(encoder.Encode(c.Root)) + + return nil + }, +} + +var clientDropCmd = &cli.Command{ + Name: "drop", + Usage: "Remove import", + ArgsUsage: "[import ID...]", + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return xerrors.Errorf("no imports specified") + } + + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + var ids []int64 + for i, s := range cctx.Args().Slice() { + id, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return xerrors.Errorf("parsing %d-th import ID: %w", i, err) + } + + ids = append(ids, id) + } + + for _, id := range ids { + if err := api.ClientRemoveImport(ctx, id); err != nil { + return xerrors.Errorf("removing import %d: %w", id, err) + } + } return nil }, @@ -203,8 +249,20 @@ var clientLocalCmd = &cli.Command{ return err } + sort.Slice(list, func(i, j int) bool { + return list[i].Key < list[j].Key + }) + for _, v := range list { - fmt.Printf("%s %s %s %s\n", encoder.Encode(v.Key), v.FilePath, types.SizeStr(types.NewInt(v.Size)), v.Status) + cidStr := "" + if v.Root != nil { + cidStr = encoder.Encode(*v.Root) + } + + fmt.Printf("%d: %s @%s (%s)\n", v.Key, cidStr, v.FilePath, v.Source) + if v.Err != "" { + fmt.Printf("\terror: %s\n", v.Err) + } } return nil }, diff --git a/node/builder.go b/node/builder.go index a438bf5bd..713d338fb 100644 --- a/node/builder.go +++ b/node/builder.go @@ -234,7 +234,6 @@ func Online() Option { Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore), Override(new(dtypes.ChainExchange), modules.ChainExchange), Override(new(dtypes.ChainBlockService), modules.ChainBlockservice), - Override(new(dtypes.ClientDAG), testing.MemoryClientDag), // Filecoin services Override(new(*chain.Syncer), modules.NewSyncer), @@ -440,9 +439,10 @@ func Repo(r repo.Repo) Option { Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), - Override(new(dtypes.ClientFilestore), modules.ClientFstore), + Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), + Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore), + Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), - Override(new(dtypes.ClientDAG), modules.ClientDAG), Override(new(ci.PrivKey), lp2p.PrivKey), Override(new(ci.PubKey), ci.PrivKey.GetPublic), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 1d9855e26..afffb9094 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -2,15 +2,7 @@ package client import ( "context" - "errors" "fmt" - - "github.com/filecoin-project/go-fil-markets/pieceio" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" - "github.com/multiformats/go-multiaddr" - "io" "os" @@ -18,7 +10,7 @@ import ( "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" - "github.com/ipfs/go-filestore" + "github.com/ipfs/go-cidutil" chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" files "github.com/ipfs/go-ipfs-files" @@ -28,10 +20,16 @@ import ( "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" "github.com/ipld/go-car" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" + mh "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/pieceio" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/sector-storage/ffiwrapper" @@ -47,8 +45,11 @@ import ( "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo/importmgr" ) +var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) + const dealStartBuffer abi.ChainEpoch = 10000 // TODO: allow setting type API struct { @@ -64,9 +65,9 @@ type API struct { Retrieval rm.RetrievalClient Chain *store.ChainStore - LocalDAG dtypes.ClientDAG - Blockstore dtypes.ClientBlockstore - Filestore dtypes.ClientFilestore `optional:"true"` + Imports dtypes.ClientImportMgr + + RetBstore dtypes.ClientBlockstore // TODO: try to remove } func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -77,6 +78,10 @@ func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch a return minExp + miner.WPoStProvingPeriod - (minExp % miner.WPoStProvingPeriod) + (md.PeriodStart % miner.WPoStProvingPeriod) - 1 } +func (a *API) imgr() *importmgr.Mgr { + return a.Imports +} + func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { exist, err := a.WalletHas(ctx, params.Wallet) if err != nil { @@ -195,7 +200,7 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { // TODO: check if we have the ENTIRE dag - offExch := merkledag.NewDAGService(blockservice.New(a.Blockstore, offline.Exchange(a.Blockstore))) + offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore))) _, err := offExch.Get(ctx, root) if err == ipld.ErrNotFound { return false, nil @@ -259,28 +264,65 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo } } -func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { - - bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) - nd, err := a.clientImport(ref, bufferedDS) - +func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) { + id, st, err := a.imgr().NewStore() if err != nil { - return cid.Undef, err + return nil, err + } + if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil { + return nil, err } - return nd, nil + if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil { + return nil, err + } + + nd, err := a.clientImport(ctx, ref, st) + if err != nil { + return nil, err + } + + if err := a.imgr().AddLabel(id, importmgr.LRootCid, nd.String()); err != nil { + return nil, err + } + + return &api.ImportRes{ + Root: nd, + ImportID: id, + }, nil +} + +func (a *API) ClientRemoveImport(ctx context.Context, importID int64) error { + return a.imgr().Remove(importID) } func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) { file := files.NewReaderFile(f) - bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) + id, st, err := a.imgr().NewStore() + if err != nil { + return cid.Undef, err + } + if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil { + return cid.Cid{}, err + } + + bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG) + + prefix, err := merkledag.PrefixForCidVersion(1) + if err != nil { + return cid.Undef, err + } + prefix.MhType = DefaultHashFunction params := ihelper.DagBuilderParams{ - Maxlinks: build.UnixfsLinksPerLevel, - RawLeaves: true, - CidBuilder: nil, - Dagserv: bufferedDS, + Maxlinks: build.UnixfsLinksPerLevel, + RawLeaves: true, + CidBuilder: cidutil.InlineBuilder{ + Builder: prefix, + Limit: 126, + }, + Dagserv: bufferedDS, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) @@ -296,49 +338,38 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro } func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { - if a.Filestore == nil { - return nil, errors.New("listing imports is not supported with in-memory dag yet") - } - next, err := filestore.ListAll(a.Filestore, false) - if err != nil { - return nil, err - } + importIDs := a.imgr().List() - // TODO: make this less very bad by tracking root cids instead of using ListAll - - out := make([]api.Import, 0) - lowest := make([]uint64, 0) - for { - r := next() - if r == nil { - return out, nil + out := make([]api.Import, len(importIDs)) + for i, id := range importIDs { + info, err := a.imgr().Info(id) + if err != nil { + out[i] = api.Import{ + Key: id, + Err: xerrors.Errorf("getting info: %w", err).Error(), + } + continue } - matched := false - for i := range out { - if out[i].FilePath == r.FilePath { - matched = true - if lowest[i] > r.Offset { - lowest[i] = r.Offset - out[i] = api.Import{ - Status: r.Status, - Key: r.Key, - FilePath: r.FilePath, - Size: r.Size, - } - } - break + + ai := api.Import{ + Key: id, + Source: info.Labels[importmgr.LSource], + FilePath: info.Labels[importmgr.LFileName], + } + + if info.Labels[importmgr.LRootCid] != "" { + c, err := cid.Parse(info.Labels[importmgr.LRootCid]) + if err != nil { + ai.Err = err.Error() + } else { + ai.Root = &c } } - if !matched { - out = append(out, api.Import{ - Status: r.Status, - Key: r.Key, - FilePath: r.FilePath, - Size: r.Size, - }) - lowest = append(lowest, r.Offset) - } + + out[i] = ai } + + return out, nil } func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { @@ -348,13 +379,21 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return err } - order.MinerPeerID = peer.ID(mi.PeerId) + order.MinerPeerID = mi.PeerId } if order.Size == 0 { return xerrors.Errorf("cannot make retrieval deal for zero bytes") } + /*id, st, err := a.imgr().NewStore() + if err != nil { + return err + } + if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { + return err + }*/ + retrievalResult := make(chan error, 1) unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { @@ -399,7 +438,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref order.Total, order.MinerPeerID, order.Client, - order.Miner) + order.Miner) // TODO: pass the store here somehow if err != nil { return xerrors.Errorf("Retrieve failed: %w", err) } @@ -419,23 +458,25 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return nil } + rdag := merkledag.NewDAGService(blockservice.New(a.RetBstore, offline.Exchange(a.RetBstore))) + if ref.IsCAR { f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } - err = car.WriteCar(ctx, a.LocalDAG, []cid.Cid{order.Root}, f) + err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f) if err != nil { return err } return f.Close() } - nd, err := a.LocalDAG.Get(ctx, order.Root) + nd, err := rdag.Get(ctx, order.Root) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } - file, err := unixfile.NewUnixfsFile(ctx, a.LocalDAG, nd) + file, err := unixfile.NewUnixfsFile(ctx, rdag, nd) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } @@ -485,14 +526,22 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string, miner address. } func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { + id, st, err := a.imgr().NewStore() + if err != nil { + return err + } + if err := a.imgr().AddLabel(id, "source", "gen-car"); err != nil { + return err + } - bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) - c, err := a.clientImport(ref, bufferedDS) + bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG) + c, err := a.clientImport(ctx, ref, st) if err != nil { return err } + // TODO: does that defer mean to remove the whole blockstore? defer bufferedDS.Remove(ctx, c) //nolint:errcheck ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any) @@ -505,7 +554,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri return err } - sc := car.NewSelectiveCar(ctx, a.Blockstore, []car.Dag{{Root: c, Selector: allSelector}}) + sc := car.NewSelectiveCar(ctx, st.Bstore, []car.Dag{{Root: c, Selector: allSelector}}) if err = sc.Write(f); err != nil { return err } @@ -513,7 +562,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri return f.Close() } -func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.Cid, error) { +func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmgr.Store) (cid.Cid, error) { f, err := os.Open(ref.Path) if err != nil { return cid.Undef, err @@ -530,13 +579,13 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C } if ref.IsCAR { - var store car.Store - if a.Filestore == nil { - store = a.Blockstore + var st car.Store + if store.Fstore == nil { + st = store.Bstore } else { - store = (*filestore.Filestore)(a.Filestore) + st = store.Fstore } - result, err := car.LoadCar(store, file) + result, err := car.LoadCar(st, file) if err != nil { return cid.Undef, err } @@ -548,12 +597,23 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C return result.Roots[0], nil } + bufDs := ipld.NewBufferedDAG(ctx, store.DAG) + + prefix, err := merkledag.PrefixForCidVersion(1) + if err != nil { + return cid.Undef, err + } + prefix.MhType = DefaultHashFunction + params := ihelper.DagBuilderParams{ - Maxlinks: build.UnixfsLinksPerLevel, - RawLeaves: true, - CidBuilder: nil, - Dagserv: bufferedDS, - NoCopy: true, + Maxlinks: build.UnixfsLinksPerLevel, + RawLeaves: true, + CidBuilder: cidutil.InlineBuilder{ + Builder: prefix, + Limit: 126, + }, + Dagserv: bufDs, + NoCopy: true, } db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) @@ -565,7 +625,7 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C return cid.Undef, err } - if err := bufferedDS.Commit(); err != nil { + if err := bufDs.Commit(); err != nil { return cid.Undef, err } diff --git a/node/modules/client.go b/node/modules/client.go index 5a8e97a04..a7437f297 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -2,13 +2,11 @@ package modules import ( "context" - "path/filepath" + "github.com/filecoin-project/lotus/lib/bufbstore" "time" blockstore "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipfs/go-merkledag" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/routing" "go.uber.org/fx" dtimpl "github.com/filecoin-project/go-data-transfer/impl" @@ -24,44 +22,44 @@ import ( smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" - "github.com/ipfs/go-bitswap" - "github.com/ipfs/go-bitswap/network" - "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-filestore" "github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/node/impl/full" payapi "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/filecoin-project/lotus/paychmgr" ) -func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) { - clientds, err := r.Datastore("/client") - if err != nil { - return nil, err - } - blocks := namespace.Wrap(clientds, datastore.NewKey("blocks")) - - absPath, err := filepath.Abs(r.Path()) +func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) { + mds, err := importmgr.NewMultiDstore(r, "/client") if err != nil { return nil, err } - fm := filestore.NewFileManager(clientds, filepath.Dir(absPath)) - fm.AllowFiles = true - // TODO: fm.AllowUrls (needs more code in client import) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return mds.Close() + }, + }) - bs := blockstore.NewBlockstore(blocks) - return filestore.NewFilestore(bs, fm), nil + return mds, nil } -func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore { - return blockstore.NewIdStore((*filestore.Filestore)(fstore)) +func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.ClientImportMgr { + return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client"))) +} + +func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore { + // TODO: This isn't.. the best + // - If it's easy to pass per-retrieval blockstores with markets we don't need this + // - If it's not easy, we need to store this in a separate datastore on disk + defaultWrite := blockstore.NewBlockstore(datastore.NewMapDatastore()) + + return blockstore.NewIdStore(bufbstore.NewTieredBstore(imgr.Blockstore, defaultWrite)) } // RegisterClientValidator is an initialization hook that registers the client @@ -107,24 +105,6 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore { return namespace.Wrap(ds, datastore.NewKey("/deals/client")) } -// ClientDAG is a DAGService for the ClientBlockstore -func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG { - bitswapNetwork := network.NewFromIpfsHost(h, rt) - bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)} - exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs, bitswapOptions...) - - bsvc := blockservice.New(ibs, exch) - dag := merkledag.NewDAGService(bsvc) - - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return bsvc.Close() - }, - }) - - return dag -} - func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator { return requestvalidation.NewUnifiedRequestValidator(nil, deals) } diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 3eba07e4e..7ede8aaab 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -1,18 +1,19 @@ package dtypes import ( - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-filestore" "github.com/ipfs/go-graphsync" blockstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" format "github.com/ipfs/go-ipld-format" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/lotus/node/repo/importmgr" ) // MetadataDS stores metadata @@ -26,9 +27,9 @@ type ChainGCBlockstore blockstore.GCBlockstore type ChainExchange exchange.Interface type ChainBlockService bserv.BlockService -type ClientFilestore *filestore.Filestore +type ClientMultiDstore *importmgr.MultiStore +type ClientImportMgr *importmgr.Mgr type ClientBlockstore blockstore.Blockstore -type ClientDAG format.DAGService type ClientDealStore *statestore.StateStore type ClientRequestValidator *requestvalidation.UnifiedRequestValidator type ClientDatastore datastore.Batching diff --git a/node/modules/ipfsclient.go b/node/modules/ipfsclient.go index f405cb4c6..93bedf598 100644 --- a/node/modules/ipfsclient.go +++ b/node/modules/ipfsclient.go @@ -4,7 +4,6 @@ import ( "go.uber.org/fx" "golang.org/x/xerrors" - "github.com/ipfs/go-filestore" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/multiformats/go-multiaddr" @@ -18,8 +17,8 @@ import ( // If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration. // If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress. // The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals. -func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) { +func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { var err error var ipfsbs *ipfsbstore.IpfsBstore if ipfsMaddr != "" { @@ -38,7 +37,7 @@ func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.M var ws blockstore.Blockstore ws = ipfsbs if !useForRetrieval { - ws = blockstore.NewIdStore((*filestore.Filestore)(fstore)) + ws = blockstore.NewIdStore(localStore.Blockstore) } return bufbstore.NewTieredBstore(ipfsbs, ws), nil } diff --git a/node/modules/testing/storage.go b/node/modules/testing/storage.go deleted file mode 100644 index 525d86b06..000000000 --- a/node/modules/testing/storage.go +++ /dev/null @@ -1,39 +0,0 @@ -package testing - -import ( - "context" - - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-datastore" - dsync "github.com/ipfs/go-datastore/sync" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" - "go.uber.org/fx" -) - -func MapBlockstore() blockstore.Blockstore { - // TODO: proper datastore - bds := dsync.MutexWrap(datastore.NewMapDatastore()) - bs := blockstore.NewBlockstore(bds) - return blockstore.NewIdStore(bs) -} - -func MapDatastore() datastore.Batching { - return dsync.MutexWrap(datastore.NewMapDatastore()) -} - -func MemoryClientDag(lc fx.Lifecycle) ipld.DAGService { - ibs := blockstore.NewBlockstore(datastore.NewMapDatastore()) - bsvc := blockservice.New(ibs, offline.Exchange(ibs)) - dag := merkledag.NewDAGService(bsvc) - - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return bsvc.Close() - }, - }) - - return dag -} diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index c1f0db9bb..682983415 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -226,9 +226,11 @@ type fsLockedRepo struct { repoType RepoType closer io.Closer - ds datastore.Batching - dsErr error - dsOnce sync.Once + ds map[string]datastore.Batching + multiDs map[string]map[int64]datastore.Batching + dsErr error + dsOnce sync.Once + dsLk sync.Mutex storageLk sync.Mutex configLk sync.Mutex @@ -245,8 +247,10 @@ func (fsr *fsLockedRepo) Close() error { return xerrors.Errorf("could not remove API file: %w", err) } if fsr.ds != nil { - if err := fsr.ds.Close(); err != nil { - return xerrors.Errorf("could not close datastore: %w", err) + for _, ds := range fsr.ds { + if err := ds.Close(); err != nil { + return xerrors.Errorf("could not close datastore: %w", err) + } } } diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index f58e23d29..473e4bafa 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -1,13 +1,13 @@ package repo import ( + "fmt" + "github.com/ipfs/go-datastore" + "golang.org/x/xerrors" + "io/ioutil" "os" "path/filepath" - - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/mount" - "github.com/ipfs/go-datastore/namespace" - "golang.org/x/xerrors" + "strconv" dgbadger "github.com/dgraph-io/badger/v2" badger "github.com/ipfs/go-ds-badger2" @@ -16,13 +16,18 @@ import ( ldbopts "github.com/syndtr/goleveldb/leveldb/opt" ) -var fsDatastores = map[string]func(path string) (datastore.Batching, error){ +type dsCtor func(path string) (datastore.Batching, error) + +var fsDatastores = map[string]dsCtor{ "chain": badgerDs, "metadata": levelDs, // Those need to be fast for large writes... but also need a really good GC :c "staging": badgerDs, // miner specific - "client": badgerDs, // client specific +} + +var fsMultiDatastores = map[string]dsCtor{ + "client": badgerDs, // client specific } func badgerDs(path string) (datastore.Batching, error) { @@ -39,12 +44,12 @@ func levelDs(path string) (datastore.Batching, error) { }) } -func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) { +func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error) { if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil { return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err) } - var mounts []mount.Mount + out := map[string]datastore.Batching{} for p, ctor := range fsDatastores { prefix := datastore.NewKey(p) @@ -57,21 +62,184 @@ func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) { ds = measure.New("fsrepo."+p, ds) - mounts = append(mounts, mount.Mount{ - Prefix: prefix, - Datastore: ds, - }) + out[datastore.NewKey(p).String()] = ds } - return mount.New(mounts), nil + return out, nil +} + +func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) { + out := map[string]map[int64]datastore.Batching{} + + for p, ctor := range fsMultiDatastores { + path := fsr.join(filepath.Join(fsDatastore, p)) + if err := os.MkdirAll(path, 0755); err != nil { + return nil, xerrors.Errorf("mkdir %s: %w", path, err) + } + + di, err := ioutil.ReadDir(path) + if err != nil { + return nil, xerrors.Errorf("readdir '%s': %w", path, err) + } + + out[p] = map[int64]datastore.Batching{} + + for _, info := range di { + path := filepath.Join(path, info.Name()) + + prefix := datastore.NewKey(p) + + id, err := strconv.ParseInt(info.Name(), 10, 64) + if err != nil { + log.Errorf("error parsing multi-datastore id for '%s': %w", path, err) + continue + } + + // TODO: optimization: don't init datastores we don't need + ds, err := ctor(path) + if err != nil { + return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err) + } + + ds = measure.New("fsrepo."+p+"."+info.Name(), ds) + + out[p][id] = ds + } + } + + return out, nil +} + +func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) { + ctor, ok := fsMultiDatastores[ns] + if !ok { + return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns) + } + + si := fmt.Sprintf("%d", idx) + path := fsr.join(filepath.Join(fsDatastore, ns, si)) + + ds, err := ctor(path) + if err != nil { + return nil, xerrors.Errorf("opening datastore %s: %w", path, err) + } + + ds = measure.New("fsrepo."+ns+"."+si, ds) + + return ds, nil } func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { fsr.dsOnce.Do(func() { - fsr.ds, fsr.dsErr = fsr.openDatastore() + var err error + fsr.ds, err = fsr.openDatastores() + if err != nil { + fsr.dsErr = err + return + } + + fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores() }) if fsr.dsErr != nil { return nil, fsr.dsErr } - return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil + ds, ok := fsr.ds[ns] + if ok { + return ds, nil + } + + k := datastore.NewKey(ns) + parts := k.List() + if len(parts) != 2 { + return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts") + } + + fsr.dsLk.Lock() + defer fsr.dsLk.Unlock() + + mds, ok := fsr.multiDs[parts[0]] + if !ok { + return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns) + } + + idx, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return nil, xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err) + } + + ds, ok = mds[idx] + if !ok { + ds, err = fsr.openMultiDatastore(parts[0], idx) + if err != nil { + return nil, xerrors.Errorf("opening multi-datastore: %w", err) + } + + mds[idx] = ds + } + + return ds, nil +} + +func (fsr *fsLockedRepo) ListDatastores(ns string) ([]int64, error) { + k := datastore.NewKey(ns) + parts := k.List() + if len(parts) != 1 { + return nil, xerrors.Errorf("expected multi-datastore namespace to have 1 part") + } + + fsr.dsLk.Lock() + defer fsr.dsLk.Unlock() + + mds, ok := fsr.multiDs[parts[0]] + if !ok { + return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns) + } + + out := make([]int64, 0, len(mds)) + for i := range mds { + out = append(out, i) + } + + return out, nil +} + +func (fsr *fsLockedRepo) DeleteDatastore(ns string) error { + k := datastore.NewKey(ns) + parts := k.List() + if len(parts) != 2 { + return xerrors.Errorf("expected multi-datastore namespace to have 2 parts") + } + + mds, ok := fsr.multiDs[parts[0]] + if !ok { + return xerrors.Errorf("no multi-datastore with namespace %s", ns) + } + + idx, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err) + } + + fsr.dsLk.Lock() + defer fsr.dsLk.Unlock() + + ds, ok := mds[idx] + if !ok { + return xerrors.Errorf("no multi-datastore with at index (namespace %s)", ns) + } + + delete(mds, idx) + + if err := ds.Close(); err != nil { + return xerrors.Errorf("closing datastore: %w", err) + } + + path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1])) + + log.Warnw("removing sub-datastore", "path", path, "namespace", ns) + if err := os.RemoveAll(path); err != nil { + return xerrors.Errorf("remove '%s': %w", path, err) + } + + return nil } diff --git a/node/repo/importmgr/mbstore.go b/node/repo/importmgr/mbstore.go new file mode 100644 index 000000000..889752cf2 --- /dev/null +++ b/node/repo/importmgr/mbstore.go @@ -0,0 +1,96 @@ +package importmgr + +import ( + "context" + + "github.com/hashicorp/go-multierror" + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +type multiReadBs struct { + // TODO: some caching + mds *MultiStore +} + +func (m *multiReadBs) Has(cid cid.Cid) (bool, error) { + m.mds.lk.RLock() + defer m.mds.lk.RUnlock() + + var merr error + for i, store := range m.mds.open { + has, err := store.Bstore.Has(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) + continue + } + if !has { + continue + } + + return true, nil + } + + return false, merr +} + +func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) { + m.mds.lk.RLock() + defer m.mds.lk.RUnlock() + + var merr error + for i, store := range m.mds.open { + has, err := store.Bstore.Has(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) + continue + } + if !has { + continue + } + + val, err := store.Bstore.Get(cid) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err)) + continue + } + + return val, nil + } + + if merr == nil { + return nil, datastore.ErrNotFound + } + + return nil, merr +} + +func (m *multiReadBs) DeleteBlock(cid cid.Cid) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) { + return 0, xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) Put(block blocks.Block) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) PutMany(blocks []blocks.Block) error { + return xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, xerrors.Errorf("operation not supported") +} + +func (m *multiReadBs) HashOnRead(enabled bool) { + return +} + +var _ blockstore.Blockstore = &multiReadBs{} diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go new file mode 100644 index 000000000..464f4ac33 --- /dev/null +++ b/node/repo/importmgr/mgr.go @@ -0,0 +1,112 @@ +package importmgr + +import ( + "encoding/json" + "fmt" + + "golang.org/x/xerrors" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +type Mgr struct { + mds *MultiStore + ds datastore.Batching + + Blockstore blockstore.Blockstore +} + +type Label string + +const ( + LSource = "source" // Function which created the import + LRootCid = "root" // Root CID + LFileName = "filename" // Local file path + LMTime = "mtime" // File modification timestamp +) + +func New(mds *MultiStore, ds datastore.Batching) *Mgr { + return &Mgr{ + mds: mds, + Blockstore: &multiReadBs{ + mds: mds, + }, + + ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"), + } +} + +type StoreMeta struct { + Labels map[string]string +} + +func (m *Mgr) NewStore() (int64, *Store, error) { + id := m.mds.Next() + st, err := m.mds.Get(id) + if err != nil { + return 0, nil, err + } + + meta, err := json.Marshal(&StoreMeta{Labels: map[string]string{ + "source": "unknown", + }}) + if err != nil { + return 0, nil, xerrors.Errorf("marshaling empty store metadata: %w", err) + } + + err = m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) + return id, st, err +} + +func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path, data CID.. + meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) + if err != nil { + return xerrors.Errorf("getting metadata form datastore: %w", err) + } + + var sm StoreMeta + if err := json.Unmarshal(meta, &sm); err != nil { + return xerrors.Errorf("unmarshaling store meta: %w", err) + } + + sm.Labels[key] = value + + meta, err = json.Marshal(&sm) + if err != nil { + return xerrors.Errorf("marshaling store meta: %w", err) + } + + return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) +} + +func (m *Mgr) List() []int64 { + return m.mds.List() +} + +func (m *Mgr) Info(id int64) (*StoreMeta, error) { + meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) + if err != nil { + return nil, xerrors.Errorf("getting metadata form datastore: %w", err) + } + + var sm StoreMeta + if err := json.Unmarshal(meta, &sm); err != nil { + return nil, xerrors.Errorf("unmarshaling store meta: %w", err) + } + + return &sm, nil +} + +func (m *Mgr) Remove(id int64) error { + if err := m.mds.Delete(id); err != nil { + return xerrors.Errorf("removing import: %w", err) + } + + if err := m.ds.Delete(datastore.NewKey(fmt.Sprintf("%d", id))); err != nil { + return xerrors.Errorf("removing import metadata: %w", err) + } + + return nil +} diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go new file mode 100644 index 000000000..705a3c947 --- /dev/null +++ b/node/repo/importmgr/multistore.go @@ -0,0 +1,124 @@ +package importmgr + +import ( + "fmt" + "path" + "sync" + "sync/atomic" + + "github.com/hashicorp/go-multierror" + "golang.org/x/xerrors" + + "github.com/ipfs/go-datastore" +) + +type dsProvider interface { + Datastore(namespace string) (datastore.Batching, error) + ListDatastores(namespace string) ([]int64, error) + DeleteDatastore(namespace string) error +} + +type MultiStore struct { + provider dsProvider + namespace string + + open map[int64]*Store + next int64 + + lk sync.RWMutex +} + +func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) { + ids, err := provider.ListDatastores(namespace) + if err != nil { + return nil, xerrors.Errorf("listing datastores: %w", err) + } + + mds := &MultiStore{ + provider: provider, + namespace: namespace, + + open: map[int64]*Store{}, + } + + for _, i := range ids { + if i > mds.next { + mds.next = i + } + + _, err := mds.Get(i) + if err != nil { + return nil, xerrors.Errorf("open store %d: %w", i, err) + } + } + + return mds, nil +} + +func (mds *MultiStore) path(i int64) string { + return path.Join("/", mds.namespace, fmt.Sprintf("%d", i)) +} + +func (mds *MultiStore) Next() int64 { + return atomic.AddInt64(&mds.next, 1) +} + +func (mds *MultiStore) Get(i int64) (*Store, error) { + mds.lk.Lock() + defer mds.lk.Unlock() + + store, ok := mds.open[i] + if ok { + return store, nil + } + + ds, err := mds.provider.Datastore(mds.path(i)) + if err != nil { + return nil, err + } + + mds.open[i], err = openStore(ds) + return mds.open[i], err +} + +func (mds *MultiStore) List() []int64 { + mds.lk.RLock() + defer mds.lk.RUnlock() + out := make([]int64, 0, len(mds.open)) + for i := range mds.open { + out = append(out, i) + } + + return out +} + +func (mds *MultiStore) Delete(i int64) error { + mds.lk.Lock() + defer mds.lk.Unlock() + + store, ok := mds.open[i] + if ok { + if err := store.Close(); err != nil { + return xerrors.Errorf("closing sub-datastore %d: %w", i, err) + } + + delete(mds.open, i) + } + + return mds.provider.DeleteDatastore(mds.path(i)) +} + +func (mds *MultiStore) Close() error { + mds.lk.Lock() + defer mds.lk.Unlock() + + var err error + for i, store := range mds.open { + cerr := store.Close() + if cerr != nil { + err = multierror.Append(err, xerrors.Errorf("closing sub-datastore %d: %w", i, cerr)) + } + } + + return err +} diff --git a/node/repo/importmgr/store.go b/node/repo/importmgr/store.go new file mode 100644 index 000000000..78bb7462b --- /dev/null +++ b/node/repo/importmgr/store.go @@ -0,0 +1,54 @@ +package importmgr + +import ( + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-filestore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" +) + +type Store struct { + ds datastore.Batching + + fm *filestore.FileManager + Fstore *filestore.Filestore + + Bstore blockstore.Blockstore + + bsvc blockservice.BlockService + DAG ipld.DAGService +} + +func openStore(ds datastore.Batching) (*Store, error) { + blocks := namespace.Wrap(ds, datastore.NewKey("blocks")) + bs := blockstore.NewBlockstore(blocks) + + fm := filestore.NewFileManager(ds, "/") + fm.AllowFiles = true + + fstore := filestore.NewFilestore(bs, fm) + ibs := blockstore.NewIdStore(fstore) + + bsvc := blockservice.New(ibs, offline.Exchange(ibs)) + dag := merkledag.NewDAGService(bsvc) + + return &Store{ + ds: ds, + + fm: fm, + Fstore: fstore, + + Bstore: ibs, + + bsvc: bsvc, + DAG: dag, + }, nil +} + +func (s *Store) Close() error { + return s.bsvc.Close() +} diff --git a/node/repo/interface.go b/node/repo/interface.go index 17336d413..c19a656af 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -36,6 +36,8 @@ type LockedRepo interface { // Returns datastore defined in this repo. Datastore(namespace string) (datastore.Batching, error) + ListDatastores(namespace string) ([]int64, error) + DeleteDatastore(namespace string) error // Returns config in this repo Config() (interface{}, error) diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index 229705929..f2762acea 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -243,6 +243,15 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) { return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil } +func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) { + return nil, nil +} + +func (lmem *lockedMemRepo) DeleteDatastore(ns string) error { + /** poof **/ + return nil +} + func (lmem *lockedMemRepo) Config() (interface{}, error) { if err := lmem.checkToken(); err != nil { return nil, err