Merge pull request #2459 from filecoin-project/feat/single-multi-ds

Use single multi ds
This commit is contained in:
Łukasz Magiera 2020-07-18 00:18:32 +02:00 committed by GitHub
commit 6cd1240fa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 139 additions and 238 deletions

View File

@ -191,7 +191,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore. // ClientImport imports file under the specified path into filestore.
ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error)
// ClientRemoveImport removes file import // ClientRemoveImport removes file import
ClientRemoveImport(ctx context.Context, importID int64) error ClientRemoveImport(ctx context.Context, importID int) error
// ClientStartDeal proposes a deal with a miner. // ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error)
// ClientGetDealInfo returns the latest information about a given deal. // ClientGetDealInfo returns the latest information about a given deal.
@ -393,11 +393,11 @@ type SectorLocation struct {
type ImportRes struct { type ImportRes struct {
Root cid.Cid Root cid.Cid
ImportID int64 ImportID int
} }
type Import struct { type Import struct {
Key int64 Key int
Err string Err string
Root *cid.Cid Root *cid.Cid

View File

@ -113,7 +113,7 @@ type FullNodeStruct struct {
ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID int64) error `perm:"admin"` ClientRemoveImport func(ctx context.Context, importID int) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
@ -354,7 +354,7 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e
return c.Internal.ClientListImports(ctx) return c.Internal.ClientListImports(ctx)
} }
func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int64) error { func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int) error {
return c.Internal.ClientRemoveImport(ctx, importID) return c.Internal.ClientRemoveImport(ctx, importID)
} }

View File

