From 6057f2e6d96115e911bdec5dd130f09fa512baa3 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 21 Jul 2021 11:53:02 +0200 Subject: [PATCH] fix: wait for piece store to start before starting DAG store wrapper --- markets/dagstore/lotusaccessor.go | 25 +++++++++++++++++++++++++ markets/dagstore/wrapper.go | 9 ++++++++- node/modules/storageminer.go | 3 +-- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/lotusaccessor.go index f7e136582..fe010ecc4 100644 --- a/markets/dagstore/lotusaccessor.go +++ b/markets/dagstore/lotusaccessor.go @@ -12,6 +12,7 @@ import ( ) type LotusAccessor interface { + Start(ctx context.Context) error FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) } @@ -30,6 +31,30 @@ func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalP } } +func (m *lotusAccessor) Start(ctx context.Context) error { + // Wait for the piece store to startup + ready := make(chan error) + m.pieceStore.OnReady(func(err error) { + select { + case <-ctx.Done(): + case ready <- err: + } + }) + + select { + case <-ctx.Done(): + return xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err()) + case err := <-ready: + // Piece store has started up, check if there was an error + if err != nil { + return err + } + } + + // Piece store has started up successfully + return nil +} + func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid) if err != nil { diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 437d0cbd8..c054882d6 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -96,13 +96,20 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap }, nil } -func (ds *Wrapper) Start(ctx context.Context) { +func (ds *Wrapper) Start(ctx context.Context) error { ds.ctx, ds.cancel = context.WithCancel(ctx) + err := ds.mountApi.Start(ctx) + if err != nil { + return xerrors.Errorf("failed to start mount API: %w", err) + } + ds.backgroundWg.Add(1) // Run a go-routine to handle failures, traces and GC go ds.background() + + return nil } func (ds *Wrapper) background() { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index d779dac6b..e4453bc29 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -599,8 +599,7 @@ func DagStoreWrapper( lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - dsw.Start(ctx) - return nil + return dsw.Start(ctx) }, OnStop: func(context.Context) error { return dsw.Close()