diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/lotusaccessor.go index 9a0f7fa30..f9272d43a 100644 --- a/markets/dagstore/lotusaccessor.go +++ b/markets/dagstore/lotusaccessor.go @@ -3,8 +3,8 @@ package dagstore import ( "context" "io" - "sync" + "github.com/filecoin-project/go-fil-markets/shared" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -15,15 +15,14 @@ import ( type LotusAccessor interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) + Start(ctx context.Context) error } type lotusAccessor struct { pieceStore piecestore.PieceStore rm retrievalmarket.RetrievalProviderNode - startLk sync.Mutex - started bool - startErr error + readyMgr *shared.ReadyManager } var _ LotusAccessor = (*lotusAccessor)(nil) @@ -32,17 +31,11 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP return &lotusAccessor{ pieceStore: store, rm: rm, + readyMgr: shared.NewReadyManager(), } } -func (m *lotusAccessor) start(ctx context.Context) error { - m.startLk.Lock() - defer m.startLk.Unlock() - - if m.started { - return m.startErr - } - +func (m *lotusAccessor) Start(ctx context.Context) error { // Wait for the piece store to startup ready := make(chan error) m.pieceStore.OnReady(func(err error) { @@ -54,13 +47,17 @@ func (m *lotusAccessor) start(ctx context.Context) error { select { case <-ctx.Done(): - return xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err()) - case err := <-ready: - // Piece store has started up, check if there was an error - if err != nil { - m.startErr = err - return err + err := xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err()) + if err := m.readyMgr.FireReady(err); err != nil { + log.Warnw("failed to pubish ready event", "err", err) } + return err + + case err := <-ready: + if err := m.readyMgr.FireReady(err); err != nil { + log.Warnw("failed to pubish ready event", "err", err) + } + return err } // Piece store has started up successfully @@ -68,7 +65,7 @@ func (m *lotusAccessor) start(ctx context.Context) error { } func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { - err := m.start(ctx) + err := m.readyMgr.AwaitReady() if err != nil { return nil, err } @@ -118,7 +115,7 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid } func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { - err := m.start(ctx) + err := m.readyMgr.AwaitReady() if err != nil { return 0, err } diff --git a/markets/dagstore/lotusaccessor_test.go b/markets/dagstore/lotusaccessor_test.go index 2c6a10cd9..1836f0ed3 100644 --- a/markets/dagstore/lotusaccessor_test.go +++ b/markets/dagstore/lotusaccessor_test.go @@ -66,6 +66,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { sectors: mockData, } api := NewLotusAccessor(ps, rpn) + require.NoError(t, api.Start(ctx)) // Add deals to piece store for _, sectorID := range tc.deals { @@ -101,6 +102,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { ps := getPieceStore(t) rpn := &mockRPN{} api := NewLotusAccessor(ps, rpn) + require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 dealInfo := piecestore.DealInfo{ @@ -121,14 +123,6 @@ func getPieceStore(t *testing.T) piecestore.PieceStore { err = ps.Start(context.Background()) require.NoError(t, err) - // - //ready := make(chan error) - //ps.OnReady(func(err error) { - // ready <- err - //}) - //err = <-ready - //require.NoError(t, err) - // return ps } diff --git a/markets/dagstore/mocks/mock_lotus_accessor.go b/markets/dagstore/mocks/mock_lotus_accessor.go index e8a3a73ac..edd5f856f 100644 --- a/markets/dagstore/mocks/mock_lotus_accessor.go +++ b/markets/dagstore/mocks/mock_lotus_accessor.go @@ -31,6 +31,10 @@ func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor { return mock } +func (mr *MockLotusAccessor) Start(_ context.Context) error { + return nil +} + // EXPECT returns an object that allows the caller to indicate expected use. func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder { return m.recorder diff --git a/node/builder_miner.go b/node/builder_miner.go index 3fd2ec547..1a115c92f 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -147,6 +147,7 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), // DAG Store + Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor), Override(new(*dagstore.Wrapper), modules.DagStoreWrapper), // Markets (retrieval) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index e00328248..d0b71b02f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -576,12 +576,28 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside } } +func NewLotusAccessor(lc fx.Lifecycle, + pieceStore dtypes.ProviderPieceStore, + rpn retrievalmarket.RetrievalProviderNode, +) (dagstore.LotusAccessor, error) { + mountApi := dagstore.NewLotusAccessor(pieceStore, rpn) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return mountApi.Start(ctx) + }, + OnStop: func(context.Context) error { + return nil + }, + }) + + return mountApi, nil +} + func DagStoreWrapper( lc fx.Lifecycle, ds dtypes.MetadataDS, r repo.LockedRepo, - pieceStore dtypes.ProviderPieceStore, - rpn retrievalmarket.RetrievalProviderNode, + lotusAccessor dagstore.LotusAccessor, ) (*dagstore.Wrapper, error) { dagStoreDir := filepath.Join(r.Path(), dagStore) dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider")) @@ -591,8 +607,8 @@ func DagStoreWrapper( Datastore: dagStoreDS, GCInterval: 5 * time.Minute, } - mountApi := dagstore.NewLotusAccessor(pieceStore, rpn) - dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi) + + dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor) if err != nil { return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err) }