Merge pull request #2753 from filecoin-project/feat/retrieval-store-manager
Retrieval Store Manager
This commit is contained in:
commit
4e9343c487
@ -406,11 +406,13 @@ func ConfigFullNode(c interface{}) Option {
|
||||
}
|
||||
|
||||
ipfsMaddr := cfg.Client.IpfsMAddr
|
||||
useForRetrieval := cfg.Client.IpfsUseForRetrieval
|
||||
return Options(
|
||||
ConfigCommon(&cfg.Common),
|
||||
If(cfg.Client.UseIpfs,
|
||||
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, useForRetrieval)),
|
||||
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr)),
|
||||
If(cfg.Client.IpfsUseForRetrieval,
|
||||
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager),
|
||||
),
|
||||
),
|
||||
If(cfg.Metrics.HeadNotifs,
|
||||
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
|
||||
@ -456,7 +458,7 @@ func Repo(r repo.Repo) Option {
|
||||
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),
|
||||
|
||||
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
|
||||
|
||||
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientRetrievalStoreManager),
|
||||
Override(new(ci.PrivKey), lp2p.PrivKey),
|
||||
Override(new(ci.PubKey), ci.PrivKey.GetPublic),
|
||||
Override(new(peer.ID), peer.IDFromPublicKey),
|
||||
|
@ -71,6 +71,7 @@ type API struct {
|
||||
Imports dtypes.ClientImportMgr
|
||||
|
||||
CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
|
||||
RetrievalStoreMgr dtypes.ClientRetrievalStoreManager
|
||||
}
|
||||
|
||||
func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch {
|
||||
@ -451,13 +452,14 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
if err != nil {
|
||||
return xerrors.Errorf("Error in retrieval params: %s", err)
|
||||
}
|
||||
storeID, store, err := a.imgr().NewStore()
|
||||
|
||||
store, err := a.RetrievalStoreMgr.NewStore()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("Error setting up new store: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = a.imgr().Remove(storeID)
|
||||
_ = a.RetrievalStoreMgr.ReleaseStore(store)
|
||||
}()
|
||||
|
||||
_, err = a.Retrieval.Retrieve(
|
||||
@ -468,7 +470,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
order.MinerPeerID,
|
||||
order.Client,
|
||||
order.Miner,
|
||||
&storeID) // TODO: should we ignore storeID if we are using the IPFS blockstore?
|
||||
store.StoreID())
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("Retrieve failed: %w", err)
|
||||
}
|
||||
@ -488,27 +491,28 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
return nil
|
||||
}
|
||||
|
||||
rdag := store.DAGService()
|
||||
|
||||
if ref.IsCAR {
|
||||
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = car.WriteCar(ctx, store.DAG, []cid.Cid{order.Root}, f)
|
||||
err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
nd, err := store.DAG.Get(ctx, order.Root)
|
||||
nd, err := rdag.Get(ctx, order.Root)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
file, err := unixfile.NewUnixfsFile(ctx, store.DAG, nd)
|
||||
file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||
}
|
||||
|
||||
return files.WriteTo(file, ref.Path)
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/node/repo/importmgr"
|
||||
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
||||
"github.com/filecoin-project/lotus/paychmgr"
|
||||
)
|
||||
|
||||
@ -156,3 +157,13 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
|
||||
})
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// ClientRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
|
||||
func ClientRetrievalStoreManager(imgr dtypes.ClientImportMgr) dtypes.ClientRetrievalStoreManager {
|
||||
return retrievalstoremgr.NewMultiStoreRetrievalStoreManager(imgr)
|
||||
}
|
||||
|
||||
// ClientBlockstoreRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
|
||||
func ClientBlockstoreRetrievalStoreManager(bs dtypes.ClientBlockstore) dtypes.ClientRetrievalStoreManager {
|
||||
return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||
"github.com/filecoin-project/lotus/node/repo/importmgr"
|
||||
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
||||
)
|
||||
|
||||
// MetadataDS stores metadata
|
||||
@ -35,6 +36,7 @@ type ClientBlockstore blockstore.Blockstore
|
||||
type ClientDealStore *statestore.StateStore
|
||||
type ClientRequestValidator *requestvalidation.UnifiedRequestValidator
|
||||
type ClientDatastore datastore.Batching
|
||||
type ClientRetrievalStoreManager retrievalstoremgr.RetrievalStoreManager
|
||||
|
||||
type Graphsync graphsync.GraphExchange
|
||||
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||
"github.com/filecoin-project/lotus/lib/bufbstore"
|
||||
"github.com/filecoin-project/lotus/lib/ipfsbstore"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
@ -17,7 +16,7 @@ import (
|
||||
// If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration.
|
||||
// If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress.
|
||||
// The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals.
|
||||
func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
|
||||
func IpfsClientBlockstore(ipfsMaddr string) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
|
||||
var err error
|
||||
var ipfsbs blockstore.Blockstore
|
||||
@ -34,12 +33,6 @@ func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.M
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("constructing ipfs blockstore: %w", err)
|
||||
}
|
||||
ipfsbs = blockstore.WrapIDStore(ipfsbs)
|
||||
var ws blockstore.Blockstore
|
||||
ws = ipfsbs
|
||||
if !useForRetrieval {
|
||||
ws = blockstore.WrapIDStore(localStore.Blockstore)
|
||||
}
|
||||
return bufbstore.NewTieredBstore(ipfsbs, ws), nil
|
||||
return blockstore.WrapIDStore(ipfsbs), nil
|
||||
}
|
||||
}
|
||||
|
110
node/repo/retrievalstoremgr/retrievalstoremgr.go
Normal file
110
node/repo/retrievalstoremgr/retrievalstoremgr.go
Normal file
@ -0,0 +1,110 @@
|
||||
package retrievalstoremgr
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/filecoin-project/go-multistore"
|
||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||
"github.com/filecoin-project/lotus/node/repo/importmgr"
|
||||
"github.com/ipfs/go-blockservice"
|
||||
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
||||
ipldformat "github.com/ipfs/go-ipld-format"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
)
|
||||
|
||||
// RetrievalStore references a store for a retrieval deal
|
||||
// which may or may not have a multistore ID associated with it
|
||||
type RetrievalStore interface {
|
||||
StoreID() *multistore.StoreID
|
||||
DAGService() ipldformat.DAGService
|
||||
}
|
||||
|
||||
// RetrievalStoreManager manages stores for retrieval deals, abstracting
|
||||
// the underlying storage mechanism
|
||||
type RetrievalStoreManager interface {
|
||||
NewStore() (RetrievalStore, error)
|
||||
ReleaseStore(RetrievalStore) error
|
||||
}
|
||||
|
||||
// MultiStoreRetrievalStoreManager manages stores on top of the import manager
|
||||
type MultiStoreRetrievalStoreManager struct {
|
||||
imgr *importmgr.Mgr
|
||||
}
|
||||
|
||||
var _ RetrievalStoreManager = &MultiStoreRetrievalStoreManager{}
|
||||
|
||||
// NewMultiStoreRetrievalStoreManager returns a new multstore based RetrievalStoreManager
|
||||
func NewMultiStoreRetrievalStoreManager(imgr *importmgr.Mgr) RetrievalStoreManager {
|
||||
return &MultiStoreRetrievalStoreManager{
|
||||
imgr: imgr,
|
||||
}
|
||||
}
|
||||
|
||||
// NewStore creates a new store (uses multistore)
|
||||
func (mrsm *MultiStoreRetrievalStoreManager) NewStore() (RetrievalStore, error) {
|
||||
storeID, store, err := mrsm.imgr.NewStore()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &multiStoreRetrievalStore{storeID, store}, nil
|
||||
}
|
||||
|
||||
// ReleaseStore releases a store (uses multistore remove)
|
||||
func (mrsm *MultiStoreRetrievalStoreManager) ReleaseStore(retrievalStore RetrievalStore) error {
|
||||
mrs, ok := retrievalStore.(*multiStoreRetrievalStore)
|
||||
if !ok {
|
||||
return errors.New("Cannot release this store type")
|
||||
}
|
||||
return mrsm.imgr.Remove(mrs.storeID)
|
||||
}
|
||||
|
||||
type multiStoreRetrievalStore struct {
|
||||
storeID multistore.StoreID
|
||||
store *multistore.Store
|
||||
}
|
||||
|
||||
func (mrs *multiStoreRetrievalStore) StoreID() *multistore.StoreID {
|
||||
return &mrs.storeID
|
||||
}
|
||||
|
||||
func (mrs *multiStoreRetrievalStore) DAGService() ipldformat.DAGService {
|
||||
return mrs.store.DAG
|
||||
}
|
||||
|
||||
// BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores
|
||||
type BlockstoreRetrievalStoreManager struct {
|
||||
bs blockstore.Blockstore
|
||||
}
|
||||
|
||||
var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{}
|
||||
|
||||
// NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager
|
||||
func NewBlockstoreRetrievalStoreManager(bs blockstore.Blockstore) RetrievalStoreManager {
|
||||
return &BlockstoreRetrievalStoreManager{
|
||||
bs: bs,
|
||||
}
|
||||
}
|
||||
|
||||
// NewStore creates a new store (just uses underlying blockstore)
|
||||
func (brsm *BlockstoreRetrievalStoreManager) NewStore() (RetrievalStore, error) {
|
||||
return &blockstoreRetrievalStore{
|
||||
dagService: merkledag.NewDAGService(blockservice.New(brsm.bs, offline.Exchange(brsm.bs))),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ReleaseStore for this implementation does nothing
|
||||
func (brsm *BlockstoreRetrievalStoreManager) ReleaseStore(RetrievalStore) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type blockstoreRetrievalStore struct {
|
||||
dagService ipldformat.DAGService
|
||||
}
|
||||
|
||||
func (brs *blockstoreRetrievalStore) StoreID() *multistore.StoreID {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (brs *blockstoreRetrievalStore) DAGService() ipldformat.DAGService {
|
||||
return brs.dagService
|
||||
}
|
137
node/repo/retrievalstoremgr/retrievalstoremgr_test.go
Normal file
137
node/repo/retrievalstoremgr/retrievalstoremgr_test.go
Normal file
@ -0,0 +1,137 @@
|
||||
package retrievalstoremgr_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/query"
|
||||
dss "github.com/ipfs/go-datastore/sync"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
format "github.com/ipfs/go-ipld-format"
|
||||
dag "github.com/ipfs/go-merkledag"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-multistore"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/repo/importmgr"
|
||||
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
||||
)
|
||||
|
||||
func TestMultistoreRetrievalStoreManager(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||
multiDS, err := multistore.NewMultiDstore(ds)
|
||||
require.NoError(t, err)
|
||||
imgr := importmgr.New(multiDS, ds)
|
||||
retrievalStoreMgr := retrievalstoremgr.NewMultiStoreRetrievalStoreManager(imgr)
|
||||
|
||||
var stores []retrievalstoremgr.RetrievalStore
|
||||
for i := 0; i < 5; i++ {
|
||||
store, err := retrievalStoreMgr.NewStore()
|
||||
require.NoError(t, err)
|
||||
stores = append(stores, store)
|
||||
nds := generateNodesOfSize(5, 100)
|
||||
err = store.DAGService().AddMany(ctx, nds)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Run("creates all keys", func(t *testing.T) {
|
||||
qres, err := ds.Query(query.Query{KeysOnly: true})
|
||||
require.NoError(t, err)
|
||||
all, err := qres.Rest()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, all, 31)
|
||||
})
|
||||
|
||||
t.Run("loads DAG services", func(t *testing.T) {
|
||||
for _, store := range stores {
|
||||
mstore, err := multiDS.Get(*store.StoreID())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, mstore.DAG, store.DAGService())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("delete stores", func(t *testing.T) {
|
||||
err := retrievalStoreMgr.ReleaseStore(stores[4])
|
||||
require.NoError(t, err)
|
||||
storeIndexes := multiDS.List()
|
||||
require.Len(t, storeIndexes, 4)
|
||||
|
||||
qres, err := ds.Query(query.Query{KeysOnly: true})
|
||||
require.NoError(t, err)
|
||||
all, err := qres.Rest()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, all, 25)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBlockstoreRetrievalStoreManager(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||
bs := blockstore.NewBlockstore(ds)
|
||||
retrievalStoreMgr := retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs)
|
||||
var stores []retrievalstoremgr.RetrievalStore
|
||||
var cids []cid.Cid
|
||||
for i := 0; i < 5; i++ {
|
||||
store, err := retrievalStoreMgr.NewStore()
|
||||
require.NoError(t, err)
|
||||
stores = append(stores, store)
|
||||
nds := generateNodesOfSize(5, 100)
|
||||
err = store.DAGService().AddMany(ctx, nds)
|
||||
require.NoError(t, err)
|
||||
for _, nd := range nds {
|
||||
cids = append(cids, nd.Cid())
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("creates all keys", func(t *testing.T) {
|
||||
qres, err := ds.Query(query.Query{KeysOnly: true})
|
||||
require.NoError(t, err)
|
||||
all, err := qres.Rest()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, all, 25)
|
||||
})
|
||||
|
||||
t.Run("loads DAG services, all DAG has all nodes", func(t *testing.T) {
|
||||
for _, store := range stores {
|
||||
dagService := store.DAGService()
|
||||
for _, cid := range cids {
|
||||
_, err := dagService.Get(ctx, cid)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("release store has no effect", func(t *testing.T) {
|
||||
err := retrievalStoreMgr.ReleaseStore(stores[4])
|
||||
require.NoError(t, err)
|
||||
qres, err := ds.Query(query.Query{KeysOnly: true})
|
||||
require.NoError(t, err)
|
||||
all, err := qres.Rest()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, all, 25)
|
||||
})
|
||||
}
|
||||
|
||||
var seedSeq int64 = 0
|
||||
|
||||
func randomBytes(n int64) []byte {
|
||||
randBytes := make([]byte, n)
|
||||
r := rand.New(rand.NewSource(seedSeq))
|
||||
_, _ = r.Read(randBytes)
|
||||
seedSeq++
|
||||
return randBytes
|
||||
}
|
||||
|
||||
func generateNodesOfSize(n int, size int64) []format.Node {
|
||||
generatedNodes := make([]format.Node, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
b := dag.NewRawNode(randomBytes(size))
|
||||
generatedNodes = append(generatedNodes, b)
|
||||
|
||||
}
|
||||
return generatedNodes
|
||||
}
|
Loading…
Reference in New Issue
Block a user