dagstore pieceReader: Always read full in ReadAt
This commit is contained in:
parent
dad9190142
commit
3969d6b767
@ -1,21 +1,23 @@
|
|||||||
package dagstore
|
package dagstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
|
||||||
"go.opencensus.io/stats"
|
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
"go.opencensus.io/stats"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/dagstore/mount"
|
"github.com/filecoin-project/dagstore/mount"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
// For small read skips, it's faster to "burn" some bytes than to setup new sector reader.
|
// For small read skips, it's faster to "burn" some bytes than to setup new sector reader.
|
||||||
// Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB.
|
// Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB.
|
||||||
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
|
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
|
||||||
|
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
|
||||||
|
|
||||||
type pieceReader struct {
|
type pieceReader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@ -27,6 +29,7 @@ type pieceReader struct {
|
|||||||
seqAt int64 // next byte to be read by io.Reader
|
seqAt int64 // next byte to be read by io.Reader
|
||||||
|
|
||||||
r io.ReadCloser
|
r io.ReadCloser
|
||||||
|
br *bufio.Reader
|
||||||
rAt int64
|
rAt int64
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,6 +41,7 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
p.br = bufio.NewReaderSize(p.r, ReadBuf)
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
@ -111,9 +115,10 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
|||||||
return 0, xerrors.Errorf("closing backing reader: %w", err)
|
return 0, xerrors.Errorf("closing backing reader: %w", err)
|
||||||
}
|
}
|
||||||
p.r = nil
|
p.r = nil
|
||||||
|
p.br = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt)
|
log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt, "n", len(b))
|
||||||
|
|
||||||
if off > p.rAt {
|
if off > p.rAt {
|
||||||
stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
|
stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
|
||||||
@ -123,6 +128,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
|||||||
|
|
||||||
p.rAt = off
|
p.rAt = off
|
||||||
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
||||||
|
p.br = bufio.NewReaderSize(p.r, ReadBuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, xerrors.Errorf("getting backing reader: %w", err)
|
return 0, xerrors.Errorf("getting backing reader: %w", err)
|
||||||
}
|
}
|
||||||
@ -132,7 +138,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
|||||||
if off > p.rAt {
|
if off > p.rAt {
|
||||||
stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
|
stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
|
||||||
|
|
||||||
n, err := io.CopyN(io.Discard, p.r, off-p.rAt)
|
n, err := io.CopyN(io.Discard, p.br, off-p.rAt)
|
||||||
p.rAt += n
|
p.rAt += n
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, xerrors.Errorf("discarding read gap: %w", err)
|
return 0, xerrors.Errorf("discarding read gap: %w", err)
|
||||||
@ -145,7 +151,11 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 4. Read!
|
// 4. Read!
|
||||||
n, err = p.r.Read(b)
|
n, err = io.ReadFull(p.br, b)
|
||||||
|
if n < len(b) {
|
||||||
|
log.Debugw("pieceReader short read", "piece", p.pieceCid, "at", p.rAt, "toEnd", int64(p.len)-p.rAt, "n", len(b), "read", n, "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
p.rAt += int64(n)
|
p.rAt += int64(n)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user