This commit is contained in:
yaohcn 2020-08-05 20:36:49 +08:00
parent ad9a691e0a
commit de3d3b48f7
4 changed files with 31 additions and 49 deletions

View File

@ -213,12 +213,9 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
var selector WorkerSelector
if len(best) == 0 { // new
selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
} else { // append to existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
if err != nil {
return xerrors.Errorf("creating unsealPiece selector: %w", err)
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false)
}
var readOk bool
@ -226,10 +223,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
if len(best) > 0 {
// There is unsealed sector, see if we can read from it
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("creating readPiece selector: %w", err)
}
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
@ -264,10 +258,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return err
}
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("creating readPiece selector: %w", err)
}
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
@ -300,12 +291,9 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
var selector WorkerSelector
var err error
if len(existingPieces) == 0 { // new
selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
} else { // use existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating path selector: %w", err)
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false)
}
var out abi.PieceInfo
@ -331,7 +319,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
// TODO: also consider where the unsealed data sits
selector := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)
selector := newAllocSelector(m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
@ -353,10 +341,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, true)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
}
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, true)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit2(ctx, sector, phase1Out)
@ -380,10 +365,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
// NOTE: We set allowFetch to false in so that we always execute on a worker
// with direct access to the data. We want to do that because this step is
// generally very cheap / fast, and transferring data is not worth the effort
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
if err != nil {
return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
}
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
@ -431,12 +413,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
}
}
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
if err != nil {
return xerrors.Errorf("creating path selector: %w", err)
}
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathSealing, stores.AcquireMove),
func(ctx context.Context, w Worker) error {
return w.FinalizeSector(ctx, sector, keepUnsealed)
@ -445,7 +424,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
return err
}
fetchSel := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
fetchSel := newAllocSelector(m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
moveUnsealed := unsealed
{
if len(keepUnsealed) == 0 {
@ -490,10 +469,7 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
}
}
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
if err != nil {
return xerrors.Errorf("creating selector: %w", err)
}
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false)
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove),

View File

@ -207,7 +207,7 @@ func TestSched(t *testing.T) {
done := make(chan struct{})
rm.done[taskName] = done
sel := newAllocSelector(ctx, index, stores.FTCache, stores.PathSealing)
sel := newAllocSelector(index, stores.FTCache, stores.PathSealing)
rm.wg.Add(1)
go func() {

View File

@ -17,7 +17,7 @@ type allocSelector struct {
ptype stores.PathType
}
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) *allocSelector {
func newAllocSelector(index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) *allocSelector {
return &allocSelector{
index: index,
alloc: alloc,

View File

@ -12,18 +12,19 @@ import (
)
type existingSelector struct {
best []stores.SectorStorageInfo
}
func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) {
best, err := index.StorageFindSector(ctx, sector, alloc, allowFetch)
if err != nil {
return nil, err
index stores.SectorIndex
sector abi.SectorID
alloc stores.SectorFileType
allowFetch bool
}
func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) *existingSelector {
return &existingSelector{
best: best,
}, nil
index: index,
sector: sector,
alloc: alloc,
allowFetch: allowFetch,
}
}
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
@ -45,7 +46,12 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
have[path.ID] = struct{}{}
}
for _, info := range s.best {
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, s.allowFetch)
if err != nil {
return false, xerrors.Errorf("finding best storage: %w", err)
}
for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
}