feat(markets): update markets mulitple deal stores

This commit is contained in:
hannahhoward 2020-07-27 23:13:10 -07:00
parent c0f0e2ba45
commit 5a623cccb5
12 changed files with 117 additions and 61 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
@ -201,7 +202,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore.
ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error)
// ClientRemoveImport removes file import
ClientRemoveImport(ctx context.Context, importID int) error
ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error
// ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error)
// ClientGetDealInfo returns the latest information about a given deal.
@ -404,11 +405,11 @@ type SectorLocation struct {
type ImportRes struct {
Root cid.Cid
ImportID int
ImportID multistore.StoreID
}
type Import struct {
Key int
Key multistore.StoreID
Err string
Root *cid.Cid

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
@ -115,7 +116,7 @@ type FullNodeStruct struct {
ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID int) error `perm:"admin"`
ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
@ -360,7 +361,7 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e
return c.Internal.ClientListImports(ctx)
}
func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int) error {
func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error {
return c.Internal.ClientRemoveImport(ctx, importID)
}

View File

@ -225,7 +225,10 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
t.Fatal(err)
}
deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{
Data: &storagemarket.DataRef{Root: fcid},
Data: &storagemarket.DataRef{
TransferType: storagemarket.TTGraphsync,
Root: fcid,
},
Wallet: addr,
Miner: maddr,
EpochPrice: types.NewInt(1000000),

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
@ -142,14 +143,14 @@ var clientDropCmd = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
var ids []int
var ids []multistore.StoreID
for i, s := range cctx.Args().Slice() {
id, err := strconv.ParseInt(s, 10, 0)
if err != nil {
return xerrors.Errorf("parsing %d-th import ID: %w", i, err)
}
ids = append(ids, int(id))
ids = append(ids, multistore.StoreID(id))
}
for _, id := range ids {

8
go.mod
View File

@ -21,11 +21,11 @@ require (
github.com/filecoin-project/go-bitfield v0.1.0
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.5.0
github.com/filecoin-project/go-data-transfer v0.5.1
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v0.5.1
github.com/filecoin-project/go-fil-markets v0.5.2-0.20200728055441-cf6b0a74e50c
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
github.com/filecoin-project/go-multistore v0.0.1
github.com/filecoin-project/go-multistore v0.0.2
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
@ -53,7 +53,7 @@ require (
github.com/ipfs/go-ds-measure v0.1.0
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83
github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0

17
go.sum
View File

@ -239,17 +239,17 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.5.0 h1:pvWlab69BD5dwheRHjjBjFB6m7CEqEZeI+aChtVqKVk=
github.com/filecoin-project/go-data-transfer v0.5.0/go.mod h1:7yckbsPPMGuN3O1+SYNE/lowwheaUn5woGILpjN52UI=
github.com/filecoin-project/go-data-transfer v0.5.1 h1:tDPmVVSgkit3cEG+9TFr6nwhKHdUipt9f0dEZSmvjbg=
github.com/filecoin-project/go-data-transfer v0.5.1/go.mod h1:PRs78hp9u8T4G2Jce5NOkHB1bAqecPkvSLsMPuJckGU=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v0.5.1 h1:Y69glslNCuXnygfesCmyilTVhEEjcLK7CtAohKP9SL8=
github.com/filecoin-project/go-fil-markets v0.5.1/go.mod h1:GKGigsFNMvKmx/+Mcn7093TdZTiCDLc7YGxQ7d6fq2s=
github.com/filecoin-project/go-fil-markets v0.5.2-0.20200728055441-cf6b0a74e50c h1:cIoO/T2DxLDpPjIp1cvtreTlq8YBzIbLPk/MpVOrkrw=
github.com/filecoin-project/go-fil-markets v0.5.2-0.20200728055441-cf6b0a74e50c/go.mod h1:tBjxNhgi3djW9RZacV3D/HhADZe+tgvfCeMyWWEgVlI=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
github.com/filecoin-project/go-multistore v0.0.1 h1:wXCd02azCxEcMNlDE9lksraQO+iIjFGNw01IZyf8GPA=
github.com/filecoin-project/go-multistore v0.0.1/go.mod h1:z8NeSPWubEvrzi0XolhZ1NjTeW9ZDR779M+EDhf4QIQ=
github.com/filecoin-project/go-multistore v0.0.2 h1:JZEddnXXt3mMzHi7bi9IH7Yi1NpGLy19J5Lk/xbxBMs=
github.com/filecoin-project/go-multistore v0.0.2/go.mod h1:edte5g7DHqJasFNOvdm9ZS6CjdfFTPoQ6xeKs1eOBIA=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
@ -528,8 +528,8 @@ github.com/ipfs/go-filestore v1.0.0 h1:QR7ekKH+q2AGiWDc7W2Q0qHuYSRZGUJqUn0GsegEP
github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPiFOdcuu9SM=
github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0=
github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y=
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 h1:tkGDAwcZfzDFeBNyBWYOM02Qw0rGpA2UuCvq49T3K5o=
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a h1:QViYKbSYNKtfivrYx69UFJiH7HfdE5APQBbIu5fCK3k=
github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200131012125-dd88a59d3f2e/go.mod h1:9aQJu/i/TaRDW6jqB5U217dLIDopn50wxLdHXM2CTfE=
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242/go.mod h1:kq3Pi+UP3oHhAdKexE+kHHYRKMoFNuGero0R7q3hWGg=
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
@ -634,7 +634,6 @@ github.com/ipfs/iptb v1.4.0 h1:YFYTrCkLMRwk/35IMyC6+yjoQSHTEcNcefBStLJzgvo=
github.com/ipfs/iptb v1.4.0/go.mod h1:1rzHpCYtNp87/+hTxG5TfCVn/yMY3dKnLn8tBiMfdmg=
github.com/ipfs/iptb-plugins v0.2.1 h1:au4HWn9/pRPbkxA08pDx2oRAs4cnbgQWgV0teYXuuGA=
github.com/ipfs/iptb-plugins v0.2.1/go.mod h1:QXMbtIWZ+jRsW8a4h13qAKU7jcM7qaittO8wOsTP0Rs=
github.com/ipld/go-car v0.1.1-0.20200429200904-c222d793c339/go.mod h1:eajxljm6I8o3LitnFeVEmucwZmz7+yLSiKce9yYMefg=
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae h1:OV9dxl8iPMCOD8Vi/hvFwRh3JWPXqmkYSVxWr9JnEzM=
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae/go.mod h1:2mvxpu4dKRnuH3mj5u6KW/tmRSCcXvy/KYiJ4nC6h4c=
github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e h1:ZISbJlM0urTANR9KRfRaqlBmyOj5uUtxs2r4Up9IXsA=

View File

@ -298,6 +298,7 @@ func Online() Option {
Override(new(*storage.Miner), modules.StorageMiner),
Override(new(dtypes.NetworkName), modules.StorageNetworkName),
Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore),
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),

View File

@ -85,6 +85,27 @@ func (a *API) imgr() *importmgr.Mgr {
}
func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) {
var storeID *multistore.StoreID
if params.Data.TransferType == storagemarket.TTGraphsync {
importIDs := a.imgr().List()
for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
if err != nil {
continue
}
if info.Labels[importmgr.LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
continue
}
if c.Equals(params.Data.Root) {
storeID = &importID
break
}
}
}
exist, err := a.WalletHas(ctx, params.Wallet)
if err != nil {
return nil, xerrors.Errorf("failed getting addr from wallet: %w", params.Wallet)
@ -133,19 +154,19 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
dealStart = ts.Height() + dealStartBuffer
}
result, err := a.SMDealClient.ProposeStorageDeal(
ctx,
params.Wallet,
&providerInfo,
params.Data,
dealStart,
calcDealExpiration(params.MinBlocksDuration, md, dealStart),
params.EpochPrice,
big.Zero(),
rt,
params.FastRetrieval,
params.VerifiedDeal,
)
result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{
Addr: params.Wallet,
Info: &providerInfo,
Data: params.Data,
StartEpoch: dealStart,
EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart),
Price: params.EpochPrice,
Collateral: big.Zero(),
Rt: rt,
FastRetrieval: params.FastRetrieval,
VerifiedDeal: params.VerifiedDeal,
StoreID: storeID,
})
if err != nil {
return nil, xerrors.Errorf("failed to start deal: %w", err)
@ -300,7 +321,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes
}, nil
}
func (a *API) ClientRemoveImport(ctx context.Context, importID int) error {
func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error {
return a.imgr().Remove(importID)
}
@ -341,6 +362,9 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro
if err != nil {
return cid.Undef, err
}
if err := a.imgr().AddLabel(id, "root", nd.Cid().String()); err != nil {
return cid.Cid{}, err
}
return nd.Cid(), bufferedDS.Commit()
}
@ -425,6 +449,15 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
if err != nil {
return xerrors.Errorf("Error in retrieval params: %s", err)
}
storeID, store, err := a.imgr().NewStore()
if err != nil {
return xerrors.Errorf("Error setting up new store: %w", err)
}
defer func() {
_ = a.imgr().Remove(storeID)
}()
_, err = a.Retrieval.Retrieve(
ctx,
order.Root,
@ -432,7 +465,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
order.Total,
order.MinerPeerID,
order.Client,
order.Miner) // TODO: pass the store here somehow
order.Miner,
&storeID) // TODO: should we ignore storeID if we are using the IPFS blockstore?
if err != nil {
return xerrors.Errorf("Retrieve failed: %w", err)
}
@ -452,28 +486,27 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return nil
}
rdag := merkledag.NewDAGService(blockservice.New(a.RetBstore, offline.Exchange(a.RetBstore)))
if ref.IsCAR {
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
err = car.WriteCar(ctx, store.DAG, []cid.Cid{order.Root}, f)
if err != nil {
return err
}
return f.Close()
}
nd, err := rdag.Get(ctx, order.Root)
nd, err := store.DAG.Get(ctx, order.Root)
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
}
file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
file, err := unixfile.NewUnixfsFile(ctx, store.DAG, nd)
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
}
return files.WriteTo(file, ref.Path)
}

