diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index c500b4e30..2d9ca33be 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -80,21 +80,28 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se return nil, xerrors.Errorf("acquiring read sector lock: %w", err) } + // Reader returns a reader getter for an unsealed piece at the given offset in the given sector. + // The returned reader will be nil if none of the workers has an unsealed sector file containing + // the unsealed piece. + rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded()) + if err != nil { + cancel() + log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) + return nil, err + } + if rg == nil { + cancel() + return nil, nil + } + pr, err := (&pieceReader{ ctx: ctx, getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) { startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127 - // Reader returns a reader for an unsealed piece at the given offset in the given sector. - // The returned reader will be nil if none of the workers has an unsealed sector file containing - // the unsealed piece. - r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffsetAligned.Padded()), size.Padded()-abi.PaddedPieceSize(startOffsetAligned.Padded())) + r, err := rg(startOffsetAligned.Padded()) if err != nil { - log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) - return nil, err - } - if r == nil { - return nil, nil + return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err) } upr, err := fr32.NewUnpadReader(r, size.Padded()) diff --git a/extern/sector-storage/piece_reader.go b/extern/sector-storage/piece_reader.go index 14f13f017..d7a3f4e98 100644 --- a/extern/sector-storage/piece_reader.go +++ b/extern/sector-storage/piece_reader.go @@ -67,6 +67,9 @@ func (p *pieceReader) Close() error { } if p.r != nil { + if err := p.r.Close(); err != nil { + return err + } if err := p.r.Close(); err != nil { return err } diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 7935556a9..0681026c9 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -585,7 +585,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offse // 1. no worker(local worker included) has an unsealed file for the given sector OR // 2. no worker(local worker included) has the unsealed piece in their unsealed sector file. // Will return a nil reader and a nil error in such a case. -func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { +func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) { ft := storiface.FTUnsealed // check if we have the unsealed sector file locally @@ -623,7 +623,52 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a if has { log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) - return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size) + + return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + // don't reuse between readers unless closed + f := pf + pf = nil + + if f == nil { + f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) + if err != nil { + return nil, xerrors.Errorf("opening partial file: %w", err) + } + log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size) + } + + r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned)) + if err != nil { + return nil, err + } + + return struct { + io.Reader + io.Closer + }{ + Reader: r, + Closer: funcCloser(func() error { + // if we already have a reader cached, close this one + if pf != nil { + if f == nil { + return nil + } + if pf == f { + pf = nil + } + + tmp := f + f = nil + return tmp.Close() + } + + // otherwise stash it away for reuse + pf = f + return nil + }), + }, nil + }, nil + } log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size) @@ -666,16 +711,19 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a continue } - // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. - // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. - rd, err := r.readRemote(ctx, url, offset, size) - if err != nil { - log.Warnw("reading from remote", "url", url, "error", err) - lastErr = err - continue - } - log.Infof("Read remote %s (+%d,%d)", url, offset, size) - return rd, nil + return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. + // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. + rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size) + if err != nil { + log.Warnw("reading from remote", "url", url, "error", err) + return nil, err + } + log.Infof("Read remote %s (+%d,%d)", url, offset, size) + + return rd, err + }, nil + } } @@ -692,3 +740,11 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac } var _ Store = &Remote{} + +type funcCloser func() error + +func (f funcCloser) Close() error { + return f() +} + +var _ io.Closer = funcCloser(nil) diff --git a/extern/sector-storage/stores/remote_test.go b/extern/sector-storage/stores/remote_test.go index ea9179655..0bc439dee 100644 --- a/extern/sector-storage/stores/remote_test.go +++ b/extern/sector-storage/stores/remote_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -470,12 +471,20 @@ func TestReader(t *testing.T) { remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler) - rd, err := remoteStore.Reader(ctx, sectorRef, offset, size) + rdg, err := remoteStore.Reader(ctx, sectorRef, offset, size) + var rd io.ReadCloser if tc.errStr != "" { - require.Error(t, err) - require.Nil(t, rd) - require.Contains(t, err.Error(), tc.errStr) + if rdg == nil { + require.Error(t, err) + require.Nil(t, rdg) + require.Contains(t, err.Error(), tc.errStr) + } else { + rd, err = rdg(0) + require.Error(t, err) + require.Nil(t, rd) + require.Contains(t, err.Error(), tc.errStr) + } } else { require.NoError(t, err) } @@ -483,7 +492,10 @@ func TestReader(t *testing.T) { if !tc.expectedNonNilReader { require.Nil(t, rd) } else { - require.NotNil(t, rd) + require.NotNil(t, rdg) + rd, err := rdg(0) + require.NoError(t, err) + defer func() { require.NoError(t, rd.Close()) }()