Allow FinalizeSector on all nodes

This commit is contained in:
Łukasz Magiera 2020-06-03 23:44:59 +02:00
parent a39bc94c58
commit b5674f12f0
3 changed files with 41 additions and 14 deletions

View File

@ -176,6 +176,10 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
return xerrors.Errorf("removing unsealed data: %w", err) return xerrors.Errorf("removing unsealed data: %w", err)
} }
return nil
}
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error {
if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil { if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil {
return xerrors.Errorf("moving sealed data to storage: %w", err) return xerrors.Errorf("moving sealed data to storage: %w", err)
} }

View File

@ -29,6 +29,8 @@ type URLs []string
type Worker interface { type Worker interface {
ffiwrapper.StorageSealer ffiwrapper.StorageSealer
MoveStorage(ctx context.Context, sector abi.SectorID) error
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error
@ -184,9 +186,9 @@ func schedNop(context.Context, Worker) error {
return nil return nil
} }
func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) func(context.Context, Worker) error { func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing stores.PathType, am stores.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error { return func(ctx context.Context, worker Worker) error {
return worker.Fetch(ctx, sector, ft, sealing, am) return worker.Fetch(ctx, sector, ft, bool(sealing), am)
} }
} }
@ -205,7 +207,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
var selector WorkerSelector var selector WorkerSelector
if len(best) == 0 { // new if len(best) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed) selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // append to existing } else { // append to existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
} }
@ -240,7 +242,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return xerrors.Errorf("creating readPiece selector: %w", err) return xerrors.Errorf("creating readPiece selector: %w", err)
} }
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
return w.ReadPiece(ctx, sink, sector, offset, size) return w.ReadPiece(ctx, sink, sector, offset, size)
}) })
if err != nil { if err != nil {
@ -266,7 +268,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
var selector WorkerSelector var selector WorkerSelector
var err error var err error
if len(existingPieces) == 0 { // new if len(existingPieces) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed) selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // use existing } else { // use existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
} }
@ -297,12 +299,12 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
// TODO: also consider where the unsealed data sits // TODO: also consider where the unsealed data sits
selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed) selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)
if err != nil { if err != nil {
return nil, xerrors.Errorf("creating path selector: %w", err) return nil, xerrors.Errorf("creating path selector: %w", err)
} }
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
if err != nil { if err != nil {
return err return err
@ -327,7 +329,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err) return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
} }
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit2(ctx, sector, phase1Out) p, err := w.SealPreCommit2(ctx, sector, phase1Out)
if err != nil { if err != nil {
return err return err
@ -354,7 +356,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err) return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
} }
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids) p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
if err != nil { if err != nil {
return err return err
@ -384,7 +386,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTUnsealed|stores.FTCache); err != nil { if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTSealed|stores.FTUnsealed|stores.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err) return xerrors.Errorf("acquiring sector lock: %w", err)
} }
@ -393,11 +395,30 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
return xerrors.Errorf("creating path selector: %w", err) return xerrors.Errorf("creating path selector: %w", err)
} }
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false, stores.AcquireMove), schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, stores.PathSealing, stores.AcquireMove),
func(ctx context.Context, w Worker) error { func(ctx context.Context, w Worker) error {
return w.FinalizeSector(ctx, sector) return w.FinalizeSector(ctx, sector)
}) })
if err != nil {
return err
}
fetchSel, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
if err != nil {
return xerrors.Errorf("creating fetchSel: %w", err)
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathStorage, stores.AcquireMove),
func(ctx context.Context, w Worker) error {
return w.MoveStorage(ctx, sector)
})
if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err)
}
return nil
} }
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {

View File

@ -14,12 +14,14 @@ import (
type allocSelector struct { type allocSelector struct {
index stores.SectorIndex index stores.SectorIndex
alloc stores.SectorFileType alloc stores.SectorFileType
ptype stores.PathType
} }
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) { func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector, error) {
return &allocSelector{ return &allocSelector{
index: index, index: index,
alloc: alloc, alloc: alloc,
ptype: ptype,
}, nil }, nil
} }
@ -42,7 +44,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
have[path.ID] = struct{}{} have[path.ID] = struct{}{}
} }
best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, true) best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, s.ptype)
if err != nil { if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err) return false, xerrors.Errorf("finding best alloc storage: %w", err)
} }