feat(multistore): extract multistore code to repo
Extract multistore + multiread blockstore to a seperate repo
This commit is contained in:
parent
7410c057c6
commit
dd885b7302
1
go.mod
1
go.mod
@ -25,6 +25,7 @@ require (
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
|
||||
github.com/filecoin-project/go-fil-markets v0.5.1
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
|
||||
github.com/filecoin-project/go-multistore v0.0.1
|
||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
|
||||
github.com/filecoin-project/go-statestore v0.1.0
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
|
||||
|
2
go.sum
2
go.sum
@ -248,6 +248,8 @@ github.com/filecoin-project/go-fil-markets v0.5.1 h1:Y69glslNCuXnygfesCmyilTVhEE
|
||||
github.com/filecoin-project/go-fil-markets v0.5.1/go.mod h1:GKGigsFNMvKmx/+Mcn7093TdZTiCDLc7YGxQ7d6fq2s=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
|
||||
github.com/filecoin-project/go-multistore v0.0.1 h1:wXCd02azCxEcMNlDE9lksraQO+iIjFGNw01IZyf8GPA=
|
||||
github.com/filecoin-project/go-multistore v0.0.1/go.mod h1:z8NeSPWubEvrzi0XolhZ1NjTeW9ZDR779M+EDhf4QIQ=
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-markets/shared"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/go-multistore"
|
||||
"github.com/filecoin-project/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||
@ -556,7 +557,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmgr.Store) (cid.Cid, error) {
|
||||
func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multistore.Store) (cid.Cid, error) {
|
||||
f, err := os.Open(ref.Path)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-multistore"
|
||||
"github.com/filecoin-project/lotus/lib/bufbstore"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -42,7 +43,7 @@ func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMult
|
||||
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
|
||||
}
|
||||
|
||||
mds, err := importmgr.NewMultiDstore(ds)
|
||||
mds, err := multistore.NewMultiDstore(ds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
format "github.com/ipfs/go-ipld-format"
|
||||
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
|
||||
"github.com/filecoin-project/go-multistore"
|
||||
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
@ -27,7 +28,7 @@ type ChainGCBlockstore blockstore.GCBlockstore
|
||||
type ChainExchange exchange.Interface
|
||||
type ChainBlockService bserv.BlockService
|
||||
|
||||
type ClientMultiDstore *importmgr.MultiStore
|
||||
type ClientMultiDstore *multistore.MultiStore
|
||||
type ClientImportMgr *importmgr.Mgr
|
||||
type ClientBlockstore blockstore.Blockstore
|
||||
type ClientDealStore *statestore.StateStore
|
||||
|
@ -1,95 +0,0 @@
|
||||
package importmgr
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
)
|
||||
|
||||
type multiReadBs struct {
|
||||
// TODO: some caching
|
||||
mds *MultiStore
|
||||
}
|
||||
|
||||
func (m *multiReadBs) Has(cid cid.Cid) (bool, error) {
|
||||
m.mds.lk.RLock()
|
||||
defer m.mds.lk.RUnlock()
|
||||
|
||||
var merr error
|
||||
for i, store := range m.mds.open {
|
||||
has, err := store.Bstore.Has(cid)
|
||||
if err != nil {
|
||||
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
|
||||
continue
|
||||
}
|
||||
if !has {
|
||||
continue
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, merr
|
||||
}
|
||||
|
||||
func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) {
|
||||
m.mds.lk.RLock()
|
||||
defer m.mds.lk.RUnlock()
|
||||
|
||||
var merr error
|
||||
for i, store := range m.mds.open {
|
||||
has, err := store.Bstore.Has(cid)
|
||||
if err != nil {
|
||||
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
|
||||
continue
|
||||
}
|
||||
if !has {
|
||||
continue
|
||||
}
|
||||
|
||||
val, err := store.Bstore.Get(cid)
|
||||
if err != nil {
|
||||
merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err))
|
||||
continue
|
||||
}
|
||||
|
||||
return val, nil
|
||||
}
|
||||
|
||||
if merr == nil {
|
||||
return nil, blockstore.ErrNotFound
|
||||
}
|
||||
|
||||
return nil, merr
|
||||
}
|
||||
|
||||
func (m *multiReadBs) DeleteBlock(cid cid.Cid) error {
|
||||
return xerrors.Errorf("operation not supported")
|
||||
}
|
||||
|
||||
func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) {
|
||||
return 0, xerrors.Errorf("operation not supported")
|
||||
}
|
||||
|
||||
func (m *multiReadBs) Put(block blocks.Block) error {
|
||||
return xerrors.Errorf("operation not supported")
|
||||
}
|
||||
|
||||
func (m *multiReadBs) PutMany(blocks []blocks.Block) error {
|
||||
return xerrors.Errorf("operation not supported")
|
||||
}
|
||||
|
||||
func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
return nil, xerrors.Errorf("operation not supported")
|
||||
}
|
||||
|
||||
func (m *multiReadBs) HashOnRead(enabled bool) {
|
||||
return
|
||||
}
|
||||
|
||||
var _ blockstore.Blockstore = &multiReadBs{}
|
@ -9,10 +9,12 @@ import (
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
|
||||
"github.com/filecoin-project/go-multistore"
|
||||
)
|
||||
|
||||
type Mgr struct {
|
||||
mds *MultiStore
|
||||
mds *multistore.MultiStore
|
||||
ds datastore.Batching
|
||||
|
||||
Blockstore blockstore.Blockstore
|
||||
@ -27,12 +29,10 @@ const (
|
||||
LMTime = "mtime" // File modification timestamp
|
||||
)
|
||||
|
||||
func New(mds *MultiStore, ds datastore.Batching) *Mgr {
|
||||
func New(mds *multistore.MultiStore, ds datastore.Batching) *Mgr {
|
||||
return &Mgr{
|
||||
mds: mds,
|
||||
Blockstore: &multiReadBs{
|
||||
mds: mds,
|
||||
},
|
||||
Blockstore: mds.MultiReadBlockstore(),
|
||||
|
||||
ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"),
|
||||
}
|
||||
@ -42,7 +42,7 @@ type StoreMeta struct {
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
func (m *Mgr) NewStore() (int, *Store, error) {
|
||||
func (m *Mgr) NewStore() (int, *multistore.Store, error) {
|
||||
id := m.mds.Next()
|
||||
st, err := m.mds.Get(id)
|
||||
if err != nil {
|
||||
|
@ -1,188 +0,0 @@
|
||||
package importmgr
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/multierr"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
ktds "github.com/ipfs/go-datastore/keytransform"
|
||||
"github.com/ipfs/go-datastore/query"
|
||||
)
|
||||
|
||||
type MultiStore struct {
|
||||
ds datastore.Batching
|
||||
|
||||
open map[int]*Store
|
||||
next int
|
||||
|
||||
lk sync.RWMutex
|
||||
}
|
||||
|
||||
var dsListKey = datastore.NewKey("/list")
|
||||
var dsMultiKey = datastore.NewKey("/multi")
|
||||
|
||||
func NewMultiDstore(ds datastore.Batching) (*MultiStore, error) {
|
||||
listBytes, err := ds.Get(dsListKey)
|
||||
if xerrors.Is(err, datastore.ErrNotFound) {
|
||||
listBytes, _ = json.Marshal([]int{})
|
||||
} else if err != nil {
|
||||
return nil, xerrors.Errorf("could not read multistore list: %w", err)
|
||||
}
|
||||
|
||||
var ids []int
|
||||
if err := json.Unmarshal(listBytes, &ids); err != nil {
|
||||
return nil, xerrors.Errorf("could not unmarshal multistore list: %w", err)
|
||||
}
|
||||
|
||||
mds := &MultiStore{
|
||||
ds: ds,
|
||||
open: map[int]*Store{},
|
||||
}
|
||||
|
||||
for _, i := range ids {
|
||||
if i > mds.next {
|
||||
mds.next = i
|
||||
}
|
||||
|
||||
_, err := mds.Get(i)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("open store %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
return mds, nil
|
||||
}
|
||||
|
||||
func (mds *MultiStore) Next() int {
|
||||
mds.lk.Lock()
|
||||
defer mds.lk.Unlock()
|
||||
|
||||
mds.next++
|
||||
return mds.next
|
||||
}
|
||||
|
||||
func (mds *MultiStore) updateStores() error {
|
||||
stores := make([]int, 0, len(mds.open))
|
||||
for k := range mds.open {
|
||||
stores = append(stores, k)
|
||||
}
|
||||
sort.Ints(stores)
|
||||
|
||||
listBytes, err := json.Marshal(stores)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("could not marshal list: %w", err)
|
||||
}
|
||||
err = mds.ds.Put(dsListKey, listBytes)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("could not save stores list: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mds *MultiStore) Get(i int) (*Store, error) {
|
||||
mds.lk.Lock()
|
||||
defer mds.lk.Unlock()
|
||||
|
||||
store, ok := mds.open[i]
|
||||
if ok {
|
||||
return store, nil
|
||||
}
|
||||
|
||||
wds := ktds.Wrap(mds.ds, ktds.PrefixTransform{
|
||||
Prefix: dsMultiKey.ChildString(fmt.Sprintf("%d", i)),
|
||||
})
|
||||
|
||||
var err error
|
||||
mds.open[i], err = openStore(wds)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("could not open new store: %w", err)
|
||||
}
|
||||
|
||||
err = mds.updateStores()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("updating stores: %w", err)
|
||||
}
|
||||
|
||||
return mds.open[i], nil
|
||||
}
|
||||
|
||||
func (mds *MultiStore) List() []int {
|
||||
mds.lk.RLock()
|
||||
defer mds.lk.RUnlock()
|
||||
|
||||
out := make([]int, 0, len(mds.open))
|
||||
for i := range mds.open {
|
||||
out = append(out, i)
|
||||
}
|
||||
sort.Ints(out)
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (mds *MultiStore) Delete(i int) error {
|
||||
mds.lk.Lock()
|
||||
defer mds.lk.Unlock()
|
||||
|
||||
store, ok := mds.open[i]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
delete(mds.open, i)
|
||||
err := store.Close()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("closing store: %w", err)
|
||||
}
|
||||
|
||||
err = mds.updateStores()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("updating stores: %w", err)
|
||||
}
|
||||
|
||||
qres, err := store.ds.Query(query.Query{KeysOnly: true})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("query error: %w", err)
|
||||
}
|
||||
defer qres.Close() //nolint:errcheck
|
||||
|
||||
b, err := store.ds.Batch()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("batch error: %w", err)
|
||||
}
|
||||
|
||||
for r := range qres.Next() {
|
||||
if r.Error != nil {
|
||||
_ = b.Commit()
|
||||
return xerrors.Errorf("iterator error: %w", err)
|
||||
}
|
||||
err := b.Delete(datastore.NewKey(r.Key))
|
||||
if err != nil {
|
||||
_ = b.Commit()
|
||||
return xerrors.Errorf("adding to batch: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = b.Commit()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("committing: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mds *MultiStore) Close() error {
|
||||
mds.lk.Lock()
|
||||
defer mds.lk.Unlock()
|
||||
|
||||
var err error
|
||||
for _, s := range mds.open {
|
||||
err = multierr.Append(err, s.Close())
|
||||
}
|
||||
mds.open = make(map[int]*Store)
|
||||
|
||||
return err
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
package importmgr
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
"github.com/ipfs/go-filestore"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
ds datastore.Batching
|
||||
|
||||
fm *filestore.FileManager
|
||||
Fstore *filestore.Filestore
|
||||
|
||||
Bstore blockstore.Blockstore
|
||||
|
||||
bsvc blockservice.BlockService
|
||||
DAG ipld.DAGService
|
||||
}
|
||||
|
||||
func openStore(ds datastore.Batching) (*Store, error) {
|
||||
blocks := namespace.Wrap(ds, datastore.NewKey("blocks"))
|
||||
bs := blockstore.NewBlockstore(blocks)
|
||||
|
||||
fm := filestore.NewFileManager(ds, "/")
|
||||
fm.AllowFiles = true
|
||||
|
||||
fstore := filestore.NewFilestore(bs, fm)
|
||||
ibs := blockstore.NewIdStore(fstore)
|
||||
|
||||
bsvc := blockservice.New(ibs, offline.Exchange(ibs))
|
||||
dag := merkledag.NewDAGService(bsvc)
|
||||
|
||||
return &Store{
|
||||
ds: ds,
|
||||
|
||||
fm: fm,
|
||||
Fstore: fstore,
|
||||
|
||||
Bstore: ibs,
|
||||
|
||||
bsvc: bsvc,
|
||||
DAG: dag,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Store) Close() error {
|
||||
return s.bsvc.Close()
|
||||
}
|
Loading…
Reference in New Issue
Block a user