@ -140,14 +140,14 @@ var clientDropCmd = &cli.Command{
defer closer() defer closer()
ctx := ReqContext(cctx) ctx := ReqContext(cctx)
var ids []int64 var ids []int
for i, s := range cctx.Args().Slice() { for i, s := range cctx.Args().Slice() {
id, err := strconv.ParseInt(s, 10, 64) id, err := strconv.ParseInt(s, 10, 0)
if err != nil { if err != nil {
return xerrors.Errorf("parsing %d-th import ID: %w", i, err) return xerrors.Errorf("parsing %d-th import ID: %w", i, err)
} }
ids = append(ids, id) ids = append(ids, int(id))
} }
for _, id := range ids { for _, id := range ids {

View File

@ -296,7 +296,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes
}, nil }, nil
} }
func (a *API) ClientRemoveImport(ctx context.Context, importID int64) error { func (a *API) ClientRemoveImport(ctx context.Context, importID int) error {
return a.imgr().Remove(importID) return a.imgr().Remove(importID)
} }

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/filecoin-project/lotus/lib/bufbstore" "github.com/filecoin-project/lotus/lib/bufbstore"
"golang.org/x/xerrors"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -36,7 +37,12 @@ import (
) )
func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) { func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
mds, err := importmgr.NewMultiDstore(r, "/client") ds, err := r.Datastore("/client")
if err != nil {
return nil, xerrors.Errorf("getting datastore out of repo: %w", err)
}
mds, err := importmgr.NewMultiDstore(ds)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -227,10 +227,8 @@ type fsLockedRepo struct {
closer io.Closer closer io.Closer
ds map[string]datastore.Batching ds map[string]datastore.Batching
multiDs map[string]map[int64]datastore.Batching
dsErr error dsErr error
dsOnce sync.Once dsOnce sync.Once
dsLk sync.Mutex
storageLk sync.Mutex storageLk sync.Mutex
configLk sync.Mutex configLk sync.Mutex

View File

@ -1,13 +1,11 @@
package repo package repo
import ( import (
"fmt"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
dgbadger "github.com/dgraph-io/badger/v2" dgbadger "github.com/dgraph-io/badger/v2"
badger "github.com/ipfs/go-ds-badger2" badger "github.com/ipfs/go-ds-badger2"
@ -24,9 +22,7 @@ var fsDatastores = map[string]dsCtor{
// Those need to be fast for large writes... but also need a really good GC :c // Those need to be fast for large writes... but also need a really good GC :c
"staging": badgerDs, // miner specific "staging": badgerDs, // miner specific
}
var fsMultiDatastores = map[string]dsCtor{
"client": badgerDs, // client specific "client": badgerDs, // client specific
} }
@ -68,78 +64,11 @@ func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error)
return out, nil return out, nil
} }
func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) {
out := map[string]map[int64]datastore.Batching{}
for p, ctor := range fsMultiDatastores {
path := fsr.join(filepath.Join(fsDatastore, p))
if err := os.MkdirAll(path, 0755); err != nil {
return nil, xerrors.Errorf("mkdir %s: %w", path, err)
}
di, err := ioutil.ReadDir(path)
if err != nil {
return nil, xerrors.Errorf("readdir '%s': %w", path, err)
}
out[p] = map[int64]datastore.Batching{}
for _, info := range di {
path := filepath.Join(path, info.Name())
prefix := datastore.NewKey(p)
id, err := strconv.ParseInt(info.Name(), 10, 64)
if err != nil {
log.Errorf("error parsing multi-datastore id for '%s': %w", path, err)
continue
}
// TODO: optimization: don't init datastores we don't need
ds, err := ctor(path)
if err != nil {
return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err)
}
ds = measure.New("fsrepo."+p+"."+info.Name(), ds)
out[p][id] = ds
}
}
return out, nil
}
func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) {
ctor, ok := fsMultiDatastores[ns]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns)
}
si := fmt.Sprintf("%d", idx)
path := fsr.join(filepath.Join(fsDatastore, ns, si))
ds, err := ctor(path)
if err != nil {
return nil, xerrors.Errorf("opening datastore %s: %w", path, err)
}
ds = measure.New("fsrepo."+ns+"."+si, ds)
return ds, nil
}
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
fsr.dsOnce.Do(func() { fsr.dsOnce.Do(func() {
var err error fsr.ds, fsr.dsErr = fsr.openDatastores()
fsr.ds, err = fsr.openDatastores()
if err != nil {
fsr.dsErr = err
return
}
fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores()
}) })
if fsr.dsErr != nil { if fsr.dsErr != nil {
return nil, fsr.dsErr return nil, fsr.dsErr
} }
@ -147,99 +76,5 @@ func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
if ok { if ok {
return ds, nil return ds, nil
} }
return nil, xerrors.Errorf("no such datastore: %s", ns)
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 2 {
return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
idx, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err)
}
ds, ok = mds[idx]
if !ok {
ds, err = fsr.openMultiDatastore(parts[0], idx)
if err != nil {
return nil, xerrors.Errorf("opening multi-datastore: %w", err)
}
mds[idx] = ds
}
return ds, nil
}
func (fsr *fsLockedRepo) ListDatastores(ns string) ([]int64, error) {
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 1 {
return nil, xerrors.Errorf("expected multi-datastore namespace to have 1 part")
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
out := make([]int64, 0, len(mds))
for i := range mds {
out = append(out, i)
}
return out, nil
}
func (fsr *fsLockedRepo) DeleteDatastore(ns string) error {
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 2 {
return xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
}
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
idx, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err)
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
ds, ok := mds[idx]
if !ok {
return xerrors.Errorf("no multi-datastore with at index (namespace %s)", ns)
}
delete(mds, idx)
if err := ds.Close(); err != nil {
return xerrors.Errorf("closing datastore: %w", err)
}
path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1]))
log.Warnw("removing sub-datastore", "path", path, "namespace", ns)
if err := os.RemoveAll(path); err != nil {
return xerrors.Errorf("remove '%s': %w", path, err)
}
return nil
} }

View File

