Merge pull request #6808 from filecoin-project/fix/wait-piece-store-start
Wait for piece store to start before starting DAG store wrapper
This commit is contained in:
commit
839abdf910
@ -12,6 +12,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type LotusAccessor interface {
|
type LotusAccessor interface {
|
||||||
|
Start(ctx context.Context) error
|
||||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
||||||
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
|
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
|
||||||
}
|
}
|
||||||
@ -30,6 +31,30 @@ func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalP
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *lotusAccessor) Start(ctx context.Context) error {
|
||||||
|
// Wait for the piece store to startup
|
||||||
|
ready := make(chan error)
|
||||||
|
m.pieceStore.OnReady(func(err error) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case ready <- err:
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err())
|
||||||
|
case err := <-ready:
|
||||||
|
// Piece store has started up, check if there was an error
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Piece store has started up successfully
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||||
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -96,13 +96,20 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *Wrapper) Start(ctx context.Context) {
|
func (ds *Wrapper) Start(ctx context.Context) error {
|
||||||
ds.ctx, ds.cancel = context.WithCancel(ctx)
|
ds.ctx, ds.cancel = context.WithCancel(ctx)
|
||||||
|
|
||||||
|
err := ds.mountApi.Start(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to start mount API: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
ds.backgroundWg.Add(1)
|
ds.backgroundWg.Add(1)
|
||||||
|
|
||||||
// Run a go-routine to handle failures, traces and GC
|
// Run a go-routine to handle failures, traces and GC
|
||||||
go ds.background()
|
go ds.background()
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *Wrapper) background() {
|
func (ds *Wrapper) background() {
|
||||||
|
@ -599,8 +599,7 @@ func DagStoreWrapper(
|
|||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
dsw.Start(ctx)
|
return dsw.Start(ctx)
|
||||||
return nil
|
|
||||||
},
|
},
|
||||||
OnStop: func(context.Context) error {
|
OnStop: func(context.Context) error {
|
||||||
return dsw.Close()
|
return dsw.Close()
|
||||||
|
Loading…
Reference in New Issue
Block a user