piecereader: Avoid redundant roundtrips when seeking

This commit is contained in:
Łukasz Magiera 2021-12-09 14:52:33 +01:00
parent 13b260e7f7
commit a3d8494a04
4 changed files with 104 additions and 26 deletions

View File

@ -80,21 +80,28 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
if err != nil {
cancel()
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
return nil, err
}
if rg == nil {
cancel()
return nil, nil
}
pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffsetAligned.Padded()), size.Padded()-abi.PaddedPieceSize(startOffsetAligned.Padded()))
r, err := rg(startOffsetAligned.Padded())
if err != nil {
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
return nil, err
}
if r == nil {
return nil, nil
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}
upr, err := fr32.NewUnpadReader(r, size.Padded())

View File

@ -67,6 +67,9 @@ func (p *pieceReader) Close() error {
}
if p.r != nil {
if err := p.r.Close(); err != nil {
return err
}
if err := p.r.Close(); err != nil {
return err
}

View File

@ -585,7 +585,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offse
// 1. no worker(local worker included) has an unsealed file for the given sector OR
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
ft := storiface.FTUnsealed
// check if we have the unsealed sector file locally
@ -623,7 +623,52 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// don't reuse between readers unless closed
f := pf
pf = nil
if f == nil {
f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size)
}
r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned))
if err != nil {
return nil, err
}
return struct {
io.Reader
io.Closer
}{
Reader: r,
Closer: funcCloser(func() error {
// if we already have a reader cached, close this one
if pf != nil {
if f == nil {
return nil
}
if pf == f {
pf = nil
}
tmp := f
f = nil
return tmp.Close()
}
// otherwise stash it away for reuse
pf = f
return nil
}),
}, nil
}, nil
}
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
@ -666,16 +711,19 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
continue
}
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
lastErr = err
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
return rd, nil
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
return nil, err
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
return rd, err
}, nil
}
}
@ -692,3 +740,11 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
}
var _ Store = &Remote{}
type funcCloser func() error
func (f funcCloser) Close() error {
return f()
}
var _ io.Closer = funcCloser(nil)

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -470,12 +471,20 @@ func TestReader(t *testing.T) {
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
rd, err := remoteStore.Reader(ctx, sectorRef, offset, size)
rdg, err := remoteStore.Reader(ctx, sectorRef, offset, size)
var rd io.ReadCloser
if tc.errStr != "" {
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
if rdg == nil {
require.Error(t, err)
require.Nil(t, rdg)
require.Contains(t, err.Error(), tc.errStr)
} else {
rd, err = rdg(0)
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
}
} else {
require.NoError(t, err)
}
@ -483,7 +492,10 @@ func TestReader(t *testing.T) {
if !tc.expectedNonNilReader {
require.Nil(t, rd)
} else {
require.NotNil(t, rd)
require.NotNil(t, rdg)
rd, err := rdg(0)
require.NoError(t, err)
defer func() {
require.NoError(t, rd.Close())
}()