diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go
index b223731d9..aba088e54 100644
--- a/node/repo/fsrepo.go
+++ b/node/repo/fsrepo.go
@@ -225,9 +225,11 @@ type fsLockedRepo struct {
 	repoType RepoType
 	closer   io.Closer
 
-	ds     datastore.Batching
-	dsErr  error
-	dsOnce sync.Once
+	ds      map[string]datastore.Batching
+	multiDs map[string]map[int64]datastore.Batching
+	dsErr   error
+	dsOnce  sync.Once
+	dsLk    sync.Mutex
 
 	storageLk sync.Mutex
 	configLk  sync.Mutex
@@ -244,8 +246,17 @@ func (fsr *fsLockedRepo) Close() error {
 		return xerrors.Errorf("could not remove API file: %w", err)
 	}
 	if fsr.ds != nil {
-		if err := fsr.ds.Close(); err != nil {
-			return xerrors.Errorf("could not close datastore: %w", err)
+		for _, ds := range fsr.ds {
+			if err := ds.Close(); err != nil {
+				return xerrors.Errorf("could not close datastore: %w", err)
+			}
 		}
 	}
+	// Close opened multi-datastores too; ranging over a nil map is a no-op.
+	for _, dss := range fsr.multiDs {
+		for _, ds := range dss {
+			if err := ds.Close(); err != nil {
+				return xerrors.Errorf("could not close multi-datastore: %w", err)
+			}
+		}
+	}
 
diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go
index 034635c4f..cadb85d45 100644
--- a/node/repo/fsrepo_ds.go
+++ b/node/repo/fsrepo_ds.go
@@ -1,12 +1,13 @@
 package repo
 
 import (
+	"fmt"
 	"github.com/ipfs/go-datastore"
-	"github.com/ipfs/go-datastore/mount"
-	"github.com/ipfs/go-datastore/namespace"
 	"golang.org/x/xerrors"
+	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strconv"
 
 	badger "github.com/ipfs/go-ds-badger2"
 	levelds "github.com/ipfs/go-ds-leveldb"
@@ -14,13 +15,18 @@ import (
 	ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
 )
 
-var fsDatastores = map[string]func(path string) (datastore.Batching, error){
+type dsCtor func(path string) (datastore.Batching, error)
+
+var fsDatastores = map[string]dsCtor{
 	"chain":    badgerDs,
 	"metadata": levelDs,
 
 	// Those need to be fast for large writes... but also need a really good GC :c
 	"staging": badgerDs, // miner specific
-	"client": badgerDs, // client specific
+}
+
+var fsMultiDatastores = map[string]dsCtor{
+	"client": badgerDs, // client specific
 }
 
 func badgerDs(path string) (datastore.Batching, error) {
@@ -36,12 +42,12 @@ func levelDs(path string) (datastore.Batching, error) {
 	})
 }
 
-func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) {
+func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error) {
 	if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil {
 		return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err)
 	}
 
-	var mounts []mount.Mount
+	out := map[string]datastore.Batching{}
 
 	for p, ctor := range fsDatastores {
 		prefix := datastore.NewKey(p)
@@ -54,21 +60,160 @@ func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) {
 
 		ds = measure.New("fsrepo."+p, ds)
 
-		mounts = append(mounts, mount.Mount{
-			Prefix:    prefix,
-			Datastore: ds,
-		})
+		out[datastore.NewKey(p).String()] = ds
 	}
 
-	return mount.New(mounts), nil
+	return out, nil
+}
+
+func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) {
+	out := map[string]map[int64]datastore.Batching{}
+
+	for p, ctor := range fsMultiDatastores {
+		path := fsr.join(filepath.Join(fsDatastore, p))
+		if err := os.MkdirAll(path, 0755); err != nil {
+			return nil, xerrors.Errorf("mkdir %s: %w", path, err)
+		}
+
+		di, err := ioutil.ReadDir(path)
+		if err != nil {
+			return nil, xerrors.Errorf("readdir '%s': %w", path, err)
+		}
+
+		out[p] = map[int64]datastore.Batching{}
+
+		for _, info := range di {
+			dsPath := filepath.Join(path, info.Name()) // fresh var; reassigning `path` would accumulate across iterations
+
+			prefix := datastore.NewKey(p)
+
+			id, err := strconv.ParseInt(info.Name(), 10, 64)
+			if err != nil {
+				log.Errorf("error parsing multi-datastore id for '%s': %v", dsPath, err)
+				continue
+			}
+
+			// TODO: optimization: don't init datastores we don't need
+			ds, err := ctor(dsPath)
+			if err != nil {
+				return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err)
+			}
+
+			ds = measure.New("fsrepo."+p+"."+info.Name(), ds)
+
+			out[p][id] = ds
+		}
+	}
+
+	return out, nil
+}
+
+func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) {
+	ctor, ok := fsMultiDatastores[ns]
+	if !ok {
+		return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns)
+	}
+
+	si := fmt.Sprintf("%d", idx)
+	path := fsr.join(filepath.Join(fsDatastore, ns, si))
+
+	ds, err := ctor(path)
+	if err != nil {
+		return nil, xerrors.Errorf("opening datastore %s: %w", path, err)
+	}
+
+	ds = measure.New("fsrepo."+ns+"."+si, ds)
+
+	return ds, nil
 }
 
 func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
 	fsr.dsOnce.Do(func() {
-		fsr.ds, fsr.dsErr = fsr.openDatastore()
+		var err error
+		fsr.ds, err = fsr.openDatastores()
+		if err != nil {
+			fsr.dsErr = err
+			return
+		}
+
+		fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores()
 	})
 	if fsr.dsErr != nil {
 		return nil, fsr.dsErr
 	}
