Merge pull request #2286 from filecoin-project/feat/client-multi-bstore

Multiple client blockstores
This commit is contained in:
Łukasz Magiera 2020-07-08 23:21:22 +02:00 committed by GitHub
commit 7d841dbfa8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 845 additions and 204 deletions

View File

@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -192,7 +191,9 @@ type FullNode interface {
// retrieval markets as a client // retrieval markets as a client
// ClientImport imports file under the specified path into filestore. // ClientImport imports file under the specified path into filestore.
ClientImport(ctx context.Context, ref FileRef) (cid.Cid, error) ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error)
// ClientRemoveImport removes file import
ClientRemoveImport(ctx context.Context, importID int64) error
// ClientStartDeal proposes a deal with a miner. // ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error)
// ClientGetDealInfo returns the latest information about a given deal. // ClientGetDealInfo returns the latest information about a given deal.
@ -360,11 +361,18 @@ type MinerSectors struct {
Pset uint64 Pset uint64
} }
type ImportRes struct {
Root cid.Cid
ImportID int64
}
type Import struct { type Import struct {
Status filestore.Status Key int64
Key cid.Cid Err string
Root *cid.Cid
Source string
FilePath string FilePath string
Size uint64
} }
type DealInfo struct { type DealInfo struct {

View File

@ -111,8 +111,9 @@ type FullNodeStruct struct {
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
WalletDelete func(context.Context, address.Address) error `perm:"write"` WalletDelete func(context.Context, address.Address) error `perm:"write"`
ClientImport func(ctx context.Context, ref api.FileRef) (cid.Cid, error) `perm:"admin"` ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID int64) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientFindData func(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, root cid.Cid, miner address.Address) (api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, root cid.Cid, miner address.Address) (api.QueryOffer, error) `perm:"read"`
@ -345,7 +346,11 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e
return c.Internal.ClientListImports(ctx) return c.Internal.ClientListImports(ctx)
} }
func (c *FullNodeStruct) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int64) error {
return c.Internal.ClientRemoveImport(ctx, importID)
}
func (c *FullNodeStruct) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) {
return c.Internal.ClientImport(ctx, ref) return c.Internal.ClientImport(ctx, ref)
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strconv" "strconv"
"text/tabwriter" "text/tabwriter"
@ -55,6 +56,7 @@ var clientCmd = &cli.Command{
Usage: "Make deals, store data, retrieve data", Usage: "Make deals, store data, retrieve data",
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
clientImportCmd, clientImportCmd,
clientDropCmd,
clientCommPCmd, clientCommPCmd,
clientLocalCmd, clientLocalCmd,
clientDealCmd, clientDealCmd,
@ -75,6 +77,11 @@ var clientImportCmd = &cli.Command{
Name: "car", Name: "car",
Usage: "import from a car file instead of a regular file", Usage: "import from a car file instead of a regular file",
}, },
&cli.BoolFlag{
Name: "quiet",
Aliases: []string{"q"},
Usage: "Output root CID only",
},
&CidBaseFlag, &CidBaseFlag,
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
@ -103,7 +110,46 @@ var clientImportCmd = &cli.Command{
return err return err
} }
fmt.Println(encoder.Encode(c)) if !cctx.Bool("quiet") {
fmt.Printf("Import %d, Root ", c.ImportID)
}
fmt.Println(encoder.Encode(c.Root))
return nil
},
}
var clientDropCmd = &cli.Command{
Name: "drop",
Usage: "Remove import",
ArgsUsage: "[import ID...]",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return xerrors.Errorf("no imports specified")
}
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var ids []int64
for i, s := range cctx.Args().Slice() {
id, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return xerrors.Errorf("parsing %d-th import ID: %w", i, err)
}
ids = append(ids, id)
}
for _, id := range ids {
if err := api.ClientRemoveImport(ctx, id); err != nil {
return xerrors.Errorf("removing import %d: %w", id, err)
}
}
return nil return nil
}, },
@ -203,8 +249,20 @@ var clientLocalCmd = &cli.Command{
return err return err
} }
sort.Slice(list, func(i, j int) bool {
return list[i].Key < list[j].Key
})
for _, v := range list { for _, v := range list {
fmt.Printf("%s %s %s %s\n", encoder.Encode(v.Key), v.FilePath, types.SizeStr(types.NewInt(v.Size)), v.Status) cidStr := "<nil>"
if v.Root != nil {
cidStr = encoder.Encode(*v.Root)
}
fmt.Printf("%d: %s @%s (%s)\n", v.Key, cidStr, v.FilePath, v.Source)
if v.Err != "" {
fmt.Printf("\terror: %s\n", v.Err)
}
} }
return nil return nil
}, },

