fix: wait for piece store to start before starting DAG store wrapper
This commit is contained in:
parent
213dbc6bc8
commit
6057f2e6d9
@ -12,6 +12,7 @@ import (
|
||||
)
|
||||
|
||||
type LotusAccessor interface {
|
||||
Start(ctx context.Context) error
|
||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
||||
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
|
||||
}
|
||||
@ -30,6 +31,30 @@ func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalP
|
||||
}
|
||||
}
|
||||
|
||||
func (m *lotusAccessor) Start(ctx context.Context) error {
|
||||
// Wait for the piece store to startup
|
||||
ready := make(chan error)
|
||||
m.pieceStore.OnReady(func(err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case ready <- err:
|
||||
}
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err())
|
||||
case err := <-ready:
|
||||
// Piece store has started up, check if there was an error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Piece store has started up successfully
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
||||
if err != nil {
|
||||
|
@ -96,13 +96,20 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ds *Wrapper) Start(ctx context.Context) {
|
||||
func (ds *Wrapper) Start(ctx context.Context) error {
|
||||
ds.ctx, ds.cancel = context.WithCancel(ctx)
|
||||
|
||||
err := ds.mountApi.Start(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to start mount API: %w", err)
|
||||
}
|
||||
|
||||
ds.backgroundWg.Add(1)
|
||||
|
||||
// Run a go-routine to handle failures, traces and GC
|
||||
go ds.background()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *Wrapper) background() {
|
||||
|
@ -599,8 +599,7 @@ func DagStoreWrapper(
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
dsw.Start(ctx)
|
||||
return nil
|
||||
return dsw.Start(ctx)
|
||||
},
|
||||
OnStop: func(context.Context) error {
|
||||
return dsw.Close()
|
||||
|
Loading…
Reference in New Issue
Block a user