remove multistore from Lotus

This commit is contained in:
aarshkshah1992 2021-07-03 12:55:20 +05:30
parent dae6c757b5
commit 40b30d1fb1
27 changed files with 164 additions and 504 deletions

View File

@ -14,7 +14,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
@ -320,7 +319,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore. // ClientImport imports file under the specified path into filestore.
ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) //perm:admin ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) //perm:admin
// ClientRemoveImport removes file import // ClientRemoveImport removes file import
ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error //perm:admin ClientRemoveImport(ctx context.Context, importID uint64) error //perm:admin
// ClientStartDeal proposes a deal with a miner. // ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) //perm:admin ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) //perm:admin
// ClientStatelessDeal fire-and-forget-proposes an offline deal to a miner without subsequent tracking. // ClientStatelessDeal fire-and-forget-proposes an offline deal to a miner without subsequent tracking.
@ -712,11 +711,11 @@ type MinerSectors struct {
type ImportRes struct { type ImportRes struct {
Root cid.Cid Root cid.Cid
ImportID multistore.StoreID ImportID uint64
} }
type Import struct { type Import struct {
Key multistore.StoreID Key uint64
Err string Err string
Root *cid.Cid Root *cid.Cid

View File

@ -27,7 +27,6 @@ import (
filestore2 "github.com/filecoin-project/go-fil-markets/filestore" filestore2 "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
@ -88,7 +87,7 @@ func init() {
addExample(pid) addExample(pid)
addExample(&pid) addExample(&pid)
multistoreIDExample := multistore.StoreID(50) storeIDExample := uint64(50)
addExample(bitfield.NewFromSet([]uint64{5})) addExample(bitfield.NewFromSet([]uint64{5}))
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1) addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1)
@ -119,8 +118,8 @@ func init() {
addExample(time.Minute) addExample(time.Minute)
addExample(datatransfer.TransferID(3)) addExample(datatransfer.TransferID(3))
addExample(datatransfer.Ongoing) addExample(datatransfer.Ongoing)
addExample(multistoreIDExample) addExample(storeIDExample)
addExample(&multistoreIDExample) addExample(&storeIDExample)
addExample(retrievalmarket.ClientEventDealAccepted) addExample(retrievalmarket.ClientEventDealAccepted)
addExample(retrievalmarket.DealStatusNew) addExample(retrievalmarket.DealStatusNew)
addExample(network.ReachabilityPublic) addExample(network.ReachabilityPublic)
@ -175,7 +174,7 @@ func init() {
// miner specific // miner specific
addExample(filestore2.Path(".lotusminer/fstmp123")) addExample(filestore2.Path(".lotusminer/fstmp123"))
si := multistore.StoreID(12) si := uint64(12)
addExample(&si) addExample(&si)
addExample(retrievalmarket.DealID(5)) addExample(retrievalmarket.DealID(5))
addExample(abi.ActorID(1000)) addExample(abi.ActorID(1000))

View File

@ -14,7 +14,6 @@ import (
retrievalmarket "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievalmarket "github.com/filecoin-project/go-fil-markets/retrievalmarket"
storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
auth "github.com/filecoin-project/go-jsonrpc/auth" auth "github.com/filecoin-project/go-jsonrpc/auth"
multistore "github.com/filecoin-project/go-multistore"
abi "github.com/filecoin-project/go-state-types/abi" abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big" big "github.com/filecoin-project/go-state-types/big"
crypto "github.com/filecoin-project/go-state-types/crypto" crypto "github.com/filecoin-project/go-state-types/crypto"
@ -716,7 +715,7 @@ func (mr *MockFullNodeMockRecorder) ClientQueryAsk(arg0, arg1, arg2 interface{})
} }
// ClientRemoveImport mocks base method. // ClientRemoveImport mocks base method.
func (m *MockFullNode) ClientRemoveImport(arg0 context.Context, arg1 multistore.StoreID) error { func (m *MockFullNode) ClientRemoveImport(arg0 context.Context, arg1 uint64) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRemoveImport", arg0, arg1) ret := m.ctrl.Call(m, "ClientRemoveImport", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)

View File

@ -13,7 +13,6 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
@ -197,7 +196,7 @@ type FullNodeStruct struct {
ClientQueryAsk func(p0 context.Context, p1 peer.ID, p2 address.Address) (*storagemarket.StorageAsk, error) `perm:"read"` ClientQueryAsk func(p0 context.Context, p1 peer.ID, p2 address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
ClientRemoveImport func(p0 context.Context, p1 multistore.StoreID) error `perm:"admin"` ClientRemoveImport func(p0 context.Context, p1 uint64) error `perm:"admin"`
ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
@ -1381,11 +1380,11 @@ func (s *FullNodeStub) ClientQueryAsk(p0 context.Context, p1 peer.ID, p2 address
return nil, xerrors.New("method not supported") return nil, xerrors.New("method not supported")
} }
func (s *FullNodeStruct) ClientRemoveImport(p0 context.Context, p1 multistore.StoreID) error { func (s *FullNodeStruct) ClientRemoveImport(p0 context.Context, p1 uint64) error {
return s.Internal.ClientRemoveImport(p0, p1) return s.Internal.ClientRemoveImport(p0, p1)
} }
func (s *FullNodeStub) ClientRemoveImport(p0 context.Context, p1 multistore.StoreID) error { func (s *FullNodeStub) ClientRemoveImport(p0 context.Context, p1 uint64) error {
return xerrors.New("method not supported") return xerrors.New("method not supported")
} }

View File

@ -8,7 +8,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
@ -301,7 +300,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore. // ClientImport imports file under the specified path into filestore.
ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) //perm:admin ClientImport(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) //perm:admin
// ClientRemoveImport removes file import // ClientRemoveImport removes file import
ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error //perm:admin ClientRemoveImport(ctx context.Context, importID uint64) error //perm:admin
// ClientStartDeal proposes a deal with a miner. // ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) //perm:admin ClientStartDeal(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) //perm:admin
// ClientStatelessDeal fire-and-forget-proposes an offline deal to a miner without subsequent tracking. // ClientStatelessDeal fire-and-forget-proposes an offline deal to a miner without subsequent tracking.

View File

@ -10,7 +10,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
@ -115,7 +114,7 @@ type FullNodeStruct struct {
ClientQueryAsk func(p0 context.Context, p1 peer.ID, p2 address.Address) (*storagemarket.StorageAsk, error) `perm:"read"` ClientQueryAsk func(p0 context.Context, p1 peer.ID, p2 address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
ClientRemoveImport func(p0 context.Context, p1 multistore.StoreID) error `perm:"admin"` ClientRemoveImport func(p0 context.Context, p1 uint64) error `perm:"admin"`
ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
@ -794,11 +793,11 @@ func (s *FullNodeStub) ClientQueryAsk(p0 context.Context, p1 peer.ID, p2 address
return nil, xerrors.New("method not supported") return nil, xerrors.New("method not supported")
} }
func (s *FullNodeStruct) ClientRemoveImport(p0 context.Context, p1 multistore.StoreID) error { func (s *FullNodeStruct) ClientRemoveImport(p0 context.Context, p1 uint64) error {
return s.Internal.ClientRemoveImport(p0, p1) return s.Internal.ClientRemoveImport(p0, p1)
} }
func (s *FullNodeStub) ClientRemoveImport(p0 context.Context, p1 multistore.StoreID) error { func (s *FullNodeStub) ClientRemoveImport(p0 context.Context, p1 uint64) error {
return xerrors.New("method not supported") return xerrors.New("method not supported")
} }

View File

@ -14,7 +14,6 @@ import (
retrievalmarket "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievalmarket "github.com/filecoin-project/go-fil-markets/retrievalmarket"
storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
auth "github.com/filecoin-project/go-jsonrpc/auth" auth "github.com/filecoin-project/go-jsonrpc/auth"
multistore "github.com/filecoin-project/go-multistore"
abi "github.com/filecoin-project/go-state-types/abi" abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big" big "github.com/filecoin-project/go-state-types/big"
crypto "github.com/filecoin-project/go-state-types/crypto" crypto "github.com/filecoin-project/go-state-types/crypto"
@ -716,7 +715,7 @@ func (mr *MockFullNodeMockRecorder) ClientQueryAsk(arg0, arg1, arg2 interface{})
} }
// ClientRemoveImport mocks base method. // ClientRemoveImport mocks base method.
func (m *MockFullNode) ClientRemoveImport(arg0 context.Context, arg1 multistore.StoreID) error { func (m *MockFullNode) ClientRemoveImport(arg0 context.Context, arg1 uint64) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRemoveImport", arg0, arg1) ret := m.ctrl.Call(m, "ClientRemoveImport", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -34,7 +34,6 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
@ -174,14 +173,14 @@ var clientDropCmd = &cli.Command{
defer closer() defer closer()
ctx := ReqContext(cctx) ctx := ReqContext(cctx)
var ids []multistore.StoreID var ids []uint64
for i, s := range cctx.Args().Slice() { for i, s := range cctx.Args().Slice() {
id, err := strconv.ParseInt(s, 10, 0) id, err := strconv.ParseUint(s, 10, 64)
if err != nil { if err != nil {
return xerrors.Errorf("parsing %d-th import ID: %w", i, err) return xerrors.Errorf("parsing %d-th import ID: %w", i, err)
} }
ids = append(ids, multistore.StoreID(id)) ids = append(ids, id)
} }
for _, id := range ids { for _, id := range ids {

View File

@ -687,7 +687,6 @@ Response:
"SlashEpoch": 10101, "SlashEpoch": 10101,
"FastRetrieval": true, "FastRetrieval": true,
"Message": "string value", "Message": "string value",
"StoreID": 12,
"FundsReserved": "0", "FundsReserved": "0",
"Ref": { "Ref": {
"TransferType": "string value", "TransferType": "string value",
@ -706,7 +705,8 @@ Response:
"Responder": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "Responder": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"ID": 3 "ID": 3
}, },
"SectorNumber": 9 "SectorNumber": 9,
"CARv2FilePath": "string value"
} }
``` ```

View File

@ -1286,7 +1286,7 @@ Response:
"Root": { "Root": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}, },
"ImportID": 50 "ImportID": 42
} }
``` ```
@ -1406,7 +1406,7 @@ Perms: admin
Inputs: Inputs:
```json ```json
[ [
50 42
] ]
``` ```
@ -1444,7 +1444,7 @@ Inputs:
}, },
"Piece": null, "Piece": null,
"Size": 42, "Size": 42,
"LocalStore": 12, "LocalCARV2FilePath": "string value",
"Total": "0", "Total": "0",
"UnsealPrice": "0", "UnsealPrice": "0",
"PaymentInterval": 42, "PaymentInterval": 42,
@ -1498,7 +1498,7 @@ Inputs:
}, },
"Piece": null, "Piece": null,
"Size": 42, "Size": 42,
"LocalStore": 12, "LocalCARV2FilePath": "string value",
"Total": "0", "Total": "0",
"UnsealPrice": "0", "UnsealPrice": "0",
"PaymentInterval": 42, "PaymentInterval": 42,