View File

@ -234,7 +234,6 @@ func Online() Option {
Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore), Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore),
Override(new(dtypes.ChainExchange), modules.ChainExchange), Override(new(dtypes.ChainExchange), modules.ChainExchange),
Override(new(dtypes.ChainBlockService), modules.ChainBlockservice), Override(new(dtypes.ChainBlockService), modules.ChainBlockservice),
Override(new(dtypes.ClientDAG), testing.MemoryClientDag),
// Filecoin services // Filecoin services
Override(new(*chain.Syncer), modules.NewSyncer), Override(new(*chain.Syncer), modules.NewSyncer),
@ -440,9 +439,10 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
Override(new(dtypes.ClientFilestore), modules.ClientFstore), Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(dtypes.ClientDAG), modules.ClientDAG),
Override(new(ci.PrivKey), lp2p.PrivKey), Override(new(ci.PrivKey), lp2p.PrivKey),
Override(new(ci.PubKey), ci.PrivKey.GetPublic), Override(new(ci.PubKey), ci.PrivKey.GetPublic),

View File

@ -2,15 +2,7 @@ package client
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/filecoin-project/go-fil-markets/pieceio"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/multiformats/go-multiaddr"
"io" "io"
"os" "os"
@ -18,7 +10,7 @@ import (
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore" "github.com/ipfs/go-cidutil"
chunker "github.com/ipfs/go-ipfs-chunker" chunker "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline" offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
@ -28,10 +20,16 @@ import (
"github.com/ipfs/go-unixfs/importer/balanced" "github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers" ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-car" "github.com/ipld/go-car"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
"go.uber.org/fx" "go.uber.org/fx"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/pieceio"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/sector-storage/ffiwrapper"
@ -47,8 +45,11 @@ import (
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/importmgr"
) )
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
const dealStartBuffer abi.ChainEpoch = 10000 // TODO: allow setting const dealStartBuffer abi.ChainEpoch = 10000 // TODO: allow setting
type API struct { type API struct {
@ -64,9 +65,9 @@ type API struct {
Retrieval rm.RetrievalClient Retrieval rm.RetrievalClient
Chain *store.ChainStore Chain *store.ChainStore
LocalDAG dtypes.ClientDAG Imports dtypes.ClientImportMgr
Blockstore dtypes.ClientBlockstore
Filestore dtypes.ClientFilestore `optional:"true"` RetBstore dtypes.ClientBlockstore // TODO: try to remove
} }
func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch {
@ -77,6 +78,10 @@ func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch a
return minExp + miner.WPoStProvingPeriod - (minExp % miner.WPoStProvingPeriod) + (md.PeriodStart % miner.WPoStProvingPeriod) - 1 return minExp + miner.WPoStProvingPeriod - (minExp % miner.WPoStProvingPeriod) + (md.PeriodStart % miner.WPoStProvingPeriod) - 1
} }
func (a *API) imgr() *importmgr.Mgr {
return a.Imports
}
func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
exist, err := a.WalletHas(ctx, params.Wallet) exist, err := a.WalletHas(ctx, params.Wallet)
if err != nil { if err != nil {
@ -195,7 +200,7 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo,
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag // TODO: check if we have the ENTIRE dag
offExch := merkledag.NewDAGService(blockservice.New(a.Blockstore, offline.Exchange(a.Blockstore))) offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore)))
_, err := offExch.Get(ctx, root) _, err := offExch.Get(ctx, root)
if err == ipld.ErrNotFound { if err == ipld.ErrNotFound {
return false, nil return false, nil
@ -259,28 +264,65 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo
} }
} }
func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) { func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) {
id, st, err := a.imgr().NewStore()
bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG)
nd, err := a.clientImport(ref, bufferedDS)
if err != nil { if err != nil {
return cid.Undef, err return nil, err
}
if err := a.imgr().AddLabel(id, importmgr.LSource, "import"); err != nil {
return nil, err
} }
return nd, nil if err := a.imgr().AddLabel(id, importmgr.LFileName, ref.Path); err != nil {
return nil, err
}
nd, err := a.clientImport(ctx, ref, st)
if err != nil {
return nil, err
}
if err := a.imgr().AddLabel(id, importmgr.LRootCid, nd.String()); err != nil {
return nil, err
}
return &api.ImportRes{
Root: nd,
ImportID: id,
}, nil
}
func (a *API) ClientRemoveImport(ctx context.Context, importID int64) error {
return a.imgr().Remove(importID)
} }
func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) { func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) {
file := files.NewReaderFile(f) file := files.NewReaderFile(f)
bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) id, st, err := a.imgr().NewStore()
if err != nil {
return cid.Undef, err
}
if err := a.imgr().AddLabel(id, "source", "import-local"); err != nil {
return cid.Cid{}, err
}
bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG)
prefix, err := merkledag.PrefixForCidVersion(1)
if err != nil {
return cid.Undef, err
}
prefix.MhType = DefaultHashFunction
params := ihelper.DagBuilderParams{ params := ihelper.DagBuilderParams{
Maxlinks: build.UnixfsLinksPerLevel, Maxlinks: build.UnixfsLinksPerLevel,
RawLeaves: true, RawLeaves: true,
CidBuilder: nil, CidBuilder: cidutil.InlineBuilder{
Dagserv: bufferedDS, Builder: prefix,
Limit: 126,
},
Dagserv: bufferedDS,
} }
db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
@ -296,49 +338,38 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro
} }
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
if a.Filestore == nil { importIDs := a.imgr().List()
return nil, errors.New("listing imports is not supported with in-memory dag yet")
}
next, err := filestore.ListAll(a.Filestore, false)
if err != nil {
return nil, err
}
// TODO: make this less very bad by tracking root cids instead of using ListAll out := make([]api.Import, len(importIDs))
for i, id := range importIDs {
out := make([]api.Import, 0) info, err := a.imgr().Info(id)
lowest := make([]uint64, 0) if err != nil {
for { out[i] = api.Import{
r := next() Key: id,
if r == nil { Err: xerrors.Errorf("getting info: %w", err).Error(),
return out, nil }
continue
} }
matched := false
for i := range out { ai := api.Import{
if out[i].FilePath == r.FilePath { Key: id,
matched = true Source: info.Labels[importmgr.LSource],
if lowest[i] > r.Offset { FilePath: info.Labels[importmgr.LFileName],
lowest[i] = r.Offset }
out[i] = api.Import{
Status: r.Status, if info.Labels[importmgr.LRootCid] != "" {
Key: r.Key, c, err := cid.Parse(info.Labels[importmgr.LRootCid])
FilePath: r.FilePath, if err != nil {
Size: r.Size, ai.Err = err.Error()
} } else {
} ai.Root = &c
break
} }
} }
if !matched {
out = append(out, api.Import{ out[i] = ai
Status: r.Status,
Key: r.Key,
FilePath: r.FilePath,
Size: r.Size,
})
lowest = append(lowest, r.Offset)
}
} }
return out, nil
} }
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
@ -348,13 +379,21 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return err return err
} }
order.MinerPeerID = peer.ID(mi.PeerId) order.MinerPeerID = mi.PeerId
} }
if order.Size == 0 { if order.Size == 0 {
return xerrors.Errorf("cannot make retrieval deal for zero bytes") return xerrors.Errorf("cannot make retrieval deal for zero bytes")
} }
/*id, st, err := a.imgr().NewStore()
if err != nil {
return err
}
if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil {
return err
}*/
retrievalResult := make(chan error, 1) retrievalResult := make(chan error, 1)
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
@ -399,7 +438,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
order.Total, order.Total,
order.MinerPeerID, order.MinerPeerID,
order.Client, order.Client,
order.Miner) order.Miner) // TODO: pass the store here somehow
if err != nil { if err != nil {
return xerrors.Errorf("Retrieve failed: %w", err) return xerrors.Errorf("Retrieve failed: %w", err)
} }
@ -419,23 +458,25 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return nil return nil
} }
rdag := merkledag.NewDAGService(blockservice.New(a.RetBstore, offline.Exchange(a.RetBstore)))
if ref.IsCAR { if ref.IsCAR {
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return err return err
} }
err = car.WriteCar(ctx, a.LocalDAG, []cid.Cid{order.Root}, f) err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
if err != nil { if err != nil {
return err return err
} }
return f.Close() return f.Close()
} }
nd, err := a.LocalDAG.Get(ctx, order.Root) nd, err := rdag.Get(ctx, order.Root)
if err != nil { if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err) return xerrors.Errorf("ClientRetrieve: %w", err)
} }
file, err := unixfile.NewUnixfsFile(ctx, a.LocalDAG, nd) file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
if err != nil { if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err) return xerrors.Errorf("ClientRetrieve: %w", err)
} }
@ -485,14 +526,22 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string, miner address.
} }
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
id, st, err := a.imgr().NewStore()
if err != nil {
return err
}
if err := a.imgr().AddLabel(id, "source", "gen-car"); err != nil {
return err
}
bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) bufferedDS := ipld.NewBufferedDAG(ctx, st.DAG)
c, err := a.clientImport(ref, bufferedDS) c, err := a.clientImport(ctx, ref, st)
if err != nil { if err != nil {
return err return err
} }
// TODO: does that defer mean to remove the whole blockstore?
defer bufferedDS.Remove(ctx, c) //nolint:errcheck defer bufferedDS.Remove(ctx, c) //nolint:errcheck
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any) ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
@ -505,7 +554,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri
return err return err
} }
sc := car.NewSelectiveCar(ctx, a.Blockstore, []car.Dag{{Root: c, Selector: allSelector}}) sc := car.NewSelectiveCar(ctx, st.Bstore, []car.Dag{{Root: c, Selector: allSelector}})
if err = sc.Write(f); err != nil { if err = sc.Write(f); err != nil {
return err return err
} }
@ -513,7 +562,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri
return f.Close() return f.Close()
} }
func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.Cid, error) { func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmgr.Store) (cid.Cid, error) {
f, err := os.Open(ref.Path) f, err := os.Open(ref.Path)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
@ -530,13 +579,13 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C
} }
if ref.IsCAR { if ref.IsCAR {
var store car.Store var st car.Store
if a.Filestore == nil { if store.Fstore == nil {
store = a.Blockstore st = store.Bstore
} else { } else {
store = (*filestore.Filestore)(a.Filestore) st = store.Fstore
} }
result, err := car.LoadCar(store, file) result, err := car.LoadCar(st, file)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
@ -548,12 +597,23 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C
return result.Roots[0], nil return result.Roots[0], nil
} }
bufDs := ipld.NewBufferedDAG(ctx, store.DAG)
prefix, err := merkledag.PrefixForCidVersion(1)
if err != nil {
return cid.Undef, err
}
prefix.MhType = DefaultHashFunction
params := ihelper.DagBuilderParams{ params := ihelper.DagBuilderParams{
Maxlinks: build.UnixfsLinksPerLevel, Maxlinks: build.UnixfsLinksPerLevel,
RawLeaves: true, RawLeaves: true,
CidBuilder: nil, CidBuilder: cidutil.InlineBuilder{
Dagserv: bufferedDS, Builder: prefix,
NoCopy: true, Limit: 126,
},
Dagserv: bufDs,
NoCopy: true,
} }
db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize))) db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
@ -565,7 +625,7 @@ func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.C
return cid.Undef, err return cid.Undef, err
} }
if err := bufferedDS.Commit(); err != nil { if err := bufDs.Commit(); err != nil {
return cid.Undef, err return cid.Undef, err
} }

