diff --git a/manager.go b/manager.go
index 8302427c7..580bd6fa5 100644
--- a/manager.go
+++ b/manager.go
@@ -88,7 +88,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
 		return nil, err
 	}
 
-	prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg)
+	prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}, cfg)
 	if err != nil {
 		return nil, xerrors.Errorf("creating prover instance: %w", err)
 	}
@@ -191,6 +191,13 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am
 }
 
 func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
+		return xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
 	if err != nil {
 		return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
@@ -249,6 +256,13 @@ func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
 }
 
 func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTUnsealed); err != nil {
+		return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	var selector WorkerSelector
 	var err error
 	if len(existingPieces) == 0 { // new
@@ -274,6 +288,13 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
 }
 
 func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache); err != nil {
+		return nil, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	// TODO: also consider where the unsealed data sits
 
 	selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed)
@@ -294,6 +315,13 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
 }
 
 func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
+		return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, true)
 	if err != nil {
 		return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
@@ -311,6 +339,13 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
 }
 
 func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
+		return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	// NOTE: We set allowFetch to false in so that we always execute on a worker
 	// with direct access to the data. We want to do that because this step is
 	// generally very cheap / fast, and transferring data is not worth the effort
@@ -346,6 +381,13 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
 }
 
 func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTUnsealed|stores.FTCache); err != nil {
+		return xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false)
 	if err != nil {
 		return xerrors.Errorf("creating path selector: %w", err)
diff --git a/roprov.go b/roprov.go
index aba6bb5d9..0e8950478 100644
--- a/roprov.go
+++ b/roprov.go
@@ -11,8 +11,9 @@ import (
 )
 
 type readonlyProvider struct {
-	stor *stores.Local
-	spt  abi.RegisteredProof
+	index stores.SectorIndex
+	stor  *stores.Local
+	spt   abi.RegisteredProof
 }
 
 func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
@@ -20,7 +21,15 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
 		return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
 	}
 
+	ctx, cancel := context.WithCancel(ctx)
+	if err := l.index.StorageLock(ctx, id, existing, stores.FTNone); err != nil {
+		return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove)
 
-	return p, done, err
+	return p, func() {
+		cancel()
+		done()
+	}, err
 }
diff --git a/stores/filetype.go b/stores/filetype.go
index e3cc4042c..cfb9db38a 100644
--- a/stores/filetype.go
+++ b/stores/filetype.go
@@ -75,7 +75,7 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) {
 
 func (t SectorFileType) All() (out [FileTypes]bool) {
 	for i := range out {
-		out[i] = t & (1 << i) > 0
+		out[i] = t&(1<<i) > 0
 	}
 
 	return out
diff --git a/stores/index.go b/stores/index.go
index c6856ef8e..6ef346367 100644
--- a/stores/index.go
+++ b/stores/index.go
@@ -95,8 +95,8 @@ func NewIndex() *Index {
 		indexLocks: &indexLocks{
 			locks: map[abi.SectorID]*sectorLock{},
 		},
-		sectors: map[Decl][]*declMeta{},
-		stores: map[ID]*storageEntry{},
+		sectors: map[Decl][]*declMeta{},
+		stores:  map[ID]*storageEntry{},
 	}
 }
 
diff --git a/stores/index_locks.go b/stores/index_locks.go
index f7770d5e5..0bce92a22 100644
--- a/stores/index_locks.go
+++ b/stores/index_locks.go
@@ -10,7 +10,7 @@ import (
 )
 
 type sectorLock struct {
-	lk sync.Mutex
+	lk    sync.Mutex
 	notif *ctxCond
 
 	r [FileTypes]uint
@@ -26,7 +26,7 @@ func (l *sectorLock) canLock(read SectorFileType, write SectorFileType) bool {
 		}
 	}
 
-	return l.w & (read | write) == 0
+	return l.w&(read|write) == 0
 }
 
 func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool {
@@ -82,11 +82,11 @@ type indexLocks struct {
 }
 
 func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error {
-	if read | write == 0 {
+	if read|write == 0 {
 		return nil
 	}
 
-	if read | write > (1 << FileTypes) - 1 {
+	if read|write > (1<<FileTypes)-1 {