refactor: move lotus mount, dag store etc from markets to lotus

This commit is contained in:
Dirk McCormick 2021-07-19 15:51:46 +02:00
parent 905592a3dc
commit b6a7a8c987
11 changed files with 756 additions and 13 deletions

4
go.mod
View File

@ -26,6 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.1.0
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4
@ -34,7 +35,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
@ -78,6 +79,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1

4
go.sum
View File

@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e h1:mJHQp7htPo04N1IQjnYneUoif1FcsN43KuHvMbeNF7E=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e/go.mod h1:rfdpy6u0CdbknZNxRb5+7t7+yaPAAk4xLLhPaQICr0c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd h1:5Gg9NyMV/5FauQu497je92yPbu8o2kbnb14eI7wKvBg=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd/go.mod h1:21Kl9Ml8XIueT5o1UIqjk9XX88UKkRqSSh+VmEqT7To=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=

View File

@ -0,0 +1,174 @@
package dagstore
import (
"context"
"io"
"sync"
"time"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-fil-markets/carstore"
"github.com/filecoin-project/go-fil-markets/shared"
)
var log = logging.Logger("dagstore-wrapper")
var gcInterval = 5 * time.Minute
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct {
TransientsDir string
IndexDir string
Datastore ds.Datastore
}
type closableBlockstore struct {
bstore.Blockstore
io.Closer
}
type dagStoreWrapper struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
dagStore *dagstore.DAGStore
mountApi LotusMountAPI
}
var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil)
func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
return nil, xerrors.Errorf("failed to create registry: %w", err)
}
failureCh := make(chan dagstore.ShardResult, 1)
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
IndexDir: cfg.IndexDir,
Datastore: cfg.Datastore,
MountRegistry: registry,
FailureCh: failureCh,
}
dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
dw := &dagStoreWrapper{
ctx: ctx,
cancel: cancel,
dagStore: dagStore,
mountApi: mountApi,
}
dw.wg.Add(1)
// the dagstore will write Shard failures to the `failureCh` here. Run a go-routine to handle them.
go dw.handleFailures(failureCh)
return dw, nil
}
func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) {
defer ds.wg.Done()
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()
select {
case <-ticker.C:
_, _ = ds.dagStore.GC(ds.ctx)
case f := <-failureCh:
log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
}
case <-ds.ctx.Done():
return
}
}
func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
if err != nil {
if xerrors.Unwrap(err) != dagstore.ErrShardUnknown {
return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
}
// if the DAGStore does not know about the Shard -> register it and then try to acquire it again.
log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid)
if err := shared.RegisterShardSync(ctx, ds, pieceCid, "", false); err != nil {
return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
}
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)
resch = make(chan dagstore.ShardResult, 1)
if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
}
}
// TODO: Can I rely on AcquireShard to return an error if the context times out?
var res dagstore.ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case res = <-resch:
if res.Error != nil {
return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error)
}
}
bs, err := res.Accessor.Blockstore()
if err != nil {
return nil, err
}
return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil
}
func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
// Create a lotus mount with the piece CID
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, ds.mountApi)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}
// Register the shard
opts := dagstore.RegisterOpts{
ExistingTransient: carPath,
LazyInitialization: !eagerInit,
}
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
}
return nil
}
func (ds *dagStoreWrapper) Close() error {
if err := ds.dagStore.Close(); err != nil {
return err
}
ds.cancel()
ds.wg.Wait()
return nil
}

View File

@ -0,0 +1,67 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: mount_api.go
// Package mock_dagstore is a generated GoMock package.
package mock_dagstore
import (
context "context"
io "io"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
// MockLotusMountAPI is a mock of LotusMountAPI interface.
type MockLotusMountAPI struct {
ctrl *gomock.Controller
recorder *MockLotusMountAPIMockRecorder
}
// MockLotusMountAPIMockRecorder is the mock recorder for MockLotusMountAPI.
type MockLotusMountAPIMockRecorder struct {
mock *MockLotusMountAPI
}
// NewMockLotusMountAPI creates a new mock instance.
func NewMockLotusMountAPI(ctrl *gomock.Controller) *MockLotusMountAPI {
mock := &MockLotusMountAPI{ctrl: ctrl}
mock.recorder = &MockLotusMountAPIMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLotusMountAPI) EXPECT() *MockLotusMountAPIMockRecorder {
return m.recorder
}
// FetchUnsealedPiece mocks base method.
func (m *MockLotusMountAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchUnsealedPiece", ctx, pieceCid)
ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
func (mr *MockLotusMountAPIMockRecorder) FetchUnsealedPiece(ctx, pieceCid interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockLotusMountAPI)(nil).FetchUnsealedPiece), ctx, pieceCid)
}
// GetUnpaddedCARSize mocks base method.
func (m *MockLotusMountAPI) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", pieceCid)
ret0, _ := ret[0].(uint64)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize.
func (mr *MockLotusMountAPIMockRecorder) GetUnpaddedCARSize(pieceCid interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusMountAPI)(nil).GetUnpaddedCARSize), pieceCid)
}