View File

@ -2,13 +2,11 @@ package modules
import ( import (
"context" "context"
"path/filepath" "github.com/filecoin-project/lotus/lib/bufbstore"
"time" "time"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/fx" "go.uber.org/fx"
dtimpl "github.com/filecoin-project/go-data-transfer/impl" dtimpl "github.com/filecoin-project/go-data-transfer/impl"
@ -24,44 +22,44 @@ import (
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter" "github.com/filecoin-project/go-storedcounter"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-filestore"
"github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych" payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/paychmgr" "github.com/filecoin-project/lotus/paychmgr"
) )
func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) { func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
clientds, err := r.Datastore("/client") mds, err := importmgr.NewMultiDstore(r, "/client")
if err != nil {
return nil, err
}
blocks := namespace.Wrap(clientds, datastore.NewKey("blocks"))
absPath, err := filepath.Abs(r.Path())
if err != nil { if err != nil {
return nil, err return nil, err
} }
fm := filestore.NewFileManager(clientds, filepath.Dir(absPath)) lc.Append(fx.Hook{
fm.AllowFiles = true OnStop: func(ctx context.Context) error {
// TODO: fm.AllowUrls (needs more code in client import) return mds.Close()
},
})
bs := blockstore.NewBlockstore(blocks) return mds, nil
return filestore.NewFilestore(bs, fm), nil
} }
func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore { func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.ClientImportMgr {
return blockstore.NewIdStore((*filestore.Filestore)(fstore)) return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client")))
}
func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore {
// TODO: This isn't.. the best
// - If it's easy to pass per-retrieval blockstores with markets we don't need this
// - If it's not easy, we need to store this in a separate datastore on disk
defaultWrite := blockstore.NewBlockstore(datastore.NewMapDatastore())
return blockstore.NewIdStore(bufbstore.NewTieredBstore(imgr.Blockstore, defaultWrite))
} }
// RegisterClientValidator is an initialization hook that registers the client // RegisterClientValidator is an initialization hook that registers the client
@ -107,24 +105,6 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
return namespace.Wrap(ds, datastore.NewKey("/deals/client")) return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
} }
// ClientDAG is a DAGService for the ClientBlockstore
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
bitswapNetwork := network.NewFromIpfsHost(h, rt)
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs, bitswapOptions...)
bsvc := blockservice.New(ibs, exch)
dag := merkledag.NewDAGService(bsvc)
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return bsvc.Close()
},
})
return dag
}
func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator { func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator {
return requestvalidation.NewUnifiedRequestValidator(nil, deals) return requestvalidation.NewUnifiedRequestValidator(nil, deals)
} }