View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/lib/bufbstore"
"golang.org/x/xerrors"
blockstore "github.com/ipfs/go-ipfs-blockstore"
@ -62,12 +61,8 @@ func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.
}
func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore {
// TODO: This isn't.. the best
// - If it's easy to pass per-retrieval blockstores with markets we don't need this
// - If it's not easy, we need to store this in a separate datastore on disk
defaultWrite := blockstore.NewBlockstore(datastore.NewMapDatastore())
return blockstore.NewIdStore(bufbstore.NewTieredBstore(imgr.Blockstore, defaultWrite))
// in most cases this is now unused in normal operations -- however, it's important to preserve for the IPFS use case
return blockstore.NewBlockstore(datastore.NewMapDatastore())
}
// RegisterClientValidator is an initialization hook that registers the client
@ -117,9 +112,9 @@ func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientReques
return requestvalidation.NewUnifiedRequestValidator(nil, deals)
}
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) {
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) {
net := smnet.NewFromLibp2pHost(h)
c, err := storageimpl.NewClient(net, ibs, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second))
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second))
if err != nil {
return nil, err
}
@ -136,9 +131,9 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, r
}
// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, dt dtypes.ClientDataTransfer, pmgr *paychmgr.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver, ds dtypes.MetadataDS, chainapi full.ChainAPI) (retrievalmarket.RetrievalClient, error) {
func RetrievalClient(h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, pmgr *paychmgr.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver, ds dtypes.MetadataDS, chainapi full.ChainAPI) (retrievalmarket.RetrievalClient, error) {
adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi, chainapi)
network := rmnet.NewFromLibp2pHost(h)
sc := storedcounter.New(ds, datastore.NewKey("/retr"))
return retrievalimpl.NewClient(network, bs, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client")), sc)
return retrievalimpl.NewClient(network, mds, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client")), sc)
}

View File

@ -50,3 +50,4 @@ type ProviderDataTransfer datatransfer.Manager
type StagingDAG format.DAGService
type StagingBlockstore blockstore.Blockstore
type StagingGraphsync graphsync.GraphExchange
type StagingMultiDstore *multistore.MultiStore

View File

@ -39,6 +39,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
@ -244,6 +245,26 @@ func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore {
return piecestore.NewPieceStore(namespace.Wrap(ds, datastore.NewKey("/storagemarket")))
}
func StagingMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.StagingMultiDstore, error) {
ds, err := r.Datastore("/staging")
if err != nil {
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
}
mds, err := multistore.NewMultiDstore(ds)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return mds.Close()
},
})
return mds, nil
}
// StagingBlockstore creates a blockstore for staging blocks for a miner
// in a storage deal, prior to sealing
func StagingBlockstore(r repo.LockedRepo) (dtypes.StagingBlockstore, error) {
@ -339,7 +360,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
ffiConfig *ffiwrapper.Config,
storedAsk *storedask.StoredAsk,
h host.Host, ds dtypes.MetadataDS,
ibs dtypes.StagingBlockstore,
mds dtypes.StagingMultiDstore,
r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore,
dataTransfer dtypes.ProviderDataTransfer,
@ -406,7 +427,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
return true, "", nil
})
p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), ibs, store, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)
p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)
if err != nil {
return p, err
}
@ -415,7 +436,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
}
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) {
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
maddr, err := minerAddrFromDS(ds)
@ -448,7 +469,7 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
return true, "", nil
})
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, ibs, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
}
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth) (*sectorstorage.Manager, error) {

View File

@ -42,7 +42,7 @@ type StoreMeta struct {
Labels map[string]string
}
func (m *Mgr) NewStore() (int, *multistore.Store, error) {
func (m *Mgr) NewStore() (multistore.StoreID, *multistore.Store, error) {
id := m.mds.Next()
st, err := m.mds.Get(id)
if err != nil {
@ -60,7 +60,7 @@ func (m *Mgr) NewStore() (int, *multistore.Store, error) {
return id, st, err
}
func (m *Mgr) AddLabel(id int, key, value string) error { // source, file path, data CID..
func (m *Mgr) AddLabel(id multistore.StoreID, key, value string) error { // source, file path, data CID..
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return xerrors.Errorf("getting metadata form datastore: %w", err)
@ -81,11 +81,11 @@ func (m *Mgr) AddLabel(id int, key, value string) error { // source, file path,
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
}
func (m *Mgr) List() []int {
func (m *Mgr) List() []multistore.StoreID {
return m.mds.List()
}
func (m *Mgr) Info(id int) (*StoreMeta, error) {
func (m *Mgr) Info(id multistore.StoreID) (*StoreMeta, error) {
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return nil, xerrors.Errorf("getting metadata form datastore: %w", err)
@ -99,7 +99,7 @@ func (m *Mgr) Info(id int) (*StoreMeta, error) {
return &sm, nil
}
func (m *Mgr) Remove(id int) error {
func (m *Mgr) Remove(id multistore.StoreID) error {
if err := m.mds.Delete(id); err != nil {
return xerrors.Errorf("removing import: %w", err)
}