@ -42,7 +42,7 @@ type StoreMeta struct {
Labels map[string]string Labels map[string]string
} }
func (m *Mgr) NewStore() (int64, *Store, error) { func (m *Mgr) NewStore() (int, *Store, error) {
id := m.mds.Next() id := m.mds.Next()
st, err := m.mds.Get(id) st, err := m.mds.Get(id)
if err != nil { if err != nil {
@ -60,7 +60,7 @@ func (m *Mgr) NewStore() (int64, *Store, error) {
return id, st, err return id, st, err
} }
func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path, data CID.. func (m *Mgr) AddLabel(id int, key, value string) error { // source, file path, data CID..
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil { if err != nil {
return xerrors.Errorf("getting metadata from datastore: %w", err) return xerrors.Errorf("getting metadata from datastore: %w", err)
@ -81,11 +81,11 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
} }
func (m *Mgr) List() []int64 { func (m *Mgr) List() []int {
return m.mds.List() return m.mds.List()
} }
func (m *Mgr) Info(id int64) (*StoreMeta, error) { func (m *Mgr) Info(id int) (*StoreMeta, error) {
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting metadata from datastore: %w", err) return nil, xerrors.Errorf("getting metadata from datastore: %w", err)
@ -99,7 +99,7 @@ func (m *Mgr) Info(id int64) (*StoreMeta, error) {
return &sm, nil return &sm, nil
} }
func (m *Mgr) Remove(id int64) error { func (m *Mgr) Remove(id int) error {
if err := m.mds.Delete(id); err != nil { if err := m.mds.Delete(id); err != nil {
return xerrors.Errorf("removing import: %w", err) return xerrors.Errorf("removing import: %w", err)
} }

View File

@ -1,44 +1,47 @@
package importmgr package importmgr
import ( import (
"encoding/json"
"fmt" "fmt"
"path" "sort"
"sync" "sync"
"sync/atomic"
"github.com/hashicorp/go-multierror" "go.uber.org/multierr"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
) )
type dsProvider interface {
Datastore(namespace string) (datastore.Batching, error)
ListDatastores(namespace string) ([]int64, error)
DeleteDatastore(namespace string) error
}
type MultiStore struct { type MultiStore struct {
provider dsProvider ds datastore.Batching
namespace string
open map[int64]*Store open map[int]*Store
next int64 next int
lk sync.RWMutex lk sync.RWMutex
} }
func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) { var dsListKey = datastore.NewKey("/list")
ids, err := provider.ListDatastores(namespace) var dsMultiKey = datastore.NewKey("/multi")
if err != nil {
return nil, xerrors.Errorf("listing datastores: %w", err) func NewMultiDstore(ds datastore.Batching) (*MultiStore, error) {
listBytes, err := ds.Get(dsListKey)
if xerrors.Is(err, datastore.ErrNotFound) {
listBytes, _ = json.Marshal([]int{})
} else if err != nil {
return nil, xerrors.Errorf("could not read multistore list: %w", err)
}
var ids []int
if err := json.Unmarshal(listBytes, &ids); err != nil {
return nil, xerrors.Errorf("could not unmarshal multistore list: %w", err)
} }
mds := &MultiStore{ mds := &MultiStore{
provider: provider, ds: ds,
namespace: namespace, open: map[int]*Store{},
open: map[int64]*Store{},
} }
for _, i := range ids { for _, i := range ids {
@ -55,15 +58,33 @@ func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error)
return mds, nil return mds, nil
} }
func (mds *MultiStore) path(i int64) string { func (mds *MultiStore) Next() int {
return path.Join("/", mds.namespace, fmt.Sprintf("%d", i)) mds.lk.Lock()
defer mds.lk.Unlock()
mds.next++
return mds.next
} }
func (mds *MultiStore) Next() int64 { func (mds *MultiStore) updateStores() error {
return atomic.AddInt64(&mds.next, 1) stores := make([]int, 0, len(mds.open))
for k := range mds.open {
stores = append(stores, k)
}
sort.Ints(stores)
listBytes, err := json.Marshal(stores)
if err != nil {
return xerrors.Errorf("could not marshal list: %w", err)
}
err = mds.ds.Put(dsListKey, listBytes)
if err != nil {
return xerrors.Errorf("could not save stores list: %w", err)
}
return nil
} }
func (mds *MultiStore) Get(i int64) (*Store, error) { func (mds *MultiStore) Get(i int) (*Store, error) {
mds.lk.Lock() mds.lk.Lock()
defer mds.lk.Unlock() defer mds.lk.Unlock()
@ -72,40 +93,85 @@ func (mds *MultiStore) Get(i int64) (*Store, error) {
return store, nil return store, nil
} }
ds, err := mds.provider.Datastore(mds.path(i)) wds := ktds.Wrap(mds.ds, ktds.PrefixTransform{
Prefix: dsMultiKey.ChildString(fmt.Sprintf("%d", i)),
})
var err error
mds.open[i], err = openStore(wds)
if err != nil { if err != nil {
return nil, err return nil, xerrors.Errorf("could not open new store: %w", err)
} }
mds.open[i], err = openStore(ds) err = mds.updateStores()
return mds.open[i], err if err != nil {
return nil, xerrors.Errorf("updating stores: %w", err)
}
return mds.open[i], nil
} }
func (mds *MultiStore) List() []int64 { func (mds *MultiStore) List() []int {
mds.lk.RLock() mds.lk.RLock()
defer mds.lk.RUnlock() defer mds.lk.RUnlock()
out := make([]int64, 0, len(mds.open))
out := make([]int, 0, len(mds.open))
for i := range mds.open { for i := range mds.open {
out = append(out, i) out = append(out, i)
} }
sort.Ints(out)
return out return out
} }
func (mds *MultiStore) Delete(i int64) error { func (mds *MultiStore) Delete(i int) error {
mds.lk.Lock() mds.lk.Lock()
defer mds.lk.Unlock() defer mds.lk.Unlock()
store, ok := mds.open[i] store, ok := mds.open[i]
if ok { if !ok {
if err := store.Close(); err != nil { return nil
return xerrors.Errorf("closing sub-datastore %d: %w", i, err)
} }
delete(mds.open, i) delete(mds.open, i)
err := store.Close()
if err != nil {
return xerrors.Errorf("closing store: %w", err)
} }
return mds.provider.DeleteDatastore(mds.path(i)) err = mds.updateStores()
if err != nil {
return xerrors.Errorf("updating stores: %w", err)
}
qres, err := store.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return xerrors.Errorf("query error: %w", err)
}
defer qres.Close() //nolint:errcheck
b, err := store.ds.Batch()
if err != nil {
return xerrors.Errorf("batch error: %w", err)
}
for r := range qres.Next() {
if r.Error != nil {
_ = b.Commit()
return xerrors.Errorf("iterator error: %w", r.Error)
}
err := b.Delete(datastore.NewKey(r.Key))
if err != nil {
_ = b.Commit()
return xerrors.Errorf("adding to batch: %w", err)
}
}
err = b.Commit()
if err != nil {
return xerrors.Errorf("committing: %w", err)
}
return nil
} }
func (mds *MultiStore) Close() error { func (mds *MultiStore) Close() error {
@ -113,12 +179,10 @@ func (mds *MultiStore) Close() error {
defer mds.lk.Unlock() defer mds.lk.Unlock()
var err error var err error
for i, store := range mds.open { for _, s := range mds.open {
cerr := store.Close() err = multierr.Append(err, s.Close())
if cerr != nil {
err = multierror.Append(err, xerrors.Errorf("closing sub-datastore %d: %w", i, cerr))
}
} }
mds.open = make(map[int]*Store)
return err return err
} }

View File

@ -36,8 +36,6 @@ type LockedRepo interface {
// Returns datastore defined in this repo. // Returns datastore defined in this repo.
Datastore(namespace string) (datastore.Batching, error) Datastore(namespace string) (datastore.Batching, error)
ListDatastores(namespace string) ([]int64, error)
DeleteDatastore(namespace string) error
// Returns config in this repo // Returns config in this repo
Config() (interface{}, error) Config() (interface{}, error)