piecereader: Avoid allocating 1024MB slices per read

This commit is contained in:
Łukasz Magiera 2021-12-09 15:49:37 +01:00
parent a3d8494a04
commit 9c75a3aaa8
2 changed files with 25 additions and 4 deletions

View File

@ -16,13 +16,21 @@ type unpadReader struct {
work []byte work []byte
} }
func BufSize(sz abi.PaddedPieceSize) int {
return int(MTTresh * mtChunkCount(sz))
}
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) { func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
buf := make([]byte, BufSize(sz))
return NewUnpadReaderBuf(src, sz, buf)
}
func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) {
if err := sz.Validate(); err != nil { if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err) return nil, xerrors.Errorf("bad piece size: %w", err)
} }
buf := make([]byte, MTTresh*mtChunkCount(sz))
return &unpadReader{ return &unpadReader{
src: src, src: src,

View File

@ -94,6 +94,8 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return nil, nil return nil, nil
} }
buf := make([]byte, fr32.BufSize(size.Padded()))
pr, err := (&pieceReader{ pr, err := (&pieceReader{
ctx: ctx, ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) { getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
@ -104,7 +106,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err) return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
} }
upr, err := fr32.NewUnpadReader(r, size.Padded()) upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
if err != nil { if err != nil {
r.Close() // nolint r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err) return nil, xerrors.Errorf("creating unpadded reader: %w", err)
@ -113,6 +115,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
bir := bufio.NewReaderSize(upr, 127) bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) { if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil { if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err) return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
} }
} }
@ -122,7 +125,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
io.Closer io.Closer
}{ }{
Reader: bir, Reader: bir,
Closer: r, Closer: funcCloser(func() error {
return r.Close()
}),
}, nil }, nil
}, },
len: size, len: size,
@ -137,6 +142,14 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return pr, err return pr, err
} }
type funcCloser func() error
func (f funcCloser) Close() error {
return f()
}
var _ io.Closer = funcCloser(nil)
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read. // If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it. // Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.