From 3d2084ab931932da4b6dddd32b8b35df0fe3bc19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Jul 2020 22:03:43 +0200 Subject: [PATCH 1/2] Try to not unseal in ReadPiece when don't need to --- ffiwrapper/partialfile.go | 19 +++++++++++++++++++ ffiwrapper/sealer_cgo.go | 27 +++++++++++++++++++-------- ffiwrapper/types.go | 2 +- localworker.go | 4 ++-- manager.go | 32 +++++++++++++++++++++++++++++--- work_tracker.go | 2 +- 6 files changed, 71 insertions(+), 15 deletions(-) diff --git a/ffiwrapper/partialfile.go b/ffiwrapper/partialfile.go index b1ab8c53c..f6c03f1a3 100644 --- a/ffiwrapper/partialfile.go +++ b/ffiwrapper/partialfile.go @@ -279,6 +279,25 @@ func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) { return pf.allocated.RunIterator() } +func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { + have, err := pf.Allocated() + if err != nil { + return false, err + } + + u, err := rlepluslazy.Union(have, pieceRun(offset.Padded(), size.Padded())) + if err != nil { + return false, err + } + + uc, err := rlepluslazy.Count(u) + if err != nil { + return false, err + } + + return abi.PaddedPieceSize(uc) == size.Padded(), nil +} + func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator { var runs []rlepluslazy.Run if offset > 0 { diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index 416bfa70b..8a4f18bc7 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -361,10 +361,10 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s return nil } -func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage) if err != nil { - return xerrors.Errorf("acquire unsealed sector path: %w", err) + return false, xerrors.Errorf("acquire unsealed sector path: %w", err) } defer done() @@ -372,30 +372,41 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se pf, err := openPartialFile(maxPieceSize, path.Unsealed) if xerrors.Is(err, os.ErrNotExist) { - return xerrors.Errorf("opening partial file: %w", err) + return false, xerrors.Errorf("opening partial file: %w", err) + } + + ok, err := pf.HasAllocated(offset, size) + if err != nil { + pf.Close() + return false, err + } + + if !ok { + pf.Close() + return false, nil } f, err := pf.Reader(offset.Padded(), size.Padded()) if err != nil { pf.Close() - return xerrors.Errorf("getting partial file reader: %w", err) + return false, xerrors.Errorf("getting partial file reader: %w", err) } upr, err := fr32.NewUnpadReader(f, size.Padded()) if err != nil { - return xerrors.Errorf("creating unpadded reader: %w", err) + return false, xerrors.Errorf("creating unpadded reader: %w", err) } if _, err := io.CopyN(writer, upr, int64(size)); err != nil { pf.Close() - return xerrors.Errorf("reading unsealed file: %w", err) + return false, xerrors.Errorf("reading unsealed file: %w", err) } if err := pf.Close(); err != nil { - return xerrors.Errorf("closing partial file: %w", err) + return false, xerrors.Errorf("closing partial file: %w", err) } - return nil + return false, nil } func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { diff --git a/ffiwrapper/types.go b/ffiwrapper/types.go index 13c0ee990..bc3c44f54 100644 --- a/ffiwrapper/types.go +++ b/ffiwrapper/types.go @@ -29,7 +29,7 @@ type Storage interface { StorageSealer UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error - ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error + ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) } type Verifier interface { diff --git a/localworker.go b/localworker.go index 7b9bbdee1..14ed1cd0b 100644 --- a/localworker.go +++ b/localworker.go @@ -237,10 +237,10 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde return nil } -func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { sb, err := l.sb() if err != nil { - return err + return false, err } return sb.ReadPiece(ctx, writer, sector, index, size) diff --git a/manager.go b/manager.go index 4791eb5e6..5f2b8e334 100644 --- a/manager.go +++ b/manager.go @@ -34,7 +34,7 @@ type Worker interface { Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error - ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error + ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) @@ -221,7 +221,28 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return xerrors.Errorf("creating unsealPiece selector: %w", err) } - // TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed + var readOk bool + + if len(best) > 0 { + // There is unsealed sector, see if we can read from it + + selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) + if err != nil { + return xerrors.Errorf("creating readPiece selector: %w", err) + } + + err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { + readOk, err = w.ReadPiece(ctx, sink, sector, offset, size) + return err + }) + if err != nil { + return xerrors.Errorf("reading piece from sealed sector: %w", err) + } + + if readOk { + return nil + } + } unsealFetch := func(ctx context.Context, worker Worker) error { if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil { @@ -249,12 +270,17 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect } err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { - return w.ReadPiece(ctx, sink, sector, offset, size) + readOk, err = w.ReadPiece(ctx, sink, sector, offset, size) + return err }) if err != nil { return xerrors.Errorf("reading piece from sealed sector: %w", err) } + if readOk { + return xerrors.Errorf("failed to read unsealed piece") + } + return nil } diff --git a/work_tracker.go b/work_tracker.go index f1e243ed2..7453752c9 100644 --- a/work_tracker.go +++ b/work_tracker.go @@ -120,7 +120,7 @@ func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) } -func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { defer t.tracker.track(id, sealtasks.TTReadUnsealed)() return t.Worker.ReadPiece(ctx, writer, id, index, size) From 7153e1dd05b5e7aca17098eb47a65f9855bfdec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Jul 2020 22:38:05 +0200 Subject: [PATCH 2/2] Fix tests --- ffiwrapper/partialfile.go | 2 +- ffiwrapper/sealer_test.go | 17 ++++++++++------- sched_test.go | 2 +- testworker_test.go | 2 +- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/ffiwrapper/partialfile.go b/ffiwrapper/partialfile.go index f6c03f1a3..3e8b32288 100644 --- a/ffiwrapper/partialfile.go +++ b/ffiwrapper/partialfile.go @@ -285,7 +285,7 @@ func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi return false, err } - u, err := rlepluslazy.Union(have, pieceRun(offset.Padded(), size.Padded())) + u, err := rlepluslazy.And(have, pieceRun(offset.Padded(), size.Padded())) if err != nil { return false, err } diff --git a/ffiwrapper/sealer_test.go b/ffiwrapper/sealer_test.go index 0b5018d84..f795be159 100644 --- a/ffiwrapper/sealer_test.go +++ b/ffiwrapper/sealer_test.go @@ -111,7 +111,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec defer done() var b bytes.Buffer - err := sb.ReadPiece(context.TODO(), &b, si, 0, 1016) + _, err := sb.ReadPiece(context.TODO(), &b, si, 0, 1016) if err != nil { t.Fatal(err) } @@ -130,7 +130,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec } sd() - err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016) + _, err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016) if err == nil { t.Fatal("HOW?!") } @@ -141,7 +141,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec } b.Reset() - err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016) + _, err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016) if err != nil { t.Fatal(err) } @@ -150,14 +150,17 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec require.Equal(t, expect, b.Bytes()) b.Reset() - err = sb.ReadPiece(context.TODO(), &b, si, 0, 2032) + have, err := sb.ReadPiece(context.TODO(), &b, si, 0, 2032) if err != nil { t.Fatal(err) } - expect = append(expect, bytes.Repeat([]byte{0}, 1016)...) - if !bytes.Equal(b.Bytes(), expect) { - t.Fatal("read wrong bytes") + if have { + t.Errorf("didn't expect to read things") + } + + if b.Len() != 0 { + t.Fatal("read bytes") } } diff --git a/sched_test.go b/sched_test.go index caf7f0b4b..c96f7838c 100644 --- a/sched_test.go +++ b/sched_test.go @@ -88,7 +88,7 @@ func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, inde panic("implement me") } -func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { panic("implement me") } diff --git a/testworker_test.go b/testworker_test.go index bdfff1915..40151a84d 100644 --- a/testworker_test.go +++ b/testworker_test.go @@ -53,7 +53,7 @@ func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index sto panic("implement me") } -func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { panic("implement me") }