View File

@ -1288,7 +1288,7 @@ Response:
"Root": { "Root": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}, },
"ImportID": 50 "ImportID": 42
} }
``` ```
@ -1408,7 +1408,7 @@ Perms: admin
Inputs: Inputs:
```json ```json
[ [
50 42
] ]
``` ```
@ -1446,7 +1446,7 @@ Inputs:
}, },
"Piece": null, "Piece": null,
"Size": 42, "Size": 42,
"LocalStore": 12, "LocalCARV2FilePath": "string value",
"Total": "0", "Total": "0",
"UnsealPrice": "0", "UnsealPrice": "0",
"PaymentInterval": 42, "PaymentInterval": 42,
@ -1500,7 +1500,7 @@ Inputs:
}, },
"Piece": null, "Piece": null,
"Size": 42, "Size": 42,
"LocalStore": 12, "LocalCARV2FilePath": "string value",
"Total": "0", "Total": "0",
"UnsealPrice": "0", "UnsealPrice": "0",
"PaymentInterval": 42, "PaymentInterval": 42,

1
go.mod
View File

@ -36,7 +36,6 @@ require (
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498 github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48 github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48

BIN
itests/fstmp889313799 Normal file

Binary file not shown.

View File

@ -324,10 +324,8 @@ func Repo(r repo.Repo) Option {
), ),
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientRetrievalStoreManager),
Override(new(ci.PrivKey), lp2p.PrivKey), Override(new(ci.PrivKey), lp2p.PrivKey),
Override(new(ci.PubKey), ci.PrivKey.GetPublic), Override(new(ci.PubKey), ci.PrivKey.GetPublic),
Override(new(peer.ID), peer.IDFromPublicKey), Override(new(peer.ID), peer.IDFromPublicKey),

