Try to not unseal in ReadPiece when don't need to
This commit is contained in:
parent
33dfb9a9cc
commit
3d2084ab93
@ -279,6 +279,25 @@ func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) {
|
|||||||
return pf.allocated.RunIterator()
|
return pf.allocated.RunIterator()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||||
|
have, err := pf.Allocated()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := rlepluslazy.Union(have, pieceRun(offset.Padded(), size.Padded()))
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
uc, err := rlepluslazy.Count(u)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return abi.PaddedPieceSize(uc) == size.Padded(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
|
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
|
||||||
var runs []rlepluslazy.Run
|
var runs []rlepluslazy.Run
|
||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
|
@ -361,10 +361,10 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||||
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire unsealed sector path: %w", err)
|
return false, xerrors.Errorf("acquire unsealed sector path: %w", err)
|
||||||
}
|
}
|
||||||
defer done()
|
defer done()
|
||||||
|
|
||||||
@ -372,30 +372,41 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
|
|||||||
|
|
||||||
pf, err := openPartialFile(maxPieceSize, path.Unsealed)
|
pf, err := openPartialFile(maxPieceSize, path.Unsealed)
|
||||||
if xerrors.Is(err, os.ErrNotExist) {
|
if xerrors.Is(err, os.ErrNotExist) {
|
||||||
return xerrors.Errorf("opening partial file: %w", err)
|
return false, xerrors.Errorf("opening partial file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, err := pf.HasAllocated(offset, size)
|
||||||
|
if err != nil {
|
||||||
|
pf.Close()
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
pf.Close()
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := pf.Reader(offset.Padded(), size.Padded())
|
f, err := pf.Reader(offset.Padded(), size.Padded())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pf.Close()
|
pf.Close()
|
||||||
return xerrors.Errorf("getting partial file reader: %w", err)
|
return false, xerrors.Errorf("getting partial file reader: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
upr, err := fr32.NewUnpadReader(f, size.Padded())
|
upr, err := fr32.NewUnpadReader(f, size.Padded())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("creating unpadded reader: %w", err)
|
return false, xerrors.Errorf("creating unpadded reader: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := io.CopyN(writer, upr, int64(size)); err != nil {
|
if _, err := io.CopyN(writer, upr, int64(size)); err != nil {
|
||||||
pf.Close()
|
pf.Close()
|
||||||
return xerrors.Errorf("reading unsealed file: %w", err)
|
return false, xerrors.Errorf("reading unsealed file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := pf.Close(); err != nil {
|
if err := pf.Close(); err != nil {
|
||||||
return xerrors.Errorf("closing partial file: %w", err)
|
return false, xerrors.Errorf("closing partial file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
|
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
|
||||||
|
@ -29,7 +29,7 @@ type Storage interface {
|
|||||||
StorageSealer
|
StorageSealer
|
||||||
|
|
||||||
UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
|
UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
|
||||||
ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error
|
ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Verifier interface {
|
type Verifier interface {
|
||||||
|
@ -237,10 +237,10 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||||
sb, err := l.sb()
|
sb, err := l.sb()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return sb.ReadPiece(ctx, writer, sector, index, size)
|
return sb.ReadPiece(ctx, writer, sector, index, size)
|
||||||
|
32
manager.go
32
manager.go
@ -34,7 +34,7 @@ type Worker interface {
|
|||||||
|
|
||||||
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error
|
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error
|
||||||
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
|
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
|
||||||
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error
|
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error)
|
||||||
|
|
||||||
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
|
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
|
||||||
|
|
||||||
@ -221,7 +221,28 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
return xerrors.Errorf("creating unsealPiece selector: %w", err)
|
return xerrors.Errorf("creating unsealPiece selector: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
|
var readOk bool
|
||||||
|
|
||||||
|
if len(best) > 0 {
|
||||||
|
// There is unsealed sector, see if we can read from it
|
||||||
|
|
||||||
|
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("creating readPiece selector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
|
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if readOk {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||||
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil {
|
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil {
|
||||||
@ -249,12 +270,17 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
return w.ReadPiece(ctx, sink, sector, offset, size)
|
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if readOk {
|
||||||
|
return xerrors.Errorf("failed to read unsealed piece")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,7 +120,7 @@ func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index
|
|||||||
return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
|
return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||||
defer t.tracker.track(id, sealtasks.TTReadUnsealed)()
|
defer t.tracker.track(id, sealtasks.TTReadUnsealed)()
|
||||||
|
|
||||||
return t.Worker.ReadPiece(ctx, writer, id, index, size)
|
return t.Worker.ReadPiece(ctx, writer, id, index, size)
|
||||||
|
Loading…
Reference in New Issue
Block a user