Use single multi ds

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-07-17 22:14:03 +02:00
parent 6ff99315a1
commit cc1d23a94c
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
10 changed files with 119 additions and 239 deletions

View File

@ -191,7 +191,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore.
ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error)
// ClientRemoveImport removes file import
ClientRemoveImport(ctx context.Context, importID int64) error
ClientRemoveImport(ctx context.Context, importID int) error
// ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error)
// ClientGetDealInfo returns the latest information about a given deal.
@ -393,11 +393,11 @@ type SectorLocation struct {
type ImportRes struct {
Root cid.Cid
ImportID int64
ImportID int
}
type Import struct {
Key int64
Key int
Err string
Root *cid.Cid

View File

@ -113,7 +113,7 @@ type FullNodeStruct struct {
ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID int64) error `perm:"admin"`
ClientRemoveImport func(ctx context.Context, importID int) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
@ -354,7 +354,7 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e
return c.Internal.ClientListImports(ctx)
}
func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int64) error {
func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int) error {
return c.Internal.ClientRemoveImport(ctx, importID)
}

View File

@ -140,14 +140,14 @@ var clientDropCmd = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
var ids []int64
var ids []int
for i, s := range cctx.Args().Slice() {
id, err := strconv.ParseInt(s, 10, 64)
id, err := strconv.ParseInt(s, 10, 0)
if err != nil {
return xerrors.Errorf("parsing %d-th import ID: %w", i, err)
}
ids = append(ids, id)
ids = append(ids, int(id))
}
for _, id := range ids {

View File

@ -296,7 +296,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes
}, nil
}
func (a *API) ClientRemoveImport(ctx context.Context, importID int64) error {
func (a *API) ClientRemoveImport(ctx context.Context, importID int) error {
return a.imgr().Remove(importID)
}

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/filecoin-project/lotus/lib/bufbstore"
"golang.org/x/xerrors"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host"
@ -36,7 +37,12 @@ import (
)
func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
mds, err := importmgr.NewMultiDstore(r, "/client")
ds, err := r.Datastore("/client")
if err != nil {
return nil, xerrors.Errorf("getting datastore out of repo: %w", err)
}
mds, err := importmgr.NewMultiDstore(ds)
if err != nil {
return nil, err
}

View File

@ -226,11 +226,10 @@ type fsLockedRepo struct {
repoType RepoType
closer io.Closer
ds map[string]datastore.Batching
multiDs map[string]map[int64]datastore.Batching
dsErr error
dsOnce sync.Once
dsLk sync.Mutex
ds map[string]datastore.Batching
dsErr error
dsOnce sync.Once
dsLk sync.Mutex
storageLk sync.Mutex
configLk sync.Mutex

View File

@ -1,13 +1,11 @@
package repo
import (
"fmt"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
dgbadger "github.com/dgraph-io/badger/v2"
badger "github.com/ipfs/go-ds-badger2"
@ -24,9 +22,7 @@ var fsDatastores = map[string]dsCtor{
// Those need to be fast for large writes... but also need a really good GC :c
"staging": badgerDs, // miner specific
}
var fsMultiDatastores = map[string]dsCtor{
"client": badgerDs, // client specific
}
@ -68,78 +64,11 @@ func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error)
return out, nil
}
func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) {
out := map[string]map[int64]datastore.Batching{}
for p, ctor := range fsMultiDatastores {
path := fsr.join(filepath.Join(fsDatastore, p))
if err := os.MkdirAll(path, 0755); err != nil {
return nil, xerrors.Errorf("mkdir %s: %w", path, err)
}
di, err := ioutil.ReadDir(path)
if err != nil {
return nil, xerrors.Errorf("readdir '%s': %w", path, err)
}
out[p] = map[int64]datastore.Batching{}
for _, info := range di {
path := filepath.Join(path, info.Name())
prefix := datastore.NewKey(p)
id, err := strconv.ParseInt(info.Name(), 10, 64)
if err != nil {
log.Errorf("error parsing multi-datastore id for '%s': %w", path, err)
continue
}
// TODO: optimization: don't init datastores we don't need
ds, err := ctor(path)
if err != nil {
return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err)
}
ds = measure.New("fsrepo."+p+"."+info.Name(), ds)
out[p][id] = ds
}
}
return out, nil
}
func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) {
ctor, ok := fsMultiDatastores[ns]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns)
}
si := fmt.Sprintf("%d", idx)
path := fsr.join(filepath.Join(fsDatastore, ns, si))
ds, err := ctor(path)
if err != nil {
return nil, xerrors.Errorf("opening datastore %s: %w", path, err)
}
ds = measure.New("fsrepo."+ns+"."+si, ds)
return ds, nil
}
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
fsr.dsOnce.Do(func() {
var err error
fsr.ds, err = fsr.openDatastores()
if err != nil {
fsr.dsErr = err
return
}
fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores()
fsr.ds, fsr.dsErr = fsr.openDatastores()
})
if fsr.dsErr != nil {
return nil, fsr.dsErr
}
@ -147,99 +76,5 @@ func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
if ok {
return ds, nil
}
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 2 {
return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
idx, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, xerrors.Errorf("parsing multi-datastore index('%s'): %w", parts[1], err)
}
ds, ok = mds[idx]
if !ok {
ds, err = fsr.openMultiDatastore(parts[0], idx)
if err != nil {
return nil, xerrors.Errorf("opening multi-datastore: %w", err)
}
mds[idx] = ds
}
return ds, nil
}
func (fsr *fsLockedRepo) ListDatastores(ns string) ([]int64, error) {
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 1 {
return nil, xerrors.Errorf("expected multi-datastore namespace to have 1 part")
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
out := make([]int64, 0, len(mds))
for i := range mds {
out = append(out, i)
}
return out, nil
}
func (fsr *fsLockedRepo) DeleteDatastore(ns string) error {
k := datastore.NewKey(ns)
parts := k.List()
if len(parts) != 2 {
return xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
}
mds, ok := fsr.multiDs[parts[0]]
if !ok {
return xerrors.Errorf("no multi-datastore with namespace %s", ns)
}
idx, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return xerrors.Errorf("parsing multi-datastore index('%s'): %w", parts[1], err)
}
fsr.dsLk.Lock()
defer fsr.dsLk.Unlock()
ds, ok := mds[idx]
if !ok {
return xerrors.Errorf("no multi-datastore at index (namespace %s)", ns)
}
delete(mds, idx)
if err := ds.Close(); err != nil {
return xerrors.Errorf("closing datastore: %w", err)
}
path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1]))
log.Warnw("removing sub-datastore", "path", path, "namespace", ns)
if err := os.RemoveAll(path); err != nil {
return xerrors.Errorf("remove '%s': %w", path, err)
}
return nil
return nil, xerrors.Errorf("no such datastore: %s", ns)
}