View File

@ -168,9 +168,10 @@ func ConfigFullNode(c interface{}) Option {
If(cfg.Client.UseIpfs, If(cfg.Client.UseIpfs,
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)), Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)),
If(cfg.Client.IpfsUseForRetrieval, // TODO Fix this when we use IPFS for retrieval here.
/*If(cfg.Client.IpfsUseForRetrieval,
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager), Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager),
), ),*/
), ),
Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)), Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)),

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"time" "time"
"github.com/filecoin-project/go-fil-markets/shared_testutil"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -129,7 +130,6 @@ func ConfigStorageMiner(c interface{}) Option {
If(cfg.Subsystems.EnableStorageMarket, If(cfg.Subsystems.EnableStorageMarket,
// Markets // Markets
Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore),
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore), Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)), Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)),
@ -146,6 +146,9 @@ func ConfigStorageMiner(c interface{}) Option {
})), })),
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
// DAG Store
Override(new(shared_testutil.DagStore), modules.DAGStore),
// Markets (retrieval) // Markets (retrieval)
Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode), Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode),
Override(new(rmnet.RetrievalMarketNetwork), modules.RetrievalNetwork), Override(new(rmnet.RetrievalMarketNetwork), modules.RetrievalNetwork),

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math/rand"
"os" "os"
"sort" "sort"
"time" "time"
@ -22,7 +23,6 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline" offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file" unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car" "github.com/ipld/go-car"
@ -46,7 +46,6 @@ import (
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v3/actors/builtin/market" "github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
@ -82,12 +81,10 @@ type API struct {
Chain *store.ChainStore Chain *store.ChainStore
Imports dtypes.ClientImportMgr Imports dtypes.ClientImportMgr
Mds dtypes.ClientMultiDstore
CombinedBstore dtypes.ClientBlockstore // TODO: try to remove CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
RetrievalStoreMgr dtypes.ClientRetrievalStoreManager DataTransfer dtypes.ClientDataTransfer
DataTransfer dtypes.ClientDataTransfer Host host.Host
Host host.Host
// TODO How do we inject the Repo Path here ? // TODO How do we inject the Repo Path here ?
} }
@ -416,16 +413,29 @@ func (a *API) newDealInfoWithTransfer(transferCh *api.DataTransferChannel, v sto
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag // TODO: check if we have the ENTIRE dag
importIDs, err := a.imgr().List()
offExch := merkledag.NewDAGService(blockservice.New(a.Imports.Blockstore, offline.Exchange(a.Imports.Blockstore)))
_, err := offExch.Get(ctx, root)
if err == ipld.ErrNotFound {
return false, nil
}
if err != nil { if err != nil {
return false, err return false, xerrors.Errorf("failed to list imports: %w", err)
} }
return true, nil
for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
if err != nil {
continue
}
if info.Labels[importmgr.LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
continue
}
if c.Equals(root) {
return true, nil
}
}
return false, nil
} }
func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) { func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
@ -487,17 +497,11 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo
} }
func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.ImportRes, finalErr error) { func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.ImportRes, finalErr error) {
id, st, err := a.imgr().NewStore() id, err := a.imgr().NewStore()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// we don't need the store any more after we return from here. clean it up completely.
// we only need to retain the metadata related to this import which is identified by the storeID/importID.
defer func() {
_ = ((*multistore.MultiStore)(a.Mds)).Delete(id)
}()
carV2File, err := a.imgr().NewTempFile(id) carV2File, err := a.imgr().NewTempFile(id)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to create temp CARv2 file: %w", err) return nil, xerrors.Errorf("failed to create temp CARv2 file: %w", err)
@ -516,7 +520,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
return nil, xerrors.Errorf("failed to import CAR file: %w", err) return nil, xerrors.Errorf("failed to import CAR file: %w", err)
} }
} else { } else {
root, err = importNormalFileToCARv2(ctx, st, ref.Path, carV2File) root, err = a.importNormalFileToCARv2(ctx, id, ref.Path, carV2File)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to import normal file to CARv2: %w", err) return nil, xerrors.Errorf("failed to import normal file to CARv2: %w", err)
} }
@ -541,10 +545,10 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
}, nil }, nil
} }
func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error { func (a *API) ClientRemoveImport(ctx context.Context, importID uint64) error {
info, err := a.imgr().Info(importID) info, err := a.imgr().Info(importID)
if err != nil { if err != nil {
return xerrors.Errorf("failed to fetch multistore info: %w", err) return xerrors.Errorf("failed to fetch import info: %w", err)
} }
// remove the CARv2 file if we've created one. // remove the CARv2 file if we've created one.
@ -555,46 +559,33 @@ func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreI
return a.imgr().Remove(importID) return a.imgr().Remove(importID)
} }
// FIXME func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, error) {
func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (c cid.Cid, finalErr error) { // write payload to temp file
id, st, err := a.imgr().NewStore() tmpPath, err := a.imgr().NewTempFile(rand.Uint64())
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
// we don't need the store any more after we return from here. clean it up completely. defer os.Remove(tmpPath) //nolint:errcheck
// we only need to retain the metadata related to this import which is identified by the storeID/importID. tmpF, err := os.Open(tmpPath)
defer func() {
_ = ((*multistore.MultiStore)(a.Mds)).Delete(id)
}()
carV2File, err := a.imgr().NewTempFile(id)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to create temp CARv2 file: %w", err) return cid.Undef, err
} }
// make sure to remove the CARv2 file if anything goes wrong from here on. defer tmpF.Close() //nolint:errcheck
defer func() { if _, err := io.Copy(tmpF, r); err != nil {
if finalErr != nil { return cid.Undef, err
_ = os.Remove(carV2File)
}
}()
// FIXME
root, err := importNormalFileToCARv2(ctx, st, "", carV2File)
if err != nil {
return root, xerrors.Errorf("failed to import to CARv2 file: %w", err)
} }
if err := tmpF.Close(); err != nil {
if err := a.imgr().AddLabel(id, importmgr.LSource, "import-local"); err != nil {
return cid.Cid{}, err
}
if err := a.imgr().AddLabel(id, importmgr.LRootCid, root.String()); err != nil {
return cid.Cid{}, err
}
if err := a.imgr().AddLabel(id, importmgr.LCARv2FilePath, carV2File); err != nil {
return cid.Undef, err return cid.Undef, err
} }
return root, nil res, err := a.ClientImport(ctx, api.FileRef{
Path: tmpPath,
IsCAR: false,
})
if err != nil {
return cid.Undef, err
}
return res.Root, nil
} }
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
@ -818,8 +809,8 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
} }
carV2FilePath = resp.CarFilePath carV2FilePath = resp.CarFilePath
// remove the temp CARv2 fil when retrieval is complete // remove the temp CARv2 file when retrieval is complete
defer os.Remove(carV2FilePath) defer os.Remove(carV2FilePath) //nolint:errcheck
} else { } else {
carV2FilePath = order.LocalCARV2FilePath carV2FilePath = order.LocalCARV2FilePath
} }
@ -837,7 +828,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
finish(err) finish(err)
return return
} }
defer carv2Reader.Close() defer carv2Reader.Close() //nolint:errcheck
if _, err := io.Copy(f, carv2Reader.CarV1Reader()); err != nil { if _, err := io.Copy(f, carv2Reader.CarV1Reader()); err != nil {
finish(err) finish(err)
return return
@ -852,7 +843,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
finish(err) finish(err)
return return
} }
defer rw.Close() defer rw.Close() //nolint:errcheck
bsvc := blockservice.New(rw, offline.Exchange(rw)) bsvc := blockservice.New(rw, offline.Exchange(rw))
dag := merkledag.NewDAGService(bsvc) dag := merkledag.NewDAGService(bsvc)
@ -947,19 +938,6 @@ func (a *API) newRetrievalInfo(ctx context.Context, v rm.ClientDealState) api.Re
return a.newRetrievalInfoWithTransfer(transferCh, v) return a.newRetrievalInfoWithTransfer(transferCh, v)
} }
type multiStoreRetrievalStore struct {
storeID multistore.StoreID
store *multistore.Store
}
func (mrs *multiStoreRetrievalStore) StoreID() *multistore.StoreID {
return &mrs.storeID
}
func (mrs *multiStoreRetrievalStore) DAGService() ipld.DAGService {
return mrs.store.DAG
}
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) { func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK) mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil { if err != nil {
@ -1065,28 +1043,30 @@ func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCID
} }
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
id, st, err := a.imgr().NewStore() id := rand.Uint64()
tmpCARv2File, err := a.imgr().NewTempFile(id)
if err != nil { if err != nil {
return err return xerrors.Errorf("failed to create temp file: %w", err)
} }
defer os.Remove(tmpCARv2File) //nolint:errcheck
defer func() { root, err := a.importNormalFileToCARv2(ctx, id, ref.Path, tmpCARv2File)
// Clean up the store as we don't need it anymore.
_ = a.imgr().Remove(id)
}()
c, err := importNormalFileToUnixfsDAG(ctx, ref.Path, st.DAG)
if err != nil { if err != nil {
return xerrors.Errorf("failed to import file to store: %w", err) return xerrors.Errorf("failed to import normal file to CARv2")
} }
// generate a deterministic CARv1 payload from the UnixFS DAG by doing an IPLD // generate a deterministic CARv1 payload from the UnixFS DAG by doing an IPLD
// traversal over the Unixfs DAGs in the blockstore using the "all selector" i.e the entire DAG selector. // traversal over the Unixfs DAG in the CARv2 file using the "all selector" i.e the entire DAG selector.
rdOnly, err := blockstore.OpenReadOnly(tmpCARv2File, true)
if err != nil {
return xerrors.Errorf("failed to open read only CARv2 blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
sc := car.NewSelectiveCar(ctx, rdOnly, []car.Dag{{Root: root, Selector: allSelector}})
sc := car.NewSelectiveCar(ctx, st.Bstore, []car.Dag{{Root: c, Selector: allSelector}})
f, err := os.Create(outputPath) f, err := os.Create(outputPath)
if err != nil { if err != nil {
return err return err

View File

@ -3,11 +3,9 @@ package client
import ( import (
"bufio" "bufio"
"context" "context"
"fmt"
"io" "io"
"os" "os"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -25,43 +23,47 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
// importNormalFileToCARv2 imports the client's normal file to a CARv2 file. // importNormalFileToCARv2 transforms the client's "normal file" to a Unixfs IPLD DAG and writes out the DAG to a CARv2 file at the given output path.
// It first generates a Unixfs DAG using the given store to store the resulting blocks and gets the root cid of the Unixfs DAG. func (a *API) importNormalFileToCARv2(ctx context.Context, importID uint64, inputFilePath string, outputCARv2Path string) (c cid.Cid, finalErr error) {
// It then writes out the Unixfs DAG to a CARv2 file by generating a Unixfs DAG again using a CARv2 read-write blockstore as the backing store
// and then finalizing the CARv2 read-write blockstore to get the backing CARv2 file. // TODO: We've currently put in a hack to create the Unixfs DAG as a CARv2 without using Badger.
func importNormalFileToCARv2(ctx context.Context, st *multistore.Store, inputFilePath string, outputCARv2Path string) (c cid.Cid, finalErr error) { // We first transform the Unixfs DAG to a rootless CARv2 file as CARv2 doesen't allow streaming writes without specifying the root upfront and we
// create the UnixFS DAG and import the file to store to get the root. // don't have the root till the Unixfs DAG is created.
root, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, st.DAG) //
// In the second pass, we create a CARv2 file with the root present using the root node we get in the above step.
// This hack should be fixed when CARv2 allows specifying the root AFTER finishing the CARv2 streaming write.
tmpCARv2Path, err := a.imgr().NewTempFile(importID)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create temp CARv2 file: %w", err)
}
defer os.Remove(tmpCARv2Path) //nolint:errcheck
tempCARv2Store, err := blockstore.NewReadWrite(tmpCARv2Path, []cid.Cid{})
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create rootless temp CARv2 rw store: %w", err)
}
defer tempCARv2Store.Finalize() //nolint:errcheck
bsvc := blockservice.New(tempCARv2Store, offline.Exchange(tempCARv2Store))
// ---- First Pass --- Write out the UnixFS DAG to a rootless CARv2 file by instantiating a read-write CARv2 blockstore without the root.
root, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc))
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to import file to store: %w", err) return cid.Undef, xerrors.Errorf("failed to import file to store: %w", err)
} }
//--- //------ Second Pass --- Now that we have the root of the Unixfs DAG -> write out the Unixfs DAG to a CARv2 file with the root present by using a read-write CARv2 blockstore.
// transform the file to a CARv2 file by writing out a Unixfs DAG via the CARv2 read-write blockstore.
rw, err := blockstore.NewReadWrite(outputCARv2Path, []cid.Cid{root}) rw, err := blockstore.NewReadWrite(outputCARv2Path, []cid.Cid{root})
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to create a CARv2 read-write blockstore: %w", err) return cid.Undef, xerrors.Errorf("failed to create a CARv2 read-write blockstore: %w", err)
} }
defer rw.Finalize() //nolint:errcheck
// make sure to call finalize on the CARv2 read-write blockstore to ensure that the blockstore flushes out a valid CARv2 file bsvc = blockservice.New(rw, offline.Exchange(rw))
// and releases the file handle it acquires.
defer func() {
err := rw.Finalize()
if finalErr != nil {
finalErr = xerrors.Errorf("failed to import file to CARv2, err=%w", finalErr)
} else {
finalErr = err
}
}()
bsvc := blockservice.New(rw, offline.Exchange(rw))
root2, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc)) root2, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc))
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to create Unixfs DAG with CARv2 blockstore: %w", err) return cid.Undef, xerrors.Errorf("failed to create Unixfs DAG with CARv2 blockstore: %w", err)
} }
fmt.Printf("\n root1 is %s and root2 is %s", root, root2)
if root != root2 { if root != root2 {
return cid.Undef, xerrors.New("roots do not match") return cid.Undef, xerrors.New("roots do not match")
} }
@ -75,7 +77,7 @@ func importNormalFileToUnixfsDAG(ctx context.Context, inputFilePath string, dag
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to open input file: %w", err) return cid.Undef, xerrors.Errorf("failed to open input file: %w", err)
} }
defer f.Close() defer f.Close() //nolint:errcheck
stat, err := f.Stat() stat, err := f.Stat()
if err != nil { if err != nil {
@ -147,7 +149,7 @@ func transformCarToCARv2(inputCARPath string, outputCARv2Path string) (root cid.
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to open output CARv2 file: %w", err) return cid.Undef, xerrors.Errorf("failed to open output CARv2 file: %w", err)
} }
defer outF.Close() defer outF.Close() //nolint:errcheck
_, err = io.Copy(outF, inputF) _, err = io.Copy(outF, inputF)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to copy CARv2 file: %w", err) return cid.Undef, xerrors.Errorf("failed to copy CARv2 file: %w", err)

View File

@ -23,7 +23,6 @@ import (
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
@ -38,10 +37,8 @@ import (
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych" payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
) )
func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) { func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
@ -78,34 +75,14 @@ func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full
}) })
} }
func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) { func ClientImportMgr(ds dtypes.MetadataDS, r repo.LockedRepo) dtypes.ClientImportMgr {
ctx := helpers.LifecycleCtx(mctx, lc) return importmgr.New(namespace.Wrap(ds, datastore.NewKey("/client")), r.Path())
ds, err := r.Datastore(ctx, "/client")
if err != nil {
return nil, xerrors.Errorf("getting datastore out of repo: %w", err)
}
mds, err := multistore.NewMultiDstore(ds)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return mds.Close()
},
})
return mds, nil
}
func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS, r repo.LockedRepo) dtypes.ClientImportMgr {
return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client")), r.Path())
} }
// TODO Ge this to work when we work on IPFS integration. What we effectively need here is a cross-shard/cross-CAR files index for the Storage client's imports.
func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore { func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore {
// in most cases this is now unused in normal operations -- however, it's important to preserve for the IPFS use case // in most cases this is now unused in normal operations -- however, it's important to preserve for the IPFS use case
return blockstore.WrapIDStore(imgr.Blockstore) return blockstore.WrapIDStore(blockstore.FromDatastore(datastore.NewMapDatastore()))
} }
// RegisterClientValidator is an initialization hook that registers the client // RegisterClientValidator is an initialization hook that registers the client
@ -232,13 +209,3 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.
}) })
return client, nil return client, nil
} }
// ClientRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
func ClientRetrievalStoreManager(imgr dtypes.ClientImportMgr) dtypes.ClientRetrievalStoreManager {
return retrievalstoremgr.NewMultiStoreRetrievalStoreManager(imgr)
}
// ClientBlockstoreRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
func ClientBlockstoreRetrievalStoreManager(bs dtypes.ClientBlockstore) dtypes.ClientRetrievalStoreManager {
return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs)
}