114
markets/dagstore/mount.go Normal file
View File

@ -0,0 +1,114 @@
package dagstore
import (
"context"
"fmt"
"io"
"net/url"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
)
const lotusScheme = "lotus"
const mountURLTemplate = "%s://%s"
var _ mount.Mount = (*LotusMount)(nil)
// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
api LotusMountAPI
pieceCid cid.Cid
}
// This method is called when registering a mount with the DAG store registry.
// The DAG store registry receives an instance of the mount (a "template").
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
func NewLotusMountTemplate(api LotusMountAPI) *LotusMount {
return &LotusMount{api: api}
}
func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) {
return &LotusMount{
pieceCid: pieceCid,
api: api,
}, nil
}
func (l *LotusMount) Serialize() *url.URL {
u := fmt.Sprintf(mountURLTemplate, lotusScheme, l.pieceCid.String())
url, err := url.Parse(u)
if err != nil {
// Should never happen
panic(xerrors.Errorf("failed to parse mount URL '%s': %w", u, err))
}
return url
}
func (l *LotusMount) Deserialize(u *url.URL) error {
if u.Scheme != lotusScheme {
return xerrors.Errorf("scheme '%s' for URL '%s' does not match required scheme '%s'", u.Scheme, u, lotusScheme)
}
pieceCid, err := cid.Decode(u.Host)
if err != nil {
return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err)
}
l.pieceCid = pieceCid
return nil
}
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
r, err := l.api.FetchUnsealedPiece(ctx, l.pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.pieceCid, err)
}
return &readCloser{r}, nil
}
func (l *LotusMount) Info() mount.Info {
return mount.Info{
Kind: mount.KindRemote,
AccessSequential: true,
AccessSeek: false,
AccessRandom: false,
}
}
func (l *LotusMount) Close() error {
return nil
}
func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) {
size, err := l.api.GetUnpaddedCARSize(l.pieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.pieceCid, err)
}
// TODO Mark false when storage deal expires.
return mount.Stat{
Exists: true,
Size: int64(size),
}, nil
}
type readCloser struct {
io.ReadCloser
}
var _ mount.Reader = (*readCloser)(nil)
func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) {
return 0, xerrors.Errorf("ReadAt called but not implemented")
}
func (r *readCloser) Seek(offset int64, whence int) (int64, error) {
return 0, xerrors.Errorf("Seek called but not implemented")
}

View File

@ -0,0 +1,83 @@
package dagstore
import (
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)
type LotusMountAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
}
type lotusMountApiImpl struct {
pieceStore piecestore.PieceStore
rm retrievalmarket.RetrievalProviderNode
}
var _ LotusMountAPI = (*lotusMountApiImpl)(nil)
func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusMountApiImpl {
return &lotusMountApiImpl{
pieceStore: store,
rm: rm,
}
}
func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) <= 0 {
return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid)
}
// prefer an unsealed sector containing the piece if one exists
for _, deal := range pieceInfo.Deals {
isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
continue
}
if isUnsealed {
// UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err == nil {
return reader, nil
}
}
}
lastErr := xerrors.New("no sectors found to unseal from")
// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals {
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err == nil {
return reader, nil
}
lastErr = err
}
return nil, lastErr
}
func (m *lotusMountApiImpl) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) == 0 {
return 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}
len := pieceInfo.Deals[0].Length
return uint64(len), nil
}

View File