View File

@ -42,7 +42,7 @@ type StoreMeta struct {
Labels map[string]string
}
func (m *Mgr) NewStore() (int64, *Store, error) {
func (m *Mgr) NewStore() (int, *Store, error) {
id := m.mds.Next()
st, err := m.mds.Get(id)
if err != nil {
@ -60,7 +60,7 @@ func (m *Mgr) NewStore() (int64, *Store, error) {
return id, st, err
}
func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path, data CID..
func (m *Mgr) AddLabel(id int, key, value string) error { // source, file path, data CID..
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return xerrors.Errorf("getting metadata from datastore: %w", err)
@ -81,11 +81,11 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
}
func (m *Mgr) List() []int64 {
func (m *Mgr) List() []int {
return m.mds.List()
}
func (m *Mgr) Info(id int64) (*StoreMeta, error) {
func (m *Mgr) Info(id int) (*StoreMeta, error) {
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return nil, xerrors.Errorf("getting metadata from datastore: %w", err)
@ -99,7 +99,7 @@ func (m *Mgr) Info(id int64) (*StoreMeta, error) {
return &sm, nil
}
func (m *Mgr) Remove(id int64) error {
func (m *Mgr) Remove(id int) error {
if err := m.mds.Delete(id); err != nil {
return xerrors.Errorf("removing import: %w", err)
}

View File

@ -1,44 +1,47 @@
package importmgr
import (
"encoding/json"
"fmt"
"path"
"sort"
"sync"
"sync/atomic"
"github.com/hashicorp/go-multierror"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
)
type dsProvider interface {
Datastore(namespace string) (datastore.Batching, error)
ListDatastores(namespace string) ([]int64, error)
DeleteDatastore(namespace string) error
}
type MultiStore struct {
provider dsProvider
namespace string
ds datastore.Batching
open map[int64]*Store
next int64
open map[int]*Store
next int
lk sync.RWMutex
}
func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) {
ids, err := provider.ListDatastores(namespace)
if err != nil {
return nil, xerrors.Errorf("listing datastores: %w", err)
var dsListKey = datastore.NewKey("/list")
var dsMultiKey = datastore.NewKey("/multi")
func NewMultiDstore(ds datastore.Batching) (*MultiStore, error) {
listBytes, err := ds.Get(dsListKey)
if xerrors.Is(err, datastore.ErrNotFound) {
listBytes, _ = json.Marshal([]int{})
} else if err != nil {
return nil, xerrors.Errorf("could not read multistore list: %w", err)
}
var ids []int
if err := json.Unmarshal(listBytes, &ids); err != nil {
return nil, xerrors.Errorf("could not unmarshal multistore list: %w", err)
}
mds := &MultiStore{
provider: provider,
namespace: namespace,
open: map[int64]*Store{},
ds: ds,
open: map[int]*Store{},
}
for _, i := range ids {
@ -55,15 +58,15 @@ func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error)
return mds, nil
}
func (mds *MultiStore) path(i int64) string {
return path.Join("/", mds.namespace, fmt.Sprintf("%d", i))
func (mds *MultiStore) Next() int {
mds.lk.Lock()
defer mds.lk.Unlock()
mds.next++
return mds.next
}
func (mds *MultiStore) Next() int64 {
return atomic.AddInt64(&mds.next, 1)
}
func (mds *MultiStore) Get(i int64) (*Store, error) {
func (mds *MultiStore) Get(i int) (*Store, error) {
mds.lk.Lock()
defer mds.lk.Unlock()
@ -72,40 +75,81 @@ func (mds *MultiStore) Get(i int64) (*Store, error) {
return store, nil
}
ds, err := mds.provider.Datastore(mds.path(i))
wds := ktds.Wrap(mds.ds, ktds.PrefixTransform{
Prefix: dsMultiKey.ChildString(fmt.Sprintf("%d", i)),
})
var err error
mds.open[i], err = openStore(wds)
stores := make([]int, 0, len(mds.open))
for k := range mds.open {
stores = append(stores, k)
}
sort.Ints(stores)
listBytes, err := json.Marshal(stores)
if err != nil {
return nil, err
return nil, xerrors.Errorf("could not marshal list: %w", err)
}
err = mds.ds.Put(dsListKey, listBytes)
if err != nil {
return nil, xerrors.Errorf("could not save stores list: %w", err)
}
mds.open[i], err = openStore(ds)
return mds.open[i], err
}
func (mds *MultiStore) List() []int64 {
func (mds *MultiStore) List() []int {
mds.lk.RLock()
defer mds.lk.RUnlock()
out := make([]int64, 0, len(mds.open))
out := make([]int, 0, len(mds.open))
for i := range mds.open {
out = append(out, i)
}
sort.Ints(out)
return out
}
func (mds *MultiStore) Delete(i int64) error {
func (mds *MultiStore) Delete(i int) error {
mds.lk.Lock()
defer mds.lk.Unlock()
store, ok := mds.open[i]
if ok {
if err := store.Close(); err != nil {
return xerrors.Errorf("closing sub-datastore %d: %w", i, err)
}
delete(mds.open, i)
if !ok {
return nil
}
delete(mds.open, i)
err := store.Close()
if err != nil {
return xerrors.Errorf("closing store: %w", err)
}
return mds.provider.DeleteDatastore(mds.path(i))
qres, err := store.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return xerrors.Errorf("query error: %w", err)
}
b, err := store.ds.Batch()
if err != nil {
return xerrors.Errorf("batch error: %w", err)
}
for r := range qres.Next() {
if r.Error != nil {
_ = qres.Close()
_ = b.Commit()
return xerrors.Errorf("iterator error: %w", err)
}
b.Delete(datastore.NewKey(r.Key))
}
err = b.Commit()
if err != nil {
return xerrors.Errorf("committing: %w", err)
}
return nil
}
func (mds *MultiStore) Close() error {
@ -113,12 +157,10 @@ func (mds *MultiStore) Close() error {
defer mds.lk.Unlock()
var err error
for i, store := range mds.open {
cerr := store.Close()
if cerr != nil {
err = multierror.Append(err, xerrors.Errorf("closing sub-datastore %d: %w", i, cerr))
}
for _, s := range mds.open {
err = multierr.Append(err, s.Close())
}
mds.open = make(map[int]*Store)
return err
}

View File

@ -36,8 +36,6 @@ type LockedRepo interface {
// Returns datastore defined in this repo.
Datastore(namespace string) (datastore.Batching, error)
ListDatastores(namespace string) ([]int64, error)
DeleteDatastore(namespace string) error
// Returns config in this repo
Config() (interface{}, error)