feat(retrievalstoremgr): add retrievalstoremgr

add manager for retrievals to handle different cases for IPFS/non-ipfs
This commit is contained in:
hannahhoward 2020-07-31 13:24:50 -07:00
parent 6b67c1cf39
commit 3cac1080cc
7 changed files with 278 additions and 14 deletions

View File

@ -406,11 +406,13 @@ func ConfigFullNode(c interface{}) Option {
}
ipfsMaddr := cfg.Client.IpfsMAddr
useForRetrieval := cfg.Client.IpfsUseForRetrieval
return Options(
ConfigCommon(&cfg.Common),
If(cfg.Client.UseIpfs,
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, useForRetrieval)),
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr)),
If(cfg.Client.IpfsUseForRetrieval,
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager),
),
),
If(cfg.Metrics.HeadNotifs,
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
@ -456,7 +458,7 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientRetrievalStoreManager),
Override(new(ci.PrivKey), lp2p.PrivKey),
Override(new(ci.PubKey), ci.PrivKey.GetPublic),
Override(new(peer.ID), peer.IDFromPublicKey),

View File

@ -71,6 +71,7 @@ type API struct {
Imports dtypes.ClientImportMgr
CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
RetrievalStoreMgr dtypes.ClientRetrievalStoreManager
}
func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch {
@ -451,6 +452,16 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
if err != nil {
return xerrors.Errorf("Error in retrieval params: %s", err)
}
store, err := a.RetrievalStoreMgr.NewStore()
if err != nil {
return xerrors.Errorf("Error setting up new store: %w", err)
}
defer func() {
_ = a.RetrievalStoreMgr.ReleaseStore(store)
}()
_, err = a.Retrieval.Retrieve(
ctx,
order.Root,
@ -459,7 +470,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
order.MinerPeerID,
order.Client,
order.Miner,
nil) // TODO: pass the store here somehow
store.StoreID())
if err != nil {
return xerrors.Errorf("Retrieve failed: %w", err)
@ -480,7 +491,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return nil
}
rdag := merkledag.NewDAGService(blockservice.New(a.RetBstore, offline.Exchange(a.RetBstore)))
rdag := store.DAGService()
if ref.IsCAR {
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)

View File

@ -35,6 +35,7 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
"github.com/filecoin-project/lotus/paychmgr"
)
@ -156,3 +157,13 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
})
return client, nil
}
// ClientRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
func ClientRetrievalStoreManager(imgr dtypes.ClientImportMgr) dtypes.ClientRetrievalStoreManager {
return retrievalstoremgr.NewMultiStoreRetrievalStoreManager(imgr)
}
// ClientBlockstoreRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
func ClientBlockstoreRetrievalStoreManager(bs dtypes.ClientBlockstore) dtypes.ClientRetrievalStoreManager {
return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)
// MetadataDS stores metadata
@ -35,6 +36,7 @@ type ClientBlockstore blockstore.Blockstore
type ClientDealStore *statestore.StateStore
type ClientRequestValidator *requestvalidation.UnifiedRequestValidator
type ClientDatastore datastore.Batching
type ClientRetrievalStoreManager retrievalstoremgr.RetrievalStoreManager
type Graphsync graphsync.GraphExchange

View File

@ -7,7 +7,6 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/lib/ipfsbstore"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
@ -17,7 +16,7 @@ import (
// If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration.
// If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress.
// The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals.
func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
func IpfsClientBlockstore(ipfsMaddr string) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
var err error
var ipfsbs blockstore.Blockstore
@ -34,12 +33,6 @@ func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.M
if err != nil {
return nil, xerrors.Errorf("constructing ipfs blockstore: %w", err)
}
ipfsbs = blockstore.WrapIDStore(ipfsbs)
var ws blockstore.Blockstore
ws = ipfsbs
if !useForRetrieval {
ws = blockstore.WrapIDStore(localStore.Blockstore)
}
return bufbstore.NewTieredBstore(ipfsbs, ws), nil
return blockstore.WrapIDStore(ipfsbs), nil
}
}

View File