View File

@ -1,18 +1,19 @@
package dtypes package dtypes
import ( import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
bserv "github.com/ipfs/go-blockservice" bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface" exchange "github.com/ipfs/go-ipfs-exchange-interface"
format "github.com/ipfs/go-ipld-format" format "github.com/ipfs/go-ipld-format"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
) )
// MetadataDS stores metadata // MetadataDS stores metadata
@ -26,9 +27,9 @@ type ChainGCBlockstore blockstore.GCBlockstore
type ChainExchange exchange.Interface type ChainExchange exchange.Interface
type ChainBlockService bserv.BlockService type ChainBlockService bserv.BlockService
type ClientFilestore *filestore.Filestore type ClientMultiDstore *importmgr.MultiStore
type ClientImportMgr *importmgr.Mgr
type ClientBlockstore blockstore.Blockstore type ClientBlockstore blockstore.Blockstore
type ClientDAG format.DAGService
type ClientDealStore *statestore.StateStore type ClientDealStore *statestore.StateStore
type ClientRequestValidator *requestvalidation.UnifiedRequestValidator type ClientRequestValidator *requestvalidation.UnifiedRequestValidator
type ClientDatastore datastore.Batching type ClientDatastore datastore.Batching

View File

@ -4,7 +4,6 @@ import (
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
@ -18,8 +17,8 @@ import (
// If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration. // If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration.
// If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress. // If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress.
// The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals. // The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals.
func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) { func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
var err error var err error
var ipfsbs *ipfsbstore.IpfsBstore var ipfsbs *ipfsbstore.IpfsBstore
if ipfsMaddr != "" { if ipfsMaddr != "" {
@ -38,7 +37,7 @@ func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.M
var ws blockstore.Blockstore var ws blockstore.Blockstore
ws = ipfsbs ws = ipfsbs
if !useForRetrieval { if !useForRetrieval {
ws = blockstore.NewIdStore((*filestore.Filestore)(fstore)) ws = blockstore.NewIdStore(localStore.Blockstore)
} }
return bufbstore.NewTieredBstore(ipfsbs, ws), nil return bufbstore.NewTieredBstore(ipfsbs, ws), nil
} }

View File

@ -1,39 +0,0 @@
package testing
import (
"context"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"go.uber.org/fx"
)
func MapBlockstore() blockstore.Blockstore {
// TODO: proper datastore
bds := dsync.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewBlockstore(bds)
return blockstore.NewIdStore(bs)
}
func MapDatastore() datastore.Batching {
return dsync.MutexWrap(datastore.NewMapDatastore())
}
func MemoryClientDag(lc fx.Lifecycle) ipld.DAGService {
ibs := blockstore.NewBlockstore(datastore.NewMapDatastore())
bsvc := blockservice.New(ibs, offline.Exchange(ibs))
dag := merkledag.NewDAGService(bsvc)
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return bsvc.Close()
},
})
return dag
}