-	return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil
+	ds, ok := fsr.ds[ns]
+	if ok {
+		return ds, nil
+	}
+
+	k := datastore.NewKey(ns)
+	parts := k.List()
+	if len(parts) != 2 {
+		return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
+	}
+
+	fsr.dsLk.Lock()
+	defer fsr.dsLk.Unlock()
+
+	mds, ok := fsr.multiDs[parts[0]]
+	if !ok {
+		return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns)
+	}
+
+	idx, err := strconv.ParseInt(parts[1], 10, 64)
+	if err != nil {
+		return nil, xerrors.Errorf("parsing multi-datastore index('%s'): %w", parts[1], err)
+	}
+
+	ds, ok = mds[idx]
+	if !ok {
+		ds, err = fsr.openMultiDatastore(parts[0], idx)
+		if err != nil {
+			return nil, xerrors.Errorf("opening multi-datastore: %w", err)
+		}
+
+		mds[idx] = ds
+	}
+
+	return ds, nil
 }
+
+func (fsr *fsLockedRepo) DeleteDatastore(ns string) error {
+	k := datastore.NewKey(ns)
+	parts := k.List()
+	if len(parts) != 2 {
+		return xerrors.Errorf("expected multi-datastore namespace to have 2 parts")
+	}
+
+	idx, err := strconv.ParseInt(parts[1], 10, 64)
+	if err != nil {
+		return xerrors.Errorf("parsing multi-datastore index('%s'): %w", parts[1], err)
+	}
+
+	fsr.dsLk.Lock()
+	defer fsr.dsLk.Unlock()
+
+	mds, ok := fsr.multiDs[parts[0]]
+	if !ok {
+		return xerrors.Errorf("no multi-datastore with namespace %s", ns)
+	}
+
+	ds, ok := mds[idx]
+	if !ok {
+		return xerrors.Errorf("no multi-datastore at index (namespace %s)", ns)
+	}
+
+	if err := ds.Close(); err != nil {
+		return xerrors.Errorf("closing datastore: %w", err)
+	}
+
+	delete(mds, idx) // drop from cache so future Datastore() calls don't return a closed instance
+
+	path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1]))
+
+	log.Warnw("removing sub-datastore", "path", path, "namespace", ns)
+	if err := os.RemoveAll(path); err != nil {
+		return xerrors.Errorf("remove '%s': %w", path, err)
+	}
+
+	return nil
+}
diff --git a/node/repo/interface.go b/node/repo/interface.go
index 5950f813f..8699ae20a 100644
--- a/node/repo/interface.go
+++ b/node/repo/interface.go
@@ -35,6 +35,7 @@ type LockedRepo interface {
 
 	// Returns datastore defined in this repo.
 	Datastore(namespace string) (datastore.Batching, error)
+	DeleteDatastore(namespace string) error
 
 	// Returns config in this repo
 	Config() (interface{}, error)
diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go
index 399b239c1..e99342c55 100644
--- a/node/repo/memrepo.go
+++ b/node/repo/memrepo.go
@@ -233,6 +233,11 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) {
 	return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
 }
 
+func (lmem *lockedMemRepo) DeleteDatastore(ns string) error {
+	/** poof **/
+	return nil
+}
+
 func (lmem *lockedMemRepo) Config() (interface{}, error) {
 	if err := lmem.checkToken(); err != nil {
 		return nil, err