View File

@ -7,16 +7,13 @@ import (
exchange "github.com/ipfs/go-ipfs-exchange-interface" exchange "github.com/ipfs/go-ipfs-exchange-interface"
format "github.com/ipfs/go-ipld-format" format "github.com/ipfs/go-ipld-format"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-multistore"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
) )
// MetadataDS stores metadata. By default it's namespaced under /metadata in // MetadataDS stores metadata. By default it's namespaced under /metadata in
@ -68,13 +65,11 @@ type (
type ChainBitswap exchange.Interface type ChainBitswap exchange.Interface
type ChainBlockService bserv.BlockService type ChainBlockService bserv.BlockService
type ClientMultiDstore *multistore.MultiStore
type ClientImportMgr *importmgr.Mgr type ClientImportMgr *importmgr.Mgr
type ClientBlockstore blockstore.BasicBlockstore type ClientBlockstore blockstore.BasicBlockstore
type ClientDealStore *statestore.StateStore type ClientDealStore *statestore.StateStore
type ClientRequestValidator *requestvalidation.UnifiedRequestValidator type ClientRequestValidator *requestvalidation.UnifiedRequestValidator
type ClientDatastore datastore.Batching type ClientDatastore datastore.Batching
type ClientRetrievalStoreManager retrievalstoremgr.RetrievalStoreManager
type Graphsync graphsync.GraphExchange type Graphsync graphsync.GraphExchange
@ -92,4 +87,3 @@ type ProviderDataTransfer datatransfer.Manager
type StagingDAG format.DAGService type StagingDAG format.DAGService
type StagingBlockstore blockstore.BasicBlockstore type StagingBlockstore blockstore.BasicBlockstore
type StagingGraphsync graphsync.GraphExchange type StagingGraphsync graphsync.GraphExchange
type StagingMultiDstore *multistore.MultiStore

View File

@ -11,6 +11,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/lotus/markets/pricing" "github.com/filecoin-project/lotus/markets/pricing"
"go.uber.org/fx" "go.uber.org/fx"
"go.uber.org/multierr" "go.uber.org/multierr"
@ -44,7 +45,6 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
@ -376,27 +376,6 @@ func NewProviderPieceStore(lc fx.Lifecycle, ds dtypes.MetadataDS) (dtypes.Provid
return ps, nil return ps, nil
} }
func StagingMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.StagingMultiDstore, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
ds, err := r.Datastore(ctx, "/staging")
if err != nil {
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
}
mds, err := multistore.NewMultiDstore(ds)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return mds.Close()
},
})
return mds, nil
}
// StagingBlockstore creates a blockstore for staging blocks for a miner // StagingBlockstore creates a blockstore for staging blocks for a miner
// in a storage deal, prior to sealing // in a storage deal, prior to sealing
func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.StagingBlockstore, error) { func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.StagingBlockstore, error) {
@ -589,15 +568,20 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
} }
} }
func DAGStore(r repo.LockedRepo) (shared_testutil.DagStore, error) {
md := shared_testutil.NewMockDagStore(r.Path())
return md, nil
}
func StorageProvider(minerAddress dtypes.MinerAddress, func StorageProvider(minerAddress dtypes.MinerAddress,
storedAsk *storedask.StoredAsk, storedAsk *storedask.StoredAsk,
h host.Host, ds dtypes.MetadataDS, h host.Host, ds dtypes.MetadataDS,
mds dtypes.StagingMultiDstore,
r repo.LockedRepo, r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore, pieceStore dtypes.ProviderPieceStore,
dataTransfer dtypes.ProviderDataTransfer, dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode, spn storagemarket.StorageProviderNode,
df dtypes.StorageDealFilter, df dtypes.StorageDealFilter,
dagStore shared_testutil.DagStore,
) (storagemarket.StorageProvider, error) { ) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h) net := smnet.NewFromLibp2pHost(h)
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path())) store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
@ -607,7 +591,8 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df)) opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, pieceStore, dataTransfer, spn, address.Address(minerAddress), storedAsk, opt) return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, dagStore, pieceStore,
dataTransfer, spn, address.Address(minerAddress), storedAsk, opt)
} }
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
@ -668,14 +653,13 @@ func RetrievalProvider(
netwk rmnet.RetrievalMarketNetwork, netwk rmnet.RetrievalMarketNetwork,
ds dtypes.MetadataDS, ds dtypes.MetadataDS,
pieceStore dtypes.ProviderPieceStore, pieceStore dtypes.ProviderPieceStore,
mds dtypes.StagingMultiDstore,
dt dtypes.ProviderDataTransfer, dt dtypes.ProviderDataTransfer,
pieceProvider sectorstorage.PieceProvider,
pricingFnc dtypes.RetrievalPricingFunc, pricingFnc dtypes.RetrievalPricingFunc,
userFilter dtypes.RetrievalDealFilter, userFilter dtypes.RetrievalDealFilter,
dagStore shared_testutil.DagStore,
) (retrievalmarket.RetrievalProvider, error) { ) (retrievalmarket.RetrievalProvider, error) {
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter)) opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
return retrievalimpl.NewProvider(address.Address(maddr), adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), return retrievalimpl.NewProvider(address.Address(maddr), adapter, netwk, pieceStore, dagStore, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")),
retrievalimpl.RetrievalPricingFunc(pricingFnc), opt) retrievalimpl.RetrievalPricingFunc(pricingFnc), opt)
} }

