index: Implement basic global sector locking system
This commit is contained in:
parent
9df0cdf193
commit
d9d3ccf6c6
@ -311,15 +311,14 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
|
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
|
||||||
|
// NOTE: We set allowFetch to false in so that we always execute on a worker
|
||||||
|
// with direct access to the data. We want to do that because this step is
|
||||||
|
// generally very cheap / fast, and transferring data is not worth the effort
|
||||||
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
|
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
|
return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Try very hard to execute on worker with access to the sectors
|
|
||||||
// (except, don't.. for now at least - we are using this step to bring data
|
|
||||||
// into 'provable' storage. Optimally we'd do that in commit2, in parallel
|
|
||||||
// with snark compute)
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
|
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -11,6 +11,8 @@ const (
|
|||||||
FTUnsealed SectorFileType = 1 << iota
|
FTUnsealed SectorFileType = 1 << iota
|
||||||
FTSealed
|
FTSealed
|
||||||
FTCache
|
FTCache
|
||||||
|
|
||||||
|
FileTypes = iota
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -71,6 +73,14 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) {
|
|||||||
return need, nil
|
return need, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t SectorFileType) All() (out [FileTypes]bool) {
|
||||||
|
for i := range out {
|
||||||
|
out[i] = t & (1 << i) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
type SectorPaths struct {
|
type SectorPaths struct {
|
||||||
Id abi.SectorID
|
Id abi.SectorID
|
||||||
|
|
||||||
|
@ -59,6 +59,9 @@ type SectorIndex interface { // part of storage-miner api
|
|||||||
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
|
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
|
||||||
|
|
||||||
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error)
|
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error)
|
||||||
|
|
||||||
|
// atomically acquire locks on all sector file types. close ctx to unlock
|
||||||
|
StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Decl struct {
|
type Decl struct {
|
||||||
@ -80,6 +83,7 @@ type storageEntry struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Index struct {
|
type Index struct {
|
||||||
|
*indexLocks
|
||||||
lk sync.RWMutex
|
lk sync.RWMutex
|
||||||
|
|
||||||
sectors map[Decl][]*declMeta
|
sectors map[Decl][]*declMeta
|
||||||
@ -88,6 +92,9 @@ type Index struct {
|
|||||||
|
|
||||||
func NewIndex() *Index {
|
func NewIndex() *Index {
|
||||||
return &Index{
|
return &Index{
|
||||||
|
indexLocks: &indexLocks{
|
||||||
|
locks: map[abi.SectorID]*sectorLock{},
|
||||||
|
},
|
||||||
sectors: map[Decl][]*declMeta{},
|
sectors: map[Decl][]*declMeta{},
|
||||||
stores: map[ID]*storageEntry{},
|
stores: map[ID]*storageEntry{},
|
||||||
}
|
}
|
||||||
|
127
stores/index_locks.go
Normal file
127
stores/index_locks.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
package stores
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
)
|
||||||
|
|
||||||
|
type sectorLock struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
notif *ctxCond
|
||||||
|
|
||||||
|
r [FileTypes]uint
|
||||||
|
w SectorFileType
|
||||||
|
|
||||||
|
refs uint // access with indexLocks.lk
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *sectorLock) canLock(read SectorFileType, write SectorFileType) bool {
|
||||||
|
for i, b := range write.All() {
|
||||||
|
if b && l.r[i] > 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return l.w & (read | write) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool {
|
||||||
|
if !l.canLock(read, write) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, set := range read.All() {
|
||||||
|
if set {
|
||||||
|
l.r[i]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
l.w |= write
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write SectorFileType) error {
|
||||||
|
l.lk.Lock()
|
||||||
|
defer l.lk.Unlock()
|
||||||
|
|
||||||
|
for {
|
||||||
|
if l.tryLock(read, write) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.notif.Wait(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *sectorLock) unlock(read SectorFileType, write SectorFileType) {
|
||||||
|
l.lk.Lock()
|
||||||
|
defer l.lk.Unlock()
|
||||||
|
|
||||||
|
for i, set := range read.All() {
|
||||||
|
if set {
|
||||||
|
l.r[i]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
l.w &= ^write
|
||||||
|
|
||||||
|
l.notif.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
type indexLocks struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
locks map[abi.SectorID]*sectorLock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error {
|
||||||
|
if read | write == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if read | write > (1 << FileTypes) - 1 {
|
||||||
|
return xerrors.Errorf("unknown file types specified")
|
||||||
|
}
|
||||||
|
|
||||||
|
i.lk.Lock()
|
||||||
|
slk, ok := i.locks[sector]
|
||||||
|
if !ok {
|
||||||
|
slk = §orLock{}
|
||||||
|
slk.notif = newCtxCond(&slk.lk)
|
||||||
|
i.locks[sector] = slk
|
||||||
|
}
|
||||||
|
|
||||||
|
slk.refs++
|
||||||
|
|
||||||
|
i.lk.Unlock()
|
||||||
|
|
||||||
|
if err := slk.lock(ctx, read, write); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// TODO: we can avoid this goroutine with a bit of creativity and reflect
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
i.lk.Lock()
|
||||||
|
|
||||||
|
slk.unlock(read, write)
|
||||||
|
slk.refs--
|
||||||
|
|
||||||
|
if slk.refs == 0 {
|
||||||
|
delete(i.locks, sector)
|
||||||
|
}
|
||||||
|
|
||||||
|
i.lk.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
168
stores/index_locks_test.go
Normal file
168
stores/index_locks_test.go
Normal file
@ -0,0 +1,168 @@
|
|||||||
|
package stores
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
)
|
||||||
|
|
||||||
|
var aSector = abi.SectorID{
|
||||||
|
Miner: 2,
|
||||||
|
Number: 9000,
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCanLock(t *testing.T) {
|
||||||
|
lk := sectorLock{
|
||||||
|
r: [FileTypes]uint{},
|
||||||
|
w: FTNone,
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
|
||||||
|
require.Equal(t, true, lk.canLock(FTNone, FTUnsealed))
|
||||||
|
|
||||||
|
ftAll := FTUnsealed | FTSealed | FTCache
|
||||||
|
|
||||||
|
require.Equal(t, true, lk.canLock(ftAll, FTNone))
|
||||||
|
require.Equal(t, true, lk.canLock(FTNone, ftAll))
|
||||||
|
|
||||||
|
lk.r[0] = 1 // unsealed read taken
|
||||||
|
|
||||||
|
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
|
||||||
|
require.Equal(t, false, lk.canLock(FTNone, FTUnsealed))
|
||||||
|
|
||||||
|
require.Equal(t, true, lk.canLock(ftAll, FTNone))
|
||||||
|
require.Equal(t, false, lk.canLock(FTNone, ftAll))
|
||||||
|
|
||||||
|
require.Equal(t, true, lk.canLock(FTNone, FTSealed | FTCache))
|
||||||
|
require.Equal(t, true, lk.canLock(FTUnsealed, FTSealed | FTCache))
|
||||||
|
|
||||||
|
lk.r[0] = 0
|
||||||
|
|
||||||
|
lk.w = FTSealed
|
||||||
|
|
||||||
|
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
|
||||||
|
require.Equal(t, true, lk.canLock(FTNone, FTUnsealed))
|
||||||
|
|
||||||
|
require.Equal(t, false, lk.canLock(FTSealed, FTNone))
|
||||||
|
require.Equal(t, false, lk.canLock(FTNone, FTSealed))
|
||||||
|
|
||||||
|
require.Equal(t, false, lk.canLock(ftAll, FTNone))
|
||||||
|
require.Equal(t, false, lk.canLock(FTNone, ftAll))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIndexLocksSeq(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
|
||||||
|
ilk := &indexLocks{
|
||||||
|
locks: map[abi.SectorID]*sectorLock{},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIndexLocksBlockOn(t *testing.T) {
|
||||||
|
test := func(r1 SectorFileType, w1 SectorFileType, r2 SectorFileType, w2 SectorFileType) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
ilk := &indexLocks{
|
||||||
|
locks: map[abi.SectorID]*sectorLock{},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, r1, w1))
|
||||||
|
|
||||||
|
sch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
sch <- struct{}{}
|
||||||
|
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, r2, w2))
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
sch <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-sch
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sch:
|
||||||
|
t.Fatal("that shouldn't happen")
|
||||||
|
case <-time.After(40 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sch:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("timed out")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("readBlocksWrite", test(FTUnsealed, FTNone, FTNone, FTUnsealed))
|
||||||
|
t.Run("writeBlocksRead", test(FTNone, FTUnsealed, FTUnsealed, FTNone))
|
||||||
|
t.Run("writeBlocksWrite", test(FTNone, FTUnsealed, FTNone, FTUnsealed))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIndexLocksBlockWonR(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
ilk := &indexLocks{
|
||||||
|
locks: map[abi.SectorID]*sectorLock{},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
|
||||||
|
|
||||||
|
sch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
sch <- struct{}{}
|
||||||
|
|
||||||
|
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
sch <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-sch
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sch:
|
||||||
|
t.Fatal("that shouldn't happen")
|
||||||
|
case <-time.After(40 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sch:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("timed out")
|
||||||
|
}
|
||||||
|
}
|
49
stores/index_locks_util.go
Normal file
49
stores/index_locks_util.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package stores
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// like sync.Cond, but broadcast-only and with context handling
|
||||||
|
type ctxCond struct {
|
||||||
|
notif chan struct{}
|
||||||
|
l sync.Locker
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCtxCond(l sync.Locker) *ctxCond {
|
||||||
|
return &ctxCond{
|
||||||
|
l: l,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ctxCond) Broadcast() {
|
||||||
|
c.lk.Lock()
|
||||||
|
if c.notif != nil {
|
||||||
|
close(c.notif)
|
||||||
|
c.notif = nil
|
||||||
|
}
|
||||||
|
c.lk.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ctxCond) Wait(ctx context.Context) error {
|
||||||
|
c.lk.Lock()
|
||||||
|
if c.notif == nil {
|
||||||
|
c.notif = make(chan struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
wait := c.notif
|
||||||
|
c.lk.Unlock()
|
||||||
|
|
||||||
|
c.l.Unlock()
|
||||||
|
defer c.l.Lock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-wait:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user