diff --git a/markets/dagstore/mount_api.go b/markets/dagstore/lotusaccessor.go similarity index 71% rename from markets/dagstore/mount_api.go rename to markets/dagstore/lotusaccessor.go index 9821d28e2..f7e136582 100644 --- a/markets/dagstore/mount_api.go +++ b/markets/dagstore/lotusaccessor.go @@ -11,32 +11,32 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) -type LotusMountAPI interface { +type LotusAccessor interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) } -type lotusMountApiImpl struct { +type lotusAccessor struct { pieceStore piecestore.PieceStore rm retrievalmarket.RetrievalProviderNode } -var _ LotusMountAPI = (*lotusMountApiImpl)(nil) +var _ LotusAccessor = (*lotusAccessor)(nil) -func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusMountApiImpl { - return &lotusMountApiImpl{ +func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor { + return &lotusAccessor{ pieceStore: store, rm: rm, } } -func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { +func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid) if err != nil { return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) } - if len(pieceInfo.Deals) <= 0 { + if len(pieceInfo.Deals) == 0 { return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid) } @@ -44,6 +44,7 @@ func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid for _, deal := range pieceInfo.Deals { isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) if err != nil { + log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err) continue } if 
isUnsealed { @@ -58,16 +59,23 @@ func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid lastErr := xerrors.New("no sectors found to unseal from") // if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal. for _, deal := range pieceInfo.Deals { + // Note that if the deal data is not already unsealed, unsealing may + // block for a long time with the current PoRep reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) - if err == nil { - return reader, nil + if err != nil { + lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) + log.Warn(lastErr.Error()) + continue } - lastErr = err + + // Successfully fetched the deal data so return a reader over the data + return reader, nil } + return nil, lastErr } -func (m *lotusMountApiImpl) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) { +func (m *lotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) { pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid) if err != nil { return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) diff --git a/markets/dagstore/mount_api_test.go b/markets/dagstore/lotusaccessor_test.go similarity index 97% rename from markets/dagstore/mount_api_test.go rename to markets/dagstore/lotusaccessor_test.go index 265155786..21eebab76 100644 --- a/markets/dagstore/mount_api_test.go +++ b/markets/dagstore/lotusaccessor_test.go @@ -24,7 +24,7 @@ import ( const unsealedSectorID = abi.SectorNumber(1) const sealedSectorID = abi.SectorNumber(2) -func TestLotusMountApiFetchUnsealedPiece(t *testing.T) { +func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { ctx := context.Background() cid1, err := cid.Parse("bafkqaaa") @@ -93,7 +93,7 @@ func TestLotusMountApiFetchUnsealedPiece(t *testing.T) { } } -func TestLotusMountApiGetUnpaddedCARSize(t *testing.T) { +func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) 
{ cid1, err := cid.Parse("bafkqaaa") require.NoError(t, err) diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index fe5c302fb..69406633b 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -2,7 +2,6 @@ package dagstore import ( "context" - "fmt" "io" "net/url" @@ -13,14 +12,13 @@ import ( ) const lotusScheme = "lotus" -const mountURLTemplate = "%s://%s" var _ mount.Mount = (*LotusMount)(nil) // LotusMount is the Lotus implementation of a Sharded DAG Store Mount. // A Filecoin Piece is treated as a Shard by this implementation. type LotusMount struct { - api LotusMountAPI + api LotusAccessor pieceCid cid.Cid } @@ -29,11 +27,11 @@ type LotusMount struct { // When the registry needs to deserialize a mount it clones the template then // calls Deserialize on the cloned instance, which will have a reference to the // lotus mount API supplied here. -func NewLotusMountTemplate(api LotusMountAPI) *LotusMount { +func NewLotusMountTemplate(api LotusAccessor) *LotusMount { return &LotusMount{api: api} } -func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) { +func NewLotusMount(pieceCid cid.Cid, api LotusAccessor) (*LotusMount, error) { return &LotusMount{ pieceCid: pieceCid, api: api, @@ -41,21 +39,12 @@ func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) { } func (l *LotusMount) Serialize() *url.URL { - u := fmt.Sprintf(mountURLTemplate, lotusScheme, l.pieceCid.String()) - url, err := url.Parse(u) - if err != nil { - // Should never happen - panic(xerrors.Errorf("failed to parse mount URL '%s': %w", u, err)) + return &url.URL{ + Host: l.pieceCid.String(), } - - return url } func (l *LotusMount) Deserialize(u *url.URL) error { - if u.Scheme != lotusScheme { - return xerrors.Errorf("scheme '%s' for URL '%s' does not match required scheme '%s'", u.Scheme, u, lotusScheme) - } - pieceCid, err := cid.Decode(u.Host) if err != nil { return xerrors.Errorf("failed to parse PieceCid from host '%s': 
%w", u.Host, err) diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 66e4e7d52..b7e9dde45 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -2,7 +2,6 @@ package dagstore import ( "context" - "fmt" "io/ioutil" "net/url" "strings" @@ -67,13 +66,13 @@ func TestLotusMount(t *testing.T) { } func TestLotusMountDeserialize(t *testing.T) { - api := &lotusMountApiImpl{} + api := &lotusAccessor{} bgen := blocksutil.NewBlockGenerator() cid := bgen.Next().Cid() // success - us := fmt.Sprintf(mountURLTemplate, lotusScheme, cid.String()) + us := lotusScheme + "://" + cid.String() u, err := url.Parse(us) require.NoError(t, err) @@ -84,17 +83,8 @@ func TestLotusMountDeserialize(t *testing.T) { require.Equal(t, cid, mnt.pieceCid) require.Equal(t, api, mnt.api) - // fails if scheme is not Lotus - us = fmt.Sprintf(mountURLTemplate, "http", cid.String()) - u, err = url.Parse(us) - require.NoError(t, err) - - err = mnt.Deserialize(u) - require.Error(t, err) - require.Contains(t, err.Error(), "does not match") - // fails if cid is not valid - us = fmt.Sprintf(mountURLTemplate, lotusScheme, "rand") + us = lotusScheme + "://" + "rand" u, err = url.Parse(us) require.NoError(t, err) err = mnt.Deserialize(u) diff --git a/markets/dagstore/dagstorewrapper.go b/markets/dagstore/wrapper.go similarity index 59% rename from markets/dagstore/dagstorewrapper.go rename to markets/dagstore/wrapper.go index ee67618b7..032575f09 100644 --- a/markets/dagstore/dagstorewrapper.go +++ b/markets/dagstore/wrapper.go @@ -2,8 +2,10 @@ package dagstore import ( "context" + "errors" "io" "sync" + "sync/atomic" "time" "github.com/ipfs/go-cid" @@ -35,23 +37,25 @@ type closableBlockstore struct { } type dagStoreWrapper struct { - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + backgroundWg sync.WaitGroup - dagStore *dagstore.DAGStore - mountApi LotusMountAPI + dagStore 
*dagstore.DAGStore + mountApi LotusAccessor + failureCh chan dagstore.ShardResult } var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil) -func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) { +func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*dagStoreWrapper, error) { // construct the DAG Store. registry := mount.NewRegistry() if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil { return nil, xerrors.Errorf("failed to create registry: %w", err) } + // The dagstore will write Shard failures to the `failureCh` here. failureCh := make(chan dagstore.ShardResult, 1) dcfg := dagstore.Config{ TransientsDir: cfg.TransientsDir, @@ -65,37 +69,77 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagS return nil, xerrors.Errorf("failed to create DAG store: %w", err) } - ctx, cancel := context.WithCancel(context.Background()) - dw := &dagStoreWrapper{ - ctx: ctx, - cancel: cancel, - - dagStore: dagStore, - mountApi: mountApi, - } - - dw.wg.Add(1) - // the dagstore will write Shard failures to the `failureCh` here. Run a go-routine to handle them. 
-	go dw.handleFailures(failureCh)
-
-	return dw, nil
+	return &dagStoreWrapper{
+		dagStore:  dagStore,
+		mountApi:  mountApi,
+		failureCh: failureCh,
+	}, nil
 }
 
-func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) {
-	defer ds.wg.Done()
-	ticker := time.NewTicker(gcInterval)
-	defer ticker.Stop()
+func (ds *dagStoreWrapper) Start(ctx context.Context) {
+	ds.ctx, ds.cancel = context.WithCancel(ctx)
 
-	select {
-	case <-ticker.C:
-		_, _ = ds.dagStore.GC(ds.ctx)
-	case f := <-failureCh:
-		log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
-		if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil {
-			log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
+	ds.backgroundWg.Add(1)
+
+	// Run a goroutine that handles shard failures and periodic GC until ds.ctx is cancelled
+	go ds.background(ds.failureCh)
+}
+
+func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) {
+	defer ds.backgroundWg.Done()
+
+	gcTicker := time.NewTicker(gcInterval)
+	defer gcTicker.Stop()
+
+	recoverShardResults := make(chan dagstore.ShardResult, 32)
+	var recShardResCount int32 // number of recoveries scheduled whose result has not been consumed yet
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		// Consume recover shard results
+		for {
+			select {
+
+			// When the DAG store wrapper shuts down, drain the outstanding
+			// results so as not to block the DAG store
+			case <-done:
+				for i := atomic.LoadInt32(&recShardResCount); i > 0; i-- {
+					res := <-recoverShardResults
+					if res.Error != nil {
+						log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
+					}
+				}
+				return
+
+			case res := <-recoverShardResults:
+				atomic.AddInt32(&recShardResCount, -1)
+				if res.Error != nil {
+					log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
+				}
+			}
+		}
+	}()
+
+	for ds.ctx.Err() == nil { // keep looping while the wrapper's context has not been cancelled
+		select {
+
+		// GC the DAG store on every tick
+		case <-gcTicker.C:
+			_, _ = ds.dagStore.GC(ds.ctx)
+
+		// Handle shard failures by attempting to recover the shard
+		case f := <-failureCh:
+			atomic.AddInt32(&recShardResCount, 1)
+			log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
+			if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil {
+				log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
+				atomic.AddInt32(&recShardResCount, -1)
+			}
+
+		// Exit when the DAG store wrapper is shutdown
+		case <-ds.ctx.Done():
+			return
 		}
-	case <-ds.ctx.Done():
-		return
 	}
 }
 
@@ -105,7 +149,7 @@ func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (car
 
 	err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
 	if err != nil {
-		if xerrors.Unwrap(err) != dagstore.ErrShardUnknown {
+		if !errors.Is(err, dagstore.ErrShardUnknown) {
 			return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
 		}
 
@@ -122,7 +166,10 @@ func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (car
 		}
 	}
 
-	// TODO: Can I rely on AcquireShard to return an error if the context times out?
+	// TODO: The context is not yet being actively monitored by the DAG store,
+	// so we need to select against ctx.Done() until the following issue is
+	// implemented:
+	// https://github.com/filecoin-project/dagstore/issues/39
 	var res dagstore.ShardResult
 	select {
 	case <-ctx.Done():
@@ -163,12 +210,18 @@ func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid,
 }
 
 func (ds *dagStoreWrapper) Close() error {
+	// Cancel the context. cancel is only set by Start, so guard against
+	// Close being called on a wrapper that was never started.
+	if ds.cancel != nil {
+		ds.cancel()
+	}
+
+	// Close the DAG store
 	if err := ds.dagStore.Close(); err != nil {
-		return err
+		return xerrors.Errorf("failed to close DAG store: %w", err)
 	}
 
-	ds.cancel()
-	ds.wg.Wait()
+	// Wait for the background go routine to exit
+	ds.backgroundWg.Wait()
 
 	return nil
 }
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index 85601d1e3..4f42cc09b 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -576,6 +576,7 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
 }
 
 func DagStoreWrapper(
+	lc fx.Lifecycle,
 	ds dtypes.MetadataDS,
 	r repo.LockedRepo,
 	pieceStore dtypes.ProviderPieceStore,
@@ -589,7 +590,21 @@ func DagStoreWrapper(
 		Datastore: dagStoreDS,
 	}
 	mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn)
-	return dagstore.NewDagStoreWrapper(cfg, mountApi)
+	dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi)
+	if err != nil {
+		return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
+	}
+
+	lc.Append(fx.Hook{
+		OnStart: func(ctx context.Context) error {
+			dsw.Start(ctx)
+			return nil
+		},
+		OnStop: func(context.Context) error {
+			return dsw.Close()
+		},
+	})
+	return dsw, nil
 }
 
 func StorageProvider(minerAddress dtypes.MinerAddress,