From de3d3b48f71bbcb8e2b7bb9732d4ed7cc4ca1c7a Mon Sep 17 00:00:00 2001 From: yaohcn Date: Wed, 5 Aug 2020 20:36:49 +0800 Subject: [PATCH] fix 2806 --- manager.go | 50 ++++++++++++-------------------------------- sched_test.go | 2 +- selector_alloc.go | 2 +- selector_existing.go | 26 ++++++++++++++--------- 4 files changed, 31 insertions(+), 49 deletions(-) diff --git a/manager.go b/manager.go index 5f2b8e334..303df2169 100644 --- a/manager.go +++ b/manager.go @@ -213,12 +213,9 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect var selector WorkerSelector if len(best) == 0 { // new - selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) + selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing) } else { // append to existing - selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) - } - if err != nil { - return xerrors.Errorf("creating unsealPiece selector: %w", err) + selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) } var readOk bool @@ -226,10 +223,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect if len(best) > 0 { // There is unsealed sector, see if we can read from it - selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) - if err != nil { - return xerrors.Errorf("creating readPiece selector: %w", err) - } + selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { readOk, err = w.ReadPiece(ctx, sink, sector, offset, size) @@ -264,10 +258,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return err } - selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) - if err != nil { - return xerrors.Errorf("creating readPiece selector: %w", err) - } + selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { readOk, err = w.ReadPiece(ctx, sink, sector, offset, size) @@ -300,12 +291,9 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie var selector WorkerSelector var err error if len(existingPieces) == 0 { // new - selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) + selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing) } else { // use existing - selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) - } - if err != nil { - return abi.PieceInfo{}, xerrors.Errorf("creating path selector: %w", err) + selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) } var out abi.PieceInfo @@ -331,7 +319,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke // TODO: also consider where the unsealed data sits - selector := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) + selector := newAllocSelector(m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) @@ -353,10 +341,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err) } - selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, true) - if err != nil { - return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err) - } + selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, true) err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit2(ctx, sector, phase1Out) @@ -380,10 +365,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a // NOTE: We set allowFetch to false in so that we always execute on a worker // with direct access to the data. We want to do that because this step is // generally very cheap / fast, and transferring data is not worth the effort - selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false) - if err != nil { - return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err) - } + selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids) @@ -431,12 +413,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU } } - selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false) - if err != nil { - return xerrors.Errorf("creating path selector: %w", err) - } + selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false) - err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, + err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { return w.FinalizeSector(ctx, sector, keepUnsealed) @@ -445,7 +424,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU return err } - fetchSel := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage) + fetchSel := newAllocSelector(m.index, stores.FTCache|stores.FTSealed, stores.PathStorage) moveUnsealed := unsealed { if len(keepUnsealed) == 0 { @@ -490,10 +469,7 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { } } - selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false) - if err != nil { - return xerrors.Errorf("creating selector: %w", err) - } + selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false) return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove), diff --git a/sched_test.go b/sched_test.go index c96f7838c..6490e738e 100644 --- a/sched_test.go +++ b/sched_test.go @@ -207,7 +207,7 @@ func TestSched(t *testing.T) { done := make(chan struct{}) rm.done[taskName] = done - sel := newAllocSelector(ctx, index, stores.FTCache, stores.PathSealing) + sel := newAllocSelector(index, stores.FTCache, stores.PathSealing) rm.wg.Add(1) go func() { diff --git a/selector_alloc.go b/selector_alloc.go index 35221921f..cf7937587 100644 --- a/selector_alloc.go +++ b/selector_alloc.go @@ -17,7 +17,7 @@ type allocSelector struct { ptype stores.PathType } -func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) *allocSelector { +func newAllocSelector(index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) *allocSelector { return &allocSelector{ index: index, alloc: alloc, diff --git a/selector_existing.go b/selector_existing.go index 3f99010cb..20cb1b209 100644 --- a/selector_existing.go +++ b/selector_existing.go @@ -12,18 +12,19 @@ import ( ) type existingSelector struct { - best []stores.SectorStorageInfo + index stores.SectorIndex + sector abi.SectorID + alloc stores.SectorFileType + allowFetch bool } -func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) { - best, err := index.StorageFindSector(ctx, sector, alloc, allowFetch) - if err != nil { - return nil, err - } - +func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) *existingSelector { return &existingSelector{ - best: best, - }, nil + index: index, + sector: sector, + alloc: alloc, + allowFetch: allowFetch, + } } func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { @@ -45,7 +46,12 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt have[path.ID] = struct{}{} } - for _, info := range s.best { + best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, s.allowFetch) + if err != nil { + return false, xerrors.Errorf("finding best storage: %w", err) + } + + for _, info := range best { if _, ok := have[info.ID]; ok { return true, nil }