From dd885b7302351cefeb52ca781725155a4dd8d51a Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 24 Jul 2020 14:47:22 -0700 Subject: [PATCH] feat(multistore): extract multistore code to repo Extract multistore + multiread blockstore to a seperate repo --- go.mod | 1 + go.sum | 2 + node/impl/client/client.go | 3 +- node/modules/client.go | 3 +- node/modules/dtypes/storage.go | 3 +- node/repo/importmgr/mbstore.go | 95 --------------- node/repo/importmgr/mgr.go | 14 +-- node/repo/importmgr/multistore.go | 188 ------------------------------ node/repo/importmgr/store.go | 54 --------- 9 files changed, 16 insertions(+), 347 deletions(-) delete mode 100644 node/repo/importmgr/mbstore.go delete mode 100644 node/repo/importmgr/multistore.go delete mode 100644 node/repo/importmgr/store.go diff --git a/go.mod b/go.mod index 89a63ec2c..b15ddb81e 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f github.com/filecoin-project/go-fil-markets v0.5.1 github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 + github.com/filecoin-project/go-multistore v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b diff --git a/go.sum b/go.sum index 5bafb5f91..52de2ca8d 100644 --- a/go.sum +++ b/go.sum @@ -248,6 +248,8 @@ github.com/filecoin-project/go-fil-markets v0.5.1 h1:Y69glslNCuXnygfesCmyilTVhEE github.com/filecoin-project/go-fil-markets v0.5.1/go.mod h1:GKGigsFNMvKmx/+Mcn7093TdZTiCDLc7YGxQ7d6fq2s= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM= +github.com/filecoin-project/go-multistore v0.0.1 h1:wXCd02azCxEcMNlDE9lksraQO+iIjFGNw01IZyf8GPA= +github.com/filecoin-project/go-multistore v0.0.1/go.mod h1:z8NeSPWubEvrzi0XolhZ1NjTeW9ZDR779M+EDhf4QIQ= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 53dacb8de..1e7fa8146 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -33,6 +33,7 @@ import ( rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" @@ -556,7 +557,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri return f.Close() } -func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmgr.Store) (cid.Cid, error) { +func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multistore.Store) (cid.Cid, error) { f, err := os.Open(ref.Path) if err != nil { return cid.Undef, err diff --git a/node/modules/client.go b/node/modules/client.go index a24709beb..77e66a69a 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/lotus/lib/bufbstore" "golang.org/x/xerrors" @@ -42,7 +43,7 @@ func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMult return nil, xerrors.Errorf("getting datastore out of reop: %w", err) } - mds, err := importmgr.NewMultiDstore(ds) + mds, err := multistore.NewMultiDstore(ds) if err != nil { return nil, err } diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 7ede8aaab..b4712a37e 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -9,6 +9,7 @@ import ( format "github.com/ipfs/go-ipld-format" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" + "github.com/filecoin-project/go-multistore" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -27,7 +28,7 @@ type ChainGCBlockstore blockstore.GCBlockstore type ChainExchange exchange.Interface type ChainBlockService bserv.BlockService -type ClientMultiDstore *importmgr.MultiStore +type ClientMultiDstore *multistore.MultiStore type ClientImportMgr *importmgr.Mgr type ClientBlockstore blockstore.Blockstore type ClientDealStore *statestore.StateStore diff --git a/node/repo/importmgr/mbstore.go b/node/repo/importmgr/mbstore.go deleted file mode 100644 index 3b6058bee..000000000 --- a/node/repo/importmgr/mbstore.go +++ /dev/null @@ -1,95 +0,0 @@ -package importmgr - -import ( - "context" - - "github.com/hashicorp/go-multierror" - "golang.org/x/xerrors" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" -) - -type multiReadBs struct { - // TODO: some caching - mds *MultiStore -} - -func (m *multiReadBs) Has(cid cid.Cid) (bool, error) { - m.mds.lk.RLock() - defer m.mds.lk.RUnlock() - - var merr error - for i, store := range m.mds.open { - has, err := store.Bstore.Has(cid) - if err != nil { - merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) - continue - } - if !has { - continue - } - - return true, nil - } - - return false, merr -} - -func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) { - m.mds.lk.RLock() - defer m.mds.lk.RUnlock() - - var merr error - for i, store := range m.mds.open { - has, err := store.Bstore.Has(cid) - if err != nil { - merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err)) - continue - } - if !has { - continue - } - - val, err := store.Bstore.Get(cid) - if err != nil { - merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err)) - continue - } - - return val, nil - } - - if merr == nil { - return nil, blockstore.ErrNotFound - } - - return nil, merr -} - -func (m *multiReadBs) DeleteBlock(cid cid.Cid) error { - return xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) { - return 0, xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) Put(block blocks.Block) error { - return xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) PutMany(blocks []blocks.Block) error { - return xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return nil, xerrors.Errorf("operation not supported") -} - -func (m *multiReadBs) HashOnRead(enabled bool) { - return -} - -var _ blockstore.Blockstore = &multiReadBs{} diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index d3139918e..754f41df2 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -9,10 +9,12 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" blockstore "github.com/ipfs/go-ipfs-blockstore" + + "github.com/filecoin-project/go-multistore" ) type Mgr struct { - mds *MultiStore + mds *multistore.MultiStore ds datastore.Batching Blockstore blockstore.Blockstore @@ -27,12 +29,10 @@ const ( LMTime = "mtime" // File modification timestamp ) -func New(mds *MultiStore, ds datastore.Batching) *Mgr { +func New(mds *multistore.MultiStore, ds datastore.Batching) *Mgr { return &Mgr{ - mds: mds, - Blockstore: &multiReadBs{ - mds: mds, - }, + mds: mds, + Blockstore: mds.MultiReadBlockstore(), ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"), } @@ -42,7 +42,7 @@ type StoreMeta struct { Labels map[string]string } -func (m *Mgr) NewStore() (int, *Store, error) { +func (m *Mgr) NewStore() (int, *multistore.Store, error) { id := m.mds.Next() st, err := m.mds.Get(id) if err != nil { diff --git a/node/repo/importmgr/multistore.go b/node/repo/importmgr/multistore.go deleted file mode 100644 index e7a814d4c..000000000 --- a/node/repo/importmgr/multistore.go +++ /dev/null @@ -1,188 +0,0 @@ -package importmgr - -import ( - "encoding/json" - "fmt" - "sort" - "sync" - - "go.uber.org/multierr" - "golang.org/x/xerrors" - - "github.com/ipfs/go-datastore" - ktds "github.com/ipfs/go-datastore/keytransform" - "github.com/ipfs/go-datastore/query" -) - -type MultiStore struct { - ds datastore.Batching - - open map[int]*Store - next int - - lk sync.RWMutex -} - -var dsListKey = datastore.NewKey("/list") -var dsMultiKey = datastore.NewKey("/multi") - -func NewMultiDstore(ds datastore.Batching) (*MultiStore, error) { - listBytes, err := ds.Get(dsListKey) - if xerrors.Is(err, datastore.ErrNotFound) { - listBytes, _ = json.Marshal([]int{}) - } else if err != nil { - return nil, xerrors.Errorf("could not read multistore list: %w", err) - } - - var ids []int - if err := json.Unmarshal(listBytes, &ids); err != nil { - return nil, xerrors.Errorf("could not unmarshal multistore list: %w", err) - } - - mds := &MultiStore{ - ds: ds, - open: map[int]*Store{}, - } - - for _, i := range ids { - if i > mds.next { - mds.next = i - } - - _, err := mds.Get(i) - if err != nil { - return nil, xerrors.Errorf("open store %d: %w", i, err) - } - } - - return mds, nil -} - -func (mds *MultiStore) Next() int { - mds.lk.Lock() - defer mds.lk.Unlock() - - mds.next++ - return mds.next -} - -func (mds *MultiStore) updateStores() error { - stores := make([]int, 0, len(mds.open)) - for k := range mds.open { - stores = append(stores, k) - } - sort.Ints(stores) - - listBytes, err := json.Marshal(stores) - if err != nil { - return xerrors.Errorf("could not marshal list: %w", err) - } - err = mds.ds.Put(dsListKey, listBytes) - if err != nil { - return xerrors.Errorf("could not save stores list: %w", err) - } - return nil -} - -func (mds *MultiStore) Get(i int) (*Store, error) { - mds.lk.Lock() - defer mds.lk.Unlock() - - store, ok := mds.open[i] - if ok { - return store, nil - } - - wds := ktds.Wrap(mds.ds, ktds.PrefixTransform{ - Prefix: dsMultiKey.ChildString(fmt.Sprintf("%d", i)), - }) - - var err error - mds.open[i], err = openStore(wds) - if err != nil { - return nil, xerrors.Errorf("could not open new store: %w", err) - } - - err = mds.updateStores() - if err != nil { - return nil, xerrors.Errorf("updating stores: %w", err) - } - - return mds.open[i], nil -} - -func (mds *MultiStore) List() []int { - mds.lk.RLock() - defer mds.lk.RUnlock() - - out := make([]int, 0, len(mds.open)) - for i := range mds.open { - out = append(out, i) - } - sort.Ints(out) - - return out -} - -func (mds *MultiStore) Delete(i int) error { - mds.lk.Lock() - defer mds.lk.Unlock() - - store, ok := mds.open[i] - if !ok { - return nil - } - delete(mds.open, i) - err := store.Close() - if err != nil { - return xerrors.Errorf("closing store: %w", err) - } - - err = mds.updateStores() - if err != nil { - return xerrors.Errorf("updating stores: %w", err) - } - - qres, err := store.ds.Query(query.Query{KeysOnly: true}) - if err != nil { - return xerrors.Errorf("query error: %w", err) - } - defer qres.Close() //nolint:errcheck - - b, err := store.ds.Batch() - if err != nil { - return xerrors.Errorf("batch error: %w", err) - } - - for r := range qres.Next() { - if r.Error != nil { - _ = b.Commit() - return xerrors.Errorf("iterator error: %w", err) - } - err := b.Delete(datastore.NewKey(r.Key)) - if err != nil { - _ = b.Commit() - return xerrors.Errorf("adding to batch: %w", err) - } - } - - err = b.Commit() - if err != nil { - return xerrors.Errorf("committing: %w", err) - } - - return nil -} - -func (mds *MultiStore) Close() error { - mds.lk.Lock() - defer mds.lk.Unlock() - - var err error - for _, s := range mds.open { - err = multierr.Append(err, s.Close()) - } - mds.open = make(map[int]*Store) - - return err -} diff --git a/node/repo/importmgr/store.go b/node/repo/importmgr/store.go deleted file mode 100644 index 78bb7462b..000000000 --- a/node/repo/importmgr/store.go +++ /dev/null @@ -1,54 +0,0 @@ -package importmgr - -import ( - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-filestore" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" -) - -type Store struct { - ds datastore.Batching - - fm *filestore.FileManager - Fstore *filestore.Filestore - - Bstore blockstore.Blockstore - - bsvc blockservice.BlockService - DAG ipld.DAGService -} - -func openStore(ds datastore.Batching) (*Store, error) { - blocks := namespace.Wrap(ds, datastore.NewKey("blocks")) - bs := blockstore.NewBlockstore(blocks) - - fm := filestore.NewFileManager(ds, "/") - fm.AllowFiles = true - - fstore := filestore.NewFilestore(bs, fm) - ibs := blockstore.NewIdStore(fstore) - - bsvc := blockservice.New(ibs, offline.Exchange(ibs)) - dag := merkledag.NewDAGService(bsvc) - - return &Store{ - ds: ds, - - fm: fm, - Fstore: fstore, - - Bstore: ibs, - - bsvc: bsvc, - DAG: dag, - }, nil -} - -func (s *Store) Close() error { - return s.bsvc.Close() -}