Merge pull request #88 from filecoin-project/fix/readpiece-nounseal
Try to not unseal in ReadPiece when don't need to
This commit is contained in:
commit
898a72d078
@ -279,6 +279,25 @@ func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) {
|
||||
return pf.allocated.RunIterator()
|
||||
}
|
||||
|
||||
func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
have, err := pf.Allocated()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
u, err := rlepluslazy.And(have, pieceRun(offset.Padded(), size.Padded()))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
uc, err := rlepluslazy.Count(u)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return abi.PaddedPieceSize(uc) == size.Padded(), nil
|
||||
}
|
||||
|
||||
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
|
||||
var runs []rlepluslazy.Run
|
||||
if offset > 0 {
|
||||
|
@ -361,10 +361,10 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
||||
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire unsealed sector path: %w", err)
|
||||
return false, xerrors.Errorf("acquire unsealed sector path: %w", err)
|
||||
}
|
||||
defer done()
|
||||
|
||||
@ -372,30 +372,41 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
|
||||
|
||||
pf, err := openPartialFile(maxPieceSize, path.Unsealed)
|
||||
if xerrors.Is(err, os.ErrNotExist) {
|
||||
return xerrors.Errorf("opening partial file: %w", err)
|
||||
return false, xerrors.Errorf("opening partial file: %w", err)
|
||||
}
|
||||
|
||||
ok, err := pf.HasAllocated(offset, size)
|
||||
if err != nil {
|
||||
pf.Close()
|
||||
return false, err
|
||||
}
|
||||
|
||||
if !ok {
|
||||
pf.Close()
|
||||
return false, nil
|
||||
}
|
||||
|
||||
f, err := pf.Reader(offset.Padded(), size.Padded())
|
||||
if err != nil {
|
||||
pf.Close()
|
||||
return xerrors.Errorf("getting partial file reader: %w", err)
|
||||
return false, xerrors.Errorf("getting partial file reader: %w", err)
|
||||
}
|
||||
|
||||
upr, err := fr32.NewUnpadReader(f, size.Padded())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("creating unpadded reader: %w", err)
|
||||
return false, xerrors.Errorf("creating unpadded reader: %w", err)
|
||||
}
|
||||
|
||||
if _, err := io.CopyN(writer, upr, int64(size)); err != nil {
|
||||
pf.Close()
|
||||
return xerrors.Errorf("reading unsealed file: %w", err)
|
||||
return false, xerrors.Errorf("reading unsealed file: %w", err)
|
||||
}
|
||||
|
||||
if err := pf.Close(); err != nil {
|
||||
return xerrors.Errorf("closing partial file: %w", err)
|
||||
return false, xerrors.Errorf("closing partial file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
|
||||
|
@ -111,7 +111,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
|
||||
defer done()
|
||||
|
||||
var b bytes.Buffer
|
||||
err := sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
|
||||
_, err := sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -130,7 +130,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
|
||||
}
|
||||
sd()
|
||||
|
||||
err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
|
||||
_, err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
|
||||
if err == nil {
|
||||
t.Fatal("HOW?!")
|
||||
}
|
||||
@ -141,7 +141,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
|
||||
}
|
||||
|
||||
b.Reset()
|
||||
err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
|
||||
_, err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -150,14 +150,17 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
|
||||
require.Equal(t, expect, b.Bytes())
|
||||
|
||||
b.Reset()
|
||||
err = sb.ReadPiece(context.TODO(), &b, si, 0, 2032)
|
||||
have, err := sb.ReadPiece(context.TODO(), &b, si, 0, 2032)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expect = append(expect, bytes.Repeat([]byte{0}, 1016)...)
|
||||
if !bytes.Equal(b.Bytes(), expect) {
|
||||
t.Fatal("read wrong bytes")
|
||||
if have {
|
||||
t.Errorf("didn't expect to read things")
|
||||
}
|
||||
|
||||
if b.Len() != 0 {
|
||||
t.Fatal("read bytes")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ type Storage interface {
|
||||
StorageSealer
|
||||
|
||||
UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
|
||||
ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error
|
||||
ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
|
||||
}
|
||||
|
||||
type Verifier interface {
|
||||
|
@ -237,10 +237,10 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
||||
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
sb, err := l.sb()
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
return sb.ReadPiece(ctx, writer, sector, index, size)
|
||||
|
32
manager.go
32
manager.go
@ -34,7 +34,7 @@ type Worker interface {
|
||||
|
||||
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error
|
||||
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
|
||||
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error
|
||||
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error)
|
||||
|
||||
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
|
||||
|
||||
@ -221,7 +221,28 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
||||
return xerrors.Errorf("creating unsealPiece selector: %w", err)
|
||||
}
|
||||
|
||||
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
|
||||
var readOk bool
|
||||
|
||||
if len(best) > 0 {
|
||||
// There is unsealed sector, see if we can read from it
|
||||
|
||||
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("creating readPiece selector: %w", err)
|
||||
}
|
||||
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||
}
|
||||
|
||||
if readOk {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil {
|
||||
@ -249,12 +270,17 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
||||
}
|
||||
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||
return w.ReadPiece(ctx, sink, sector, offset, size)
|
||||
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||
}
|
||||
|
||||
if readOk {
|
||||
return xerrors.Errorf("failed to read unsealed piece")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,7 @@ func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, inde
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
||||
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index sto
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
||||
func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
@ -120,7 +120,7 @@ func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index
|
||||
return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
|
||||
}
|
||||
|
||||
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
||||
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
defer t.tracker.track(id, sealtasks.TTReadUnsealed)()
|
||||
|
||||
return t.Worker.ReadPiece(ctx, writer, id, index, size)
|
||||
|
Loading…
Reference in New Issue
Block a user