piecereader: Avoid allocating 1024MB slices per read

This commit is contained in:
Łukasz Magiera 2021-12-09 15:49:37 +01:00 committed by Jennifer Wang
parent a438e6fa73
commit b4c1e340ea
2 changed files with 25 additions and 4 deletions

View File

@ -16,13 +16,21 @@ type unpadReader struct {
work []byte
}
func BufSize(sz abi.PaddedPieceSize) int {
return int(MTTresh * mtChunkCount(sz))
}
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
buf := make([]byte, BufSize(sz))
return NewUnpadReaderBuf(src, sz, buf)
}
func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, MTTresh*mtChunkCount(sz))
return &unpadReader{
src: src,

View File

@ -94,6 +94,8 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return nil, nil
}
buf := make([]byte, fr32.BufSize(size.Padded()))
pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
@ -104,7 +106,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
@ -113,6 +115,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
@ -122,7 +125,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
io.Closer
}{
Reader: bir,
Closer: r,
Closer: funcCloser(func() error {
return r.Close()
}),
}, nil
},
len: size,
@ -137,6 +142,14 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
return pr, err
}
type funcCloser func() error
func (f funcCloser) Close() error {
return f()
}
var _ io.Closer = funcCloser(nil)
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.