diff --git a/manager.go b/manager.go
index 07484e041..8302427c7 100644
--- a/manager.go
+++ b/manager.go
@@ -311,15 +311,14 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
 }
 
 func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
+	// NOTE: We set allowFetch to false so that we always execute on a worker
+	// with direct access to the data. We want to do that because this step is
+	// generally very cheap / fast, and transferring data is not worth the effort
 	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
 	if err != nil {
 		return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	// TODO: Try very hard to execute on worker with access to the sectors
-	// (except, don't.. for now at least - we are using this step to bring data
-	// into 'provable' storage. Optimally we'd do that in commit2, in parallel
-	// with snark compute)
 	err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
 		p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
 		if err != nil {
diff --git a/stores/filetype.go b/stores/filetype.go
index 1810054d8..e3cc4042c 100644
--- a/stores/filetype.go
+++ b/stores/filetype.go
@@ -11,6 +11,8 @@ const (
 	FTUnsealed SectorFileType = 1 << iota
 	FTSealed
 	FTCache
+
+	FileTypes = iota
 )
 
 const (
@@ -71,6 +73,14 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) {
 	return need, nil
 }
 
+func (t SectorFileType) All() (out [FileTypes]bool) {
+	for i := range out {
+		out[i] = t&(1<<i) > 0
+	}
+
+	return out
+}
+
 type SectorPaths struct {
 	Id abi.SectorID
 
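A note on the `SectorFileType.All` helper added above: `FileTypes = iota` evaluates to 3 (the number of file types declared before it), so `All` expands the bitfield into one bool per type. A minimal, self-contained sketch of the behavior (constants inlined here purely for illustration; in the real code they live in `stores/filetype.go`):

```go
package main

import "fmt"

type SectorFileType int

const (
	FTUnsealed SectorFileType = 1 << iota // 1
	FTSealed                              // 2
	FTCache                               // 4

	FileTypes = iota // 3: number of file types declared above
)

// All reports, for each file type, whether its bit is set in t.
func (t SectorFileType) All() (out [FileTypes]bool) {
	for i := range out {
		out[i] = t&(1<<i) > 0
	}
	return out
}

func main() {
	fmt.Println((FTSealed | FTCache).All()) // [false true true]
}
```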
diff --git a/stores/index.go b/stores/index.go
index e1e35875d..c6856ef8e 100644
--- a/stores/index.go
+++ b/stores/index.go
@@ -59,6 +59,9 @@ type SectorIndex interface {
 	// part of storage-miner api
 	StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
 	StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error)
+
+	// atomically acquire locks on all sector file types. cancel ctx to unlock
+	StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error
 }
 
 type Decl struct {
@@ -80,6 +83,7 @@ type storageEntry struct {
 }
 
 type Index struct {
+	*indexLocks
 	lk sync.RWMutex
 
 	sectors map[Decl][]*declMeta
@@ -88,8 +92,11 @@
 
 func NewIndex() *Index {
 	return &Index{
-		sectors: map[Decl][]*declMeta{},
-		stores:  map[ID]*storageEntry{},
+		indexLocks: &indexLocks{
+			locks: map[abi.SectorID]*sectorLock{},
+		},
+		sectors: map[Decl][]*declMeta{},
+		stores:  map[ID]*storageEntry{},
 	}
 }
 
diff --git a/stores/index_locks.go b/stores/index_locks.go
new file mode 100644
index 000000000..f7770d5e5
--- /dev/null
+++ b/stores/index_locks.go
@@ -0,0 +1,127 @@
+package stores
+
+import (
+	"context"
+	"sync"
+
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/specs-actors/actors/abi"
+)
+
+type sectorLock struct {
+	lk    sync.Mutex
+	notif *ctxCond
+
+	r [FileTypes]uint
+	w SectorFileType
+
+	refs uint // access with indexLocks.lk
+}
+
+func (l *sectorLock) canLock(read SectorFileType, write SectorFileType) bool {
+	for i, b := range write.All() {
+		if b && l.r[i] > 0 {
+			return false
+		}
+	}
+
+	return l.w&(read|write) == 0
+}
+
+func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool {
+	if !l.canLock(read, write) {
+		return false
+	}
+
+	for i, set := range read.All() {
+		if set {
+			l.r[i]++
+		}
+	}
+
+	l.w |= write
+
+	return true
+}
+
+func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write SectorFileType) error {
+	l.lk.Lock()
+	defer l.lk.Unlock()
+
+	for {
+		if l.tryLock(read, write) {
+			return nil
+		}
+
+		if err := l.notif.Wait(ctx); err != nil {
+			return err
+		}
+	}
+}
+
+func (l *sectorLock) unlock(read SectorFileType, write SectorFileType) {
+	l.lk.Lock()
+	defer l.lk.Unlock()
+
+	for i, set := range read.All() {
+		if set {
+			l.r[i]--
+		}
+	}
+
+	l.w &= ^write
+
+	l.notif.Broadcast()
+}
+
+type indexLocks struct {
+	lk sync.Mutex
+
+	locks map[abi.SectorID]*sectorLock
+}
+
+func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error {
+	if read|write == 0 {
+		return nil
+	}
+
+	if read|write > (1<<FileTypes)-1 {
+		return xerrors.Errorf("unknown file types specified")
+	}
+
+	i.lk.Lock()
+	slk, ok := i.locks[sector]
+	if !ok {
+		slk = &sectorLock{}
+		slk.notif = newCtxCond(&slk.lk)
+		i.locks[sector] = slk
+	}
+
+	slk.refs++
+
+	i.lk.Unlock()
+
+	if err := slk.lock(ctx, read, write); err != nil {
+		return err
+	}
+
+	go func() {
+		// TODO: we can avoid this goroutine with a bit of creativity and reflect
+
+		<-ctx.Done()
+		i.lk.Lock()
+
+		slk.unlock(read, write)
+		slk.refs--
+
+		if slk.refs == 0 {
+			delete(i.locks, sector)
+		}
+
+		i.lk.Unlock()
+	}()
+
+	return nil
+}
+
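The lock lifetime here is tied to the caller's context: `StorageLock` returns once the lock is held, and the spawned goroutine releases it when `ctx` is canceled. A minimal usage sketch (the surrounding function is hypothetical, written as if inside the `stores` package):

```go
package stores

import (
	"context"

	"github.com/filecoin-project/specs-actors/actors/abi"
)

// unsealExample is a hypothetical caller: it takes a read lock on the sealed
// sector and its cache, and a write lock on the unsealed copy it will produce.
func unsealExample(ctx context.Context, index SectorIndex, sector abi.SectorID) error {
	// Derive a context whose cancellation releases the lock; deferring
	// cancel means the lock is dropped when this function returns.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := index.StorageLock(ctx, sector, FTSealed|FTCache, FTUnsealed); err != nil {
		return err
	}

	// ... it is now safe to read sealed+cache and write unsealed data ...

	return nil
}
```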
diff --git a/stores/index_locks_test.go b/stores/index_locks_test.go
new file mode 100644
index 000000000..aeeddd137
--- /dev/null
+++ b/stores/index_locks_test.go
@@ -0,0 +1,168 @@
+package stores
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/filecoin-project/specs-actors/actors/abi"
+)
+
+var aSector = abi.SectorID{
+	Miner:  2,
+	Number: 9000,
+}
+
+func TestCanLock(t *testing.T) {
+	lk := sectorLock{
+		r: [FileTypes]uint{},
+		w: FTNone,
+	}
+
+	require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
+	require.Equal(t, true, lk.canLock(FTNone, FTUnsealed))
+
+	ftAll := FTUnsealed | FTSealed | FTCache
+
+	require.Equal(t, true, lk.canLock(ftAll, FTNone))
+	require.Equal(t, true, lk.canLock(FTNone, ftAll))
+
+	lk.r[0] = 1 // unsealed read taken
+
+	require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
+	require.Equal(t, false, lk.canLock(FTNone, FTUnsealed))
+
+	require.Equal(t, true, lk.canLock(ftAll, FTNone))
+	require.Equal(t, false, lk.canLock(FTNone, ftAll))
+
+	require.Equal(t, true, lk.canLock(FTNone, FTSealed|FTCache))
+	require.Equal(t, true, lk.canLock(FTUnsealed, FTSealed|FTCache))
+
+	lk.r[0] = 0
+
+	lk.w = FTSealed
+
+	require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
+	require.Equal(t, true, lk.canLock(FTNone, FTUnsealed))
+
+	require.Equal(t, false, lk.canLock(FTSealed, FTNone))
+	require.Equal(t, false, lk.canLock(FTNone, FTSealed))
+
+	require.Equal(t, false, lk.canLock(ftAll, FTNone))
+	require.Equal(t, false, lk.canLock(FTNone, ftAll))
+}
+
+func TestIndexLocksSeq(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+
+	ilk := &indexLocks{
+		locks: map[abi.SectorID]*sectorLock{},
+	}
+
+	require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
+	cancel()
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
+	cancel()
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
+	cancel()
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
+	cancel()
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
+	cancel()
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
+	cancel()
+}
+
+func TestIndexLocksBlockOn(t *testing.T) {
+	test := func(r1 SectorFileType, w1 SectorFileType, r2 SectorFileType, w2 SectorFileType) func(t *testing.T) {
+		return func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+
+			ilk := &indexLocks{
+				locks: map[abi.SectorID]*sectorLock{},
+			}
+
+			require.NoError(t, ilk.StorageLock(ctx, aSector, r1, w1))
+
+			sch := make(chan struct{})
+			go func() {
+				ctx, cancel := context.WithCancel(context.Background())
+
+				sch <- struct{}{}
+
+				require.NoError(t, ilk.StorageLock(ctx, aSector, r2, w2))
+				cancel()
+
+				sch <- struct{}{}
+			}()
+
+			<-sch
+
+			select {
+			case <-sch:
+				t.Fatal("that shouldn't happen")
+			case <-time.After(40 * time.Millisecond):
+			}
+
+			cancel()
+
+			select {
+			case <-sch:
+			case <-time.After(time.Second):
+				t.Fatal("timed out")
+			}
+		}
+	}
+
+	t.Run("readBlocksWrite", test(FTUnsealed, FTNone, FTNone, FTUnsealed))
+	t.Run("writeBlocksRead", test(FTNone, FTUnsealed, FTUnsealed, FTNone))
+	t.Run("writeBlocksWrite", test(FTNone, FTUnsealed, FTNone, FTUnsealed))
+}
+
+func TestIndexLocksBlockWonR(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	ilk := &indexLocks{
+		locks: map[abi.SectorID]*sectorLock{},
+	}
+
+	require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
+
+	sch := make(chan struct{})
+	go func() {
+		ctx, cancel := context.WithCancel(context.Background())
+
+		sch <- struct{}{}
+
+		require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
+		cancel()
+
+		sch <- struct{}{}
+	}()
+
+	<-sch
+
+	select {
+	case <-sch:
+		t.Fatal("that shouldn't happen")
+	case <-time.After(40 * time.Millisecond):
+	}
+
+	cancel()
+
+	select {
+	case <-sch:
+	case <-time.After(time.Second):
+		t.Fatal("timed out")
+	}
+}
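One property the tests above don't pin down directly is that read locks are shared: two readers of the same file type should both be granted without either blocking. A hypothetical extra test in the same style:

```go
package stores

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/specs-actors/actors/abi"
)

// TestIndexLocksReadersCoexist (hypothetical): two read locks on the same
// sector and file type must both succeed while neither has been released.
func TestIndexLocksReadersCoexist(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	ilk := &indexLocks{
		locks: map[abi.SectorID]*sectorLock{},
	}

	require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
	// Second reader: canLock only rejects on conflicting writes, so this
	// must return without waiting.
	require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
}
```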
diff --git a/stores/index_locks_util.go b/stores/index_locks_util.go
new file mode 100644
index 000000000..5e4ab6ab2
--- /dev/null
+++ b/stores/index_locks_util.go
@@ -0,0 +1,49 @@
+package stores
+
+import (
+	"context"
+	"sync"
+)
+
+// like sync.Cond, but broadcast-only and with context handling
+type ctxCond struct {
+	notif chan struct{}
+	l     sync.Locker
+
+	lk sync.Mutex
+}
+
+func newCtxCond(l sync.Locker) *ctxCond {
+	return &ctxCond{
+		l: l,
+	}
+}
+
+func (c *ctxCond) Broadcast() {
+	c.lk.Lock()
+	if c.notif != nil {
+		close(c.notif)
+		c.notif = nil
+	}
+	c.lk.Unlock()
+}
+
+func (c *ctxCond) Wait(ctx context.Context) error {
+	c.lk.Lock()
+	if c.notif == nil {
+		c.notif = make(chan struct{})
+	}
+
+	wait := c.notif
+	c.lk.Unlock()
+
+	c.l.Unlock()
+	defer c.l.Lock()
+
+	select {
+	case <-wait:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
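`ctxCond` keeps `sync.Cond`'s contract: the caller holds the associated `sync.Locker`, and `Wait` releases it while blocked, re-acquiring it before returning. There is no `Signal`, only `Broadcast`, which is why `sectorLock.lock` re-checks `tryLock` in a loop after every wakeup. A minimal sketch of that pattern with hypothetical names (`mu` must be the same Locker passed to `newCtxCond`):

```go
package stores

import (
	"context"
	"sync"
)

// waitUntil is a hypothetical helper: cond must have been created with
// newCtxCond(mu), and ready is re-evaluated under mu after each wakeup.
func waitUntil(ctx context.Context, mu *sync.Mutex, cond *ctxCond, ready func() bool) error {
	mu.Lock()
	defer mu.Unlock()

	for !ready() {
		// Wait unlocks mu while blocked and relocks it before returning;
		// it returns ctx.Err() if the context is canceled first.
		if err := cond.Wait(ctx); err != nil {
			return err
		}
	}

	return nil
}
```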