piecereader: Retune to allow parallel ReadAt calls
This commit is contained in:
parent
8b2ef40f4e
commit
cd75ea0fe4
@ -48,7 +48,7 @@ func TestDMLevelPartialRetrieval(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
kit.QuietMiningLogs()
|
kit.QuietMiningLogs()
|
||||||
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs())
|
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
|
||||||
dh := kit.NewDealHarness(t, client, miner, miner)
|
dh := kit.NewDealHarness(t, client, miner, miner)
|
||||||
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
||||||
|
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/dagstore/mount"
|
"github.com/filecoin-project/dagstore/mount"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
)
|
)
|
||||||
@ -21,6 +22,8 @@ import (
|
|||||||
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
|
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
|
||||||
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
|
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
|
||||||
|
|
||||||
|
var MinRandomReadSize = int64(4 << 10)
|
||||||
|
|
||||||
type pieceGetter func(ctx context.Context, offset, size uint64) (io.ReadCloser, error)
|
type pieceGetter func(ctx context.Context, offset, size uint64) (io.ReadCloser, error)
|
||||||
|
|
||||||
type pieceReader struct {
|
type pieceReader struct {
|
||||||
@ -33,15 +36,27 @@ type pieceReader struct {
|
|||||||
closed bool
|
closed bool
|
||||||
seqAt int64 // next byte to be read by io.Reader
|
seqAt int64 // next byte to be read by io.Reader
|
||||||
|
|
||||||
mu sync.Mutex
|
// sequential reader
|
||||||
r io.ReadCloser
|
seqMu sync.Mutex
|
||||||
br *bufio.Reader
|
r io.ReadCloser
|
||||||
rAt int64
|
br *bufio.Reader
|
||||||
|
rAt int64
|
||||||
|
|
||||||
|
// random read cache
|
||||||
|
remReads *lru.Cache[int64, []byte] // data start offset -> data
|
||||||
|
// todo try carrying a "bytes read sequentially so far" counter with those
|
||||||
|
// cacahed byte buffers, increase buffer sizes when we see that we're doing
|
||||||
|
// a long sequential read
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) init() (_ *pieceReader, err error) {
|
func (p *pieceReader) init() (_ *pieceReader, err error) {
|
||||||
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
|
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
|
||||||
|
|
||||||
|
p.remReads, err = lru.New[int64, []byte](100)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
p.rAt = 0
|
p.rAt = 0
|
||||||
p.r, err = p.getReader(p.ctx, uint64(p.rAt), uint64(p.len))
|
p.r, err = p.getReader(p.ctx, uint64(p.rAt), uint64(p.len))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -65,8 +80,8 @@ func (p *pieceReader) check() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) Close() error {
|
func (p *pieceReader) Close() error {
|
||||||
p.mu.Lock()
|
p.seqMu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.seqMu.Unlock()
|
||||||
|
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -90,21 +105,21 @@ func (p *pieceReader) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) Read(b []byte) (int, error) {
|
func (p *pieceReader) Read(b []byte) (int, error) {
|
||||||
p.mu.Lock()
|
p.seqMu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.seqMu.Unlock()
|
||||||
|
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := p.readAtUnlocked(b, p.seqAt)
|
n, err := p.readSeqReader(b)
|
||||||
p.seqAt += int64(n)
|
p.seqAt += int64(n)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
||||||
p.mu.Lock()
|
p.seqMu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.seqMu.Unlock()
|
||||||
|
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -124,14 +139,9 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
|||||||
return p.seqAt, nil
|
return p.seqAt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
func (p *pieceReader) readSeqReader(b []byte) (n int, err error) {
|
||||||
p.mu.Lock()
|
off := p.seqAt
|
||||||
defer p.mu.Unlock()
|
|
||||||
|
|
||||||
return p.readAtUnlocked(b, off)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
|
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -196,4 +206,81 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
|
|||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||||
|
var filled int64
|
||||||
|
|
||||||
|
// try to get a buf from lru
|
||||||
|
data, ok := p.remReads.Get(off)
|
||||||
|
if ok {
|
||||||
|
n = copy(b, data)
|
||||||
|
filled += int64(n)
|
||||||
|
|
||||||
|
if n < len(data) {
|
||||||
|
p.remReads.Add(off+int64(n), data[n:])
|
||||||
|
|
||||||
|
// keep the header buffered
|
||||||
|
if off != 0 {
|
||||||
|
p.remReads.Remove(off)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if filled == int64(len(b)) {
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
readOff := off + filled
|
||||||
|
readSize := int64(len(b)) - filled
|
||||||
|
|
||||||
|
smallRead := readSize < MinRandomReadSize
|
||||||
|
|
||||||
|
if smallRead {
|
||||||
|
// read into small read buf
|
||||||
|
readBuf := make([]byte, MinRandomReadSize)
|
||||||
|
bn, err := p.readInto(readBuf, readOff)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return int(filled), err
|
||||||
|
}
|
||||||
|
|
||||||
|
// reslice so that the slice is the data
|
||||||
|
readBuf = readBuf[:bn]
|
||||||
|
|
||||||
|
// fill user data
|
||||||
|
used := copy(b[filled:], readBuf[:])
|
||||||
|
filled += int64(used)
|
||||||
|
readBuf = readBuf[used:]
|
||||||
|
|
||||||
|
// cache the rest
|
||||||
|
if len(readBuf) > 0 {
|
||||||
|
p.remReads.Add(readOff+int64(used), readBuf)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// read into user buf
|
||||||
|
bn, err := p.readInto(b[filled:], readOff)
|
||||||
|
if err != nil {
|
||||||
|
return int(filled), err
|
||||||
|
}
|
||||||
|
filled += int64(bn)
|
||||||
|
}
|
||||||
|
|
||||||
|
if filled < int64(len(b)) {
|
||||||
|
return int(filled), io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
return int(filled), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceReader) readInto(b []byte, off int64) (n int, err error) {
|
||||||
|
rd, err := p.getReader(p.ctx, uint64(off), uint64(len(b)))
|
||||||
|
if err != nil {
|
||||||
|
return 0, xerrors.Errorf("getting reader: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = io.ReadFull(rd, b)
|
||||||
|
if err == io.ErrUnexpectedEOF {
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
var _ mount.Reader = (*pieceReader)(nil)
|
var _ mount.Reader = (*pieceReader)(nil)
|
||||||
|
Loading…
Reference in New Issue
Block a user