piecereader: address review
This commit is contained in:
parent
5e58f64380
commit
b58daf5340
@ -14,6 +14,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"golang.org/x/xerrors"
|
||||
@ -21,6 +22,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -28,6 +30,10 @@ var FetchTempSubdir = "fetching"
|
||||
|
||||
var CopyBuf = 1 << 20
|
||||
|
||||
// LocalReaderTimeout is the timeout for keeping local reader files open without
|
||||
// any read activity.
|
||||
var LocalReaderTimeout = 5 * time.Second
|
||||
|
||||
type Remote struct {
|
||||
local Store
|
||||
index SectorIndex
|
||||
@ -598,7 +604,66 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
|
||||
if has {
|
||||
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
|
||||
|
||||
// refs keep track of the currently opened pf
|
||||
// if they drop to 0 for longer than LocalReaderTimeout, pf will be closed
|
||||
var refsLk sync.Mutex
|
||||
refs := 0
|
||||
|
||||
cleanupIdle := func() {
|
||||
lastRefs := 1
|
||||
|
||||
for range time.After(LocalReaderTimeout) {
|
||||
refsLk.Lock()
|
||||
if refs == 0 && lastRefs == 0 && pf != nil { // pf can't really be nil here, but better be safe
|
||||
log.Infow("closing idle partial file", "path", path)
|
||||
err := pf.Close()
|
||||
if err != nil {
|
||||
log.Errorw("closing idle partial file", "path", path, "error", err)
|
||||
}
|
||||
|
||||
pf = nil
|
||||
refsLk.Unlock()
|
||||
return
|
||||
}
|
||||
lastRefs = refs
|
||||
refsLk.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
getPF := func() (*partialfile.PartialFile, func() error, error) {
|
||||
refsLk.Lock()
|
||||
defer refsLk.Unlock()
|
||||
|
||||
if pf == nil {
|
||||
// got closed in the meantime, reopen
|
||||
|
||||
var err error
|
||||
pf, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("reopening partial file: %w", err)
|
||||
}
|
||||
log.Debugf("local partial file reopened %s (+%d,%d)", path, offset, size)
|
||||
|
||||
go cleanupIdle()
|
||||
}
|
||||
|
||||
refs++
|
||||
|
||||
return pf, func() error {
|
||||
refsLk.Lock()
|
||||
defer refsLk.Unlock()
|
||||
|
||||
refs--
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
|
||||
pf, done, err := getPF()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting partialfile handle: %w", err)
|
||||
}
|
||||
|
||||
r, err := r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset)+startOffsetAligned, abi.PaddedPieceSize(endOffsetAligned-startOffsetAligned))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -609,10 +674,7 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
|
||||
io.Closer
|
||||
}{
|
||||
Reader: r,
|
||||
Closer: funcCloser(func() error {
|
||||
// todo keep some refcount, close pf (or push to some lru) when it hits 0
|
||||
return nil
|
||||
}),
|
||||
Closer: funcCloser(done),
|
||||
}, nil
|
||||
}, nil
|
||||
|
||||
|
@ -8,6 +8,21 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
// UnpaddedFr32Chunk is the minimum amount of data which can be fr32-padded
|
||||
// Fr32 padding inserts two zero bits every 254 bits, so the minimum amount of
|
||||
// data which can be padded is 254 bits. 127 bytes is the smallest multiple of
|
||||
// 254 bits which has a whole number of bytes.
|
||||
const UnpaddedFr32Chunk abi.UnpaddedPieceSize = 127
|
||||
|
||||
// PaddedFr32Chunk is the size of a UnpaddedFr32Chunk chunk after fr32 padding
|
||||
const PaddedFr32Chunk abi.PaddedPieceSize = 128
|
||||
|
||||
func init() {
|
||||
if PaddedFr32Chunk != UnpaddedFr32Chunk.Padded() {
|
||||
panic("bad math")
|
||||
}
|
||||
}
|
||||
|
||||
var MTTresh = uint64(512 << 10)
|
||||
|
||||
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
|
||||
|
@ -84,27 +84,30 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
|
||||
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
|
||||
// The returned reader will be nil if none of the workers has an unsealed sector file containing
|
||||
// the unsealed piece.
|
||||
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), pieceSize.Padded())
|
||||
readerGetter, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), pieceSize.Padded())
|
||||
if err != nil {
|
||||
cancel()
|
||||
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
|
||||
return nil, err
|
||||
}
|
||||
if rg == nil {
|
||||
if readerGetter == nil {
|
||||
cancel()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pr, err := (&pieceReader{
|
||||
ctx: ctx,
|
||||
getReader: func(ctx context.Context, startOffset, readSize uint64) (io.ReadCloser, error) {
|
||||
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
|
||||
getReader: func(startOffset, readSize uint64) (io.ReadCloser, error) {
|
||||
// The request is for unpadded bytes, at any offset.
|
||||
// storage.Reader readers give us fr32-padded bytes, so we need to
|
||||
// do the unpadding here.
|
||||
|
||||
startOffsetAligned := storiface.UnpaddedFloor(startOffset)
|
||||
startOffsetDiff := int(startOffset - uint64(startOffsetAligned))
|
||||
|
||||
endOffset := startOffset + readSize
|
||||
endOffsetAligned := storiface.UnpaddedByteIndex((endOffset + 126) / 127 * 127) // ceil to multiple of 127
|
||||
endOffsetAligned := storiface.UnpaddedCeil(endOffset)
|
||||
|
||||
r, err := rg(startOffsetAligned.Padded(), endOffsetAligned.Padded())
|
||||
r, err := readerGetter(startOffsetAligned.Padded(), endOffsetAligned.Padded())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
|
||||
}
|
||||
@ -133,8 +136,6 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
|
||||
}{
|
||||
Reader: bir,
|
||||
Closer: funcCloser(func() error {
|
||||
// this close can be called more than once - double close signals to the paths.Reader that we are done with the piece
|
||||
|
||||
closeOnce.Do(func() {
|
||||
pool.Put(buf)
|
||||
})
|
||||
@ -145,7 +146,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
|
||||
len: pieceSize,
|
||||
onClose: cancel,
|
||||
pieceCid: pc,
|
||||
}).init()
|
||||
}).init(ctx)
|
||||
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
|
||||
cancel()
|
||||
return nil, err
|
||||
|
@ -25,10 +25,9 @@ var ReadBuf = 128 * (127 * 8) // unpadded(128k)
|
||||
|
||||
var MinRandomReadSize = int64(4 << 10)
|
||||
|
||||
type pieceGetter func(ctx context.Context, offset, size uint64) (io.ReadCloser, error)
|
||||
type pieceGetter func(offset, size uint64) (io.ReadCloser, error)
|
||||
|
||||
type pieceReader struct {
|
||||
ctx context.Context
|
||||
getReader pieceGetter
|
||||
pieceCid cid.Cid
|
||||
len abi.UnpaddedPieceSize
|
||||
@ -53,11 +52,11 @@ type pieceReader struct {
|
||||
// a long sequential read
|
||||
}
|
||||
|
||||
func (p *pieceReader) init() (_ *pieceReader, err error) {
|
||||
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
|
||||
func (p *pieceReader) init(ctx context.Context) (_ *pieceReader, err error) {
|
||||
stats.Record(ctx, metrics.DagStorePRInitCount.M(1))
|
||||
|
||||
p.seqMCtx, _ = tag.New(p.ctx, tag.Upsert(metrics.PRReadType, "seq"))
|
||||
p.atMCtx, _ = tag.New(p.ctx, tag.Upsert(metrics.PRReadType, "rand"))
|
||||
p.seqMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "seq"))
|
||||
p.atMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "rand"))
|
||||
|
||||
p.remReads, err = lru.New[int64, []byte](100)
|
||||
if err != nil {
|
||||
@ -65,7 +64,7 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
|
||||
}
|
||||
|
||||
p.rAt = 0
|
||||
p.r, err = p.getReader(p.ctx, uint64(p.rAt), uint64(p.len))
|
||||
p.r, err = p.getReader(uint64(p.rAt), uint64(p.len))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -95,9 +94,6 @@ func (p *pieceReader) Close() error {
|
||||
}
|
||||
|
||||
if p.r != nil {
|
||||
if err := p.r.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.r.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -177,7 +173,7 @@ func (p *pieceReader) readSeqReader(b []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
p.rAt = off
|
||||
p.r, err = p.getReader(p.ctx, uint64(p.rAt), uint64(p.len))
|
||||
p.r, err = p.getReader(uint64(p.rAt), uint64(p.len))
|
||||
p.br = bufio.NewReaderSize(p.r, ReadBuf)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("getting backing reader: %w", err)
|
||||
@ -288,7 +284,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
}
|
||||
|
||||
func (p *pieceReader) readInto(b []byte, off int64) (n int, err error) {
|
||||
rd, err := p.getReader(p.ctx, uint64(off), uint64(len(b)))
|
||||
rd, err := p.getReader(uint64(off), uint64(len(b)))
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("getting reader: %w", err)
|
||||
}
|
||||
|
@ -8,6 +8,8 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fr32"
|
||||
)
|
||||
|
||||
var ErrSectorNotFound = errors.New("sector not found")
|
||||
@ -26,6 +28,14 @@ func (i UnpaddedByteIndex) Valid() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func UnpaddedFloor(n uint64) UnpaddedByteIndex {
|
||||
return UnpaddedByteIndex(n / uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk))
|
||||
}
|
||||
|
||||
func UnpaddedCeil(n uint64) UnpaddedByteIndex {
|
||||
return UnpaddedByteIndex((n + uint64(fr32.UnpaddedFr32Chunk-1)) / uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk))
|
||||
}
|
||||
|
||||
type PaddedByteIndex uint64
|
||||
|
||||
type RGetter func(ctx context.Context, id abi.SectorID) (sealed cid.Cid, update bool, err error)
|
||||
|
Loading…
Reference in New Issue
Block a user