diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index f9ba881f5..14a027bd1 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -1,21 +1,23 @@ package dagstore import ( + "bufio" "context" - "github.com/filecoin-project/lotus/metrics" - "go.opencensus.io/stats" "io" "github.com/ipfs/go-cid" + "go.opencensus.io/stats" "golang.org/x/xerrors" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/metrics" ) // For small read skips, it's faster to "burn" some bytes than to setup new sector reader. // Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB. var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M +var ReadBuf = 128 * (127 * 8) // unpadded(128k) type pieceReader struct { ctx context.Context @@ -27,6 +29,7 @@ type pieceReader struct { seqAt int64 // next byte to be read by io.Reader r io.ReadCloser + br *bufio.Reader rAt int64 } @@ -38,6 +41,7 @@ func (p *pieceReader) init() (_ *pieceReader, err error) { if err != nil { return nil, err } + p.br = bufio.NewReaderSize(p.r, ReadBuf) return p, nil } @@ -111,9 +115,10 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { return 0, xerrors.Errorf("closing backing reader: %w", err) } p.r = nil + p.br = nil } - log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt) + log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt, "n", len(b)) if off > p.rAt { stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1)) @@ -123,6 +128,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { p.rAt = off p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) + p.br = bufio.NewReaderSize(p.r, ReadBuf) if err != nil { return 0, xerrors.Errorf("getting backing reader: %w", err) } @@ -132,7 +138,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { if off > p.rAt { stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1)) - n, err := io.CopyN(io.Discard, p.r, off-p.rAt) + n, err := io.CopyN(io.Discard, p.br, off-p.rAt) p.rAt += n if err != nil { return 0, xerrors.Errorf("discarding read gap: %w", err) @@ -145,7 +151,11 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { } // 4. Read! - n, err = p.r.Read(b) + n, err = io.ReadFull(p.br, b) + if n < len(b) { + log.Debugw("pieceReader short read", "piece", p.pieceCid, "at", p.rAt, "toEnd", int64(p.len)-p.rAt, "n", len(b), "read", n, "err", err) + } + p.rAt += int64(n) return n, err }