@ -0,0 +1,166 @@
package dagstore
import (
"bytes"
"context"
"io"
"testing"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/go-fil-markets/piecestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
)
const unsealedSectorID = abi.SectorNumber(1)
const sealedSectorID = abi.SectorNumber(2)
func TestLotusMountApiFetchUnsealedPiece(t *testing.T) {
ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
unsealedSectorData := "unsealed"
sealedSectorData := "sealed"
mockData := map[abi.SectorNumber]string{
unsealedSectorID: unsealedSectorData,
sealedSectorID: sealedSectorData,
}
testCases := []struct {
name string
deals []abi.SectorNumber
fetchedData string
expectErr bool
}{{
// Expect error if there is no deal info for piece CID
name: "no deals",
expectErr: true,
}, {
// Expect the API to always fetch the unsealed deal (because it's
// cheaper than fetching the sealed deal)
name: "prefer unsealed deal",
deals: []abi.SectorNumber{unsealedSectorID, sealedSectorID},
fetchedData: unsealedSectorData,
}, {
// Expect the API to unseal the data if there are no unsealed deals
name: "unseal if necessary",
deals: []abi.SectorNumber{sealedSectorID},
fetchedData: sealedSectorData,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ps := getPieceStore(t)
rpn := &mockRPN{
sectors: mockData,
}
api := NewLotusMountAPI(ps, rpn)
// Add deals to piece store
for _, sectorID := range tc.deals {
dealInfo := piecestore.DealInfo{
SectorID: sectorID,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
}
// Fetch the piece
r, err := api.FetchUnsealedPiece(ctx, cid1)
if tc.expectErr {
require.Error(t, err)
return
}
// Check that the returned reader is for the correct piece
require.NoError(t, err)
bz, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, tc.fetchedData, string(bz))
})
}
}
func TestLotusMountApiGetUnpaddedCARSize(t *testing.T) {
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
ps := getPieceStore(t)
rpn := &mockRPN{}
api := NewLotusMountAPI(ps, rpn)
// Add a deal with data Length 10
dealInfo := piecestore.DealInfo{
Length: 10,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
// Check that the data length is correct
len, err := api.GetUnpaddedCARSize(cid1)
require.NoError(t, err)
require.EqualValues(t, 10, len)
}
func getPieceStore(t *testing.T) piecestore.PieceStore {
ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)
err = ps.Start(context.Background())
require.NoError(t, err)
ready := make(chan error)
ps.OnReady(func(err error) {
ready <- err
})
err = <-ready
require.NoError(t, err)
return ps
}
type mockRPN struct {
sectors map[abi.SectorNumber]string
}
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
data, ok := m.sectors[sectorID]
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil
}
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
return sectorID == unsealedSectorID, nil
}
func (m *mockRPN) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
panic("implement me")
}
func (m *mockRPN) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
panic("implement me")
}
func (m *mockRPN) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) {
panic("implement me")
}
func (m *mockRPN) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
panic("implement me")
}
var _ retrievalmarket.RetrievalProviderNode = (*mockRPN)(nil)

View File

@ -0,0 +1,103 @@
package dagstore
import (
"context"
"fmt"
"io/ioutil"
"net/url"
"strings"
"testing"
"github.com/golang/mock/gomock"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/dagstore/mount"
mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks"
)
func TestLotusMount(t *testing.T) {
ctx := context.Background()
bgen := blocksutil.NewBlockGenerator()
cid := bgen.Next().Cid()
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create a mock lotus api that returns the reader we want
mockLotusMountAPI := mock_dagstore.NewMockLotusMountAPI(mockCtrl)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(cid).Return(uint64(100), nil).Times(1)
mnt, err := NewLotusMount(cid, mockLotusMountAPI)
require.NoError(t, err)
info := mnt.Info()
require.Equal(t, info.Kind, mount.KindRemote)
// fetch and assert success
rd, err := mnt.Fetch(context.Background())
require.NoError(t, err)
bz, err := ioutil.ReadAll(rd)
require.NoError(t, err)
require.NoError(t, rd.Close())
require.Equal(t, []byte("testing"), bz)
stat, err := mnt.Stat(ctx)
require.NoError(t, err)
require.EqualValues(t, 100, stat.Size)
// serialize url then deserialize from mount template -> should get back
// the same mount
url := mnt.Serialize()
mnt2 := NewLotusMountTemplate(mockLotusMountAPI)
err = mnt2.Deserialize(url)
require.NoError(t, err)
// fetching on this mount should get us back the same data.
rd, err = mnt2.Fetch(context.Background())
require.NoError(t, err)
bz, err = ioutil.ReadAll(rd)
require.NoError(t, err)
require.NoError(t, rd.Close())
require.Equal(t, []byte("testing"), bz)
}
func TestLotusMountDeserialize(t *testing.T) {
api := &lotusMountApiImpl{}
bgen := blocksutil.NewBlockGenerator()
cid := bgen.Next().Cid()
// success
us := fmt.Sprintf(mountURLTemplate, lotusScheme, cid.String())
u, err := url.Parse(us)
require.NoError(t, err)
mnt := NewLotusMountTemplate(api)
err = mnt.Deserialize(u)
require.NoError(t, err)
require.Equal(t, cid, mnt.pieceCid)
require.Equal(t, api, mnt.api)
// fails if scheme is not Lotus
us = fmt.Sprintf(mountURLTemplate, "http", cid.String())
u, err = url.Parse(us)
require.NoError(t, err)
err = mnt.Deserialize(u)
require.Error(t, err)
require.Contains(t, err.Error(), "does not match")
// fails if cid is not valid
us = fmt.Sprintf(mountURLTemplate, lotusScheme, "rand")
u, err = url.Parse(us)
require.NoError(t, err)
err = mnt.Deserialize(u)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to parse PieceCid")
}