View File

@ -226,9 +226,11 @@ type fsLockedRepo struct {
repoType RepoType repoType RepoType
closer io.Closer closer io.Closer
ds datastore.Batching ds map[string]datastore.Batching
dsErr error multiDs map[string]map[int64]datastore.Batching
dsOnce sync.Once dsErr error
dsOnce sync.Once
dsLk sync.Mutex
storageLk sync.Mutex storageLk sync.Mutex
configLk sync.Mutex configLk sync.Mutex
@ -245,8 +247,10 @@ func (fsr *fsLockedRepo) Close() error {
return xerrors.Errorf("could not remove API file: %w", err) return xerrors.Errorf("could not remove API file: %w", err)
} }
if fsr.ds != nil { if fsr.ds != nil {
if err := fsr.ds.Close(); err != nil { for _, ds := range fsr.ds {
return xerrors.Errorf("could not close datastore: %w", err) if err := ds.Close(); err != nil {
return xerrors.Errorf("could not close datastore: %w", err)
}
} }
} }

View File

@ -1,13 +1,13 @@
package repo package repo
import ( import (
"fmt"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/mount"
"github.com/ipfs/go-datastore/namespace"
"golang.org/x/xerrors"
dgbadger "github.com/dgraph-io/badger/v2" dgbadger "github.com/dgraph-io/badger/v2"
badger "github.com/ipfs/go-ds-badger2" badger "github.com/ipfs/go-ds-badger2"
@ -16,13 +16,18 @@ import (
ldbopts "github.com/syndtr/goleveldb/leveldb/opt" ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
) )
var fsDatastores = map[string]func(path string) (datastore.Batching, error){ type dsCtor func(path string) (datastore.Batching, error)
var fsDatastores = map[string]dsCtor{
"chain": badgerDs, "chain": badgerDs,
"metadata": levelDs, "metadata": levelDs,
// Those need to be fast for large writes... but also need a really good GC :c // Those need to be fast for large writes... but also need a really good GC :c
"staging": badgerDs, // miner specific "staging": badgerDs, // miner specific
"client": badgerDs, // client specific }
var fsMultiDatastores = map[string]dsCtor{
"client": badgerDs, // client specific
} }
func badgerDs(path string) (datastore.Batching, error) { func badgerDs(path string) (datastore.Batching, error) {
@ -39,12 +44,12 @@ func levelDs(path string) (datastore.Batching, error) {
}) })
} }
func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) { func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error) {
if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil { if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil {
return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err) return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err)
} }
var mounts []mount.Mount out := map[string]datastore.Batching{}
for p, ctor := range fsDatastores { for p, ctor := range fsDatastores {
prefix := datastore.NewKey(p) prefix := datastore.NewKey(p)
@ -57,21 +62,184 @@ func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) {
ds = measure.New("fsrepo."+p, ds) ds = measure.New("fsrepo."+p, ds)
mounts = append(mounts, mount.Mount{ out[datastore.NewKey(p).String()] = ds
Prefix: prefix,
Datastore: ds,
})
} }
return mount.New(mounts), nil return out, nil
}
func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) {
out := map[string]map[int64]datastore.Batching{}
for p, ctor := range fsMultiDatastores {
path := fsr.join(filepath.Join(fsDatastore, p))
if err := os.MkdirAll(path, 0755); err != nil {
return nil, xerrors.Errorf("mkdir %s: %w", path, err)
}
di, err := ioutil.ReadDir(path)
if err != nil {
return nil, xerrors.Errorf("readdir '%s': %w", path, err)
}
out[p] = map[int64]datastore.Batching{}
for _, info := range di {
path := filepath.Join(path, info.Name())
prefix := datastore.NewKey(p)
id, err := strconv.ParseInt(info.Name(), 10, 64)
if err != nil {
log.Errorf("error parsing multi-datastore id for '%s': %w", path, err)
continue
}
// TODO: optimization: don't init datastores we don't need
ds, err := ctor(path)
if err != nil {
return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err)
}
ds = measure.New("fsrepo."+p+"."+info.Name(), ds)
out[p][id] = ds
}
}
return out, nil
}
func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) {
ctor, ok := fsMultiDatastores[ns]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns)
}
si := fmt.Sprintf("%d", idx)
path := fsr.join(filepath.Join(fsDatastore, ns, si))
ds, err := ctor(path)
if err != nil {
return nil, xerrors.Errorf("opening datastore %s: %w", path, err)
}
ds = measure.New("fsrepo."+ns+"."+si, ds)
return ds, nil
} }
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
fsr.dsOnce.Do(func() { fsr.dsOnce.Do(func() {
fsr.ds, fsr.dsErr = fsr.openDatastore() var err error
fsr.ds, err = fsr.openDatastores()
if err != nil {
fsr.dsErr = err
return
}
fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores()
}) })
if fsr.dsErr != nil { if fsr.dsErr != nil {
return nil, fsr.dsErr return nil, fsr.dsErr
} }
return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil ds, ok := fsr.ds[ns]
if ok {
return ds, nil
}
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 2 {
return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
idx, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err)
}
ds, ok = mds[idx]
if !ok {
ds, err = fsr.openMultiDatastore(parts[0], idx)
if err != nil {
return nil, xerrors.Errorf("opening multi-datastore: %w", err)
}
mds[idx] = ds
}
return ds, nil
}
func (fsr *fsLockedRepo) ListDatastores(ns string) ([]int64, error) {
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 1 {
return nil, xerrors.Errorf("expected multi-datastore namespace to have 1 part")
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
out := make([]int64, 0, len(mds))
for i := range mds {
out = append(out, i)
}
return out, nil
}
func (fsr *fsLockedRepo) DeleteDatastore(ns string) error {
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 2 {
return xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
}
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
idx, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err)
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
ds, ok := mds[idx]
if !ok {
return xerrors.Errorf("no multi-datastore with at index (namespace %s)", ns)
}
delete(mds, idx)
if err := ds.Close(); err != nil {
return xerrors.Errorf("closing datastore: %w", err)
}
path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1]))
log.Warnw("removing sub-datastore", "path", path, "namespace", ns)
if err := os.RemoveAll(path); err != nil {
return xerrors.Errorf("remove '%s': %w", path, err)
}
return nil
} }

