fix lotus accessor startup

This commit is contained in:
aarshkshah1992 2021-07-21 17:34:07 +05:30
parent 0dbb1940f0
commit 4ed9cd152c
5 changed files with 44 additions and 32 deletions

View File

@ -3,8 +3,8 @@ package dagstore
import ( import (
"context" "context"
"io" "io"
"sync"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -15,15 +15,14 @@ import (
type LotusAccessor interface { type LotusAccessor interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
Start(ctx context.Context) error
} }
type lotusAccessor struct { type lotusAccessor struct {
pieceStore piecestore.PieceStore pieceStore piecestore.PieceStore
rm retrievalmarket.RetrievalProviderNode rm retrievalmarket.RetrievalProviderNode
startLk sync.Mutex readyMgr *shared.ReadyManager
started bool
startErr error
} }
var _ LotusAccessor = (*lotusAccessor)(nil) var _ LotusAccessor = (*lotusAccessor)(nil)
@ -32,17 +31,11 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP
return &lotusAccessor{ return &lotusAccessor{
pieceStore: store, pieceStore: store,
rm: rm, rm: rm,
readyMgr: shared.NewReadyManager(),
} }
} }
func (m *lotusAccessor) start(ctx context.Context) error { func (m *lotusAccessor) Start(ctx context.Context) error {
m.startLk.Lock()
defer m.startLk.Unlock()
if m.started {
return m.startErr
}
// Wait for the piece store to startup // Wait for the piece store to startup
ready := make(chan error) ready := make(chan error)
m.pieceStore.OnReady(func(err error) { m.pieceStore.OnReady(func(err error) {
@ -54,13 +47,17 @@ func (m *lotusAccessor) start(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err()) err := xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err())
case err := <-ready: if err := m.readyMgr.FireReady(err); err != nil {
// Piece store has started up, check if there was an error log.Warnw("failed to pubish ready event", "err", err)
if err != nil {
m.startErr = err
return err
} }
return err
case err := <-ready:
if err := m.readyMgr.FireReady(err); err != nil {
log.Warnw("failed to pubish ready event", "err", err)
}
return err
} }
// Piece store has started up successfully // Piece store has started up successfully
@ -68,7 +65,7 @@ func (m *lotusAccessor) start(ctx context.Context) error {
} }
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
err := m.start(ctx) err := m.readyMgr.AwaitReady()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -118,7 +115,7 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
} }
func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
err := m.start(ctx) err := m.readyMgr.AwaitReady()
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -66,6 +66,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
sectors: mockData, sectors: mockData,
} }
api := NewLotusAccessor(ps, rpn) api := NewLotusAccessor(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add deals to piece store // Add deals to piece store
for _, sectorID := range tc.deals { for _, sectorID := range tc.deals {
@ -101,6 +102,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
ps := getPieceStore(t) ps := getPieceStore(t)
rpn := &mockRPN{} rpn := &mockRPN{}
api := NewLotusAccessor(ps, rpn) api := NewLotusAccessor(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10 // Add a deal with data Length 10
dealInfo := piecestore.DealInfo{ dealInfo := piecestore.DealInfo{
@ -121,14 +123,6 @@ func getPieceStore(t *testing.T) piecestore.PieceStore {
err = ps.Start(context.Background()) err = ps.Start(context.Background())
require.NoError(t, err) require.NoError(t, err)
//
//ready := make(chan error)
//ps.OnReady(func(err error) {
// ready <- err
//})
//err = <-ready
//require.NoError(t, err)
//
return ps return ps
} }

View File

@ -31,6 +31,10 @@ func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor {
return mock return mock
} }
func (mr *MockLotusAccessor) Start(_ context.Context) error {
return nil
}
// EXPECT returns an object that allows the caller to indicate expected use. // EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder { func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder {
return m.recorder return m.recorder

View File

@ -147,6 +147,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
// DAG Store // DAG Store
Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor),
Override(new(*dagstore.Wrapper), modules.DagStoreWrapper), Override(new(*dagstore.Wrapper), modules.DagStoreWrapper),
// Markets (retrieval) // Markets (retrieval)

View File

@ -576,12 +576,28 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
} }
} }
func NewLotusAccessor(lc fx.Lifecycle,
pieceStore dtypes.ProviderPieceStore,
rpn retrievalmarket.RetrievalProviderNode,
) (dagstore.LotusAccessor, error) {
mountApi := dagstore.NewLotusAccessor(pieceStore, rpn)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return mountApi.Start(ctx)
},
OnStop: func(context.Context) error {
return nil
},
})
return mountApi, nil
}
func DagStoreWrapper( func DagStoreWrapper(
lc fx.Lifecycle, lc fx.Lifecycle,
ds dtypes.MetadataDS, ds dtypes.MetadataDS,
r repo.LockedRepo, r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore, lotusAccessor dagstore.LotusAccessor,
rpn retrievalmarket.RetrievalProviderNode,
) (*dagstore.Wrapper, error) { ) (*dagstore.Wrapper, error) {
dagStoreDir := filepath.Join(r.Path(), dagStore) dagStoreDir := filepath.Join(r.Path(), dagStore)
dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider")) dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider"))
@ -591,8 +607,8 @@ func DagStoreWrapper(
Datastore: dagStoreDS, Datastore: dagStoreDS,
GCInterval: 5 * time.Minute, GCInterval: 5 * time.Minute,
} }
mountApi := dagstore.NewLotusAccessor(pieceStore, rpn)
dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi) dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err) return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
} }