@ -0,0 +1,110 @@
package retrievalstoremgr
import (
"errors"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
)
// RetrievalStore references a store for a retrieval deal
// which may or may not have a multistore ID associated with it
type RetrievalStore interface {
StoreID() *multistore.StoreID
DAGService() ipldformat.DAGService
}
// RetrievalStoreManager manages stores for retrieval deals, abstracting
// the underlying storage mechanism
type RetrievalStoreManager interface {
NewStore() (RetrievalStore, error)
ReleaseStore(RetrievalStore) error
}
// MultiStoreRetrievalStoreManager manages stores on top of the import manager
type MultiStoreRetrievalStoreManager struct {
imgr *importmgr.Mgr
}
var _ RetrievalStoreManager = &MultiStoreRetrievalStoreManager{}
// NewMultiStoreRetrievalStoreManager returns a new multstore based RetrievalStoreManager
func NewMultiStoreRetrievalStoreManager(imgr *importmgr.Mgr) RetrievalStoreManager {
return &MultiStoreRetrievalStoreManager{
imgr: imgr,
}
}
// NewStore creates a new store (uses multistore)
func (mrsm *MultiStoreRetrievalStoreManager) NewStore() (RetrievalStore, error) {
storeID, store, err := mrsm.imgr.NewStore()
if err != nil {
return nil, err
}
return &multiStoreRetrievalStore{storeID, store}, nil
}
// ReleaseStore releases a store (uses multistore remove)
func (mrsm *MultiStoreRetrievalStoreManager) ReleaseStore(retrievalStore RetrievalStore) error {
mrs, ok := retrievalStore.(*multiStoreRetrievalStore)
if !ok {
return errors.New("Cannot release this store type")
}
return mrsm.imgr.Remove(mrs.storeID)
}
type multiStoreRetrievalStore struct {
storeID multistore.StoreID
store *multistore.Store
}
func (mrs *multiStoreRetrievalStore) StoreID() *multistore.StoreID {
return &mrs.storeID
}
func (mrs *multiStoreRetrievalStore) DAGService() ipldformat.DAGService {
return mrs.store.DAG
}
// BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores
type BlockstoreRetrievalStoreManager struct {
bs blockstore.Blockstore
}
var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{}
// NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager
func NewBlockstoreRetrievalStoreManager(bs blockstore.Blockstore) RetrievalStoreManager {
return &BlockstoreRetrievalStoreManager{
bs: bs,
}
}
// NewStore creates a new store (just uses underlying blockstore)
func (brsm *BlockstoreRetrievalStoreManager) NewStore() (RetrievalStore, error) {
return &blockstoreRetrievalStore{
dagService: merkledag.NewDAGService(blockservice.New(brsm.bs, offline.Exchange(brsm.bs))),
}, nil
}
// ReleaseStore for this implementation does nothing
func (brsm *BlockstoreRetrievalStoreManager) ReleaseStore(RetrievalStore) error {
return nil
}
type blockstoreRetrievalStore struct {
dagService ipldformat.DAGService
}
func (brs *blockstoreRetrievalStore) StoreID() *multistore.StoreID {
return nil
}
func (brs *blockstoreRetrievalStore) DAGService() ipldformat.DAGService {
return brs.dagService
}

View File

@ -0,0 +1,135 @@
package retrievalstoremgr_test
import (
"context"
"math/rand"
"testing"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dss "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
format "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)
func TestMultistoreRetrievalStoreManager(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
multiDS, err := multistore.NewMultiDstore(ds)
require.NoError(t, err)
imgr := importmgr.New(multiDS, ds)
retrievalStoreMgr := retrievalstoremgr.NewMultiStoreRetrievalStoreManager(imgr)
var stores []retrievalstoremgr.RetrievalStore
for i := 0; i < 5; i++ {
store, err := retrievalStoreMgr.NewStore()
require.NoError(t, err)
stores = append(stores, store)
nds := generateNodesOfSize(5, 100)
err = store.DAGService().AddMany(ctx, nds)
require.NoError(t, err)
}
t.Run("creates all keys", func(t *testing.T) {
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 31)
})
t.Run("loads DAG services", func(t *testing.T) {
for _, store := range stores {
mstore, err := multiDS.Get(*store.StoreID())
require.NoError(t, err)
require.Equal(t, mstore.DAG, store.DAGService())
}
})
t.Run("delete stores", func(t *testing.T) {
retrievalStoreMgr.ReleaseStore(stores[4])
storeIndexes := multiDS.List()
require.Len(t, storeIndexes, 4)
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 25)
})
}
func TestBlockstoreRetrievalStoreManager(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewBlockstore(ds)
retrievalStoreMgr := retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs)
var stores []retrievalstoremgr.RetrievalStore
var cids []cid.Cid
for i := 0; i < 5; i++ {
store, err := retrievalStoreMgr.NewStore()
require.NoError(t, err)
stores = append(stores, store)
nds := generateNodesOfSize(5, 100)
err = store.DAGService().AddMany(ctx, nds)
require.NoError(t, err)
for _, nd := range nds {
cids = append(cids, nd.Cid())
}
}
t.Run("creates all keys", func(t *testing.T) {
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 25)
})
t.Run("loads DAG services, all DAG has all nodes", func(t *testing.T) {
for _, store := range stores {
dagService := store.DAGService()
for _, cid := range cids {
_, err := dagService.Get(ctx, cid)
require.NoError(t, err)
}
}
})
t.Run("release store has no effect", func(t *testing.T) {
retrievalStoreMgr.ReleaseStore(stores[4])
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 25)
})
}
var seedSeq int64 = 0
func randomBytes(n int64) []byte {
randBytes := make([]byte, n)
r := rand.New(rand.NewSource(seedSeq))
_, _ = r.Read(randBytes)
seedSeq++
return randBytes
}
func generateNodesOfSize(n int, size int64) []format.Node {
generatedNodes := make([]format.Node, 0, n)
for i := 0; i < n; i++ {
b := dag.NewRawNode(randomBytes(size))
generatedNodes = append(generatedNodes, b)
}
return generatedNodes
}