2020-07-06 23:39:30 +00:00
|
|
|
package importmgr
|
|
|
|
|
|
|
|
import (
|
2020-07-17 20:14:03 +00:00
|
|
|
"encoding/json"
|
2020-07-06 23:39:30 +00:00
|
|
|
"fmt"
|
2020-07-17 20:14:03 +00:00
|
|
|
"sort"
|
2020-07-06 23:39:30 +00:00
|
|
|
"sync"
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
"go.uber.org/multierr"
|
2020-07-06 23:39:30 +00:00
|
|
|
"golang.org/x/xerrors"
|
|
|
|
|
|
|
|
"github.com/ipfs/go-datastore"
|
2020-07-17 20:14:03 +00:00
|
|
|
ktds "github.com/ipfs/go-datastore/keytransform"
|
|
|
|
"github.com/ipfs/go-datastore/query"
|
2020-07-06 23:39:30 +00:00
|
|
|
)
|
|
|
|
|
2020-07-08 21:05:54 +00:00
|
|
|
type MultiStore struct {
|
2020-07-17 20:14:03 +00:00
|
|
|
ds datastore.Batching
|
2020-07-06 23:39:30 +00:00
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
open map[int]*Store
|
|
|
|
next int
|
2020-07-06 23:39:30 +00:00
|
|
|
|
|
|
|
lk sync.RWMutex
|
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
var dsListKey = datastore.NewKey("/list")
|
|
|
|
var dsMultiKey = datastore.NewKey("/multi")
|
|
|
|
|
|
|
|
func NewMultiDstore(ds datastore.Batching) (*MultiStore, error) {
|
|
|
|
listBytes, err := ds.Get(dsListKey)
|
|
|
|
if xerrors.Is(err, datastore.ErrNotFound) {
|
|
|
|
listBytes, _ = json.Marshal([]int{})
|
|
|
|
} else if err != nil {
|
|
|
|
return nil, xerrors.Errorf("could not read multistore list: %w", err)
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
var ids []int
|
|
|
|
if err := json.Unmarshal(listBytes, &ids); err != nil {
|
|
|
|
return nil, xerrors.Errorf("could not unmarshal multistore list: %w", err)
|
|
|
|
}
|
2020-07-07 09:12:32 +00:00
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
mds := &MultiStore{
|
|
|
|
ds: ds,
|
|
|
|
open: map[int]*Store{},
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, i := range ids {
|
|
|
|
if i > mds.next {
|
|
|
|
mds.next = i
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err := mds.Get(i)
|
|
|
|
if err != nil {
|
|
|
|
return nil, xerrors.Errorf("open store %d: %w", i, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return mds, nil
|
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
func (mds *MultiStore) Next() int {
|
|
|
|
mds.lk.Lock()
|
|
|
|
defer mds.lk.Unlock()
|
2020-07-06 23:39:30 +00:00
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
mds.next++
|
|
|
|
return mds.next
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
|
|
|
|
2020-07-17 20:44:16 +00:00
|
|
|
func (mds *MultiStore) updateStores() error {
|
|
|
|
stores := make([]int, 0, len(mds.open))
|
|
|
|
for k := range mds.open {
|
|
|
|
stores = append(stores, k)
|
|
|
|
}
|
|
|
|
sort.Ints(stores)
|
|
|
|
|
|
|
|
listBytes, err := json.Marshal(stores)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("could not marshal list: %w", err)
|
|
|
|
}
|
|
|
|
err = mds.ds.Put(dsListKey, listBytes)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("could not save stores list: %w", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
func (mds *MultiStore) Get(i int) (*Store, error) {
|
2020-07-06 23:39:30 +00:00
|
|
|
mds.lk.Lock()
|
|
|
|
defer mds.lk.Unlock()
|
|
|
|
|
|
|
|
store, ok := mds.open[i]
|
|
|
|
if ok {
|
|
|
|
return store, nil
|
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
wds := ktds.Wrap(mds.ds, ktds.PrefixTransform{
|
|
|
|
Prefix: dsMultiKey.ChildString(fmt.Sprintf("%d", i)),
|
|
|
|
})
|
|
|
|
|
|
|
|
var err error
|
|
|
|
mds.open[i], err = openStore(wds)
|
2020-07-17 20:18:38 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, xerrors.Errorf("could not open new store: %w", err)
|
|
|
|
}
|
2020-07-17 20:14:03 +00:00
|
|
|
|
2020-07-17 20:44:16 +00:00
|
|
|
err = mds.updateStores()
|
2020-07-06 23:39:30 +00:00
|
|
|
if err != nil {
|
2020-07-17 20:44:16 +00:00
|
|
|
return nil, xerrors.Errorf("updating stores: %w", err)
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
|
|
|
|
2020-07-17 20:44:16 +00:00
|
|
|
return mds.open[i], nil
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
func (mds *MultiStore) List() []int {
|
2020-07-07 08:52:19 +00:00
|
|
|
mds.lk.RLock()
|
|
|
|
defer mds.lk.RUnlock()
|
2020-07-17 20:14:03 +00:00
|
|
|
|
|
|
|
out := make([]int, 0, len(mds.open))
|
2020-07-07 08:52:19 +00:00
|
|
|
for i := range mds.open {
|
|
|
|
out = append(out, i)
|
|
|
|
}
|
2020-07-17 20:14:03 +00:00
|
|
|
sort.Ints(out)
|
2020-07-07 08:52:19 +00:00
|
|
|
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
func (mds *MultiStore) Delete(i int) error {
|
2020-07-06 23:39:30 +00:00
|
|
|
mds.lk.Lock()
|
|
|
|
defer mds.lk.Unlock()
|
|
|
|
|
|
|
|
store, ok := mds.open[i]
|
2020-07-17 20:14:03 +00:00
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
delete(mds.open, i)
|
|
|
|
err := store.Close()
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("closing store: %w", err)
|
|
|
|
}
|
|
|
|
|
2020-07-17 20:44:16 +00:00
|
|
|
err = mds.updateStores()
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("updating stores: %w", err)
|
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
qres, err := store.ds.Query(query.Query{KeysOnly: true})
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("query error: %w", err)
|
|
|
|
}
|
2020-07-17 20:18:38 +00:00
|
|
|
defer qres.Close() //nolint:errcheck
|
2020-07-06 23:39:30 +00:00
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
b, err := store.ds.Batch()
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("batch error: %w", err)
|
|
|
|
}
|
2020-07-17 20:18:38 +00:00
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
for r := range qres.Next() {
|
|
|
|
if r.Error != nil {
|
|
|
|
_ = b.Commit()
|
|
|
|
return xerrors.Errorf("iterator error: %w", err)
|
|
|
|
}
|
2020-07-17 20:18:38 +00:00
|
|
|
err := b.Delete(datastore.NewKey(r.Key))
|
|
|
|
if err != nil {
|
|
|
|
_ = b.Commit()
|
|
|
|
return xerrors.Errorf("adding to batch: %w", err)
|
|
|
|
}
|
2020-07-17 20:14:03 +00:00
|
|
|
}
|
2020-07-17 20:18:38 +00:00
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
err = b.Commit()
|
|
|
|
if err != nil {
|
2020-07-17 20:18:38 +00:00
|
|
|
return xerrors.Errorf("committing: %w", err)
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
|
|
|
|
2020-07-17 20:14:03 +00:00
|
|
|
return nil
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (mds *MultiStore) Close() error {
|
|
|
|
mds.lk.Lock()
|
|
|
|
defer mds.lk.Unlock()
|
|
|
|
|
|
|
|
var err error
|
2020-07-17 20:14:03 +00:00
|
|
|
for _, s := range mds.open {
|
|
|
|
err = multierr.Append(err, s.Close())
|
2020-07-06 23:39:30 +00:00
|
|
|
}
|
2020-07-17 20:14:03 +00:00
|
|
|
mds.open = make(map[int]*Store)
|
2020-07-06 23:39:30 +00:00
|
|
|
|
|
|
|
return err
|
|
|
|
}
|