diff --git a/faults.go b/faults.go index 1f5475259..6bf036089 100644 --- a/faults.go +++ b/faults.go @@ -19,13 +19,13 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredProof, se var bad []abi.SectorID // TODO: More better checks + // TODO: Use proper locking for _, sector := range sectors { err := func() error { - lp, _, done, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove) + lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove) if err != nil { return xerrors.Errorf("acquire sector in checkProvable: %w", err) } - defer done() if lp.Sealed == "" || lp.Cache == "" { log.Warnw("CheckProvable Sector FAULT: cache an/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache) diff --git a/ffiwrapper/basicfs/fs.go b/ffiwrapper/basicfs/fs.go index 41ec8d4b4..3f865f590 100644 --- a/ffiwrapper/basicfs/fs.go +++ b/ffiwrapper/basicfs/fs.go @@ -24,7 +24,7 @@ type Provider struct { waitSector map[sectorFile]chan struct{} } -func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { +func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, ptype stores.PathType) (stores.SectorPaths, func(), error) { if err := os.Mkdir(filepath.Join(b.Root, stores.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { return stores.SectorPaths{}, nil, err } diff --git a/ffiwrapper/types.go b/ffiwrapper/types.go index cf211056f..c640df2e7 100644 --- a/ffiwrapper/types.go +++ b/ffiwrapper/types.go @@ -43,7 +43,7 @@ type Verifier interface { type SectorProvider interface { // * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist // * returns an error when allocate is set, and existing isn't, and the sector exists - AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) + AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, ptype stores.PathType) (stores.SectorPaths, func(), error) } var _ SectorProvider = &basicfs.Provider{} diff --git a/localworker.go b/localworker.go index 31d357bb0..fad5d42ec 100644 --- a/localworker.go +++ b/localworker.go @@ -59,8 +59,8 @@ type localWorkerPathProvider struct { op stores.AcquireMode } -func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { - paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op) +func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) { + paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op) if err != nil { return stores.SectorPaths{}, nil, err } @@ -68,8 +68,6 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. 
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) return paths, func() { - done() - for _, fileType := range pathTypes { if fileType&allocate == 0 { continue @@ -106,8 +104,8 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs [] return sb.AddPiece(ctx, sector, epcs, sz, r) } -func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool, am stores.AcquireMode) error { - _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing) +func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error { + _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, ptype) if err != nil { return err } @@ -176,6 +174,10 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e return xerrors.Errorf("removing unsealed data: %w", err) } + return nil +} + +func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error { if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil { return xerrors.Errorf("moving sealed data to storage: %w", err) } diff --git a/manager.go b/manager.go index 07484e041..bc6dd1d9d 100644 --- a/manager.go +++ b/manager.go @@ -29,7 +29,9 @@ type URLs []string type Worker interface { ffiwrapper.StorageSealer - Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error + MoveStorage(ctx context.Context, sector abi.SectorID) error + + Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error @@ -88,7 +90,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg return nil, err } - prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg) + prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}, cfg) if err != nil { return nil, xerrors.Errorf("creating prover instance: %w", err) } @@ -184,13 +186,20 @@ func schedNop(context.Context, Worker) error { return nil } -func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) func(context.Context, Worker) error { +func schedFetch(sector abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) func(context.Context, Worker) error { return func(ctx context.Context, worker Worker) error { - return worker.Fetch(ctx, sector, ft, sealing, am) + return worker.Fetch(ctx, sector, ft, ptype, am) } } func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil { + return xerrors.Errorf("acquiring sector lock: %w", err) + } + best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false) if err != nil { return xerrors.Errorf("read piece: checking for already 
existing unsealed sector: %w", err) @@ -198,7 +207,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect var selector WorkerSelector if len(best) == 0 { // new - selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed) + selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) } else { // append to existing selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) } @@ -233,7 +242,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return xerrors.Errorf("creating readPiece selector: %w", err) } - err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { + err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { return w.ReadPiece(ctx, sink, sector, offset, size) }) if err != nil { @@ -249,10 +258,17 @@ func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error { } func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTUnsealed); err != nil { + return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err) + } + var selector WorkerSelector var err error if len(existingPieces) == 0 { // new - selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed) + selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) } else { // use existing selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) } @@ -274,14 +290,21 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie } func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache); err != nil { + return nil, xerrors.Errorf("acquiring sector lock: %w", err) + } + // TODO: also consider where the unsealed data sits - selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed) + selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) if err != nil { return nil, xerrors.Errorf("creating path selector: %w", err) } - err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { + err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) if err != nil { return err @@ -294,12 +317,19 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke } func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := 
m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
+		return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
 	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, true)
 	if err != nil {
 		return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
+	err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
 		p, err := w.SealPreCommit2(ctx, sector, phase1Out)
 		if err != nil {
 			return err
@@ -311,16 +341,22 @@
 }
 
 func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
+		return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
+	// NOTE: We set allowFetch to false so that we always execute on a worker
+	// with direct access to the data. We want to do that because this step is
+	// generally very cheap / fast, and transferring data is not worth the effort
 	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
 	if err != nil {
 		return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	// TODO: Try very hard to execute on worker with access to the sectors
-	//  (except, don't.. for now at least - we are using this step to bring data
-	//  into 'provable' storage. Optimally we'd do that in commit2, in parallel
-	//  with snark compute)
-	err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
+	err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
 		p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
 		if err != nil {
 			return err
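
Review note: every Manager method touched in this patch now follows the same locking idiom — derive a cancellable context, take the index lock under it, and let the deferred cancel release it (per the StorageLock contract, closing the context unlocks). A minimal sketch of the idiom; the method name is hypothetical, while StorageLock and the file-type constants are the ones from this diff:

```go
func (m *Manager) withSectorLock(ctx context.Context, sector abi.SectorID) error {
	// The lock lives exactly as long as this context; cancel() releases it.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Read-lock sealed+cache, write-lock unsealed for this sector.
	if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
		return xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// ... schedule work here; the lock is held until this function returns ...
	return nil
}
```
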
@@ -347,16 +383,54 @@
 }
 
 func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
-	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTSealed|stores.FTUnsealed|stores.FTCache); err != nil {
+		return xerrors.Errorf("acquiring sector lock: %w", err)
+	}
+
+	unsealed := stores.FTUnsealed
+	{
+		unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
+		if err != nil {
+			return xerrors.Errorf("finding unsealed sector: %w", err)
+		}
+
+		if len(unsealedStores) == 0 { // In some edge-cases the unsealed sector may not exist already, and that's fine
+			unsealed = stores.FTNone
+		}
+	}
+
+	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed|unsealed, false)
 	if err != nil {
 		return xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
-		schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false, stores.AcquireMove),
+	err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
+		schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathSealing, stores.AcquireMove),
 		func(ctx context.Context, w Worker) error {
 			return w.FinalizeSector(ctx, sector)
 		})
+	if err != nil {
+		return err
+	}
+
+	fetchSel, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
+	if err != nil {
+		return xerrors.Errorf("creating fetchSel: %w", err)
+	}
+
+	err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
+		schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathStorage, stores.AcquireMove),
		func(ctx context.Context, w Worker) error {
+			return w.MoveStorage(ctx, sector)
+		})
+	if err != nil {
+		return xerrors.Errorf("moving sector to storage: %w", err)
+	}
+
+	return nil
 }
 
 func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
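
A note on the signature changes: the old `sealing bool` parameter is replaced everywhere by an explicit `stores.PathType`. The type itself is defined outside this diff; judging by the `stores.PathType(sealing)` bool conversion removed from localworker.go and roprov.go, it is presumably a bool-backed type along these lines (an assumption for illustration, not part of this change):

```go
// Assumed shape of stores.PathType (defined elsewhere in the stores package);
// inferred from the stores.PathType(sealing) conversion in the pre-change code.
type PathType bool

const (
	PathStorage PathType = false // long-term storage path
	PathSealing PathType = true  // fast scratch space used while sealing
)
```
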
diff --git a/mock/mock.go b/mock/mock.go
index 26aacce13..2e80bde36 100644
--- a/mock/mock.go
+++ b/mock/mock.go
@@ -70,17 +70,6 @@ func (mgr *SectorMgr) NewSector(ctx context.Context, sector abi.SectorID) error
 func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
 	log.Warn("Add piece: ", sectorId, size, mgr.proofType)
 
-	mgr.lk.Lock()
-	ss, ok := mgr.sectors[sectorId]
-	if !ok {
-		ss = &sectorState{
-			state: statePacking,
-		}
-		mgr.sectors[sectorId] = ss
-	}
-	mgr.lk.Unlock()
-	ss.lk.Lock()
-	defer ss.lk.Unlock()
 
 	var b bytes.Buffer
 	tr := io.TeeReader(r, &b)
@@ -92,9 +81,24 @@ func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, exist
 
 	log.Warn("Generated Piece CID: ", c)
 
+	mgr.lk.Lock()
 	mgr.pieces[c] = b.Bytes()
+
+	ss, ok := mgr.sectors[sectorId]
+	if !ok {
+		ss = &sectorState{
+			state: statePacking,
+		}
+		mgr.sectors[sectorId] = ss
+	}
+	mgr.lk.Unlock()
+
+	ss.lk.Lock()
 	ss.pieces = append(ss.pieces, c)
+	ss.lk.Unlock()
+
 	return abi.PieceInfo{
+		Size:     size.Padded(),
 		PieceCID: c,
 	}, nil
diff --git a/roprov.go b/roprov.go
index aba6bb5d9..ad63526c9 100644
--- a/roprov.go
+++ b/roprov.go
@@ -11,16 +11,22 @@ import (
 )
 
 type readonlyProvider struct {
-	stor *stores.Local
-	spt  abi.RegisteredProof
+	index stores.SectorIndex
+	stor  *stores.Local
+	spt   abi.RegisteredProof
 }
 
-func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
+func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {
 	if allocate != stores.FTNone {
 		return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
 	}
 
-	p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove)
+	ctx, cancel := context.WithCancel(ctx)
+	if err := l.index.StorageLock(ctx, id, existing, stores.FTNone); err != nil {
+		return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
+	}
 
-	return p, done, err
+	p, _, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing, stores.AcquireMove)
+
+	return p, cancel, err
 }
diff --git a/selector_alloc.go b/selector_alloc.go
index 0a7850424..543020ef1 100644
--- a/selector_alloc.go
+++ b/selector_alloc.go
@@ -14,12 +14,14 @@ import (
 type allocSelector struct {
 	index stores.SectorIndex
 	alloc stores.SectorFileType
+	ptype stores.PathType
 }
 
-func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) {
+func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector, error) {
 	return &allocSelector{
 		index: index,
 		alloc: alloc,
+		ptype: ptype,
 	}, nil
 }
 
@@ -42,7 +44,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
 		have[path.ID] = struct{}{}
 	}
 
-	best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, true)
+	best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, s.ptype)
 	if err != nil {
 		return false, xerrors.Errorf("finding best alloc storage: %w", err)
 	}
diff --git a/stores/filetype.go b/stores/filetype.go
index 1810054d8..fee8b256f 100644
--- a/stores/filetype.go
+++ b/stores/filetype.go
@@ -11,6 +11,8 @@ const (
 	FTUnsealed SectorFileType = 1 << iota
 	FTSealed
 	FTCache
+
+	FileTypes = iota
 )
 
 const (
@@ -71,6 +73,16 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) {
 	return need, nil
 }
 
+func (t SectorFileType) All() [FileTypes]bool {
+	var out [FileTypes]bool
+
+	for i := range out {
+		out[i] = t&(1<<i) > 0
+	}
+
+	return out
+}
+
 type SectorPaths struct {
 	Id abi.SectorID
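
On the filetype.go change: `FileTypes = iota` evaluates to 3 here (the number of `1 << iota` constants declared before it), and `All()` expands the bitfield into a fixed-size bool array indexed by bit position. An illustrative use of the new helper (import path assumes this repo's module):

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/sector-storage/stores"
)

func main() {
	ft := stores.FTSealed | stores.FTCache // bits 1 and 2 of the bitfield

	// All() returns [FileTypes]bool — here [3]bool{false, true, true}.
	for i, set := range ft.All() {
		if set {
			fmt.Printf("file type bit %d (%s) requested\n", i, stores.SectorFileType(1<<i))
		}
	}
}
```
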
diff --git a/stores/http_handler.go b/stores/http_handler.go
index 7e2330dbd..60f8a41c5 100644
--- a/stores/http_handler.go
+++ b/stores/http_handler.go
@@ -69,14 +69,15 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
 		return
 	}
 
+	// The caller has a lock on this sector already, no need to get one here
+
 	// passing 0 spt because we don't allocate anything
-	paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
+	paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
 	if err != nil {
 		log.Error("%+v", err)
 		w.WriteHeader(500)
 		return
 	}
-	defer done()
 
 	path := PathByType(paths, ft)
 	if path == "" {
diff --git a/stores/index.go b/stores/index.go
index e1e35875d..6ef346367 100644
--- a/stores/index.go
+++ b/stores/index.go
@@ -59,6 +59,9 @@ type SectorIndex interface { // part of storage-miner api
 	StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
 	StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error)
+
+	// atomically acquire locks on all sector file types. close ctx to unlock
+	StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error
 }
 
 type Decl struct {
@@ -80,6 +83,7 @@ type storageEntry struct {
 }
 
 type Index struct {
+	*indexLocks
 	lk sync.RWMutex
 
 	sectors map[Decl][]*declMeta
@@ -88,6 +92,9 @@
 func NewIndex() *Index {
 	return &Index{
+		indexLocks: &indexLocks{
+			locks: map[abi.SectorID]*sectorLock{},
+		},
 		sectors: map[Decl][]*declMeta{},
 		stores:  map[ID]*storageEntry{},
 	}
diff --git a/stores/index_locks.go b/stores/index_locks.go
new file mode 100644
index 000000000..8e1a07a02
--- /dev/null
+++ b/stores/index_locks.go
@@ -0,0 +1,124 @@
+package stores
+
+import (
+	"context"
+	"sync"
+
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/specs-actors/actors/abi"
+)
+
+type sectorLock struct {
+	cond *ctxCond
+
+	r [FileTypes]uint
+	w SectorFileType
+
+	refs uint // access with indexLocks.lk
+}
+
+func (l *sectorLock) canLock(read SectorFileType, write SectorFileType) bool {
+	for i, b := range write.All() {
+		if b && l.r[i] > 0 {
+			return false
+		}
+	}
+
+	// check that there are no locks taken for either read or write file types we want
+	return l.w&read == 0 && l.w&write == 0
+}
+
+func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool {
+	if !l.canLock(read, write) {
+		return false
+	}
+
+	for i, set := range read.All() {
+		if set {
+			l.r[i]++
+		}
+	}
+
+	l.w |= write
+
+	return true
+}
+
+func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write SectorFileType) error {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	for !l.tryLock(read, write) {
+		if err := l.cond.Wait(ctx); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (l *sectorLock) unlock(read SectorFileType, write SectorFileType) {
+	l.cond.L.Lock()
+	defer l.cond.L.Unlock()
+
+	for i, set := range read.All() {
+		if set {
+			l.r[i]--
+		}
+	}
+
+	l.w &= ^write
+
+	l.cond.Broadcast()
+}
+
+type indexLocks struct {
+	lk sync.Mutex
+
+	locks map[abi.SectorID]*sectorLock
+}
+
+func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error {
+	if read|write == 0 {
+		return nil
+	}
+
+	if read|write > (1<<FileTypes)-1 {
+		return xerrors.Errorf("unknown file types specified")
+	}
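
The new index_locks.go leans on a `ctxCond` helper that is not included in this patch (and the rest of `StorageLock` is cut off above). A rough sketch of what a context-aware, broadcast-only condition variable like that could look like — an assumption for illustration, not the actual implementation:

```go
package stores

import (
	"context"
	"sync"
)

// ctxCond sketch: like sync.Cond, but Wait can be aborted by a context.
// Broadcast-only: all current waiters are woken by closing a shared channel.
type ctxCond struct {
	L sync.Locker // held by callers around Wait, like sync.Cond.L

	lk    sync.Mutex
	notif chan struct{}
}

func newCtxCond(l sync.Locker) *ctxCond {
	return &ctxCond{L: l}
}

func (c *ctxCond) Broadcast() {
	c.lk.Lock()
	if c.notif != nil { // wake everyone currently waiting
		close(c.notif)
		c.notif = nil
	}
	c.lk.Unlock()
}

func (c *ctxCond) Wait(ctx context.Context) error {
	c.lk.Lock()
	if c.notif == nil {
		c.notif = make(chan struct{})
	}
	wait := c.notif
	c.lk.Unlock()

	// Mirror sync.Cond: release the outer lock while blocked, retake after.
	c.L.Unlock()
	defer c.L.Lock()

	select {
	case <-wait:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```

Because waiters register the notify channel before releasing the outer lock, a Broadcast issued from `sectorLock.unlock` (which holds that lock) cannot be missed.
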