View File

@ -0,0 +1,96 @@
package importmgr
import (
"context"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
type multiReadBs struct {
// TODO: some caching
mds *MultiStore
}
func (m *multiReadBs) Has(cid cid.Cid) (bool, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
return true, nil
}
return false, merr
}
func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
val, err := store.Bstore.Get(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err))
continue
}
return val, nil
}
if merr == nil {
return nil, datastore.ErrNotFound
}
return nil, merr
}
func (m *multiReadBs) DeleteBlock(cid cid.Cid) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) {
return 0, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) Put(block blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) HashOnRead(enabled bool) {
return
}
var _ blockstore.Blockstore = &multiReadBs{}

112
node/repo/importmgr/mgr.go Normal file
View File

@ -0,0 +1,112 @@
package importmgr
import (
"encoding/json"
"fmt"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
type Mgr struct {
mds *MultiStore
ds datastore.Batching
Blockstore blockstore.Blockstore
}
type Label string
const (
LSource = "source" // Function which created the import
LRootCid = "root" // Root CID
LFileName = "filename" // Local file path
LMTime = "mtime" // File modification timestamp
)
func New(mds *MultiStore, ds datastore.Batching) *Mgr {
return &Mgr{
mds: mds,
Blockstore: &multiReadBs{
mds: mds,
},
ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"),
}
}
type StoreMeta struct {
Labels map[string]string
}
func (m *Mgr) NewStore() (int64, *Store, error) {
id := m.mds.Next()
st, err := m.mds.Get(id)
if err != nil {
return 0, nil, err
}
meta, err := json.Marshal(&StoreMeta{Labels: map[string]string{
"source": "unknown",
}})
if err != nil {
return 0, nil, xerrors.Errorf("marshaling empty store metadata: %w", err)
}
err = m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
return id, st, err
}
func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path, data CID..
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return xerrors.Errorf("getting metadata form datastore: %w", err)
}
var sm StoreMeta
if err := json.Unmarshal(meta, &sm); err != nil {
return xerrors.Errorf("unmarshaling store meta: %w", err)
}
sm.Labels[key] = value
meta, err = json.Marshal(&sm)
if err != nil {
return xerrors.Errorf("marshaling store meta: %w", err)
}
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
}
func (m *Mgr) List() []int64 {
return m.mds.List()
}
func (m *Mgr) Info(id int64) (*StoreMeta, error) {
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return nil, xerrors.Errorf("getting metadata form datastore: %w", err)
}
var sm StoreMeta
if err := json.Unmarshal(meta, &sm); err != nil {
return nil, xerrors.Errorf("unmarshaling store meta: %w", err)
}
return &sm, nil
}
func (m *Mgr) Remove(id int64) error {
if err := m.mds.Delete(id); err != nil {
return xerrors.Errorf("removing import: %w", err)
}
if err := m.ds.Delete(datastore.NewKey(fmt.Sprintf("%d", id))); err != nil {
return xerrors.Errorf("removing import metadata: %w", err)
}
return nil
}

