diff --git a/api/api_full.go b/api/api_full.go index 43587d58e..fc1d70d0f 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -191,7 +191,7 @@ type FullNode interface { // ClientImport imports file under the specified path into filestore. ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) // ClientRemoveImport removes file import - ClientRemoveImport(ctx context.Context, importID int64) error + ClientRemoveImport(ctx context.Context, importID int) error // ClientStartDeal proposes a deal with a miner. ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) // ClientGetDealInfo returns the latest information about a given deal. @@ -393,11 +393,11 @@ type SectorLocation struct { type ImportRes struct { Root cid.Cid - ImportID int64 + ImportID int } type Import struct { - Key int64 + Key int Err string Root *cid.Cid diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 7e127b3d4..89edc1762 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -113,7 +113,7 @@ type FullNodeStruct struct { ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` - ClientRemoveImport func(ctx context.Context, importID int64) error `perm:"admin"` + ClientRemoveImport func(ctx context.Context, importID int) error `perm:"admin"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` @@ -354,7 +354,7 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e return c.Internal.ClientListImports(ctx) } -func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int64) error { +func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int) error { return c.Internal.ClientRemoveImport(ctx, importID) } diff --git a/cli/client.go b/cli/client.go index 414c04cd8..7846dd243 100644 --- a/cli/client.go +++ b/cli/client.go @@ -140,14 +140,14 @@ var clientDropCmd = &cli.Command{ defer closer() ctx := ReqContext(cctx) - var ids []int64 + var ids []int for i, s := range cctx.Args().Slice() { - id, err := strconv.ParseInt(s, 10, 64) + id, err := strconv.ParseInt(s, 10, 0) if err != nil { return xerrors.Errorf("parsing %d-th import ID: %w", i, err) } - ids = append(ids, id) + ids = append(ids, int(id)) } for _, id := range ids { diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 3eb3736bc..16024a0da 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -296,7 +296,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes }, nil } -func (a *API) ClientRemoveImport(ctx context.Context, importID int64) error { +func (a *API) ClientRemoveImport(ctx context.Context, importID int) error { return a.imgr().Remove(importID) } diff --git a/node/modules/client.go b/node/modules/client.go index 780aa91ee..a24709beb 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -5,6 +5,7 @@ import ( "time" "github.com/filecoin-project/lotus/lib/bufbstore" + "golang.org/x/xerrors" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/host" @@ -36,7 +37,12 @@ import ( ) func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) { - mds, err := importmgr.NewMultiDstore(r, "/client") + ds, err := r.Datastore("/client") + if err != nil { + return nil, xerrors.Errorf("getting datastore out of reop: %w", err) + } + + mds, err := importmgr.NewMultiDstore(ds) if err != nil { return nil, err } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 682983415..9a3ec9ca4 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -226,11 +226,10 @@ type fsLockedRepo struct { repoType RepoType closer io.Closer - ds map[string]datastore.Batching - multiDs map[string]map[int64]datastore.Batching - dsErr error - dsOnce sync.Once - dsLk sync.Mutex + ds map[string]datastore.Batching + dsErr error + dsOnce sync.Once + dsLk sync.Mutex storageLk sync.Mutex configLk sync.Mutex diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index 473e4bafa..7075bda79 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -1,13 +1,11 @@ package repo import ( - "fmt" - "github.com/ipfs/go-datastore" - "golang.org/x/xerrors" - "io/ioutil" "os" "path/filepath" - "strconv" + + "github.com/ipfs/go-datastore" + "golang.org/x/xerrors" dgbadger "github.com/dgraph-io/badger/v2" badger "github.com/ipfs/go-ds-badger2" @@ -24,9 +22,7 @@ var fsDatastores = map[string]dsCtor{ // Those need to be fast for large writes... but also need a really good GC :c "staging": badgerDs, // miner specific -} -var fsMultiDatastores = map[string]dsCtor{ "client": badgerDs, // client specific } @@ -68,78 +64,11 @@ func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error) return out, nil } -func (fsr *fsLockedRepo) openMultiDatastores() (map[string]map[int64]datastore.Batching, error) { - out := map[string]map[int64]datastore.Batching{} - - for p, ctor := range fsMultiDatastores { - path := fsr.join(filepath.Join(fsDatastore, p)) - if err := os.MkdirAll(path, 0755); err != nil { - return nil, xerrors.Errorf("mkdir %s: %w", path, err) - } - - di, err := ioutil.ReadDir(path) - if err != nil { - return nil, xerrors.Errorf("readdir '%s': %w", path, err) - } - - out[p] = map[int64]datastore.Batching{} - - for _, info := range di { - path := filepath.Join(path, info.Name()) - - prefix := datastore.NewKey(p) - - id, err := strconv.ParseInt(info.Name(), 10, 64) - if err != nil { - log.Errorf("error parsing multi-datastore id for '%s': %w", path, err) - continue - } - - // TODO: optimization: don't init datastores we don't need - ds, err := ctor(path) - if err != nil { - return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err) - } - - ds = measure.New("fsrepo."+p+"."+info.Name(), ds) - - out[p][id] = ds - } - } - - return out, nil -} - -func (fsr *fsLockedRepo) openMultiDatastore(ns string, idx int64) (datastore.Batching, error) { - ctor, ok := fsMultiDatastores[ns] - if !ok { - return nil, xerrors.Errorf("no multi-datastore with namespace '%s'", ns) - } - - si := fmt.Sprintf("%d", idx) - path := fsr.join(filepath.Join(fsDatastore, ns, si)) - - ds, err := ctor(path) - if err != nil { - return nil, xerrors.Errorf("opening datastore %s: %w", path, err) - } - - ds = measure.New("fsrepo."+ns+"."+si, ds) - - return ds, nil -} - func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { fsr.dsOnce.Do(func() { - var err error - fsr.ds, err = fsr.openDatastores() - if err != nil { - fsr.dsErr = err - return - } - - fsr.multiDs, fsr.dsErr = fsr.openMultiDatastores() + fsr.ds, fsr.dsErr = fsr.openDatastores() }) + if fsr.dsErr != nil { return nil, fsr.dsErr } @@ -147,99 +76,5 @@ func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { if ok { return ds, nil } - - k := datastore.NewKey(ns) - parts := k.List() - if len(parts) != 2 { - return nil, xerrors.Errorf("expected multi-datastore namespace to have 2 parts") - } - - fsr.dsLk.Lock() - defer fsr.dsLk.Unlock() - - mds, ok := fsr.multiDs[parts[0]] - if !ok { - return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns) - } - - idx, err := strconv.ParseInt(parts[1], 10, 64) - if err != nil { - return nil, xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err) - } - - ds, ok = mds[idx] - if !ok { - ds, err = fsr.openMultiDatastore(parts[0], idx) - if err != nil { - return nil, xerrors.Errorf("opening multi-datastore: %w", err) - } - - mds[idx] = ds - } - - return ds, nil -} - -func (fsr *fsLockedRepo) ListDatastores(ns string) ([]int64, error) { - k := datastore.NewKey(ns) - parts := k.List() - if len(parts) != 1 { - return nil, xerrors.Errorf("expected multi-datastore namespace to have 1 part") - } - - fsr.dsLk.Lock() - defer fsr.dsLk.Unlock() - - mds, ok := fsr.multiDs[parts[0]] - if !ok { - return nil, xerrors.Errorf("no multi-datastore with namespace %s", ns) - } - - out := make([]int64, 0, len(mds)) - for i := range mds { - out = append(out, i) - } - - return out, nil -} - -func (fsr *fsLockedRepo) DeleteDatastore(ns string) error { - k := datastore.NewKey(ns) - parts := k.List() - if len(parts) != 2 { - return xerrors.Errorf("expected multi-datastore namespace to have 2 parts") - } - - mds, ok := fsr.multiDs[parts[0]] - if !ok { - return xerrors.Errorf("no multi-datastore with namespace %s", ns) - } - - idx, err := strconv.ParseInt(parts[1], 10, 64) - if err != nil { - return xerrors.Errorf("parsing mult-datastore index('%s'): %w", parts[1], err) - } - - fsr.dsLk.Lock() - defer fsr.dsLk.Unlock() - - ds, ok := mds[idx] - if !ok { - return xerrors.Errorf("no multi-datastore with at index (namespace %s)", ns) - } - - delete(mds, idx) - - if err := ds.Close(); err != nil { - return xerrors.Errorf("closing datastore: %w", err) - } - - path := fsr.join(filepath.Join(fsDatastore, parts[0], parts[1])) - - log.Warnw("removing sub-datastore", "path", path, "namespace", ns) - if err := os.RemoveAll(path); err != nil { - return xerrors.Errorf("remove '%s': %w", path, err) - } - - return nil + return nil, xerrors.Errorf("no such datastore: %s", ns) } diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 464f4ac33..d3139918e 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -42,7 +42,7 @@ type StoreMeta struct { Labels map[string]string } -func (m *Mgr) NewStore() (int64, *Store, error) { +func (m *Mgr) NewStore() (int, *Store, error) { id := m.mds.Next() st, err := m.mds.Get(id) if err != nil { @@ -60,7 +60,7 @@ func (m *Mgr) NewStore() (int64, *Store, error) { return id, st, err } -func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path, data CID.. +func (m *Mgr) AddLabel(id int, key, value string) error { // source, file path, data CID.. meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) if err != nil { return xerrors.Errorf("getting metadata form datastore: %w", err) @@ -81,11 +81,11 @@ func (m *Mgr) AddLabel(id int64, key, value string) error { // source, file path return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) } -func (m *Mgr) List() []int64 { +func (m *Mgr) List() []int { return m.mds.List() } -func (m *Mgr) Info(id int64) (*StoreMeta, error) { +func (m *Mgr) Info(id int) (*StoreMeta, error) { meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) if err != nil { return nil, xerrors.Errorf("getting metadata form datastore: %w", err) @@ -99,7 +99,7 @@ func (m *Mgr) Info(id int64) (*StoreMeta, error) { return &sm, nil } -func (m *Mgr) Remove(id int64) error { +func (m *Mgr) Remove(id int) error { if err := m.mds.Delete(id); err != nil { return xerrors.Errorf("removing import: %w", err) } diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go index 705a3c947..e45fc0278 100644 --- a/node/repo/importmgr/multistore.go +++ b/node/repo/importmgr/multistore.go @@ -1,44 +1,47 @@ package importmgr import ( + "encoding/json" "fmt" - "path" + "sort" "sync" - "sync/atomic" - "github.com/hashicorp/go-multierror" + "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" + "github.com/ipfs/go-datastore/query" ) -type dsProvider interface { - Datastore(namespace string) (datastore.Batching, error) - ListDatastores(namespace string) ([]int64, error) - DeleteDatastore(namespace string) error -} - type MultiStore struct { - provider dsProvider - namespace string + ds datastore.Batching - open map[int64]*Store - next int64 + open map[int]*Store + next int lk sync.RWMutex } -func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) { - ids, err := provider.ListDatastores(namespace) - if err != nil { - return nil, xerrors.Errorf("listing datastores: %w", err) +var dsListKey = datastore.NewKey("/list") +var dsMultiKey = datastore.NewKey("/multi") + +func NewMultiDstore(ds datastore.Batching) (*MultiStore, error) { + listBytes, err := ds.Get(dsListKey) + if xerrors.Is(err, datastore.ErrNotFound) { + listBytes, _ = json.Marshal([]int{}) + } else if err != nil { + return nil, xerrors.Errorf("could not read multistore list: %w", err) + } + + var ids []int + if err := json.Unmarshal(listBytes, &ids); err != nil { + return nil, xerrors.Errorf("could not unmarshal multistore list: %w", err) } mds := &MultiStore{ - provider: provider, - namespace: namespace, - - open: map[int64]*Store{}, + ds: ds, + open: map[int]*Store{}, } for _, i := range ids { @@ -55,15 +58,15 @@ func NewMultiDstore(provider dsProvider, namespace string) (*MultiStore, error) return mds, nil } -func (mds *MultiStore) path(i int64) string { - return path.Join("/", mds.namespace, fmt.Sprintf("%d", i)) +func (mds *MultiStore) Next() int { + mds.lk.Lock() + defer mds.lk.Unlock() + + mds.next++ + return mds.next } -func (mds *MultiStore) Next() int64 { - return atomic.AddInt64(&mds.next, 1) -} - -func (mds *MultiStore) Get(i int64) (*Store, error) { +func (mds *MultiStore) Get(i int) (*Store, error) { mds.lk.Lock() defer mds.lk.Unlock() @@ -72,40 +75,81 @@ func (mds *MultiStore) Get(i int64) (*Store, error) { return store, nil } - ds, err := mds.provider.Datastore(mds.path(i)) + wds := ktds.Wrap(mds.ds, ktds.PrefixTransform{ + Prefix: dsMultiKey.ChildString(fmt.Sprintf("%d", i)), + }) + + var err error + mds.open[i], err = openStore(wds) + + stores := make([]int, 0, len(mds.open)) + for k := range mds.open { + stores = append(stores, k) + } + sort.Ints(stores) + + listBytes, err := json.Marshal(stores) if err != nil { - return nil, err + return nil, xerrors.Errorf("could not marshal list: %w", err) + } + err = mds.ds.Put(dsListKey, listBytes) + if err != nil { + return nil, xerrors.Errorf("could not save stores list: %w", err) } - mds.open[i], err = openStore(ds) return mds.open[i], err } -func (mds *MultiStore) List() []int64 { +func (mds *MultiStore) List() []int { mds.lk.RLock() defer mds.lk.RUnlock() - out := make([]int64, 0, len(mds.open)) + + out := make([]int, 0, len(mds.open)) for i := range mds.open { out = append(out, i) } + sort.Ints(out) return out } -func (mds *MultiStore) Delete(i int64) error { +func (mds *MultiStore) Delete(i int) error { mds.lk.Lock() defer mds.lk.Unlock() store, ok := mds.open[i] - if ok { - if err := store.Close(); err != nil { - return xerrors.Errorf("closing sub-datastore %d: %w", i, err) - } - - delete(mds.open, i) + if !ok { + return nil + } + delete(mds.open, i) + err := store.Close() + if err != nil { + return xerrors.Errorf("closing store: %w", err) } - return mds.provider.DeleteDatastore(mds.path(i)) + qres, err := store.ds.Query(query.Query{KeysOnly: true}) + if err != nil { + return xerrors.Errorf("query error: %w", err) + } + + b, err := store.ds.Batch() + if err != nil { + return xerrors.Errorf("batch error: %w", err) + } + for r := range qres.Next() { + if r.Error != nil { + _ = qres.Close() + _ = b.Commit() + return xerrors.Errorf("iterator error: %w", err) + } + b.Delete(datastore.NewKey(r.Key)) + } + err = b.Commit() + if err != nil { + return xerrors.Errorf("commiting: %w", err) + } + + return nil } func (mds *MultiStore) Close() error { @@ -113,12 +157,10 @@ func (mds *MultiStore) Close() error { defer mds.lk.Unlock() var err error - for i, store := range mds.open { - cerr := store.Close() - if cerr != nil { - err = multierror.Append(err, xerrors.Errorf("closing sub-datastore %d: %w", i, cerr)) - } + for _, s := range mds.open { + err = multierr.Append(err, s.Close()) } + mds.open = make(map[int]*Store) return err } diff --git a/node/repo/interface.go b/node/repo/interface.go index c19a656af..17336d413 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -36,8 +36,6 @@ type LockedRepo interface { // Returns datastore defined in this repo. Datastore(namespace string) (datastore.Batching, error) - ListDatastores(namespace string) ([]int64, error) - DeleteDatastore(namespace string) error // Returns config in this repo Config() (interface{}, error)