diff --git a/cmd/lotus-shed/sectors.go b/cmd/lotus-shed/sectors.go index 465b0e772..899e0f290 100644 --- a/cmd/lotus-shed/sectors.go +++ b/cmd/lotus-shed/sectors.go @@ -653,7 +653,7 @@ fr32 padding is removed from the output.`, return xerrors.Errorf("getting reader: %w", err) } - rd, err := readStarter(0) + rd, err := readStarter(0, storiface.PaddedByteIndex(length)) if err != nil { return xerrors.Errorf("starting reader: %w", err) } diff --git a/itests/deals_partial_retrieval_dm-level_test.go b/itests/deals_partial_retrieval_dm-level_test.go index 4e59b163b..391c467a9 100644 --- a/itests/deals_partial_retrieval_dm-level_test.go +++ b/itests/deals_partial_retrieval_dm-level_test.go @@ -48,7 +48,7 @@ func TestDMLevelPartialRetrieval(t *testing.T) { ctx := context.Background() kit.QuietMiningLogs() - client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs()) + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) dh := kit.NewDealHarness(t, client, miner, miner) ens.InterconnectAll().BeginMining(50 * time.Millisecond) diff --git a/lib/readerutil/readerutil.go b/lib/readerutil/readerutil.go new file mode 100644 index 000000000..ea2fed426 --- /dev/null +++ b/lib/readerutil/readerutil.go @@ -0,0 +1,44 @@ +package readerutil + +import ( + "io" + "os" +) + +// NewReadSeekerFromReaderAt returns a new io.ReadSeeker from a io.ReaderAt. +// The returned io.ReadSeeker will read from the io.ReaderAt starting at the +// given base offset. +func NewReadSeekerFromReaderAt(readerAt io.ReaderAt, base int64) io.ReadSeeker { + return &readSeekerFromReaderAt{ + readerAt: readerAt, + base: base, + pos: 0, + } +} + +type readSeekerFromReaderAt struct { + readerAt io.ReaderAt + base int64 + pos int64 +} + +func (rs *readSeekerFromReaderAt) Read(p []byte) (n int, err error) { + n, err = rs.readerAt.ReadAt(p, rs.pos+rs.base) + rs.pos += int64(n) + return n, err +} + +func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + rs.pos = offset + case io.SeekCurrent: + rs.pos += offset + case io.SeekEnd: + return 0, io.ErrUnexpectedEOF + default: + return 0, os.ErrInvalid + } + + return rs.pos, nil +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 13627a663..ee7bd8695 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -61,6 +61,10 @@ var ( // vm execution ExecutionLane, _ = tag.NewKey("lane") + + // piecereader + PRReadType, _ = tag.NewKey("pr_type") // seq / rand + PRReadSize, _ = tag.NewKey("pr_size") // small / big ) // Measures @@ -153,8 +157,9 @@ var ( SchedCycleOpenWindows = stats.Int64("sched/assigner_cycle_open_window", "Number of open windows in scheduling cycles", stats.UnitDimensionless) SchedCycleQueueSize = stats.Int64("sched/assigner_cycle_task_queue_entry", "Number of task queue entries in scheduling cycles", stats.UnitDimensionless) - DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless) - DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes) + DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless) + DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes) + DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes) DagStorePRDiscardCount = stats.Int64("dagstore/pr_discard_count", "PieceReader 
discard count", stats.UnitDimensionless) DagStorePRSeekBackCount = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless) @@ -162,6 +167,12 @@ var ( DagStorePRSeekBackBytes = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes) DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes) + DagStorePRAtHitBytes = stats.Int64("dagstore/pr_at_hit_bytes", "PieceReader ReadAt bytes from cache", stats.UnitBytes) + DagStorePRAtHitCount = stats.Int64("dagstore/pr_at_hit_count", "PieceReader ReadAt from cache hits", stats.UnitDimensionless) + DagStorePRAtCacheFillCount = stats.Int64("dagstore/pr_at_cache_fill_count", "PieceReader ReadAt full cache fill count", stats.UnitDimensionless) + DagStorePRAtReadBytes = stats.Int64("dagstore/pr_at_read_bytes", "PieceReader ReadAt bytes read from source", stats.UnitBytes) // PRReadSize tag + DagStorePRAtReadCount = stats.Int64("dagstore/pr_at_read_count", "PieceReader ReadAt reads from source", stats.UnitDimensionless) // PRReadSize tag + // splitstore SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless) SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds) @@ -487,6 +498,7 @@ var ( DagStorePRBytesRequestedView = &view.View{ Measure: DagStorePRBytesRequested, Aggregation: view.Sum(), + TagKeys: []tag.Key{PRReadType}, } DagStorePRBytesDiscardedView = &view.View{ Measure: DagStorePRBytesDiscarded, @@ -513,6 +525,29 @@ var ( Aggregation: view.Sum(), } + DagStorePRAtHitBytesView = &view.View{ + Measure: DagStorePRAtHitBytes, + Aggregation: view.Sum(), + } + DagStorePRAtHitCountView = &view.View{ + Measure: DagStorePRAtHitCount, + Aggregation: view.Count(), + } + DagStorePRAtCacheFillCountView = &view.View{ + Measure: DagStorePRAtCacheFillCount, + Aggregation: view.Count(), + } + DagStorePRAtReadBytesView = &view.View{ + Measure: DagStorePRAtReadBytes, + Aggregation: view.Sum(), + TagKeys: []tag.Key{PRReadSize}, + } + DagStorePRAtReadCountView = &view.View{ + Measure: DagStorePRAtReadCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{PRReadSize}, + } + // splitstore SplitstoreMissView = &view.View{ Measure: SplitstoreMiss, @@ -779,6 +814,11 @@ var MinerNodeViews = append([]*view.View{ DagStorePRSeekForwardCountView, DagStorePRSeekBackBytesView, DagStorePRSeekForwardBytesView, + DagStorePRAtHitBytesView, + DagStorePRAtHitCountView, + DagStorePRAtCacheFillCountView, + DagStorePRAtReadBytesView, + DagStorePRAtReadCountView, }, DefaultViews...) 
var GatewayNodeViews = append([]*view.View{ diff --git a/storage/paths/http_handler.go b/storage/paths/http_handler.go index fbf1c85b0..4d0539079 100644 --- a/storage/paths/http_handler.go +++ b/storage/paths/http_handler.go @@ -3,6 +3,7 @@ package paths import ( "bytes" "encoding/json" + "io" "net/http" "os" "strconv" @@ -35,7 +36,7 @@ func (d *DefaultPartialFileHandler) HasAllocated(pf *partialfile.PartialFile, of return pf.HasAllocated(offset, size) } -func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) { +func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) { return pf.Reader(offset, size) } diff --git a/storage/paths/http_handler_test.go b/storage/paths/http_handler_test.go index 4987936dd..cf6d71c37 100644 --- a/storage/paths/http_handler_test.go +++ b/storage/paths/http_handler_test.go @@ -207,8 +207,8 @@ func TestRemoteGetAllocated(t *testing.T) { pfhandler := mocks.NewMockPartialFileHandler(mockCtrl) handler := &paths.FetchHandler{ - lstore, - pfhandler, + Local: lstore, + PfHandler: pfhandler, } // run http server diff --git a/storage/paths/interface.go b/storage/paths/interface.go index b0e714c13..d96135de8 100644 --- a/storage/paths/interface.go +++ b/storage/paths/interface.go @@ -2,7 +2,7 @@ package paths import ( "context" - "os" + "io" "github.com/filecoin-project/go-state-types/abi" @@ -24,7 +24,7 @@ type PartialFileHandler interface { HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) // Reader returns a file from which we can read the unsealed piece in the partial file. - Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) + Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) // Close closes the partial file Close(pf *partialfile.PartialFile) error diff --git a/storage/paths/mocks/pf.go b/storage/paths/mocks/pf.go index 50b020aaa..43b3bc489 100644 --- a/storage/paths/mocks/pf.go +++ b/storage/paths/mocks/pf.go @@ -5,7 +5,7 @@ package mocks import ( - os "os" + io "io" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -84,10 +84,10 @@ func (mr *MockPartialFileHandlerMockRecorder) OpenPartialFile(arg0, arg1 interfa } // Reader mocks base method. 
-func (m *MockPartialFileHandler) Reader(arg0 *partialfile.PartialFile, arg1 storiface.PaddedByteIndex, arg2 abi.PaddedPieceSize) (*os.File, error) { +func (m *MockPartialFileHandler) Reader(arg0 *partialfile.PartialFile, arg1 storiface.PaddedByteIndex, arg2 abi.PaddedPieceSize) (io.Reader, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Reader", arg0, arg1, arg2) - ret0, _ := ret[0].(*os.File) + ret0, _ := ret[0].(io.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/storage/paths/remote.go b/storage/paths/remote.go index 06bd39bf1..ab23e9789 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -14,6 +14,7 @@ import ( "sort" "strings" "sync" + "time" "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" @@ -21,6 +22,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/storage/sealer/fsutil" + "github.com/filecoin-project/lotus/storage/sealer/partialfile" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -28,6 +30,10 @@ var FetchTempSubdir = "fetching" var CopyBuf = 1 << 20 +// LocalReaderTimeout is the timeout for keeping local reader files open without +// any read activity. +var LocalReaderTimeout = 5 * time.Second + type Remote struct { local Store index SectorIndex @@ -563,7 +569,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storiface.SectorRef, off // 1. no worker(local worker included) has an unsealed file for the given sector OR // 2. no worker(local worker included) has the unsealed piece in their unsealed sector file. // Will return a nil reader and a nil error in such a case. -func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) { +func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) { ft := storiface.FTUnsealed // check if we have the unsealed sector file locally @@ -602,20 +608,67 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size if has { log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) - return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { - // don't reuse between readers unless closed - f := pf - pf = nil + // refs keep track of the currently opened pf + // if they drop to 0 for longer than LocalReaderTimeout, pf will be closed + var refsLk sync.Mutex + refs := 0 - if f == nil { - f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) - if err != nil { - return nil, xerrors.Errorf("opening partial file: %w", err) + cleanupIdle := func() { + lastRefs := 1 + + for range time.After(LocalReaderTimeout) { + refsLk.Lock() + if refs == 0 && lastRefs == 0 && pf != nil { // pf can't really be nil here, but better be safe + log.Infow("closing idle partial file", "path", path) + err := pf.Close() + if err != nil { + log.Errorw("closing idle partial file", "path", path, "error", err) + } + + pf = nil + refsLk.Unlock() + return } - log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size) + lastRefs = refs + refsLk.Unlock() + } + } + + getPF := func() (*partialfile.PartialFile, func() error, error) { + refsLk.Lock() + defer refsLk.Unlock() + + if pf == nil { + // got closed in the meantime, reopen + + var err error + pf, err = 
r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) + if err != nil { + return nil, nil, xerrors.Errorf("reopening partial file: %w", err) + } + log.Debugf("local partial file reopened %s (+%d,%d)", path, offset, size) + + go cleanupIdle() } - r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned)) + refs++ + + return pf, func() error { + refsLk.Lock() + defer refsLk.Unlock() + + refs-- + return nil + }, nil + } + + return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + pf, done, err := getPF() + if err != nil { + return nil, xerrors.Errorf("getting partialfile handle: %w", err) + } + + r, err := r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset)+startOffsetAligned, abi.PaddedPieceSize(endOffsetAligned-startOffsetAligned)) if err != nil { return nil, err } @@ -625,25 +678,7 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size io.Closer }{ Reader: r, - Closer: funcCloser(func() error { - // if we already have a reader cached, close this one - if pf != nil { - if f == nil { - return nil - } - if pf == f { - pf = nil - } - - tmp := f - f = nil - return tmp.Close() - } - - // otherwise stash it away for reuse - pf = f - return nil - }), + Closer: funcCloser(done), }, nil }, nil @@ -689,10 +724,10 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size continue } - return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. - rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size) + rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), offset+abi.PaddedPieceSize(endOffsetAligned)) if err != nil { log.Warnw("reading from remote", "url", url, "error", err) return nil, err diff --git a/storage/paths/remote_test.go b/storage/paths/remote_test.go index 41d5e8a17..e3376e6fa 100644 --- a/storage/paths/remote_test.go +++ b/storage/paths/remote_test.go @@ -477,7 +477,7 @@ func TestReader(t *testing.T) { require.Nil(t, rdg) require.Contains(t, err.Error(), tc.errStr) } else { - rd, err = rdg(0) + rd, err = rdg(0, storiface.PaddedByteIndex(size)) require.Error(t, err) require.Nil(t, rd) require.Contains(t, err.Error(), tc.errStr) @@ -490,7 +490,7 @@ func TestReader(t *testing.T) { require.Nil(t, rd) } else { require.NotNil(t, rdg) - rd, err := rdg(0) + rd, err := rdg(0, storiface.PaddedByteIndex(size)) require.NoError(t, err) defer func() { diff --git a/storage/sealer/fr32/fr32.go b/storage/sealer/fr32/fr32.go index 24175719c..6f5be65b7 100644 --- a/storage/sealer/fr32/fr32.go +++ b/storage/sealer/fr32/fr32.go @@ -8,6 +8,21 @@ import ( "github.com/filecoin-project/go-state-types/abi" ) +// UnpaddedFr32Chunk is the minimum amount of data which can be fr32-padded +// Fr32 padding inserts two zero bits every 254 bits, so the minimum amount of +// data which can be padded is 254 bits. 127 bytes is the smallest multiple of +// 254 bits which has a whole number of bytes. 
+const UnpaddedFr32Chunk abi.UnpaddedPieceSize = 127 + +// PaddedFr32Chunk is the size of a UnpaddedFr32Chunk chunk after fr32 padding +const PaddedFr32Chunk abi.PaddedPieceSize = 128 + +func init() { + if PaddedFr32Chunk != UnpaddedFr32Chunk.Padded() { + panic("bad math") + } +} + var MTTresh = uint64(512 << 10) func mtChunkCount(usz abi.PaddedPieceSize) uint64 { diff --git a/storage/sealer/partialfile/partialfile.go b/storage/sealer/partialfile/partialfile.go index 6e8c2d843..4357f796d 100644 --- a/storage/sealer/partialfile/partialfile.go +++ b/storage/sealer/partialfile/partialfile.go @@ -13,6 +13,7 @@ import ( rlepluslazy "github.com/filecoin-project/go-bitfield/rle" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/lib/readerutil" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -249,7 +250,10 @@ func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie return nil } -func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) { +// Reader forks off a new reader from the underlying file, and returns a reader +// starting at the given offset and reading the given size. Safe for concurrent +// use. +func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -275,7 +279,7 @@ func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP } } - return pf.file, nil + return io.LimitReader(readerutil.NewReadSeekerFromReaderAt(pf.file, int64(offset)), int64(size)), nil } func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) { diff --git a/storage/sealer/piece_provider.go b/storage/sealer/piece_provider.go index f49b8b0c7..0e992b679 100644 --- a/storage/sealer/piece_provider.go +++ b/storage/sealer/piece_provider.go @@ -4,8 +4,10 @@ import ( "bufio" "context" "io" + "sync" "github.com/ipfs/go-cid" + pool "github.com/libp2p/go-buffer-pool" "golang.org/x/xerrors" "github.com/filecoin-project/dagstore/mount" @@ -71,7 +73,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storiface.SectorR // It will NOT try to schedule an Unseal of a sealed sector file for the read. // // Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers. -func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) { +func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize) (mount.Reader, error) { // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { @@ -82,30 +84,37 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se // Reader returns a reader getter for an unsealed piece at the given offset in the given sector. // The returned reader will be nil if none of the workers has an unsealed sector file containing // the unsealed piece. 
- rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded()) + readerGetter, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), pieceSize.Padded()) if err != nil { cancel() log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) return nil, err } - if rg == nil { + if readerGetter == nil { cancel() return nil, nil } - buf := make([]byte, fr32.BufSize(size.Padded())) - pr, err := (&pieceReader{ - ctx: ctx, - getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) { - startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127 + getReader: func(startOffset, readSize uint64) (io.ReadCloser, error) { + // The request is for unpadded bytes, at any offset. + // storage.Reader readers give us fr32-padded bytes, so we need to + // do the unpadding here. - r, err := rg(startOffsetAligned.Padded()) + startOffsetAligned := storiface.UnpaddedFloor(startOffset) + startOffsetDiff := int(startOffset - uint64(startOffsetAligned)) + + endOffset := startOffset + readSize + endOffsetAligned := storiface.UnpaddedCeil(endOffset) + + r, err := readerGetter(startOffsetAligned.Padded(), endOffsetAligned.Padded()) if err != nil { return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err) } - upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf) + buf := pool.Get(fr32.BufSize(pieceSize.Padded())) + + upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf) if err != nil { r.Close() // nolint return nil, xerrors.Errorf("creating unpadded reader: %w", err) @@ -113,26 +122,31 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se bir := bufio.NewReaderSize(upr, 127) if startOffset > uint64(startOffsetAligned) { - if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil { + if _, err := bir.Discard(startOffsetDiff); err != nil { r.Close() // nolint return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err) } } + var closeOnce sync.Once + return struct { io.Reader io.Closer }{ Reader: bir, Closer: funcCloser(func() error { + closeOnce.Do(func() { + pool.Put(buf) + }) return r.Close() }), }, nil }, - len: size, + len: pieceSize, onClose: cancel, pieceCid: pc, - }).init() + }).init(ctx) if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil cancel() return nil, err diff --git a/storage/sealer/piece_reader.go b/storage/sealer/piece_reader.go index 6ef616e41..7a7cd1841 100644 --- a/storage/sealer/piece_reader.go +++ b/storage/sealer/piece_reader.go @@ -6,8 +6,10 @@ import ( "io" "sync" + lru "github.com/hashicorp/golang-lru/v2" "github.com/ipfs/go-cid" "go.opencensus.io/stats" + "go.opencensus.io/tag" "golang.org/x/xerrors" "github.com/filecoin-project/dagstore/mount" @@ -21,29 +23,48 @@ import ( var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M var ReadBuf = 128 * (127 * 8) // unpadded(128k) -type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error) +var MinRandomReadSize = int64(4 << 10) + +type pieceGetter func(offset, size uint64) (io.ReadCloser, error) type pieceReader struct { - ctx context.Context getReader pieceGetter pieceCid cid.Cid len abi.UnpaddedPieceSize onClose context.CancelFunc + seqMCtx context.Context + atMCtx context.Context + closed bool seqAt int64 // next byte to be read by io.Reader - mu sync.Mutex - r io.ReadCloser - br *bufio.Reader - rAt int64 + // sequential reader + seqMu sync.Mutex + r 
io.ReadCloser + br *bufio.Reader + rAt int64 + + // random read cache + remReads *lru.Cache[int64, []byte] // data start offset -> data + // todo try carrying a "bytes read sequentially so far" counter with those + // cacahed byte buffers, increase buffer sizes when we see that we're doing + // a long sequential read } -func (p *pieceReader) init() (_ *pieceReader, err error) { - stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1)) +func (p *pieceReader) init(ctx context.Context) (_ *pieceReader, err error) { + stats.Record(ctx, metrics.DagStorePRInitCount.M(1)) + + p.seqMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "seq")) + p.atMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "rand")) + + p.remReads, err = lru.New[int64, []byte](100) + if err != nil { + return nil, err + } p.rAt = 0 - p.r, err = p.getReader(p.ctx, uint64(p.rAt)) + p.r, err = p.getReader(uint64(p.rAt), uint64(p.len)) if err != nil { return nil, err } @@ -65,17 +86,14 @@ func (p *pieceReader) check() error { } func (p *pieceReader) Close() error { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return err } if p.r != nil { - if err := p.r.Close(); err != nil { - return err - } if err := p.r.Close(); err != nil { return err } @@ -90,21 +108,21 @@ func (p *pieceReader) Close() error { } func (p *pieceReader) Read(b []byte) (int, error) { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return 0, err } - n, err := p.readAtUnlocked(b, p.seqAt) + n, err := p.readSeqReader(b) p.seqAt += int64(n) return n, err } func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return 0, err @@ -124,19 +142,14 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { return p.seqAt, nil } -func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { - p.mu.Lock() - defer p.mu.Unlock() +func (p *pieceReader) readSeqReader(b []byte) (n int, err error) { + off := p.seqAt - return p.readAtUnlocked(b, off) -} - -func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { if err := p.check(); err != nil { return 0, err } - stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b)))) + stats.Record(p.seqMCtx, metrics.DagStorePRBytesRequested.M(int64(len(b)))) // 1. Get the backing reader into the correct position @@ -154,13 +167,13 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt, "n", len(b)) if off > p.rAt { - stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1)) + stats.Record(p.seqMCtx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1)) } else { - stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1)) + stats.Record(p.seqMCtx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1)) } p.rAt = off - p.r, err = p.getReader(p.ctx, uint64(p.rAt)) + p.r, err = p.getReader(uint64(p.rAt), uint64(p.len)) p.br = bufio.NewReaderSize(p.r, ReadBuf) if err != nil { return 0, xerrors.Errorf("getting backing reader: %w", err) @@ -169,7 +182,7 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { // 2. 
Check if we need to burn some bytes if off > p.rAt { - stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1)) + stats.Record(p.seqMCtx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1)) n, err := io.CopyN(io.Discard, p.br, off-p.rAt) p.rAt += n @@ -196,4 +209,99 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { return n, err } +func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { + stats.Record(p.atMCtx, metrics.DagStorePRBytesRequested.M(int64(len(b)))) + + var filled int64 + + // try to get a buf from lru + data, ok := p.remReads.Get(off) + if ok { + n = copy(b, data) + filled += int64(n) + + if n < len(data) { + p.remReads.Add(off+int64(n), data[n:]) + + // keep the header buffered + if off != 0 { + p.remReads.Remove(off) + } + } + + stats.Record(p.atMCtx, metrics.DagStorePRAtHitBytes.M(int64(n)), metrics.DagStorePRAtHitCount.M(1)) + // dagstore/pr_at_hit_bytes, dagstore/pr_at_hit_count + } + if filled == int64(len(b)) { + // dagstore/pr_at_cache_fill_count + stats.Record(p.atMCtx, metrics.DagStorePRAtCacheFillCount.M(1)) + return n, nil + } + + readOff := off + filled + readSize := int64(len(b)) - filled + + smallRead := readSize < MinRandomReadSize + + if smallRead { + // read into small read buf + readBuf := make([]byte, MinRandomReadSize) + bn, err := p.readInto(readBuf, readOff) + if err != nil && err != io.EOF { + return int(filled), err + } + + _ = stats.RecordWithTags(p.atMCtx, []tag.Mutator{tag.Insert(metrics.PRReadSize, "small")}, metrics.DagStorePRAtReadBytes.M(int64(bn)), metrics.DagStorePRAtReadCount.M(1)) + + // reslice so that the slice is the data + readBuf = readBuf[:bn] + + // fill user data + used := copy(b[filled:], readBuf[:]) + filled += int64(used) + readBuf = readBuf[used:] + + // cache the rest + if len(readBuf) > 0 { + p.remReads.Add(readOff+int64(used), readBuf) + } + } else { + // read into user buf + bn, err := p.readInto(b[filled:], readOff) + if err != nil { + return int(filled), err + } + filled += int64(bn) + + _ = stats.RecordWithTags(p.atMCtx, []tag.Mutator{tag.Insert(metrics.PRReadSize, "big")}, metrics.DagStorePRAtReadBytes.M(int64(bn)), metrics.DagStorePRAtReadCount.M(1)) + } + + if filled < int64(len(b)) { + return int(filled), io.EOF + } + + return int(filled), nil +} + +func (p *pieceReader) readInto(b []byte, off int64) (n int, err error) { + rd, err := p.getReader(uint64(off), uint64(len(b))) + if err != nil { + return 0, xerrors.Errorf("getting reader: %w", err) + } + + n, err = io.ReadFull(rd, b) + + cerr := rd.Close() + + if err == io.ErrUnexpectedEOF { + err = io.EOF + } + + if err != nil { + return n, err + } + + return n, cerr +} + var _ mount.Reader = (*pieceReader)(nil) diff --git a/storage/sealer/storiface/ffi.go b/storage/sealer/storiface/ffi.go index 9696b29db..4a9f832b8 100644 --- a/storage/sealer/storiface/ffi.go +++ b/storage/sealer/storiface/ffi.go @@ -8,6 +8,8 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/storage/sealer/fr32" ) var ErrSectorNotFound = errors.New("sector not found") @@ -26,6 +28,14 @@ func (i UnpaddedByteIndex) Valid() error { return nil } +func UnpaddedFloor(n uint64) UnpaddedByteIndex { + return UnpaddedByteIndex(n / uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk)) +} + +func UnpaddedCeil(n uint64) UnpaddedByteIndex { + return UnpaddedByteIndex((n + uint64(fr32.UnpaddedFr32Chunk-1)) / 
uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk)) +} + type PaddedByteIndex uint64 type RGetter func(ctx context.Context, id abi.SectorID) (sealed cid.Cid, update bool, err error)
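To make the new alignment helpers concrete, here is a small worked example of how an arbitrary unpadded read range is snapped to whole 127-byte fr32 chunks and converted to padded offsets before being handed to the (startOffsetAligned, endOffsetAligned) reader getter, as tryReadUnsealedPiece above does. The offsets and sizes are illustrative only.

package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

func main() {
	// An arbitrary unpadded request: 1000 bytes starting at offset 300.
	start, size := uint64(300), uint64(1000)

	// Floor the start and ceil the end to whole 127-byte fr32 chunks.
	startAligned := storiface.UnpaddedFloor(start)      // 254 = 2 * 127
	endAligned := storiface.UnpaddedCeil(start + size)  // 1397 = 11 * 127

	// The reader getter takes padded byte indexes, so every 127-byte
	// unpadded chunk maps to a 128-byte padded chunk.
	fmt.Println(startAligned.Padded(), endAligned.Padded()) // 256 1408

	// The first start-startAligned unpadded bytes are later discarded via
	// bufio's Discard, so the caller still sees data starting at offset 300.
	fmt.Println(start - uint64(startAligned)) // 46
}

Requesting the aligned padded range (rather than reading from the start offset to the end of the piece, as the old single-offset getter did) is what lets short random ReadAt calls avoid streaming the remainder of the unsealed piece.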