diff --git a/itests/deals_partial_retrieval_dm-level_test.go b/itests/deals_partial_retrieval_dm-level_test.go index e3414c191..46571769a 100644 --- a/itests/deals_partial_retrieval_dm-level_test.go +++ b/itests/deals_partial_retrieval_dm-level_test.go @@ -48,7 +48,7 @@ func TestDMLevelPartialRetrieval(t *testing.T) { ctx := context.Background() kit.QuietMiningLogs() - client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs()) + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) dh := kit.NewDealHarness(t, client, miner, miner) ens.InterconnectAll().BeginMining(50 * time.Millisecond) diff --git a/storage/sealer/piece_reader.go b/storage/sealer/piece_reader.go index 7d5b77307..5d900736e 100644 --- a/storage/sealer/piece_reader.go +++ b/storage/sealer/piece_reader.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-state-types/abi" + lru "github.com/hashicorp/golang-lru/v2" "github.com/filecoin-project/lotus/metrics" ) @@ -21,6 +22,8 @@ import ( var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M var ReadBuf = 128 * (127 * 8) // unpadded(128k) +var MinRandomReadSize = int64(4 << 10) + type pieceGetter func(ctx context.Context, offset, size uint64) (io.ReadCloser, error) type pieceReader struct { @@ -33,15 +36,27 @@ type pieceReader struct { closed bool seqAt int64 // next byte to be read by io.Reader - mu sync.Mutex - r io.ReadCloser - br *bufio.Reader - rAt int64 + // sequential reader + seqMu sync.Mutex + r io.ReadCloser + br *bufio.Reader + rAt int64 + + // random read cache + remReads *lru.Cache[int64, []byte] // data start offset -> data + // todo try carrying a "bytes read sequentially so far" counter with those + // cached byte buffers, increase buffer sizes when we see that we're doing + // a long sequential read } func (p *pieceReader) init() (_ *pieceReader, err error) { stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1)) + p.remReads, err = lru.New[int64, []byte](100)
+ if err != nil { + return nil, err + } + p.rAt = 0 p.r, err = p.getReader(p.ctx, uint64(p.rAt), uint64(p.len)) if err != nil { @@ -65,8 +80,8 @@ func (p *pieceReader) check() error { } func (p *pieceReader) Close() error { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return err @@ -90,21 +105,21 @@ func (p *pieceReader) Close() error { } func (p *pieceReader) Read(b []byte) (int, error) { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return 0, err } - n, err := p.readAtUnlocked(b, p.seqAt) + n, err := p.readSeqReader(b) p.seqAt += int64(n) return n, err } func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return 0, err @@ -124,14 +139,9 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { return p.seqAt, nil } -func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { - p.mu.Lock() - defer p.mu.Unlock() +func (p *pieceReader) readSeqReader(b []byte) (n int, err error) { + off := p.seqAt - return p.readAtUnlocked(b, off) -} - -func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { if err := p.check(); err != nil { return 0, err } @@ -196,4 +206,81 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { return n, err } +func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { + var filled int64 + + // try to get a buf from lru + data, ok := p.remReads.Get(off) + if ok { + n = copy(b, data) + filled += int64(n) + + if n < len(data) { + p.remReads.Add(off+int64(n), data[n:]) + + // keep the header buffered + if off != 0 { + p.remReads.Remove(off) + } + } + } + if filled == int64(len(b)) { + return n, nil + } + + readOff := off + filled + readSize := int64(len(b)) - filled + + smallRead := readSize < 
MinRandomReadSize + + if smallRead { + // read into small read buf + readBuf := make([]byte, MinRandomReadSize) + bn, err := p.readInto(readBuf, readOff) + if err != nil && err != io.EOF { + return int(filled), err + } + + // reslice so that the slice is the data + readBuf = readBuf[:bn] + + // fill user data + used := copy(b[filled:], readBuf[:]) + filled += int64(used) + readBuf = readBuf[used:] + + // cache the rest + if len(readBuf) > 0 { + p.remReads.Add(readOff+int64(used), readBuf) + } + } else { + // read into user buf + bn, err := p.readInto(b[filled:], readOff) + if err != nil { + return int(filled), err + } + filled += int64(bn) + } + + if filled < int64(len(b)) { + return int(filled), io.EOF + } + + return int(filled), nil +} + +func (p *pieceReader) readInto(b []byte, off int64) (n int, err error) { + rd, err := p.getReader(p.ctx, uint64(off), uint64(len(b))) + if err != nil { + return 0, xerrors.Errorf("getting reader: %w", err) + } + + n, err = io.ReadFull(rd, b) + if err == io.ErrUnexpectedEOF { + err = io.EOF + } + + return n, err +} + var _ mount.Reader = (*pieceReader)(nil)