Integrate index locks into manager

This commit is contained in:
Łukasz Magiera 2020-06-03 22:00:34 +02:00
parent d9d3ccf6c6
commit a39bc94c58
7 changed files with 69 additions and 17 deletions

View File

@ -88,7 +88,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
return nil, err
}
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@ -191,6 +191,13 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am
}
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
@ -249,6 +256,13 @@ func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
}
func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTUnsealed); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
var selector WorkerSelector
var err error
if len(existingPieces) == 0 { // new
@ -274,6 +288,13 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
}
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache); err != nil {
return nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
// TODO: also consider where the unsealed data sits
selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed)
@ -294,6 +315,13 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
}
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, true)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
@ -311,6 +339,13 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
}
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
// NOTE: We set allowFetch to false in so that we always execute on a worker
// with direct access to the data. We want to do that because this step is
// generally very cheap / fast, and transferring data is not worth the effort
@ -346,6 +381,13 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
}
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTUnsealed|stores.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("creating path selector: %w", err)

View File

@ -11,6 +11,7 @@ import (
)
type readonlyProvider struct {
index stores.SectorIndex
stor *stores.Local
spt abi.RegisteredProof
}
@ -20,7 +21,15 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
}
ctx, cancel := context.WithCancel(ctx)
if err := l.index.StorageLock(ctx, id, existing, stores.FTNone); err != nil {
return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove)
return p, done, err
return p, func() {
cancel()
done()
}, err
}

View File

@ -124,4 +124,3 @@ func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read
return nil
}

View File

@ -68,6 +68,7 @@ func TestIndexLocksSeq(t *testing.T) {
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
@ -75,6 +76,7 @@ func TestIndexLocksSeq(t *testing.T) {
require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()