View File

@ -0,0 +1,33 @@
package dagstore
import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
)
// ReadOnlyBlockstore stubs out Blockstore mutators with methods that error out
type ReadOnlyBlockstore struct {
dagstore.ReadBlockstore
}
func NewReadOnlyBlockstore(rbs dagstore.ReadBlockstore) bstore.Blockstore {
return ReadOnlyBlockstore{ReadBlockstore: rbs}
}
func (r ReadOnlyBlockstore) DeleteBlock(c cid.Cid) error {
return xerrors.Errorf("DeleteBlock called but not implemented")
}
func (r ReadOnlyBlockstore) Put(block blocks.Block) error {
return xerrors.Errorf("Put called but not implemented")
}
func (r ReadOnlyBlockstore) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("PutMany called but not implemented")
}
var _ bstore.Blockstore = (*ReadOnlyBlockstore)(nil)

View File

@ -7,7 +7,6 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
mktdagstore "github.com/filecoin-project/go-fil-markets/dagstore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
@ -23,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/markets/dealfilter"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/markets/storageadapter"
@ -147,7 +147,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
// DAG Store
Override(new(mktdagstore.DagStoreWrapper), modules.DagStoreWrapper),
Override(new(dagstore.DagStoreWrapper), modules.DagStoreWrapper),
// Markets (retrieval)
Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode),

View File

@ -32,7 +32,6 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
mktdagstore "github.com/filecoin-project/go-fil-markets/dagstore"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -66,6 +65,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/markets"
"github.com/filecoin-project/lotus/markets/dagstore"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/markets/pricing"
lotusminer "github.com/filecoin-project/lotus/miner"
@ -580,16 +580,16 @@ func DagStoreWrapper(
r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore,
rpn retrievalmarket.RetrievalProviderNode,
) (mktdagstore.DagStoreWrapper, error) {
) (shared.DagStoreWrapper, error) {
dagStoreDir := filepath.Join(r.Path(), "dagstore")
dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider"))
cfg := mktdagstore.MarketDAGStoreConfig{
cfg := dagstore.MarketDAGStoreConfig{
TransientsDir: filepath.Join(dagStoreDir, "transients"),
IndexDir: filepath.Join(dagStoreDir, "index"),
Datastore: dagStoreDS,
}
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, rpn)
return mktdagstore.NewDagStoreWrapper(cfg, mountApi)
mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn)
return dagstore.NewDagStoreWrapper(cfg, mountApi)
}
func StorageProvider(minerAddress dtypes.MinerAddress,
@ -600,7 +600,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode,
df dtypes.StorageDealFilter,
dagStore mktdagstore.DagStoreWrapper,
dagStore shared.DagStoreWrapper,
) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h)
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
@ -609,9 +609,10 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
}
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), ds, dagStore, pieceStore, spn)
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, dagStore, pieceStore,
dataTransfer, spn, address.Address(minerAddress), storedAsk, opt)
dataTransfer, spn, address.Address(minerAddress), storedAsk, shardMigrator, opt)
}
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
@ -675,7 +676,7 @@ func RetrievalProvider(
dt dtypes.ProviderDataTransfer,
pricingFnc dtypes.RetrievalPricingFunc,
userFilter dtypes.RetrievalDealFilter,
dagStore mktdagstore.DagStoreWrapper,
dagStore shared.DagStoreWrapper,
) (retrievalmarket.RetrievalProvider, error) {
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
return retrievalimpl.NewProvider(