View File

@ -0,0 +1,124 @@
package importmgr
import (
"fmt"
"path"
"sync"
"sync/atomic"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
)
type dsProvider interface {
Datastore(namespace string) (datastore.Batching, error)
ListDatastores(namespace string) ([]int64, error)
DeleteDatastore(namespace string) error
}
type MultiStore struct {
provider dsProvider
namespace string
open map[int64]*Store
next int64
lk sync.RWMutex
}
func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) {
ids, err := provider.ListDatastores(namespace)
if err != nil {
return nil, xerrors.Errorf("listing datastores: %w", err)
}
mds := &MultiStore{
provider: provider,
namespace: namespace,
open: map[int64]*Store{},
}
for _, i := range ids {
if i > mds.next {
mds.next = i
}
_, err := mds.Get(i)
if err != nil {
return nil, xerrors.Errorf("open store %d: %w", i, err)
}
}
return mds, nil
}
func (mds *MultiStore) path(i int64) string {
return path.Join("/", mds.namespace, fmt.Sprintf("%d", i))
}
func (mds *MultiStore) Next() int64 {
return atomic.AddInt64(&mds.next, 1)
}
func (mds *MultiStore) Get(i int64) (*Store, error) {
mds.lk.Lock()
defer mds.lk.Unlock()
store, ok := mds.open[i]
if ok {
return store, nil
}
ds, err := mds.provider.Datastore(mds.path(i))
if err != nil {
return nil, err
}
mds.open[i], err = openStore(ds)
return mds.open[i], err
}
func (mds *MultiStore) List() []int64 {
mds.lk.RLock()
defer mds.lk.RUnlock()
out := make([]int64, 0, len(mds.open))
for i := range mds.open {
out = append(out, i)
}
return out
}
func (mds *MultiStore) Delete(i int64) error {
mds.lk.Lock()
defer mds.lk.Unlock()
store, ok := mds.open[i]
if ok {
if err := store.Close(); err != nil {
return xerrors.Errorf("closing sub-datastore %d: %w", i, err)
}
delete(mds.open, i)
}
return mds.provider.DeleteDatastore(mds.path(i))
}
func (mds *MultiStore) Close() error {
mds.lk.Lock()
defer mds.lk.Unlock()
var err error
for i, store := range mds.open {
cerr := store.Close()
if cerr != nil {
err = multierror.Append(err, xerrors.Errorf("closing sub-datastore %d: %w", i, cerr))
}
}
return err
}