View File

@ -6,21 +6,18 @@ import (
"io/ioutil" "io/ioutil"
"strconv" "strconv"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/blockstore"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
) )
type Mgr struct { type Mgr struct {
mds *multistore.MultiStore
ds datastore.Batching ds datastore.Batching
repoPath string repoPath string
counter *shared.TimeCounter
Blockstore blockstore.BasicBlockstore
} }
type Label string type Label string
@ -32,13 +29,11 @@ const (
LCARv2FilePath = "CARv2Path" // path of the CARv2 file. LCARv2FilePath = "CARv2Path" // path of the CARv2 file.
) )
func New(mds *multistore.MultiStore, ds datastore.Batching, repoPath string) *Mgr { func New(ds datastore.Batching, repoPath string) *Mgr {
return &Mgr{ return &Mgr{
mds: mds, repoPath: repoPath,
Blockstore: blockstore.Adapt(mds.MultiReadBlockstore()), ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"),
repoPath: repoPath, counter: shared.NewTimeCounter(),
ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"),
} }
} }
@ -46,25 +41,21 @@ type StoreMeta struct {
Labels map[string]string Labels map[string]string
} }
func (m *Mgr) NewStore() (multistore.StoreID, *multistore.Store, error) { func (m *Mgr) NewStore() (uint64, error) {
id := m.mds.Next() id := m.counter.Next()
st, err := m.mds.Get(id)
if err != nil {
return 0, nil, err
}
meta, err := json.Marshal(&StoreMeta{Labels: map[string]string{ meta, err := json.Marshal(&StoreMeta{Labels: map[string]string{
"source": "unknown", "source": "unknown",
}}) }})
if err != nil { if err != nil {
return 0, nil, xerrors.Errorf("marshaling empty store metadata: %w", err) return 0, xerrors.Errorf("marshaling empty store metadata: %w", err)
} }
err = m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) err = m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
return id, st, err return id, err
} }
func (m *Mgr) AddLabel(id multistore.StoreID, key, value string) error { // source, file path, data CID.. func (m *Mgr) AddLabel(id uint64, key, value string) error { // source, file path, data CID..
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil { if err != nil {
return xerrors.Errorf("getting metadata form datastore: %w", err) return xerrors.Errorf("getting metadata form datastore: %w", err)
@ -85,8 +76,8 @@ func (m *Mgr) AddLabel(id multistore.StoreID, key, value string) error { // sour
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
} }
func (m *Mgr) List() ([]multistore.StoreID, error) { func (m *Mgr) List() ([]uint64, error) {
var keys []multistore.StoreID var keys []uint64
qres, err := m.ds.Query(query.Query{KeysOnly: true}) qres, err := m.ds.Query(query.Query{KeysOnly: true})
if err != nil { if err != nil {
@ -104,13 +95,13 @@ func (m *Mgr) List() ([]multistore.StoreID, error) {
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to parse key %s to uint64, err=%w", r.Key, err) return nil, xerrors.Errorf("failed to parse key %s to uint64, err=%w", r.Key, err)
} }
keys = append(keys, multistore.StoreID(id)) keys = append(keys, id)
} }
return keys, nil return keys, nil
} }
func (m *Mgr) Info(id multistore.StoreID) (*StoreMeta, error) { func (m *Mgr) Info(id uint64) (*StoreMeta, error) {
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id))) meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting metadata form datastore: %w", err) return nil, xerrors.Errorf("getting metadata form datastore: %w", err)
@ -124,11 +115,7 @@ func (m *Mgr) Info(id multistore.StoreID) (*StoreMeta, error) {
return &sm, nil return &sm, nil
} }
func (m *Mgr) Remove(id multistore.StoreID) error { func (m *Mgr) Remove(id uint64) error {
if err := m.mds.Delete(id); err != nil {
return xerrors.Errorf("removing import: %w", err)
}
if err := m.ds.Delete(datastore.NewKey(fmt.Sprintf("%d", id))); err != nil { if err := m.ds.Delete(datastore.NewKey(fmt.Sprintf("%d", id))); err != nil {
return xerrors.Errorf("removing import metadata: %w", err) return xerrors.Errorf("removing import metadata: %w", err)
} }
@ -136,8 +123,8 @@ func (m *Mgr) Remove(id multistore.StoreID) error {
return nil return nil
} }
func (a *Mgr) NewTempFile(id multistore.StoreID) (string, error) { func (m *Mgr) NewTempFile(id uint64) (string, error) {
file, err := ioutil.TempFile(a.repoPath, fmt.Sprintf("%d", id)) file, err := ioutil.TempFile(m.repoPath, fmt.Sprintf("%d", id))
if err != nil { if err != nil {
return "", xerrors.Errorf("failed to create temp file: %w", err) return "", xerrors.Errorf("failed to create temp file: %w", err)
} }

View File

@ -1,110 +0,0 @@
package retrievalstoremgr
import (
"errors"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
)
// RetrievalStore references a store for a retrieval deal
// which may or may not have a multistore ID associated with it
type RetrievalStore interface {
StoreID() *multistore.StoreID
DAGService() ipldformat.DAGService
}
// RetrievalStoreManager manages stores for retrieval deals, abstracting
// the underlying storage mechanism
type RetrievalStoreManager interface {
NewStore() (RetrievalStore, error)
ReleaseStore(RetrievalStore) error
}
// MultiStoreRetrievalStoreManager manages stores on top of the import manager
type MultiStoreRetrievalStoreManager struct {
imgr *importmgr.Mgr
}
var _ RetrievalStoreManager = &MultiStoreRetrievalStoreManager{}
// NewMultiStoreRetrievalStoreManager returns a new multstore based RetrievalStoreManager
func NewMultiStoreRetrievalStoreManager(imgr *importmgr.Mgr) RetrievalStoreManager {
return &MultiStoreRetrievalStoreManager{
imgr: imgr,
}
}
// NewStore creates a new store (uses multistore)
func (mrsm *MultiStoreRetrievalStoreManager) NewStore() (RetrievalStore, error) {
storeID, store, err := mrsm.imgr.NewStore()
if err != nil {
return nil, err
}
return &multiStoreRetrievalStore{storeID, store}, nil
}
// ReleaseStore releases a store (uses multistore remove)
func (mrsm *MultiStoreRetrievalStoreManager) ReleaseStore(retrievalStore RetrievalStore) error {
mrs, ok := retrievalStore.(*multiStoreRetrievalStore)
if !ok {
return errors.New("Cannot release this store type")
}
return mrsm.imgr.Remove(mrs.storeID)
}
type multiStoreRetrievalStore struct {
storeID multistore.StoreID
store *multistore.Store
}
func (mrs *multiStoreRetrievalStore) StoreID() *multistore.StoreID {
return &mrs.storeID
}
func (mrs *multiStoreRetrievalStore) DAGService() ipldformat.DAGService {
return mrs.store.DAG
}
// BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores
type BlockstoreRetrievalStoreManager struct {
bs blockstore.BasicBlockstore
}
var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{}
// NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager
func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore) RetrievalStoreManager {
return &BlockstoreRetrievalStoreManager{
bs: bs,
}
}
// NewStore creates a new store (just uses underlying blockstore)
func (brsm *BlockstoreRetrievalStoreManager) NewStore() (RetrievalStore, error) {
return &blockstoreRetrievalStore{
dagService: merkledag.NewDAGService(blockservice.New(brsm.bs, offline.Exchange(brsm.bs))),
}, nil
}
// ReleaseStore for this implementation does nothing
func (brsm *BlockstoreRetrievalStoreManager) ReleaseStore(RetrievalStore) error {
return nil
}
type blockstoreRetrievalStore struct {
dagService ipldformat.DAGService
}
func (brs *blockstoreRetrievalStore) StoreID() *multistore.StoreID {
return nil
}
func (brs *blockstoreRetrievalStore) DAGService() ipldformat.DAGService {
return brs.dagService
}

View File

@ -1,137 +0,0 @@
package retrievalstoremgr_test
import (
"context"
"math/rand"
"testing"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dss "github.com/ipfs/go-datastore/sync"
format "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)
func TestMultistoreRetrievalStoreManager(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
multiDS, err := multistore.NewMultiDstore(ds)
require.NoError(t, err)
imgr := importmgr.New(multiDS, ds)
retrievalStoreMgr := retrievalstoremgr.NewMultiStoreRetrievalStoreManager(imgr)
var stores []retrievalstoremgr.RetrievalStore
for i := 0; i < 5; i++ {
store, err := retrievalStoreMgr.NewStore()
require.NoError(t, err)
stores = append(stores, store)
nds := generateNodesOfSize(5, 100)
err = store.DAGService().AddMany(ctx, nds)
require.NoError(t, err)
}
t.Run("creates all keys", func(t *testing.T) {
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 31)
})
t.Run("loads DAG services", func(t *testing.T) {
for _, store := range stores {
mstore, err := multiDS.Get(*store.StoreID())
require.NoError(t, err)
require.Equal(t, mstore.DAG, store.DAGService())
}
})
t.Run("delete stores", func(t *testing.T) {
err := retrievalStoreMgr.ReleaseStore(stores[4])
require.NoError(t, err)
storeIndexes := multiDS.List()
require.Len(t, storeIndexes, 4)
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 25)
})
}
func TestBlockstoreRetrievalStoreManager(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.FromDatastore(ds)
retrievalStoreMgr := retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs)
var stores []retrievalstoremgr.RetrievalStore
var cids []cid.Cid
for i := 0; i < 5; i++ {
store, err := retrievalStoreMgr.NewStore()
require.NoError(t, err)
stores = append(stores, store)
nds := generateNodesOfSize(5, 100)
err = store.DAGService().AddMany(ctx, nds)
require.NoError(t, err)
for _, nd := range nds {
cids = append(cids, nd.Cid())
}
}
t.Run("creates all keys", func(t *testing.T) {
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 25)
})
t.Run("loads DAG services, all DAG has all nodes", func(t *testing.T) {
for _, store := range stores {
dagService := store.DAGService()
for _, cid := range cids {
_, err := dagService.Get(ctx, cid)
require.NoError(t, err)
}
}
})
t.Run("release store has no effect", func(t *testing.T) {
err := retrievalStoreMgr.ReleaseStore(stores[4])
require.NoError(t, err)
qres, err := ds.Query(query.Query{KeysOnly: true})
require.NoError(t, err)
all, err := qres.Rest()
require.NoError(t, err)
require.Len(t, all, 25)
})
}
var seedSeq int64 = 0
func randomBytes(n int64) []byte {
randBytes := make([]byte, n)
r := rand.New(rand.NewSource(seedSeq))
_, _ = r.Read(randBytes)
seedSeq++
return randBytes
}
func generateNodesOfSize(n int, size int64) []format.Node {
generatedNodes := make([]format.Node, 0, n)
for i := 0; i < n; i++ {
b := dag.NewRawNode(randomBytes(size))
generatedNodes = append(generatedNodes, b)
}
return generatedNodes
}