fix dagstore/piecestore lifecycle.

This commit is contained in:
Raúl Kripalani 2021-07-28 17:08:04 +01:00
parent 5d047c59b1
commit 9ba6a41ef4
2 changed files with 9 additions and 24 deletions

View File

@ -54,30 +54,8 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP
}
}
func (m *lotusAccessor) Start(ctx context.Context) error {
// Wait for the piece store to startup
ready := make(chan error)
m.pieceStore.OnReady(func(err error) {
select {
case <-ctx.Done():
case ready <- err:
}
})
select {
case <-ctx.Done():
err := xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err())
if ferr := m.readyMgr.FireReady(err); ferr != nil {
log.Warnw("failed to publish ready event", "err", ferr)
}
return err
case err := <-ready:
if ferr := m.readyMgr.FireReady(err); ferr != nil {
log.Warnw("failed to publish ready event", "err", ferr)
}
return err
}
func (m *lotusAccessor) Start(_ context.Context) error {
return m.readyMgr.FireReady(nil)
}
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {

View File

@ -585,8 +585,15 @@ func NewLotusAccessor(lc fx.Lifecycle,
rpn retrievalmarket.RetrievalProviderNode,
) (dagstore.LotusAccessor, error) {
mountApi := dagstore.NewLotusAccessor(pieceStore, rpn)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
})
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
if err := <-ready; err != nil {
return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err)
}
return mountApi.Start(ctx)
},
OnStop: func(context.Context) error {