Merge pull request #3778 from filecoin-project/feat/less-restrictive-read-piece-lock
Allow retrievals while sealing
This commit is contained in:
commit
801e01bca0
44
extern/sector-storage/manager.go
vendored
44
extern/sector-storage/manager.go
vendored
@ -203,25 +203,26 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, ptype stores.Path
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
|
||||
func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) {
|
||||
|
||||
// acquire a lock purely for reading unsealed sectors
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTNone); err != nil {
|
||||
returnErr = xerrors.Errorf("acquiring read sector lock: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
// passing 0 spt because we only need it when allowFetch is true
|
||||
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
|
||||
returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
var readOk bool
|
||||
var selector WorkerSelector
|
||||
if len(best) == 0 { // new
|
||||
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
|
||||
} else { // append to existing
|
||||
foundUnsealed = len(best) > 0
|
||||
if foundUnsealed { // append to existing
|
||||
// There is unsealed sector, see if we can read from it
|
||||
|
||||
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false)
|
||||
@ -231,12 +232,27 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||
}
|
||||
} else {
|
||||
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if readOk {
|
||||
return nil
|
||||
}
|
||||
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
|
||||
foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if readOk {
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
|
||||
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
|
||||
}
|
||||
|
||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||
@ -244,7 +260,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
||||
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
||||
}
|
||||
|
||||
if len(best) > 0 {
|
||||
if foundUnsealed {
|
||||
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove); err != nil {
|
||||
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user