View File

@ -0,0 +1,54 @@
package importmgr
import (
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
)
type Store struct {
ds datastore.Batching
fm *filestore.FileManager
Fstore *filestore.Filestore
Bstore blockstore.Blockstore
bsvc blockservice.BlockService
DAG ipld.DAGService
}
func openStore(ds datastore.Batching) (*Store, error) {
blocks := namespace.Wrap(ds, datastore.NewKey("blocks"))
bs := blockstore.NewBlockstore(blocks)
fm := filestore.NewFileManager(ds, "/")
fm.AllowFiles = true
fstore := filestore.NewFilestore(bs, fm)
ibs := blockstore.NewIdStore(fstore)
bsvc := blockservice.New(ibs, offline.Exchange(ibs))
dag := merkledag.NewDAGService(bsvc)
return &Store{
ds: ds,
fm: fm,
Fstore: fstore,
Bstore: ibs,
bsvc: bsvc,
DAG: dag,
}, nil
}
func (s *Store) Close() error {
return s.bsvc.Close()
}

View File

@ -36,6 +36,8 @@ type LockedRepo interface {
// Returns datastore defined in this repo. // Returns datastore defined in this repo.
Datastore(namespace string) (datastore.Batching, error) Datastore(namespace string) (datastore.Batching, error)
ListDatastores(namespace string) ([]int64, error)
DeleteDatastore(namespace string) error
// Returns config in this repo // Returns config in this repo
Config() (interface{}, error) Config() (interface{}, error)

View File

@ -243,6 +243,15 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) {
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
} }
func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) {
return nil, nil
}
func (lmem *lockedMemRepo) DeleteDatastore(ns string) error {
/** poof **/
return nil
}
func (lmem *lockedMemRepo) Config() (interface{}, error) { func (lmem *lockedMemRepo) Config() (interface{}, error) {
if err := lmem.checkToken(); err != nil { if err := lmem.checkToken(); err != nil {
return nil, err return nil, err