From 5a623cccb5085ad1bce1431cc169580c741d9065 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 27 Jul 2020 23:13:10 -0700 Subject: [PATCH] feat(markets): update markets mulitple deal stores --- api/api_full.go | 7 ++-- api/apistruct/struct.go | 5 ++- api/test/deals.go | 5 ++- cli/client.go | 5 ++- go.mod | 8 ++-- go.sum | 17 ++++---- node/builder.go | 1 + node/impl/client/client.go | 73 ++++++++++++++++++++++++---------- node/modules/client.go | 17 +++----- node/modules/dtypes/storage.go | 1 + node/modules/storageminer.go | 29 ++++++++++++-- node/repo/importmgr/mgr.go | 10 ++--- 12 files changed, 117 insertions(+), 61 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 134427b67..2b14c62d8 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -201,7 +202,7 @@ type FullNode interface { // ClientImport imports file under the specified path into filestore. ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) // ClientRemoveImport removes file import - ClientRemoveImport(ctx context.Context, importID int) error + ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error // ClientStartDeal proposes a deal with a miner. ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) // ClientGetDealInfo returns the latest information about a given deal. @@ -404,11 +405,11 @@ type SectorLocation struct { type ImportRes struct { Root cid.Cid - ImportID int + ImportID multistore.StoreID } type Import struct { - Key int + Key multistore.StoreID Err string Root *cid.Cid diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 77b500cc9..ad5982a52 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/sector-storage/fsutil" "github.com/filecoin-project/sector-storage/sealtasks" "github.com/filecoin-project/sector-storage/stores" @@ -115,7 +116,7 @@ type FullNodeStruct struct { ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` - ClientRemoveImport func(ctx context.Context, importID int) error `perm:"admin"` + ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` @@ -360,7 +361,7 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e return c.Internal.ClientListImports(ctx) } -func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID int) error { +func (c *FullNodeStruct) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error { return c.Internal.ClientRemoveImport(ctx, importID) } diff --git a/api/test/deals.go b/api/test/deals.go index 7fbb1a2b8..fb102a143 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -225,7 +225,10 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client t.Fatal(err) } deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{ - Data: &storagemarket.DataRef{Root: fcid}, + Data: &storagemarket.DataRef{ + TransferType: storagemarket.TTGraphsync, + Root: fcid, + }, Wallet: addr, Miner: maddr, EpochPrice: types.NewInt(1000000), diff --git a/cli/client.go b/cli/client.go index 693b2b83e..63658c27d 100644 --- a/cli/client.go +++ b/cli/client.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -142,14 +143,14 @@ var clientDropCmd = &cli.Command{ defer closer() ctx := ReqContext(cctx) - var ids []int + var ids []multistore.StoreID for i, s := range cctx.Args().Slice() { id, err := strconv.ParseInt(s, 10, 0) if err != nil { return xerrors.Errorf("parsing %d-th import ID: %w", i, err) } - ids = append(ids, int(id)) + ids = append(ids, multistore.StoreID(id)) } for _, id := range ids { diff --git a/go.mod b/go.mod index 1c3bfd7eb..129abd241 100644 --- a/go.mod +++ b/go.mod @@ -21,11 +21,11 @@ require ( github.com/filecoin-project/go-bitfield v0.1.0 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 - github.com/filecoin-project/go-data-transfer v0.5.0 + github.com/filecoin-project/go-data-transfer v0.5.1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f - github.com/filecoin-project/go-fil-markets v0.5.1 + github.com/filecoin-project/go-fil-markets v0.5.2-0.20200728055441-cf6b0a74e50c github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 - github.com/filecoin-project/go-multistore v0.0.1 + github.com/filecoin-project/go-multistore v0.0.2 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b @@ -53,7 +53,7 @@ require ( github.com/ipfs/go-ds-measure v0.1.0 github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.1 - github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 + github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a github.com/ipfs/go-ipfs-blockstore v1.0.0 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 diff --git a/go.sum b/go.sum index f1ef52518..f5f6c00eb 100644 --- a/go.sum +++ b/go.sum @@ -239,17 +239,17 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-data-transfer v0.5.0 h1:pvWlab69BD5dwheRHjjBjFB6m7CEqEZeI+aChtVqKVk= -github.com/filecoin-project/go-data-transfer v0.5.0/go.mod h1:7yckbsPPMGuN3O1+SYNE/lowwheaUn5woGILpjN52UI= +github.com/filecoin-project/go-data-transfer v0.5.1 h1:tDPmVVSgkit3cEG+9TFr6nwhKHdUipt9f0dEZSmvjbg= +github.com/filecoin-project/go-data-transfer v0.5.1/go.mod h1:PRs78hp9u8T4G2Jce5NOkHB1bAqecPkvSLsMPuJckGU= github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= -github.com/filecoin-project/go-fil-markets v0.5.1 h1:Y69glslNCuXnygfesCmyilTVhEEjcLK7CtAohKP9SL8= -github.com/filecoin-project/go-fil-markets v0.5.1/go.mod h1:GKGigsFNMvKmx/+Mcn7093TdZTiCDLc7YGxQ7d6fq2s= +github.com/filecoin-project/go-fil-markets v0.5.2-0.20200728055441-cf6b0a74e50c h1:cIoO/T2DxLDpPjIp1cvtreTlq8YBzIbLPk/MpVOrkrw= +github.com/filecoin-project/go-fil-markets v0.5.2-0.20200728055441-cf6b0a74e50c/go.mod h1:tBjxNhgi3djW9RZacV3D/HhADZe+tgvfCeMyWWEgVlI= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM= -github.com/filecoin-project/go-multistore v0.0.1 h1:wXCd02azCxEcMNlDE9lksraQO+iIjFGNw01IZyf8GPA= -github.com/filecoin-project/go-multistore v0.0.1/go.mod h1:z8NeSPWubEvrzi0XolhZ1NjTeW9ZDR779M+EDhf4QIQ= +github.com/filecoin-project/go-multistore v0.0.2 h1:JZEddnXXt3mMzHi7bi9IH7Yi1NpGLy19J5Lk/xbxBMs= +github.com/filecoin-project/go-multistore v0.0.2/go.mod h1:edte5g7DHqJasFNOvdm9ZS6CjdfFTPoQ6xeKs1eOBIA= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= @@ -528,8 +528,8 @@ github.com/ipfs/go-filestore v1.0.0 h1:QR7ekKH+q2AGiWDc7W2Q0qHuYSRZGUJqUn0GsegEP github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPiFOdcuu9SM= github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0= github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y= -github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 h1:tkGDAwcZfzDFeBNyBWYOM02Qw0rGpA2UuCvq49T3K5o= -github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= +github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a h1:QViYKbSYNKtfivrYx69UFJiH7HfdE5APQBbIu5fCK3k= +github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-hamt-ipld v0.0.15-0.20200131012125-dd88a59d3f2e/go.mod h1:9aQJu/i/TaRDW6jqB5U217dLIDopn50wxLdHXM2CTfE= github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242/go.mod h1:kq3Pi+UP3oHhAdKexE+kHHYRKMoFNuGero0R7q3hWGg= github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk= @@ -634,7 +634,6 @@ github.com/ipfs/iptb v1.4.0 h1:YFYTrCkLMRwk/35IMyC6+yjoQSHTEcNcefBStLJzgvo= github.com/ipfs/iptb v1.4.0/go.mod h1:1rzHpCYtNp87/+hTxG5TfCVn/yMY3dKnLn8tBiMfdmg= github.com/ipfs/iptb-plugins v0.2.1 h1:au4HWn9/pRPbkxA08pDx2oRAs4cnbgQWgV0teYXuuGA= github.com/ipfs/iptb-plugins v0.2.1/go.mod h1:QXMbtIWZ+jRsW8a4h13qAKU7jcM7qaittO8wOsTP0Rs= -github.com/ipld/go-car v0.1.1-0.20200429200904-c222d793c339/go.mod h1:eajxljm6I8o3LitnFeVEmucwZmz7+yLSiKce9yYMefg= github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae h1:OV9dxl8iPMCOD8Vi/hvFwRh3JWPXqmkYSVxWr9JnEzM= github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae/go.mod h1:2mvxpu4dKRnuH3mj5u6KW/tmRSCcXvy/KYiJ4nC6h4c= github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e h1:ZISbJlM0urTANR9KRfRaqlBmyOj5uUtxs2r4Up9IXsA= diff --git a/node/builder.go b/node/builder.go index 46fd260d5..16be84d2c 100644 --- a/node/builder.go +++ b/node/builder.go @@ -298,6 +298,7 @@ func Online() Option { Override(new(*storage.Miner), modules.StorageMiner), Override(new(dtypes.NetworkName), modules.StorageNetworkName), + Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore), Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore), Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 1e7fa8146..9c320f832 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -85,6 +85,27 @@ func (a *API) imgr() *importmgr.Mgr { } func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) { + var storeID *multistore.StoreID + if params.Data.TransferType == storagemarket.TTGraphsync { + importIDs := a.imgr().List() + for _, importID := range importIDs { + info, err := a.imgr().Info(importID) + if err != nil { + continue + } + if info.Labels[importmgr.LRootCid] == "" { + continue + } + c, err := cid.Parse(info.Labels[importmgr.LRootCid]) + if err != nil { + continue + } + if c.Equals(params.Data.Root) { + storeID = &importID + break + } + } + } exist, err := a.WalletHas(ctx, params.Wallet) if err != nil { return nil, xerrors.Errorf("failed getting addr from wallet: %w", params.Wallet) @@ -133,19 +154,19 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) dealStart = ts.Height() + dealStartBuffer } - result, err := a.SMDealClient.ProposeStorageDeal( - ctx, - params.Wallet, - &providerInfo, - params.Data, - dealStart, - calcDealExpiration(params.MinBlocksDuration, md, dealStart), - params.EpochPrice, - big.Zero(), - rt, - params.FastRetrieval, - params.VerifiedDeal, - ) + result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{ + Addr: params.Wallet, + Info: &providerInfo, + Data: params.Data, + StartEpoch: dealStart, + EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), + Price: params.EpochPrice, + Collateral: big.Zero(), + Rt: rt, + FastRetrieval: params.FastRetrieval, + VerifiedDeal: params.VerifiedDeal, + StoreID: storeID, + }) if err != nil { return nil, xerrors.Errorf("failed to start deal: %w", err) @@ -300,7 +321,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes }, nil } -func (a *API) ClientRemoveImport(ctx context.Context, importID int) error { +func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error { return a.imgr().Remove(importID) } @@ -341,6 +362,9 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, erro if err != nil { return cid.Undef, err } + if err := a.imgr().AddLabel(id, "root", nd.Cid().String()); err != nil { + return cid.Cid{}, err + } return nd.Cid(), bufferedDS.Commit() } @@ -425,6 +449,15 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref if err != nil { return xerrors.Errorf("Error in retrieval params: %s", err) } + storeID, store, err := a.imgr().NewStore() + if err != nil { + return xerrors.Errorf("Error setting up new store: %w", err) + } + + defer func() { + _ = a.imgr().Remove(storeID) + }() + _, err = a.Retrieval.Retrieve( ctx, order.Root, @@ -432,7 +465,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref order.Total, order.MinerPeerID, order.Client, - order.Miner) // TODO: pass the store here somehow + order.Miner, + &storeID) // TODO: should we ignore storeID if we are using the IPFS blockstore? if err != nil { return xerrors.Errorf("Retrieve failed: %w", err) } @@ -452,28 +486,27 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return nil } - rdag := merkledag.NewDAGService(blockservice.New(a.RetBstore, offline.Exchange(a.RetBstore))) - if ref.IsCAR { f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } - err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f) + err = car.WriteCar(ctx, store.DAG, []cid.Cid{order.Root}, f) if err != nil { return err } return f.Close() } - nd, err := rdag.Get(ctx, order.Root) + nd, err := store.DAG.Get(ctx, order.Root) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } - file, err := unixfile.NewUnixfsFile(ctx, rdag, nd) + file, err := unixfile.NewUnixfsFile(ctx, store.DAG, nd) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) } + return files.WriteTo(file, ref.Path) } diff --git a/node/modules/client.go b/node/modules/client.go index 77e66a69a..fe81c853b 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -5,7 +5,6 @@ import ( "time" "github.com/filecoin-project/go-multistore" - "github.com/filecoin-project/lotus/lib/bufbstore" "golang.org/x/xerrors" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -62,12 +61,8 @@ func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes. } func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore { - // TODO: This isn't.. the best - // - If it's easy to pass per-retrieval blockstores with markets we don't need this - // - If it's not easy, we need to store this in a separate datastore on disk - defaultWrite := blockstore.NewBlockstore(datastore.NewMapDatastore()) - - return blockstore.NewIdStore(bufbstore.NewTieredBstore(imgr.Blockstore, defaultWrite)) + // in most cases this is now unused in normal operations -- however, it's important to preserve for the IPFS use case + return blockstore.NewBlockstore(datastore.NewMapDatastore()) } // RegisterClientValidator is an initialization hook that registers the client @@ -117,9 +112,9 @@ func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientReques return requestvalidation.NewUnifiedRequestValidator(nil, deals) } -func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) { +func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) { net := smnet.NewFromLibp2pHost(h) - c, err := storageimpl.NewClient(net, ibs, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second)) + c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second)) if err != nil { return nil, err } @@ -136,9 +131,9 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, r } // RetrievalClient creates a new retrieval client attached to the client blockstore -func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, dt dtypes.ClientDataTransfer, pmgr *paychmgr.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver, ds dtypes.MetadataDS, chainapi full.ChainAPI) (retrievalmarket.RetrievalClient, error) { +func RetrievalClient(h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, pmgr *paychmgr.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver, ds dtypes.MetadataDS, chainapi full.ChainAPI) (retrievalmarket.RetrievalClient, error) { adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi, chainapi) network := rmnet.NewFromLibp2pHost(h) sc := storedcounter.New(ds, datastore.NewKey("/retr")) - return retrievalimpl.NewClient(network, bs, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client")), sc) + return retrievalimpl.NewClient(network, mds, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client")), sc) } diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index b4712a37e..a4e75c4b8 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -50,3 +50,4 @@ type ProviderDataTransfer datatransfer.Manager type StagingDAG format.DAGService type StagingBlockstore blockstore.Blockstore type StagingGraphsync graphsync.GraphExchange +type StagingMultiDstore *multistore.MultiStore diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 1a22fdd95..ba0178e10 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -39,6 +39,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/go-multistore" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" @@ -244,6 +245,26 @@ func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore { return piecestore.NewPieceStore(namespace.Wrap(ds, datastore.NewKey("/storagemarket"))) } +func StagingMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.StagingMultiDstore, error) { + ds, err := r.Datastore("/staging") + if err != nil { + return nil, xerrors.Errorf("getting datastore out of reop: %w", err) + } + + mds, err := multistore.NewMultiDstore(ds) + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return mds.Close() + }, + }) + + return mds, nil +} + // StagingBlockstore creates a blockstore for staging blocks for a miner // in a storage deal, prior to sealing func StagingBlockstore(r repo.LockedRepo) (dtypes.StagingBlockstore, error) { @@ -339,7 +360,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, ffiConfig *ffiwrapper.Config, storedAsk *storedask.StoredAsk, h host.Host, ds dtypes.MetadataDS, - ibs dtypes.StagingBlockstore, + mds dtypes.StagingMultiDstore, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, @@ -406,7 +427,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, return true, "", nil }) - p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), ibs, store, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt) + p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt) if err != nil { return p, err } @@ -415,7 +436,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, } // RetrievalProvider creates a new retrieval provider attached to the provider blockstore -func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) { +func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) { adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full) maddr, err := minerAddrFromDS(ds) @@ -448,7 +469,7 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S return true, "", nil }) - return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, ibs, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) + return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) } func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth) (*sectorstorage.Manager, error) { diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 754f41df2..3deb75b70 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -42,7 +42,7 @@ type StoreMeta struct { Labels map[string]string } -func (m *Mgr) NewStore() (int, *multistore.Store, error) { +func (m *Mgr) NewStore() (multistore.StoreID, *multistore.Store, error) { id := m.mds.Next() st, err := m.mds.Get(id) if err != nil { @@ -60,7 +60,7 @@ func (m *Mgr) NewStore() (int, *multistore.Store, error) { return id, st, err } -func (m *Mgr) AddLabel(id int, key, value string) error { // source, file path, data CID.. +func (m *Mgr) AddLabel(id multistore.StoreID, key, value string) error { // source, file path, data CID.. meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) if err != nil { return xerrors.Errorf("getting metadata form datastore: %w", err) @@ -81,11 +81,11 @@ func (m *Mgr) AddLabel(id int, key, value string) error { // source, file path, return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) } -func (m *Mgr) List() []int { +func (m *Mgr) List() []multistore.StoreID { return m.mds.List() } -func (m *Mgr) Info(id int) (*StoreMeta, error) { +func (m *Mgr) Info(id multistore.StoreID) (*StoreMeta, error) { meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) if err != nil { return nil, xerrors.Errorf("getting metadata form datastore: %w", err) @@ -99,7 +99,7 @@ func (m *Mgr) Info(id int) (*StoreMeta, error) { return &sm, nil } -func (m *Mgr) Remove(id int) error { +func (m *Mgr) Remove(id multistore.StoreID) error { if err := m.mds.Delete(id); err != nil { return xerrors.Errorf("removing import: %w", err) }