fsrepo: multi-datastores
This commit is contained in:
parent
e7aec53631
commit
92e4507cf7
@ -225,9 +225,11 @@ type fsLockedRepo struct {
|
|||||||
repoType RepoType
|
repoType RepoType
|
||||||
closer io.Closer
|
closer io.Closer
|
||||||
|
|
||||||
ds datastore.Batching
|
ds map[string]datastore.Batching
|
||||||
dsErr error
|
multiDs map[string]map[int64]datastore.Batching
|
||||||
dsOnce sync.Once
|
dsErr error
|
||||||
|
dsOnce sync.Once
|
||||||
|
dsLk sync.Mutex
|
||||||
|
|
||||||
storageLk sync.Mutex
|
storageLk sync.Mutex
|
||||||
configLk sync.Mutex
|
configLk sync.Mutex
|
||||||
@ -244,8 +246,10 @@ func (fsr *fsLockedRepo) Close() error {
|
|||||||
return xerrors.Errorf("could not remove API file: %w", err)
|
return xerrors.Errorf("could not remove API file: %w", err)
|
||||||
}
|
}
|
||||||
if fsr.ds != nil {
|
if fsr.ds != nil {
|
||||||
if err := fsr.ds.Close(); err != nil {
|
for _, ds := range fsr.ds {
|
||||||
return xerrors.Errorf("could not close datastore: %w", err)
|
if err := ds.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("could not close datastore: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
package repo
|
package repo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/mount"
|
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
badger "github.com/ipfs/go-ds-badger2"
|
badger "github.com/ipfs/go-ds-badger2"
|
||||||
levelds "github.com/ipfs/go-ds-leveldb"
|
levelds "github.com/ipfs/go-ds-leveldb"
|
||||||
@ -14,13 +15,18 @@ import (
|
|||||||
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
|
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
|
||||||
)
|
)
|
||||||
|
|
||||||
var fsDatastores = map[string]func(path string) (datastore.Batching, error){
|
type dsCtor func(path string) (datastore.Batching, error)
|
||||||
|
|
||||||
|
var fsDatastores = map[string]dsCtor{
|
||||||
"chain": badgerDs,
|
"chain": badgerDs,
|
||||||
"metadata": levelDs,
|
"metadata": levelDs,
|
||||||
|
|
||||||
// Those need to be fast for large writes... but also need a really good GC :c
|
// Those need to be fast for large writes... but also need a really good GC :c
|
||||||
"staging": badgerDs, // miner specific
|
"staging": badgerDs, // miner specific
|
||||||
"client": badgerDs, // client specific
|
}
|
||||||
|
|
||||||
|
var fsMultiDatastores = map[string]dsCtor{
|
||||||
|
"client": badgerDs, // client specific
|
||||||
}
|
}
|
||||||
|
|
||||||
func badgerDs(path string) (datastore.Batching, error) {
|
func badgerDs(path string) (datastore.Batching, error) {
|
||||||
@ -36,12 +42,12 @@ func levelDs(path string) (datastore.Batching, error) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) {
|
func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error) {
|
||||||
if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil {
|
if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil {
|
||||||
return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err)
|
return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var mounts []mount.Mount
|
out := map[string]datastore.Batching{}
|
||||||
|
|
||||||
for p, ctor := range fsDatastores {
|
for p, ctor := range fsDatastores {
|
||||||
prefix := datastore.NewKey(p)
|
prefix := datastore.NewKey(p)
|
||||||
@ -54,21 +60,159 @@ func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) {
|
|||||||
|
|
||||||
ds = measure.New("fsrepo."+p, ds)
|
ds = measure.New("fsrepo."+p, ds)
|
||||||
|
|
||||||
mounts = append(mounts, mount.Mount{
|
out[datastore.NewKey(p).String()] = ds
|
||||||
Prefix: prefix,
|
|
||||||
Datastore: ds,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return mount.New(mounts), nil
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) {
|
||||||
|
out := map[string]map[int64]datastore.Batching{}
|
||||||
|
|
||||||
|
for p, ctor := range fsMultiDatastores {
|
||||||
|
path := fsr.join(filepath.Join(fsDatastore, p))
|
||||||
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
|
return nil, xerrors.Errorf("mkdir %s: %w", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
di, err := ioutil.ReadDir(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("readdir '%s': %w", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
out[p] = map[int64]datastore.Batching{}
|
||||||
|
|
||||||
|
for _, info := range di {
|
||||||
|
path = filepath.Join(path, info.Name())
|
||||||
|
|
||||||
|
prefix := datastore.NewKey(p)
|
||||||
|
|
||||||
|
id, err := strconv.ParseInt(info.Name(), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error parsing multi-datastore id for '%s': %w", path, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: optimization: don't init datastores we don't need
|
||||||
|
ds, err := ctor(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ds = measure.New("fsrepo."+p+"."+info.Name(), ds)
|
||||||
|
|
||||||
|
out[p][id] = ds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) {
|
||||||
|
ctor, ok := fsMultiDatastores[ns]
|
||||||
|
if !ok {
|
||||||
|
return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
si := fmt.Sprintf("%d", idx)
|
||||||
|
path := fsr.join(filepath.Join(fsDatastore, ns, si))
|
||||||
|
|
||||||
|
ds, err := ctor(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("opening datastore %s: %w", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ds = measure.New("fsrepo."+ns+"."+si, ds)
|
||||||
|
|
||||||
|
return ds, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
|
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
|
||||||
fsr.dsOnce.Do(func() {
|
fsr.dsOnce.Do(func() {
|
||||||
fsr.ds, fsr.dsErr = fsr.openDatastore()
|
var err error
|
||||||
|
fsr.ds, err = fsr.openDatastores()
|
||||||
|
if err != nil {
|
||||||
|
fsr.dsErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores()
|
||||||
})
|
})
|
||||||
if fsr.dsErr != nil {
|
if fsr.dsErr != nil {
|
||||||
return nil, fsr.dsErr
|
return nil, fsr.dsErr
|
||||||
}
|
}
|
||||||
return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil
|
ds, ok := fsr.ds[ns]
|
||||||
|
if ok {
|
||||||
|
return ds, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
k := datastore.NewKey(ns)
|
||||||
|
parts := k.List()
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
|
||||||
|
}
|
||||||
|
|
||||||
|
fsr.dsLk.Lock()
|
||||||
|
defer fsr.dsLk.Unlock()
|
||||||
|
|
||||||
|
mds, ok := fsr.multiDs[parts[0]]
|
||||||
|
if !ok {
|
||||||
|
return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, err := strconv.ParseInt(parts[1], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ds, ok = mds[idx]
|
||||||
|
if !ok {
|
||||||
|
ds, err = fsr.openMultiDatastore(parts[0], idx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("opening multi-datastore: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mds[idx] = ds
|
||||||
|
}
|
||||||
|
|
||||||
|
return ds, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fsr *fsLockedRepo) DeleteDatastore(ns string) error {
|
||||||
|
k := datastore.NewKey(ns)
|
||||||
|
parts := k.List()
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
|
||||||
|
}
|
||||||
|
|
||||||
|
mds, ok := fsr.multiDs[parts[0]]
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("no multi-datastore with namespace %s", ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, err := strconv.ParseInt(parts[1], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fsr.dsLk.Lock()
|
||||||
|
defer fsr.dsLk.Unlock()
|
||||||
|
|
||||||
|
ds, ok := mds[idx]
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("no multi-datastore with at index (namespace %s)", ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ds.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("closing datastore: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1]))
|
||||||
|
|
||||||
|
log.Warnw("removing sub-datastore", "path", path, "namespace", ns)
|
||||||
|
if err := os.RemoveAll(path); err != nil {
|
||||||
|
return xerrors.Errorf("remove '%s': %w", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -35,6 +35,7 @@ type LockedRepo interface {
|
|||||||
|
|
||||||
// Returns datastore defined in this repo.
|
// Returns datastore defined in this repo.
|
||||||
Datastore(namespace string) (datastore.Batching, error)
|
Datastore(namespace string) (datastore.Batching, error)
|
||||||
|
DeleteDatastore(namespace string) error
|
||||||
|
|
||||||
// Returns config in this repo
|
// Returns config in this repo
|
||||||
Config() (interface{}, error)
|
Config() (interface{}, error)
|
||||||
|
@ -233,6 +233,11 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) {
|
|||||||
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
|
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (lmem *lockedMemRepo) DeleteDatastore(ns string) error {
|
||||||
|
/** poof **/
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (lmem *lockedMemRepo) Config() (interface{}, error) {
|
func (lmem *lockedMemRepo) Config() (interface{}, error) {
|
||||||
if err := lmem.checkToken(); err != nil {
|
if err := lmem.checkToken(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
Reference in New Issue
Block a user