piecereader: Fix parallel buffer access, close readers in readInto
This commit is contained in:
parent
cd75ea0fe4
commit
587c3fde58
@ -3,6 +3,7 @@ package sealer
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
|
pool "github.com/libp2p/go-buffer-pool"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -93,8 +94,6 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, fr32.BufSize(pieceSize.Padded()))
|
|
||||||
|
|
||||||
pr, err := (&pieceReader{
|
pr, err := (&pieceReader{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
getReader: func(ctx context.Context, startOffset, readSize uint64) (io.ReadCloser, error) {
|
getReader: func(ctx context.Context, startOffset, readSize uint64) (io.ReadCloser, error) {
|
||||||
@ -109,6 +108,8 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
|
|||||||
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
|
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buf := pool.Get(fr32.BufSize(pieceSize.Padded()))
|
||||||
|
|
||||||
upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf)
|
upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Close() // nolint
|
r.Close() // nolint
|
||||||
@ -129,6 +130,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
|
|||||||
}{
|
}{
|
||||||
Reader: bir,
|
Reader: bir,
|
||||||
Closer: funcCloser(func() error {
|
Closer: funcCloser(func() error {
|
||||||
|
pool.Put(buf)
|
||||||
return r.Close()
|
return r.Close()
|
||||||
}),
|
}),
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -276,11 +276,18 @@ func (p *pieceReader) readInto(b []byte, off int64) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
n, err = io.ReadFull(rd, b)
|
n, err = io.ReadFull(rd, b)
|
||||||
|
|
||||||
|
cerr := rd.Close()
|
||||||
|
|
||||||
if err == io.ErrUnexpectedEOF {
|
if err == io.ErrUnexpectedEOF {
|
||||||
err = io.EOF
|
err = io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
return n, err
|
if err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, cerr
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ mount.Reader = (*pieceReader)(nil)
|
var _ mount.Reader = (*pieceReader)(nil)
|
||||||
|
Loading…
Reference in New Issue
Block a user